Show More
@@ -67,10 +67,10 b' class EngineConnector(object):' | |||||
67 | self.furl = find_furl(furl_or_file) |
|
67 | self.furl = find_furl(furl_or_file) | |
68 | except ValueError: |
|
68 | except ValueError: | |
69 | return defer.fail(failure.Failure()) |
|
69 | return defer.fail(failure.Failure()) | |
70 | # return defer.fail(failure.Failure(ValueError('not a valid furl or furl file: %r' % furl_or_file))) |
|
70 | else: | |
71 | d = self.tub.getReference(self.furl) |
|
71 | d = self.tub.getReference(self.furl) | |
72 | d.addCallbacks(self._register, self._log_failure) |
|
72 | d.addCallbacks(self._register, self._log_failure) | |
73 | return d |
|
73 | return d | |
74 |
|
74 | |||
75 | def _log_failure(self, reason): |
|
75 | def _log_failure(self, reason): | |
76 | log.err('EngineConnector: engine registration failed:') |
|
76 | log.err('EngineConnector: engine registration failed:') |
@@ -104,6 +104,9 b' class StopLocalExecution(KernelError):' | |||||
104 | class SecurityError(KernelError): |
|
104 | class SecurityError(KernelError): | |
105 | pass |
|
105 | pass | |
106 |
|
106 | |||
|
107 | class FileTimeoutError(KernelError): | |||
|
108 | pass | |||
|
109 | ||||
107 | class CompositeError(KernelError): |
|
110 | class CompositeError(KernelError): | |
108 | def __init__(self, message, elist): |
|
111 | def __init__(self, message, elist): | |
109 | Exception.__init__(self, *(message, elist)) |
|
112 | Exception.__init__(self, *(message, elist)) |
@@ -31,9 +31,10 b' from IPython.external import argparse' | |||||
31 | from IPython.external import Itpl |
|
31 | from IPython.external import Itpl | |
32 | from IPython.genutils import get_ipython_dir, num_cpus |
|
32 | from IPython.genutils import get_ipython_dir, num_cpus | |
33 | from IPython.kernel.fcutil import have_crypto |
|
33 | from IPython.kernel.fcutil import have_crypto | |
34 | from IPython.kernel.error import SecurityError |
|
34 | from IPython.kernel.config import config_manager as kernel_config_manager | |
|
35 | from IPython.kernel.error import SecurityError, FileTimeoutError | |||
35 | from IPython.kernel.fcutil import have_crypto |
|
36 | from IPython.kernel.fcutil import have_crypto | |
36 | from IPython.kernel.twistedutil import gatherBoth |
|
37 | from IPython.kernel.twistedutil import gatherBoth, wait_for_file | |
37 | from IPython.kernel.util import printer |
|
38 | from IPython.kernel.util import printer | |
38 |
|
39 | |||
39 |
|
40 | |||
@@ -469,6 +470,7 b' class SSHEngineSet(object):' | |||||
469 | # The main functions should then just parse the command line arguments, create |
|
470 | # The main functions should then just parse the command line arguments, create | |
470 | # the appropriate class and call a 'start' method. |
|
471 | # the appropriate class and call a 'start' method. | |
471 |
|
472 | |||
|
473 | ||||
472 | def check_security(args, cont_args): |
|
474 | def check_security(args, cont_args): | |
473 | if (not args.x or not args.y) and not have_crypto: |
|
475 | if (not args.x or not args.y) and not have_crypto: | |
474 | log.err(""" |
|
476 | log.err(""" | |
@@ -482,6 +484,7 b' Try running ipcluster with the -xy flags: ipcluster local -xy -n 4""")' | |||||
482 | cont_args.append('-y') |
|
484 | cont_args.append('-y') | |
483 | return True |
|
485 | return True | |
484 |
|
486 | |||
|
487 | ||||
485 | def check_reuse(args, cont_args): |
|
488 | def check_reuse(args, cont_args): | |
486 | if args.r: |
|
489 | if args.r: | |
487 | cont_args.append('-r') |
|
490 | cont_args.append('-r') | |
@@ -495,6 +498,23 b' the --client-port and --engine-port options.""")' | |||||
495 | cont_args.append('--engine-port=%i' % args.engine_port) |
|
498 | cont_args.append('--engine-port=%i' % args.engine_port) | |
496 | return True |
|
499 | return True | |
497 |
|
500 | |||
|
501 | ||||
|
502 | def _err_and_stop(f): | |||
|
503 | log.err(f) | |||
|
504 | reactor.stop() | |||
|
505 | ||||
|
506 | ||||
|
507 | def _delay_start(cont_pid, start_engines, furl_file, reuse): | |||
|
508 | if not reuse: | |||
|
509 | if os.path.isfile(furl_file): | |||
|
510 | os.unlink(furl_file) | |||
|
511 | log.msg('Waiting for controller to finish starting...') | |||
|
512 | d = wait_for_file(furl_file, delay=0.2, max_tries=50) | |||
|
513 | d.addCallback(lambda _: log.msg('Controller started')) | |||
|
514 | d.addCallback(lambda _: start_engines(cont_pid)) | |||
|
515 | return d | |||
|
516 | ||||
|
517 | ||||
498 | def main_local(args): |
|
518 | def main_local(args): | |
499 | cont_args = [] |
|
519 | cont_args = [] | |
500 | cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) |
|
520 | cont_args.append('--logfile=%s' % pjoin(args.logdir,'ipcontroller')) | |
@@ -524,13 +544,10 b' def main_local(args):' | |||||
524 | signal.signal(signal.SIGINT,shutdown) |
|
544 | signal.signal(signal.SIGINT,shutdown) | |
525 | d = eset.start(args.n) |
|
545 | d = eset.start(args.n) | |
526 | return d |
|
546 | return d | |
527 | def delay_start(cont_pid): |
|
547 | config = kernel_config_manager.get_config_obj() | |
528 | # This is needed because the controller doesn't start listening |
|
548 | furl_file = config['controller']['engine_furl_file'] | |
529 | # right when it starts and the controller needs to write |
|
549 | dstart.addCallback(_delay_start, start_engines, furl_file, args.r) | |
530 | # furl files for the engine to pick up |
|
550 | dstart.addErrback(_err_and_stop) | |
531 | reactor.callLater(1.0, start_engines, cont_pid) |
|
|||
532 | dstart.addCallback(delay_start) |
|
|||
533 | dstart.addErrback(lambda f: f.raiseException()) |
|
|||
534 |
|
551 | |||
535 |
|
552 | |||
536 | def main_mpi(args): |
|
553 | def main_mpi(args): | |
@@ -566,13 +583,10 b' def main_mpi(args):' | |||||
566 | signal.signal(signal.SIGINT,shutdown) |
|
583 | signal.signal(signal.SIGINT,shutdown) | |
567 | d = eset.start() |
|
584 | d = eset.start() | |
568 | return d |
|
585 | return d | |
569 | def delay_start(cont_pid): |
|
586 | config = kernel_config_manager.get_config_obj() | |
570 | # This is needed because the controller doesn't start listening |
|
587 | furl_file = config['controller']['engine_furl_file'] | |
571 | # right when it starts and the controller needs to write |
|
588 | dstart.addCallback(_delay_start, start_engines, furl_file, args.r) | |
572 | # furl files for the engine to pick up |
|
589 | dstart.addErrback(_err_and_stop) | |
573 | reactor.callLater(1.0, start_engines, cont_pid) |
|
|||
574 | dstart.addCallback(delay_start) |
|
|||
575 | dstart.addErrback(lambda f: f.raiseException()) |
|
|||
576 |
|
590 | |||
577 |
|
591 | |||
578 | def main_pbs(args): |
|
592 | def main_pbs(args): | |
@@ -599,8 +613,10 b' def main_pbs(args):' | |||||
599 | signal.signal(signal.SIGINT,shutdown) |
|
613 | signal.signal(signal.SIGINT,shutdown) | |
600 | d = pbs_set.start(args.n) |
|
614 | d = pbs_set.start(args.n) | |
601 | return d |
|
615 | return d | |
602 | dstart.addCallback(start_engines) |
|
616 | config = kernel_config_manager.get_config_obj() | |
603 | dstart.addErrback(lambda f: f.raiseException()) |
|
617 | furl_file = config['controller']['engine_furl_file'] | |
|
618 | dstart.addCallback(_delay_start, start_engines, furl_file, args.r) | |||
|
619 | dstart.addErrback(_err_and_stop) | |||
604 |
|
620 | |||
605 |
|
621 | |||
606 | def main_ssh(args): |
|
622 | def main_ssh(args): | |
@@ -641,12 +657,10 b' def main_ssh(args):' | |||||
641 | signal.signal(signal.SIGINT,shutdown) |
|
657 | signal.signal(signal.SIGINT,shutdown) | |
642 | d = ssh_set.start(clusterfile['send_furl']) |
|
658 | d = ssh_set.start(clusterfile['send_furl']) | |
643 | return d |
|
659 | return d | |
644 |
|
660 | config = kernel_config_manager.get_config_obj() | ||
645 | def delay_start(cont_pid): |
|
661 | furl_file = config['controller']['engine_furl_file'] | |
646 | reactor.callLater(1.0, start_engines, cont_pid) |
|
662 | dstart.addCallback(_delay_start, start_engines, furl_file, args.r) | |
647 |
|
663 | dstart.addErrback(_err_and_stop) | ||
648 | dstart.addCallback(delay_start) |
|
|||
649 | dstart.addErrback(lambda f: f.raiseException()) |
|
|||
650 |
|
664 | |||
651 |
|
665 | |||
652 | def get_args(): |
|
666 | def get_args(): |
@@ -22,6 +22,7 b' import sys' | |||||
22 | sys.path.insert(0, '') |
|
22 | sys.path.insert(0, '') | |
23 |
|
23 | |||
24 | import sys, time, os |
|
24 | import sys, time, os | |
|
25 | import tempfile | |||
25 | from optparse import OptionParser |
|
26 | from optparse import OptionParser | |
26 |
|
27 | |||
27 | from twisted.application import internet, service |
|
28 | from twisted.application import internet, service | |
@@ -45,6 +46,10 b' from IPython.config.cutils import import_item' | |||||
45 | # Code |
|
46 | # Code | |
46 | #------------------------------------------------------------------------------- |
|
47 | #------------------------------------------------------------------------------- | |
47 |
|
48 | |||
|
49 | def get_temp_furlfile(filename): | |||
|
50 | return tempfile.mktemp(dir=os.path.dirname(filename), | |||
|
51 | prefix=os.path.basename(filename)) | |||
|
52 | ||||
48 | def make_tub(ip, port, secure, cert_file): |
|
53 | def make_tub(ip, port, secure, cert_file): | |
49 | """ |
|
54 | """ | |
50 | Create a listening tub given an ip, port, and cert_file location. |
|
55 | Create a listening tub given an ip, port, and cert_file location. | |
@@ -107,13 +112,19 b' def make_client_service(controller_service, config):' | |||||
107 | """Set the location for the tub and return a deferred.""" |
|
112 | """Set the location for the tub and return a deferred.""" | |
108 |
|
113 | |||
109 | def register(empty, ref, furl_file): |
|
114 | def register(empty, ref, furl_file): | |
110 | client_tub.registerReference(ref, furlFile=furl_file) |
|
115 | # We create and then move to make sure that when the file | |
|
116 | # appears to other processes, the buffer has the flushed | |||
|
117 | # and the file has been closed | |||
|
118 | temp_furl_file = get_temp_furlfile(furl_file) | |||
|
119 | log.msg(temp_furl_file) | |||
|
120 | client_tub.registerReference(ref, furlFile=temp_furl_file) | |||
|
121 | os.rename(temp_furl_file, furl_file) | |||
111 |
|
122 | |||
112 | if location == '': |
|
123 | if location == '': | |
113 | d = client_tub.setLocationAutomatically() |
|
124 | d = client_tub.setLocationAutomatically() | |
114 | else: |
|
125 | else: | |
115 | d = defer.maybeDeferred(client_tub.setLocation, "%s:%i" % (location, client_listener.getPortnum())) |
|
126 | d = defer.maybeDeferred(client_tub.setLocation, "%s:%i" % (location, client_listener.getPortnum())) | |
116 |
|
|
127 | ||
117 | for ciname, ci in config['controller']['controller_interfaces'].iteritems(): |
|
128 | for ciname, ci in config['controller']['controller_interfaces'].iteritems(): | |
118 | log.msg("Adapting Controller to interface: %s" % ciname) |
|
129 | log.msg("Adapting Controller to interface: %s" % ciname) | |
119 | furl_file = ci['furl_file'] |
|
130 | furl_file = ci['furl_file'] | |
@@ -154,7 +165,13 b' def make_engine_service(controller_service, config):' | |||||
154 | """Set the location for the tub and return a deferred.""" |
|
165 | """Set the location for the tub and return a deferred.""" | |
155 |
|
166 | |||
156 | def register(empty, ref, furl_file): |
|
167 | def register(empty, ref, furl_file): | |
157 | engine_tub.registerReference(ref, furlFile=furl_file) |
|
168 | # We create and then move to make sure that when the file | |
|
169 | # appears to other processes, the buffer has the flushed | |||
|
170 | # and the file has been closed | |||
|
171 | temp_furl_file = get_temp_furlfile(furl_file) | |||
|
172 | log.msg(temp_furl_file) | |||
|
173 | engine_tub.registerReference(ref, furlFile=temp_furl_file) | |||
|
174 | os.rename(temp_furl_file, furl_file) | |||
158 |
|
175 | |||
159 | if location == '': |
|
176 | if location == '': | |
160 | d = engine_tub.setLocationAutomatically() |
|
177 | d = engine_tub.setLocationAutomatically() |
@@ -106,13 +106,19 b' def start_engine():' | |||||
106 | engine_connector = EngineConnector(tub_service) |
|
106 | engine_connector = EngineConnector(tub_service) | |
107 | furl_file = kernel_config['engine']['furl_file'] |
|
107 | furl_file = kernel_config['engine']['furl_file'] | |
108 | log.msg("Using furl file: %s" % furl_file) |
|
108 | log.msg("Using furl file: %s" % furl_file) | |
109 | d = engine_connector.connect_to_controller(engine_service, furl_file) |
|
|||
110 | def handle_error(f): |
|
|||
111 | log.err(f) |
|
|||
112 | if reactor.running: |
|
|||
113 | reactor.stop() |
|
|||
114 | d.addErrback(handle_error) |
|
|||
115 |
|
109 | |||
|
110 | def call_connect(engine_service, furl_file): | |||
|
111 | d = engine_connector.connect_to_controller(engine_service, furl_file) | |||
|
112 | def handle_error(f): | |||
|
113 | # If this print statement is replaced by a log.err(f) I get | |||
|
114 | # an unhandled error, which makes no sense. I shouldn't have | |||
|
115 | # to use a print statement here. My only thought is that | |||
|
116 | # at the beginning of the process the logging is still starting up | |||
|
117 | print "error connecting to controller:", f.getErrorMessage() | |||
|
118 | reactor.callLater(0.1, reactor.stop) | |||
|
119 | d.addErrback(handle_error) | |||
|
120 | ||||
|
121 | reactor.callWhenRunning(call_connect, engine_service, furl_file) | |||
116 | reactor.run() |
|
122 | reactor.run() | |
117 |
|
123 | |||
118 |
|
124 |
@@ -4,16 +4,16 b'' | |||||
4 |
|
4 | |||
5 | __docformat__ = "restructuredtext en" |
|
5 | __docformat__ = "restructuredtext en" | |
6 |
|
6 | |||
7 |
#----------------------------------------------------------------------------- |
|
7 | #----------------------------------------------------------------------------- | |
8 | # Copyright (C) 2008 The IPython Development Team |
|
8 | # Copyright (C) 2008 The IPython Development Team | |
9 | # |
|
9 | # | |
10 | # Distributed under the terms of the BSD License. The full license is in |
|
10 | # Distributed under the terms of the BSD License. The full license is in | |
11 | # the file COPYING, distributed as part of this software. |
|
11 | # the file COPYING, distributed as part of this software. | |
12 |
#----------------------------------------------------------------------------- |
|
12 | #----------------------------------------------------------------------------- | |
13 |
|
13 | |||
14 |
#----------------------------------------------------------------------------- |
|
14 | #----------------------------------------------------------------------------- | |
15 | # Imports |
|
15 | # Imports | |
16 |
#----------------------------------------------------------------------------- |
|
16 | #----------------------------------------------------------------------------- | |
17 |
|
17 | |||
18 | try: |
|
18 | try: | |
19 | from twisted.internet import defer |
|
19 | from twisted.internet import defer |
@@ -4,16 +4,16 b'' | |||||
4 |
|
4 | |||
5 | __docformat__ = "restructuredtext en" |
|
5 | __docformat__ = "restructuredtext en" | |
6 |
|
6 | |||
7 |
#----------------------------------------------------------------------------- |
|
7 | #----------------------------------------------------------------------------- | |
8 | # Copyright (C) 2008 The IPython Development Team |
|
8 | # Copyright (C) 2008 The IPython Development Team | |
9 | # |
|
9 | # | |
10 | # Distributed under the terms of the BSD License. The full license is in |
|
10 | # Distributed under the terms of the BSD License. The full license is in | |
11 | # the file COPYING, distributed as part of this software. |
|
11 | # the file COPYING, distributed as part of this software. | |
12 |
#----------------------------------------------------------------------------- |
|
12 | #----------------------------------------------------------------------------- | |
13 |
|
13 | |||
14 |
#----------------------------------------------------------------------------- |
|
14 | #----------------------------------------------------------------------------- | |
15 | # Imports |
|
15 | # Imports | |
16 |
#----------------------------------------------------------------------------- |
|
16 | #----------------------------------------------------------------------------- | |
17 |
|
17 | |||
18 | try: |
|
18 | try: | |
19 | import zope.interface as zi |
|
19 | import zope.interface as zi | |
@@ -31,9 +31,9 b' except ImportError:' | |||||
31 | import nose |
|
31 | import nose | |
32 | raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap") |
|
32 | raise nose.SkipTest("This test requires zope.interface, Twisted and Foolscap") | |
33 |
|
33 | |||
34 |
#----------------------------------------------------------------------------- |
|
34 | #----------------------------------------------------------------------------- | |
35 | # Tests |
|
35 | # Tests | |
36 |
#----------------------------------------------------------------------------- |
|
36 | #----------------------------------------------------------------------------- | |
37 |
|
37 | |||
38 | class SerializedTestCase(unittest.TestCase): |
|
38 | class SerializedTestCase(unittest.TestCase): | |
39 |
|
39 |
@@ -16,12 +16,15 b' __docformat__ = "restructuredtext en"' | |||||
16 | # Imports |
|
16 | # Imports | |
17 | #------------------------------------------------------------------------------- |
|
17 | #------------------------------------------------------------------------------- | |
18 |
|
18 | |||
|
19 | import os, sys | |||
19 | import threading, Queue, atexit |
|
20 | import threading, Queue, atexit | |
20 | import twisted |
|
|||
21 |
|
21 | |||
|
22 | import twisted | |||
22 | from twisted.internet import defer, reactor |
|
23 | from twisted.internet import defer, reactor | |
23 | from twisted.python import log, failure |
|
24 | from twisted.python import log, failure | |
24 |
|
25 | |||
|
26 | from IPython.kernel.error import FileTimeoutError | |||
|
27 | ||||
25 | #------------------------------------------------------------------------------- |
|
28 | #------------------------------------------------------------------------------- | |
26 | # Classes related to twisted and threads |
|
29 | # Classes related to twisted and threads | |
27 | #------------------------------------------------------------------------------- |
|
30 | #------------------------------------------------------------------------------- | |
@@ -204,3 +207,43 b' class DeferredList(defer.Deferred):' | |||||
204 | result = None |
|
207 | result = None | |
205 |
|
208 | |||
206 | return result |
|
209 | return result | |
|
210 | ||||
|
211 | ||||
|
212 | def wait_for_file(filename, delay=0.1, max_tries=10): | |||
|
213 | """Wait (poll) for a file to be created. | |||
|
214 | ||||
|
215 | This method returns a Deferred that will fire when a file exists. It | |||
|
216 | works by polling os.path.isfile in time intervals specified by the | |||
|
217 | delay argument. If `max_tries` is reached, it will errback with a | |||
|
218 | `FileTimeoutError`. | |||
|
219 | ||||
|
220 | Parameters | |||
|
221 | ---------- | |||
|
222 | filename : str | |||
|
223 | The name of the file to wait for. | |||
|
224 | delay : float | |||
|
225 | The time to wait between polls. | |||
|
226 | max_tries : int | |||
|
227 | The max number of attempts before raising `FileTimeoutError` | |||
|
228 | ||||
|
229 | Returns | |||
|
230 | ------- | |||
|
231 | d : Deferred | |||
|
232 | A Deferred instance that will fire when the file exists. | |||
|
233 | """ | |||
|
234 | ||||
|
235 | d = defer.Deferred() | |||
|
236 | ||||
|
237 | def _test_for_file(filename, attempt=0): | |||
|
238 | if attempt >= max_tries: | |||
|
239 | d.errback(FileTimeoutError( | |||
|
240 | 'timeout waiting for file to be created: %s' % filename | |||
|
241 | )) | |||
|
242 | else: | |||
|
243 | if os.path.isfile(filename): | |||
|
244 | d.callback(True) | |||
|
245 | else: | |||
|
246 | reactor.callLater(delay, _test_for_file, filename, attempt+1) | |||
|
247 | ||||
|
248 | _test_for_file(filename) | |||
|
249 | return d |
@@ -302,18 +302,33 b' The ``--furl-file`` flag works like this::' | |||||
302 | Make FURL files persistent |
|
302 | Make FURL files persistent | |
303 | --------------------------- |
|
303 | --------------------------- | |
304 |
|
304 | |||
305 | At fist glance it may seem that that managing the FURL files is a bit annoying. Going back to the house and key analogy, copying the FURL around each time you start the controller is like having to make a new key every time you want to unlock the door and enter your house. As with your house, you want to be able to create the key (or FURL file) once, and then simply use it at any point in the future. |
|
305 | At fist glance it may seem that that managing the FURL files is a bit | |
|
306 | annoying. Going back to the house and key analogy, copying the FURL around | |||
|
307 | each time you start the controller is like having to make a new key every time | |||
|
308 | you want to unlock the door and enter your house. As with your house, you want | |||
|
309 | to be able to create the key (or FURL file) once, and then simply use it at | |||
|
310 | any point in the future. | |||
306 |
|
311 | |||
307 | This is possible. The only thing you have to do is decide what ports the controller will listen on for the engines and clients. This is done as follows:: |
|
312 | This is possible. but before you do this, you **must** remove any old FURL | |
|
313 | files in the :file:`~/.ipython/security` directory. | |||
|
314 | ||||
|
315 | .. warning:: | |||
|
316 | ||||
|
317 | You **must** remove old FURL files before using persistent FURL files. | |||
|
318 | ||||
|
319 | Then, The only thing you have to do is decide what ports the controller will | |||
|
320 | listen on for the engines and clients. This is done as follows:: | |||
308 |
|
321 | |||
309 | $ ipcontroller -r --client-port=10101 --engine-port=10102 |
|
322 | $ ipcontroller -r --client-port=10101 --engine-port=10102 | |
310 |
|
323 | |||
311 |
These options also work with all of the various modes of |
|
324 | These options also work with all of the various modes of | |
312 | :command:`ipcluster`:: |
|
325 | :command:`ipcluster`:: | |
313 |
|
326 | |||
314 | $ ipcluster local -n 2 -r --client-port=10101 --engine-port=10102 |
|
327 | $ ipcluster local -n 2 -r --client-port=10101 --engine-port=10102 | |
315 |
|
328 | |||
316 | Then, just copy the furl files over the first time and you are set. You can start and stop the controller and engines any many times as you want in the future, just make sure to tell the controller to use the *same* ports. |
|
329 | Then, just copy the furl files over the first time and you are set. You can | |
|
330 | start and stop the controller and engines any many times as you want in the | |||
|
331 | future, just make sure to tell the controller to use the *same* ports. | |||
317 |
|
332 | |||
318 | .. note:: |
|
333 | .. note:: | |
319 |
|
334 |
General Comments 0
You need to be logged in to leave comments.
Login now