From ffa7707d255241b680edf1fbb1fed166d87e45d7 2009-04-18 21:53:37 From: Brian Granger Date: 2009-04-18 21:53:37 Subject: [PATCH] Fix for ipcluster bug: https://bugs.launchpad.net/bugs/358202 To fix this bug, I have added a new function wait_for_file that waits for a file to be created and then fires a Deferred. This is then used in ipcluster to wait for the controller to create the engine FURL file. The controller is also now creating its FURL files as temp files and then moving them to make sure that the IO buffer has been flushed. --- diff --git a/IPython/kernel/engineconnector.py b/IPython/kernel/engineconnector.py index c7be8a9..389a26d 100644 --- a/IPython/kernel/engineconnector.py +++ b/IPython/kernel/engineconnector.py @@ -67,10 +67,10 @@ class EngineConnector(object): self.furl = find_furl(furl_or_file) except ValueError: return defer.fail(failure.Failure()) - # return defer.fail(failure.Failure(ValueError('not a valid furl or furl file: %r' % furl_or_file))) - d = self.tub.getReference(self.furl) - d.addCallbacks(self._register, self._log_failure) - return d + else: + d = self.tub.getReference(self.furl) + d.addCallbacks(self._register, self._log_failure) + return d def _log_failure(self, reason): log.err('EngineConnector: engine registration failed:') diff --git a/IPython/kernel/error.py b/IPython/kernel/error.py index 3aaa78c..4e86f5b 100644 --- a/IPython/kernel/error.py +++ b/IPython/kernel/error.py @@ -104,6 +104,9 @@ class StopLocalExecution(KernelError): class SecurityError(KernelError): pass +class FileTimeoutError(KernelError): + pass + class CompositeError(KernelError): def __init__(self, message, elist): Exception.__init__(self, *(message, elist)) diff --git a/IPython/kernel/scripts/ipcluster.py b/IPython/kernel/scripts/ipcluster.py index 7401c9d..5cb0ea2 100755 --- a/IPython/kernel/scripts/ipcluster.py +++ b/IPython/kernel/scripts/ipcluster.py @@ -31,9 +31,10 @@ from IPython.external import argparse from IPython.external import Itpl from 
IPython.genutils import get_ipython_dir, num_cpus from IPython.kernel.fcutil import have_crypto -from IPython.kernel.error import SecurityError +from IPython.kernel.config import config_manager as kernel_config_manager +from IPython.kernel.error import SecurityError, FileTimeoutError from IPython.kernel.fcutil import have_crypto -from IPython.kernel.twistedutil import gatherBoth +from IPython.kernel.twistedutil import gatherBoth, wait_for_file from IPython.kernel.util import printer @@ -469,6 +470,7 @@ class SSHEngineSet(object): # The main functions should then just parse the command line arguments, create # the appropriate class and call a 'start' method. + def check_security(args, cont_args): if (not args.x or not args.y) and not have_crypto: log.err(""" @@ -482,6 +484,7 @@ Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""") cont_args.append('-y') return True + def check_reuse(args, cont_args): if args.r: cont_args.append('-r') @@ -495,6 +498,23 @@ the --client-port and --engine-port options.""") cont_args.append('--engine-port=%i' % args.engine_port) return True + +def _err_and_stop(f): + log.err(f) + reactor.stop() + + +def _delay_start(cont_pid, start_engines, furl_file, reuse): + if not reuse: + if os.path.isfile(furl_file): + os.unlink(furl_file) + log.msg('Waiting for controller to finish starting...') + d = wait_for_file(furl_file, delay=0.2, max_tries=50) + d.addCallback(lambda _: log.msg('Controller started')) + d.addCallback(lambda _: start_engines(cont_pid)) + return d + + def main_local(args): cont_args = [] cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) @@ -524,13 +544,10 @@ def main_local(args): signal.signal(signal.SIGINT,shutdown) d = eset.start(args.n) return d - def delay_start(cont_pid): - # This is needed because the controller doesn't start listening - # right when it starts and the controller needs to write - # furl files for the engine to pick up - reactor.callLater(1.0, start_engines, cont_pid) - 
dstart.addCallback(delay_start) - dstart.addErrback(lambda f: f.raiseException()) + config = kernel_config_manager.get_config_obj() + furl_file = config['controller']['engine_furl_file'] + dstart.addCallback(_delay_start, start_engines, furl_file, args.r) + dstart.addErrback(_err_and_stop) def main_mpi(args): @@ -566,13 +583,10 @@ def main_mpi(args): signal.signal(signal.SIGINT,shutdown) d = eset.start() return d - def delay_start(cont_pid): - # This is needed because the controller doesn't start listening - # right when it starts and the controller needs to write - # furl files for the engine to pick up - reactor.callLater(1.0, start_engines, cont_pid) - dstart.addCallback(delay_start) - dstart.addErrback(lambda f: f.raiseException()) + config = kernel_config_manager.get_config_obj() + furl_file = config['controller']['engine_furl_file'] + dstart.addCallback(_delay_start, start_engines, furl_file, args.r) + dstart.addErrback(_err_and_stop) def main_pbs(args): @@ -599,8 +613,10 @@ def main_pbs(args): signal.signal(signal.SIGINT,shutdown) d = pbs_set.start(args.n) return d - dstart.addCallback(start_engines) - dstart.addErrback(lambda f: f.raiseException()) + config = kernel_config_manager.get_config_obj() + furl_file = config['controller']['engine_furl_file'] + dstart.addCallback(_delay_start, start_engines, furl_file, args.r) + dstart.addErrback(_err_and_stop) def main_ssh(args): @@ -641,12 +657,10 @@ def main_ssh(args): signal.signal(signal.SIGINT,shutdown) d = ssh_set.start(clusterfile['send_furl']) return d - - def delay_start(cont_pid): - reactor.callLater(1.0, start_engines, cont_pid) - - dstart.addCallback(delay_start) - dstart.addErrback(lambda f: f.raiseException()) + config = kernel_config_manager.get_config_obj() + furl_file = config['controller']['engine_furl_file'] + dstart.addCallback(_delay_start, start_engines, furl_file, args.r) + dstart.addErrback(_err_and_stop) def get_args(): diff --git a/IPython/kernel/scripts/ipcontroller.py 
b/IPython/kernel/scripts/ipcontroller.py index 2606577..9d062ab 100755 --- a/IPython/kernel/scripts/ipcontroller.py +++ b/IPython/kernel/scripts/ipcontroller.py @@ -22,6 +22,7 @@ import sys sys.path.insert(0, '') import sys, time, os +import tempfile from optparse import OptionParser from twisted.application import internet, service @@ -45,6 +46,10 @@ from IPython.config.cutils import import_item # Code #------------------------------------------------------------------------------- +def get_temp_furlfile(filename): + return tempfile.mktemp(dir=os.path.dirname(filename), + prefix=os.path.basename(filename)) + def make_tub(ip, port, secure, cert_file): """ Create a listening tub given an ip, port, and cert_file location. @@ -107,13 +112,19 @@ def make_client_service(controller_service, config): """Set the location for the tub and return a deferred.""" def register(empty, ref, furl_file): - client_tub.registerReference(ref, furlFile=furl_file) + # We create and then move to make sure that when the file + # appears to other processes, the buffer has the flushed + # and the file has been closed + temp_furl_file = get_temp_furlfile(furl_file) + log.msg(temp_furl_file) + client_tub.registerReference(ref, furlFile=temp_furl_file) + os.rename(temp_furl_file, furl_file) if location == '': d = client_tub.setLocationAutomatically() else: d = defer.maybeDeferred(client_tub.setLocation, "%s:%i" % (location, client_listener.getPortnum())) - + for ciname, ci in config['controller']['controller_interfaces'].iteritems(): log.msg("Adapting Controller to interface: %s" % ciname) furl_file = ci['furl_file'] @@ -154,7 +165,13 @@ def make_engine_service(controller_service, config): """Set the location for the tub and return a deferred.""" def register(empty, ref, furl_file): - engine_tub.registerReference(ref, furlFile=furl_file) + # We create and then move to make sure that when the file + # appears to other processes, the buffer has the flushed + # and the file has been closed + 
temp_furl_file = get_temp_furlfile(furl_file) + log.msg(temp_furl_file) + engine_tub.registerReference(ref, furlFile=temp_furl_file) + os.rename(temp_furl_file, furl_file) if location == '': d = engine_tub.setLocationAutomatically() diff --git a/IPython/kernel/scripts/ipengine.py b/IPython/kernel/scripts/ipengine.py index cd671f1..fe1fe18 100755 --- a/IPython/kernel/scripts/ipengine.py +++ b/IPython/kernel/scripts/ipengine.py @@ -106,13 +106,19 @@ def start_engine(): engine_connector = EngineConnector(tub_service) furl_file = kernel_config['engine']['furl_file'] log.msg("Using furl file: %s" % furl_file) - d = engine_connector.connect_to_controller(engine_service, furl_file) - def handle_error(f): - log.err(f) - if reactor.running: - reactor.stop() - d.addErrback(handle_error) + def call_connect(engine_service, furl_file): + d = engine_connector.connect_to_controller(engine_service, furl_file) + def handle_error(f): + # If this print statement is replaced by a log.err(f) I get + # an unhandled error, which makes no sense. I shouldn't have + # to use a print statement here. My only thought is that + # at the beginning of the process the logging is still starting up + print "error connecting to controller:", f.getErrorMessage() + reactor.callLater(0.1, reactor.stop) + d.addErrback(handle_error) + + reactor.callWhenRunning(call_connect, engine_service, furl_file) reactor.run() diff --git a/IPython/kernel/tests/test_multiengine.py b/IPython/kernel/tests/test_multiengine.py index 82bf41b..08ca5f5 100644 --- a/IPython/kernel/tests/test_multiengine.py +++ b/IPython/kernel/tests/test_multiengine.py @@ -4,16 +4,16 @@ __docformat__ = "restructuredtext en" -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- # Copyright (C) 2008 The IPython Development Team # # Distributed under the terms of the BSD License. 
The full license is in # the file COPYING, distributed as part of this software. -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- # Imports -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- try: from twisted.internet import defer diff --git a/IPython/kernel/tests/test_newserialized.py b/IPython/kernel/tests/test_newserialized.py index 747b694..0a9d460 100644 --- a/IPython/kernel/tests/test_newserialized.py +++ b/IPython/kernel/tests/test_newserialized.py @@ -4,16 +4,16 @@ __docformat__ = "restructuredtext en" -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- # Copyright (C) 2008 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. 
-#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- # Imports -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- try: import zope.interface as zi @@ -31,9 +31,9 @@ except ImportError: import nose raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap") -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- # Tests -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- class SerializedTestCase(unittest.TestCase): diff --git a/IPython/kernel/twistedutil.py b/IPython/kernel/twistedutil.py index 6956d38..33dc429 100644 --- a/IPython/kernel/twistedutil.py +++ b/IPython/kernel/twistedutil.py @@ -16,12 +16,15 @@ __docformat__ = "restructuredtext en" # Imports #------------------------------------------------------------------------------- +import os, sys import threading, Queue, atexit -import twisted +import twisted from twisted.internet import defer, reactor from twisted.python import log, failure +from IPython.kernel.error import FileTimeoutError + #------------------------------------------------------------------------------- # Classes related to twisted and threads #------------------------------------------------------------------------------- @@ -204,3 +207,43 @@ class DeferredList(defer.Deferred): result = None return result + + +def wait_for_file(filename, delay=0.1, max_tries=10): + """Wait (poll) for a file to be created. 
+ + This method returns a Deferred that will fire when a file exists. It + works by polling os.path.isfile in time intervals specified by the + delay argument. If `max_tries` is reached, it will errback with a + `FileTimeoutError`. + + Parameters + ---------- + filename : str + The name of the file to wait for. + delay : float + The time to wait between polls. + max_tries : int + The max number of attempts before raising `FileTimeoutError` + + Returns + ------- + d : Deferred + A Deferred instance that will fire when the file exists. + """ + + d = defer.Deferred() + + def _test_for_file(filename, attempt=0): + if attempt >= max_tries: + d.errback(FileTimeoutError( + 'timeout waiting for file to be created: %s' % filename + )) + else: + if os.path.isfile(filename): + d.callback(True) + else: + reactor.callLater(delay, _test_for_file, filename, attempt+1) + + _test_for_file(filename) + return d diff --git a/docs/source/parallel/parallel_process.txt b/docs/source/parallel/parallel_process.txt index 3884d89..d943b56 100644 --- a/docs/source/parallel/parallel_process.txt +++ b/docs/source/parallel/parallel_process.txt @@ -302,18 +302,33 @@ The ``--furl-file`` flag works like this:: Make FURL files persistent --------------------------- -At fist glance it may seem that that managing the FURL files is a bit annoying. Going back to the house and key analogy, copying the FURL around each time you start the controller is like having to make a new key every time you want to unlock the door and enter your house. As with your house, you want to be able to create the key (or FURL file) once, and then simply use it at any point in the future. +At first glance it may seem that managing the FURL files is a bit +annoying. Going back to the house and key analogy, copying the FURL around +each time you start the controller is like having to make a new key every time +you want to unlock the door and enter your house.
As with your house, you want +to be able to create the key (or FURL file) once, and then simply use it at +any point in the future. -This is possible. The only thing you have to do is decide what ports the controller will listen on for the engines and clients. This is done as follows:: +This is possible, but before you do this, you **must** remove any old FURL +files in the :file:`~/.ipython/security` directory. + +.. warning:: + + You **must** remove old FURL files before using persistent FURL files. + +Then, the only thing you have to do is decide what ports the controller will +listen on for the engines and clients. This is done as follows:: $ ipcontroller -r --client-port=10101 --engine-port=10102 -These options also work with all of the various modes of +These options also work with all of the various modes of :command:`ipcluster`:: $ ipcluster local -n 2 -r --client-port=10101 --engine-port=10102 -Then, just copy the furl files over the first time and you are set. You can start and stop the controller and engines any many times as you want in the future, just make sure to tell the controller to use the *same* ports. +Then, just copy the FURL files over the first time and you are set. You can +start and stop the controller and engines as many times as you want in the +future; just make sure to tell the controller to use the *same* ports. .. note::