##// END OF EJS Templates
Brian Granger -
Show More
@@ -67,10 +67,10 b' class EngineConnector(object):'
67 67 self.furl = find_furl(furl_or_file)
68 68 except ValueError:
69 69 return defer.fail(failure.Failure())
70 # return defer.fail(failure.Failure(ValueError('not a valid furl or furl file: %r' % furl_or_file)))
71 d = self.tub.getReference(self.furl)
72 d.addCallbacks(self._register, self._log_failure)
73 return d
70 else:
71 d = self.tub.getReference(self.furl)
72 d.addCallbacks(self._register, self._log_failure)
73 return d
74 74
75 75 def _log_failure(self, reason):
76 76 log.err('EngineConnector: engine registration failed:')
@@ -104,6 +104,9 b' class StopLocalExecution(KernelError):'
104 104 class SecurityError(KernelError):
105 105 pass
106 106
class FileTimeoutError(KernelError):
    """Error signalling that waiting for a file to be created timed out."""
    pass
109
107 110 class CompositeError(KernelError):
108 111 def __init__(self, message, elist):
109 112 Exception.__init__(self, *(message, elist))
@@ -31,9 +31,10 b' from IPython.external import argparse'
31 31 from IPython.external import Itpl
32 32 from IPython.genutils import get_ipython_dir, num_cpus
33 33 from IPython.kernel.fcutil import have_crypto
34 from IPython.kernel.error import SecurityError
34 from IPython.kernel.config import config_manager as kernel_config_manager
35 from IPython.kernel.error import SecurityError, FileTimeoutError
35 36 from IPython.kernel.fcutil import have_crypto
36 from IPython.kernel.twistedutil import gatherBoth
37 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
37 38 from IPython.kernel.util import printer
38 39
39 40
@@ -469,6 +470,7 b' class SSHEngineSet(object):'
469 470 # The main functions should then just parse the command line arguments, create
470 471 # the appropriate class and call a 'start' method.
471 472
473
472 474 def check_security(args, cont_args):
473 475 if (not args.x or not args.y) and not have_crypto:
474 476 log.err("""
@@ -482,6 +484,7 b' Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")'
482 484 cont_args.append('-y')
483 485 return True
484 486
487
485 488 def check_reuse(args, cont_args):
486 489 if args.r:
487 490 cont_args.append('-r')
@@ -495,6 +498,23 b' the --client-port and --engine-port options.""")'
495 498 cont_args.append('--engine-port=%i' % args.engine_port)
496 499 return True
497 500
501
def _err_and_stop(f):
    """Log an error and stop the reactor.

    Parameters
    ----------
    f : object
        The error to report; it is handed unchanged to ``log.err``.
    """
    log.err(f)
    reactor.stop()
505
506
def _delay_start(cont_pid, start_engines, furl_file, reuse):
    """Wait for the FURL file to exist, then start the engines.

    Parameters
    ----------
    cont_pid : object
        Passed through unchanged as the only argument of `start_engines`.
    start_engines : callable
        Invoked as ``start_engines(cont_pid)`` once the file exists.
    furl_file : str
        Path of the FURL file to wait for.
    reuse : bool
        If false, an existing file at `furl_file` is deleted before waiting.

    Returns
    -------
    d : Deferred
        The Deferred returned by ``wait_for_file`` with the "Controller
        started" log message and the ``start_engines`` call chained on it.
    """
    if not reuse and os.path.isfile(furl_file):
        # Remove the existing file so that the wait below is not satisfied
        # by a file that was already present.
        os.unlink(furl_file)
    log.msg('Waiting for controller to finish starting...')
    # Poll every 0.2 seconds, giving up after 50 attempts.
    d = wait_for_file(furl_file, delay=0.2, max_tries=50)
    d.addCallback(lambda _: log.msg('Controller started'))
    d.addCallback(lambda _: start_engines(cont_pid))
    return d
516
517
498 518 def main_local(args):
499 519 cont_args = []
500 520 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
@@ -524,13 +544,10 b' def main_local(args):'
524 544 signal.signal(signal.SIGINT,shutdown)
525 545 d = eset.start(args.n)
526 546 return d
527 def delay_start(cont_pid):
528 # This is needed because the controller doesn't start listening
529 # right when it starts and the controller needs to write
530 # furl files for the engine to pick up
531 reactor.callLater(1.0, start_engines, cont_pid)
532 dstart.addCallback(delay_start)
533 dstart.addErrback(lambda f: f.raiseException())
547 config = kernel_config_manager.get_config_obj()
548 furl_file = config['controller']['engine_furl_file']
549 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
550 dstart.addErrback(_err_and_stop)
534 551
535 552
536 553 def main_mpi(args):
@@ -566,13 +583,10 b' def main_mpi(args):'
566 583 signal.signal(signal.SIGINT,shutdown)
567 584 d = eset.start()
568 585 return d
569 def delay_start(cont_pid):
570 # This is needed because the controller doesn't start listening
571 # right when it starts and the controller needs to write
572 # furl files for the engine to pick up
573 reactor.callLater(1.0, start_engines, cont_pid)
574 dstart.addCallback(delay_start)
575 dstart.addErrback(lambda f: f.raiseException())
586 config = kernel_config_manager.get_config_obj()
587 furl_file = config['controller']['engine_furl_file']
588 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
589 dstart.addErrback(_err_and_stop)
576 590
577 591
578 592 def main_pbs(args):
@@ -599,8 +613,10 b' def main_pbs(args):'
599 613 signal.signal(signal.SIGINT,shutdown)
600 614 d = pbs_set.start(args.n)
601 615 return d
602 dstart.addCallback(start_engines)
603 dstart.addErrback(lambda f: f.raiseException())
616 config = kernel_config_manager.get_config_obj()
617 furl_file = config['controller']['engine_furl_file']
618 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
619 dstart.addErrback(_err_and_stop)
604 620
605 621
606 622 def main_ssh(args):
@@ -641,12 +657,10 b' def main_ssh(args):'
641 657 signal.signal(signal.SIGINT,shutdown)
642 658 d = ssh_set.start(clusterfile['send_furl'])
643 659 return d
644
645 def delay_start(cont_pid):
646 reactor.callLater(1.0, start_engines, cont_pid)
647
648 dstart.addCallback(delay_start)
649 dstart.addErrback(lambda f: f.raiseException())
660 config = kernel_config_manager.get_config_obj()
661 furl_file = config['controller']['engine_furl_file']
662 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
663 dstart.addErrback(_err_and_stop)
650 664
651 665
652 666 def get_args():
@@ -22,6 +22,7 b' import sys'
22 22 sys.path.insert(0, '')
23 23
24 24 import sys, time, os
25 import tempfile
25 26 from optparse import OptionParser
26 27
27 28 from twisted.application import internet, service
@@ -45,6 +46,10 b' from IPython.config.cutils import import_item'
45 46 # Code
46 47 #-------------------------------------------------------------------------------
47 48
def get_temp_furlfile(filename):
    """Return a temporary file name located next to `filename`.

    The generated name lives in the same directory as `filename` and starts
    with the base name of `filename`. Only a name is produced; no file is
    created.

    Parameters
    ----------
    filename : str
        Path of the final file the temporary name is derived from.

    Returns
    -------
    str
        A path in ``os.path.dirname(filename)`` whose base name begins with
        ``os.path.basename(filename)``.
    """
    directory, base = os.path.split(filename)
    # NOTE(review): tempfile.mktemp only picks an unused name, so another
    # process could create that path before it is used. It is kept here
    # presumably because callers need a path that does not exist yet --
    # confirm before switching to mkstemp.
    return tempfile.mktemp(dir=directory, prefix=base)
52
48 53 def make_tub(ip, port, secure, cert_file):
49 54 """
50 55 Create a listening tub given an ip, port, and cert_file location.
@@ -107,13 +112,19 b' def make_client_service(controller_service, config):'
107 112 """Set the location for the tub and return a deferred."""
108 113
109 114 def register(empty, ref, furl_file):
110 client_tub.registerReference(ref, furlFile=furl_file)
115 # We create and then move to make sure that when the file
116 # appears to other processes, the buffer has been flushed
117 # and the file has been closed
118 temp_furl_file = get_temp_furlfile(furl_file)
119 log.msg(temp_furl_file)
120 client_tub.registerReference(ref, furlFile=temp_furl_file)
121 os.rename(temp_furl_file, furl_file)
111 122
112 123 if location == '':
113 124 d = client_tub.setLocationAutomatically()
114 125 else:
115 126 d = defer.maybeDeferred(client_tub.setLocation, "%s:%i" % (location, client_listener.getPortnum()))
116
127
117 128 for ciname, ci in config['controller']['controller_interfaces'].iteritems():
118 129 log.msg("Adapting Controller to interface: %s" % ciname)
119 130 furl_file = ci['furl_file']
@@ -154,7 +165,13 b' def make_engine_service(controller_service, config):'
154 165 """Set the location for the tub and return a deferred."""
155 166
156 167 def register(empty, ref, furl_file):
157 engine_tub.registerReference(ref, furlFile=furl_file)
168 # We create and then move to make sure that when the file
169 # appears to other processes, the buffer has been flushed
170 # and the file has been closed
171 temp_furl_file = get_temp_furlfile(furl_file)
172 log.msg(temp_furl_file)
173 engine_tub.registerReference(ref, furlFile=temp_furl_file)
174 os.rename(temp_furl_file, furl_file)
158 175
159 176 if location == '':
160 177 d = engine_tub.setLocationAutomatically()
@@ -106,13 +106,19 b' def start_engine():'
106 106 engine_connector = EngineConnector(tub_service)
107 107 furl_file = kernel_config['engine']['furl_file']
108 108 log.msg("Using furl file: %s" % furl_file)
109 d = engine_connector.connect_to_controller(engine_service, furl_file)
110 def handle_error(f):
111 log.err(f)
112 if reactor.running:
113 reactor.stop()
114 d.addErrback(handle_error)
115 109
110 def call_connect(engine_service, furl_file):
111 d = engine_connector.connect_to_controller(engine_service, furl_file)
112 def handle_error(f):
113 # If this print statement is replaced by a log.err(f) I get
114 # an unhandled error, which makes no sense. I shouldn't have
115 # to use a print statement here. My only thought is that
116 # at the beginning of the process the logging is still starting up
117 print "error connecting to controller:", f.getErrorMessage()
118 reactor.callLater(0.1, reactor.stop)
119 d.addErrback(handle_error)
120
121 reactor.callWhenRunning(call_connect, engine_service, furl_file)
116 122 reactor.run()
117 123
118 124
@@ -4,16 +4,16 b''
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 #-------------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 13
14 #-------------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 15 # Imports
16 #-------------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 17
18 18 try:
19 19 from twisted.internet import defer
@@ -4,16 +4,16 b''
4 4
5 5 __docformat__ = "restructuredtext en"
6 6
7 #-------------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 13
14 #-------------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 15 # Imports
16 #-------------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 17
18 18 try:
19 19 import zope.interface as zi
@@ -31,9 +31,9 b' except ImportError:'
31 31 import nose
32 32 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
33 33
34 #-------------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35 35 # Tests
36 #-------------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37 37
38 38 class SerializedTestCase(unittest.TestCase):
39 39
@@ -16,12 +16,15 b' __docformat__ = "restructuredtext en"'
16 16 # Imports
17 17 #-------------------------------------------------------------------------------
18 18
19 import os, sys
19 20 import threading, Queue, atexit
20 import twisted
21 21
22 import twisted
22 23 from twisted.internet import defer, reactor
23 24 from twisted.python import log, failure
24 25
26 from IPython.kernel.error import FileTimeoutError
27
25 28 #-------------------------------------------------------------------------------
26 29 # Classes related to twisted and threads
27 30 #-------------------------------------------------------------------------------
@@ -204,3 +207,43 b' class DeferredList(defer.Deferred):'
204 207 result = None
205 208
206 209 return result
210
211
def wait_for_file(filename, delay=0.1, max_tries=10):
    """Wait (poll) for a file to be created.

    This function returns a Deferred that will fire when a file exists. It
    works by polling os.path.isfile in time intervals specified by the
    `delay` argument. The first poll happens immediately, before this
    function returns. If `max_tries` polls have been made without finding
    the file, the Deferred will errback with a `FileTimeoutError`.

    Parameters
    ----------
    filename : str
        The name of the file to wait for.
    delay : float
        The time to wait between polls.
    max_tries : int
        The max number of attempts before raising `FileTimeoutError`

    Returns
    -------
    d : Deferred
        A Deferred instance that will fire with True when the file exists.
    """

    d = defer.Deferred()

    def _test_for_file(filename, attempt=0):
        # `attempt` counts the polls already made; once the budget is used
        # up, report the timeout instead of polling again.
        if attempt >= max_tries:
            d.errback(FileTimeoutError(
                'timeout waiting for file to be created: %s' % filename
            ))
        elif os.path.isfile(filename):
            d.callback(True)
        else:
            # Not there yet: schedule the next poll after `delay`.
            reactor.callLater(delay, _test_for_file, filename, attempt+1)

    _test_for_file(filename)
    return d
@@ -302,18 +302,33 b' The ``--furl-file`` flag works like this::'
302 302 Make FURL files persistent
303 303 ---------------------------
304 304
305 At fist glance it may seem that that managing the FURL files is a bit annoying. Going back to the house and key analogy, copying the FURL around each time you start the controller is like having to make a new key every time you want to unlock the door and enter your house. As with your house, you want to be able to create the key (or FURL file) once, and then simply use it at any point in the future.
305 At first glance it may seem that managing the FURL files is a bit
306 annoying. Going back to the house and key analogy, copying the FURL around
307 each time you start the controller is like having to make a new key every time
308 you want to unlock the door and enter your house. As with your house, you want
309 to be able to create the key (or FURL file) once, and then simply use it at
310 any point in the future.
306 311
307 This is possible. The only thing you have to do is decide what ports the controller will listen on for the engines and clients. This is done as follows::
312 This is possible, but before you do this, you **must** remove any old FURL
313 files in the :file:`~/.ipython/security` directory.
314
315 .. warning::
316
317 You **must** remove old FURL files before using persistent FURL files.
318
319 Then, the only thing you have to do is decide what ports the controller will
320 listen on for the engines and clients. This is done as follows::
308 321
309 322 $ ipcontroller -r --client-port=10101 --engine-port=10102
310 323
311 These options also work with all of the various modes of
324 These options also work with all of the various modes of
312 325 :command:`ipcluster`::
313 326
314 327 $ ipcluster local -n 2 -r --client-port=10101 --engine-port=10102
315 328
316 Then, just copy the furl files over the first time and you are set. You can start and stop the controller and engines any many times as you want in the future, just make sure to tell the controller to use the *same* ports.
329 Then, just copy the furl files over the first time and you are set. You can
330 start and stop the controller and engines as many times as you want in the
331 future, just make sure to tell the controller to use the *same* ports.
317 332
318 333 .. note::
319 334
General Comments 0
You need to be logged in to leave comments. Login now