##// END OF EJS Templates
update SGEEngineSet to use SGE job arrays...
Justin Riley -
Show More
@@ -1,914 +1,908 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 21 import tempfile
22 22 pjoin = os.path.join
23 23
24 24 from twisted.internet import reactor, defer
25 25 from twisted.internet.protocol import ProcessProtocol
26 26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 27 from twisted.internet.utils import getProcessOutput
28 28 from twisted.python import failure, log
29 29
30 30 from IPython.external import argparse
31 31 from IPython.external import Itpl
32 32 from IPython.genutils import (
33 33 get_ipython_dir,
34 34 get_log_dir,
35 35 get_security_dir,
36 36 num_cpus
37 37 )
38 38 from IPython.kernel.fcutil import have_crypto
39 39
# Create various ipython directories if they don't exist.
# This must be done before IPython.kernel.config is imported.
from IPython.iplib import user_setup
# POSIX config files have no suffix; Windows ones use .ini.
if os.name == 'posix':
    rc_suffix = ''
else:
    rc_suffix = '.ini'
user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
# Ensure the log and security directories exist before anything uses them.
get_log_dir()
get_security_dir()
50 50
51 51 from IPython.kernel.config import config_manager as kernel_config_manager
52 52 from IPython.kernel.error import SecurityError, FileTimeoutError
53 53 from IPython.kernel.fcutil import have_crypto
54 54 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
55 55 from IPython.kernel.util import printer
56 56
57 57 #-----------------------------------------------------------------------------
58 58 # General process handling code
59 59 #-----------------------------------------------------------------------------
60 60
61 61
class ProcessStateError(Exception):
    """Raised when a ProcessLauncher is used in an invalid lifecycle state."""
64 64
class UnknownStatus(Exception):
    """Raised when a child process exits with an unrecognized status object."""
67 67
class LauncherProcessProtocol(ProcessProtocol):
    """
    A ProcessProtocol to go with the ProcessLauncher.

    Relays process lifecycle events (spawn, exit, output) from Twisted
    back to the owning ProcessLauncher.
    """
    def __init__(self, process_launcher):
        # The ProcessLauncher that is notified of state changes.
        self.process_launcher = process_launcher

    def connectionMade(self):
        # Child has spawned; hand its pid to the launcher's start deferred.
        self.process_launcher.fire_start_deferred(self.transport.pid)

    def processEnded(self, status):
        """Translate Twisted's exit status into the launcher's stop callback."""
        value = status.value
        if isinstance(value, ProcessDone):
            # Clean exit: report exit code 0.
            self.process_launcher.fire_stop_deferred(0)
        elif isinstance(value, ProcessTerminated):
            # Abnormal exit: report exit code, signal and raw status.
            self.process_launcher.fire_stop_deferred(
                {'exit_code':value.exitCode,
                 'signal':value.signal,
                 'status':value.status
                }
            )
        else:
            raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")

    def outReceived(self, data):
        # Forward the child's stdout to the Twisted log.
        log.msg(data)

    def errReceived(self, data):
        # Forward the child's stderr to the Twisted error log.
        log.err(data)
97 97
class ProcessLauncher(object):
    """
    Start and stop an external process in an asynchronous manner.

    Currently this uses deferreds to notify other parties of process state
    changes. This is an awkward design and should be moved to using
    a formal NotificationCenter.
    """
    def __init__(self, cmd_and_args):
        # cmd_and_args is a full argv list: argv[0] is the executable and
        # the entire list (including argv[0]) is passed to spawnProcess.
        self.cmd = cmd_and_args[0]
        self.args = cmd_and_args
        self._reset()

    def _reset(self):
        # Clear all per-run state; lifecycle is before -> running -> after.
        self.process_protocol = None
        self.pid = None
        self.start_deferred = None
        self.stop_deferreds = []
        self.state = 'before' # before, running, or after

    @property
    def running(self):
        # True only while the child process is alive.
        if self.state == 'running':
            return True
        else:
            return False

    def fire_start_deferred(self, pid):
        """Called by the protocol once the child has spawned."""
        self.pid = pid
        self.state = 'running'
        log.msg('Process %r has started with pid=%i' % (self.args, pid))
        self.start_deferred.callback(pid)

    def start(self):
        """Spawn the process; returns a deferred firing with its pid.

        Fails with ProcessStateError if the process was already started.
        """
        if self.state == 'before':
            self.process_protocol = LauncherProcessProtocol(self)
            self.start_deferred = defer.Deferred()
            self.process_transport = reactor.spawnProcess(
                self.process_protocol,
                self.cmd,
                self.args,
                env=os.environ
            )
            return self.start_deferred
        else:
            s = 'the process has already been started and has state: %r' % \
                self.state
            return defer.fail(ProcessStateError(s))

    def get_stop_deferred(self):
        """Return a deferred that fires when the process stops.

        Fails with ProcessStateError if the process already completed.
        """
        if self.state == 'running' or self.state == 'before':
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d
        else:
            s = 'this process is already complete'
            return defer.fail(ProcessStateError(s))

    def fire_stop_deferred(self, exit_code):
        """Called by the protocol on exit; fires all registered stop deferreds."""
        log.msg('Process %r has stopped with %r' % (self.args, exit_code))
        self.state = 'after'
        for d in self.stop_deferreds:
            d.callback(exit_code)

    def signal(self, sig):
        """
        Send a signal to the process.

        The argument sig can be ('KILL','INT', etc.) or any signal number.
        """
        # Silently a no-op unless the process is currently running.
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    # def __del__(self):
    #     self.signal('KILL')

    def interrupt_then_kill(self, delay=1.0):
        """Send INT now, then KILL after delay seconds for a graceful stop."""
        self.signal('INT')
        reactor.callLater(delay, self.signal, 'KILL')
177 177
178 178
179 179 #-----------------------------------------------------------------------------
180 180 # Code for launching controller and engines
181 181 #-----------------------------------------------------------------------------
182 182
183 183
class ControllerLauncher(ProcessLauncher):
    """ProcessLauncher that starts a local ipcontroller process."""

    def __init__(self, extra_args=None):
        # extra_args: optional list of additional ipcontroller CLI arguments.
        if sys.platform == 'win32':
            # This logic is needed because the ipcontroller script doesn't
            # always get installed in the same way or in the same location.
            from IPython.kernel.scripts import ipcontroller
            script_location = ipcontroller.__file__.replace('.pyc', '.py')
            # The -u option here turns on unbuffered output, which is required
            # on Win32 to prevent wierd conflict and problems with Twisted.
            # Also, use sys.executable to make sure we are picking up the
            # right python exe.
            args = [sys.executable, '-u', script_location]
        else:
            args = ['ipcontroller']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)
204 204
205 205
class EngineLauncher(ProcessLauncher):
    """ProcessLauncher that starts a local ipengine process."""

    def __init__(self, extra_args=None):
        # extra_args: optional list of additional ipengine CLI arguments.
        if sys.platform == 'win32':
            # This logic is needed because the ipengine script doesn't
            # always get installed in the same way or in the same location.
            from IPython.kernel.scripts import ipengine
            script_location = ipengine.__file__.replace('.pyc', '.py')
            # The -u option here turns on unbuffered output, which is required
            # on Win32 to prevent wierd conflict and problems with Twisted.
            # Also, use sys.executable to make sure we are picking up the
            # right python exe.
            args = [sys.executable, '-u', script_location]
        else:
            args = ['ipengine']
        self.extra_args = extra_args
        if extra_args is not None:
            args.extend(extra_args)

        ProcessLauncher.__init__(self, args)
226 226
227 227
class LocalEngineSet(object):
    """Start and manage a set of ipengine processes on the local machine."""

    def __init__(self, extra_args=None):
        # extra_args are passed through to every EngineLauncher.
        self.extra_args = extra_args
        self.launchers = []

    def start(self, n):
        """Start n local engines; returns a deferred firing with their pids."""
        dlist = []
        for i in range(n):
            print "starting engine:", i
            el = EngineLauncher(extra_args=self.extra_args)
            d = el.start()
            self.launchers.append(el)
            dlist.append(d)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_start)
        return dfinal

    def _handle_start(self, r):
        # Log the pids of the started engines and pass them through.
        log.msg('Engines started with pids: %r' % r)
        return r

    def _handle_stop(self, r):
        # Log the stop results and pass them through.
        log.msg('Engines received signal: %r' % r)
        return r

    def signal(self, sig):
        """Send sig to every engine; fires when all of them have stopped."""
        dlist = []
        for el in self.launchers:
            d = el.get_stop_deferred()
            dlist.append(d)
            el.signal(sig)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal

    def interrupt_then_kill(self, delay=1.0):
        """INT every engine, then KILL after delay seconds; fires on stop."""
        dlist = []
        for el in self.launchers:
            d = el.get_stop_deferred()
            dlist.append(d)
            el.interrupt_then_kill(delay)
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self._handle_stop)
        return dfinal
273 273
274 274
class BatchEngineSet(object):
    """Base class for engine sets started through a batch queue system.

    Subclasses must set ``submit_command``, ``delete_command`` and
    ``job_id_regexp``. See PBSEngineSet for an example.
    """

    # Subclasses must fill these in. See PBSEngineSet
    submit_command = ''
    delete_command = ''
    job_id_regexp = ''

    def __init__(self, template_file, **kwargs):
        """Create a batch engine set.

        template_file is the filename of the batch script template; any
        extra keyword arguments become template context variables.
        """
        self.template_file = template_file
        self.context = {}
        self.context.update(kwargs)
        self.batch_file = self.template_file+'-run'

    def parse_job_id(self, output):
        """Extract the job id from the submit command's output.

        Raises Exception if ``job_id_regexp`` does not match at the start
        of the output.
        """
        m = re.match(self.job_id_regexp, output)
        if m is None:
            raise Exception("job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate the batch script template for n engines."""
        self.context['n'] = n
        # Close file handles deterministically instead of relying on GC.
        f = open(self.template_file, 'r')
        try:
            template = f.read()
        finally:
            f.close()
        log.msg('Using template for batch script: %s' % self.template_file)
        script_as_string = Itpl.itplns(template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file,'w')
        try:
            f.write(script_as_string)
        finally:
            f.close()

    def handle_error(self, f):
        """Errback: print the traceback and re-raise the wrapped exception."""
        f.printTraceback()
        f.raiseException()

    def start(self, n):
        """Submit a batch job starting n engines; fires with the job id."""
        self.write_batch_script(n)
        d = getProcessOutput(self.submit_command,
                             [self.batch_file],env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

    def kill(self):
        """Ask the queue system to delete the submitted job."""
        d = getProcessOutput(self.delete_command,
                             [self.job_id],env=os.environ)
        return d
324 324
class PBSEngineSet(BatchEngineSet):
    """Engine set submitted through the PBS batch system (qsub/qdel)."""

    submit_command = 'qsub'
    delete_command = 'qdel'
    # Raw string: '\d' is an invalid escape sequence in a plain literal.
    job_id_regexp = r'\d+'

    def __init__(self, template_file, **kwargs):
        BatchEngineSet.__init__(self, template_file, **kwargs)
333 333
class SGEEngineSet(PBSEngineSet):
    """Engine set submitted through SGE as a single job array.

    NOTE(review): the original text contained an unresolved diff with both
    the old per-engine implementation and the new job-array implementation
    interleaved; this is the resolved job-array version.
    """

    def __init__(self, template_file, **kwargs):
        BatchEngineSet.__init__(self, template_file, **kwargs)
        # Holds the (possibly generated) job script; the NamedTemporaryFile
        # is kept alive on self so the file survives until submission.
        self._temp_file = None

    def parse_job_id(self, output):
        """Extract the job id from qsub's output.

        Uses re.search rather than re.match because SGE's array-job
        message does not start with the numeric id.
        """
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise Exception("job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('job started with job id: %r' % job_id)
        return job_id

    def start(self, n):
        """Submit one SGE job array that starts n engines; fires with the job id."""
        log.msg("starting %d engines" % n)
        self._temp_file = tempfile.NamedTemporaryFile()
        # Matches an existing '#$ -t ...' job array directive in the script.
        regex = re.compile(r'#\$[ \t]+-t[ \t]+\d+')
        if self.template_file:
            log.msg("Using sge script %s" % self.template_file)
            contents = open(self.template_file, 'r').read()
            if not regex.search(contents):
                # User script has no job array directive; add one for n tasks.
                log.msg("adding job array settings to sge script")
                contents = ("#$ -t 1-%d\n" % n) + contents
            self._temp_file.write(contents)
            self.template_file = self._temp_file.name
        else:
            log.msg("using default ipengine sge script: \n%s" %
                    (sge_template % n))
            self._temp_file.file.write(sge_template % n)
            self.template_file = self._temp_file.name
        # Flush so qsub reads the complete script from disk.
        self._temp_file.file.flush()
        d = getProcessOutput(self.submit_command,
                             [self.template_file],
                             env=os.environ)
        d.addCallback(self.parse_job_id)
        d.addErrback(self.handle_error)
        return d

# Default SGE job script: one array task per engine, each logging to its
# own file (eid is the zero-based engine id derived from SGE_TASK_ID).
sge_template="""#$ -V
#$ -t 1-%d
#$ -N ipengine
eid=$(($SGE_TASK_ID - 1))
ipengine --logfile=ipengine${eid}.log
"""
381
# Helper script copied to remote hosts: runs its arguments detached in the
# background and echoes the new process id.
sshx_template="""#!/bin/sh
"$@" &> /dev/null &
echo $!
"""

# Helper script copied to remote hosts: TERMs every ipengine process owned
# by the current user.
engine_killer_template="""#!/bin/sh
ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
"""
400 390
class SSHEngineSet(object):
    """Start and kill engines on remote hosts over ssh/scp."""
    # Class-level copies so subclasses can override the scripts.
    sshx_template=sshx_template
    engine_killer_template=engine_killer_template

    def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
        """Start a controller on localhost and engines using ssh.

        The engine_hosts argument is a dict with hostnames as keys and
        the number of engine (int) as values. sshx is the name of a local
        file that will be used to run remote commands. This file is used
        to setup the environment properly.
        """

        self.temp_dir = tempfile.gettempdir()
        if sshx is not None:
            self.sshx = sshx
        else:
            # Write the sshx.sh file locally from our template.
            self.sshx = os.path.join(
                self.temp_dir,
                '%s-main-sshx.sh' % os.environ['USER']
            )
            f = open(self.sshx, 'w')
            f.writelines(self.sshx_template)
            f.close()
        self.engine_command = ipengine
        self.engine_hosts = engine_hosts
        # Write the engine killer script file locally from our template.
        self.engine_killer = os.path.join(
            self.temp_dir,
            '%s-local-engine_killer.sh' % os.environ['USER']
        )
        f = open(self.engine_killer, 'w')
        f.writelines(self.engine_killer_template)
        f.close()

    def start(self, send_furl=False):
        """Start the configured number of engines on every host."""
        dlist = []
        for host in self.engine_hosts.keys():
            count = self.engine_hosts[host]
            d = self._start(host, count, send_furl)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _start(self, hostname, count=1, send_furl=False):
        # Optionally push the FURL file, then the sshx helper, then launch.
        if send_furl:
            d = self._scp_furl(hostname)
        else:
            d = defer.succeed(None)
        d.addCallback(lambda r: self._scp_sshx(hostname))
        d.addCallback(lambda r: self._ssh_engine(hostname, count))
        return d

    def _scp_furl(self, hostname):
        # Copy the engine FURL file into the remote ~/.ipython/security dir.
        scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
        cmd_list = scp_cmd.split()
        cmd_list[1] = os.path.expanduser(cmd_list[1])
        log.msg('Copying furl file: %s' % scp_cmd)
        d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
        return d

    def _scp_sshx(self, hostname):
        # Copy the sshx helper script to the remote host's temp dir.
        scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
            self.sshx, hostname,
            self.temp_dir, os.environ['USER']
        )
        print
        log.msg("Copying sshx: %s" % scp_cmd)
        sshx_scp = scp_cmd.split()
        d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
        return d

    def _ssh_engine(self, hostname, count):
        # Launch `count` engines on the remote host via the sshx helper.
        exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
            hostname, self.temp_dir,
            os.environ['USER'], self.engine_command
        )
        cmds = exec_engine.split()
        dlist = []
        log.msg("about to start engines...")
        for i in range(count):
            log.msg('Starting engines: %s' % exec_engine)
            d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def kill(self):
        """Kill all engines on every configured host."""
        dlist = []
        for host in self.engine_hosts.keys():
            d = self._killall(host)
            dlist.append(d)
        return gatherBoth(dlist, consumeErrors=True)

    def _killall(self, hostname):
        # Push the killer script, then execute it remotely.
        d = self._scp_engine_killer(hostname)
        d.addCallback(lambda r: self._ssh_kill(hostname))
        # d.addErrback(self._exec_err)
        return d

    def _scp_engine_killer(self, hostname):
        # Copy the engine killer script to the remote host's temp dir.
        scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
            self.engine_killer,
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        cmds = scp_cmd.split()
        log.msg('Copying engine_killer: %s' % scp_cmd)
        d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
        return d

    def _ssh_kill(self, hostname):
        # Run the previously copied killer script on the remote host.
        kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
            hostname,
            self.temp_dir,
            os.environ['USER']
        )
        log.msg('Killing engine: %s' % kill_cmd)
        kill_cmd = kill_cmd.split()
        d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
        return d

    def _exec_err(self, r):
        # Errback: just log the failure.
        log.msg(r)
525 515
526 516 #-----------------------------------------------------------------------------
527 517 # Main functions for the different types of clusters
528 518 #-----------------------------------------------------------------------------
529 519
530 520 # TODO:
531 521 # The logic in these codes should be moved into classes like LocalCluster
532 522 # MpirunCluster, PBSCluster, etc. This would remove alot of the duplications.
533 523 # The main functions should then just parse the command line arguments, create
534 524 # the appropriate class and call a 'start' method.
535 525
536 526
def check_security(args, cont_args):
    """Check to see if we should run with SSL support.

    Appends the -x/-y flags to cont_args as requested. Returns False
    (after stopping the reactor) when security is wanted but pyOpenSSL
    is unavailable; True otherwise.
    """
    wants_security = not (args.x and args.y)
    if wants_security and not have_crypto:
        log.err("""
OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
        reactor.stop()
        return False
    for enabled, flag in ((args.x, '-x'), (args.y, '-y')):
        if enabled:
            cont_args.append(flag)
    return True
550 540
551 541
def check_reuse(args, cont_args):
    """Check to see if we should try to reuse FURL files.

    Appends -r and the port options to cont_args when reuse is requested.
    Returns False (after stopping the reactor) if reuse was requested
    without explicit ports; True otherwise.
    """
    if not args.r:
        return True
    cont_args.append('-r')
    if 0 in (args.client_port, args.engine_port):
        log.err("""
To reuse FURL files, you must also set the client and engine ports using
the --client-port and --engine-port options.""")
        reactor.stop()
        return False
    cont_args.extend(['--client-port=%i' % args.client_port,
                      '--engine-port=%i' % args.engine_port])
    return True
565 555
566 556
def _err_and_stop(f):
    """Errback to log a failure and halt the reactor on a fatal error."""
    log.err(f)
    # Fatal: no point continuing without a controller.
    reactor.stop()
571 561
572 562
def _delay_start(cont_pid, start_engines, furl_file, reuse):
    """Wait for controller to create FURL files and the start the engines."""
    if not reuse:
        # Remove any stale FURL file so we only proceed once the new
        # controller has written a fresh one.
        if os.path.isfile(furl_file):
            os.unlink(furl_file)
    log.msg('Waiting for controller to finish starting...')
    # Poll for the FURL file: 50 tries, 0.2s apart (10s total).
    d = wait_for_file(furl_file, delay=0.2, max_tries=50)
    d.addCallback(lambda _: log.msg('Controller started'))
    d.addCallback(lambda _: start_engines(cont_pid))
    return d
583 573
584 574
def main_local(args):
    """Start a controller and engines as local child processes."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        # Runs once the controller's FURL file exists (see _delay_start).
        engine_args = []
        engine_args.append('--logfile=%s' % \
            pjoin(args.logdir,'ipengine%s-' % cont_pid))
        eset = LocalEngineSet(extra_args=engine_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(0.5)
            cl.interrupt_then_kill(0.5)
            reactor.callLater(1.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
618 608
619 609
def main_mpi(args):
    """Start a controller locally and engines via mpirun/mpiexec."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        # args.cmd is 'mpirun' or 'mpiexec', set by the subparser defaults.
        raw_args = [args.cmd]
        raw_args.extend(['-n',str(args.n)])
        raw_args.append('ipengine')
        raw_args.append('-l')
        raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
        if args.mpi:
            raw_args.append('--mpi=%s' % args.mpi)
        eset = ProcessLauncher(raw_args)
        def shutdown(signum, frame):
            log.msg('Stopping local cluster')
            # We are still playing with the times here, but these seem
            # to be reliable in allowing everything to exit cleanly.
            eset.interrupt_then_kill(1.0)
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = eset.start()
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
657 647
658 648
def main_pbs(args):
    """Start a controller locally and engines through PBS."""
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        # Runs once the controller's FURL file exists (see _delay_start).
        pbs_set = PBSEngineSet(args.pbsscript)
        def shutdown(signum, frame):
            log.msg('Stopping pbs cluster')
            # Delete the batch job, then stop the controller and reactor.
            d = pbs_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = pbs_set.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
687 677
def main_sge(args):
    """Start a controller locally and engines through SGE.

    NOTE(review): the original text contained an unresolved diff here;
    this is the resolved version including the sgescript existence check.
    """
    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    # Fail fast if the user supplied an SGE script that doesn't exist.
    if args.sgescript and not os.path.isfile(args.sgescript):
        log.err('SGE script does not exist: %s' % args.sgescript)
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(r):
        # Runs once the controller's FURL file exists (see _delay_start).
        sge_set = SGEEngineSet(args.sgescript)
        def shutdown(signum, frame):
            log.msg('Stopping sge cluster')
            # Delete the job array, then stop the controller and reactor.
            d = sge_set.kill()
            d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
            d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
        signal.signal(signal.SIGINT,shutdown)
        d = sge_set.start(args.n)
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
716 710
717 711
def main_ssh(args):
    """Start a controller on localhost and engines using ssh.

    Your clusterfile should look like::

        send_furl = False # True, if you want
        engines = {
            'engine_host1' : engine_count,
            'engine_host2' : engine_count2
        }
    """
    clusterfile = {}
    # Execute the user's cluster description file into a plain dict.
    execfile(args.clusterfile, clusterfile)
    if not clusterfile.has_key('send_furl'):
        clusterfile['send_furl'] = False

    cont_args = []
    cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))

    # Check security settings before proceeding
    if not check_security(args, cont_args):
        return

    # See if we are reusing FURL files
    if not check_reuse(args, cont_args):
        return

    cl = ControllerLauncher(extra_args=cont_args)
    dstart = cl.start()
    def start_engines(cont_pid):
        # Runs once the controller's FURL file exists (see _delay_start).
        ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
        def shutdown(signum, frame):
            d = ssh_set.kill()
            cl.interrupt_then_kill(1.0)
            reactor.callLater(2.0, reactor.stop)
        signal.signal(signal.SIGINT,shutdown)
        d = ssh_set.start(clusterfile['send_furl'])
        return d
    config = kernel_config_manager.get_config_obj()
    furl_file = config['controller']['engine_furl_file']
    dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
    dstart.addErrback(_err_and_stop)
760 754
761 755
def get_args():
    """Build the ipcluster argument parser and parse the command line.

    Returns the parsed namespace; args.func holds the main_* function for
    the chosen cluster type.

    NOTE(review): the original text contained an unresolved diff on the
    --sge-script default; resolved to '' so SGEEngineSet generates a
    default script when none is specified.
    """
    base_parser = argparse.ArgumentParser(add_help=False)
    base_parser.add_argument(
        '-r',
        action='store_true',
        dest='r',
        help='try to reuse FURL files. Use with --client-port and --engine-port'
    )
    base_parser.add_argument(
        '--client-port',
        type=int,
        dest='client_port',
        help='the port the controller will listen on for client connections',
        default=0
    )
    base_parser.add_argument(
        '--engine-port',
        type=int,
        dest='engine_port',
        help='the port the controller will listen on for engine connections',
        default=0
    )
    base_parser.add_argument(
        '-x',
        action='store_true',
        dest='x',
        help='turn off client security'
    )
    base_parser.add_argument(
        '-y',
        action='store_true',
        dest='y',
        help='turn off engine security'
    )
    base_parser.add_argument(
        "--logdir",
        type=str,
        dest="logdir",
        help="directory to put log files (default=$IPYTHONDIR/log)",
        default=pjoin(get_ipython_dir(),'log')
    )
    base_parser.add_argument(
        "-n",
        "--num",
        type=int,
        dest="n",
        default=2,
        help="the number of engines to start"
    )

    parser = argparse.ArgumentParser(
        description='IPython cluster startup. This starts a controller and\
        engines using various approaches. Use the IPYTHONDIR environment\
        variable to change your IPython directory from the default of\
        .ipython or _ipython. The log and security subdirectories of your\
        IPython directory will be used by this script for log files and\
        security files.'
    )
    subparsers = parser.add_subparsers(
        help='available cluster types. For help, do "ipcluster TYPE --help"')

    parser_local = subparsers.add_parser(
        'local',
        help='run a local cluster',
        parents=[base_parser]
    )
    parser_local.set_defaults(func=main_local)

    parser_mpirun = subparsers.add_parser(
        'mpirun',
        help='run a cluster using mpirun (mpiexec also works)',
        parents=[base_parser]
    )
    parser_mpirun.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')

    parser_mpiexec = subparsers.add_parser(
        'mpiexec',
        help='run a cluster using mpiexec (mpirun also works)',
        parents=[base_parser]
    )
    parser_mpiexec.add_argument(
        "--mpi",
        type=str,
        dest="mpi", # Don't put a default here to allow no MPI support
        help="how to call MPI_Init (default=mpi4py)"
    )
    parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')

    parser_pbs = subparsers.add_parser(
        'pbs',
        help='run a pbs cluster',
        parents=[base_parser]
    )
    parser_pbs.add_argument(
        '--pbs-script',
        type=str,
        dest='pbsscript',
        help='PBS script template',
        default='pbs.template'
    )
    parser_pbs.set_defaults(func=main_pbs)

    parser_sge = subparsers.add_parser(
        'sge',
        help='run an sge cluster',
        parents=[base_parser]
    )
    parser_sge.add_argument(
        '--sge-script',
        type=str,
        dest='sgescript',
        help='SGE script template',
        default='' # SGEEngineSet will create one if not specified
    )
    parser_sge.set_defaults(func=main_sge)

    parser_ssh = subparsers.add_parser(
        'ssh',
        help='run a cluster using ssh, should have ssh-keys setup',
        parents=[base_parser]
    )
    parser_ssh.add_argument(
        '--clusterfile',
        type=str,
        dest='clusterfile',
        help='python file describing the cluster',
        default='clusterfile.py',
    )
    parser_ssh.add_argument(
        '--sshx',
        type=str,
        dest='sshx',
        help='sshx launcher helper'
    )
    parser_ssh.set_defaults(func=main_ssh)

    args = parser.parse_args()
    return args
906 900
def main():
    """Parse arguments and run the selected cluster type under the reactor."""
    args = get_args()
    # Defer the chosen main_* function until the reactor is running.
    reactor.callWhenRunning(args.func, args)
    log.startLogging(sys.stdout)
    reactor.run()

if __name__ == '__main__':
    main()
General Comments 0
You need to be logged in to leave comments. Login now