Adding information about IPYTHONDIR to usage of ipcluster and friends.
Brian Granger
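The usage text added below points users at the IPYTHONDIR environment variable. As a rough sketch (not the actual IPython.genutils implementation of get_ipython_dir/get_log_dir/get_security_dir), the directory resolution these scripts rely on behaves roughly like this: IPYTHONDIR wins if set, otherwise a platform-dependent default under the home directory is used, and the log and security subdirectories are created beneath it.

    # Rough sketch only; the real logic lives in IPython.genutils
    # (get_ipython_dir, get_log_dir, get_security_dir).
    import os

    def resolve_ipython_dir():
        default = '.ipython' if os.name == 'posix' else '_ipython'
        base = os.environ.get('IPYTHONDIR',
                              os.path.join(os.path.expanduser('~'), default))
        for sub in ('log', 'security'):
            subdir = os.path.join(base, sub)
            if not os.path.isdir(subdir):
                os.makedirs(subdir)
        return base

So an invocation such as IPYTHONDIR=/scratch/ipython ipcluster local -n 4 (a hypothetical path) would keep the cluster's log files and FURL files under /scratch/ipython/log and /scratch/ipython/security, as the new help text describes.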
@@ -1,822 +1,825
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start an IPython cluster = (controller + engines)."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import re
19 19 import sys
20 20 import signal
21 21 import tempfile
22 22 pjoin = os.path.join
23 23
24 24 from twisted.internet import reactor, defer
25 25 from twisted.internet.protocol import ProcessProtocol
26 26 from twisted.internet.error import ProcessDone, ProcessTerminated
27 27 from twisted.internet.utils import getProcessOutput
28 28 from twisted.python import failure, log
29 29
30 30 from IPython.external import argparse
31 31 from IPython.external import Itpl
32 32 from IPython.genutils import (
33 33 get_ipython_dir,
34 34 get_log_dir,
35 35 get_security_dir,
36 36 num_cpus
37 37 )
38 38 from IPython.kernel.fcutil import have_crypto
39 39
40 40 # Create various ipython directories if they don't exist.
41 41 # This must be done before IPython.kernel.config is imported.
42 42 from IPython.iplib import user_setup
43 43 if os.name == 'posix':
44 44 rc_suffix = ''
45 45 else:
46 46 rc_suffix = '.ini'
47 47 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
48 48 get_log_dir()
49 49 get_security_dir()
50 50
51 51 from IPython.kernel.config import config_manager as kernel_config_manager
52 52 from IPython.kernel.error import SecurityError, FileTimeoutError
53 53 from IPython.kernel.fcutil import have_crypto
54 54 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
55 55 from IPython.kernel.util import printer
56 56
57 57
58 58 #-----------------------------------------------------------------------------
59 59 # General process handling code
60 60 #-----------------------------------------------------------------------------
61 61
62 62 def find_exe(cmd):
63 63 try:
64 64 import win32api
65 65 except ImportError:
66 66 raise ImportError('you need to have pywin32 installed for this to work')
67 67 else:
68 68 try:
69 69 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.exe')
70 70 except:
71 71 (path, offset) = win32api.SearchPath(os.environ['PATH'],cmd + '.bat')
72 72 return path
73 73
74 74 class ProcessStateError(Exception):
75 75 pass
76 76
77 77 class UnknownStatus(Exception):
78 78 pass
79 79
80 80 class LauncherProcessProtocol(ProcessProtocol):
81 81 """
82 82 A ProcessProtocol to go with the ProcessLauncher.
83 83 """
84 84 def __init__(self, process_launcher):
85 85 self.process_launcher = process_launcher
86 86
87 87 def connectionMade(self):
88 88 self.process_launcher.fire_start_deferred(self.transport.pid)
89 89
90 90 def processEnded(self, status):
91 91 value = status.value
92 92 if isinstance(value, ProcessDone):
93 93 self.process_launcher.fire_stop_deferred(0)
94 94 elif isinstance(value, ProcessTerminated):
95 95 self.process_launcher.fire_stop_deferred(
96 96 {'exit_code':value.exitCode,
97 97 'signal':value.signal,
98 98 'status':value.status
99 99 }
100 100 )
101 101 else:
102 102 raise UnknownStatus("unknown exit status, this is probably a bug in Twisted")
103 103
104 104 def outReceived(self, data):
105 105 log.msg(data)
106 106
107 107 def errReceived(self, data):
108 108 log.err(data)
109 109
110 110 class ProcessLauncher(object):
111 111 """
112 112 Start and stop an external process in an asynchronous manner.
113 113
114 114 Currently this uses deferreds to notify other parties of process state
115 115 changes. This is an awkward design and should be moved to using
116 116 a formal NotificationCenter.
117 117 """
118 118 def __init__(self, cmd_and_args):
119 119 self.cmd = cmd_and_args[0]
120 120 self.args = cmd_and_args
121 121 self._reset()
122 122
123 123 def _reset(self):
124 124 self.process_protocol = None
125 125 self.pid = None
126 126 self.start_deferred = None
127 127 self.stop_deferreds = []
128 128 self.state = 'before' # before, running, or after
129 129
130 130 @property
131 131 def running(self):
132 132 if self.state == 'running':
133 133 return True
134 134 else:
135 135 return False
136 136
137 137 def fire_start_deferred(self, pid):
138 138 self.pid = pid
139 139 self.state = 'running'
140 140 log.msg('Process %r has started with pid=%i' % (self.args, pid))
141 141 self.start_deferred.callback(pid)
142 142
143 143 def start(self):
144 144 if self.state == 'before':
145 145 self.process_protocol = LauncherProcessProtocol(self)
146 146 self.start_deferred = defer.Deferred()
147 147 self.process_transport = reactor.spawnProcess(
148 148 self.process_protocol,
149 149 self.cmd,
150 150 self.args,
151 151 env=os.environ
152 152 )
153 153 return self.start_deferred
154 154 else:
155 155 s = 'the process has already been started and has state: %r' % \
156 156 self.state
157 157 return defer.fail(ProcessStateError(s))
158 158
159 159 def get_stop_deferred(self):
160 160 if self.state == 'running' or self.state == 'before':
161 161 d = defer.Deferred()
162 162 self.stop_deferreds.append(d)
163 163 return d
164 164 else:
165 165 s = 'this process is already complete'
166 166 return defer.fail(ProcessStateError(s))
167 167
168 168 def fire_stop_deferred(self, exit_code):
169 169 log.msg('Process %r has stopped with %r' % (self.args, exit_code))
170 170 self.state = 'after'
171 171 for d in self.stop_deferreds:
172 172 d.callback(exit_code)
173 173
174 174 def signal(self, sig):
175 175 """
176 176 Send a signal to the process.
177 177
178 178 The argument sig can be ('KILL','INT', etc.) or any signal number.
179 179 """
180 180 if self.state == 'running':
181 181 self.process_transport.signalProcess(sig)
182 182
183 183 # def __del__(self):
184 184 # self.signal('KILL')
185 185
186 186 def interrupt_then_kill(self, delay=1.0):
187 187 self.signal('INT')
188 188 reactor.callLater(delay, self.signal, 'KILL')
189 189
190 190
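Reviewer note: ProcessLauncher above is entirely deferred-driven, so callers chain callbacks on the start and stop deferreds rather than polling. A minimal usage sketch, assuming ProcessLauncher can be imported from this script (the import path here is hypothetical):

    # Hypothetical sketch; assumes ProcessLauncher is importable from this script.
    from twisted.internet import reactor
    # from <this ipcluster script> import ProcessLauncher   (hypothetical import)

    def demo():
        pl = ProcessLauncher(['ipengine', '--logfile=/tmp/ipengine'])
        d = pl.start()                     # fires with the pid once the process spawns
        d.addCallback(lambda pid: reactor.callLater(5.0, pl.interrupt_then_kill, 1.0))
        pl.get_stop_deferred().addCallback(lambda status: reactor.stop())

    reactor.callWhenRunning(demo)
    reactor.run()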
191 191 #-----------------------------------------------------------------------------
192 192 # Code for launching controller and engines
193 193 #-----------------------------------------------------------------------------
194 194
195 195
196 196 class ControllerLauncher(ProcessLauncher):
197 197
198 198 def __init__(self, extra_args=None):
199 199 if sys.platform == 'win32':
200 200 # This logic is needed because the ipcontroller script doesn't
201 201 # always get installed in the same way or in the same location.
202 202 from IPython.kernel.scripts import ipcontroller
203 203 script_location = ipcontroller.__file__.replace('.pyc', '.py')
204 204 # The -u option here turns on unbuffered output, which is required
205 205 # on Win32 to prevent weird conflicts and problems with Twisted.
206 206 # Also, use sys.executable to make sure we are picking up the
207 207 # right python exe.
208 208 args = [sys.executable, '-u', script_location]
209 209 else:
210 210 args = ['ipcontroller']
211 211 self.extra_args = extra_args
212 212 if extra_args is not None:
213 213 args.extend(extra_args)
214 214
215 215 ProcessLauncher.__init__(self, args)
216 216
217 217
218 218 class EngineLauncher(ProcessLauncher):
219 219
220 220 def __init__(self, extra_args=None):
221 221 if sys.platform == 'win32':
222 222 # This logic is needed because the ipengine script doesn't
223 223 # always get installed in the same way or in the same location.
224 224 from IPython.kernel.scripts import ipengine
225 225 script_location = ipengine.__file__.replace('.pyc', '.py')
226 226 # The -u option here turns on unbuffered output, which is required
227 227 # on Win32 to prevent weird conflicts and problems with Twisted.
228 228 # Also, use sys.executable to make sure we are picking up the
229 229 # right python exe.
230 230 args = [sys.executable, '-u', script_location]
231 231 else:
232 232 args = ['ipengine']
233 233 self.extra_args = extra_args
234 234 if extra_args is not None:
235 235 args.extend(extra_args)
236 236
237 237 ProcessLauncher.__init__(self, args)
238 238
239 239
240 240 class LocalEngineSet(object):
241 241
242 242 def __init__(self, extra_args=None):
243 243 self.extra_args = extra_args
244 244 self.launchers = []
245 245
246 246 def start(self, n):
247 247 dlist = []
248 248 for i in range(n):
249 249 el = EngineLauncher(extra_args=self.extra_args)
250 250 d = el.start()
251 251 self.launchers.append(el)
252 252 dlist.append(d)
253 253 dfinal = gatherBoth(dlist, consumeErrors=True)
254 254 dfinal.addCallback(self._handle_start)
255 255 return dfinal
256 256
257 257 def _handle_start(self, r):
258 258 log.msg('Engines started with pids: %r' % r)
259 259 return r
260 260
261 261 def _handle_stop(self, r):
262 262 log.msg('Engines received signal: %r' % r)
263 263 return r
264 264
265 265 def signal(self, sig):
266 266 dlist = []
267 267 for el in self.launchers:
268 268 d = el.get_stop_deferred()
269 269 dlist.append(d)
270 270 el.signal(sig)
271 271 dfinal = gatherBoth(dlist, consumeErrors=True)
272 272 dfinal.addCallback(self._handle_stop)
273 273 return dfinal
274 274
275 275 def interrupt_then_kill(self, delay=1.0):
276 276 dlist = []
277 277 for el in self.launchers:
278 278 d = el.get_stop_deferred()
279 279 dlist.append(d)
280 280 el.interrupt_then_kill(delay)
281 281 dfinal = gatherBoth(dlist, consumeErrors=True)
282 282 dfinal.addCallback(self._handle_stop)
283 283 return dfinal
284 284
285 285
286 286 class BatchEngineSet(object):
287 287
288 288 # Subclasses must fill these in. See PBSEngineSet
289 289 submit_command = ''
290 290 delete_command = ''
291 291 job_id_regexp = ''
292 292
293 293 def __init__(self, template_file, **kwargs):
294 294 self.template_file = template_file
295 295 self.context = {}
296 296 self.context.update(kwargs)
297 297 self.batch_file = self.template_file+'-run'
298 298
299 299 def parse_job_id(self, output):
300 300 m = re.match(self.job_id_regexp, output)
301 301 if m is not None:
302 302 job_id = m.group()
303 303 else:
304 304 raise Exception("job id couldn't be determined: %s" % output)
305 305 self.job_id = job_id
306 306 log.msg('Job started with job id: %r' % job_id)
307 307 return job_id
308 308
309 309 def write_batch_script(self, n):
310 310 self.context['n'] = n
311 311 template = open(self.template_file, 'r').read()
312 312 log.msg('Using template for batch script: %s' % self.template_file)
313 313 script_as_string = Itpl.itplns(template, self.context)
314 314 log.msg('Writing instantiated batch script: %s' % self.batch_file)
315 315 f = open(self.batch_file,'w')
316 316 f.write(script_as_string)
317 317 f.close()
318 318
319 319 def handle_error(self, f):
320 320 f.printTraceback()
321 321 f.raiseException()
322 322
323 323 def start(self, n):
324 324 self.write_batch_script(n)
325 325 d = getProcessOutput(self.submit_command,
326 326 [self.batch_file],env=os.environ)
327 327 d.addCallback(self.parse_job_id)
328 328 d.addErrback(self.handle_error)
329 329 return d
330 330
331 331 def kill(self):
332 332 d = getProcessOutput(self.delete_command,
333 333 [self.job_id],env=os.environ)
334 334 return d
335 335
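For reviewers unfamiliar with the batch path: write_batch_script above simply runs the user's template through Itpl string interpolation with n bound to the requested engine count, then start() submits the result via the subclass's submit_command (qsub for the PBS set below). A small illustration of the interpolation step, with a made-up template:

    # Illustration only; the template contents here are made up.
    from IPython.external import Itpl

    template = "#PBS -l nodes=$n\nmpiexec -n $n ipengine\n"
    print Itpl.itplns(template, {'n': 4})   # every $n is replaced by 4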
336 336 class PBSEngineSet(BatchEngineSet):
337 337
338 338 submit_command = 'qsub'
339 339 delete_command = 'qdel'
340 340 job_id_regexp = '\d+'
341 341
342 342 def __init__(self, template_file, **kwargs):
343 343 BatchEngineSet.__init__(self, template_file, **kwargs)
344 344
345 345
346 346 sshx_template="""#!/bin/sh
347 347 "$@" > /dev/null 2>&1 &
348 348 echo $!
349 349 """
350 350
351 351 engine_killer_template="""#!/bin/sh
352 352 ps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM
353 353 """
354 354
355 355 class SSHEngineSet(object):
356 356 sshx_template=sshx_template
357 357 engine_killer_template=engine_killer_template
358 358
359 359 def __init__(self, engine_hosts, sshx=None, ipengine="ipengine"):
360 360 """Start a controller on localhost and engines using ssh.
361 361
362 362 The engine_hosts argument is a dict with hostnames as keys and
363 363 the number of engines (int) as values. sshx is the name of a local
364 364 file that will be used to run remote commands. This file is used
365 365 to set up the environment properly.
366 366 """
367 367
368 368 self.temp_dir = tempfile.gettempdir()
369 369 if sshx is not None:
370 370 self.sshx = sshx
371 371 else:
372 372 # Write the sshx.sh file locally from our template.
373 373 self.sshx = os.path.join(
374 374 self.temp_dir,
375 375 '%s-main-sshx.sh' % os.environ['USER']
376 376 )
377 377 f = open(self.sshx, 'w')
378 378 f.writelines(self.sshx_template)
379 379 f.close()
380 380 self.engine_command = ipengine
381 381 self.engine_hosts = engine_hosts
382 382 # Write the engine killer script file locally from our template.
383 383 self.engine_killer = os.path.join(
384 384 self.temp_dir,
385 385 '%s-local-engine_killer.sh' % os.environ['USER']
386 386 )
387 387 f = open(self.engine_killer, 'w')
388 388 f.writelines(self.engine_killer_template)
389 389 f.close()
390 390
391 391 def start(self, send_furl=False):
392 392 dlist = []
393 393 for host in self.engine_hosts.keys():
394 394 count = self.engine_hosts[host]
395 395 d = self._start(host, count, send_furl)
396 396 dlist.append(d)
397 397 return gatherBoth(dlist, consumeErrors=True)
398 398
399 399 def _start(self, hostname, count=1, send_furl=False):
400 400 if send_furl:
401 401 d = self._scp_furl(hostname)
402 402 else:
403 403 d = defer.succeed(None)
404 404 d.addCallback(lambda r: self._scp_sshx(hostname))
405 405 d.addCallback(lambda r: self._ssh_engine(hostname, count))
406 406 return d
407 407
408 408 def _scp_furl(self, hostname):
409 409 scp_cmd = "scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/" % (hostname)
410 410 cmd_list = scp_cmd.split()
411 411 cmd_list[1] = os.path.expanduser(cmd_list[1])
412 412 log.msg('Copying furl file: %s' % scp_cmd)
413 413 d = getProcessOutput(cmd_list[0], cmd_list[1:], env=os.environ)
414 414 return d
415 415
416 416 def _scp_sshx(self, hostname):
417 417 scp_cmd = "scp %s %s:%s/%s-sshx.sh" % (
418 418 self.sshx, hostname,
419 419 self.temp_dir, os.environ['USER']
420 420 )
421 421 print
422 422 log.msg("Copying sshx: %s" % scp_cmd)
423 423 sshx_scp = scp_cmd.split()
424 424 d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env=os.environ)
425 425 return d
426 426
427 427 def _ssh_engine(self, hostname, count):
428 428 exec_engine = "ssh %s sh %s/%s-sshx.sh %s" % (
429 429 hostname, self.temp_dir,
430 430 os.environ['USER'], self.engine_command
431 431 )
432 432 cmds = exec_engine.split()
433 433 dlist = []
434 434 log.msg("about to start engines...")
435 435 for i in range(count):
436 436 log.msg('Starting engines: %s' % exec_engine)
437 437 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
438 438 dlist.append(d)
439 439 return gatherBoth(dlist, consumeErrors=True)
440 440
441 441 def kill(self):
442 442 dlist = []
443 443 for host in self.engine_hosts.keys():
444 444 d = self._killall(host)
445 445 dlist.append(d)
446 446 return gatherBoth(dlist, consumeErrors=True)
447 447
448 448 def _killall(self, hostname):
449 449 d = self._scp_engine_killer(hostname)
450 450 d.addCallback(lambda r: self._ssh_kill(hostname))
451 451 # d.addErrback(self._exec_err)
452 452 return d
453 453
454 454 def _scp_engine_killer(self, hostname):
455 455 scp_cmd = "scp %s %s:%s/%s-engine_killer.sh" % (
456 456 self.engine_killer,
457 457 hostname,
458 458 self.temp_dir,
459 459 os.environ['USER']
460 460 )
461 461 cmds = scp_cmd.split()
462 462 log.msg('Copying engine_killer: %s' % scp_cmd)
463 463 d = getProcessOutput(cmds[0], cmds[1:], env=os.environ)
464 464 return d
465 465
466 466 def _ssh_kill(self, hostname):
467 467 kill_cmd = "ssh %s sh %s/%s-engine_killer.sh" % (
468 468 hostname,
469 469 self.temp_dir,
470 470 os.environ['USER']
471 471 )
472 472 log.msg('Killing engine: %s' % kill_cmd)
473 473 kill_cmd = kill_cmd.split()
474 474 d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env=os.environ)
475 475 return d
476 476
477 477 def _exec_err(self, r):
478 478 log.msg(r)
479 479
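A quick orientation for this class: SSHEngineSet takes a dict mapping hostnames to engine counts and does all of its work over scp/ssh, so it assumes passwordless keys are set up. Under the running reactor, a hypothetical caller looks roughly like this (hostnames and counts are placeholders):

    # Hypothetical sketch; hostnames and counts are placeholders.
    from twisted.internet import reactor

    def launch():
        engines = {'node1.example.com': 4, 'node2.example.com': 2}
        sset = SSHEngineSet(engines, sshx=None, ipengine='ipengine')
        d = sset.start(send_furl=False)    # scp the sshx helper, then ssh each host
        d.addCallback(lambda results: reactor.stop())

    reactor.callWhenRunning(launch)
    reactor.run()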
480 480 #-----------------------------------------------------------------------------
481 481 # Main functions for the different types of clusters
482 482 #-----------------------------------------------------------------------------
483 483
484 484 # TODO:
485 485 # The logic in these functions should be moved into classes like LocalCluster,
486 486 # MpirunCluster, PBSCluster, etc. This would remove a lot of the duplication.
487 487 # The main functions should then just parse the command line arguments, create
488 488 # the appropriate class and call a 'start' method.
489 489
490 490
491 491 def check_security(args, cont_args):
492 492 """Check to see if we should run with SSL support."""
493 493 if (not args.x or not args.y) and not have_crypto:
494 494 log.err("""
495 495 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
496 496 Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")
497 497 reactor.stop()
498 498 return False
499 499 if args.x:
500 500 cont_args.append('-x')
501 501 if args.y:
502 502 cont_args.append('-y')
503 503 return True
504 504
505 505
506 506 def check_reuse(args, cont_args):
507 507 """Check to see if we should try to reuse FURL files."""
508 508 if args.r:
509 509 cont_args.append('-r')
510 510 if args.client_port == 0 or args.engine_port == 0:
511 511 log.err("""
512 512 To reuse FURL files, you must also set the client and engine ports using
513 513 the --client-port and --engine-port options.""")
514 514 reactor.stop()
515 515 return False
516 516 cont_args.append('--client-port=%i' % args.client_port)
517 517 cont_args.append('--engine-port=%i' % args.engine_port)
518 518 return True
519 519
520 520
521 521 def _err_and_stop(f):
522 522 """Errback to log a failure and halt the reactor on a fatal error."""
523 523 log.err(f)
524 524 reactor.stop()
525 525
526 526
527 527 def _delay_start(cont_pid, start_engines, furl_file, reuse):
528 528 """Wait for the controller to create FURL files and then start the engines."""
529 529 if not reuse:
530 530 if os.path.isfile(furl_file):
531 531 os.unlink(furl_file)
532 532 log.msg('Waiting for controller to finish starting...')
533 533 d = wait_for_file(furl_file, delay=0.2, max_tries=50)
534 534 d.addCallback(lambda _: log.msg('Controller started'))
535 535 d.addCallback(lambda _: start_engines(cont_pid))
536 536 return d
537 537
538 538
539 539 def main_local(args):
540 540 cont_args = []
541 541 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
542 542
543 543 # Check security settings before proceeding
544 544 if not check_security(args, cont_args):
545 545 return
546 546
547 547 # See if we are reusing FURL files
548 548 if not check_reuse(args, cont_args):
549 549 return
550 550
551 551 cl = ControllerLauncher(extra_args=cont_args)
552 552 dstart = cl.start()
553 553 def start_engines(cont_pid):
554 554 engine_args = []
555 555 engine_args.append('--logfile=%s' % \
556 556 pjoin(args.logdir,'ipengine%s-' % cont_pid))
557 557 eset = LocalEngineSet(extra_args=engine_args)
558 558 def shutdown(signum, frame):
559 559 log.msg('Stopping local cluster')
560 560 # We are still playing with the times here, but these seem
561 561 # to be reliable in allowing everything to exit cleanly.
562 562 eset.interrupt_then_kill(0.5)
563 563 cl.interrupt_then_kill(0.5)
564 564 reactor.callLater(1.0, reactor.stop)
565 565 signal.signal(signal.SIGINT,shutdown)
566 566 d = eset.start(args.n)
567 567 return d
568 568 config = kernel_config_manager.get_config_obj()
569 569 furl_file = config['controller']['engine_furl_file']
570 570 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
571 571 dstart.addErrback(_err_and_stop)
572 572
573 573
574 574 def main_mpi(args):
575 575 cont_args = []
576 576 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
577 577
578 578 # Check security settings before proceeding
579 579 if not check_security(args, cont_args):
580 580 return
581 581
582 582 # See if we are reusing FURL files
583 583 if not check_reuse(args, cont_args):
584 584 return
585 585
586 586 cl = ControllerLauncher(extra_args=cont_args)
587 587 dstart = cl.start()
588 588 def start_engines(cont_pid):
589 589 raw_args = [args.cmd]
590 590 raw_args.extend(['-n',str(args.n)])
591 591 raw_args.append('ipengine')
592 592 raw_args.append('-l')
593 593 raw_args.append(pjoin(args.logdir,'ipengine%s-' % cont_pid))
594 594 if args.mpi:
595 595 raw_args.append('--mpi=%s' % args.mpi)
596 596 eset = ProcessLauncher(raw_args)
597 597 def shutdown(signum, frame):
598 598 log.msg('Stopping local cluster')
599 599 # We are still playing with the times here, but these seem
600 600 # to be reliable in allowing everything to exit cleanly.
601 601 eset.interrupt_then_kill(1.0)
602 602 cl.interrupt_then_kill(1.0)
603 603 reactor.callLater(2.0, reactor.stop)
604 604 signal.signal(signal.SIGINT,shutdown)
605 605 d = eset.start()
606 606 return d
607 607 config = kernel_config_manager.get_config_obj()
608 608 furl_file = config['controller']['engine_furl_file']
609 609 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
610 610 dstart.addErrback(_err_and_stop)
611 611
612 612
613 613 def main_pbs(args):
614 614 cont_args = []
615 615 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
616 616
617 617 # Check security settings before proceeding
618 618 if not check_security(args, cont_args):
619 619 return
620 620
621 621 # See if we are reusing FURL files
622 622 if not check_reuse(args, cont_args):
623 623 return
624 624
625 625 cl = ControllerLauncher(extra_args=cont_args)
626 626 dstart = cl.start()
627 627 def start_engines(r):
628 628 pbs_set = PBSEngineSet(args.pbsscript)
629 629 def shutdown(signum, frame):
630 630 log.msg('Stopping pbs cluster')
631 631 d = pbs_set.kill()
632 632 d.addBoth(lambda _: cl.interrupt_then_kill(1.0))
633 633 d.addBoth(lambda _: reactor.callLater(2.0, reactor.stop))
634 634 signal.signal(signal.SIGINT,shutdown)
635 635 d = pbs_set.start(args.n)
636 636 return d
637 637 config = kernel_config_manager.get_config_obj()
638 638 furl_file = config['controller']['engine_furl_file']
639 639 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
640 640 dstart.addErrback(_err_and_stop)
641 641
642 642
643 643 def main_ssh(args):
644 644 """Start a controller on localhost and engines using ssh.
645 645
646 646 Your clusterfile should look like::
647 647
648 648 send_furl = False # set to True to copy the engine FURL file to each host
649 649 engines = {
650 650 'engine_host1' : engine_count,
651 651 'engine_host2' : engine_count2
652 652 }
653 653 """
654 654 clusterfile = {}
655 655 execfile(args.clusterfile, clusterfile)
656 656 if not clusterfile.has_key('send_furl'):
657 657 clusterfile['send_furl'] = False
658 658
659 659 cont_args = []
660 660 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
661 661
662 662 # Check security settings before proceeding
663 663 if not check_security(args, cont_args):
664 664 return
665 665
666 666 # See if we are reusing FURL files
667 667 if not check_reuse(args, cont_args):
668 668 return
669 669
670 670 cl = ControllerLauncher(extra_args=cont_args)
671 671 dstart = cl.start()
672 672 def start_engines(cont_pid):
673 673 ssh_set = SSHEngineSet(clusterfile['engines'], sshx=args.sshx)
674 674 def shutdown(signum, frame):
675 675 d = ssh_set.kill()
676 676 cl.interrupt_then_kill(1.0)
677 677 reactor.callLater(2.0, reactor.stop)
678 678 signal.signal(signal.SIGINT,shutdown)
679 679 d = ssh_set.start(clusterfile['send_furl'])
680 680 return d
681 681 config = kernel_config_manager.get_config_obj()
682 682 furl_file = config['controller']['engine_furl_file']
683 683 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
684 684 dstart.addErrback(_err_and_stop)
685 685
686 686
687 687 def get_args():
688 688 base_parser = argparse.ArgumentParser(add_help=False)
689 689 base_parser.add_argument(
690 690 '-r',
691 691 action='store_true',
692 692 dest='r',
693 693 help='try to reuse FURL files. Use with --client-port and --engine-port'
694 694 )
695 695 base_parser.add_argument(
696 696 '--client-port',
697 697 type=int,
698 698 dest='client_port',
699 699 help='the port the controller will listen on for client connections',
700 700 default=0
701 701 )
702 702 base_parser.add_argument(
703 703 '--engine-port',
704 704 type=int,
705 705 dest='engine_port',
706 706 help='the port the controller will listen on for engine connections',
707 707 default=0
708 708 )
709 709 base_parser.add_argument(
710 710 '-x',
711 711 action='store_true',
712 712 dest='x',
713 713 help='turn off client security'
714 714 )
715 715 base_parser.add_argument(
716 716 '-y',
717 717 action='store_true',
718 718 dest='y',
719 719 help='turn off engine security'
720 720 )
721 721 base_parser.add_argument(
722 722 "--logdir",
723 723 type=str,
724 724 dest="logdir",
725 725 help="directory to put log files (default=$IPYTHONDIR/log)",
726 726 default=pjoin(get_ipython_dir(),'log')
727 727 )
728 728 base_parser.add_argument(
729 729 "-n",
730 730 "--num",
731 731 type=int,
732 732 dest="n",
733 733 default=2,
734 734 help="the number of engines to start"
735 735 )
736 736
737 737 parser = argparse.ArgumentParser(
738 738 description='IPython cluster startup. This starts a controller and\
739 engines using various approaches. THIS IS A TECHNOLOGY PREVIEW AND\
740 THE API WILL CHANGE SIGNIFICANTLY BEFORE THE FINAL RELEASE.'
739 engines using various approaches. Use the IPYTHONDIR environment\
740 variable to change your IPython directory from the default of\
741 .ipython or _ipython. The log and security subdirectories of your\
742 IPython directory will be used by this script for log files and\
743 security files.'
741 744 )
742 745 subparsers = parser.add_subparsers(
743 746 help='available cluster types. For help, do "ipcluster TYPE --help"')
744 747
745 748 parser_local = subparsers.add_parser(
746 749 'local',
747 750 help='run a local cluster',
748 751 parents=[base_parser]
749 752 )
750 753 parser_local.set_defaults(func=main_local)
751 754
752 755 parser_mpirun = subparsers.add_parser(
753 756 'mpirun',
754 757 help='run a cluster using mpirun (mpiexec also works)',
755 758 parents=[base_parser]
756 759 )
757 760 parser_mpirun.add_argument(
758 761 "--mpi",
759 762 type=str,
760 763 dest="mpi", # Don't put a default here to allow no MPI support
761 764 help="how to call MPI_Init (default=mpi4py)"
762 765 )
763 766 parser_mpirun.set_defaults(func=main_mpi, cmd='mpirun')
764 767
765 768 parser_mpiexec = subparsers.add_parser(
766 769 'mpiexec',
767 770 help='run a cluster using mpiexec (mpirun also works)',
768 771 parents=[base_parser]
769 772 )
770 773 parser_mpiexec.add_argument(
771 774 "--mpi",
772 775 type=str,
773 776 dest="mpi", # Don't put a default here to allow no MPI support
774 777 help="how to call MPI_Init (default=mpi4py)"
775 778 )
776 779 parser_mpiexec.set_defaults(func=main_mpi, cmd='mpiexec')
777 780
778 781 parser_pbs = subparsers.add_parser(
779 782 'pbs',
780 783 help='run a pbs cluster',
781 784 parents=[base_parser]
782 785 )
783 786 parser_pbs.add_argument(
784 787 '--pbs-script',
785 788 type=str,
786 789 dest='pbsscript',
787 790 help='PBS script template',
788 791 default='pbs.template'
789 792 )
790 793 parser_pbs.set_defaults(func=main_pbs)
791 794
792 795 parser_ssh = subparsers.add_parser(
793 796 'ssh',
794 797 help='run a cluster using ssh, should have ssh-keys setup',
795 798 parents=[base_parser]
796 799 )
797 800 parser_ssh.add_argument(
798 801 '--clusterfile',
799 802 type=str,
800 803 dest='clusterfile',
801 804 help='python file describing the cluster',
802 805 default='clusterfile.py',
803 806 )
804 807 parser_ssh.add_argument(
805 808 '--sshx',
806 809 type=str,
807 810 dest='sshx',
808 811 help='sshx launcher helper'
809 812 )
810 813 parser_ssh.set_defaults(func=main_ssh)
811 814
812 815 args = parser.parse_args()
813 816 return args
814 817
815 818 def main():
816 819 args = get_args()
817 820 reactor.callWhenRunning(args.func, args)
818 821 log.startLogging(sys.stdout)
819 822 reactor.run()
820 823
821 824 if __name__ == '__main__':
822 825 main()
@@ -1,409 +1,416
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """The IPython controller."""
5 5
6 6 __docformat__ = "restructuredtext en"
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2008 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 # Python looks for an empty string at the beginning of sys.path to enable
20 20 # importing from the cwd.
21 21 import sys
22 22 sys.path.insert(0, '')
23 23
24 24 from optparse import OptionParser
25 25 import os
26 26 import time
27 27 import tempfile
28 28
29 29 from twisted.application import internet, service
30 30 from twisted.internet import reactor, error, defer
31 31 from twisted.python import log
32 32
33 33 from IPython.kernel.fcutil import Tub, UnauthenticatedTub, have_crypto
34 34
35 35 # from IPython.tools import growl
36 36 # growl.start("IPython1 Controller")
37 37
38 38 from IPython.kernel.error import SecurityError
39 39 from IPython.kernel import controllerservice
40 40 from IPython.kernel.fcutil import check_furl_file_security
41 41
42 42 # Create various ipython directories if they don't exist.
43 43 # This must be done before IPython.kernel.config is imported.
44 44 from IPython.iplib import user_setup
45 45 from IPython.genutils import get_ipython_dir, get_log_dir, get_security_dir
46 46 if os.name == 'posix':
47 47 rc_suffix = ''
48 48 else:
49 49 rc_suffix = '.ini'
50 50 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
51 51 get_log_dir()
52 52 get_security_dir()
53 53
54 54 from IPython.kernel.config import config_manager as kernel_config_manager
55 55 from IPython.config.cutils import import_item
56 56
57 57
58 58 #-------------------------------------------------------------------------------
59 59 # Code
60 60 #-------------------------------------------------------------------------------
61 61
62 62 def get_temp_furlfile(filename):
63 63 return tempfile.mktemp(dir=os.path.dirname(filename),
64 64 prefix=os.path.basename(filename))
65 65
66 66 def make_tub(ip, port, secure, cert_file):
67 67 """
68 68 Create a listening tub given an ip, port, and cert_file location.
69 69
70 70 :Parameters:
71 71 ip : str
72 72 The IP address that the tub should listen on. Empty means all interfaces.
73 73 port : int
74 74 The port that the tub should listen on. A value of 0 means
75 75 pick a random port
76 76 secure: boolean
77 77 Will the connection be secure (in the foolscap sense)
78 78 cert_file:
79 79 A filename of a file to be used for the SSL certificate
80 80 """
81 81 if secure:
82 82 if have_crypto:
83 83 tub = Tub(certFile=cert_file)
84 84 else:
85 85 raise SecurityError("""
86 86 OpenSSL/pyOpenSSL is not available, so we can't run in secure mode.
87 87 Try running without security using 'ipcontroller -xy'.
88 88 """)
89 89 else:
90 90 tub = UnauthenticatedTub()
91 91
92 92 # Set the strport based on the ip and port and start listening
93 93 if ip == '':
94 94 strport = "tcp:%i" % port
95 95 else:
96 96 strport = "tcp:%i:interface=%s" % (port, ip)
97 97 listener = tub.listenOn(strport)
98 98
99 99 return tub, listener
100 100
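For context on the helper above: make_tub pairs a foolscap Tub (authenticated only when secure is true and pyOpenSSL is available) with a listener on the requested interface and port. A hedged example call, with illustrative values:

    # Illustrative values only; secure=True requires pyOpenSSL (have_crypto).
    tub, listener = make_tub(ip='', port=0, secure=True,
                             cert_file='ipcontroller-engine.pem')
    print "listening on port:", listener.getPortnum()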
101 101 def make_client_service(controller_service, config):
102 102 """
103 103 Create a service that will listen for clients.
104 104
105 105 This service is simply a `foolscap.Tub` instance that has a set of Referenceables
106 106 registered with it.
107 107 """
108 108
109 109 # Now create the foolscap tub
110 110 ip = config['controller']['client_tub']['ip']
111 111 port = config['controller']['client_tub'].as_int('port')
112 112 location = config['controller']['client_tub']['location']
113 113 secure = config['controller']['client_tub']['secure']
114 114 cert_file = config['controller']['client_tub']['cert_file']
115 115 client_tub, client_listener = make_tub(ip, port, secure, cert_file)
116 116
117 117 # Set the location in the trivial case of localhost
118 118 if ip == 'localhost' or ip == '127.0.0.1':
119 119 location = "127.0.0.1"
120 120
121 121 if not secure:
122 122 log.msg("WARNING: you are running the controller with no client security")
123 123
124 124 def set_location_and_register():
125 125 """Set the location for the tub and return a deferred."""
126 126
127 127 def register(empty, ref, furl_file):
128 128 # We create and then move to make sure that when the file
129 129 # appears to other processes, the buffer has been flushed
130 130 # and the file has been closed.
131 131 temp_furl_file = get_temp_furlfile(furl_file)
132 132 client_tub.registerReference(ref, furlFile=temp_furl_file)
133 133 os.rename(temp_furl_file, furl_file)
134 134
135 135 if location == '':
136 136 d = client_tub.setLocationAutomatically()
137 137 else:
138 138 d = defer.maybeDeferred(client_tub.setLocation, "%s:%i" % (location, client_listener.getPortnum()))
139 139
140 140 for ciname, ci in config['controller']['controller_interfaces'].iteritems():
141 141 log.msg("Adapting Controller to interface: %s" % ciname)
142 142 furl_file = ci['furl_file']
143 143 log.msg("Saving furl for interface [%s] to file: %s" % (ciname, furl_file))
144 144 check_furl_file_security(furl_file, secure)
145 145 adapted_controller = import_item(ci['controller_interface'])(controller_service)
146 146 d.addCallback(register, import_item(ci['fc_interface'])(adapted_controller),
147 147 furl_file=ci['furl_file'])
148 148
149 149 reactor.callWhenRunning(set_location_and_register)
150 150 return client_tub
151 151
152 152
153 153 def make_engine_service(controller_service, config):
154 154 """
155 155 Create a service that will listen for engines.
156 156
157 157 This service is simply a `foolscap.Tub` instance that has a set of Referenceables
158 158 registered with it.
159 159 """
160 160
161 161 # Now create the foolscap tub
162 162 ip = config['controller']['engine_tub']['ip']
163 163 port = config['controller']['engine_tub'].as_int('port')
164 164 location = config['controller']['engine_tub']['location']
165 165 secure = config['controller']['engine_tub']['secure']
166 166 cert_file = config['controller']['engine_tub']['cert_file']
167 167 engine_tub, engine_listener = make_tub(ip, port, secure, cert_file)
168 168
169 169 # Set the location in the trivial case of localhost
170 170 if ip == 'localhost' or ip == '127.0.0.1':
171 171 location = "127.0.0.1"
172 172
173 173 if not secure:
174 174 log.msg("WARNING: you are running the controller with no engine security")
175 175
176 176 def set_location_and_register():
177 177 """Set the location for the tub and return a deferred."""
178 178
179 179 def register(empty, ref, furl_file):
180 180 # We create and then move to make sure that when the file
181 181 # appears to other processes, the buffer has been flushed
182 182 # and the file has been closed.
183 183 temp_furl_file = get_temp_furlfile(furl_file)
184 184 engine_tub.registerReference(ref, furlFile=temp_furl_file)
185 185 os.rename(temp_furl_file, furl_file)
186 186
187 187 if location == '':
188 188 d = engine_tub.setLocationAutomatically()
189 189 else:
190 190 d = defer.maybeDeferred(engine_tub.setLocation, "%s:%i" % (location, engine_listener.getPortnum()))
191 191
192 192 furl_file = config['controller']['engine_furl_file']
193 193 engine_fc_interface = import_item(config['controller']['engine_fc_interface'])
194 194 log.msg("Saving furl for the engine to file: %s" % furl_file)
195 195 check_furl_file_security(furl_file, secure)
196 196 fc_controller = engine_fc_interface(controller_service)
197 197 d.addCallback(register, fc_controller, furl_file=furl_file)
198 198
199 199 reactor.callWhenRunning(set_location_and_register)
200 200 return engine_tub
201 201
202 202 def start_controller():
203 203 """
204 204 Start the controller by creating the service hierarchy and starting the reactor.
205 205
206 206 This method does the following:
207 207
208 208 * It starts the controller logging
209 209 * It executes an import statement for the controller
210 210 * It creates 2 `foolscap.Tub` instances for the client and the engines
211 211 and registers `foolscap.Referenceables` with the tubs to expose the
212 212 controller to engines and clients.
213 213 """
214 214 config = kernel_config_manager.get_config_obj()
215 215
216 216 # Start logging
217 217 logfile = config['controller']['logfile']
218 218 if logfile:
219 219 logfile = logfile + str(os.getpid()) + '.log'
220 220 try:
221 221 openLogFile = open(logfile, 'w')
222 222 except:
223 223 openLogFile = sys.stdout
224 224 else:
225 225 openLogFile = sys.stdout
226 226 log.startLogging(openLogFile)
227 227
228 228 # Execute any user defined import statements
229 229 cis = config['controller']['import_statement']
230 230 if cis:
231 231 try:
232 232 exec cis in globals(), locals()
233 233 except:
234 234 log.msg("Error running import_statement: %s" % cis)
235 235
236 236 # Delete old furl files unless the reuse_furls is set
237 237 reuse = config['controller']['reuse_furls']
238 238 if not reuse:
239 239 paths = (config['controller']['engine_furl_file'],
240 240 config['controller']['controller_interfaces']['task']['furl_file'],
241 241 config['controller']['controller_interfaces']['multiengine']['furl_file']
242 242 )
243 243 for p in paths:
244 244 if os.path.isfile(p):
245 245 os.remove(p)
246 246
247 247 # Create the service hierarchy
248 248 main_service = service.MultiService()
249 249 # The controller service
250 250 controller_service = controllerservice.ControllerService()
251 251 controller_service.setServiceParent(main_service)
252 252 # The client tub and all its referenceables
253 253 client_service = make_client_service(controller_service, config)
254 254 client_service.setServiceParent(main_service)
255 255 # The engine tub
256 256 engine_service = make_engine_service(controller_service, config)
257 257 engine_service.setServiceParent(main_service)
258 258 # Start the controller service and set things running
259 259 main_service.startService()
260 260 reactor.run()
261 261
262 262 def init_config():
263 263 """
264 264 Initialize the configuration using default and command line options.
265 265 """
266 266
267 parser = OptionParser()
267 parser = OptionParser("""ipcontroller [options]
268
269 Start an IPython controller.
270
271 Use the IPYTHONDIR environment variable to change your IPython directory
272 from the default of .ipython or _ipython. The log and security
273 subdirectories of your IPython directory will be used by this script
274 for log files and security files.""")
268 275
269 276 # Client related options
270 277 parser.add_option(
271 278 "--client-ip",
272 279 type="string",
273 280 dest="client_ip",
274 281 help="the IP address or hostname the controller will listen on for client connections"
275 282 )
276 283 parser.add_option(
277 284 "--client-port",
278 285 type="int",
279 286 dest="client_port",
280 287 help="the port the controller will listen on for client connections"
281 288 )
282 289 parser.add_option(
283 290 '--client-location',
284 291 type="string",
285 292 dest="client_location",
286 293 help="hostname or ip for clients to connect to"
287 294 )
288 295 parser.add_option(
289 296 "-x",
290 297 action="store_false",
291 298 dest="client_secure",
292 299 help="turn off all client security"
293 300 )
294 301 parser.add_option(
295 302 '--client-cert-file',
296 303 type="string",
297 304 dest="client_cert_file",
298 305 help="file to store the client SSL certificate"
299 306 )
300 307 parser.add_option(
301 308 '--task-furl-file',
302 309 type="string",
303 310 dest="task_furl_file",
304 311 help="file to store the FURL for task clients to connect with"
305 312 )
306 313 parser.add_option(
307 314 '--multiengine-furl-file',
308 315 type="string",
309 316 dest="multiengine_furl_file",
310 317 help="file to store the FURL for multiengine clients to connect with"
311 318 )
312 319 # Engine related options
313 320 parser.add_option(
314 321 "--engine-ip",
315 322 type="string",
316 323 dest="engine_ip",
317 324 help="the IP address or hostname the controller will listen on for engine connections"
318 325 )
319 326 parser.add_option(
320 327 "--engine-port",
321 328 type="int",
322 329 dest="engine_port",
323 330 help="the port the controller will listen on for engine connections"
324 331 )
325 332 parser.add_option(
326 333 '--engine-location',
327 334 type="string",
328 335 dest="engine_location",
329 336 help="hostname or ip for engines to connect to"
330 337 )
331 338 parser.add_option(
332 339 "-y",
333 340 action="store_false",
334 341 dest="engine_secure",
335 342 help="turn off all engine security"
336 343 )
337 344 parser.add_option(
338 345 '--engine-cert-file',
339 346 type="string",
340 347 dest="engine_cert_file",
341 348 help="file to store the engine SSL certificate"
342 349 )
343 350 parser.add_option(
344 351 '--engine-furl-file',
345 352 type="string",
346 353 dest="engine_furl_file",
347 354 help="file to store the FURL for engines to connect with"
348 355 )
349 356 parser.add_option(
350 357 "-l", "--logfile",
351 358 type="string",
352 359 dest="logfile",
353 360 help="log file name (default is stdout)"
354 361 )
355 362 parser.add_option(
356 363 "-r",
357 364 action="store_true",
358 365 dest="reuse_furls",
359 366 help="try to reuse all furl files"
360 367 )
361 368
362 369 (options, args) = parser.parse_args()
363 370
364 371 config = kernel_config_manager.get_config_obj()
365 372
366 373 # Update with command line options
367 374 if options.client_ip is not None:
368 375 config['controller']['client_tub']['ip'] = options.client_ip
369 376 if options.client_port is not None:
370 377 config['controller']['client_tub']['port'] = options.client_port
371 378 if options.client_location is not None:
372 379 config['controller']['client_tub']['location'] = options.client_location
373 380 if options.client_secure is not None:
374 381 config['controller']['client_tub']['secure'] = options.client_secure
375 382 if options.client_cert_file is not None:
376 383 config['controller']['client_tub']['cert_file'] = options.client_cert_file
377 384 if options.task_furl_file is not None:
378 385 config['controller']['controller_interfaces']['task']['furl_file'] = options.task_furl_file
379 386 if options.multiengine_furl_file is not None:
380 387 config['controller']['controller_interfaces']['multiengine']['furl_file'] = options.multiengine_furl_file
381 388 if options.engine_ip is not None:
382 389 config['controller']['engine_tub']['ip'] = options.engine_ip
383 390 if options.engine_port is not None:
384 391 config['controller']['engine_tub']['port'] = options.engine_port
385 392 if options.engine_location is not None:
386 393 config['controller']['engine_tub']['location'] = options.engine_location
387 394 if options.engine_secure is not None:
388 395 config['controller']['engine_tub']['secure'] = options.engine_secure
389 396 if options.engine_cert_file is not None:
390 397 config['controller']['engine_tub']['cert_file'] = options.engine_cert_file
391 398 if options.engine_furl_file is not None:
392 399 config['controller']['engine_furl_file'] = options.engine_furl_file
393 400 if options.reuse_furls is not None:
394 401 config['controller']['reuse_furls'] = options.reuse_furls
395 402
396 403 if options.logfile is not None:
397 404 config['controller']['logfile'] = options.logfile
398 405
399 406 kernel_config_manager.update_config_obj(config)
400 407
401 408 def main():
402 409 """
403 410 After creating the configuration information, start the controller.
404 411 """
405 412 init_config()
406 413 start_controller()
407 414
408 415 if __name__ == "__main__":
409 416 main()
@@ -1,186 +1,193
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Start the IPython Engine."""
5 5
6 6 __docformat__ = "restructuredtext en"
7 7
8 8 #-------------------------------------------------------------------------------
9 9 # Copyright (C) 2008 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-------------------------------------------------------------------------------
14 14
15 15 #-------------------------------------------------------------------------------
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 19 # Python looks for an empty string at the beginning of sys.path to enable
20 20 # importing from the cwd.
21 21 import sys
22 22 sys.path.insert(0, '')
23 23
24 24 from optparse import OptionParser
25 25 import os
26 26
27 27 from twisted.application import service
28 28 from twisted.internet import reactor
29 29 from twisted.python import log
30 30
31 31 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
32 32
33 33 from IPython.kernel.core.config import config_manager as core_config_manager
34 34 from IPython.config.cutils import import_item
35 35 from IPython.kernel.engineservice import EngineService
36 36
37 37 # Create various ipython directories if they don't exist.
38 38 # This must be done before IPython.kernel.config is imported.
39 39 from IPython.iplib import user_setup
40 40 from IPython.genutils import get_ipython_dir, get_log_dir, get_security_dir
41 41 if os.name == 'posix':
42 42 rc_suffix = ''
43 43 else:
44 44 rc_suffix = '.ini'
45 45 user_setup(get_ipython_dir(), rc_suffix, mode='install', interactive=False)
46 46 get_log_dir()
47 47 get_security_dir()
48 48
49 49 from IPython.kernel.config import config_manager as kernel_config_manager
50 50 from IPython.kernel.engineconnector import EngineConnector
51 51
52 52
53 53 #-------------------------------------------------------------------------------
54 54 # Code
55 55 #-------------------------------------------------------------------------------
56 56
57 57 def start_engine():
58 58 """
59 59 Start the engine by creating it and starting the Twisted reactor.
60 60
61 61 This method does:
62 62
63 63 * If it exists, runs the `mpi_import_statement` to call `MPI_Init`
64 64 * Starts the engine logging
65 65 * Creates an IPython shell and wraps it in an `EngineService`
66 66 * Creates a `foolscap.Tub` to use in connecting to a controller.
67 67 * Uses the tub and the `EngineService` along with a Foolscap URL
68 68 (or FURL) to connect to the controller and register the engine
69 69 with the controller
70 70 """
71 71 kernel_config = kernel_config_manager.get_config_obj()
72 72 core_config = core_config_manager.get_config_obj()
73 73
74 74
75 75 # Execute the mpi import statement that needs to call MPI_Init
76 76 global mpi
77 77 mpikey = kernel_config['mpi']['default']
78 78 mpi_import_statement = kernel_config['mpi'].get(mpikey, None)
79 79 if mpi_import_statement is not None:
80 80 try:
81 81 exec mpi_import_statement in globals()
82 82 except:
83 83 mpi = None
84 84 else:
85 85 mpi = None
86 86
87 87 # Start logging
88 88 logfile = kernel_config['engine']['logfile']
89 89 if logfile:
90 90 logfile = logfile + str(os.getpid()) + '.log'
91 91 try:
92 92 openLogFile = open(logfile, 'w')
93 93 except:
94 94 openLogFile = sys.stdout
95 95 else:
96 96 openLogFile = sys.stdout
97 97 log.startLogging(openLogFile)
98 98
99 99 # Create the underlying shell class and EngineService
100 100 shell_class = import_item(core_config['shell']['shell_class'])
101 101 engine_service = EngineService(shell_class, mpi=mpi)
102 102 shell_import_statement = core_config['shell']['import_statement']
103 103 if shell_import_statement:
104 104 try:
105 105 engine_service.execute(shell_import_statement)
106 106 except:
107 107 log.msg("Error running import_statement: %s" % shell_import_statement)
108 108
109 109 # Create the service hierarchy
110 110 main_service = service.MultiService()
111 111 engine_service.setServiceParent(main_service)
112 112 tub_service = Tub()
113 113 tub_service.setServiceParent(main_service)
114 114 # This needs to be called before the connection is initiated
115 115 main_service.startService()
116 116
117 117 # This initiates the connection to the controller and calls
118 118 # register_engine to tell the controller we are ready to do work
119 119 engine_connector = EngineConnector(tub_service)
120 120 furl_file = kernel_config['engine']['furl_file']
121 121 log.msg("Using furl file: %s" % furl_file)
122 122
123 123 def call_connect(engine_service, furl_file):
124 124 d = engine_connector.connect_to_controller(engine_service, furl_file)
125 125 def handle_error(f):
126 126 # If this print statement is replaced by a log.err(f) I get
127 127 # an unhandled error, which makes no sense. I shouldn't have
128 128 # to use a print statement here. My only thought is that
129 129 # at the beginning of the process the logging is still starting up
130 130 print "error connecting to controller:", f.getErrorMessage()
131 131 reactor.callLater(0.1, reactor.stop)
132 132 d.addErrback(handle_error)
133 133
134 134 reactor.callWhenRunning(call_connect, engine_service, furl_file)
135 135 reactor.run()
136 136
137 137
138 138 def init_config():
139 139 """
140 140 Initialize the configuration using default and command line options.
141 141 """
142 142
143 parser = OptionParser()
143 parser = OptionParser("""ipengine [options]
144
145 Start an IPython engine.
146
147 Use the IPYTHONDIR environment variable to change your IPython directory
148 from the default of .ipython or _ipython. The log and security
149 subdirectories of your IPython directory will be used by this script
150 for log files and security files.""")
144 151
145 152 parser.add_option(
146 153 "--furl-file",
147 154 type="string",
148 155 dest="furl_file",
149 156 help="The filename containing the FURL of the controller"
150 157 )
151 158 parser.add_option(
152 159 "--mpi",
153 160 type="string",
154 161 dest="mpi",
155 162 help="How to enable MPI (mpi4py, pytrilinos, or empty string to disable)"
156 163 )
157 164 parser.add_option(
158 165 "-l",
159 166 "--logfile",
160 167 type="string",
161 168 dest="logfile",
162 169 help="log file name (default is stdout)"
163 170 )
164 171
165 172 (options, args) = parser.parse_args()
166 173
167 174 kernel_config = kernel_config_manager.get_config_obj()
168 175 # Now override with command line options
169 176 if options.furl_file is not None:
170 177 kernel_config['engine']['furl_file'] = options.furl_file
171 178 if options.logfile is not None:
172 179 kernel_config['engine']['logfile'] = options.logfile
173 180 if options.mpi is not None:
174 181 kernel_config['mpi']['default'] = options.mpi
175 182
176 183
177 184 def main():
178 185 """
179 186 After creating the configuration information, start the engine.
180 187 """
181 188 init_config()
182 189 start_engine()
183 190
184 191
185 192 if __name__ == "__main__":
186 193 main()