##// END OF EJS Templates
Brian Granger -
Show More
@@ -67,10 +67,10 b' class EngineConnector(object):'
67 self.furl = find_furl(furl_or_file)
67 self.furl = find_furl(furl_or_file)
68 except ValueError:
68 except ValueError:
69 return defer.fail(failure.Failure())
69 return defer.fail(failure.Failure())
70 # return defer.fail(failure.Failure(ValueError('not a valid furl or furl file: %r' % furl_or_file)))
70 else:
71 d = self.tub.getReference(self.furl)
71 d = self.tub.getReference(self.furl)
72 d.addCallbacks(self._register, self._log_failure)
72 d.addCallbacks(self._register, self._log_failure)
73 return d
73 return d
74
74
75 def _log_failure(self, reason):
75 def _log_failure(self, reason):
76 log.err('EngineConnector: engine registration failed:')
76 log.err('EngineConnector: engine registration failed:')
@@ -104,6 +104,9 b' class StopLocalExecution(KernelError):'
104 class SecurityError(KernelError):
104 class SecurityError(KernelError):
105 pass
105 pass
106
106
107 class FileTimeoutError(KernelError):
108 pass
109
107 class CompositeError(KernelError):
110 class CompositeError(KernelError):
108 def __init__(self, message, elist):
111 def __init__(self, message, elist):
109 Exception.__init__(self, *(message, elist))
112 Exception.__init__(self, *(message, elist))
@@ -31,9 +31,10 b' from IPython.external import argparse'
31 from IPython.external import Itpl
31 from IPython.external import Itpl
32 from IPython.genutils import get_ipython_dir, num_cpus
32 from IPython.genutils import get_ipython_dir, num_cpus
33 from IPython.kernel.fcutil import have_crypto
33 from IPython.kernel.fcutil import have_crypto
34 from IPython.kernel.error import SecurityError
34 from IPython.kernel.config import config_manager as kernel_config_manager
35 from IPython.kernel.error import SecurityError, FileTimeoutError
35 from IPython.kernel.fcutil import have_crypto
36 from IPython.kernel.fcutil import have_crypto
36 from IPython.kernel.twistedutil import gatherBoth
37 from IPython.kernel.twistedutil import gatherBoth, wait_for_file
37 from IPython.kernel.util import printer
38 from IPython.kernel.util import printer
38
39
39
40
@@ -469,6 +470,7 b' class SSHEngineSet(object):'
469 # The main functions should then just parse the command line arguments, create
470 # The main functions should then just parse the command line arguments, create
470 # the appropriate class and call a 'start' method.
471 # the appropriate class and call a 'start' method.
471
472
473
472 def check_security(args, cont_args):
474 def check_security(args, cont_args):
473 if (not args.x or not args.y) and not have_crypto:
475 if (not args.x or not args.y) and not have_crypto:
474 log.err("""
476 log.err("""
@@ -482,6 +484,7 b' Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")'
482 cont_args.append('-y')
484 cont_args.append('-y')
483 return True
485 return True
484
486
487
485 def check_reuse(args, cont_args):
488 def check_reuse(args, cont_args):
486 if args.r:
489 if args.r:
487 cont_args.append('-r')
490 cont_args.append('-r')
@@ -495,6 +498,23 b' the --client-port and --engine-port options.""")'
495 cont_args.append('--engine-port=%i' % args.engine_port)
498 cont_args.append('--engine-port=%i' % args.engine_port)
496 return True
499 return True
497
500
501
502 def _err_and_stop(f):
503 log.err(f)
504 reactor.stop()
505
506
507 def _delay_start(cont_pid, start_engines, furl_file, reuse):
508 if not reuse:
509 if os.path.isfile(furl_file):
510 os.unlink(furl_file)
511 log.msg('Waiting for controller to finish starting...')
512 d = wait_for_file(furl_file, delay=0.2, max_tries=50)
513 d.addCallback(lambda _: log.msg('Controller started'))
514 d.addCallback(lambda _: start_engines(cont_pid))
515 return d
516
517
498 def main_local(args):
518 def main_local(args):
499 cont_args = []
519 cont_args = []
500 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
520 cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller'))
@@ -524,13 +544,10 b' def main_local(args):'
524 signal.signal(signal.SIGINT,shutdown)
544 signal.signal(signal.SIGINT,shutdown)
525 d = eset.start(args.n)
545 d = eset.start(args.n)
526 return d
546 return d
527 def delay_start(cont_pid):
547 config = kernel_config_manager.get_config_obj()
528 # This is needed because the controller doesn't start listening
548 furl_file = config['controller']['engine_furl_file']
529 # right when it starts and the controller needs to write
549 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
530 # furl files for the engine to pick up
550 dstart.addErrback(_err_and_stop)
531 reactor.callLater(1.0, start_engines, cont_pid)
532 dstart.addCallback(delay_start)
533 dstart.addErrback(lambda f: f.raiseException())
534
551
535
552
536 def main_mpi(args):
553 def main_mpi(args):
@@ -566,13 +583,10 b' def main_mpi(args):'
566 signal.signal(signal.SIGINT,shutdown)
583 signal.signal(signal.SIGINT,shutdown)
567 d = eset.start()
584 d = eset.start()
568 return d
585 return d
569 def delay_start(cont_pid):
586 config = kernel_config_manager.get_config_obj()
570 # This is needed because the controller doesn't start listening
587 furl_file = config['controller']['engine_furl_file']
571 # right when it starts and the controller needs to write
588 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
572 # furl files for the engine to pick up
589 dstart.addErrback(_err_and_stop)
573 reactor.callLater(1.0, start_engines, cont_pid)
574 dstart.addCallback(delay_start)
575 dstart.addErrback(lambda f: f.raiseException())
576
590
577
591
578 def main_pbs(args):
592 def main_pbs(args):
@@ -599,8 +613,10 b' def main_pbs(args):'
599 signal.signal(signal.SIGINT,shutdown)
613 signal.signal(signal.SIGINT,shutdown)
600 d = pbs_set.start(args.n)
614 d = pbs_set.start(args.n)
601 return d
615 return d
602 dstart.addCallback(start_engines)
616 config = kernel_config_manager.get_config_obj()
603 dstart.addErrback(lambda f: f.raiseException())
617 furl_file = config['controller']['engine_furl_file']
618 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
619 dstart.addErrback(_err_and_stop)
604
620
605
621
606 def main_ssh(args):
622 def main_ssh(args):
@@ -641,12 +657,10 b' def main_ssh(args):'
641 signal.signal(signal.SIGINT,shutdown)
657 signal.signal(signal.SIGINT,shutdown)
642 d = ssh_set.start(clusterfile['send_furl'])
658 d = ssh_set.start(clusterfile['send_furl'])
643 return d
659 return d
644
660 config = kernel_config_manager.get_config_obj()
645 def delay_start(cont_pid):
661 furl_file = config['controller']['engine_furl_file']
646 reactor.callLater(1.0, start_engines, cont_pid)
662 dstart.addCallback(_delay_start, start_engines, furl_file, args.r)
647
663 dstart.addErrback(_err_and_stop)
648 dstart.addCallback(delay_start)
649 dstart.addErrback(lambda f: f.raiseException())
650
664
651
665
652 def get_args():
666 def get_args():
@@ -22,6 +22,7 b' import sys'
22 sys.path.insert(0, '')
22 sys.path.insert(0, '')
23
23
24 import sys, time, os
24 import sys, time, os
25 import tempfile
25 from optparse import OptionParser
26 from optparse import OptionParser
26
27
27 from twisted.application import internet, service
28 from twisted.application import internet, service
@@ -45,6 +46,10 b' from IPython.config.cutils import import_item'
45 # Code
46 # Code
46 #-------------------------------------------------------------------------------
47 #-------------------------------------------------------------------------------
47
48
49 def get_temp_furlfile(filename):
50 return tempfile.mktemp(dir=os.path.dirname(filename),
51 prefix=os.path.basename(filename))
52
48 def make_tub(ip, port, secure, cert_file):
53 def make_tub(ip, port, secure, cert_file):
49 """
54 """
50 Create a listening tub given an ip, port, and cert_file location.
55 Create a listening tub given an ip, port, and cert_file location.
@@ -107,13 +112,19 b' def make_client_service(controller_service, config):'
107 """Set the location for the tub and return a deferred."""
112 """Set the location for the tub and return a deferred."""
108
113
109 def register(empty, ref, furl_file):
114 def register(empty, ref, furl_file):
110 client_tub.registerReference(ref, furlFile=furl_file)
115 # We create and then move to make sure that when the file
116 # appears to other processes, the buffer has the flushed
117 # and the file has been closed
118 temp_furl_file = get_temp_furlfile(furl_file)
119 log.msg(temp_furl_file)
120 client_tub.registerReference(ref, furlFile=temp_furl_file)
121 os.rename(temp_furl_file, furl_file)
111
122
112 if location == '':
123 if location == '':
113 d = client_tub.setLocationAutomatically()
124 d = client_tub.setLocationAutomatically()
114 else:
125 else:
115 d = defer.maybeDeferred(client_tub.setLocation, "%s:%i" % (location, client_listener.getPortnum()))
126 d = defer.maybeDeferred(client_tub.setLocation, "%s:%i" % (location, client_listener.getPortnum()))
116
127
117 for ciname, ci in config['controller']['controller_interfaces'].iteritems():
128 for ciname, ci in config['controller']['controller_interfaces'].iteritems():
118 log.msg("Adapting Controller to interface: %s" % ciname)
129 log.msg("Adapting Controller to interface: %s" % ciname)
119 furl_file = ci['furl_file']
130 furl_file = ci['furl_file']
@@ -154,7 +165,13 b' def make_engine_service(controller_service, config):'
154 """Set the location for the tub and return a deferred."""
165 """Set the location for the tub and return a deferred."""
155
166
156 def register(empty, ref, furl_file):
167 def register(empty, ref, furl_file):
157 engine_tub.registerReference(ref, furlFile=furl_file)
168 # We create and then move to make sure that when the file
169 # appears to other processes, the buffer has the flushed
170 # and the file has been closed
171 temp_furl_file = get_temp_furlfile(furl_file)
172 log.msg(temp_furl_file)
173 engine_tub.registerReference(ref, furlFile=temp_furl_file)
174 os.rename(temp_furl_file, furl_file)
158
175
159 if location == '':
176 if location == '':
160 d = engine_tub.setLocationAutomatically()
177 d = engine_tub.setLocationAutomatically()
@@ -106,13 +106,19 b' def start_engine():'
106 engine_connector = EngineConnector(tub_service)
106 engine_connector = EngineConnector(tub_service)
107 furl_file = kernel_config['engine']['furl_file']
107 furl_file = kernel_config['engine']['furl_file']
108 log.msg("Using furl file: %s" % furl_file)
108 log.msg("Using furl file: %s" % furl_file)
109 d = engine_connector.connect_to_controller(engine_service, furl_file)
110 def handle_error(f):
111 log.err(f)
112 if reactor.running:
113 reactor.stop()
114 d.addErrback(handle_error)
115
109
110 def call_connect(engine_service, furl_file):
111 d = engine_connector.connect_to_controller(engine_service, furl_file)
112 def handle_error(f):
113 # If this print statement is replaced by a log.err(f) I get
114 # an unhandled error, which makes no sense. I shouldn't have
115 # to use a print statement here. My only thought is that
116 # at the beginning of the process the logging is still starting up
117 print "error connecting to controller:", f.getErrorMessage()
118 reactor.callLater(0.1, reactor.stop)
119 d.addErrback(handle_error)
120
121 reactor.callWhenRunning(call_connect, engine_service, furl_file)
116 reactor.run()
122 reactor.run()
117
123
118
124
@@ -4,16 +4,16 b''
4
4
5 __docformat__ = "restructuredtext en"
5 __docformat__ = "restructuredtext en"
6
6
7 #-------------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
8 # Copyright (C) 2008 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-------------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-------------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 try:
18 try:
19 from twisted.internet import defer
19 from twisted.internet import defer
@@ -4,16 +4,16 b''
4
4
5 __docformat__ = "restructuredtext en"
5 __docformat__ = "restructuredtext en"
6
6
7 #-------------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
8 # Copyright (C) 2008 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-------------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-------------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 try:
18 try:
19 import zope.interface as zi
19 import zope.interface as zi
@@ -31,9 +31,9 b' except ImportError:'
31 import nose
31 import nose
32 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
32 raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap")
33
33
34 #-------------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35 # Tests
35 # Tests
36 #-------------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37
37
38 class SerializedTestCase(unittest.TestCase):
38 class SerializedTestCase(unittest.TestCase):
39
39
@@ -16,12 +16,15 b' __docformat__ = "restructuredtext en"'
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import os, sys
19 import threading, Queue, atexit
20 import threading, Queue, atexit
20 import twisted
21
21
22 import twisted
22 from twisted.internet import defer, reactor
23 from twisted.internet import defer, reactor
23 from twisted.python import log, failure
24 from twisted.python import log, failure
24
25
26 from IPython.kernel.error import FileTimeoutError
27
25 #-------------------------------------------------------------------------------
28 #-------------------------------------------------------------------------------
26 # Classes related to twisted and threads
29 # Classes related to twisted and threads
27 #-------------------------------------------------------------------------------
30 #-------------------------------------------------------------------------------
@@ -204,3 +207,43 b' class DeferredList(defer.Deferred):'
204 result = None
207 result = None
205
208
206 return result
209 return result
210
211
212 def wait_for_file(filename, delay=0.1, max_tries=10):
213 """Wait (poll) for a file to be created.
214
215 This method returns a Deferred that will fire when a file exists. It
216 works by polling os.path.isfile in time intervals specified by the
217 delay argument. If `max_tries` is reached, it will errback with a
218 `FileTimeoutError`.
219
220 Parameters
221 ----------
222 filename : str
223 The name of the file to wait for.
224 delay : float
225 The time to wait between polls.
226 max_tries : int
227 The max number of attempts before raising `FileTimeoutError`
228
229 Returns
230 -------
231 d : Deferred
232 A Deferred instance that will fire when the file exists.
233 """
234
235 d = defer.Deferred()
236
237 def _test_for_file(filename, attempt=0):
238 if attempt >= max_tries:
239 d.errback(FileTimeoutError(
240 'timeout waiting for file to be created: %s' % filename
241 ))
242 else:
243 if os.path.isfile(filename):
244 d.callback(True)
245 else:
246 reactor.callLater(delay, _test_for_file, filename, attempt+1)
247
248 _test_for_file(filename)
249 return d
@@ -302,18 +302,33 b' The ``--furl-file`` flag works like this::'
302 Make FURL files persistent
302 Make FURL files persistent
303 ---------------------------
303 ---------------------------
304
304
305 At fist glance it may seem that that managing the FURL files is a bit annoying. Going back to the house and key analogy, copying the FURL around each time you start the controller is like having to make a new key every time you want to unlock the door and enter your house. As with your house, you want to be able to create the key (or FURL file) once, and then simply use it at any point in the future.
305 At fist glance it may seem that that managing the FURL files is a bit
306 annoying. Going back to the house and key analogy, copying the FURL around
307 each time you start the controller is like having to make a new key every time
308 you want to unlock the door and enter your house. As with your house, you want
309 to be able to create the key (or FURL file) once, and then simply use it at
310 any point in the future.
306
311
307 This is possible. The only thing you have to do is decide what ports the controller will listen on for the engines and clients. This is done as follows::
312 This is possible. but before you do this, you **must** remove any old FURL
313 files in the :file:`~/.ipython/security` directory.
314
315 .. warning::
316
317 You **must** remove old FURL files before using persistent FURL files.
318
319 Then, The only thing you have to do is decide what ports the controller will
320 listen on for the engines and clients. This is done as follows::
308
321
309 $ ipcontroller -r --client-port=10101 --engine-port=10102
322 $ ipcontroller -r --client-port=10101 --engine-port=10102
310
323
311 These options also work with all of the various modes of
324 These options also work with all of the various modes of
312 :command:`ipcluster`::
325 :command:`ipcluster`::
313
326
314 $ ipcluster local -n 2 -r --client-port=10101 --engine-port=10102
327 $ ipcluster local -n 2 -r --client-port=10101 --engine-port=10102
315
328
316 Then, just copy the furl files over the first time and you are set. You can start and stop the controller and engines any many times as you want in the future, just make sure to tell the controller to use the *same* ports.
329 Then, just copy the furl files over the first time and you are set. You can
330 start and stop the controller and engines any many times as you want in the
331 future, just make sure to tell the controller to use the *same* ports.
317
332
318 .. note::
333 .. note::
319
334
General Comments 0
You need to be logged in to leave comments. Login now