##// END OF EJS Templates
Most of the new ipcluster is now working, including a nice client.
Brian Granger -
Show More
@@ -0,0 +1,17 b''
1 c = get_config()
2
3 # This can be used at any point in a config file to load a sub config
4 # and merge it into the current one.
5 load_subconfig('ipython_config.py')
6
7 lines = """
8 from IPython.kernel.client import *
9 """
10
11 # You have to make sure that attributes that are containers already
12 # exist before using them. Simple assigning a new list will override
13 # all previous values.
14 if hasattr(c.Global, 'exec_lines'):
15 c.Global.exec_lines.append(lines)
16 else:
17 c.Global.exec_lines = [lines] No newline at end of file
@@ -1,66 +1,67 b''
1 import os
1 import os
2
2
3 c = get_config()
3 c = get_config()
4
4
5 # Options are:
5 # Options are:
6 # * LocalControllerLauncher
6 # * LocalControllerLauncher
7 # * PBSControllerLauncher
7 # * PBSControllerLauncher
8 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
8 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
9
9
10 # Options are:
10 # Options are:
11 # * LocalEngineSetLauncher
11 # * LocalEngineSetLauncher
12 # * MPIExecEngineSetLauncher
12 # * MPIExecEngineSetLauncher
13 # * PBSEngineSetLauncher
13 # * PBSEngineSetLauncher
14 # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
14 # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
15
15
16 # c.Global.log_to_file = False
16 # c.Global.log_to_file = False
17 # c.Global.n = 2
17 # c.Global.n = 2
18 # c.Global.reset_config = False
18 # c.Global.reset_config = False
19 # c.Global.clean_logs = True
19
20
20 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
21 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
21 # c.MPIExecLauncher.mpi_args = []
22 # c.MPIExecLauncher.mpi_args = []
22 # c.MPIExecLauncher.program = []
23 # c.MPIExecLauncher.program = []
23 # c.MPIExecLauncher.program_args = []
24 # c.MPIExecLauncher.program_args = []
24 # c.MPIExecLauncher.n = 1
25 # c.MPIExecLauncher.n = 1
25
26
26 # c.SSHLauncher.ssh_cmd = ['ssh']
27 # c.SSHLauncher.ssh_cmd = ['ssh']
27 # c.SSHLauncher.ssh_args = []
28 # c.SSHLauncher.ssh_args = []
28 # c.SSHLauncher.program = []
29 # c.SSHLauncher.program = []
29 # s.SSHLauncher.program_args = []
30 # s.SSHLauncher.program_args = []
30 # c.SSHLauncher.hostname = ''
31 # c.SSHLauncher.hostname = ''
31 # c.SSHLauncher.user = os.environ['USER']
32 # c.SSHLauncher.user = os.environ['USER']
32
33
33 # c.PBSLauncher.submit_command = 'qsub'
34 # c.PBSLauncher.submit_command = 'qsub'
34 # c.PBSLauncher.delete_command = 'qdel'
35 # c.PBSLauncher.delete_command = 'qdel'
35 # c.PBSLauncher.job_id_regexp = '\d+'
36 # c.PBSLauncher.job_id_regexp = '\d+'
36 # c.PBSLauncher.batch_template = """"""
37 # c.PBSLauncher.batch_template = """"""
37 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
38 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
38
39
39 # c.LocalControllerLauncher.controller_args = []
40 # c.LocalControllerLauncher.controller_args = []
40
41
41 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
42 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
42 # c.MPIExecControllerLauncher.mpi_args = []
43 # c.MPIExecControllerLauncher.mpi_args = []
43 # c.MPIExecControllerLauncher.controller_args = []
44 # c.MPIExecControllerLauncher.controller_args = []
44 # c.MPIExecControllerLauncher.n = 1
45 # c.MPIExecControllerLauncher.n = 1
45
46
46 # c.PBSControllerLauncher.submit_command = 'qsub'
47 # c.PBSControllerLauncher.submit_command = 'qsub'
47 # c.PBSControllerLauncher.delete_command = 'qdel'
48 # c.PBSControllerLauncher.delete_command = 'qdel'
48 # c.PBSControllerLauncher.job_id_regexp = '\d+'
49 # c.PBSControllerLauncher.job_id_regexp = '\d+'
49 # c.PBSControllerLauncher.batch_template = """"""
50 # c.PBSControllerLauncher.batch_template = """"""
50 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
51 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
51
52
52 # c.LocalEngineLauncher.engine_args = []
53 # c.LocalEngineLauncher.engine_args = []
53
54
54 # c.LocalEngineSetLauncher.engine_args = []
55 # c.LocalEngineSetLauncher.engine_args = []
55
56
56 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
57 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
57 # c.MPIExecEngineSetLauncher.mpi_args = []
58 # c.MPIExecEngineSetLauncher.mpi_args = []
58 # c.MPIExecEngineSetLauncher.controller_args = []
59 # c.MPIExecEngineSetLauncher.controller_args = []
59 # c.MPIExecEngineSetLauncher.n = 1
60 # c.MPIExecEngineSetLauncher.n = 1
60
61
61 # c.PBSEngineSetLauncher.submit_command = 'qsub'
62 # c.PBSEngineSetLauncher.submit_command = 'qsub'
62 # c.PBSEngineSetLauncher.delete_command = 'qdel'
63 # c.PBSEngineSetLauncher.delete_command = 'qdel'
63 # c.PBSEngineSetLauncher.job_id_regexp = '\d+'
64 # c.PBSEngineSetLauncher.job_id_regexp = '\d+'
64 # c.PBSEngineSetLauncher.batch_template = """"""
65 # c.PBSEngineSetLauncher.batch_template = """"""
65 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script'
66 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script'
66
67
@@ -1,72 +1,68 b''
1 from IPython.config.loader import Config
1 from IPython.config.loader import Config
2
2
3 c = get_config()
3 c = get_config()
4
4
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Global configuration
6 # Global configuration
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 # Basic Global config attributes
9 # Basic Global config attributes
10 # c.Global.log_to_file = False
10 # c.Global.log_to_file = False
11 # c.Global.clean_logs = True
11 # c.Global.import_statements = ['import math']
12 # c.Global.import_statements = ['import math']
12 # c.Global.reuse_furls = True
13 # c.Global.reuse_furls = True
13 # c.Global.secure = True
14 # c.Global.secure = True
14
15
15 # You shouldn't have to modify these
16 # c.Global.log_dir_name = 'log'
17 # c.Global.security_dir_name = 'security'
18
19
20 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
21 # Configure the client services
17 # Configure the client services
22 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
23
19
24 # Basic client service config attributes
20 # Basic client service config attributes
25 # c.FCClientServiceFactory.ip = ''
21 # c.FCClientServiceFactory.ip = ''
26 # c.FCClientServiceFactory.port = 0
22 # c.FCClientServiceFactory.port = 0
27 # c.FCClientServiceFactory.location = ''
23 # c.FCClientServiceFactory.location = ''
28 # c.FCClientServiceFactory.secure = True
24 # c.FCClientServiceFactory.secure = True
29 # c.FCClientServiceFactory.reuse_furls = False
25 # c.FCClientServiceFactory.reuse_furls = False
30
26
31 # You shouldn't have to modify the rest of this section
27 # You shouldn't have to modify the rest of this section
32 # c.FCClientServiceFactory.cert_file = 'ipcontroller-client.pem'
28 # c.FCClientServiceFactory.cert_file = 'ipcontroller-client.pem'
33
29
34 # default_client_interfaces = Config()
30 # default_client_interfaces = Config()
35 # default_client_interfaces.Task.interface_chain = [
31 # default_client_interfaces.Task.interface_chain = [
36 # 'IPython.kernel.task.ITaskController',
32 # 'IPython.kernel.task.ITaskController',
37 # 'IPython.kernel.taskfc.IFCTaskController'
33 # 'IPython.kernel.taskfc.IFCTaskController'
38 # ]
34 # ]
39 #
35 #
40 # default_client_interfaces.Task.furl_file = 'ipcontroller-tc.furl'
36 # default_client_interfaces.Task.furl_file = 'ipcontroller-tc.furl'
41 #
37 #
42 # default_client_interfaces.MultiEngine.interface_chain = [
38 # default_client_interfaces.MultiEngine.interface_chain = [
43 # 'IPython.kernel.multiengine.IMultiEngine',
39 # 'IPython.kernel.multiengine.IMultiEngine',
44 # 'IPython.kernel.multienginefc.IFCSynchronousMultiEngine'
40 # 'IPython.kernel.multienginefc.IFCSynchronousMultiEngine'
45 # ]
41 # ]
46 #
42 #
47 # default_client_interfaces.MultiEngine.furl_file = 'ipcontroller-mec.furl'
43 # default_client_interfaces.MultiEngine.furl_file = 'ipcontroller-mec.furl'
48 #
44 #
49 # c.FCEngineServiceFactory.interfaces = default_client_interfaces
45 # c.FCEngineServiceFactory.interfaces = default_client_interfaces
50
46
51 #-----------------------------------------------------------------------------
47 #-----------------------------------------------------------------------------
52 # Configure the engine services
48 # Configure the engine services
53 #-----------------------------------------------------------------------------
49 #-----------------------------------------------------------------------------
54
50
55 # Basic config attributes for the engine services
51 # Basic config attributes for the engine services
56 # c.FCEngineServiceFactory.ip = ''
52 # c.FCEngineServiceFactory.ip = ''
57 # c.FCEngineServiceFactory.port = 0
53 # c.FCEngineServiceFactory.port = 0
58 # c.FCEngineServiceFactory.location = ''
54 # c.FCEngineServiceFactory.location = ''
59 # c.FCEngineServiceFactory.secure = True
55 # c.FCEngineServiceFactory.secure = True
60 # c.FCEngineServiceFactory.reuse_furls = False
56 # c.FCEngineServiceFactory.reuse_furls = False
61
57
62 # You shouldn't have to modify the rest of this section
58 # You shouldn't have to modify the rest of this section
63 # c.FCEngineServiceFactory.cert_file = 'ipcontroller-engine.pem'
59 # c.FCEngineServiceFactory.cert_file = 'ipcontroller-engine.pem'
64
60
65 # default_engine_interfaces = Config()
61 # default_engine_interfaces = Config()
66 # default_engine_interfaces.Default.interface_chain = [
62 # default_engine_interfaces.Default.interface_chain = [
67 # 'IPython.kernel.enginefc.IFCControllerBase'
63 # 'IPython.kernel.enginefc.IFCControllerBase'
68 # ]
64 # ]
69 #
65 #
70 # default_engine_interfaces.Default.furl_file = 'ipcontroller-engine.furl'
66 # default_engine_interfaces.Default.furl_file = 'ipcontroller-engine.furl'
71 #
67 #
72 # c.FCEngineServiceFactory.interfaces = default_engine_interfaces
68 # c.FCEngineServiceFactory.interfaces = default_engine_interfaces
@@ -1,24 +1,28 b''
1 c = get_config()
1 c = get_config()
2
2
3 # c.Global.log_to_file = False
3 # c.Global.log_to_file = False
4 # c.Global.clean_logs = False
4 # c.Global.exec_lines = ['import numpy']
5 # c.Global.exec_lines = ['import numpy']
5 # c.Global.log_dir_name = 'log'
6 # c.Global.security_dir_name = 'security'
7 # c.Global.log_level = 10
6 # c.Global.log_level = 10
8 # c.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
7 # c.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
9 # c.Global.furl_file_name = 'ipcontroller-engine.furl'
8 # c.Global.furl_file_name = 'ipcontroller-engine.furl'
10 # c.Global.furl_file = ''
9 # c.Global.furl_file = ''
10 # The max number of connection attemps and the initial delay between
11 # those attemps.
12 # c.Global.connect_delay = 0.1
13 # c.Global.connect_max_tries = 15
14
11
15
12 # c.MPI.use = ''
16 # c.MPI.use = ''
13 # c.MPI.mpi4py = """from mpi4py import MPI as mpi
17 # c.MPI.mpi4py = """from mpi4py import MPI as mpi
14 # mpi.size = mpi.COMM_WORLD.Get_size()
18 # mpi.size = mpi.COMM_WORLD.Get_size()
15 # mpi.rank = mpi.COMM_WORLD.Get_rank()
19 # mpi.rank = mpi.COMM_WORLD.Get_rank()
16 # """
20 # """
17 # c.MPI.pytrilinos = """from PyTrilinos import Epetra
21 # c.MPI.pytrilinos = """from PyTrilinos import Epetra
18 # class SimpleStruct:
22 # class SimpleStruct:
19 # pass
23 # pass
20 # mpi = SimpleStruct()
24 # mpi = SimpleStruct()
21 # mpi.rank = 0
25 # mpi.rank = 0
22 # mpi.size = 0
26 # mpi.size = 0
23 # """
27 # """
24
28
@@ -1,41 +1,42 b''
1 #!/usr/bin/env python
1 # encoding: utf-8
2 # encoding: utf-8
2
3
3 """Asynchronous clients for the IPython controller.
4 """Asynchronous clients for the IPython controller.
4
5
5 This module has clients for using the various interfaces of the controller
6 This module has clients for using the various interfaces of the controller
6 in a fully asynchronous manner. This means that you will need to run the
7 in a fully asynchronous manner. This means that you will need to run the
7 Twisted reactor yourself and that all methods of the client classes return
8 Twisted reactor yourself and that all methods of the client classes return
8 deferreds to the result.
9 deferreds to the result.
9
10
10 The main methods are are `get_*_client` and `get_client`.
11 The main methods are are `get_*_client` and `get_client`.
11 """
12 """
12
13 #-----------------------------------------------------------------------------
13 __docformat__ = "restructuredtext en"
14 # Copyright (C) 2008-2009 The IPython Development Team
14
15 #-------------------------------------------------------------------------------
16 # Copyright (C) 2008 The IPython Development Team
17 #
15 #
18 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
19 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
20 #-------------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
21
19
22 #-------------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
23 # Imports
21 # Imports
24 #-------------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
25
23
26 from IPython.kernel import codeutil
24 from IPython.kernel import codeutil
27 from IPython.kernel.clientconnector import ClientConnector
25 from IPython.kernel.clientconnector import (
26 AsyncClientConnector,
27 AsyncCluster
28 )
28
29
29 # Other things that the user will need
30 # Other things that the user will need
30 from IPython.kernel.task import MapTask, StringTask
31 from IPython.kernel.task import MapTask, StringTask
31 from IPython.kernel.error import CompositeError
32 from IPython.kernel.error import CompositeError
32
33
33 #-------------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
34 # Code
35 # Code
35 #-------------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
36
37
37 _client_tub = ClientConnector()
38 _client_tub = AsyncClientConnector()
38 get_multiengine_client = _client_tub.get_multiengine_client
39 get_multiengine_client = _client_tub.get_multiengine_client
39 get_task_client = _client_tub.get_task_client
40 get_task_client = _client_tub.get_task_client
40 get_client = _client_tub.get_client
41 get_client = _client_tub.get_client
41
42
@@ -1,96 +1,87 b''
1 #!/usr/bin/env python
1 # encoding: utf-8
2 # encoding: utf-8
2
3
3 """This module contains blocking clients for the controller interfaces.
4 """This module contains blocking clients for the controller interfaces.
4
5
5 Unlike the clients in `asyncclient.py`, the clients in this module are fully
6 Unlike the clients in `asyncclient.py`, the clients in this module are fully
6 blocking. This means that methods on the clients return the actual results
7 blocking. This means that methods on the clients return the actual results
7 rather than a deferred to the result. Also, we manage the Twisted reactor
8 rather than a deferred to the result. Also, we manage the Twisted reactor
8 for you. This is done by running the reactor in a thread.
9 for you. This is done by running the reactor in a thread.
9
10
10 The main classes in this module are:
11 The main classes in this module are:
11
12
12 * MultiEngineClient
13 * MultiEngineClient
13 * TaskClient
14 * TaskClient
14 * Task
15 * Task
15 * CompositeError
16 * CompositeError
16 """
17 """
17
18
18 __docformat__ = "restructuredtext en"
19 #-----------------------------------------------------------------------------
19
20 # Copyright (C) 2008-2009 The IPython Development Team
20 #-------------------------------------------------------------------------------
21 # Copyright (C) 2008 The IPython Development Team
22 #
21 #
23 # Distributed under the terms of the BSD License. The full license is in
22 # Distributed under the terms of the BSD License. The full license is in
24 # the file COPYING, distributed as part of this software.
23 # the file COPYING, distributed as part of this software.
25 #-------------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
26
25
27 #-------------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
28 # Imports
27 # Imports
29 #-------------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
30
29
30 from cStringIO import StringIO
31 import sys
31 import sys
32 import warnings
32
33
33 # from IPython.utils import growl
34 # from IPython.utils import growl
34 # growl.start("IPython1 Client")
35 # growl.start("IPython1 Client")
35
36
36
37
37 from twisted.internet import reactor
38 from twisted.internet import reactor
38 from IPython.kernel.clientconnector import ClientConnector
39 from twisted.internet.error import PotentialZombieWarning
40 from twisted.python import log
41
42 from IPython.kernel.clientconnector import ClientConnector, Cluster
39 from IPython.kernel.twistedutil import ReactorInThread
43 from IPython.kernel.twistedutil import ReactorInThread
40 from IPython.kernel.twistedutil import blockingCallFromThread
44 from IPython.kernel.twistedutil import blockingCallFromThread
41
45
42 # These enable various things
46 # These enable various things
43 from IPython.kernel import codeutil
47 from IPython.kernel import codeutil
44 # import IPython.kernel.magic
48 # import IPython.kernel.magic
45
49
46 # Other things that the user will need
50 # Other things that the user will need
47 from IPython.kernel.task import MapTask, StringTask
51 from IPython.kernel.task import MapTask, StringTask
48 from IPython.kernel.error import CompositeError
52 from IPython.kernel.error import CompositeError
49
53
50 #-------------------------------------------------------------------------------
54 #-------------------------------------------------------------------------------
51 # Code
55 # Code
52 #-------------------------------------------------------------------------------
56 #-------------------------------------------------------------------------------
53
57
54 _client_tub = ClientConnector()
58 warnings.simplefilter('ignore', PotentialZombieWarning)
55
56
57 def get_multiengine_client(furl_or_file=''):
58 """Get the blocking MultiEngine client.
59
60 :Parameters:
61 furl_or_file : str
62 A furl or a filename containing a furl. If empty, the
63 default furl_file will be used
64
65 :Returns:
66 The connected MultiEngineClient instance
67 """
68 client = blockingCallFromThread(_client_tub.get_multiengine_client,
69 furl_or_file)
70 return client.adapt_to_blocking_client()
71
72 def get_task_client(furl_or_file=''):
73 """Get the blocking Task client.
74
75 :Parameters:
76 furl_or_file : str
77 A furl or a filename containing a furl. If empty, the
78 default furl_file will be used
79
80 :Returns:
81 The connected TaskClient instance
82 """
83 client = blockingCallFromThread(_client_tub.get_task_client,
84 furl_or_file)
85 return client.adapt_to_blocking_client()
86
59
60 _client_tub = ClientConnector()
87
61
62 get_multiengine_client = _client_tub.get_multiengine_client
63 get_task_client = _client_tub.get_task_client
88 MultiEngineClient = get_multiengine_client
64 MultiEngineClient = get_multiengine_client
89 TaskClient = get_task_client
65 TaskClient = get_task_client
90
66
91
67 twisted_log = StringIO()
68 log.startLogging(sys.stdout, setStdout=0)
92
69
93 # Now we start the reactor in a thread
70 # Now we start the reactor in a thread
94 rit = ReactorInThread()
71 rit = ReactorInThread()
95 rit.setDaemon(True)
72 rit.setDaemon(True)
96 rit.start() No newline at end of file
73 rit.start()
74
75
76
77
78 __all__ = [
79 'MapTask',
80 'StringTask',
81 'MultiEngineClient',
82 'TaskClient',
83 'CompositeError',
84 'get_task_client',
85 'get_multiengine_client',
86 'Cluster'
87 ]
This diff has been collapsed as it changes many lines, (618 lines changed) Show them Hide them
@@ -1,150 +1,584 b''
1 #!/usr/bin/env python
1 # encoding: utf-8
2 # encoding: utf-8
2
3
3 """A class for handling client connections to the controller."""
4 """Facilities for handling client connections to the controller."""
4
5
5 __docformat__ = "restructuredtext en"
6 #-----------------------------------------------------------------------------
6
7 # Copyright (C) 2008-2009 The IPython Development Team
7 #-------------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
9 #
8 #
10 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
13
12
14 #-------------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
15 # Imports
14 # Imports
16 #-------------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
17
16
18 from twisted.internet import defer
17 import os
19
18
20 from IPython.kernel.fcutil import Tub, UnauthenticatedTub
19 from IPython.kernel.fcutil import Tub, find_furl
20 from IPython.kernel.clusterdir import ClusterDir, ClusterDirError
21 from IPython.kernel.launcher import IPClusterLauncher
22 from IPython.kernel.twistedutil import gatherBoth, make_deferred
23 from IPython.kernel.twistedutil import blockingCallFromThread
21
24
22 from IPython.kernel.config import config_manager as kernel_config_manager
23 from IPython.utils.importstring import import_item
25 from IPython.utils.importstring import import_item
24 from IPython.kernel.fcutil import find_furl
26 from IPython.utils.genutils import get_ipython_dir
25
27
26 co = kernel_config_manager.get_config_obj()
28 from twisted.internet import defer
27 client_co = co['client']
29 from twisted.python import failure
28
30
29 #-------------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
30 # The ClientConnector class
32 # The ClientConnector class
31 #-------------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
32
34
33 class ClientConnector(object):
35
34 """
36 class AsyncClientConnector(object):
35 This class gets remote references from furls and returns the wrapped clients.
37 """A class for getting remote references and clients from furls.
36
38
37 This class is also used in `client.py` and `asyncclient.py` to create
39 This start a single :class:`Tub` for all remote reference and caches
38 a single per client-process Tub.
40 references.
39 """
41 """
40
42
41 def __init__(self):
43 def __init__(self):
42 self._remote_refs = {}
44 self._remote_refs = {}
43 self.tub = Tub()
45 self.tub = Tub()
44 self.tub.startService()
46 self.tub.startService()
45
47
46 def get_reference(self, furl_or_file):
48 def _find_furl(self, profile='default', cluster_dir=None,
49 furl_or_file=None, furl_file_name=None,
50 ipythondir=None):
51 """Find a FURL file by profile+ipythondir or cluster dir.
52
53 This raises an exception if a FURL file can't be found.
47 """
54 """
48 Get a remote reference using a furl or a file containing a furl.
55 # Try by furl_or_file
49
56 if furl_or_file is not None:
57 try:
58 furl = find_furl(furl_or_file)
59 except ValueError:
60 return furl
61
62 if furl_file_name is None:
63 raise ValueError('A furl_file_name must be provided')
64
65 # Try by cluster_dir
66 if cluster_dir is not None:
67 cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
68 sdir = cluster_dir_obj.security_dir
69 furl_file = os.path.join(sdir, furl_file_name)
70 return find_furl(furl_file)
71
72 # Try by profile
73 if ipythondir is None:
74 ipythondir = get_ipython_dir()
75 if profile is not None:
76 cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
77 ipythondir, profile)
78 sdir = cluster_dir_obj.security_dir
79 furl_file = os.path.join(sdir, furl_file_name)
80 return find_furl(furl_file)
81
82 raise ValueError('Could not find a valid FURL file.')
83
84 def get_reference(self, furl_or_file):
85 """Get a remote reference using a furl or a file containing a furl.
86
50 Remote references are cached locally so once a remote reference
87 Remote references are cached locally so once a remote reference
51 has been retrieved for a given furl, the cached version is
88 has been retrieved for a given furl, the cached version is
52 returned.
89 returned.
53
90
54 :Parameters:
91 Parameters
55 furl_or_file : str
92 ----------
56 A furl or a filename containing a furl
93 furl_or_file : str
57
94 A furl or a filename containing a furl
58 :Returns:
95
59 A deferred to a remote reference
96 Returns
97 -------
98 A deferred to a remote reference
60 """
99 """
61 furl = find_furl(furl_or_file)
100 furl = find_furl(furl_or_file)
62 if furl in self._remote_refs:
101 if furl in self._remote_refs:
63 d = defer.succeed(self._remote_refs[furl])
102 d = defer.succeed(self._remote_refs[furl])
64 else:
103 else:
65 d = self.tub.getReference(furl)
104 d = self.tub.getReference(furl)
66 d.addCallback(self.save_ref, furl)
105 d.addCallback(self._save_ref, furl)
67 return d
106 return d
68
107
69 def save_ref(self, ref, furl):
108 def _save_ref(self, ref, furl):
70 """
109 """Cache a remote reference by its furl."""
71 Cache a remote reference by its furl.
72 """
73 self._remote_refs[furl] = ref
110 self._remote_refs[furl] = ref
74 return ref
111 return ref
75
112
76 def get_task_client(self, furl_or_file=''):
113 def get_task_client(self, profile='default', cluster_dir=None,
77 """
114 furl_or_file=None, ipythondir=None):
78 Get the task controller client.
115 """Get the task controller client.
79
116
80 This method is a simple wrapper around `get_client` that allow
117 This method is a simple wrapper around `get_client` that passes in
81 `furl_or_file` to be empty, in which case, the furls is taken
118 the default name of the task client FURL file. Usually only
82 from the default furl file given in the configuration.
119 the ``profile`` option will be needed. If a FURL file can't be
120 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
83
121
84 :Parameters:
122 Parameters
85 furl_or_file : str
123 ----------
86 A furl or a filename containing a furl. If empty, the
124 profile : str
87 default furl_file will be used
125 The name of a cluster directory profile (default="default"). The
88
126 cluster directory "cluster_<profile>" will be searched for
89 :Returns:
127 in ``os.getcwd()``, the ipythondir and then in the directories
90 A deferred to the actual client class
128 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
91 """
129 cluster_dir : str
92 task_co = client_co['client_interfaces']['task']
130 The full path to a cluster directory. This is useful if profiles
93 if furl_or_file:
131 are not being used.
94 ff = furl_or_file
132 furl_or_file : str
95 else:
133 A furl or a filename containing a FURLK. This is useful if you
96 ff = task_co['furl_file']
134 simply know the location of the FURL file.
97 return self.get_client(ff)
135 ipythondir : str
136 The location of the ipythondir if different from the default.
137 This is used if the cluster directory is being found by profile.
98
138
99 def get_multiengine_client(self, furl_or_file=''):
139 Returns
140 -------
141 A deferred to the actual client class.
100 """
142 """
101 Get the multiengine controller client.
143 return self.get_client(
144 profile, cluster_dir, furl_or_file,
145 'ipcontroller-tc.furl', ipythondir
146 )
147
148 def get_multiengine_client(self, profile='default', cluster_dir=None,
149 furl_or_file=None, ipythondir=None):
150 """Get the multiengine controller client.
102
151
103 This method is a simple wrapper around `get_client` that allow
152 This method is a simple wrapper around `get_client` that passes in
104 `furl_or_file` to be empty, in which case, the furls is taken
153 the default name of the task client FURL file. Usually only
105 from the default furl file given in the configuration.
154 the ``profile`` option will be needed. If a FURL file can't be
155 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
106
156
107 :Parameters:
157 Parameters
108 furl_or_file : str
158 ----------
109 A furl or a filename containing a furl. If empty, the
159 profile : str
110 default furl_file will be used
160 The name of a cluster directory profile (default="default"). The
161 cluster directory "cluster_<profile>" will be searched for
162 in ``os.getcwd()``, the ipythondir and then in the directories
163 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
164 cluster_dir : str
165 The full path to a cluster directory. This is useful if profiles
166 are not being used.
167 furl_or_file : str
168 A furl or a filename containing a FURLK. This is useful if you
169 simply know the location of the FURL file.
170 ipythondir : str
171 The location of the ipythondir if different from the default.
172 This is used if the cluster directory is being found by profile.
111
173
112 :Returns:
174 Returns
113 A deferred to the actual client class
175 -------
176 A deferred to the actual client class.
114 """
177 """
115 task_co = client_co['client_interfaces']['multiengine']
178 return self.get_client(
116 if furl_or_file:
179 profile, cluster_dir, furl_or_file,
117 ff = furl_or_file
180 'ipcontroller-mec.furl', ipythondir
118 else:
181 )
119 ff = task_co['furl_file']
120 return self.get_client(ff)
121
182
122 def get_client(self, furl_or_file):
183 def get_client(self, profile='default', cluster_dir=None,
123 """
184 furl_or_file=None, furl_file_name=None, ipythondir=None):
124 Get a remote reference and wrap it in a client by furl.
185 """Get a remote reference and wrap it in a client by furl.
125
186
126 This method first gets a remote reference and then calls its
187 This method is a simple wrapper around `get_client` that passes in
127 `get_client_name` method to find the apprpriate client class
188 the default name of the task client FURL file. Usually only
128 that should be used to wrap the remote reference.
189 the ``profile`` option will be needed. If a FURL file can't be
129
190 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
130 :Parameters:
131 furl_or_file : str
132 A furl or a filename containing a furl
133
191
134 :Returns:
192 Parameters
135 A deferred to the actual client class
193 ----------
194 profile : str
195 The name of a cluster directory profile (default="default"). The
196 cluster directory "cluster_<profile>" will be searched for
197 in ``os.getcwd()``, the ipythondir and then in the directories
198 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
199 cluster_dir : str
200 The full path to a cluster directory. This is useful if profiles
201 are not being used.
202 furl_or_file : str
203 A furl or a filename containing a FURLK. This is useful if you
204 simply know the location of the FURL file.
205 furl_file_name : str
206 The filename (not the full path) of the FURL. This must be
207 provided if ``furl_or_file`` is not.
208 ipythondir : str
209 The location of the ipythondir if different from the default.
210 This is used if the cluster directory is being found by profile.
211
212 Returns
213 -------
214 A deferred to the actual client class.
136 """
215 """
137 furl = find_furl(furl_or_file)
216 try:
217 furl = self._find_furl(
218 profile, cluster_dir, furl_or_file,
219 furl_file_name, ipythondir
220 )
221 except:
222 return defer.fail(failure.Failure())
223
138 d = self.get_reference(furl)
224 d = self.get_reference(furl)
139 def wrap_remote_reference(rr):
225
226 def _wrap_remote_reference(rr):
140 d = rr.callRemote('get_client_name')
227 d = rr.callRemote('get_client_name')
141 d.addCallback(lambda name: import_item(name))
228 d.addCallback(lambda name: import_item(name))
142 def adapt(client_interface):
229 def adapt(client_interface):
143 client = client_interface(rr)
230 client = client_interface(rr)
144 client.tub = self.tub
231 client.tub = self.tub
145 return client
232 return client
146 d.addCallback(adapt)
233 d.addCallback(adapt)
147
234
148 return d
235 return d
149 d.addCallback(wrap_remote_reference)
236
237 d.addCallback(_wrap_remote_reference)
150 return d
238 return d
239
240
241 class ClientConnector(object):
242 """A blocking version of a client connector.
243
244 This class creates a single :class:`Tub` instance and allows remote
245 references and client to be retrieved by their FURLs. Remote references
246 are cached locally and FURL files can be found using profiles and cluster
247 directories.
248 """
249
250 def __init__(self):
251 self.async_cc = AsyncClientConnector()
252
253 def get_task_client(self, profile='default', cluster_dir=None,
254 furl_or_file=None, ipythondir=None):
255 """Get the task client.
256
257 Usually only the ``profile`` option will be needed. If a FURL file
258 can't be found by its profile, use ``cluster_dir`` or
259 ``furl_or_file``.
260
261 Parameters
262 ----------
263 profile : str
264 The name of a cluster directory profile (default="default"). The
265 cluster directory "cluster_<profile>" will be searched for
266 in ``os.getcwd()``, the ipythondir and then in the directories
267 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
268 cluster_dir : str
269 The full path to a cluster directory. This is useful if profiles
270 are not being used.
271 furl_or_file : str
272 A furl or a filename containing a FURLK. This is useful if you
273 simply know the location of the FURL file.
274 ipythondir : str
275 The location of the ipythondir if different from the default.
276 This is used if the cluster directory is being found by profile.
277
278 Returns
279 -------
280 The task client instance.
281 """
282 client = blockingCallFromThread(
283 self.async_cc.get_task_client, profile, cluster_dir,
284 furl_or_file, ipythondir
285 )
286 return client.adapt_to_blocking_client()
287
288 def get_multiengine_client(self, profile='default', cluster_dir=None,
289 furl_or_file=None, ipythondir=None):
290 """Get the multiengine client.
291
292 Usually only the ``profile`` option will be needed. If a FURL file
293 can't be found by its profile, use ``cluster_dir`` or
294 ``furl_or_file``.
295
296 Parameters
297 ----------
298 profile : str
299 The name of a cluster directory profile (default="default"). The
300 cluster directory "cluster_<profile>" will be searched for
301 in ``os.getcwd()``, the ipythondir and then in the directories
302 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
303 cluster_dir : str
304 The full path to a cluster directory. This is useful if profiles
305 are not being used.
306 furl_or_file : str
307 A furl or a filename containing a FURLK. This is useful if you
308 simply know the location of the FURL file.
309 ipythondir : str
310 The location of the ipythondir if different from the default.
311 This is used if the cluster directory is being found by profile.
312
313 Returns
314 -------
315 The multiengine client instance.
316 """
317 client = blockingCallFromThread(
318 self.async_cc.get_multiengine_client, profile, cluster_dir,
319 furl_or_file, ipythondir
320 )
321 return client.adapt_to_blocking_client()
322
323 def get_client(self, profile='default', cluster_dir=None,
324 furl_or_file=None, ipythondir=None):
325 client = blockingCallFromThread(
326 self.async_cc.get_client, profile, cluster_dir,
327 furl_or_file, ipythondir
328 )
329 return client.adapt_to_blocking_client()
330
331
332 class ClusterStateError(Exception):
333 pass
334
335
336 class AsyncCluster(object):
337 """An class that wraps the :command:`ipcluster` script."""
338
339 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
340 auto_create=False, auto_stop=True):
341 """Create a class to manage an IPython cluster.
342
343 This class calls the :command:`ipcluster` command with the right
344 options to start an IPython cluster. Typically a cluster directory
345 must be created (:command:`ipcluster create`) and configured before
346 using this class. Configuration is done by editing the
347 configuration files in the top level of the cluster directory.
348
349 Parameters
350 ----------
351 profile : str
352 The name of a cluster directory profile (default="default"). The
353 cluster directory "cluster_<profile>" will be searched for
354 in ``os.getcwd()``, the ipythondir and then in the directories
355 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
356 cluster_dir : str
357 The full path to a cluster directory. This is useful if profiles
358 are not being used.
359 furl_or_file : str
360 A furl or a filename containing a FURLK. This is useful if you
361 simply know the location of the FURL file.
362 ipythondir : str
363 The location of the ipythondir if different from the default.
364 This is used if the cluster directory is being found by profile.
365 auto_create : bool
366 Automatically create the cluster directory it is dones't exist.
367 This will usually only make sense if using a local cluster
368 (default=False).
369 auto_stop : bool
370 Automatically stop the cluster when this instance is garbage
371 collected (default=True). This is useful if you want the cluster
372 to live beyond your current process. There is also an instance
373 attribute ``auto_stop`` to change this behavior.
374 """
375 self._setup_cluster_dir(profile, cluster_dir, ipythondir, auto_create)
376 self.state = 'before'
377 self.launcher = None
378 self.client_connector = None
379 self.auto_stop = auto_stop
380
381 def __del__(self):
382 if self.auto_stop and self.state=='running':
383 print "Auto stopping the cluster..."
384 self.stop()
385
386 @property
387 def location(self):
388 if hasattr(self, 'cluster_dir_obj'):
389 return self.cluster_dir_obj.location
390 else:
391 return ''
392
393 @property
394 def running(self):
395 if self.state=='running':
396 return True
397 else:
398 return False
399
400 def _setup_cluster_dir(self, profile, cluster_dir, ipythondir, auto_create):
401 if ipythondir is None:
402 ipythondir = get_ipython_dir()
403 if cluster_dir is not None:
404 try:
405 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
406 except ClusterDirError:
407 pass
408 if profile is not None:
409 try:
410 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
411 ipythondir, profile)
412 except ClusterDirError:
413 pass
414 if auto_create or profile=='default':
415 # This should call 'ipcluster create --profile default
416 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
417 ipythondir, profile)
418 else:
419 raise ClusterDirError('Cluster dir not found.')
420
421 @make_deferred
422 def start(self, n=2):
423 """Start the IPython cluster with n engines.
424
425 Parameters
426 ----------
427 n : int
428 The number of engine to start.
429 """
430 # We might want to add logic to test if the cluster has started
431 # by another process....
432 if not self.state=='running':
433 self.launcher = IPClusterLauncher(os.getcwd())
434 self.launcher.ipcluster_n = n
435 self.launcher.ipcluster_subcommand = 'start'
436 d = self.launcher.start()
437 d.addCallback(self._handle_start)
438 return d
439 else:
440 raise ClusterStateError('Cluster is already running')
441
442 @make_deferred
443 def stop(self):
444 """Stop the IPython cluster if it is running."""
445 if self.state=='running':
446 d1 = self.launcher.observe_stop()
447 d1.addCallback(self._handle_stop)
448 d2 = self.launcher.stop()
449 return gatherBoth([d1, d2], consumeErrors=True)
450 else:
451 raise ClusterStateError("Cluster not running")
452
453 def get_multiengine_client(self):
454 """Get the multiengine client for the running cluster.
455
456 If this fails, it means that the cluster has not finished starting.
457 Usually waiting a few seconds are re-trying will solve this.
458 """
459 if self.client_connector is None:
460 self.client_connector = AsyncClientConnector()
461 return self.client_connector.get_multiengine_client(
462 cluster_dir=self.cluster_dir_obj.location
463 )
464
465 def get_task_client(self):
466 """Get the task client for the running cluster.
467
468 If this fails, it means that the cluster has not finished starting.
469 Usually waiting a few seconds are re-trying will solve this.
470 """
471 if self.client_connector is None:
472 self.client_connector = AsyncClientConnector()
473 return self.client_connector.get_task_client(
474 cluster_dir=self.cluster_dir_obj.location
475 )
476
477 def _handle_start(self, r):
478 self.state = 'running'
479
480 def _handle_stop(self, r):
481 self.state = 'after'
482
483
484 class Cluster(object):
485
486
487 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
488 auto_create=False, auto_stop=True):
489 """Create a class to manage an IPython cluster.
490
491 This class calls the :command:`ipcluster` command with the right
492 options to start an IPython cluster. Typically a cluster directory
493 must be created (:command:`ipcluster create`) and configured before
494 using this class. Configuration is done by editing the
495 configuration files in the top level of the cluster directory.
496
497 Parameters
498 ----------
499 profile : str
500 The name of a cluster directory profile (default="default"). The
501 cluster directory "cluster_<profile>" will be searched for
502 in ``os.getcwd()``, the ipythondir and then in the directories
503 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
504 cluster_dir : str
505 The full path to a cluster directory. This is useful if profiles
506 are not being used.
507 furl_or_file : str
508 A furl or a filename containing a FURLK. This is useful if you
509 simply know the location of the FURL file.
510 ipythondir : str
511 The location of the ipythondir if different from the default.
512 This is used if the cluster directory is being found by profile.
513 auto_create : bool
514 Automatically create the cluster directory it is dones't exist.
515 This will usually only make sense if using a local cluster
516 (default=False).
517 auto_stop : bool
518 Automatically stop the cluster when this instance is garbage
519 collected (default=True). This is useful if you want the cluster
520 to live beyond your current process. There is also an instance
521 attribute ``auto_stop`` to change this behavior.
522 """
523 self.async_cluster = AsyncCluster(
524 profile, cluster_dir, ipythondir, auto_create, auto_stop
525 )
526 self.cluster_dir_obj = self.async_cluster.cluster_dir_obj
527 self.client_connector = None
528
529 def _set_auto_stop(self, value):
530 self.async_cluster.auto_stop = value
531
532 def _get_auto_stop(self):
533 return self.async_cluster.auto_stop
534
535 auto_stop = property(_get_auto_stop, _set_auto_stop)
536
537 @property
538 def location(self):
539 return self.async_cluster.location
540
541 @property
542 def running(self):
543 return self.async_cluster.running
544
545 def start(self, n=2):
546 """Start the IPython cluster with n engines.
547
548 Parameters
549 ----------
550 n : int
551 The number of engine to start.
552 """
553 return blockingCallFromThread(self.async_cluster.start, n)
554
555 def stop(self):
556 """Stop the IPython cluster if it is running."""
557 return blockingCallFromThread(self.async_cluster.stop)
558
559 def get_multiengine_client(self):
560 """Get the multiengine client for the running cluster.
561
562 If this fails, it means that the cluster has not finished starting.
563 Usually waiting a few seconds are re-trying will solve this.
564 """
565 if self.client_connector is None:
566 self.client_connector = ClientConnector()
567 return self.client_connector.get_multiengine_client(
568 cluster_dir=self.cluster_dir_obj.location
569 )
570
571 def get_task_client(self):
572 """Get the task client for the running cluster.
573
574 If this fails, it means that the cluster has not finished starting.
575 Usually waiting a few seconds are re-trying will solve this.
576 """
577 if self.client_connector is None:
578 self.client_connector = ClientConnector()
579 return self.client_connector.get_task_client(
580 cluster_dir=self.cluster_dir_obj.location
581 )
582
583
584
@@ -1,352 +1,394 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython cluster directory
4 The IPython cluster directory
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import shutil
19 import shutil
20 import sys
21
22 from twisted.python import log
20
23
21 from IPython.core import release
24 from IPython.core import release
22 from IPython.config.loader import PyFileConfigLoader
25 from IPython.config.loader import PyFileConfigLoader
23 from IPython.core.application import Application
26 from IPython.core.application import Application
24 from IPython.core.component import Component
27 from IPython.core.component import Component
25 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
28 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
26 from IPython.utils.traitlets import Unicode, Bool
29 from IPython.utils.traitlets import Unicode, Bool
27
30
28 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
29 # Imports
32 # Imports
30 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
31
34
32
35
33 class ClusterDirError(Exception):
36 class ClusterDirError(Exception):
34 pass
37 pass
35
38
36
39
37 class ClusterDir(Component):
40 class ClusterDir(Component):
38 """An object to manage the cluster directory and its resources.
41 """An object to manage the cluster directory and its resources.
39
42
40 The cluster directory is used by :command:`ipcontroller`,
43 The cluster directory is used by :command:`ipcontroller`,
41 :command:`ipcontroller` and :command:`ipcontroller` to manage the
44 :command:`ipcontroller` and :command:`ipcontroller` to manage the
42 configuration, logging and security of these applications.
45 configuration, logging and security of these applications.
43
46
44 This object knows how to find, create and manage these directories. This
47 This object knows how to find, create and manage these directories. This
45 should be used by any code that want's to handle cluster directories.
48 should be used by any code that want's to handle cluster directories.
46 """
49 """
47
50
48 security_dir_name = Unicode('security')
51 security_dir_name = Unicode('security')
49 log_dir_name = Unicode('log')
52 log_dir_name = Unicode('log')
50 security_dir = Unicode()
53 security_dir = Unicode()
51 log_dir = Unicode('')
54 log_dir = Unicode('')
52 location = Unicode('')
55 location = Unicode('')
53
56
54 def __init__(self, location):
57 def __init__(self, location):
55 super(ClusterDir, self).__init__(None)
58 super(ClusterDir, self).__init__(None)
56 self.location = location
59 self.location = location
57
60
58 def _location_changed(self, name, old, new):
61 def _location_changed(self, name, old, new):
59 if not os.path.isdir(new):
62 if not os.path.isdir(new):
60 os.makedirs(new, mode=0777)
63 os.makedirs(new, mode=0777)
61 else:
64 else:
62 os.chmod(new, 0777)
65 os.chmod(new, 0777)
63 self.security_dir = os.path.join(new, self.security_dir_name)
66 self.security_dir = os.path.join(new, self.security_dir_name)
64 self.log_dir = os.path.join(new, self.log_dir_name)
67 self.log_dir = os.path.join(new, self.log_dir_name)
65 self.check_dirs()
68 self.check_dirs()
66
69
67 def _log_dir_changed(self, name, old, new):
70 def _log_dir_changed(self, name, old, new):
68 self.check_log_dir()
71 self.check_log_dir()
69
72
70 def check_log_dir(self):
73 def check_log_dir(self):
71 if not os.path.isdir(self.log_dir):
74 if not os.path.isdir(self.log_dir):
72 os.mkdir(self.log_dir, 0777)
75 os.mkdir(self.log_dir, 0777)
73 else:
76 else:
74 os.chmod(self.log_dir, 0777)
77 os.chmod(self.log_dir, 0777)
75
78
76 def _security_dir_changed(self, name, old, new):
79 def _security_dir_changed(self, name, old, new):
77 self.check_security_dir()
80 self.check_security_dir()
78
81
79 def check_security_dir(self):
82 def check_security_dir(self):
80 if not os.path.isdir(self.security_dir):
83 if not os.path.isdir(self.security_dir):
81 os.mkdir(self.security_dir, 0700)
84 os.mkdir(self.security_dir, 0700)
82 else:
85 else:
83 os.chmod(self.security_dir, 0700)
86 os.chmod(self.security_dir, 0700)
84
87
85 def check_dirs(self):
88 def check_dirs(self):
86 self.check_security_dir()
89 self.check_security_dir()
87 self.check_log_dir()
90 self.check_log_dir()
88
91
89 def load_config_file(self, filename):
92 def load_config_file(self, filename):
90 """Load a config file from the top level of the cluster dir.
93 """Load a config file from the top level of the cluster dir.
91
94
92 Parameters
95 Parameters
93 ----------
96 ----------
94 filename : unicode or str
97 filename : unicode or str
95 The filename only of the config file that must be located in
98 The filename only of the config file that must be located in
96 the top-level of the cluster directory.
99 the top-level of the cluster directory.
97 """
100 """
98 loader = PyFileConfigLoader(filename, self.location)
101 loader = PyFileConfigLoader(filename, self.location)
99 return loader.load_config()
102 return loader.load_config()
100
103
101 def copy_config_file(self, config_file, path=None, overwrite=False):
104 def copy_config_file(self, config_file, path=None, overwrite=False):
102 """Copy a default config file into the active cluster directory.
105 """Copy a default config file into the active cluster directory.
103
106
104 Default configuration files are kept in :mod:`IPython.config.default`.
107 Default configuration files are kept in :mod:`IPython.config.default`.
105 This function moves these from that location to the working cluster
108 This function moves these from that location to the working cluster
106 directory.
109 directory.
107 """
110 """
108 if path is None:
111 if path is None:
109 import IPython.config.default
112 import IPython.config.default
110 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
113 path = IPython.config.default.__file__.split(os.path.sep)[:-1]
111 path = os.path.sep.join(path)
114 path = os.path.sep.join(path)
112 src = os.path.join(path, config_file)
115 src = os.path.join(path, config_file)
113 dst = os.path.join(self.location, config_file)
116 dst = os.path.join(self.location, config_file)
114 if not os.path.isfile(dst) or overwrite:
117 if not os.path.isfile(dst) or overwrite:
115 shutil.copy(src, dst)
118 shutil.copy(src, dst)
116
119
117 def copy_all_config_files(self, path=None, overwrite=False):
120 def copy_all_config_files(self, path=None, overwrite=False):
118 """Copy all config files into the active cluster directory."""
121 """Copy all config files into the active cluster directory."""
119 for f in ['ipcontroller_config.py', 'ipengine_config.py',
122 for f in ['ipcontroller_config.py', 'ipengine_config.py',
120 'ipcluster_config.py']:
123 'ipcluster_config.py']:
121 self.copy_config_file(f, path=path, overwrite=overwrite)
124 self.copy_config_file(f, path=path, overwrite=overwrite)
122
125
123 @classmethod
126 @classmethod
124 def create_cluster_dir(csl, cluster_dir):
127 def create_cluster_dir(csl, cluster_dir):
125 """Create a new cluster directory given a full path.
128 """Create a new cluster directory given a full path.
126
129
127 Parameters
130 Parameters
128 ----------
131 ----------
129 cluster_dir : str
132 cluster_dir : str
130 The full path to the cluster directory. If it does exist, it will
133 The full path to the cluster directory. If it does exist, it will
131 be used. If not, it will be created.
134 be used. If not, it will be created.
132 """
135 """
133 return ClusterDir(cluster_dir)
136 return ClusterDir(cluster_dir)
134
137
135 @classmethod
138 @classmethod
136 def create_cluster_dir_by_profile(cls, path, profile='default'):
139 def create_cluster_dir_by_profile(cls, path, profile='default'):
137 """Create a cluster dir by profile name and path.
140 """Create a cluster dir by profile name and path.
138
141
139 Parameters
142 Parameters
140 ----------
143 ----------
141 path : str
144 path : str
142 The path (directory) to put the cluster directory in.
145 The path (directory) to put the cluster directory in.
143 profile : str
146 profile : str
144 The name of the profile. The name of the cluster directory will
147 The name of the profile. The name of the cluster directory will
145 be "cluster_<profile>".
148 be "cluster_<profile>".
146 """
149 """
147 if not os.path.isdir(path):
150 if not os.path.isdir(path):
148 raise ClusterDirError('Directory not found: %s' % path)
151 raise ClusterDirError('Directory not found: %s' % path)
149 cluster_dir = os.path.join(path, 'cluster_' + profile)
152 cluster_dir = os.path.join(path, 'cluster_' + profile)
150 return ClusterDir(cluster_dir)
153 return ClusterDir(cluster_dir)
151
154
152 @classmethod
155 @classmethod
153 def find_cluster_dir_by_profile(cls, ipythondir, profile='default'):
156 def find_cluster_dir_by_profile(cls, ipythondir, profile='default'):
154 """Find an existing cluster dir by profile name, return its ClusterDir.
157 """Find an existing cluster dir by profile name, return its ClusterDir.
155
158
156 This searches through a sequence of paths for a cluster dir. If it
159 This searches through a sequence of paths for a cluster dir. If it
157 is not found, a :class:`ClusterDirError` exception will be raised.
160 is not found, a :class:`ClusterDirError` exception will be raised.
158
161
159 The search path algorithm is:
162 The search path algorithm is:
160 1. ``os.getcwd()``
163 1. ``os.getcwd()``
161 2. ``ipythondir``
164 2. ``ipythondir``
162 3. The directories found in the ":" separated
165 3. The directories found in the ":" separated
163 :env:`IPCLUSTERDIR_PATH` environment variable.
166 :env:`IPCLUSTERDIR_PATH` environment variable.
164
167
165 Parameters
168 Parameters
166 ----------
169 ----------
167 ipythondir : unicode or str
170 ipythondir : unicode or str
168 The IPython directory to use.
171 The IPython directory to use.
169 profile : unicode or str
172 profile : unicode or str
170 The name of the profile. The name of the cluster directory
173 The name of the profile. The name of the cluster directory
171 will be "cluster_<profile>".
174 will be "cluster_<profile>".
172 """
175 """
173 dirname = 'cluster_' + profile
176 dirname = 'cluster_' + profile
174 cluster_dir_paths = os.environ.get('IPCLUSTERDIR_PATH','')
177 cluster_dir_paths = os.environ.get('IPCLUSTERDIR_PATH','')
175 if cluster_dir_paths:
178 if cluster_dir_paths:
176 cluster_dir_paths = cluster_dir_paths.split(':')
179 cluster_dir_paths = cluster_dir_paths.split(':')
177 else:
180 else:
178 cluster_dir_paths = []
181 cluster_dir_paths = []
179 paths = [os.getcwd(), ipythondir] + cluster_dir_paths
182 paths = [os.getcwd(), ipythondir] + cluster_dir_paths
180 for p in paths:
183 for p in paths:
181 cluster_dir = os.path.join(p, dirname)
184 cluster_dir = os.path.join(p, dirname)
182 if os.path.isdir(cluster_dir):
185 if os.path.isdir(cluster_dir):
183 return ClusterDir(cluster_dir)
186 return ClusterDir(cluster_dir)
184 else:
187 else:
185 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
188 raise ClusterDirError('Cluster directory not found in paths: %s' % dirname)
186
189
187 @classmethod
190 @classmethod
188 def find_cluster_dir(cls, cluster_dir):
191 def find_cluster_dir(cls, cluster_dir):
189 """Find/create a cluster dir and return its ClusterDir.
192 """Find/create a cluster dir and return its ClusterDir.
190
193
191 This will create the cluster directory if it doesn't exist.
194 This will create the cluster directory if it doesn't exist.
192
195
193 Parameters
196 Parameters
194 ----------
197 ----------
195 cluster_dir : unicode or str
198 cluster_dir : unicode or str
196 The path of the cluster directory. This is expanded using
199 The path of the cluster directory. This is expanded using
197 :func:`os.path.expandvars` and :func:`os.path.expanduser`.
200 :func:`os.path.expandvars` and :func:`os.path.expanduser`.
198 """
201 """
199 cluster_dir = os.path.expandvars(os.path.expanduser(cluster_dir))
202 cluster_dir = os.path.expandvars(os.path.expanduser(cluster_dir))
200 if not os.path.isdir(cluster_dir):
203 if not os.path.isdir(cluster_dir):
201 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
204 raise ClusterDirError('Cluster directory not found: %s' % cluster_dir)
202 return ClusterDir(cluster_dir)
205 return ClusterDir(cluster_dir)
203
206
204
207
205 class AppWithClusterDirArgParseConfigLoader(ArgParseConfigLoader):
208 class AppWithClusterDirArgParseConfigLoader(ArgParseConfigLoader):
206 """Default command line options for IPython cluster applications."""
209 """Default command line options for IPython cluster applications."""
207
210
208 def _add_other_arguments(self):
211 def _add_other_arguments(self):
209 self.parser.add_argument('-ipythondir', '--ipython-dir',
212 self.parser.add_argument('-ipythondir', '--ipython-dir',
210 dest='Global.ipythondir',type=str,
213 dest='Global.ipythondir',type=str,
211 help='Set to override default location of Global.ipythondir.',
214 help='Set to override default location of Global.ipythondir.',
212 default=NoConfigDefault,
215 default=NoConfigDefault,
213 metavar='Global.ipythondir')
216 metavar='Global.ipythondir'
217 )
214 self.parser.add_argument('-p','-profile', '--profile',
218 self.parser.add_argument('-p','-profile', '--profile',
215 dest='Global.profile',type=str,
219 dest='Global.profile',type=str,
216 help='The string name of the profile to be used. This determines '
220 help='The string name of the profile to be used. This determines '
217 'the name of the cluster dir as: cluster_<profile>. The default profile '
221 'the name of the cluster dir as: cluster_<profile>. The default profile '
218 'is named "default". The cluster directory is resolve this way '
222 'is named "default". The cluster directory is resolve this way '
219 'if the --cluster-dir option is not used.',
223 'if the --cluster-dir option is not used.',
220 default=NoConfigDefault,
224 default=NoConfigDefault,
221 metavar='Global.profile')
225 metavar='Global.profile'
226 )
222 self.parser.add_argument('-log_level', '--log-level',
227 self.parser.add_argument('-log_level', '--log-level',
223 dest="Global.log_level",type=int,
228 dest="Global.log_level",type=int,
224 help='Set the log level (0,10,20,30,40,50). Default is 30.',
229 help='Set the log level (0,10,20,30,40,50). Default is 30.',
225 default=NoConfigDefault,
230 default=NoConfigDefault,
226 metavar="Global.log_level")
231 metavar="Global.log_level"
232 )
227 self.parser.add_argument('-cluster_dir', '--cluster-dir',
233 self.parser.add_argument('-cluster_dir', '--cluster-dir',
228 dest='Global.cluster_dir',type=str,
234 dest='Global.cluster_dir',type=str,
229 help='Set the cluster dir. This overrides the logic used by the '
235 help='Set the cluster dir. This overrides the logic used by the '
230 '--profile option.',
236 '--profile option.',
231 default=NoConfigDefault,
237 default=NoConfigDefault,
232 metavar='Global.cluster_dir')
238 metavar='Global.cluster_dir'
233
239 )
240 self.parser.add_argument('-clean_logs', '--clean-logs',
241 dest='Global.clean_logs', action='store_true',
242 help='Delete old log flies before starting.',
243 default=NoConfigDefault
244 )
245 self.parser.add_argument('-noclean_logs', '--no-clean-logs',
246 dest='Global.clean_logs', action='store_false',
247 help="Don't Delete old log flies before starting.",
248 default=NoConfigDefault
249 )
234
250
235 class ApplicationWithClusterDir(Application):
251 class ApplicationWithClusterDir(Application):
236 """An application that puts everything into a cluster directory.
252 """An application that puts everything into a cluster directory.
237
253
238 Instead of looking for things in the ipythondir, this type of application
254 Instead of looking for things in the ipythondir, this type of application
239 will use its own private directory called the "cluster directory"
255 will use its own private directory called the "cluster directory"
240 for things like config files, log files, etc.
256 for things like config files, log files, etc.
241
257
242 The cluster directory is resolved as follows:
258 The cluster directory is resolved as follows:
243
259
244 * If the ``--cluster-dir`` option is given, it is used.
260 * If the ``--cluster-dir`` option is given, it is used.
245 * If ``--cluster-dir`` is not given, the application directory is
261 * If ``--cluster-dir`` is not given, the application directory is
246 resolve using the profile name as ``cluster_<profile>``. The search
262 resolve using the profile name as ``cluster_<profile>``. The search
247 path for this directory is then i) cwd if it is found there
263 path for this directory is then i) cwd if it is found there
248 and ii) in ipythondir otherwise.
264 and ii) in ipythondir otherwise.
249
265
250 The config file for the application is to be put in the cluster
266 The config file for the application is to be put in the cluster
251 dir and named the value of the ``config_file_name`` class attribute.
267 dir and named the value of the ``config_file_name`` class attribute.
252 """
268 """
253
269
254 auto_create_cluster_dir = True
270 auto_create_cluster_dir = True
255
271
256 def create_default_config(self):
272 def create_default_config(self):
257 super(ApplicationWithClusterDir, self).create_default_config()
273 super(ApplicationWithClusterDir, self).create_default_config()
258 self.default_config.Global.profile = 'default'
274 self.default_config.Global.profile = 'default'
259 self.default_config.Global.cluster_dir = ''
275 self.default_config.Global.cluster_dir = ''
276 self.default_config.Global.log_to_file = False
277 self.default_config.Global.clean_logs = False
260
278
261 def create_command_line_config(self):
279 def create_command_line_config(self):
262 """Create and return a command line config loader."""
280 """Create and return a command line config loader."""
263 return AppWithClusterDirArgParseConfigLoader(
281 return AppWithClusterDirArgParseConfigLoader(
264 description=self.description,
282 description=self.description,
265 version=release.version
283 version=release.version
266 )
284 )
267
285
268 def find_resources(self):
286 def find_resources(self):
269 """This resolves the cluster directory.
287 """This resolves the cluster directory.
270
288
271 This tries to find the cluster directory and if successful, it will
289 This tries to find the cluster directory and if successful, it will
272 have done:
290 have done:
273 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
291 * Sets ``self.cluster_dir_obj`` to the :class:`ClusterDir` object for
274 the application.
292 the application.
275 * Sets ``self.cluster_dir`` attribute of the application and config
293 * Sets ``self.cluster_dir`` attribute of the application and config
276 objects.
294 objects.
277
295
278 The algorithm used for this is as follows:
296 The algorithm used for this is as follows:
279 1. Try ``Global.cluster_dir``.
297 1. Try ``Global.cluster_dir``.
280 2. Try using ``Global.profile``.
298 2. Try using ``Global.profile``.
281 3. If both of these fail and ``self.auto_create_cluster_dir`` is
299 3. If both of these fail and ``self.auto_create_cluster_dir`` is
282 ``True``, then create the new cluster dir in the IPython directory.
300 ``True``, then create the new cluster dir in the IPython directory.
283 4. If all fails, then raise :class:`ClusterDirError`.
301 4. If all fails, then raise :class:`ClusterDirError`.
284 """
302 """
285
303
286 try:
304 try:
287 cluster_dir = self.command_line_config.Global.cluster_dir
305 cluster_dir = self.command_line_config.Global.cluster_dir
288 except AttributeError:
306 except AttributeError:
289 cluster_dir = self.default_config.Global.cluster_dir
307 cluster_dir = self.default_config.Global.cluster_dir
290 cluster_dir = os.path.expandvars(os.path.expanduser(cluster_dir))
308 cluster_dir = os.path.expandvars(os.path.expanduser(cluster_dir))
291 try:
309 try:
292 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
310 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
293 except ClusterDirError:
311 except ClusterDirError:
294 pass
312 pass
295 else:
313 else:
296 self.log.info('Using existing cluster dir: %s' % \
314 self.log.info('Using existing cluster dir: %s' % \
297 self.cluster_dir_obj.location
315 self.cluster_dir_obj.location
298 )
316 )
299 self.finish_cluster_dir()
317 self.finish_cluster_dir()
300 return
318 return
301
319
302 try:
320 try:
303 self.profile = self.command_line_config.Global.profile
321 self.profile = self.command_line_config.Global.profile
304 except AttributeError:
322 except AttributeError:
305 self.profile = self.default_config.Global.profile
323 self.profile = self.default_config.Global.profile
306 try:
324 try:
307 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
325 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
308 self.ipythondir, self.profile)
326 self.ipythondir, self.profile)
309 except ClusterDirError:
327 except ClusterDirError:
310 pass
328 pass
311 else:
329 else:
312 self.log.info('Using existing cluster dir: %s' % \
330 self.log.info('Using existing cluster dir: %s' % \
313 self.cluster_dir_obj.location
331 self.cluster_dir_obj.location
314 )
332 )
315 self.finish_cluster_dir()
333 self.finish_cluster_dir()
316 return
334 return
317
335
318 if self.auto_create_cluster_dir:
336 if self.auto_create_cluster_dir:
319 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
337 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
320 self.ipythondir, self.profile
338 self.ipythondir, self.profile
321 )
339 )
322 self.log.info('Creating new cluster dir: %s' % \
340 self.log.info('Creating new cluster dir: %s' % \
323 self.cluster_dir_obj.location
341 self.cluster_dir_obj.location
324 )
342 )
325 self.finish_cluster_dir()
343 self.finish_cluster_dir()
326 else:
344 else:
327 raise ClusterDirError('Could not find a valid cluster directory.')
345 raise ClusterDirError('Could not find a valid cluster directory.')
328
346
329 def finish_cluster_dir(self):
347 def finish_cluster_dir(self):
330 # Set the cluster directory
348 # Set the cluster directory
331 self.cluster_dir = self.cluster_dir_obj.location
349 self.cluster_dir = self.cluster_dir_obj.location
332
350
333 # These have to be set because they could be different from the one
351 # These have to be set because they could be different from the one
334 # that we just computed. Because command line has the highest
352 # that we just computed. Because command line has the highest
335 # priority, this will always end up in the master_config.
353 # priority, this will always end up in the master_config.
336 self.default_config.Global.cluster_dir = self.cluster_dir
354 self.default_config.Global.cluster_dir = self.cluster_dir
337 self.command_line_config.Global.cluster_dir = self.cluster_dir
355 self.command_line_config.Global.cluster_dir = self.cluster_dir
338
356
339 # Set the search path to the cluster directory
357 # Set the search path to the cluster directory
340 self.config_file_paths = (self.cluster_dir,)
358 self.config_file_paths = (self.cluster_dir,)
341
359
342 def find_config_file_name(self):
360 def find_config_file_name(self):
343 """Find the config file name for this application."""
361 """Find the config file name for this application."""
344 # For this type of Application it should be set as a class attribute.
362 # For this type of Application it should be set as a class attribute.
345 if not hasattr(self, 'config_file_name'):
363 if not hasattr(self, 'config_file_name'):
346 self.log.critical("No config filename found")
364 self.log.critical("No config filename found")
347
365
348 def find_config_file_paths(self):
366 def find_config_file_paths(self):
349 # Set the search path to the cluster directory
367 # Set the search path to the cluster directory
350 self.config_file_paths = (self.cluster_dir,)
368 self.config_file_paths = (self.cluster_dir,)
351
369
352
370 def pre_construct(self):
371 # The log and security dirs were set earlier, but here we put them
372 # into the config and log them.
373 config = self.master_config
374 sdir = self.cluster_dir_obj.security_dir
375 self.security_dir = config.Global.security_dir = sdir
376 ldir = self.cluster_dir_obj.log_dir
377 self.log_dir = config.Global.log_dir = ldir
378 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
379
380 def start_logging(self):
381 # Remove old log files
382 if self.master_config.Global.clean_logs:
383 log_dir = self.master_config.Global.log_dir
384 for f in os.listdir(log_dir):
385 if f.startswith(self.name + '-') and f.endswith('.log'):
386 os.remove(os.path.join(log_dir, f))
387 # Start logging to the new log file
388 if self.master_config.Global.log_to_file:
389 log_filename = self.name + '-' + str(os.getpid()) + '.log'
390 logfile = os.path.join(self.log_dir, log_filename)
391 open_log_file = open(logfile, 'w')
392 else:
393 open_log_file = sys.stdout
394 log.startLogging(open_log_file)
@@ -1,131 +1,130 b''
1 #!/usr/bin/env python
1 # encoding: utf-8
2 # encoding: utf-8
2
3
3 """A class that manages the engines connection to the controller."""
4 """A class that manages the engines connection to the controller."""
4
5
5 __docformat__ = "restructuredtext en"
6 #-----------------------------------------------------------------------------
6
7 # Copyright (C) 2008-2009 The IPython Development Team
7 #-------------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
9 #
8 #
10 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
13
12
14 #-------------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
15 # Imports
14 # Imports
16 #-------------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
17
16
18 import os
17 import os
19 import cPickle as pickle
18 import cPickle as pickle
20
19
21 from twisted.python import log, failure
20 from twisted.python import log, failure
22 from twisted.internet import defer
21 from twisted.internet import defer
23 from twisted.internet.defer import inlineCallbacks, returnValue
22 from twisted.internet.defer import inlineCallbacks, returnValue
24
23
25 from IPython.kernel.fcutil import find_furl
24 from IPython.kernel.fcutil import find_furl
26 from IPython.kernel.enginefc import IFCEngine
25 from IPython.kernel.enginefc import IFCEngine
27 from IPython.kernel.twistedutil import sleep_deferred
26 from IPython.kernel.twistedutil import sleep_deferred
28
27
29 #-------------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
30 # The ClientConnector class
29 # The ClientConnector class
31 #-------------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
32
31
33
32
34 class EngineConnectorError(Exception):
33 class EngineConnectorError(Exception):
35 pass
34 pass
36
35
37
36
38 class EngineConnector(object):
37 class EngineConnector(object):
39 """Manage an engines connection to a controller.
38 """Manage an engines connection to a controller.
40
39
41 This class takes a foolscap `Tub` and provides a `connect_to_controller`
40 This class takes a foolscap `Tub` and provides a `connect_to_controller`
42 method that will use the `Tub` to connect to a controller and register
41 method that will use the `Tub` to connect to a controller and register
43 the engine with the controller.
42 the engine with the controller.
44 """
43 """
45
44
46 def __init__(self, tub):
45 def __init__(self, tub):
47 self.tub = tub
46 self.tub = tub
48
47
49 def connect_to_controller(self, engine_service, furl_or_file,
48 def connect_to_controller(self, engine_service, furl_or_file,
50 delay=0.1, max_tries=10):
49 delay=0.1, max_tries=10):
51 """
50 """
52 Make a connection to a controller specified by a furl.
51 Make a connection to a controller specified by a furl.
53
52
54 This method takes an `IEngineBase` instance and a foolcap URL and uses
53 This method takes an `IEngineBase` instance and a foolcap URL and uses
55 the `tub` attribute to make a connection to the controller. The
54 the `tub` attribute to make a connection to the controller. The
56 foolscap URL contains all the information needed to connect to the
55 foolscap URL contains all the information needed to connect to the
57 controller, including the ip and port as well as any encryption and
56 controller, including the ip and port as well as any encryption and
58 authentication information needed for the connection.
57 authentication information needed for the connection.
59
58
60 After getting a reference to the controller, this method calls the
59 After getting a reference to the controller, this method calls the
61 `register_engine` method of the controller to actually register the
60 `register_engine` method of the controller to actually register the
62 engine.
61 engine.
63
62
64 This method will try to connect to the controller multiple times with
63 This method will try to connect to the controller multiple times with
65 a delay in between. Each time the FURL file is read anew.
64 a delay in between. Each time the FURL file is read anew.
66
65
67 Parameters
66 Parameters
68 __________
67 __________
69 engine_service : IEngineBase
68 engine_service : IEngineBase
70 An instance of an `IEngineBase` implementer
69 An instance of an `IEngineBase` implementer
71 furl_or_file : str
70 furl_or_file : str
72 A furl or a filename containing a furl
71 A furl or a filename containing a furl
73 delay : float
72 delay : float
74 The intial time to wait between connection attempts. Subsequent
73 The intial time to wait between connection attempts. Subsequent
75 attempts have increasing delays.
74 attempts have increasing delays.
76 max_tries : int
75 max_tries : int
77 The maximum number of connection attempts.
76 The maximum number of connection attempts.
78 """
77 """
79 if not self.tub.running:
78 if not self.tub.running:
80 self.tub.startService()
79 self.tub.startService()
81 self.engine_service = engine_service
80 self.engine_service = engine_service
82 self.engine_reference = IFCEngine(self.engine_service)
81 self.engine_reference = IFCEngine(self.engine_service)
83
82
84 d = self._try_to_connect(furl_or_file, delay, max_tries, attempt=0)
83 d = self._try_to_connect(furl_or_file, delay, max_tries, attempt=0)
85 return d
84 return d
86
85
87 @inlineCallbacks
86 @inlineCallbacks
88 def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
87 def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
89 """Try to connect to the controller with retry logic."""
88 """Try to connect to the controller with retry logic."""
90 if attempt < max_tries:
89 if attempt < max_tries:
91 log.msg("Attempting to connect to controller [%r]: %s" % \
90 log.msg("Attempting to connect to controller [%r]: %s" % \
92 (attempt, furl_or_file))
91 (attempt, furl_or_file))
93 try:
92 try:
94 self.furl = find_furl(furl_or_file)
93 self.furl = find_furl(furl_or_file)
95 # Uncomment this to see the FURL being tried.
94 # Uncomment this to see the FURL being tried.
96 # log.msg("FURL: %s" % self.furl)
95 # log.msg("FURL: %s" % self.furl)
97 rr = yield self.tub.getReference(self.furl)
96 rr = yield self.tub.getReference(self.furl)
98 except:
97 except:
99 if attempt==max_tries-1:
98 if attempt==max_tries-1:
100 # This will propagate the exception all the way to the top
99 # This will propagate the exception all the way to the top
101 # where it can be handled.
100 # where it can be handled.
102 raise
101 raise
103 else:
102 else:
104 yield sleep_deferred(delay)
103 yield sleep_deferred(delay)
105 yield self._try_to_connect(
104 yield self._try_to_connect(
106 furl_or_file, 1.5*delay, max_tries, attempt+1
105 furl_or_file, 1.5*delay, max_tries, attempt+1
107 )
106 )
108 else:
107 else:
109 result = yield self._register(rr)
108 result = yield self._register(rr)
110 returnValue(result)
109 returnValue(result)
111 else:
110 else:
112 raise EngineConnectorError(
111 raise EngineConnectorError(
113 'Could not connect to controller, max_tries (%r) exceeded. '
112 'Could not connect to controller, max_tries (%r) exceeded. '
114 'This usually means that i) the controller was not started, '
113 'This usually means that i) the controller was not started, '
115 'or ii) a firewall was blocking the engine from connecting '
114 'or ii) a firewall was blocking the engine from connecting '
116 'to the controller.' % max_tries
115 'to the controller.' % max_tries
117 )
116 )
118
117
119 def _register(self, rr):
118 def _register(self, rr):
120 self.remote_ref = rr
119 self.remote_ref = rr
121 # Now register myself with the controller
120 # Now register myself with the controller
122 desired_id = self.engine_service.id
121 desired_id = self.engine_service.id
123 d = self.remote_ref.callRemote('register_engine', self.engine_reference,
122 d = self.remote_ref.callRemote('register_engine', self.engine_reference,
124 desired_id, os.getpid(), pickle.dumps(self.engine_service.properties,2))
123 desired_id, os.getpid(), pickle.dumps(self.engine_service.properties,2))
125 return d.addCallback(self._reference_sent)
124 return d.addCallback(self._reference_sent)
126
125
127 def _reference_sent(self, registration_dict):
126 def _reference_sent(self, registration_dict):
128 self.engine_service.id = registration_dict['id']
127 self.engine_service.id = registration_dict['id']
129 log.msg("engine registration succeeded, got id: %r" % self.engine_service.id)
128 log.msg("engine registration succeeded, got id: %r" % self.engine_service.id)
130 return self.engine_service.id
129 return self.engine_service.id
131
130
@@ -1,283 +1,306 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import logging
18 import logging
19 import os
19 import os
20 import signal
20 import signal
21 import sys
21 import sys
22
22
23 from IPython.core import release
23 from IPython.core import release
24 from IPython.external import argparse
24 from IPython.external import argparse
25 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
25 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
26 from IPython.utils.importstring import import_item
26 from IPython.utils.importstring import import_item
27
27
28 from IPython.kernel.clusterdir import (
28 from IPython.kernel.clusterdir import (
29 ApplicationWithClusterDir, ClusterDirError
29 ApplicationWithClusterDir, ClusterDirError
30 )
30 )
31
31
32 from twisted.internet import reactor, defer
32 from twisted.internet import reactor, defer
33 from twisted.python import log
33 from twisted.python import log
34
34
35 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
36 # Code for launchers
36 # Code for launchers
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38
38
39
39
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # The ipcluster application
42 # The ipcluster application
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45
45
46 class IPClusterCLLoader(ArgParseConfigLoader):
46 class IPClusterCLLoader(ArgParseConfigLoader):
47
47
48 def _add_arguments(self):
48 def _add_arguments(self):
49 # This has all the common options that all subcommands use
49 # This has all the common options that all subcommands use
50 parent_parser1 = argparse.ArgumentParser(add_help=False)
50 parent_parser1 = argparse.ArgumentParser(add_help=False)
51 parent_parser1.add_argument('-ipythondir', '--ipython-dir',
51 parent_parser1.add_argument('-ipythondir', '--ipython-dir',
52 dest='Global.ipythondir',type=str,
52 dest='Global.ipythondir',type=str,
53 help='Set to override default location of Global.ipythondir.',
53 help='Set to override default location of Global.ipythondir.',
54 default=NoConfigDefault,
54 default=NoConfigDefault,
55 metavar='Global.ipythondir')
55 metavar='Global.ipythondir')
56 parent_parser1.add_argument('-log_level', '--log-level',
56 parent_parser1.add_argument('-log_level', '--log-level',
57 dest="Global.log_level",type=int,
57 dest="Global.log_level",type=int,
58 help='Set the log level (0,10,20,30,40,50). Default is 30.',
58 help='Set the log level (0,10,20,30,40,50). Default is 30.',
59 default=NoConfigDefault,
59 default=NoConfigDefault,
60 metavar='Global.log_level')
60 metavar='Global.log_level')
61
61
62 # This has all the common options that other subcommands use
62 # This has all the common options that other subcommands use
63 parent_parser2 = argparse.ArgumentParser(add_help=False)
63 parent_parser2 = argparse.ArgumentParser(add_help=False)
64 parent_parser2.add_argument('-p','-profile', '--profile',
64 parent_parser2.add_argument('-p','-profile', '--profile',
65 dest='Global.profile',type=str,
65 dest='Global.profile',type=str,
66 default=NoConfigDefault,
66 default=NoConfigDefault,
67 help='The string name of the profile to be used. This determines '
67 help='The string name of the profile to be used. This determines '
68 'the name of the cluster dir as: cluster_<profile>. The default profile '
68 'the name of the cluster dir as: cluster_<profile>. The default profile '
69 'is named "default". The cluster directory is resolve this way '
69 'is named "default". The cluster directory is resolve this way '
70 'if the --cluster-dir option is not used.',
70 'if the --cluster-dir option is not used.',
71 default=NoConfigDefault,
71 default=NoConfigDefault,
72 metavar='Global.profile')
72 metavar='Global.profile')
73 parent_parser2.add_argument('-cluster_dir', '--cluster-dir',
73 parent_parser2.add_argument('-cluster_dir', '--cluster-dir',
74 dest='Global.cluster_dir',type=str,
74 dest='Global.cluster_dir',type=str,
75 default=NoConfigDefault,
75 default=NoConfigDefault,
76 help='Set the cluster dir. This overrides the logic used by the '
76 help='Set the cluster dir. This overrides the logic used by the '
77 '--profile option.',
77 '--profile option.',
78 default=NoConfigDefault,
78 default=NoConfigDefault,
79 metavar='Global.cluster_dir')
79 metavar='Global.cluster_dir')
80 parent_parser2.add_argument('--log-to-file',
80 parent_parser2.add_argument('--log-to-file',
81 action='store_true', dest='Global.log_to_file',
81 action='store_true', dest='Global.log_to_file',
82 default=NoConfigDefault,
82 default=NoConfigDefault,
83 help='Log to a file in the log directory (default is stdout)'
83 help='Log to a file in the log directory (default is stdout)'
84 )
84 )
85
85
86 subparsers = self.parser.add_subparsers(
86 subparsers = self.parser.add_subparsers(
87 dest='Global.subcommand',
87 dest='Global.subcommand',
88 title='ipcluster subcommands',
88 title='ipcluster subcommands',
89 description='ipcluster has a variety of subcommands. '
89 description='ipcluster has a variety of subcommands. '
90 'The general way of running ipcluster is "ipcluster <cmd> '
90 'The general way of running ipcluster is "ipcluster <cmd> '
91 ' [options]""',
91 ' [options]""',
92 help='For more help, type "ipcluster <cmd> -h"')
92 help='For more help, type "ipcluster <cmd> -h"')
93
93
94 parser_list = subparsers.add_parser(
94 parser_list = subparsers.add_parser(
95 'list',
95 'list',
96 help='List all clusters in cwd and ipythondir.',
96 help='List all clusters in cwd and ipythondir.',
97 parents=[parent_parser1]
97 parents=[parent_parser1]
98 )
98 )
99
99
100 parser_create = subparsers.add_parser(
100 parser_create = subparsers.add_parser(
101 'create',
101 'create',
102 help='Create a new cluster directory.',
102 help='Create a new cluster directory.',
103 parents=[parent_parser1, parent_parser2]
103 parents=[parent_parser1, parent_parser2]
104 )
104 )
105 parser_create.add_argument(
105 parser_create.add_argument(
106 '--reset-config',
106 '--reset-config',
107 dest='Global.reset_config', action='store_true',
107 dest='Global.reset_config', action='store_true',
108 default=NoConfigDefault,
108 default=NoConfigDefault,
109 help='Recopy the default config files to the cluster directory. '
109 help='Recopy the default config files to the cluster directory. '
110 'You will loose any modifications you have made to these files.'
110 'You will loose any modifications you have made to these files.'
111 )
111 )
112
112
113 parser_start = subparsers.add_parser(
113 parser_start = subparsers.add_parser(
114 'start',
114 'start',
115 help='Start a cluster.',
115 help='Start a cluster.',
116 parents=[parent_parser1, parent_parser2]
116 parents=[parent_parser1, parent_parser2]
117 )
117 )
118 parser_start.add_argument(
118 parser_start.add_argument(
119 '-n', '--number',
119 '-n', '--number',
120 type=int, dest='Global.n',
120 type=int, dest='Global.n',
121 default=NoConfigDefault,
121 default=NoConfigDefault,
122 help='The number of engines to start.',
122 help='The number of engines to start.',
123 metavar='Global.n'
123 metavar='Global.n'
124 )
124 )
125
125 parser_start.add_argument('-clean_logs', '--clean-logs',
126 dest='Global.clean_logs', action='store_true',
127 help='Delete old log flies before starting.',
128 default=NoConfigDefault
129 )
130 parser_start.add_argument('-noclean_logs', '--no-clean-logs',
131 dest='Global.clean_logs', action='store_false',
132 help="Don't delete old log flies before starting.",
133 default=NoConfigDefault
134 )
126
135
127 default_config_file_name = 'ipcluster_config.py'
136 default_config_file_name = 'ipcluster_config.py'
128
137
129
138
130 class IPClusterApp(ApplicationWithClusterDir):
139 class IPClusterApp(ApplicationWithClusterDir):
131
140
132 name = 'ipcluster'
141 name = 'ipcluster'
133 description = 'Start an IPython cluster (controller and engines).'
142 description = 'Start an IPython cluster (controller and engines).'
134 config_file_name = default_config_file_name
143 config_file_name = default_config_file_name
135 default_log_level = logging.INFO
144 default_log_level = logging.INFO
136 auto_create_cluster_dir = False
145 auto_create_cluster_dir = False
137
146
138 def create_default_config(self):
147 def create_default_config(self):
139 super(IPClusterApp, self).create_default_config()
148 super(IPClusterApp, self).create_default_config()
140 self.default_config.Global.controller_launcher = \
149 self.default_config.Global.controller_launcher = \
141 'IPython.kernel.launcher.LocalControllerLauncher'
150 'IPython.kernel.launcher.LocalControllerLauncher'
142 self.default_config.Global.engine_launcher = \
151 self.default_config.Global.engine_launcher = \
143 'IPython.kernel.launcher.LocalEngineSetLauncher'
152 'IPython.kernel.launcher.LocalEngineSetLauncher'
144 self.default_config.Global.log_to_file = False
145 self.default_config.Global.n = 2
153 self.default_config.Global.n = 2
146 self.default_config.Global.reset_config = False
154 self.default_config.Global.reset_config = False
155 self.default_config.Global.clean_logs = True
147
156
148 def create_command_line_config(self):
157 def create_command_line_config(self):
149 """Create and return a command line config loader."""
158 """Create and return a command line config loader."""
150 return IPClusterCLLoader(
159 return IPClusterCLLoader(
151 description=self.description,
160 description=self.description,
152 version=release.version
161 version=release.version
153 )
162 )
154
163
155 def find_resources(self):
164 def find_resources(self):
156 subcommand = self.command_line_config.Global.subcommand
165 subcommand = self.command_line_config.Global.subcommand
157 if subcommand=='list':
166 if subcommand=='list':
158 self.list_cluster_dirs()
167 self.list_cluster_dirs()
159 # Exit immediately because there is nothing left to do.
168 # Exit immediately because there is nothing left to do.
160 self.exit()
169 self.exit()
161 elif subcommand=='create':
170 elif subcommand=='create':
162 self.auto_create_cluster_dir = True
171 self.auto_create_cluster_dir = True
163 super(IPClusterApp, self).find_resources()
172 super(IPClusterApp, self).find_resources()
164 elif subcommand=='start':
173 elif subcommand=='start':
165 self.auto_create_cluster_dir = False
174 self.auto_create_cluster_dir = False
166 try:
175 try:
167 super(IPClusterApp, self).find_resources()
176 super(IPClusterApp, self).find_resources()
168 except ClusterDirError:
177 except ClusterDirError:
169 raise ClusterDirError(
178 raise ClusterDirError(
170 "Could not find a cluster directory. A cluster dir must "
179 "Could not find a cluster directory. A cluster dir must "
171 "be created before running 'ipcluster start'. Do "
180 "be created before running 'ipcluster start'. Do "
172 "'ipcluster create -h' or 'ipcluster list -h' for more "
181 "'ipcluster create -h' or 'ipcluster list -h' for more "
173 "information about creating and listing cluster dirs."
182 "information about creating and listing cluster dirs."
174 )
183 )
184
175 def construct(self):
185 def construct(self):
176 config = self.master_config
186 config = self.master_config
177 if config.Global.subcommand=='list':
187 if config.Global.subcommand=='list':
178 pass
188 pass
179 elif config.Global.subcommand=='create':
189 elif config.Global.subcommand=='create':
180 self.log.info('Copying default config files to cluster directory '
190 self.log.info('Copying default config files to cluster directory '
181 '[overwrite=%r]' % (config.Global.reset_config,))
191 '[overwrite=%r]' % (config.Global.reset_config,))
182 self.cluster_dir_obj.copy_all_config_files(overwrite=config.Global.reset_config)
192 self.cluster_dir_obj.copy_all_config_files(overwrite=config.Global.reset_config)
183 elif config.Global.subcommand=='start':
193 elif config.Global.subcommand=='start':
184 self.start_logging()
194 self.start_logging()
185 reactor.callWhenRunning(self.start_launchers)
195 reactor.callWhenRunning(self.start_launchers)
186
196
187 def list_cluster_dirs(self):
197 def list_cluster_dirs(self):
198 # Find the search paths
188 cluster_dir_paths = os.environ.get('IPCLUSTERDIR_PATH','')
199 cluster_dir_paths = os.environ.get('IPCLUSTERDIR_PATH','')
189 if cluster_dir_paths:
200 if cluster_dir_paths:
190 cluster_dir_paths = cluster_dir_paths.split(':')
201 cluster_dir_paths = cluster_dir_paths.split(':')
191 else:
202 else:
192 cluster_dir_paths = []
203 cluster_dir_paths = []
193 # We need to look both in default_config and command_line_config!!!
204 try:
194 paths = [os.getcwd(), self.default_config.Global.ipythondir] + \
205 ipythondir = self.command_line_config.Global.ipythondir
206 except AttributeError:
207 ipythondir = self.default_config.Global.ipythondir
208 paths = [os.getcwd(), ipythondir] + \
195 cluster_dir_paths
209 cluster_dir_paths
210 paths = list(set(paths))
211
196 self.log.info('Searching for cluster dirs in paths: %r' % paths)
212 self.log.info('Searching for cluster dirs in paths: %r' % paths)
197 for path in paths:
213 for path in paths:
198 files = os.listdir(path)
214 files = os.listdir(path)
199 for f in files:
215 for f in files:
200 full_path = os.path.join(path, f)
216 full_path = os.path.join(path, f)
201 if os.path.isdir(full_path) and f.startswith('cluster_'):
217 if os.path.isdir(full_path) and f.startswith('cluster_'):
202 profile = full_path.split('_')[-1]
218 profile = full_path.split('_')[-1]
203 start_cmd = '"ipcluster start -n 4 -p %s"' % profile
219 start_cmd = '"ipcluster start -n 4 -p %s"' % profile
204 print start_cmd + " ==> " + full_path
220 print start_cmd + " ==> " + full_path
205
221
206 def start_logging(self):
207 if self.master_config.Global.log_to_file:
208 log_filename = self.name + '-' + str(os.getpid()) + '.log'
209 logfile = os.path.join(self.log_dir, log_filename)
210 open_log_file = open(logfile, 'w')
211 else:
212 open_log_file = sys.stdout
213 log.startLogging(open_log_file)
214
215 def start_launchers(self):
222 def start_launchers(self):
216 config = self.master_config
223 config = self.master_config
217
224
218 # Create the launchers
225 # Create the launchers
219 el_class = import_item(config.Global.engine_launcher)
226 el_class = import_item(config.Global.engine_launcher)
220 self.engine_launcher = el_class(
227 self.engine_launcher = el_class(
221 self.cluster_dir, config=config
228 self.cluster_dir, config=config
222 )
229 )
223 cl_class = import_item(config.Global.controller_launcher)
230 cl_class = import_item(config.Global.controller_launcher)
224 self.controller_launcher = cl_class(
231 self.controller_launcher = cl_class(
225 self.cluster_dir, config=config
232 self.cluster_dir, config=config
226 )
233 )
227
234
228 # Setup signals
235 # Setup signals
229 signal.signal(signal.SIGINT, self.stop_launchers)
236 signal.signal(signal.SIGINT, self.stop_launchers)
237 # signal.signal(signal.SIGKILL, self.stop_launchers)
230
238
231 # Setup the observing of stopping
239 # Setup the observing of stopping
232 d1 = self.controller_launcher.observe_stop()
240 d1 = self.controller_launcher.observe_stop()
233 d1.addCallback(self.stop_engines)
241 d1.addCallback(self.stop_engines)
234 d1.addErrback(self.err_and_stop)
242 d1.addErrback(self.err_and_stop)
235 # If this triggers, just let them die
243 # If this triggers, just let them die
236 # d2 = self.engine_launcher.observe_stop()
244 # d2 = self.engine_launcher.observe_stop()
237
245
238 # Start the controller and engines
246 # Start the controller and engines
239 d = self.controller_launcher.start(
247 d = self.controller_launcher.start(
240 profile=None, cluster_dir=config.Global.cluster_dir
248 profile=None, cluster_dir=config.Global.cluster_dir
241 )
249 )
242 d.addCallback(lambda _: self.start_engines())
250 d.addCallback(lambda _: self.start_engines())
243 d.addErrback(self.err_and_stop)
251 d.addErrback(self.err_and_stop)
244
252
245 def err_and_stop(self, f):
253 def err_and_stop(self, f):
246 log.msg('Unexpected error in ipcluster:')
254 log.msg('Unexpected error in ipcluster:')
247 log.err(f)
255 log.err(f)
248 reactor.stop()
256 reactor.stop()
249
257
250 def stop_engines(self, r):
258 def stop_engines(self, r):
251 return self.engine_launcher.stop()
259 return self.engine_launcher.stop()
252
260
253 def start_engines(self):
261 def start_engines(self):
254 config = self.master_config
262 config = self.master_config
255 d = self.engine_launcher.start(
263 d = self.engine_launcher.start(
256 config.Global.n,
264 config.Global.n,
257 profile=None, cluster_dir=config.Global.cluster_dir
265 profile=None, cluster_dir=config.Global.cluster_dir
258 )
266 )
259 return d
267 return d
260
268
261 def stop_launchers(self, signum, frame):
269 def stop_launchers(self, signum, frame):
262 log.msg("Stopping cluster")
270 log.msg("Stopping cluster")
263 d1 = self.engine_launcher.stop()
271 d1 = self.engine_launcher.stop()
264 d1.addCallback(lambda _: self.controller_launcher.stop)
272 d2 = self.controller_launcher.stop()
273 # d1.addCallback(lambda _: self.controller_launcher.stop)
265 d1.addErrback(self.err_and_stop)
274 d1.addErrback(self.err_and_stop)
275 d2.addErrback(self.err_and_stop)
266 reactor.callLater(2.0, reactor.stop)
276 reactor.callLater(2.0, reactor.stop)
267
277
278 def start_logging(self):
279 # Remove old log files
280 if self.master_config.Global.clean_logs:
281 log_dir = self.master_config.Global.log_dir
282 for f in os.listdir(log_dir):
283 if f.startswith('ipengine' + '-') and f.endswith('.log'):
284 os.remove(os.path.join(log_dir, f))
285 for f in os.listdir(log_dir):
286 if f.startswith('ipcontroller' + '-') and f.endswith('.log'):
287 os.remove(os.path.join(log_dir, f))
288 super(IPClusterApp, self).start_logging()
289
268 def start_app(self):
290 def start_app(self):
269 config = self.master_config
291 config = self.master_config
270 if config.Global.subcommand=='create' or config.Global.subcommand=='list':
292 if config.Global.subcommand=='create' or config.Global.subcommand=='list':
271 return
293 return
272 elif config.Global.subcommand=='start':
294 elif config.Global.subcommand=='start':
273 reactor.run()
295 reactor.run()
274
296
275
297
276 def launch_new_instance():
298 def launch_new_instance():
277 """Create and run the IPython cluster."""
299 """Create and run the IPython cluster."""
278 app = IPClusterApp()
300 app = IPClusterApp()
279 app.start()
301 app.start()
280
302
281
303
282 if __name__ == '__main__':
304 if __name__ == '__main__':
283 launch_new_instance() No newline at end of file
305 launch_new_instance()
306
@@ -1,275 +1,254 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import copy
18 import copy
19 import os
19 import os
20 import sys
20 import sys
21
21
22 from twisted.application import service
22 from twisted.application import service
23 from twisted.internet import reactor
23 from twisted.internet import reactor
24 from twisted.python import log
24 from twisted.python import log
25
25
26 from IPython.config.loader import Config, NoConfigDefault
26 from IPython.config.loader import Config, NoConfigDefault
27
27
28 from IPython.kernel.clusterdir import (
28 from IPython.kernel.clusterdir import (
29 ApplicationWithClusterDir,
29 ApplicationWithClusterDir,
30 AppWithClusterDirArgParseConfigLoader
30 AppWithClusterDirArgParseConfigLoader
31 )
31 )
32
32
33 from IPython.core import release
33 from IPython.core import release
34
34
35 from IPython.utils.traitlets import Str, Instance
35 from IPython.utils.traitlets import Str, Instance
36
36
37 from IPython.kernel import controllerservice
37 from IPython.kernel import controllerservice
38
38
39 from IPython.kernel.fcutil import FCServiceFactory
39 from IPython.kernel.fcutil import FCServiceFactory
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Default interfaces
42 # Default interfaces
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45
45
46 # The default client interfaces for FCClientServiceFactory.interfaces
46 # The default client interfaces for FCClientServiceFactory.interfaces
47 default_client_interfaces = Config()
47 default_client_interfaces = Config()
48 default_client_interfaces.Task.interface_chain = [
48 default_client_interfaces.Task.interface_chain = [
49 'IPython.kernel.task.ITaskController',
49 'IPython.kernel.task.ITaskController',
50 'IPython.kernel.taskfc.IFCTaskController'
50 'IPython.kernel.taskfc.IFCTaskController'
51 ]
51 ]
52
52
53 default_client_interfaces.Task.furl_file = 'ipcontroller-tc.furl'
53 default_client_interfaces.Task.furl_file = 'ipcontroller-tc.furl'
54
54
55 default_client_interfaces.MultiEngine.interface_chain = [
55 default_client_interfaces.MultiEngine.interface_chain = [
56 'IPython.kernel.multiengine.IMultiEngine',
56 'IPython.kernel.multiengine.IMultiEngine',
57 'IPython.kernel.multienginefc.IFCSynchronousMultiEngine'
57 'IPython.kernel.multienginefc.IFCSynchronousMultiEngine'
58 ]
58 ]
59
59
60 default_client_interfaces.MultiEngine.furl_file = 'ipcontroller-mec.furl'
60 default_client_interfaces.MultiEngine.furl_file = 'ipcontroller-mec.furl'
61
61
62 # Make this a dict we can pass to Config.__init__ for the default
62 # Make this a dict we can pass to Config.__init__ for the default
63 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
63 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
64
64
65
65
66
66
67 # The default engine interfaces for FCEngineServiceFactory.interfaces
67 # The default engine interfaces for FCEngineServiceFactory.interfaces
68 default_engine_interfaces = Config()
68 default_engine_interfaces = Config()
69 default_engine_interfaces.Default.interface_chain = [
69 default_engine_interfaces.Default.interface_chain = [
70 'IPython.kernel.enginefc.IFCControllerBase'
70 'IPython.kernel.enginefc.IFCControllerBase'
71 ]
71 ]
72
72
73 default_engine_interfaces.Default.furl_file = 'ipcontroller-engine.furl'
73 default_engine_interfaces.Default.furl_file = 'ipcontroller-engine.furl'
74
74
75 # Make this a dict we can pass to Config.__init__ for the default
75 # Make this a dict we can pass to Config.__init__ for the default
76 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
76 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
77
77
78
78
79 #-----------------------------------------------------------------------------
79 #-----------------------------------------------------------------------------
80 # Service factories
80 # Service factories
81 #-----------------------------------------------------------------------------
81 #-----------------------------------------------------------------------------
82
82
83
83
84 class FCClientServiceFactory(FCServiceFactory):
84 class FCClientServiceFactory(FCServiceFactory):
85 """A Foolscap implementation of the client services."""
85 """A Foolscap implementation of the client services."""
86
86
87 cert_file = Str('ipcontroller-client.pem', config=True)
87 cert_file = Str('ipcontroller-client.pem', config=True)
88 interfaces = Instance(klass=Config, kw=default_client_interfaces,
88 interfaces = Instance(klass=Config, kw=default_client_interfaces,
89 allow_none=False, config=True)
89 allow_none=False, config=True)
90
90
91
91
92 class FCEngineServiceFactory(FCServiceFactory):
92 class FCEngineServiceFactory(FCServiceFactory):
93 """A Foolscap implementation of the engine services."""
93 """A Foolscap implementation of the engine services."""
94
94
95 cert_file = Str('ipcontroller-engine.pem', config=True)
95 cert_file = Str('ipcontroller-engine.pem', config=True)
96 interfaces = Instance(klass=dict, kw=default_engine_interfaces,
96 interfaces = Instance(klass=dict, kw=default_engine_interfaces,
97 allow_none=False, config=True)
97 allow_none=False, config=True)
98
98
99
99
100 #-----------------------------------------------------------------------------
100 #-----------------------------------------------------------------------------
101 # The main application
101 # The main application
102 #-----------------------------------------------------------------------------
102 #-----------------------------------------------------------------------------
103
103
104
104
105 cl_args = (
105 cl_args = (
106 # Client config
106 # Client config
107 (('--client-ip',), dict(
107 (('--client-ip',), dict(
108 type=str, dest='FCClientServiceFactory.ip', default=NoConfigDefault,
108 type=str, dest='FCClientServiceFactory.ip', default=NoConfigDefault,
109 help='The IP address or hostname the controller will listen on for '
109 help='The IP address or hostname the controller will listen on for '
110 'client connections.',
110 'client connections.',
111 metavar='FCClientServiceFactory.ip')
111 metavar='FCClientServiceFactory.ip')
112 ),
112 ),
113 (('--client-port',), dict(
113 (('--client-port',), dict(
114 type=int, dest='FCClientServiceFactory.port', default=NoConfigDefault,
114 type=int, dest='FCClientServiceFactory.port', default=NoConfigDefault,
115 help='The port the controller will listen on for client connections. '
115 help='The port the controller will listen on for client connections. '
116 'The default is to use 0, which will autoselect an open port.',
116 'The default is to use 0, which will autoselect an open port.',
117 metavar='FCClientServiceFactory.port')
117 metavar='FCClientServiceFactory.port')
118 ),
118 ),
119 (('--client-location',), dict(
119 (('--client-location',), dict(
120 type=str, dest='FCClientServiceFactory.location', default=NoConfigDefault,
120 type=str, dest='FCClientServiceFactory.location', default=NoConfigDefault,
121 help='The hostname or IP that clients should connect to. This does '
121 help='The hostname or IP that clients should connect to. This does '
122 'not control which interface the controller listens on. Instead, this '
122 'not control which interface the controller listens on. Instead, this '
123 'determines the hostname/IP that is listed in the FURL, which is how '
123 'determines the hostname/IP that is listed in the FURL, which is how '
124 'clients know where to connect. Useful if the controller is listening '
124 'clients know where to connect. Useful if the controller is listening '
125 'on multiple interfaces.',
125 'on multiple interfaces.',
126 metavar='FCClientServiceFactory.location')
126 metavar='FCClientServiceFactory.location')
127 ),
127 ),
128 # Engine config
128 # Engine config
129 (('--engine-ip',), dict(
129 (('--engine-ip',), dict(
130 type=str, dest='FCEngineServiceFactory.ip', default=NoConfigDefault,
130 type=str, dest='FCEngineServiceFactory.ip', default=NoConfigDefault,
131 help='The IP address or hostname the controller will listen on for '
131 help='The IP address or hostname the controller will listen on for '
132 'engine connections.',
132 'engine connections.',
133 metavar='FCEngineServiceFactory.ip')
133 metavar='FCEngineServiceFactory.ip')
134 ),
134 ),
135 (('--engine-port',), dict(
135 (('--engine-port',), dict(
136 type=int, dest='FCEngineServiceFactory.port', default=NoConfigDefault,
136 type=int, dest='FCEngineServiceFactory.port', default=NoConfigDefault,
137 help='The port the controller will listen on for engine connections. '
137 help='The port the controller will listen on for engine connections. '
138 'The default is to use 0, which will autoselect an open port.',
138 'The default is to use 0, which will autoselect an open port.',
139 metavar='FCEngineServiceFactory.port')
139 metavar='FCEngineServiceFactory.port')
140 ),
140 ),
141 (('--engine-location',), dict(
141 (('--engine-location',), dict(
142 type=str, dest='FCEngineServiceFactory.location', default=NoConfigDefault,
142 type=str, dest='FCEngineServiceFactory.location', default=NoConfigDefault,
143 help='The hostname or IP that engines should connect to. This does '
143 help='The hostname or IP that engines should connect to. This does '
144 'not control which interface the controller listens on. Instead, this '
144 'not control which interface the controller listens on. Instead, this '
145 'determines the hostname/IP that is listed in the FURL, which is how '
145 'determines the hostname/IP that is listed in the FURL, which is how '
146 'engines know where to connect. Useful if the controller is listening '
146 'engines know where to connect. Useful if the controller is listening '
147 'on multiple interfaces.',
147 'on multiple interfaces.',
148 metavar='FCEngineServiceFactory.location')
148 metavar='FCEngineServiceFactory.location')
149 ),
149 ),
150 # Global config
150 # Global config
151 (('--log-to-file',), dict(
151 (('--log-to-file',), dict(
152 action='store_true', dest='Global.log_to_file', default=NoConfigDefault,
152 action='store_true', dest='Global.log_to_file', default=NoConfigDefault,
153 help='Log to a file in the log directory (default is stdout)')
153 help='Log to a file in the log directory (default is stdout)')
154 ),
154 ),
155 (('-r','--reuse-furls'), dict(
155 (('-r','--reuse-furls'), dict(
156 action='store_true', dest='Global.reuse_furls', default=NoConfigDefault,
156 action='store_true', dest='Global.reuse_furls', default=NoConfigDefault,
157 help='Try to reuse all FURL files. If this is not set all FURL files '
157 help='Try to reuse all FURL files. If this is not set all FURL files '
158 'are deleted before the controller starts. This must be set if '
158 'are deleted before the controller starts. This must be set if '
159 'specific ports are specified by --engine-port or --client-port.')
159 'specific ports are specified by --engine-port or --client-port.')
160 ),
160 ),
161 (('-ns','--no-security'), dict(
161 (('-ns','--no-security'), dict(
162 action='store_false', dest='Global.secure', default=NoConfigDefault,
162 action='store_false', dest='Global.secure', default=NoConfigDefault,
163 help='Turn off SSL encryption for all connections.')
163 help='Turn off SSL encryption for all connections.')
164 )
164 )
165 )
165 )
166
166
167
167
168 class IPControllerAppCLConfigLoader(AppWithClusterDirArgParseConfigLoader):
168 class IPControllerAppCLConfigLoader(AppWithClusterDirArgParseConfigLoader):
169
169
170 arguments = cl_args
170 arguments = cl_args
171
171
172
172
173 default_config_file_name = 'ipcontroller_config.py'
173 default_config_file_name = 'ipcontroller_config.py'
174
174
175
175
176 class IPControllerApp(ApplicationWithClusterDir):
176 class IPControllerApp(ApplicationWithClusterDir):
177
177
178 name = 'ipcontroller'
178 name = 'ipcontroller'
179 description = 'Start the IPython controller for parallel computing.'
179 description = 'Start the IPython controller for parallel computing.'
180 config_file_name = default_config_file_name
180 config_file_name = default_config_file_name
181 auto_create_cluster_dir = True
181 auto_create_cluster_dir = True
182
182
183 def create_default_config(self):
183 def create_default_config(self):
184 super(IPControllerApp, self).create_default_config()
184 super(IPControllerApp, self).create_default_config()
185 self.default_config.Global.reuse_furls = False
185 self.default_config.Global.reuse_furls = False
186 self.default_config.Global.secure = True
186 self.default_config.Global.secure = True
187 self.default_config.Global.import_statements = []
187 self.default_config.Global.import_statements = []
188 self.default_config.Global.log_to_file = False
188 self.default_config.Global.clean_logs = True
189
189
190 def create_command_line_config(self):
190 def create_command_line_config(self):
191 """Create and return a command line config loader."""
191 """Create and return a command line config loader."""
192 return IPControllerAppCLConfigLoader(
192 return IPControllerAppCLConfigLoader(
193 description=self.description,
193 description=self.description,
194 version=release.version
194 version=release.version
195 )
195 )
196
196
197 def post_load_command_line_config(self):
197 def post_load_command_line_config(self):
198 # Now setup reuse_furls
198 # Now setup reuse_furls
199 c = self.command_line_config
199 c = self.command_line_config
200 if hasattr(c.Global, 'reuse_furls'):
200 if hasattr(c.Global, 'reuse_furls'):
201 c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
201 c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
202 c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
202 c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
203 del c.Global.reuse_furls
203 del c.Global.reuse_furls
204 if hasattr(c.Global, 'secure'):
204 if hasattr(c.Global, 'secure'):
205 c.FCClientServiceFactory.secure = c.Global.secure
205 c.FCClientServiceFactory.secure = c.Global.secure
206 c.FCEngineServiceFactory.secure = c.Global.secure
206 c.FCEngineServiceFactory.secure = c.Global.secure
207 del c.Global.secure
207 del c.Global.secure
208
208
209 def pre_construct(self):
210 # The log and security dirs were set earlier, but here we put them
211 # into the config and log them.
212 config = self.master_config
213 sdir = self.cluster_dir_obj.security_dir
214 self.security_dir = config.Global.security_dir = sdir
215 ldir = self.cluster_dir_obj.log_dir
216 self.log_dir = config.Global.log_dir = ldir
217 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
218 self.log.info("Log directory set to: %s" % self.log_dir)
219 self.log.info("Security directory set to: %s" % self.security_dir)
220
221 def construct(self):
209 def construct(self):
222 # I am a little hesitant to put these into InteractiveShell itself.
210 # I am a little hesitant to put these into InteractiveShell itself.
223 # But that might be the place for them
211 # But that might be the place for them
224 sys.path.insert(0, '')
212 sys.path.insert(0, '')
225
213
226 self.start_logging()
214 self.start_logging()
227 self.import_statements()
215 self.import_statements()
228
216
229 # Create the service hierarchy
217 # Create the service hierarchy
230 self.main_service = service.MultiService()
218 self.main_service = service.MultiService()
231 # The controller service
219 # The controller service
232 controller_service = controllerservice.ControllerService()
220 controller_service = controllerservice.ControllerService()
233 controller_service.setServiceParent(self.main_service)
221 controller_service.setServiceParent(self.main_service)
234 # The client tub and all its refereceables
222 # The client tub and all its refereceables
235 csfactory = FCClientServiceFactory(self.master_config, controller_service)
223 csfactory = FCClientServiceFactory(self.master_config, controller_service)
236 client_service = csfactory.create()
224 client_service = csfactory.create()
237 client_service.setServiceParent(self.main_service)
225 client_service.setServiceParent(self.main_service)
238 # The engine tub
226 # The engine tub
239 esfactory = FCEngineServiceFactory(self.master_config, controller_service)
227 esfactory = FCEngineServiceFactory(self.master_config, controller_service)
240 engine_service = esfactory.create()
228 engine_service = esfactory.create()
241 engine_service.setServiceParent(self.main_service)
229 engine_service.setServiceParent(self.main_service)
242
230
243 def start_logging(self):
244 if self.master_config.Global.log_to_file:
245 log_filename = self.name + '-' + str(os.getpid()) + '.log'
246 logfile = os.path.join(self.log_dir, log_filename)
247 open_log_file = open(logfile, 'w')
248 else:
249 open_log_file = sys.stdout
250 log.startLogging(open_log_file)
251
252 def import_statements(self):
231 def import_statements(self):
253 statements = self.master_config.Global.import_statements
232 statements = self.master_config.Global.import_statements
254 for s in statements:
233 for s in statements:
255 try:
234 try:
256 log.msg("Executing statement: '%s'" % s)
235 log.msg("Executing statement: '%s'" % s)
257 exec s in globals(), locals()
236 exec s in globals(), locals()
258 except:
237 except:
259 log.msg("Error running statement: %s" % s)
238 log.msg("Error running statement: %s" % s)
260
239
261 def start_app(self):
240 def start_app(self):
262 # Start the controller service and set things running
241 # Start the controller service and set things running
263 self.main_service.startService()
242 self.main_service.startService()
264 reactor.run()
243 reactor.run()
265
244
266
245
267 def launch_new_instance():
246 def launch_new_instance():
268 """Create and run the IPython controller"""
247 """Create and run the IPython controller"""
269 app = IPControllerApp()
248 app = IPControllerApp()
270 app.start()
249 app.start()
271
250
272
251
273 if __name__ == '__main__':
252 if __name__ == '__main__':
274 launch_new_instance()
253 launch_new_instance()
275
254
@@ -1,249 +1,239 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application
4 The IPython controller application
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import sys
19 import sys
20
20
21 from twisted.application import service
21 from twisted.application import service
22 from twisted.internet import reactor
22 from twisted.internet import reactor
23 from twisted.python import log
23 from twisted.python import log
24
24
25 from IPython.config.loader import NoConfigDefault
25 from IPython.config.loader import NoConfigDefault
26
26
27 from IPython.kernel.clusterdir import (
27 from IPython.kernel.clusterdir import (
28 ApplicationWithClusterDir,
28 ApplicationWithClusterDir,
29 AppWithClusterDirArgParseConfigLoader
29 AppWithClusterDirArgParseConfigLoader
30 )
30 )
31 from IPython.core import release
31 from IPython.core import release
32
32
33 from IPython.utils.importstring import import_item
33 from IPython.utils.importstring import import_item
34
34
35 from IPython.kernel.engineservice import EngineService
35 from IPython.kernel.engineservice import EngineService
36 from IPython.kernel.fcutil import Tub
36 from IPython.kernel.fcutil import Tub
37 from IPython.kernel.engineconnector import EngineConnector
37 from IPython.kernel.engineconnector import EngineConnector
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # The main application
40 # The main application
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43
43
44 cl_args = (
44 cl_args = (
45 # Controller config
45 # Controller config
46 (('--furl-file',), dict(
46 (('--furl-file',), dict(
47 type=str, dest='Global.furl_file', default=NoConfigDefault,
47 type=str, dest='Global.furl_file', default=NoConfigDefault,
48 help='The full location of the file containing the FURL of the '
48 help='The full location of the file containing the FURL of the '
49 'controller. If this is not given, the FURL file must be in the '
49 'controller. If this is not given, the FURL file must be in the '
50 'security directory of the cluster directory. This location is '
50 'security directory of the cluster directory. This location is '
51 'resolved using the --profile and --app-dir options.',
51 'resolved using the --profile and --app-dir options.',
52 metavar='Global.furl_file')
52 metavar='Global.furl_file')
53 ),
53 ),
54 # MPI
54 # MPI
55 (('--mpi',), dict(
55 (('--mpi',), dict(
56 type=str, dest='MPI.use', default=NoConfigDefault,
56 type=str, dest='MPI.use', default=NoConfigDefault,
57 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
57 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
58 metavar='MPI.use')
58 metavar='MPI.use')
59 ),
59 ),
60 # Global config
60 # Global config
61 (('--log-to-file',), dict(
61 (('--log-to-file',), dict(
62 action='store_true', dest='Global.log_to_file', default=NoConfigDefault,
62 action='store_true', dest='Global.log_to_file', default=NoConfigDefault,
63 help='Log to a file in the log directory (default is stdout)')
63 help='Log to a file in the log directory (default is stdout)')
64 )
64 )
65 )
65 )
66
66
67
67
68 class IPEngineAppCLConfigLoader(AppWithClusterDirArgParseConfigLoader):
68 class IPEngineAppCLConfigLoader(AppWithClusterDirArgParseConfigLoader):
69
69
70 arguments = cl_args
70 arguments = cl_args
71
71
72
72
73 mpi4py_init = """from mpi4py import MPI as mpi
73 mpi4py_init = """from mpi4py import MPI as mpi
74 mpi.size = mpi.COMM_WORLD.Get_size()
74 mpi.size = mpi.COMM_WORLD.Get_size()
75 mpi.rank = mpi.COMM_WORLD.Get_rank()
75 mpi.rank = mpi.COMM_WORLD.Get_rank()
76 """
76 """
77
77
78 pytrilinos_init = """from PyTrilinos import Epetra
78 pytrilinos_init = """from PyTrilinos import Epetra
79 class SimpleStruct:
79 class SimpleStruct:
80 pass
80 pass
81 mpi = SimpleStruct()
81 mpi = SimpleStruct()
82 mpi.rank = 0
82 mpi.rank = 0
83 mpi.size = 0
83 mpi.size = 0
84 """
84 """
85
85
86
86
87 default_config_file_name = 'ipengine_config.py'
87 default_config_file_name = 'ipengine_config.py'
88
88
89
89
90 class IPEngineApp(ApplicationWithClusterDir):
90 class IPEngineApp(ApplicationWithClusterDir):
91
91
92 name = 'ipengine'
92 name = 'ipengine'
93 description = 'Start the IPython engine for parallel computing.'
93 description = 'Start the IPython engine for parallel computing.'
94 config_file_name = default_config_file_name
94 config_file_name = default_config_file_name
95 auto_create_cluster_dir = True
95 auto_create_cluster_dir = True
96
96
97 def create_default_config(self):
97 def create_default_config(self):
98 super(IPEngineApp, self).create_default_config()
98 super(IPEngineApp, self).create_default_config()
99
99
100 # The engine should not clean logs as we don't want to remove the
101 # active log files of other running engines.
102 self.default_config.Global.clean_logs = False
103
100 # Global config attributes
104 # Global config attributes
101 self.default_config.Global.log_to_file = False
102 self.default_config.Global.exec_lines = []
105 self.default_config.Global.exec_lines = []
103 # The log and security dir names must match that of the controller
104 self.default_config.Global.log_dir_name = 'log'
105 self.default_config.Global.security_dir_name = 'security'
106 self.default_config.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
106 self.default_config.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
107
107
108 # Configuration related to the controller
108 # Configuration related to the controller
109 # This must match the filename (path not included) that the controller
109 # This must match the filename (path not included) that the controller
110 # used for the FURL file.
110 # used for the FURL file.
111 self.default_config.Global.furl_file_name = 'ipcontroller-engine.furl'
111 self.default_config.Global.furl_file_name = 'ipcontroller-engine.furl'
112 # If given, this is the actual location of the controller's FURL file.
112 # If given, this is the actual location of the controller's FURL file.
113 # If not, this is computed using the profile, app_dir and furl_file_name
113 # If not, this is computed using the profile, app_dir and furl_file_name
114 self.default_config.Global.furl_file = ''
114 self.default_config.Global.furl_file = ''
115
115
116 # The max number of connection attemps and the initial delay between
117 # those attemps.
118 self.default_config.Global.connect_delay = 0.1
119 self.default_config.Global.connect_max_tries = 15
120
116 # MPI related config attributes
121 # MPI related config attributes
117 self.default_config.MPI.use = ''
122 self.default_config.MPI.use = ''
118 self.default_config.MPI.mpi4py = mpi4py_init
123 self.default_config.MPI.mpi4py = mpi4py_init
119 self.default_config.MPI.pytrilinos = pytrilinos_init
124 self.default_config.MPI.pytrilinos = pytrilinos_init
120
125
121 def create_command_line_config(self):
126 def create_command_line_config(self):
122 """Create and return a command line config loader."""
127 """Create and return a command line config loader."""
123 return IPEngineAppCLConfigLoader(
128 return IPEngineAppCLConfigLoader(
124 description=self.description,
129 description=self.description,
125 version=release.version
130 version=release.version
126 )
131 )
127
132
128 def post_load_command_line_config(self):
133 def post_load_command_line_config(self):
129 pass
134 pass
130
135
131 def pre_construct(self):
136 def pre_construct(self):
132 config = self.master_config
137 super(IPEngineApp, self).pre_construct()
133 sdir = self.cluster_dir_obj.security_dir
134 self.security_dir = config.Global.security_dir = sdir
135 ldir = self.cluster_dir_obj.log_dir
136 self.log_dir = config.Global.log_dir = ldir
137 self.log.info("Cluster directory set to: %s" % self.cluster_dir)
138 self.log.info("Log directory set to: %s" % self.log_dir)
139 self.log.info("Security directory set to: %s" % self.security_dir)
140
141 self.find_cont_furl_file()
138 self.find_cont_furl_file()
142
139
143 def find_cont_furl_file(self):
140 def find_cont_furl_file(self):
144 """Set the furl file.
141 """Set the furl file.
145
142
146 Here we don't try to actually see if it exists for is valid as that
143 Here we don't try to actually see if it exists for is valid as that
147 is hadled by the connection logic.
144 is hadled by the connection logic.
148 """
145 """
149 config = self.master_config
146 config = self.master_config
150 # Find the actual controller FURL file
147 # Find the actual controller FURL file
151 if not config.Global.furl_file:
148 if not config.Global.furl_file:
152 try_this = os.path.join(
149 try_this = os.path.join(
153 config.Global.cluster_dir,
150 config.Global.cluster_dir,
154 config.Global.security_dir,
151 config.Global.security_dir,
155 config.Global.furl_file_name
152 config.Global.furl_file_name
156 )
153 )
157 config.Global.furl_file = try_this
154 config.Global.furl_file = try_this
158
155
159 def construct(self):
156 def construct(self):
160 # I am a little hesitant to put these into InteractiveShell itself.
157 # I am a little hesitant to put these into InteractiveShell itself.
161 # But that might be the place for them
158 # But that might be the place for them
162 sys.path.insert(0, '')
159 sys.path.insert(0, '')
163
160
164 self.start_mpi()
161 self.start_mpi()
165 self.start_logging()
162 self.start_logging()
166
163
167 # Create the underlying shell class and EngineService
164 # Create the underlying shell class and EngineService
168 shell_class = import_item(self.master_config.Global.shell_class)
165 shell_class = import_item(self.master_config.Global.shell_class)
169 self.engine_service = EngineService(shell_class, mpi=mpi)
166 self.engine_service = EngineService(shell_class, mpi=mpi)
170
167
171 self.exec_lines()
168 self.exec_lines()
172
169
173 # Create the service hierarchy
170 # Create the service hierarchy
174 self.main_service = service.MultiService()
171 self.main_service = service.MultiService()
175 self.engine_service.setServiceParent(self.main_service)
172 self.engine_service.setServiceParent(self.main_service)
176 self.tub_service = Tub()
173 self.tub_service = Tub()
177 self.tub_service.setServiceParent(self.main_service)
174 self.tub_service.setServiceParent(self.main_service)
178 # This needs to be called before the connection is initiated
175 # This needs to be called before the connection is initiated
179 self.main_service.startService()
176 self.main_service.startService()
180
177
181 # This initiates the connection to the controller and calls
178 # This initiates the connection to the controller and calls
182 # register_engine to tell the controller we are ready to do work
179 # register_engine to tell the controller we are ready to do work
183 self.engine_connector = EngineConnector(self.tub_service)
180 self.engine_connector = EngineConnector(self.tub_service)
184
181
185 log.msg("Using furl file: %s" % self.master_config.Global.furl_file)
182 log.msg("Using furl file: %s" % self.master_config.Global.furl_file)
186
183
187 reactor.callWhenRunning(self.call_connect)
184 reactor.callWhenRunning(self.call_connect)
188
185
189 def call_connect(self):
186 def call_connect(self):
190 d = self.engine_connector.connect_to_controller(
187 d = self.engine_connector.connect_to_controller(
191 self.engine_service,
188 self.engine_service,
192 self.master_config.Global.furl_file
189 self.master_config.Global.furl_file,
190 self.master_config.Global.connect_delay,
191 self.master_config.Global.connect_max_tries
193 )
192 )
194
193
195 def handle_error(f):
194 def handle_error(f):
196 log.msg('Error connecting to controller. This usually means that '
195 log.msg('Error connecting to controller. This usually means that '
197 'i) the controller was not started, ii) a firewall was blocking '
196 'i) the controller was not started, ii) a firewall was blocking '
198 'the engine from connecting to the controller or iii) the engine '
197 'the engine from connecting to the controller or iii) the engine '
199 ' was not pointed at the right FURL file:')
198 ' was not pointed at the right FURL file:')
200 log.msg(f.getErrorMessage())
199 log.msg(f.getErrorMessage())
201 reactor.callLater(0.1, reactor.stop)
200 reactor.callLater(0.1, reactor.stop)
202
201
203 d.addErrback(handle_error)
202 d.addErrback(handle_error)
204
203
205 def start_mpi(self):
204 def start_mpi(self):
206 global mpi
205 global mpi
207 mpikey = self.master_config.MPI.use
206 mpikey = self.master_config.MPI.use
208 mpi_import_statement = self.master_config.MPI.get(mpikey, None)
207 mpi_import_statement = self.master_config.MPI.get(mpikey, None)
209 if mpi_import_statement is not None:
208 if mpi_import_statement is not None:
210 try:
209 try:
211 self.log.info("Initializing MPI:")
210 self.log.info("Initializing MPI:")
212 self.log.info(mpi_import_statement)
211 self.log.info(mpi_import_statement)
213 exec mpi_import_statement in globals()
212 exec mpi_import_statement in globals()
214 except:
213 except:
215 mpi = None
214 mpi = None
216 else:
215 else:
217 mpi = None
216 mpi = None
218
217
219 def start_logging(self):
220 if self.master_config.Global.log_to_file:
221 log_filename = self.name + '-' + str(os.getpid()) + '.log'
222 logfile = os.path.join(self.log_dir, log_filename)
223 open_log_file = open(logfile, 'w')
224 else:
225 open_log_file = sys.stdout
226 log.startLogging(open_log_file)
227
228 def exec_lines(self):
218 def exec_lines(self):
229 for line in self.master_config.Global.exec_lines:
219 for line in self.master_config.Global.exec_lines:
230 try:
220 try:
231 log.msg("Executing statement: '%s'" % line)
221 log.msg("Executing statement: '%s'" % line)
232 self.engine_service.execute(line)
222 self.engine_service.execute(line)
233 except:
223 except:
234 log.msg("Error executing statement: %s" % line)
224 log.msg("Error executing statement: %s" % line)
235
225
236 def start_app(self):
226 def start_app(self):
237 # Start the controller service and set things running
227 # Start the controller service and set things running
238 reactor.run()
228 reactor.run()
239
229
240
230
241 def launch_new_instance():
231 def launch_new_instance():
242 """Create and run the IPython controller"""
232 """Create and run the IPython controller"""
243 app = IPEngineApp()
233 app = IPEngineApp()
244 app.start()
234 app.start()
245
235
246
236
247 if __name__ == '__main__':
237 if __name__ == '__main__':
248 launch_new_instance()
238 launch_new_instance()
249
239
@@ -1,585 +1,628 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching processing asynchronously.
4 Facilities for launching processing asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import re
19 import re
20 import sys
20 import sys
21
21
22 from IPython.core.component import Component
22 from IPython.core.component import Component
23 from IPython.external import Itpl
23 from IPython.external import Itpl
24 from IPython.utils.traitlets import Str, Int, List, Unicode
24 from IPython.utils.traitlets import Str, Int, List, Unicode
25 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
25 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
26
26
27 from twisted.internet import reactor, defer
27 from twisted.internet import reactor, defer
28 from twisted.internet.defer import inlineCallbacks
28 from twisted.internet.defer import inlineCallbacks
29 from twisted.internet.protocol import ProcessProtocol
29 from twisted.internet.protocol import ProcessProtocol
30 from twisted.internet.utils import getProcessOutput
30 from twisted.internet.utils import getProcessOutput
31 from twisted.internet.error import ProcessDone, ProcessTerminated
31 from twisted.internet.error import ProcessDone, ProcessTerminated
32 from twisted.python import log
32 from twisted.python import log
33 from twisted.python.failure import Failure
33 from twisted.python.failure import Failure
34
34
35 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
36 # Generic launchers
36 # Generic launchers
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38
38
39
39
40 class LauncherError(Exception):
40 class LauncherError(Exception):
41 pass
41 pass
42
42
43
43
44 class ProcessStateError(LauncherError):
44 class ProcessStateError(LauncherError):
45 pass
45 pass
46
46
47
47
48 class UnknownStatus(LauncherError):
48 class UnknownStatus(LauncherError):
49 pass
49 pass
50
50
51
51
52 class BaseLauncher(Component):
52 class BaseLauncher(Component):
53 """An asbtraction for starting, stopping and signaling a process."""
53 """An asbtraction for starting, stopping and signaling a process."""
54
54
55 working_dir = Unicode(u'')
55 working_dir = Unicode(u'')
56
56
57 def __init__(self, working_dir, parent=None, name=None, config=None):
57 def __init__(self, working_dir, parent=None, name=None, config=None):
58 super(BaseLauncher, self).__init__(parent, name, config)
58 super(BaseLauncher, self).__init__(parent, name, config)
59 self.working_dir = working_dir
59 self.working_dir = working_dir
60 self.state = 'before' # can be before, running, after
60 self.state = 'before' # can be before, running, after
61 self.stop_deferreds = []
61 self.stop_deferreds = []
62 self.start_data = None
62 self.start_data = None
63 self.stop_data = None
63 self.stop_data = None
64
64
65 @property
65 @property
66 def args(self):
66 def args(self):
67 """A list of cmd and args that will be used to start the process."""
67 """A list of cmd and args that will be used to start the process."""
68 return self.find_args()
68 return self.find_args()
69
69
70 def find_args(self):
70 def find_args(self):
71 """The ``.args`` property calls this to find the args list."""
71 """The ``.args`` property calls this to find the args list."""
72 raise NotImplementedError('find_args must be implemented in a subclass')
72 raise NotImplementedError('find_args must be implemented in a subclass')
73
73
74 @property
74 @property
75 def arg_str(self):
75 def arg_str(self):
76 """The string form of the program arguments."""
76 """The string form of the program arguments."""
77 return ' '.join(self.args)
77 return ' '.join(self.args)
78
78
79 @property
79 @property
80 def running(self):
80 def running(self):
81 if self.state == 'running':
81 if self.state == 'running':
82 return True
82 return True
83 else:
83 else:
84 return False
84 return False
85
85
86 def start(self):
86 def start(self):
87 """Start the process.
87 """Start the process.
88
88
89 This must return a deferred that fires with information about the
89 This must return a deferred that fires with information about the
90 process starting (like a pid, job id, etc.)
90 process starting (like a pid, job id, etc.)
91 """
91 """
92 return defer.fail(
92 return defer.fail(
93 Failure(NotImplementedError(
93 Failure(NotImplementedError(
94 'start must be implemented in a subclass')
94 'start must be implemented in a subclass')
95 )
95 )
96 )
96 )
97
97
98 def stop(self):
98 def stop(self):
99 """Stop the process and notify observers of ProcessStopped.
99 """Stop the process and notify observers of ProcessStopped.
100
100
101 This must return a deferred that fires with any errors that occur
101 This must return a deferred that fires with any errors that occur
102 while the process is attempting to be shut down. This deferred
102 while the process is attempting to be shut down. This deferred
103 won't fire when the process actually stops. These events are
103 won't fire when the process actually stops. These events are
104 handled by calling :func:`observe_stop`.
104 handled by calling :func:`observe_stop`.
105 """
105 """
106 return defer.fail(
106 return defer.fail(
107 Failure(NotImplementedError(
107 Failure(NotImplementedError(
108 'stop must be implemented in a subclass')
108 'stop must be implemented in a subclass')
109 )
109 )
110 )
110 )
111
111
112 def observe_stop(self):
112 def observe_stop(self):
113 """Get a deferred that will fire when the process stops.
113 """Get a deferred that will fire when the process stops.
114
114
115 The deferred will fire with data that contains information about
115 The deferred will fire with data that contains information about
116 the exit status of the process.
116 the exit status of the process.
117 """
117 """
118 if self.state=='after':
118 if self.state=='after':
119 return defer.succeed(self.stop_data)
119 return defer.succeed(self.stop_data)
120 else:
120 else:
121 d = defer.Deferred()
121 d = defer.Deferred()
122 self.stop_deferreds.append(d)
122 self.stop_deferreds.append(d)
123 return d
123 return d
124
124
125 def notify_start(self, data):
125 def notify_start(self, data):
126 """Call this to tigger startup actions.
126 """Call this to tigger startup actions.
127
127
128 This logs the process startup and sets the state to running. It is
128 This logs the process startup and sets the state to running. It is
129 a pass-through so it can be used as a callback.
129 a pass-through so it can be used as a callback.
130 """
130 """
131
131
132 log.msg('Process %r started: %r' % (self.args[0], data))
132 log.msg('Process %r started: %r' % (self.args[0], data))
133 self.start_data = data
133 self.start_data = data
134 self.state = 'running'
134 self.state = 'running'
135 return data
135 return data
136
136
137 def notify_stop(self, data):
137 def notify_stop(self, data):
138 """Call this to trigger all the deferreds from :func:`observe_stop`."""
138 """Call this to trigger all the deferreds from :func:`observe_stop`."""
139
139
140 log.msg('Process %r stopped: %r' % (self.args[0], data))
140 log.msg('Process %r stopped: %r' % (self.args[0], data))
141 self.stop_data = data
141 self.stop_data = data
142 self.state = 'after'
142 self.state = 'after'
143 for i in range(len(self.stop_deferreds)):
143 for i in range(len(self.stop_deferreds)):
144 d = self.stop_deferreds.pop()
144 d = self.stop_deferreds.pop()
145 d.callback(data)
145 d.callback(data)
146 return data
146 return data
147
147
148 def signal(self, sig):
148 def signal(self, sig):
149 """Signal the process.
149 """Signal the process.
150
150
151 Return a semi-meaningless deferred after signaling the process.
151 Return a semi-meaningless deferred after signaling the process.
152
152
153 Parameters
153 Parameters
154 ----------
154 ----------
155 sig : str or int
155 sig : str or int
156 'KILL', 'INT', etc., or any signal number
156 'KILL', 'INT', etc., or any signal number
157 """
157 """
158 return defer.fail(
158 return defer.fail(
159 Failure(NotImplementedError(
159 Failure(NotImplementedError(
160 'signal must be implemented in a subclass')
160 'signal must be implemented in a subclass')
161 )
161 )
162 )
162 )
163
163
164
164
165 class LocalProcessLauncherProtocol(ProcessProtocol):
165 class LocalProcessLauncherProtocol(ProcessProtocol):
166 """A ProcessProtocol to go with the LocalProcessLauncher."""
166 """A ProcessProtocol to go with the LocalProcessLauncher."""
167
167
168 def __init__(self, process_launcher):
168 def __init__(self, process_launcher):
169 self.process_launcher = process_launcher
169 self.process_launcher = process_launcher
170 self.pid = None
170 self.pid = None
171
171
172 def connectionMade(self):
172 def connectionMade(self):
173 self.pid = self.transport.pid
173 self.pid = self.transport.pid
174 self.process_launcher.notify_start(self.transport.pid)
174 self.process_launcher.notify_start(self.transport.pid)
175
175
176 def processEnded(self, status):
176 def processEnded(self, status):
177 value = status.value
177 value = status.value
178 if isinstance(value, ProcessDone):
178 if isinstance(value, ProcessDone):
179 self.process_launcher.notify_stop(
179 self.process_launcher.notify_stop(
180 {'exit_code':0,
180 {'exit_code':0,
181 'signal':None,
181 'signal':None,
182 'status':None,
182 'status':None,
183 'pid':self.pid
183 'pid':self.pid
184 }
184 }
185 )
185 )
186 elif isinstance(value, ProcessTerminated):
186 elif isinstance(value, ProcessTerminated):
187 self.process_launcher.notify_stop(
187 self.process_launcher.notify_stop(
188 {'exit_code':value.exitCode,
188 {'exit_code':value.exitCode,
189 'signal':value.signal,
189 'signal':value.signal,
190 'status':value.status,
190 'status':value.status,
191 'pid':self.pid
191 'pid':self.pid
192 }
192 }
193 )
193 )
194 else:
194 else:
195 raise UnknownStatus("Unknown exit status, this is probably a "
195 raise UnknownStatus("Unknown exit status, this is probably a "
196 "bug in Twisted")
196 "bug in Twisted")
197
197
198 def outReceived(self, data):
198 def outReceived(self, data):
199 log.msg(data)
199 log.msg(data)
200
200
201 def errReceived(self, data):
201 def errReceived(self, data):
202 log.err(data)
202 log.err(data)
203
203
204
204
205 class LocalProcessLauncher(BaseLauncher):
205 class LocalProcessLauncher(BaseLauncher):
206 """Start and stop an external process in an asynchronous manner."""
206 """Start and stop an external process in an asynchronous manner."""
207
207
208 cmd_and_args = List([])
208 cmd_and_args = List([])
209
209
210 def __init__(self, working_dir, parent=None, name=None, config=None):
210 def __init__(self, working_dir, parent=None, name=None, config=None):
211 super(LocalProcessLauncher, self).__init__(
211 super(LocalProcessLauncher, self).__init__(
212 working_dir, parent, name, config
212 working_dir, parent, name, config
213 )
213 )
214 self.process_protocol = None
214 self.process_protocol = None
215 self.start_deferred = None
215 self.start_deferred = None
216
216
217 def find_args(self):
217 def find_args(self):
218 return self.cmd_and_args
218 return self.cmd_and_args
219
219
220 def start(self):
220 def start(self):
221 if self.state == 'before':
221 if self.state == 'before':
222 self.process_protocol = LocalProcessLauncherProtocol(self)
222 self.process_protocol = LocalProcessLauncherProtocol(self)
223 self.start_deferred = defer.Deferred()
223 self.start_deferred = defer.Deferred()
224 self.process_transport = reactor.spawnProcess(
224 self.process_transport = reactor.spawnProcess(
225 self.process_protocol,
225 self.process_protocol,
226 str(self.args[0]),
226 str(self.args[0]),
227 [str(a) for a in self.args],
227 [str(a) for a in self.args],
228 env=os.environ
228 env=os.environ
229 )
229 )
230 return self.start_deferred
230 return self.start_deferred
231 else:
231 else:
232 s = 'The process was already started and has state: %r' % self.state
232 s = 'The process was already started and has state: %r' % self.state
233 return defer.fail(ProcessStateError(s))
233 return defer.fail(ProcessStateError(s))
234
234
235 def notify_start(self, data):
235 def notify_start(self, data):
236 super(LocalProcessLauncher, self).notify_start(data)
236 super(LocalProcessLauncher, self).notify_start(data)
237 self.start_deferred.callback(data)
237 self.start_deferred.callback(data)
238
238
239 def stop(self):
239 def stop(self):
240 return self.interrupt_then_kill()
240 return self.interrupt_then_kill()
241
241
242 @make_deferred
242 @make_deferred
243 def signal(self, sig):
243 def signal(self, sig):
244 if self.state == 'running':
244 if self.state == 'running':
245 self.process_transport.signalProcess(sig)
245 self.process_transport.signalProcess(sig)
246
246
247 @inlineCallbacks
247 @inlineCallbacks
248 def interrupt_then_kill(self, delay=1.0):
248 def interrupt_then_kill(self, delay=1.0):
249 yield self.signal('INT')
249 yield self.signal('INT')
250 yield sleep_deferred(delay)
250 yield sleep_deferred(delay)
251 yield self.signal('KILL')
251 yield self.signal('KILL')
252
252
253
253
254 class MPIExecLauncher(LocalProcessLauncher):
254 class MPIExecLauncher(LocalProcessLauncher):
255
255
256 mpi_cmd = List(['mpiexec'], config=True)
256 mpi_cmd = List(['mpiexec'], config=True)
257 mpi_args = List([], config=True)
257 mpi_args = List([], config=True)
258 program = List(['date'], config=True)
258 program = List(['date'], config=True)
259 program_args = List([], config=True)
259 program_args = List([], config=True)
260 n = Int(1, config=True)
260 n = Int(1, config=True)
261
261
262 def find_args(self):
262 def find_args(self):
263 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
263 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
264 self.program + self.program_args
264 self.program + self.program_args
265
265
266 def start(self, n):
266 def start(self, n):
267 self.n = n
267 self.n = n
268 return super(MPIExecLauncher, self).start()
268 return super(MPIExecLauncher, self).start()
269
269
270
270
271 class SSHLauncher(BaseLauncher):
271 class SSHLauncher(BaseLauncher):
272 """A minimal launcher for ssh.
272 """A minimal launcher for ssh.
273
273
274 To be useful this will probably have to be extended to use the ``sshx``
274 To be useful this will probably have to be extended to use the ``sshx``
275 idea for environment variables. There could be other things this needs
275 idea for environment variables. There could be other things this needs
276 as well.
276 as well.
277 """
277 """
278
278
279 ssh_cmd = List(['ssh'], config=True)
279 ssh_cmd = List(['ssh'], config=True)
280 ssh_args = List([], config=True)
280 ssh_args = List([], config=True)
281 program = List(['date'], config=True)
281 program = List(['date'], config=True)
282 program_args = List([], config=True)
282 program_args = List([], config=True)
283 hostname = Str('', config=True)
283 hostname = Str('', config=True)
284 user = Str(os.environ['USER'], config=True)
284 user = Str(os.environ['USER'], config=True)
285 location = Str('')
285 location = Str('')
286
286
287 def _hostname_changed(self, name, old, new):
287 def _hostname_changed(self, name, old, new):
288 self.location = '%s@%s' % (self.user, new)
288 self.location = '%s@%s' % (self.user, new)
289
289
290 def _user_changed(self, name, old, new):
290 def _user_changed(self, name, old, new):
291 self.location = '%s@%s' % (new, self.hostname)
291 self.location = '%s@%s' % (new, self.hostname)
292
292
293 def find_args(self):
293 def find_args(self):
294 return self.ssh_cmd + self.ssh_args + [self.location] + \
294 return self.ssh_cmd + self.ssh_args + [self.location] + \
295 self.program + self.program_args
295 self.program + self.program_args
296
296
297 def start(self, n, hostname=None, user=None):
297 def start(self, n, hostname=None, user=None):
298 if hostname is not None:
298 if hostname is not None:
299 self.hostname = hostname
299 self.hostname = hostname
300 if user is not None:
300 if user is not None:
301 self.user = user
301 self.user = user
302 return super(SSHLauncher, self).start()
302 return super(SSHLauncher, self).start()
303
303
304
304
305 class WindowsHPCLauncher(BaseLauncher):
305 class WindowsHPCLauncher(BaseLauncher):
306 pass
306 pass
307
307
308
308
309 class BatchSystemLauncher(BaseLauncher):
309 class BatchSystemLauncher(BaseLauncher):
310
310
311 # Subclasses must fill these in. See PBSEngineSet
311 # Subclasses must fill these in. See PBSEngineSet
312 submit_command = Str('', config=True)
312 submit_command = Str('', config=True)
313 delete_command = Str('', config=True)
313 delete_command = Str('', config=True)
314 job_id_regexp = Str('', config=True)
314 job_id_regexp = Str('', config=True)
315 batch_template = Str('', config=True)
315 batch_template = Str('', config=True)
316 batch_file_name = Unicode(u'batch_script', config=True)
316 batch_file_name = Unicode(u'batch_script', config=True)
317 batch_file = Unicode(u'')
317 batch_file = Unicode(u'')
318
318
319 def __init__(self, working_dir, parent=None, name=None, config=None):
319 def __init__(self, working_dir, parent=None, name=None, config=None):
320 super(BatchSystemLauncher, self).__init__(
320 super(BatchSystemLauncher, self).__init__(
321 working_dir, parent, name, config
321 working_dir, parent, name, config
322 )
322 )
323 self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
323 self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
324 self.context = {}
324 self.context = {}
325
325
326 def parse_job_id(self, output):
326 def parse_job_id(self, output):
327 m = re.match(self.job_id_regexp, output)
327 m = re.match(self.job_id_regexp, output)
328 if m is not None:
328 if m is not None:
329 job_id = m.group()
329 job_id = m.group()
330 else:
330 else:
331 raise LauncherError("Job id couldn't be determined: %s" % output)
331 raise LauncherError("Job id couldn't be determined: %s" % output)
332 self.job_id = job_id
332 self.job_id = job_id
333 log.msg('Job started with job id: %r' % job_id)
333 log.msg('Job started with job id: %r' % job_id)
334 return job_id
334 return job_id
335
335
336 def write_batch_script(self, n):
336 def write_batch_script(self, n):
337 self.context['n'] = n
337 self.context['n'] = n
338 script_as_string = Itpl.itplns(self.batch_template, self.context)
338 script_as_string = Itpl.itplns(self.batch_template, self.context)
339 log.msg('Writing instantiated batch script: %s' % self.batch_file)
339 log.msg('Writing instantiated batch script: %s' % self.batch_file)
340 f = open(self.batch_file, 'w')
340 f = open(self.batch_file, 'w')
341 f.write(script_as_string)
341 f.write(script_as_string)
342 f.close()
342 f.close()
343
343
344 @inlineCallbacks
344 @inlineCallbacks
345 def start(self, n):
345 def start(self, n):
346 """Start n copies of the process using a batch system."""
346 """Start n copies of the process using a batch system."""
347 self.write_batch_script(n)
347 self.write_batch_script(n)
348 output = yield getProcessOutput(self.submit_command,
348 output = yield getProcessOutput(self.submit_command,
349 [self.batch_file], env=os.environ)
349 [self.batch_file], env=os.environ)
350 job_id = self.parse_job_id(output)
350 job_id = self.parse_job_id(output)
351 self.notify_start(job_id)
351 self.notify_start(job_id)
352 defer.returnValue(job_id)
352 defer.returnValue(job_id)
353
353
354 @inlineCallbacks
354 @inlineCallbacks
355 def stop(self):
355 def stop(self):
356 output = yield getProcessOutput(self.delete_command,
356 output = yield getProcessOutput(self.delete_command,
357 [self.job_id], env=os.environ
357 [self.job_id], env=os.environ
358 )
358 )
359 self.notify_stop(output) # Pass the output of the kill cmd
359 self.notify_stop(output) # Pass the output of the kill cmd
360 defer.returnValue(output)
360 defer.returnValue(output)
361
361
362
362
363 class PBSLauncher(BatchSystemLauncher):
363 class PBSLauncher(BatchSystemLauncher):
364
364
365 submit_command = Str('qsub', config=True)
365 submit_command = Str('qsub', config=True)
366 delete_command = Str('qdel', config=True)
366 delete_command = Str('qdel', config=True)
367 job_id_regexp = Str('\d+', config=True)
367 job_id_regexp = Str('\d+', config=True)
368 batch_template = Str('', config=True)
368 batch_template = Str('', config=True)
369 batch_file_name = Unicode(u'pbs_batch_script', config=True)
369 batch_file_name = Unicode(u'pbs_batch_script', config=True)
370 batch_file = Unicode(u'')
370 batch_file = Unicode(u'')
371
371
372
372
373 #-----------------------------------------------------------------------------
373 #-----------------------------------------------------------------------------
374 # Controller launchers
374 # Controller launchers
375 #-----------------------------------------------------------------------------
375 #-----------------------------------------------------------------------------
376
376
377 def find_controller_cmd():
377 def find_controller_cmd():
378 if sys.platform == 'win32':
378 if sys.platform == 'win32':
379 # This logic is needed because the ipcontroller script doesn't
379 # This logic is needed because the ipcontroller script doesn't
380 # always get installed in the same way or in the same location.
380 # always get installed in the same way or in the same location.
381 from IPython.kernel import ipcontrollerapp
381 from IPython.kernel import ipcontrollerapp
382 script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
382 script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
383 # The -u option here turns on unbuffered output, which is required
383 # The -u option here turns on unbuffered output, which is required
384 # on Win32 to prevent wierd conflict and problems with Twisted.
384 # on Win32 to prevent wierd conflict and problems with Twisted.
385 # Also, use sys.executable to make sure we are picking up the
385 # Also, use sys.executable to make sure we are picking up the
386 # right python exe.
386 # right python exe.
387 cmd = [sys.executable, '-u', script_location]
387 cmd = [sys.executable, '-u', script_location]
388 else:
388 else:
389 # ipcontroller has to be on the PATH in this case.
389 # ipcontroller has to be on the PATH in this case.
390 cmd = ['ipcontroller']
390 cmd = ['ipcontroller']
391 return cmd
391 return cmd
392
392
393
393
394 class LocalControllerLauncher(LocalProcessLauncher):
394 class LocalControllerLauncher(LocalProcessLauncher):
395
395
396 controller_cmd = List(find_controller_cmd())
396 controller_cmd = List(find_controller_cmd())
397 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
397 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
398
398
399 def find_args(self):
399 def find_args(self):
400 return self.controller_cmd + self.controller_args
400 return self.controller_cmd + self.controller_args
401
401
402 def start(self, profile=None, cluster_dir=None):
402 def start(self, profile=None, cluster_dir=None):
403 if cluster_dir is not None:
403 if cluster_dir is not None:
404 self.controller_args.extend(['--cluster-dir', cluster_dir])
404 self.controller_args.extend(['--cluster-dir', cluster_dir])
405 if profile is not None:
405 if profile is not None:
406 self.controller_args.extend(['--profile', profile])
406 self.controller_args.extend(['--profile', profile])
407 log.msg("Starting LocalControllerLauncher: %r" % self.args)
407 log.msg("Starting LocalControllerLauncher: %r" % self.args)
408 return super(LocalControllerLauncher, self).start()
408 return super(LocalControllerLauncher, self).start()
409
409
410
410
411 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
411 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
412 pass
412 pass
413
413
414
414
415 class MPIExecControllerLauncher(MPIExecLauncher):
415 class MPIExecControllerLauncher(MPIExecLauncher):
416
416
417 controller_cmd = List(find_controller_cmd(), config=False)
417 controller_cmd = List(find_controller_cmd(), config=False)
418 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
418 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
419 n = Int(1, config=False)
419 n = Int(1, config=False)
420
420
421 def start(self, profile=None, cluster_dir=None):
421 def start(self, profile=None, cluster_dir=None):
422 if cluster_dir is not None:
422 if cluster_dir is not None:
423 self.controller_args.extend(['--cluster-dir', cluster_dir])
423 self.controller_args.extend(['--cluster-dir', cluster_dir])
424 if profile is not None:
424 if profile is not None:
425 self.controller_args.extend(['--profile', profile])
425 self.controller_args.extend(['--profile', profile])
426 log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
426 log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
427 return super(MPIExecControllerLauncher, self).start(1)
427 return super(MPIExecControllerLauncher, self).start(1)
428
428
429
429
430 def find_args(self):
430 def find_args(self):
431 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
431 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
432 self.controller_cmd + self.controller_args
432 self.controller_cmd + self.controller_args
433
433
434
434
435 class PBSControllerLauncher(PBSLauncher):
435 class PBSControllerLauncher(PBSLauncher):
436
436
437 def start(self, profile=None, cluster_dir=None):
437 def start(self, profile=None, cluster_dir=None):
438 # Here we save profile and cluster_dir in the context so they
438 # Here we save profile and cluster_dir in the context so they
439 # can be used in the batch script template as ${profile} and
439 # can be used in the batch script template as ${profile} and
440 # ${cluster_dir}
440 # ${cluster_dir}
441 if cluster_dir is not None:
441 if cluster_dir is not None:
442 self.context['cluster_dir'] = cluster_dir
442 self.context['cluster_dir'] = cluster_dir
443 if profile is not None:
443 if profile is not None:
444 self.context['profile'] = profile
444 self.context['profile'] = profile
445 log.msg("Starting PBSControllerLauncher: %r" % self.args)
445 log.msg("Starting PBSControllerLauncher: %r" % self.args)
446 return super(PBSControllerLauncher, self).start(1)
446 return super(PBSControllerLauncher, self).start(1)
447
447
448
448
449 class SSHControllerLauncher(SSHLauncher):
449 class SSHControllerLauncher(SSHLauncher):
450 pass
450 pass
451
451
452
452
453 #-----------------------------------------------------------------------------
453 #-----------------------------------------------------------------------------
454 # Engine launchers
454 # Engine launchers
455 #-----------------------------------------------------------------------------
455 #-----------------------------------------------------------------------------
456
456
457
457
458 def find_engine_cmd():
458 def find_engine_cmd():
459 if sys.platform == 'win32':
459 if sys.platform == 'win32':
460 # This logic is needed because the ipengine script doesn't
460 # This logic is needed because the ipengine script doesn't
461 # always get installed in the same way or in the same location.
461 # always get installed in the same way or in the same location.
462 from IPython.kernel import ipengineapp
462 from IPython.kernel import ipengineapp
463 script_location = ipengineapp.__file__.replace('.pyc', '.py')
463 script_location = ipengineapp.__file__.replace('.pyc', '.py')
464 # The -u option here turns on unbuffered output, which is required
464 # The -u option here turns on unbuffered output, which is required
465 # on Win32 to prevent wierd conflict and problems with Twisted.
465 # on Win32 to prevent wierd conflict and problems with Twisted.
466 # Also, use sys.executable to make sure we are picking up the
466 # Also, use sys.executable to make sure we are picking up the
467 # right python exe.
467 # right python exe.
468 cmd = [sys.executable, '-u', script_location]
468 cmd = [sys.executable, '-u', script_location]
469 else:
469 else:
470 # ipcontroller has to be on the PATH in this case.
470 # ipcontroller has to be on the PATH in this case.
471 cmd = ['ipengine']
471 cmd = ['ipengine']
472 return cmd
472 return cmd
473
473
474
474
475 class LocalEngineLauncher(LocalProcessLauncher):
475 class LocalEngineLauncher(LocalProcessLauncher):
476
476
477 engine_cmd = List(find_engine_cmd())
477 engine_cmd = List(find_engine_cmd())
478 engine_args = List(['--log-to-file','--log-level', '40'], config=True)
478 engine_args = List(
479 ['--log-to-file','--log-level', '40'], config=True
480 )
479
481
480 def find_args(self):
482 def find_args(self):
481 return self.engine_cmd + self.engine_args
483 return self.engine_cmd + self.engine_args
482
484
483 def start(self, profile=None, cluster_dir=None):
485 def start(self, profile=None, cluster_dir=None):
484 if cluster_dir is not None:
486 if cluster_dir is not None:
485 self.engine_args.extend(['--cluster-dir', cluster_dir])
487 self.engine_args.extend(['--cluster-dir', cluster_dir])
486 if profile is not None:
488 if profile is not None:
487 self.engine_args.extend(['--profile', profile])
489 self.engine_args.extend(['--profile', profile])
488 return super(LocalEngineLauncher, self).start()
490 return super(LocalEngineLauncher, self).start()
489
491
490
492
491 class LocalEngineSetLauncher(BaseLauncher):
493 class LocalEngineSetLauncher(BaseLauncher):
492
494
493 engine_args = List(['--log-to-file','--log-level', '40'], config=True)
495 engine_args = List(
496 ['--log-to-file','--log-level', '40'], config=True
497 )
494
498
495 def __init__(self, working_dir, parent=None, name=None, config=None):
499 def __init__(self, working_dir, parent=None, name=None, config=None):
496 super(LocalEngineSetLauncher, self).__init__(
500 super(LocalEngineSetLauncher, self).__init__(
497 working_dir, parent, name, config
501 working_dir, parent, name, config
498 )
502 )
499 self.launchers = []
503 self.launchers = []
500
504
501 def start(self, n, profile=None, cluster_dir=None):
505 def start(self, n, profile=None, cluster_dir=None):
502 dlist = []
506 dlist = []
503 for i in range(n):
507 for i in range(n):
504 el = LocalEngineLauncher(self.working_dir, self)
508 el = LocalEngineLauncher(self.working_dir, self)
505 # Copy the engine args over to each engine launcher.
509 # Copy the engine args over to each engine launcher.
506 import copy
510 import copy
507 el.engine_args = copy.deepcopy(self.engine_args)
511 el.engine_args = copy.deepcopy(self.engine_args)
508 d = el.start(profile, cluster_dir)
512 d = el.start(profile, cluster_dir)
509 if i==0:
513 if i==0:
510 log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
514 log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
511 self.launchers.append(el)
515 self.launchers.append(el)
512 dlist.append(d)
516 dlist.append(d)
513 # The consumeErrors here could be dangerous
517 # The consumeErrors here could be dangerous
514 dfinal = gatherBoth(dlist, consumeErrors=True)
518 dfinal = gatherBoth(dlist, consumeErrors=True)
515 dfinal.addCallback(self.notify_start)
519 dfinal.addCallback(self.notify_start)
516 return dfinal
520 return dfinal
517
521
518 def find_args(self):
522 def find_args(self):
519 return ['engine set']
523 return ['engine set']
520
524
521 def signal(self, sig):
525 def signal(self, sig):
522 dlist = []
526 dlist = []
523 for el in self.launchers:
527 for el in self.launchers:
524 d = el.signal(sig)
528 d = el.signal(sig)
525 dlist.append(d)
529 dlist.append(d)
526 dfinal = gatherBoth(dlist, consumeErrors=True)
530 dfinal = gatherBoth(dlist, consumeErrors=True)
527 return dfinal
531 return dfinal
528
532
529 def interrupt_then_kill(self, delay=1.0):
533 def interrupt_then_kill(self, delay=1.0):
530 dlist = []
534 dlist = []
531 for el in self.launchers:
535 for el in self.launchers:
532 d = el.interrupt_then_kill(delay)
536 d = el.interrupt_then_kill(delay)
533 dlist.append(d)
537 dlist.append(d)
534 dfinal = gatherBoth(dlist, consumeErrors=True)
538 dfinal = gatherBoth(dlist, consumeErrors=True)
535 return dfinal
539 return dfinal
536
540
537 def stop(self):
541 def stop(self):
538 return self.interrupt_then_kill()
542 return self.interrupt_then_kill()
539
543
540 def observe_stop(self):
544 def observe_stop(self):
541 dlist = [el.observe_stop() for el in self.launchers]
545 dlist = [el.observe_stop() for el in self.launchers]
542 dfinal = gatherBoth(dlist, consumeErrors=False)
546 dfinal = gatherBoth(dlist, consumeErrors=False)
543 dfinal.addCallback(self.notify_stop)
547 dfinal.addCallback(self.notify_stop)
544 return dfinal
548 return dfinal
545
549
546
550
547 class MPIExecEngineSetLauncher(MPIExecLauncher):
551 class MPIExecEngineSetLauncher(MPIExecLauncher):
548
552
549 engine_cmd = List(find_engine_cmd(), config=False)
553 engine_cmd = List(find_engine_cmd(), config=False)
550 engine_args = List(['--log-to-file','--log-level', '40'], config=True)
554 engine_args = List(
555 ['--log-to-file','--log-level', '40'], config=True
556 )
551 n = Int(1, config=True)
557 n = Int(1, config=True)
552
558
553 def start(self, n, profile=None, cluster_dir=None):
559 def start(self, n, profile=None, cluster_dir=None):
554 if cluster_dir is not None:
560 if cluster_dir is not None:
555 self.engine_args.extend(['--cluster-dir', cluster_dir])
561 self.engine_args.extend(['--cluster-dir', cluster_dir])
556 if profile is not None:
562 if profile is not None:
557 self.engine_args.extend(['--profile', profile])
563 self.engine_args.extend(['--profile', profile])
558 log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
564 log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
559 return super(MPIExecEngineSetLauncher, self).start(n)
565 return super(MPIExecEngineSetLauncher, self).start(n)
560
566
561 def find_args(self):
567 def find_args(self):
562 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
568 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
563 self.engine_cmd + self.engine_args
569 self.engine_cmd + self.engine_args
564
570
565
571
566 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
572 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
567 pass
573 pass
568
574
569
575
570 class PBSEngineSetLauncher(PBSLauncher):
576 class PBSEngineSetLauncher(PBSLauncher):
571
577
572 def start(self, n, profile=None, cluster_dir=None):
578 def start(self, n, profile=None, cluster_dir=None):
573 if cluster_dir is not None:
579 if cluster_dir is not None:
574 self.program_args.extend(['--cluster-dir', cluster_dir])
580 self.program_args.extend(['--cluster-dir', cluster_dir])
575 if profile is not None:
581 if profile is not None:
576 self.program_args.extend(['-p', profile])
582 self.program_args.extend(['-p', profile])
577 log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
583 log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
578 return super(PBSEngineSetLauncher, self).start(n)
584 return super(PBSEngineSetLauncher, self).start(n)
579
585
580
586
581 class SSHEngineSetLauncher(BaseLauncher):
587 class SSHEngineSetLauncher(BaseLauncher):
582 pass
588 pass
583
589
584
590
591 #-----------------------------------------------------------------------------
592 # A launcher for ipcluster itself!
593 #-----------------------------------------------------------------------------
594
595
596 def find_ipcluster_cmd():
597 if sys.platform == 'win32':
598 # This logic is needed because the ipcluster script doesn't
599 # always get installed in the same way or in the same location.
600 from IPython.kernel import ipclusterapp
601 script_location = ipclusterapp.__file__.replace('.pyc', '.py')
602 # The -u option here turns on unbuffered output, which is required
603 # on Win32 to prevent wierd conflict and problems with Twisted.
604 # Also, use sys.executable to make sure we are picking up the
605 # right python exe.
606 cmd = [sys.executable, '-u', script_location]
607 else:
608 # ipcontroller has to be on the PATH in this case.
609 cmd = ['ipcluster']
610 return cmd
611
612
613 class IPClusterLauncher(LocalProcessLauncher):
614
615 ipcluster_cmd = List(find_ipcluster_cmd())
616 ipcluster_args = List(
617 ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
618 ipcluster_subcommand = Str('start')
619 ipcluster_n = Int(2)
620
621 def find_args(self):
622 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
623 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
624
625 def start(self):
626 log.msg("Starting ipcluster: %r" % self.args)
627 return super(IPClusterLauncher, self).start()
585
628
@@ -1,265 +1,263 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Things directly related to all of twisted."""
4 """Things directly related to all of twisted."""
5
5
6 __docformat__ = "restructuredtext en"
6 #-----------------------------------------------------------------------------
7
7 # Copyright (C) 2008-2009 The IPython Development Team
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2008 The IPython Development Team
10 #
8 #
11 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
14
12
15 #-------------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
16 # Imports
14 # Imports
17 #-------------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
18
16
19 import os, sys
17 import os, sys
20 import threading, Queue, atexit
18 import threading, Queue, atexit
21
19
22 import twisted
20 import twisted
23 from twisted.internet import defer, reactor
21 from twisted.internet import defer, reactor
24 from twisted.python import log, failure
22 from twisted.python import log, failure
25
23
26 from IPython.kernel.error import FileTimeoutError
24 from IPython.kernel.error import FileTimeoutError
27
25
28 #-------------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
29 # Classes related to twisted and threads
27 # Classes related to twisted and threads
30 #-------------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
31
29
32
30
33 class ReactorInThread(threading.Thread):
31 class ReactorInThread(threading.Thread):
34 """Run the twisted reactor in a different thread.
32 """Run the twisted reactor in a different thread.
35
33
36 For the process to be able to exit cleanly, do the following:
34 For the process to be able to exit cleanly, do the following:
37
35
38 rit = ReactorInThread()
36 rit = ReactorInThread()
39 rit.setDaemon(True)
37 rit.setDaemon(True)
40 rit.start()
38 rit.start()
41
39
42 """
40 """
43
41
44 def run(self):
42 def run(self):
45 reactor.run(installSignalHandlers=0)
43 reactor.run(installSignalHandlers=0)
46 # self.join()
44 # self.join()
47
45
48 def stop(self):
46 def stop(self):
49 # I don't think this does anything useful.
47 # I don't think this does anything useful.
50 blockingCallFromThread(reactor.stop)
48 blockingCallFromThread(reactor.stop)
51 self.join()
49 self.join()
52
50
53 if(twisted.version.major >= 8):
51 if(twisted.version.major >= 8):
54 import twisted.internet.threads
52 import twisted.internet.threads
55 def blockingCallFromThread(f, *a, **kw):
53 def blockingCallFromThread(f, *a, **kw):
56 """
54 """
57 Run a function in the reactor from a thread, and wait for the result
55 Run a function in the reactor from a thread, and wait for the result
58 synchronously, i.e. until the callback chain returned by the function get a
56 synchronously, i.e. until the callback chain returned by the function get a
59 result.
57 result.
60
58
61 Delegates to twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw),
59 Delegates to twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw),
62 passing twisted.internet.reactor for the first argument.
60 passing twisted.internet.reactor for the first argument.
63
61
64 @param f: the callable to run in the reactor thread
62 @param f: the callable to run in the reactor thread
65 @type f: any callable.
63 @type f: any callable.
66 @param a: the arguments to pass to C{f}.
64 @param a: the arguments to pass to C{f}.
67 @param kw: the keyword arguments to pass to C{f}.
65 @param kw: the keyword arguments to pass to C{f}.
68
66
69 @return: the result of the callback chain.
67 @return: the result of the callback chain.
70 @raise: any error raised during the callback chain.
68 @raise: any error raised during the callback chain.
71 """
69 """
72 return twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw)
70 return twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw)
73
71
74 else:
72 else:
75 def blockingCallFromThread(f, *a, **kw):
73 def blockingCallFromThread(f, *a, **kw):
76 """
74 """
77 Run a function in the reactor from a thread, and wait for the result
75 Run a function in the reactor from a thread, and wait for the result
78 synchronously, i.e. until the callback chain returned by the function get a
76 synchronously, i.e. until the callback chain returned by the function get a
79 result.
77 result.
80
78
81 @param f: the callable to run in the reactor thread
79 @param f: the callable to run in the reactor thread
82 @type f: any callable.
80 @type f: any callable.
83 @param a: the arguments to pass to C{f}.
81 @param a: the arguments to pass to C{f}.
84 @param kw: the keyword arguments to pass to C{f}.
82 @param kw: the keyword arguments to pass to C{f}.
85
83
86 @return: the result of the callback chain.
84 @return: the result of the callback chain.
87 @raise: any error raised during the callback chain.
85 @raise: any error raised during the callback chain.
88 """
86 """
89 from twisted.internet import reactor
87 from twisted.internet import reactor
90 queue = Queue.Queue()
88 queue = Queue.Queue()
91 def _callFromThread():
89 def _callFromThread():
92 result = defer.maybeDeferred(f, *a, **kw)
90 result = defer.maybeDeferred(f, *a, **kw)
93 result.addBoth(queue.put)
91 result.addBoth(queue.put)
94
92
95 reactor.callFromThread(_callFromThread)
93 reactor.callFromThread(_callFromThread)
96 result = queue.get()
94 result = queue.get()
97 if isinstance(result, failure.Failure):
95 if isinstance(result, failure.Failure):
98 # This makes it easier for the debugger to get access to the instance
96 # This makes it easier for the debugger to get access to the instance
99 try:
97 try:
100 result.raiseException()
98 result.raiseException()
101 except Exception, e:
99 except Exception, e:
102 raise e
100 raise e
103 return result
101 return result
104
102
105
103
106
104
107 #-------------------------------------------------------------------------------
105 #-------------------------------------------------------------------------------
108 # Things for managing deferreds
106 # Things for managing deferreds
109 #-------------------------------------------------------------------------------
107 #-------------------------------------------------------------------------------
110
108
111
109
112 def parseResults(results):
110 def parseResults(results):
113 """Pull out results/Failures from a DeferredList."""
111 """Pull out results/Failures from a DeferredList."""
114 return [x[1] for x in results]
112 return [x[1] for x in results]
115
113
116 def gatherBoth(dlist, fireOnOneCallback=0,
114 def gatherBoth(dlist, fireOnOneCallback=0,
117 fireOnOneErrback=0,
115 fireOnOneErrback=0,
118 consumeErrors=0,
116 consumeErrors=0,
119 logErrors=0):
117 logErrors=0):
120 """This is like gatherBoth, but sets consumeErrors=1."""
118 """This is like gatherBoth, but sets consumeErrors=1."""
121 d = DeferredList(dlist, fireOnOneCallback, fireOnOneErrback,
119 d = DeferredList(dlist, fireOnOneCallback, fireOnOneErrback,
122 consumeErrors, logErrors)
120 consumeErrors, logErrors)
123 if not fireOnOneCallback:
121 if not fireOnOneCallback:
124 d.addCallback(parseResults)
122 d.addCallback(parseResults)
125 return d
123 return d
126
124
127 SUCCESS = True
125 SUCCESS = True
128 FAILURE = False
126 FAILURE = False
129
127
130 class DeferredList(defer.Deferred):
128 class DeferredList(defer.Deferred):
131 """I combine a group of deferreds into one callback.
129 """I combine a group of deferreds into one callback.
132
130
133 I track a list of L{Deferred}s for their callbacks, and make a single
131 I track a list of L{Deferred}s for their callbacks, and make a single
134 callback when they have all completed, a list of (success, result)
132 callback when they have all completed, a list of (success, result)
135 tuples, 'success' being a boolean.
133 tuples, 'success' being a boolean.
136
134
137 Note that you can still use a L{Deferred} after putting it in a
135 Note that you can still use a L{Deferred} after putting it in a
138 DeferredList. For example, you can suppress 'Unhandled error in Deferred'
136 DeferredList. For example, you can suppress 'Unhandled error in Deferred'
139 messages by adding errbacks to the Deferreds *after* putting them in the
137 messages by adding errbacks to the Deferreds *after* putting them in the
140 DeferredList, as a DeferredList won't swallow the errors. (Although a more
138 DeferredList, as a DeferredList won't swallow the errors. (Although a more
141 convenient way to do this is simply to set the consumeErrors flag)
139 convenient way to do this is simply to set the consumeErrors flag)
142
140
143 Note: This is a modified version of the twisted.internet.defer.DeferredList
141 Note: This is a modified version of the twisted.internet.defer.DeferredList
144 """
142 """
145
143
146 fireOnOneCallback = 0
144 fireOnOneCallback = 0
147 fireOnOneErrback = 0
145 fireOnOneErrback = 0
148
146
149 def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
147 def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
150 consumeErrors=0, logErrors=0):
148 consumeErrors=0, logErrors=0):
151 """Initialize a DeferredList.
149 """Initialize a DeferredList.
152
150
153 @type deferredList: C{list} of L{Deferred}s
151 @type deferredList: C{list} of L{Deferred}s
154 @param deferredList: The list of deferreds to track.
152 @param deferredList: The list of deferreds to track.
155 @param fireOnOneCallback: (keyword param) a flag indicating that
153 @param fireOnOneCallback: (keyword param) a flag indicating that
156 only one callback needs to be fired for me to call
154 only one callback needs to be fired for me to call
157 my callback
155 my callback
158 @param fireOnOneErrback: (keyword param) a flag indicating that
156 @param fireOnOneErrback: (keyword param) a flag indicating that
159 only one errback needs to be fired for me to call
157 only one errback needs to be fired for me to call
160 my errback
158 my errback
161 @param consumeErrors: (keyword param) a flag indicating that any errors
159 @param consumeErrors: (keyword param) a flag indicating that any errors
162 raised in the original deferreds should be
160 raised in the original deferreds should be
163 consumed by this DeferredList. This is useful to
161 consumed by this DeferredList. This is useful to
164 prevent spurious warnings being logged.
162 prevent spurious warnings being logged.
165 """
163 """
166 self.resultList = [None] * len(deferredList)
164 self.resultList = [None] * len(deferredList)
167 defer.Deferred.__init__(self)
165 defer.Deferred.__init__(self)
168 if len(deferredList) == 0 and not fireOnOneCallback:
166 if len(deferredList) == 0 and not fireOnOneCallback:
169 self.callback(self.resultList)
167 self.callback(self.resultList)
170
168
171 # These flags need to be set *before* attaching callbacks to the
169 # These flags need to be set *before* attaching callbacks to the
172 # deferreds, because the callbacks use these flags, and will run
170 # deferreds, because the callbacks use these flags, and will run
173 # synchronously if any of the deferreds are already fired.
171 # synchronously if any of the deferreds are already fired.
174 self.fireOnOneCallback = fireOnOneCallback
172 self.fireOnOneCallback = fireOnOneCallback
175 self.fireOnOneErrback = fireOnOneErrback
173 self.fireOnOneErrback = fireOnOneErrback
176 self.consumeErrors = consumeErrors
174 self.consumeErrors = consumeErrors
177 self.logErrors = logErrors
175 self.logErrors = logErrors
178 self.finishedCount = 0
176 self.finishedCount = 0
179
177
180 index = 0
178 index = 0
181 for deferred in deferredList:
179 for deferred in deferredList:
182 deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
180 deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
183 callbackArgs=(index,SUCCESS),
181 callbackArgs=(index,SUCCESS),
184 errbackArgs=(index,FAILURE))
182 errbackArgs=(index,FAILURE))
185 index = index + 1
183 index = index + 1
186
184
187 def _cbDeferred(self, result, index, succeeded):
185 def _cbDeferred(self, result, index, succeeded):
188 """(internal) Callback for when one of my deferreds fires.
186 """(internal) Callback for when one of my deferreds fires.
189 """
187 """
190 self.resultList[index] = (succeeded, result)
188 self.resultList[index] = (succeeded, result)
191
189
192 self.finishedCount += 1
190 self.finishedCount += 1
193 if not self.called:
191 if not self.called:
194 if succeeded == SUCCESS and self.fireOnOneCallback:
192 if succeeded == SUCCESS and self.fireOnOneCallback:
195 self.callback((result, index))
193 self.callback((result, index))
196 elif succeeded == FAILURE and self.fireOnOneErrback:
194 elif succeeded == FAILURE and self.fireOnOneErrback:
197 # We have modified this to fire the errback chain with the actual
195 # We have modified this to fire the errback chain with the actual
198 # Failure instance the originally occured rather than twisted's
196 # Failure instance the originally occured rather than twisted's
199 # FirstError which wraps the failure
197 # FirstError which wraps the failure
200 self.errback(result)
198 self.errback(result)
201 elif self.finishedCount == len(self.resultList):
199 elif self.finishedCount == len(self.resultList):
202 self.callback(self.resultList)
200 self.callback(self.resultList)
203
201
204 if succeeded == FAILURE and self.logErrors:
202 if succeeded == FAILURE and self.logErrors:
205 log.err(result)
203 log.err(result)
206 if succeeded == FAILURE and self.consumeErrors:
204 if succeeded == FAILURE and self.consumeErrors:
207 result = None
205 result = None
208
206
209 return result
207 return result
210
208
211
209
212 def wait_for_file(filename, delay=0.1, max_tries=10):
210 def wait_for_file(filename, delay=0.1, max_tries=10):
213 """Wait (poll) for a file to be created.
211 """Wait (poll) for a file to be created.
214
212
215 This method returns a Deferred that will fire when a file exists. It
213 This method returns a Deferred that will fire when a file exists. It
216 works by polling os.path.isfile in time intervals specified by the
214 works by polling os.path.isfile in time intervals specified by the
217 delay argument. If `max_tries` is reached, it will errback with a
215 delay argument. If `max_tries` is reached, it will errback with a
218 `FileTimeoutError`.
216 `FileTimeoutError`.
219
217
220 Parameters
218 Parameters
221 ----------
219 ----------
222 filename : str
220 filename : str
223 The name of the file to wait for.
221 The name of the file to wait for.
224 delay : float
222 delay : float
225 The time to wait between polls.
223 The time to wait between polls.
226 max_tries : int
224 max_tries : int
227 The max number of attempts before raising `FileTimeoutError`
225 The max number of attempts before raising `FileTimeoutError`
228
226
229 Returns
227 Returns
230 -------
228 -------
231 d : Deferred
229 d : Deferred
232 A Deferred instance that will fire when the file exists.
230 A Deferred instance that will fire when the file exists.
233 """
231 """
234
232
235 d = defer.Deferred()
233 d = defer.Deferred()
236
234
237 def _test_for_file(filename, attempt=0):
235 def _test_for_file(filename, attempt=0):
238 if attempt >= max_tries:
236 if attempt >= max_tries:
239 d.errback(FileTimeoutError(
237 d.errback(FileTimeoutError(
240 'timeout waiting for file to be created: %s' % filename
238 'timeout waiting for file to be created: %s' % filename
241 ))
239 ))
242 else:
240 else:
243 if os.path.isfile(filename):
241 if os.path.isfile(filename):
244 d.callback(True)
242 d.callback(True)
245 else:
243 else:
246 reactor.callLater(delay, _test_for_file, filename, attempt+1)
244 reactor.callLater(delay, _test_for_file, filename, attempt+1)
247
245
248 _test_for_file(filename)
246 _test_for_file(filename)
249 return d
247 return d
250
248
251
249
252 def sleep_deferred(seconds):
250 def sleep_deferred(seconds):
253 """Sleep without blocking the event loop."""
251 """Sleep without blocking the event loop."""
254 d = defer.Deferred()
252 d = defer.Deferred()
255 reactor.callLater(seconds, d.callback, seconds)
253 reactor.callLater(seconds, d.callback, seconds)
256 return d
254 return d
257
255
258
256
259 def make_deferred(func):
257 def make_deferred(func):
260 """A decorator that calls a function with :func`maybeDeferred`."""
258 """A decorator that calls a function with :func`maybeDeferred`."""
261
259
262 def _wrapper(*args, **kwargs):
260 def _wrapper(*args, **kwargs):
263 return defer.maybeDeferred(func, *args, **kwargs)
261 return defer.maybeDeferred(func, *args, **kwargs)
264
262
265 return _wrapper No newline at end of file
263 return _wrapper
General Comments 0
You need to be logged in to leave comments. Login now