##// END OF EJS Templates
Work on default config files and docstrings....
Brian Granger -
Show More
@@ -1,67 +1,155 b''
1 1 import os
2 2
3 3 c = get_config()
4 4
5 # Options are:
6 # * LocalControllerLauncher
7 # * PBSControllerLauncher
5 #-----------------------------------------------------------------------------
6 # Select which launchers to use
7 #-----------------------------------------------------------------------------
8
9 # This allows you to control what method is used to start the controller
10 # and engines. The following methods are currently supported:
11 # * Start as a regular process on localhost.
12 # * Start using mpiexec.
13 # * Start using PBS
14 # * Start using SSH (currently broken)
15
16 # The selected launchers can be configured below.
17
18 # Options are (LocalControllerLauncher, MPIExecControllerLauncher,
19 # PBSControllerLauncher)
8 20 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
9 21
10 # Options are:
11 # * LocalEngineSetLauncher
12 # * MPIExecEngineSetLauncher
13 # * PBSEngineSetLauncher
22 # Options are (LocalEngineSetLauncher, MPIExecEngineSetLauncher,
23 # PBSEngineSetLauncher)
14 24 # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
15 25
16 # c.Global.log_to_file = False
26 #-----------------------------------------------------------------------------
27 # Global configuration
28 #-----------------------------------------------------------------------------
29
30 # The default number of engines that will be started. This is overridden by
31 # the -n command line option: "ipcluster start -n 4"
17 32 # c.Global.n = 2
18 # c.Global.reset_config = False
19 # c.Global.clean_logs = True
20 33
21 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
22 # c.MPIExecLauncher.mpi_args = []
23 # c.MPIExecLauncher.program = []
24 # c.MPIExecLauncher.program_args = []
25 # c.MPIExecLauncher.n = 1
34 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
35 # c.Global.log_to_file = False
26 36
27 # c.SSHLauncher.ssh_cmd = ['ssh']
28 # c.SSHLauncher.ssh_args = []
29 # c.SSHLauncher.program = []
30 # s.SSHLauncher.program_args = []
31 # c.SSHLauncher.hostname = ''
32 # c.SSHLauncher.user = os.environ['USER']
37 # Remove old logs from cluster_dir/log before starting.
38 # c.Global.clean_logs = True
33 39
34 # c.PBSLauncher.submit_command = 'qsub'
35 # c.PBSLauncher.delete_command = 'qdel'
36 # c.PBSLauncher.job_id_regexp = '\d+'
37 # c.PBSLauncher.batch_template = """"""
38 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
40 #-----------------------------------------------------------------------------
41 # Controller launcher configuration
42 #-----------------------------------------------------------------------------
39 43
40 # c.LocalControllerLauncher.controller_args = []
44 # Configure how the controller is started. The configuration of the controller
45 # can also be set up by editing the controller config file:
46 # ipcontroller_config.py
41 47
48 # The command line arguments to call the controller with.
49 # c.LocalControllerLauncher.controller_args = \
50 # ['--log-to-file','--log-level', '40']
51
52 # The mpiexec/mpirun command to use in starting the controller.
42 53 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
54
55 # Additional arguments to pass to the actual mpiexec command.
43 56 # c.MPIExecControllerLauncher.mpi_args = []
44 # c.MPIExecControllerLauncher.controller_args = []
45 # c.MPIExecControllerLauncher.n = 1
46 57
58 # The command line argument to call the controller with.
59 # c.MPIExecControllerLauncher.controller_args = \
60 # ['--log-to-file','--log-level', '40']
61
62 # The command line program to use to submit a PBS job.
47 63 # c.PBSControllerLauncher.submit_command = 'qsub'
64
65 # The command line program to use to delete a PBS job.
48 66 # c.PBSControllerLauncher.delete_command = 'qdel'
67
68 # A regular expression that takes the output of qsub and find the job id.
49 69 # c.PBSControllerLauncher.job_id_regexp = '\d+'
70
71 # The batch submission script used to start the controller. This is where
72 # environment variables would be set up, etc. This string is interpolated using
73 # the Itpl module in IPython.external. Basically, you can use ${profile} for
74 # the controller profile or ${cluster_dir} for the cluster_dir.
50 75 # c.PBSControllerLauncher.batch_template = """"""
51 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
52 76
53 # c.LocalEngineLauncher.engine_args = []
77 # The name of the instantiated batch script that will actually be used to
78 # submit the job. This will be written to the cluster directory.
79 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
80
81 #-----------------------------------------------------------------------------
82 # Engine launcher configuration
83 #-----------------------------------------------------------------------------
54 84
55 # c.LocalEngineSetLauncher.engine_args = []
85 # Command line argument passed to the engines.
86 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
56 87
88 # The mpiexec/mpirun command to use in starting the engines.
57 89 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
90
91 # Additional arguments to pass to the actual mpiexec command.
58 92 # c.MPIExecEngineSetLauncher.mpi_args = []
59 # c.MPIExecEngineSetLauncher.controller_args = []
93
94 # Command line argument passed to the engines.
95 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
96
97 # The default number of engines to start if not given elsewhere.
60 98 # c.MPIExecEngineSetLauncher.n = 1
61 99
100 # The command line program to use to submit a PBS job.
62 101 # c.PBSEngineSetLauncher.submit_command = 'qsub'
102
103 # The command line program to use to delete a PBS job.
63 104 # c.PBSEngineSetLauncher.delete_command = 'qdel'
105
106 # A regular expression that takes the output of qsub and find the job id.
64 107 # c.PBSEngineSetLauncher.job_id_regexp = '\d+'
108
109 # The batch submission script used to start the engines. This is where
110 # environment variables would be set up, etc. This string is interpolated using
111 # the Itpl module in IPython.external. Basically, you can use ${n} for the
112 # number of engines, ${profile} for the engine profile and ${cluster_dir}
113 # for the cluster_dir.
65 114 # c.PBSEngineSetLauncher.batch_template = """"""
66 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script'
115
116 # The name of the instantiated batch script that will actually be used to
117 # submit the job. This will be written to the cluster directory.
118 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'
119
120 #-----------------------------------------------------------------------------
121 # Base launcher configuration
122 #-----------------------------------------------------------------------------
123
124 # The various launchers are organized into an inheritance hierarchy.
125 # The configurations can also be inherited and the following attributes
126 # allow you to configure the base classes.
127
128 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
129 # c.MPIExecLauncher.mpi_args = []
130 # c.MPIExecLauncher.program = []
131 # c.MPIExecLauncher.program_args = []
132 # c.MPIExecLauncher.n = 1
133
134 # c.SSHLauncher.ssh_cmd = ['ssh']
135 # c.SSHLauncher.ssh_args = []
136 # c.SSHLauncher.program = []
137 # c.SSHLauncher.program_args = []
138 # c.SSHLauncher.hostname = ''
139 # c.SSHLauncher.user = os.environ['USER']
140
141 # c.BatchSystemLauncher.submit_command
142 # c.BatchSystemLauncher.delete_command
143 # c.BatchSystemLauncher.job_id_regexp
144 # c.BatchSystemLauncher.batch_template
145 # c.BatchSystemLauncher.batch_file_name
146
147 # c.PBSLauncher.submit_command = 'qsub'
148 # c.PBSLauncher.delete_command = 'qdel'
149 # c.PBSLauncher.job_id_regexp = '\d+'
150 # c.PBSLauncher.batch_template = """"""
151 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
152
153
154
67 155
@@ -1,68 +1,132 b''
1 1 from IPython.config.loader import Config
2 2
3 3 c = get_config()
4 4
5 5 #-----------------------------------------------------------------------------
6 6 # Global configuration
7 7 #-----------------------------------------------------------------------------
8 8
9 9 # Basic Global config attributes
10
11 # Start up messages are logged to stdout using the logging module.
12 # These all happen before the twisted reactor is started and are
13 # useful for debugging purposes. Can be (10=DEBUG,20=INFO,30=WARN,40=CRITICAL)
14 # and smaller is more verbose.
15 # c.Global.log_level = 20
16
17 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
10 18 # c.Global.log_to_file = False
19
20 # Remove old logs from cluster_dir/log before starting.
11 21 # c.Global.clean_logs = True
22
23 # A list of Python statements that will be run before starting the
24 # controller. This is provided because occasionally certain things need to
25 # be imported in the controller for pickling to work.
12 26 # c.Global.import_statements = ['import math']
27
28 # Reuse the controller's FURL files. If False, FURL files are regenerated
29 # each time the controller is run. If True, they will be reused, *but*, you
30 # also must set the network ports by hand. If set, this will override the
31 # values set for the client and engine connections below.
13 32 # c.Global.reuse_furls = True
33
34 # Enable SSL encryption on all connections to the controller. If set, this
35 # will override the values set for the client and engine connections below.
14 36 # c.Global.secure = True
15 37
16 38 #-----------------------------------------------------------------------------
17 39 # Configure the client services
18 40 #-----------------------------------------------------------------------------
19 41
20 42 # Basic client service config attributes
43
44 # The network interface the controller will listen on for client connections.
45 # This should be an IP address or hostname of the controller's host. The empty
46 # string means listen on all interfaces.
21 47 # c.FCClientServiceFactory.ip = ''
48
49 # The TCP/IP port the controller will listen on for client connections. If 0
50 # a random port will be used. If the controller's host has a firewall running
51 # it must allow incoming traffic on this port.
22 52 # c.FCClientServiceFactory.port = 0
53
54 # The client learns how to connect to the controller by looking at the
55 # location field embedded in the FURL. If this field is empty, all network
56 # interfaces that the controller is listening on will be listed. To have the
57 # client connect on a particular interface, list it here.
23 58 # c.FCClientServiceFactory.location = ''
59
60 # Use SSL encryption for the client connection.
24 61 # c.FCClientServiceFactory.secure = True
62
63 # Reuse the client FURL each time the controller is started. If set, you must
64 # also pick a specific network port above (FCClientServiceFactory.port).
25 65 # c.FCClientServiceFactory.reuse_furls = False
26 66
27 # You shouldn't have to modify the rest of this section
67 #-----------------------------------------------------------------------------
68 # Configure the engine services
69 #-----------------------------------------------------------------------------
70
71 # Basic config attributes for the engine services.
72
73 # The network interface the controller will listen on for engine connections.
74 # This should be an IP address or hostname of the controller's host. The empty
75 # string means listen on all interfaces.
76 # c.FCEngineServiceFactory.ip = ''
77
78 # The TCP/IP port the controller will listen on for engine connections. If 0
79 # a random port will be used. If the controller's host has a firewall running
80 # it must allow incoming traffic on this port.
81 # c.FCEngineServiceFactory.port = 0
82
83 # The engine learns how to connect to the controller by looking at the
84 # location field embedded in the FURL. If this field is empty, all network
85 # interfaces that the controller is listening on will be listed. To have the
86 # engine connect on a particular interface, list it here.
87 # c.FCEngineServiceFactory.location = ''
88
89 # Use SSL encryption for the engine connection.
90 # c.FCEngineServiceFactory.secure = True
91
92 # Reuse the engine FURL each time the controller is started. If set, you must
93 # also pick a specific network port above (FCEngineServiceFactory.port).
94 # c.FCEngineServiceFactory.reuse_furls = False
95
96 #-----------------------------------------------------------------------------
97 # Developer level configuration attributes
98 #-----------------------------------------------------------------------------
99
100 # You shouldn't have to modify anything in this section. These attributes
101 # are more for developers who want to change the behavior of the controller
102 # at a fundamental level.
103
28 104 # c.FCClientServiceFactory.cert_file = 'ipcontroller-client.pem'
29 105
30 106 # default_client_interfaces = Config()
31 107 # default_client_interfaces.Task.interface_chain = [
32 108 # 'IPython.kernel.task.ITaskController',
33 109 # 'IPython.kernel.taskfc.IFCTaskController'
34 110 # ]
35 111 #
36 112 # default_client_interfaces.Task.furl_file = 'ipcontroller-tc.furl'
37 113 #
38 114 # default_client_interfaces.MultiEngine.interface_chain = [
39 115 # 'IPython.kernel.multiengine.IMultiEngine',
40 116 # 'IPython.kernel.multienginefc.IFCSynchronousMultiEngine'
41 117 # ]
42 118 #
43 119 # default_client_interfaces.MultiEngine.furl_file = 'ipcontroller-mec.furl'
44 120 #
45 121 # c.FCEngineServiceFactory.interfaces = default_client_interfaces
46 122
47 #-----------------------------------------------------------------------------
48 # Configure the engine services
49 #-----------------------------------------------------------------------------
50
51 # Basic config attributes for the engine services
52 # c.FCEngineServiceFactory.ip = ''
53 # c.FCEngineServiceFactory.port = 0
54 # c.FCEngineServiceFactory.location = ''
55 # c.FCEngineServiceFactory.secure = True
56 # c.FCEngineServiceFactory.reuse_furls = False
57
58 # You shouldn't have to modify the rest of this section
59 123 # c.FCEngineServiceFactory.cert_file = 'ipcontroller-engine.pem'
60 124
61 125 # default_engine_interfaces = Config()
62 126 # default_engine_interfaces.Default.interface_chain = [
63 127 # 'IPython.kernel.enginefc.IFCControllerBase'
64 128 # ]
65 129 #
66 130 # default_engine_interfaces.Default.furl_file = 'ipcontroller-engine.furl'
67 131 #
68 132 # c.FCEngineServiceFactory.interfaces = default_engine_interfaces
@@ -1,28 +1,86 b''
1 1 c = get_config()
2 2
3 #-----------------------------------------------------------------------------
4 # Global configuration
5 #-----------------------------------------------------------------------------
6
7 # Start up messages are logged to stdout using the logging module.
8 # These all happen before the twisted reactor is started and are
9 # useful for debugging purposes. Can be (10=DEBUG,20=INFO,30=WARN,40=CRITICAL)
10 # and smaller is more verbose.
11 # c.Global.log_level = 20
12
13 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
3 14 # c.Global.log_to_file = False
4 # c.Global.clean_logs = False
15
16 # Remove old logs from cluster_dir/log before starting.
17 # c.Global.clean_logs = True
18
19 # A list of strings that will be executed in the users namespace on the engine
20 # before it connects to the controller.
5 21 # c.Global.exec_lines = ['import numpy']
6 # c.Global.log_level = 10
7 # c.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
8 # c.Global.furl_file_name = 'ipcontroller-engine.furl'
9 # c.Global.furl_file = ''
10 # The max number of connection attempts and the initial delay between
22
23 # The engine will try to connect to the controller multiple times, to allow
24 # the controller time to startup and write its FURL file. These parameters
25 # control the number of retries (connect_max_tries) and the initial delay
26 # (connect_delay) between attemps. The actual delay between attempts gets
27 # longer each time by a factor of 1.5 (delay[i] = 1.5*delay[i-1])
11 28 # those attempts.
12 29 # c.Global.connect_delay = 0.1
13 30 # c.Global.connect_max_tries = 15
14 31
32 # By default, the engine will look for the controller's FURL file in its own
33 # cluster directory. Sometimes, the FURL file will be elsewhere and this
34 # attribute can be set to the full path of the FURL file.
35 # c.Global.furl_file = ''
36
37 #-----------------------------------------------------------------------------
38 # MPI configuration
39 #-----------------------------------------------------------------------------
15 40
41 # Upon starting the engine can be configured to call MPI_Init. This section
42 # configures that.
43
44 # Select which MPI section to execute to setup MPI. The value of this
45 # attribute must match the name of another attribute in the MPI config
46 # section (mpi4py, pytrilinos, etc.). This can also be set by the --mpi
47 # command line option.
16 48 # c.MPI.use = ''
49
50 # Initialize MPI using mpi4py. To use this, set c.MPI.use = 'mpi4py' to use
51 # --mpi=mpi4py at the command line.
17 52 # c.MPI.mpi4py = """from mpi4py import MPI as mpi
18 53 # mpi.size = mpi.COMM_WORLD.Get_size()
19 54 # mpi.rank = mpi.COMM_WORLD.Get_rank()
20 55 # """
56
57 # Initialize MPI using pytrilinos. To use this, set c.MPI.use = 'pytrilinos'
58 # to use --mpi=pytrilinos at the command line.
21 59 # c.MPI.pytrilinos = """from PyTrilinos import Epetra
22 60 # class SimpleStruct:
23 61 # pass
24 62 # mpi = SimpleStruct()
25 63 # mpi.rank = 0
26 64 # mpi.size = 0
27 65 # """
28 66
67 #-----------------------------------------------------------------------------
68 # Developer level configuration attributes
69 #-----------------------------------------------------------------------------
70
71 # You shouldn't have to modify anything in this section. These attributes
72 # are more for developers who want to change the behavior of the controller
73 # at a fundamental level.
74
75 # You should not have to change these attributes.
76
77 # c.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
78
79 # c.Global.furl_file_name = 'ipcontroller-engine.furl'
80
81
82
83
84
85
86
@@ -1,713 +1,766 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Facilities for handling client connections to the controller."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008-2009 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 from __future__ import with_statement
18 18 import os
19 19
20 20 from IPython.kernel.fcutil import (
21 21 Tub,
22 22 find_furl,
23 23 is_valid_furl_or_file,
24 24 validate_furl_or_file,
25 25 FURLError
26 26 )
27 27 from IPython.kernel.clusterdir import ClusterDir, ClusterDirError
28 28 from IPython.kernel.launcher import IPClusterLauncher
29 29 from IPython.kernel.twistedutil import (
30 30 gatherBoth,
31 31 make_deferred,
32 32 blockingCallFromThread,
33 33 sleep_deferred
34 34 )
35 35 from IPython.utils.importstring import import_item
36 36 from IPython.utils.genutils import get_ipython_dir
37 37
38 38 from twisted.internet import defer
39 39 from twisted.internet.defer import inlineCallbacks, returnValue
40 40 from twisted.python import failure, log
41 41
42 42 #-----------------------------------------------------------------------------
43 43 # The ClientConnector class
44 44 #-----------------------------------------------------------------------------
45 45
46 46 DELAY = 0.2
47 47 MAX_TRIES = 9
48 48
49 49
50 50 class ClientConnectorError(Exception):
51 51 pass
52 52
53 53
54 54 class AsyncClientConnector(object):
55 55 """A class for getting remote references and clients from furls.
56 56
57 57 This start a single :class:`Tub` for all remote reference and caches
58 58 references.
59 59 """
60 60
61 61 def __init__(self):
62 62 self._remote_refs = {}
63 63 self.tub = Tub()
64 64 self.tub.startService()
65 65
66 66 def _find_furl(self, profile='default', cluster_dir=None,
67 67 furl_or_file=None, furl_file_name=None,
68 68 ipythondir=None):
69 69 """Find a FURL file by profile+ipythondir or cluster dir.
70 70
71 71 This raises an :exc:`~IPython.kernel.fcutil.FURLError` exception
72 72 if a FURL file can't be found.
73 73 """
74 74 # Try by furl_or_file
75 75 if furl_or_file is not None:
76 76 validate_furl_or_file(furl_or_file)
77 77 return furl_or_file
78 78
79 79 if furl_file_name is None:
80 80 raise FURLError('A furl_file_name must be provided')
81 81
82 82 # Try by cluster_dir
83 83 if cluster_dir is not None:
84 84 cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
85 85 sdir = cluster_dir_obj.security_dir
86 86 furl_file = os.path.join(sdir, furl_file_name)
87 87 validate_furl_or_file(furl_file)
88 88 return furl_file
89 89
90 90 # Try by profile
91 91 if ipythondir is None:
92 92 ipythondir = get_ipython_dir()
93 93 if profile is not None:
94 94 cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
95 95 ipythondir, profile)
96 96 sdir = cluster_dir_obj.security_dir
97 97 furl_file = os.path.join(sdir, furl_file_name)
98 98 validate_furl_or_file(furl_file)
99 99 return furl_file
100 100
101 101 raise FURLError('Could not find a valid FURL file.')
102 102
103 103 def get_reference(self, furl_or_file):
104 104 """Get a remote reference using a furl or a file containing a furl.
105 105
106 106 Remote references are cached locally so once a remote reference
107 107 has been retrieved for a given furl, the cached version is
108 108 returned.
109 109
110 110 Parameters
111 111 ----------
112 112 furl_or_file : str
113 113 A furl or a filename containing a furl. This should already be
114 114 validated, but might not yet exist.
115 115
116 116 Returns
117 117 -------
118 118 A deferred to a remote reference
119 119 """
120 120 furl = furl_or_file
121 121 if furl in self._remote_refs:
122 122 d = defer.succeed(self._remote_refs[furl])
123 123 else:
124 124 d = self.tub.getReference(furl)
125 125 d.addCallback(self._save_ref, furl)
126 126 return d
127 127
128 128 def _save_ref(self, ref, furl):
129 129 """Cache a remote reference by its furl."""
130 130 self._remote_refs[furl] = ref
131 131 return ref
132 132
133 133 def get_task_client(self, profile='default', cluster_dir=None,
134 134 furl_or_file=None, ipythondir=None,
135 135 delay=DELAY, max_tries=MAX_TRIES):
136 136 """Get the task controller client.
137 137
138 138 This method is a simple wrapper around `get_client` that passes in
139 139 the default name of the task client FURL file. Usually only
140 140 the ``profile`` option will be needed. If a FURL file can't be
141 141 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
142 142
143 143 Parameters
144 144 ----------
145 145 profile : str
146 146 The name of a cluster directory profile (default="default"). The
147 147 cluster directory "cluster_<profile>" will be searched for
148 148 in ``os.getcwd()``, the ipythondir and then in the directories
149 149 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
150 150 cluster_dir : str
151 151 The full path to a cluster directory. This is useful if profiles
152 152 are not being used.
153 153 furl_or_file : str
154 154 A furl or a filename containing a FURL. This is useful if you
155 155 simply know the location of the FURL file.
156 156 ipythondir : str
157 157 The location of the ipythondir if different from the default.
158 158 This is used if the cluster directory is being found by profile.
159 delay : float
160 The initial delay between re-connection attempts. Subsequent delays
161 get longer according to ``delay[i] = 1.5*delay[i-1]``.
162 max_tries : int
163 The max number of re-connection attempts.
159 164
160 165 Returns
161 166 -------
162 167 A deferred to the actual client class.
163 168 """
164 169 return self.get_client(
165 170 profile, cluster_dir, furl_or_file,
166 171 'ipcontroller-tc.furl', ipythondir,
167 172 delay, max_tries
168 173 )
169 174
170 175 def get_multiengine_client(self, profile='default', cluster_dir=None,
171 176 furl_or_file=None, ipythondir=None,
172 177 delay=DELAY, max_tries=MAX_TRIES):
173 178 """Get the multiengine controller client.
174 179
175 180 This method is a simple wrapper around `get_client` that passes in
176 181 the default name of the task client FURL file. Usually only
177 182 the ``profile`` option will be needed. If a FURL file can't be
178 183 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
179 184
180 185 Parameters
181 186 ----------
182 187 profile : str
183 188 The name of a cluster directory profile (default="default"). The
184 189 cluster directory "cluster_<profile>" will be searched for
185 190 in ``os.getcwd()``, the ipythondir and then in the directories
186 191 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
187 192 cluster_dir : str
188 193 The full path to a cluster directory. This is useful if profiles
189 194 are not being used.
190 195 furl_or_file : str
191 196 A furl or a filename containing a FURL. This is useful if you
192 197 simply know the location of the FURL file.
193 198 ipythondir : str
194 199 The location of the ipythondir if different from the default.
195 200 This is used if the cluster directory is being found by profile.
196
201 delay : float
202 The initial delay between re-connection attempts. Susequent delays
203 get longer according to ``delay[i] = 1.5*delay[i-1]``.
204 max_tries : int
205 The max number of re-connection attempts.
206
197 207 Returns
198 208 -------
199 209 A deferred to the actual client class.
200 210 """
201 211 return self.get_client(
202 212 profile, cluster_dir, furl_or_file,
203 213 'ipcontroller-mec.furl', ipythondir,
204 214 delay, max_tries
205 215 )
206 216
207 217 def get_client(self, profile='default', cluster_dir=None,
208 218 furl_or_file=None, furl_file_name=None, ipythondir=None,
209 219 delay=DELAY, max_tries=MAX_TRIES):
210 220 """Get a remote reference and wrap it in a client by furl.
211 221
212 222 This method is a simple wrapper around `get_client` that passes in
213 223 the default name of the task client FURL file. Usually only
214 224 the ``profile`` option will be needed. If a FURL file can't be
215 225 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
216 226
217 227 Parameters
218 228 ----------
219 229 profile : str
220 230 The name of a cluster directory profile (default="default"). The
221 231 cluster directory "cluster_<profile>" will be searched for
222 232 in ``os.getcwd()``, the ipythondir and then in the directories
223 233 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
224 234 cluster_dir : str
225 235 The full path to a cluster directory. This is useful if profiles
226 236 are not being used.
227 237 furl_or_file : str
228 238 A furl or a filename containing a FURL. This is useful if you
229 239 simply know the location of the FURL file.
230 240 furl_file_name : str
231 241 The filename (not the full path) of the FURL. This must be
232 242 provided if ``furl_or_file`` is not.
233 243 ipythondir : str
234 244 The location of the ipythondir if different from the default.
235 245 This is used if the cluster directory is being found by profile.
246 delay : float
247 The initial delay between re-connection attempts. Subsequent delays
248 get longer according to ``delay[i] = 1.5*delay[i-1]``.
249 max_tries : int
250 The max number of re-connection attempts.
236 251
237 252 Returns
238 253 -------
239 254 A deferred to the actual client class. Or a failure to a
240 255 :exc:`FURLError`.
241 256 """
242 257 try:
243 258 furl_file = self._find_furl(
244 259 profile, cluster_dir, furl_or_file,
245 260 furl_file_name, ipythondir
246 261 )
247 262 except FURLError:
248 263 return defer.fail(failure.Failure())
249 264
250 265 def _wrap_remote_reference(rr):
251 266 d = rr.callRemote('get_client_name')
252 267 d.addCallback(lambda name: import_item(name))
253 268 def adapt(client_interface):
254 269 client = client_interface(rr)
255 270 client.tub = self.tub
256 271 return client
257 272 d.addCallback(adapt)
258 273
259 274 return d
260 275
261 276 d = self._try_to_connect(furl_file, delay, max_tries, attempt=0)
262 277 d.addCallback(_wrap_remote_reference)
263 278 return d
264 279
265 280 @inlineCallbacks
266 281 def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
267 282 """Try to connect to the controller with retry logic."""
268 283 if attempt < max_tries:
269 284 log.msg("Connecting to controller [%r]: %s" % \
270 285 (attempt, furl_or_file))
271 286 try:
272 287 self.furl = find_furl(furl_or_file)
273 288 # Uncomment this to see the FURL being tried.
274 289 # log.msg("FURL: %s" % self.furl)
275 290 rr = yield self.get_reference(self.furl)
276 291 except:
277 292 if attempt==max_tries-1:
278 293 # This will propagate the exception all the way to the top
279 294 # where it can be handled.
280 295 raise
281 296 else:
282 297 yield sleep_deferred(delay)
283 298 rr = yield self._try_to_connect(
284 299 furl_or_file, 1.5*delay, max_tries, attempt+1
285 300 )
286 301 returnValue(rr)
287 302 else:
288 303 returnValue(rr)
289 304 else:
290 305 raise ClientConnectorError(
291 306 'Could not connect to controller, max_tries (%r) exceeded. '
292 307 'This usually means that i) the controller was not started, '
293 308 'or ii) a firewall was blocking the client from connecting '
294 309 'to the controller.' % max_tries
295 310 )
296 311
297 312
298 313 class ClientConnector(object):
299 314 """A blocking version of a client connector.
300 315
301 316 This class creates a single :class:`Tub` instance and allows remote
302 317 references and client to be retrieved by their FURLs. Remote references
303 318 are cached locally and FURL files can be found using profiles and cluster
304 319 directories.
305 320 """
306 321
307 322 def __init__(self):
308 323 self.async_cc = AsyncClientConnector()
309 324
310 325 def get_task_client(self, profile='default', cluster_dir=None,
311 326 furl_or_file=None, ipythondir=None,
312 327 delay=DELAY, max_tries=MAX_TRIES):
313 328 """Get the task client.
314 329
315 330 Usually only the ``profile`` option will be needed. If a FURL file
316 331 can't be found by its profile, use ``cluster_dir`` or
317 332 ``furl_or_file``.
318 333
319 334 Parameters
320 335 ----------
321 336 profile : str
322 337 The name of a cluster directory profile (default="default"). The
323 338 cluster directory "cluster_<profile>" will be searched for
324 339 in ``os.getcwd()``, the ipythondir and then in the directories
325 340 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
326 341 cluster_dir : str
327 342 The full path to a cluster directory. This is useful if profiles
328 343 are not being used.
329 344 furl_or_file : str
330 345 A furl or a filename containing a FURL. This is useful if you
331 346 simply know the location of the FURL file.
332 347 ipythondir : str
333 348 The location of the ipythondir if different from the default.
334 349 This is used if the cluster directory is being found by profile.
350 delay : float
351 The initial delay between re-connection attempts. Subsequent delays
352 get longer according to ``delay[i] = 1.5*delay[i-1]``.
353 max_tries : int
354 The max number of re-connection attempts.
335 355
336 356 Returns
337 357 -------
338 358 The task client instance.
339 359 """
340 360 client = blockingCallFromThread(
341 361 self.async_cc.get_task_client, profile, cluster_dir,
342 362 furl_or_file, ipythondir, delay, max_tries
343 363 )
344 364 return client.adapt_to_blocking_client()
345 365
346 366 def get_multiengine_client(self, profile='default', cluster_dir=None,
347 367 furl_or_file=None, ipythondir=None,
348 368 delay=DELAY, max_tries=MAX_TRIES):
349 369 """Get the multiengine client.
350 370
351 371 Usually only the ``profile`` option will be needed. If a FURL file
352 372 can't be found by its profile, use ``cluster_dir`` or
353 373 ``furl_or_file``.
354 374
355 375 Parameters
356 376 ----------
357 377 profile : str
358 378 The name of a cluster directory profile (default="default"). The
359 379 cluster directory "cluster_<profile>" will be searched for
360 380 in ``os.getcwd()``, the ipythondir and then in the directories
361 381 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
362 382 cluster_dir : str
363 383 The full path to a cluster directory. This is useful if profiles
364 384 are not being used.
365 385 furl_or_file : str
366 386 A furl or a filename containing a FURLK. This is useful if you
367 387 simply know the location of the FURL file.
368 388 ipythondir : str
369 389 The location of the ipythondir if different from the default.
370 390 This is used if the cluster directory is being found by profile.
391 delay : float
392 The initial delay between re-connection attempts. Susequent delays
393 get longer according to ``delay[i] = 1.5*delay[i-1]``.
394 max_tries : int
395 The max number of re-connection attempts.
371 396
372 397 Returns
373 398 -------
374 399 The multiengine client instance.
375 400 """
376 401 client = blockingCallFromThread(
377 402 self.async_cc.get_multiengine_client, profile, cluster_dir,
378 403 furl_or_file, ipythondir, delay, max_tries
379 404 )
380 405 return client.adapt_to_blocking_client()
381 406
382 407 def get_client(self, profile='default', cluster_dir=None,
383 408 furl_or_file=None, ipythondir=None,
384 409 delay=DELAY, max_tries=MAX_TRIES):
385 410 client = blockingCallFromThread(
386 411 self.async_cc.get_client, profile, cluster_dir,
387 412 furl_or_file, ipythondir,
388 413 delay, max_tries
389 414 )
390 415 return client.adapt_to_blocking_client()
391 416
392 417
class ClusterStateError(Exception):
    """Raised when a cluster operation is invalid for its current state.

    For example, starting an already-running cluster or stopping one that
    is not running.
    """
    pass
395 420
396 421
397 422 class AsyncCluster(object):
398 423 """An class that wraps the :command:`ipcluster` script."""
399 424
400 425 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
401 426 auto_create=False, auto_stop=True):
402 427 """Create a class to manage an IPython cluster.
403 428
404 429 This class calls the :command:`ipcluster` command with the right
405 430 options to start an IPython cluster. Typically a cluster directory
406 431 must be created (:command:`ipcluster create`) and configured before
407 432 using this class. Configuration is done by editing the
408 433 configuration files in the top level of the cluster directory.
409 434
410 435 Parameters
411 436 ----------
412 437 profile : str
413 438 The name of a cluster directory profile (default="default"). The
414 439 cluster directory "cluster_<profile>" will be searched for
415 440 in ``os.getcwd()``, the ipythondir and then in the directories
416 441 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
417 442 cluster_dir : str
418 443 The full path to a cluster directory. This is useful if profiles
419 444 are not being used.
420 445 ipythondir : str
421 446 The location of the ipythondir if different from the default.
422 447 This is used if the cluster directory is being found by profile.
423 448 auto_create : bool
424 449 Automatically create the cluster directory it is dones't exist.
425 450 This will usually only make sense if using a local cluster
426 451 (default=False).
427 452 auto_stop : bool
428 453 Automatically stop the cluster when this instance is garbage
429 454 collected (default=True). This is useful if you want the cluster
430 455 to live beyond your current process. There is also an instance
431 456 attribute ``auto_stop`` to change this behavior.
432 457 """
433 458 self._setup_cluster_dir(profile, cluster_dir, ipythondir, auto_create)
434 459 self.state = 'before'
435 460 self.launcher = None
436 461 self.client_connector = None
437 462 self.auto_stop = auto_stop
438 463
439 464 def __del__(self):
440 465 if self.auto_stop and self.state=='running':
441 466 print "Auto stopping the cluster..."
442 467 self.stop()
443 468
444 469 @property
445 470 def location(self):
446 471 if hasattr(self, 'cluster_dir_obj'):
447 472 return self.cluster_dir_obj.location
448 473 else:
449 474 return ''
450 475
451 476 @property
452 477 def running(self):
453 478 if self.state=='running':
454 479 return True
455 480 else:
456 481 return False
457 482
458 483 def _setup_cluster_dir(self, profile, cluster_dir, ipythondir, auto_create):
459 484 if ipythondir is None:
460 485 ipythondir = get_ipython_dir()
461 486 if cluster_dir is not None:
462 487 try:
463 488 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
464 489 except ClusterDirError:
465 490 pass
466 491 if profile is not None:
467 492 try:
468 493 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
469 494 ipythondir, profile)
470 495 except ClusterDirError:
471 496 pass
472 497 if auto_create or profile=='default':
473 498 # This should call 'ipcluster create --profile default
474 499 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
475 500 ipythondir, profile)
476 501 else:
477 502 raise ClusterDirError('Cluster dir not found.')
478 503
479 504 @make_deferred
480 505 def start(self, n=2):
481 506 """Start the IPython cluster with n engines.
482 507
483 508 Parameters
484 509 ----------
485 510 n : int
486 511 The number of engine to start.
487 512 """
488 513 # We might want to add logic to test if the cluster has started
489 514 # by another process....
490 515 if not self.state=='running':
491 516 self.launcher = IPClusterLauncher(os.getcwd())
492 517 self.launcher.ipcluster_n = n
493 518 self.launcher.ipcluster_subcommand = 'start'
494 519 d = self.launcher.start()
495 520 d.addCallback(self._handle_start)
496 521 return d
497 522 else:
498 523 raise ClusterStateError('Cluster is already running')
499 524
500 525 @make_deferred
501 526 def stop(self):
502 527 """Stop the IPython cluster if it is running."""
503 528 if self.state=='running':
504 529 d1 = self.launcher.observe_stop()
505 530 d1.addCallback(self._handle_stop)
506 531 d2 = self.launcher.stop()
507 532 return gatherBoth([d1, d2], consumeErrors=True)
508 533 else:
509 534 raise ClusterStateError("Cluster not running")
510 535
511 536 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
512 537 """Get the multiengine client for the running cluster.
513 538
514 539 If this fails, it means that the cluster has not finished starting.
515 540 Usually waiting a few seconds are re-trying will solve this.
516 541 """
517 542 if self.client_connector is None:
518 543 self.client_connector = AsyncClientConnector()
519 544 return self.client_connector.get_multiengine_client(
520 545 cluster_dir=self.cluster_dir_obj.location,
521 546 delay=delay, max_tries=max_tries
522 547 )
523 548
524 549 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
525 550 """Get the task client for the running cluster.
526 551
527 552 If this fails, it means that the cluster has not finished starting.
528 553 Usually waiting a few seconds are re-trying will solve this.
529 554 """
530 555 if self.client_connector is None:
531 556 self.client_connector = AsyncClientConnector()
532 557 return self.client_connector.get_task_client(
533 558 cluster_dir=self.cluster_dir_obj.location,
534 559 delay=delay, max_tries=max_tries
535 560 )
536 561
537 562 def get_ipengine_logs(self):
538 563 return self.get_logs_by_name('ipengine')
539 564
540 565 def get_ipcontroller_logs(self):
541 566 return self.get_logs_by_name('ipcontroller')
542 567
543 568 def get_ipcluster_logs(self):
544 569 return self.get_logs_by_name('ipcluster')
545 570
546 571 def get_logs_by_name(self, name='ipcluster'):
547 572 log_dir = self.cluster_dir_obj.log_dir
548 573 logs = {}
549 574 for log in os.listdir(log_dir):
550 575 if log.startswith(name + '-') and log.endswith('.log'):
551 576 with open(os.path.join(log_dir, log), 'r') as f:
552 577 logs[log] = f.read()
553 578 return logs
554 579
555 580 def get_logs(self):
556 581 d = self.get_ipcluster_logs()
557 582 d.update(self.get_ipengine_logs())
558 583 d.update(self.get_ipcontroller_logs())
559 584 return d
560 585
561 586 def _handle_start(self, r):
562 587 self.state = 'running'
563 588
564 589 def _handle_stop(self, r):
565 590 self.state = 'after'
566 591
567 592
568 593 class Cluster(object):
569 594
570 595
571 596 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
572 597 auto_create=False, auto_stop=True):
573 598 """Create a class to manage an IPython cluster.
574 599
575 600 This class calls the :command:`ipcluster` command with the right
576 601 options to start an IPython cluster. Typically a cluster directory
577 602 must be created (:command:`ipcluster create`) and configured before
578 603 using this class. Configuration is done by editing the
579 604 configuration files in the top level of the cluster directory.
580 605
581 606 Parameters
582 607 ----------
583 608 profile : str
584 609 The name of a cluster directory profile (default="default"). The
585 610 cluster directory "cluster_<profile>" will be searched for
586 611 in ``os.getcwd()``, the ipythondir and then in the directories
587 612 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
588 613 cluster_dir : str
589 614 The full path to a cluster directory. This is useful if profiles
590 615 are not being used.
591 616 ipythondir : str
592 617 The location of the ipythondir if different from the default.
593 618 This is used if the cluster directory is being found by profile.
594 619 auto_create : bool
595 620 Automatically create the cluster directory it is dones't exist.
596 621 This will usually only make sense if using a local cluster
597 622 (default=False).
598 623 auto_stop : bool
599 624 Automatically stop the cluster when this instance is garbage
600 625 collected (default=True). This is useful if you want the cluster
601 626 to live beyond your current process. There is also an instance
602 627 attribute ``auto_stop`` to change this behavior.
603 628 """
604 629 self.async_cluster = AsyncCluster(
605 630 profile, cluster_dir, ipythondir, auto_create, auto_stop
606 631 )
607 632 self.cluster_dir_obj = self.async_cluster.cluster_dir_obj
608 633 self.client_connector = None
609 634
610 635 def _set_auto_stop(self, value):
611 636 self.async_cluster.auto_stop = value
612 637
613 638 def _get_auto_stop(self):
614 639 return self.async_cluster.auto_stop
615 640
616 641 auto_stop = property(_get_auto_stop, _set_auto_stop)
617 642
618 643 @property
619 644 def location(self):
620 645 return self.async_cluster.location
621 646
622 647 @property
623 648 def running(self):
624 649 return self.async_cluster.running
625 650
626 651 def start(self, n=2):
627 652 """Start the IPython cluster with n engines.
628 653
629 654 Parameters
630 655 ----------
631 656 n : int
632 657 The number of engine to start.
633 658 """
634 659 return blockingCallFromThread(self.async_cluster.start, n)
635 660
636 661 def stop(self):
637 662 """Stop the IPython cluster if it is running."""
638 663 return blockingCallFromThread(self.async_cluster.stop)
639 664
640 665 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
641 666 """Get the multiengine client for the running cluster.
642 667
643 If this fails, it means that the cluster has not finished starting.
644 Usually waiting a few seconds are re-trying will solve this.
668 This will try to attempt to the controller multiple times. If this
669 fails altogether, try looking at the following:
670 * Make sure the controller is starting properly by looking at its
671 log files.
672 * Make sure the controller is writing its FURL file in the location
673 expected by the client.
674 * Make sure a firewall on the controller's host is not blocking the
675 client from connecting.
676
677 Parameters
678 ----------
679 delay : float
680 The initial delay between re-connection attempts. Susequent delays
681 get longer according to ``delay[i] = 1.5*delay[i-1]``.
682 max_tries : int
683 The max number of re-connection attempts.
645 684 """
646 685 if self.client_connector is None:
647 686 self.client_connector = ClientConnector()
648 687 return self.client_connector.get_multiengine_client(
649 688 cluster_dir=self.cluster_dir_obj.location,
650 689 delay=delay, max_tries=max_tries
651 690 )
652 691
653 692 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
654 693 """Get the task client for the running cluster.
655 694
656 If this fails, it means that the cluster has not finished starting.
657 Usually waiting a few seconds are re-trying will solve this.
695 This will try to attempt to the controller multiple times. If this
696 fails altogether, try looking at the following:
697 * Make sure the controller is starting properly by looking at its
698 log files.
699 * Make sure the controller is writing its FURL file in the location
700 expected by the client.
701 * Make sure a firewall on the controller's host is not blocking the
702 client from connecting.
703
704 Parameters
705 ----------
706 delay : float
707 The initial delay between re-connection attempts. Susequent delays
708 get longer according to ``delay[i] = 1.5*delay[i-1]``.
709 max_tries : int
710 The max number of re-connection attempts.
658 711 """
659 712 if self.client_connector is None:
660 713 self.client_connector = ClientConnector()
661 714 return self.client_connector.get_task_client(
662 715 cluster_dir=self.cluster_dir_obj.location,
663 716 delay=delay, max_tries=max_tries
664 717 )
665 718
666 719 def __repr__(self):
667 720 s = "<Cluster(running=%r, location=%s)" % (self.running, self.location)
668 721 return s
669 722
670 723 def get_logs_by_name(self, name='ipcluter'):
671 724 """Get a dict of logs by process name (ipcluster, ipengine, etc.)"""
672 725 return self.async_cluster.get_logs_by_name(name)
673 726
674 727 def get_ipengine_logs(self):
675 728 """Get a dict of logs for all engines in this cluster."""
676 729 return self.async_cluster.get_ipengine_logs()
677 730
678 731 def get_ipcontroller_logs(self):
679 732 """Get a dict of logs for the controller in this cluster."""
680 733 return self.async_cluster.get_ipcontroller_logs()
681 734
682 735 def get_ipcluster_logs(self):
683 736 """Get a dict of the ipcluster logs for this cluster."""
684 737 return self.async_cluster.get_ipcluster_logs()
685 738
686 739 def get_logs(self):
687 740 """Get a dict of all logs for this cluster."""
688 741 return self.async_cluster.get_logs()
689 742
690 743 def _print_logs(self, logs):
691 744 for k, v in logs.iteritems():
692 745 print "==================================="
693 746 print "Logfile: %s" % k
694 747 print "==================================="
695 748 print v
696 749 print
697 750
698 751 def print_ipengine_logs(self):
699 752 """Print the ipengine logs for this cluster to stdout."""
700 753 self._print_logs(self.get_ipengine_logs())
701 754
702 755 def print_ipcontroller_logs(self):
703 756 """Print the ipcontroller logs for this cluster to stdout."""
704 757 self._print_logs(self.get_ipcontroller_logs())
705 758
706 759 def print_ipcluster_logs(self):
707 760 """Print the ipcluster logs for this cluster to stdout."""
708 761 self._print_logs(self.get_ipcluster_logs())
709 762
710 763 def print_logs(self):
711 764 """Print all the logs for this cluster to stdout."""
712 765 self._print_logs(self.get_logs())
713 766
@@ -1,628 +1,700 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Facilities for launching processing asynchronously.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import re
20 20 import sys
21 21
22 22 from IPython.core.component import Component
23 23 from IPython.external import Itpl
24 24 from IPython.utils.traitlets import Str, Int, List, Unicode
25 25 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
26 26
27 27 from twisted.internet import reactor, defer
28 28 from twisted.internet.defer import inlineCallbacks
29 29 from twisted.internet.protocol import ProcessProtocol
30 30 from twisted.internet.utils import getProcessOutput
31 31 from twisted.internet.error import ProcessDone, ProcessTerminated
32 32 from twisted.python import log
33 33 from twisted.python.failure import Failure
34 34
35 35 #-----------------------------------------------------------------------------
36 36 # Generic launchers
37 37 #-----------------------------------------------------------------------------
38 38
39 39
class LauncherError(Exception):
    """Base class for all errors raised by process launchers."""
    pass
42 42
43 43
class ProcessStateError(LauncherError):
    """Raised when an operation is invalid for the process's current state.

    For example, starting a launcher that is already in the 'running' or
    'after' state.
    """
    pass
46 46
47 47
class UnknownStatus(LauncherError):
    """Raised when a process exit status is neither done nor terminated."""
    pass
50 50
51 51
class BaseLauncher(Component):
    """An abstraction for starting, stopping and signaling a process."""

    # A directory for files related to the process. But, we don't cd to
    # this directory.
    working_dir = Unicode(u'')

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(BaseLauncher, self).__init__(parent, name, config)
        self.working_dir = working_dir
        self.state = 'before' # can be before, running, after
        self.stop_deferreds = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """A list of cmd and args that will be used to start the process.

        This is what is passed to :func:`spawnProcess` and the first element
        will be the process name.
        """
        return self.find_args()

    def find_args(self):
        """The ``.args`` property calls this to find the args list.

        Subclasses should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The string form of the program arguments."""
        return ' '.join(self.args)

    @property
    def running(self):
        """True if the launcher state is 'running'."""
        if self.state == 'running':
            return True
        else:
            return False

    def start(self):
        """Start the process.

        This must return a deferred that fires with information about the
        process starting (like a pid, job id, etc.).
        """
        return defer.fail(
            Failure(NotImplementedError(
                'start must be implemented in a subclass')
            )
        )

    def stop(self):
        """Stop the process and notify observers of stopping.

        This must return a deferred that fires with information about the
        processing stopping, like errors that occur while the process is
        attempting to be shut down. This deferred won't fire when the process
        actually stops. To observe the actual process stopping, see
        :func:`observe_stop`.
        """
        return defer.fail(
            Failure(NotImplementedError(
                'stop must be implemented in a subclass')
            )
        )

    def observe_stop(self):
        """Get a deferred that will fire when the process stops.

        The deferred will fire with data that contains information about
        the exit status of the process.
        """
        if self.state=='after':
            # Already stopped: fire immediately with the recorded data.
            return defer.succeed(self.stop_data)
        else:
            d = defer.Deferred()
            self.stop_deferreds.append(d)
            return d

    def notify_start(self, data):
        """Call this to trigger startup actions.

        This logs the process startup and sets the state to 'running'. It is
        a pass-through so it can be used as a callback.
        """

        log.msg('Process %r started: %r' % (self.args[0], data))
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        This logs the process stopping and sets the state to 'after'. Call
        this to trigger all the deferreds from :func:`observe_stop`."""

        log.msg('Process %r stopped: %r' % (self.args[0], data))
        self.stop_data = data
        self.state = 'after'
        for i in range(len(self.stop_deferreds)):
            d = self.stop_deferreds.pop()
            d.callback(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Return a semi-meaningless deferred after signaling the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        return defer.fail(
            Failure(NotImplementedError(
                'signal must be implemented in a subclass')
            )
        )
163 177
164 178
class LocalProcessLauncherProtocol(ProcessProtocol):
    """A ProcessProtocol to go with the LocalProcessLauncher."""

    def __init__(self, process_launcher):
        self.process_launcher = process_launcher
        self.pid = None

    def connectionMade(self):
        # Record the pid and report the successful spawn to the launcher.
        self.pid = self.transport.pid
        self.process_launcher.notify_start(self.transport.pid)

    def processEnded(self, status):
        # Translate Twisted's exit reason into a uniform stop-info dict.
        reason = status.value
        if isinstance(reason, ProcessDone):
            # Clean exit: code 0, no signal or raw status.
            exit_code, sig, raw_status = 0, None, None
        elif isinstance(reason, ProcessTerminated):
            exit_code = reason.exitCode
            sig = reason.signal
            raw_status = reason.status
        else:
            raise UnknownStatus("Unknown exit status, this is probably a "
                                "bug in Twisted")
        self.process_launcher.notify_stop({
            'exit_code': exit_code,
            'signal': sig,
            'status': raw_status,
            'pid': self.pid
        })

    def outReceived(self, data):
        log.msg(data)

    def errReceived(self, data):
        log.err(data)
203 217
204 218
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner."""

    # This is used to construct self.args, which is passed to
    # spawnProcess.
    cmd_and_args = List([])

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(LocalProcessLauncher, self).__init__(
            working_dir, parent, name, config
        )
        self.process_protocol = None
        self.start_deferred = None

    def find_args(self):
        return self.cmd_and_args

    def start(self):
        """Spawn the process; the returned deferred fires on startup."""
        if self.state != 'before':
            s = 'The process was already started and has state: %r' % self.state
            return defer.fail(ProcessStateError(s))
        self.process_protocol = LocalProcessLauncherProtocol(self)
        self.start_deferred = defer.Deferred()
        # spawnProcess wants plain strings for the executable and argv.
        executable = str(self.args[0])
        argv = [str(a) for a in self.args]
        self.process_transport = reactor.spawnProcess(
            self.process_protocol,
            executable,
            argv,
            env=os.environ
        )
        return self.start_deferred

    def notify_start(self, data):
        super(LocalProcessLauncher, self).notify_start(data)
        self.start_deferred.callback(data)

    def stop(self):
        return self.interrupt_then_kill()

    @make_deferred
    def signal(self, sig):
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    @inlineCallbacks
    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL."""
        yield self.signal('INT')
        yield sleep_deferred(delay)
        yield self.signal('KILL')
252 269
253 270
class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    # The mpiexec command to use in starting the process.
    mpi_cmd = List(['mpiexec'], config=True)
    # The command line arguments to pass to mpiexec.
    mpi_args = List([], config=True)
    # The program to start using mpiexec.
    program = List(['date'], config=True)
    # The command line argument to the program.
    program_args = List([], config=True)
    # The number of instances of the program to start.
    n = Int(1, config=True)

    def find_args(self):
        """Build self.args using all the fields.

        ``n`` is converted to str so args is a uniform list of strings;
        otherwise the inherited ``arg_str`` (' '.join) would raise TypeError.
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPIExecLauncher, self).start()
269 294
270 295
class SSHLauncher(BaseLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables. There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True)
    ssh_args = List([], config=True)
    program = List(['date'], config=True)
    program_args = List([], config=True)
    hostname = Str('', config=True)
    # Use .get so a missing USER environment variable (e.g. on Windows)
    # doesn't raise KeyError when this class is defined.
    user = Str(os.environ.get('USER', ''), config=True)
    location = Str('')

    def _hostname_changed(self, name, old, new):
        # Keep 'user@host' location in sync with the hostname trait.
        self.location = '%s@%s' % (self.user, new)

    def _user_changed(self, name, old, new):
        # Keep 'user@host' location in sync with the user trait.
        self.location = '%s@%s' % (new, self.hostname)

    def find_args(self):
        return self.ssh_cmd + self.ssh_args + [self.location] + \
               self.program + self.program_args

    def start(self, n, hostname=None, user=None):
        # NOTE(review): ``n`` is accepted but unused here -- presumably kept
        # for signature compatibility with the other launchers; confirm.
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user
        return super(SSHLauncher, self).start()
303 328
304 329
class WindowsHPCLauncher(BaseLauncher):
    """Launcher for Windows HPC Server -- placeholder, not yet implemented."""
    pass
307 332
308 333
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc. The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using Itpl. Thus the template can use
    ${n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in. See PBSEngineSet
    # The name of the command line program used to submit jobs.
    submit_command = Str('', config=True)
    # The name of the command line program used to delete jobs.
    delete_command = Str('', config=True)
    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str('', config=True)
    # The string that is the batch script template itself.
    batch_template = Str('', config=True)
    # The filename of the instantiated batch script.
    batch_file_name = Unicode(u'batch_script', config=True)
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(BatchSystemLauncher, self).__init__(
            working_dir, parent, name, config
        )
        self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
        self.context = {}

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = re.match(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the working_dir."""
        self.context['n'] = n
        script_as_string = Itpl.itplns(self.batch_template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        # Context manager ensures the file is closed even if the write fails.
        with open(self.batch_file, 'w') as f:
            f.write(script_as_string)

    @inlineCallbacks
    def start(self, n):
        """Start n copies of the process using a batch system."""
        self.write_batch_script(n)
        output = yield getProcessOutput(self.submit_command,
            [self.batch_file], env=os.environ)
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        defer.returnValue(job_id)

    @inlineCallbacks
    def stop(self):
        """Delete the job via delete_command and notify observers."""
        output = yield getProcessOutput(self.delete_command,
            [self.job_id], env=os.environ
        )
        self.notify_stop(output) # Pass the output of the kill cmd
        defer.returnValue(output)
361 407
362 408
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = Str('qsub', config=True)
    delete_command = Str('qdel', config=True)
    # Raw string so '\d' is a regex digit class, not an invalid
    # string escape (a DeprecationWarning/SyntaxWarning otherwise).
    job_id_regexp = Str(r'\d+', config=True)
    batch_template = Str('', config=True)
    batch_file_name = Unicode(u'pbs_batch_script', config=True)
    batch_file = Unicode(u'')
371 418
372 419
373 420 #-----------------------------------------------------------------------------
374 421 # Controller launchers
375 422 #-----------------------------------------------------------------------------
376 423
def find_controller_cmd():
    """Find the command line ipcontroller program in a cross platform way."""
    if sys.platform == 'win32':
        # This logic is needed because the ipcontroller script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipcontrollerapp
        script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflicts and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipcontroller has to be on the PATH in this case.
        cmd = ['ipcontroller']
    return cmd
392 440
393 441
class LocalControllerLauncher(LocalProcessLauncher):
    """Launch a controller as a regular external process."""

    controller_cmd = List(find_controller_cmd())
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)

    def find_args(self):
        """The full argv used to launch the controller."""
        return self.controller_cmd + self.controller_args

    def start(self, profile=None, cluster_dir=None):
        """Start the controller by profile or cluster_dir."""
        # Append whichever locator options were given, cluster_dir first.
        for flag, value in (('--cluster-dir', cluster_dir),
                            ('--profile', profile)):
            if value is not None:
                self.controller_args.extend([flag, value])
        log.msg("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()
409 460
410 461
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Placeholder for launching a controller via Windows HPC.

    No HPC-specific behavior yet; everything is inherited from
    WindowsHPCLauncher.
    """
    pass
413 464
414 465
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    controller_cmd = List(find_controller_cmd(), config=False)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)
    # A controller is always a single process.
    n = Int(1, config=False)

    def start(self, profile=None, cluster_dir=None):
        """Start the controller by profile or cluster_dir."""
        if cluster_dir is not None:
            self.controller_args.extend(['--cluster-dir', cluster_dir])
        if profile is not None:
            self.controller_args.extend(['--profile', profile])
        log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        """The full mpiexec argv.

        ``self.n`` is an Int trait and is stringified here: argv must be a
        list of strings (cf. IPClusterLauncher, which uses ``repr`` on the
        equivalent count for the same reason).
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.controller_cmd + self.controller_args
433 486
434 487
class PBSControllerLauncher(PBSLauncher):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)

    def start(self, profile=None, cluster_dir=None):
        """Start the controller by profile or cluster_dir."""
        # profile and cluster_dir are saved into the template context so
        # the batch script template can reference them as ${profile} and
        # ${cluster_dir}.
        for key, value in (('cluster_dir', cluster_dir),
                           ('profile', profile)):
            if value is not None:
                self.context[key] = value
        log.msg("Starting PBSControllerLauncher: %r" % self.args)
        return super(PBSControllerLauncher, self).start(1)
447 504
448 505
class SSHControllerLauncher(SSHLauncher):
    """Placeholder for launching a controller over SSH.

    NOTE(review): the default config header for this project describes SSH
    launching as currently broken; no SSH-specific controller behavior is
    implemented here.
    """
    pass
451 508
452 509
453 510 #-----------------------------------------------------------------------------
454 511 # Engine launchers
455 512 #-----------------------------------------------------------------------------
456 513
457 514
def find_engine_cmd():
    """Find the command line ipengine program in a cross platform way."""
    if sys.platform == 'win32':
        # This logic is needed because the ipengine script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipengineapp
        script_location = ipengineapp.__file__.replace('.pyc', '.py')
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflicts and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipengine has to be on the PATH in this case.  (The original
        # comment said "ipcontroller" -- a copy/paste slip.)
        cmd = ['ipengine']
    return cmd
473 531
474 532
class LocalEngineLauncher(LocalProcessLauncher):
    """Launch a single engine as a regular external process."""

    engine_cmd = List(find_engine_cmd())
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )

    def find_args(self):
        """The full argv used to launch the engine."""
        return self.engine_cmd + self.engine_args

    def start(self, profile=None, cluster_dir=None):
        """Start the engine by profile or cluster_dir."""
        # Append whichever locator options were given, cluster_dir first.
        for flag, value in (('--cluster-dir', cluster_dir),
                            ('--profile', profile)):
            if value is not None:
                self.engine_args.extend([flag, value])
        return super(LocalEngineLauncher, self).start()
491 552
492 553
class LocalEngineSetLauncher(BaseLauncher):
    """Launch a set of engines as regular external processes."""

    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(LocalEngineSetLauncher, self).__init__(
            working_dir, parent, name, config
        )
        # One LocalEngineLauncher per started engine.
        self.launchers = []

    def start(self, n, profile=None, cluster_dir=None):
        """Start n engines by profile or cluster_dir."""
        import copy  # hoisted: was re-imported on every loop iteration
        dlist = []
        for i in range(n):
            el = LocalEngineLauncher(self.working_dir, self)
            # Copy the engine args over to each engine launcher so that
            # per-launcher mutation does not leak back into this set.
            el.engine_args = copy.deepcopy(self.engine_args)
            d = el.start(profile, cluster_dir)
            if i == 0:
                log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
            self.launchers.append(el)
            dlist.append(d)
        # The consumeErrors here could be dangerous
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self.notify_start)
        return dfinal

    def find_args(self):
        """A set has no single argv; return a descriptive placeholder."""
        return ['engine set']

    def signal(self, sig):
        """Send the signal sig to every engine launcher."""
        dlist = [el.signal(sig) for el in self.launchers]
        dfinal = gatherBoth(dlist, consumeErrors=True)
        return dfinal

    def interrupt_then_kill(self, delay=1.0):
        """Interrupt each engine, then kill it after delay seconds."""
        dlist = [el.interrupt_then_kill(delay) for el in self.launchers]
        dfinal = gatherBoth(dlist, consumeErrors=True)
        return dfinal

    def stop(self):
        """Stop all engines via interrupt-then-kill."""
        return self.interrupt_then_kill()

    def observe_stop(self):
        """Fire notify_stop once every engine launcher has stopped."""
        dlist = [el.observe_stop() for el in self.launchers]
        dfinal = gatherBoth(dlist, consumeErrors=False)
        dfinal.addCallback(self.notify_stop)
        return dfinal
549 613
550 614
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Launch a set of engines using mpiexec."""

    engine_cmd = List(find_engine_cmd(), config=False)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )
    # The number of engines to start.
    n = Int(1, config=True)

    def start(self, n, profile=None, cluster_dir=None):
        """Start n engines by profile or cluster_dir."""
        if cluster_dir is not None:
            self.engine_args.extend(['--cluster-dir', cluster_dir])
        if profile is not None:
            self.engine_args.extend(['--profile', profile])
        log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)

    def find_args(self):
        """The full mpiexec argv.

        ``self.n`` is an Int trait and is stringified here: argv must be a
        list of strings (cf. IPClusterLauncher, which uses ``repr`` on the
        equivalent count for the same reason).
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.engine_cmd + self.engine_args
570 636
571 637
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Placeholder for launching a set of engines via Windows HPC.

    No HPC-specific behavior yet; everything is inherited from
    WindowsHPCLauncher.
    """
    pass
574 640
575 641
class PBSEngineSetLauncher(PBSLauncher):
    """Launch a set of engines using PBS."""

    batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)

    def start(self, n, profile=None, cluster_dir=None):
        """Start n engines by profile or cluster_dir."""
        # BUG FIX: the original extended self.program_args, but the PBS
        # path launches via a rendered batch script, not a direct argv.
        # Mirror PBSControllerLauncher: save profile and cluster_dir in
        # the context so the batch script template can use them as
        # ${profile} and ${cluster_dir}.
        if cluster_dir is not None:
            self.context['cluster_dir'] = cluster_dir
        if profile is not None:
            self.context['profile'] = profile
        log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
        return super(PBSEngineSetLauncher, self).start(n)
585 654
586 655
class SSHEngineSetLauncher(BaseLauncher):
    """Placeholder for launching a set of engines over SSH.

    NOTE(review): the default config header for this project describes SSH
    launching as currently broken; no SSH-specific engine-set behavior is
    implemented here.
    """
    pass
589 658
590 659
591 660 #-----------------------------------------------------------------------------
592 661 # A launcher for ipcluster itself!
593 662 #-----------------------------------------------------------------------------
594 663
595 664
def find_ipcluster_cmd():
    """Find the command line ipcluster program in a cross platform way."""
    if sys.platform == 'win32':
        # This logic is needed because the ipcluster script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipclusterapp
        script_location = ipclusterapp.__file__.replace('.pyc', '.py')
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflicts and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipcluster has to be on the PATH in this case.  (The original
        # comment said "ipcontroller" -- a copy/paste slip.)
        cmd = ['ipcluster']
    return cmd
611 681
612 682
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    ipcluster_cmd = List(find_ipcluster_cmd())
    # Command line arguments to pass to ipcluster.
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
    # The ipcluster sub-command to run (e.g. "start").
    ipcluster_subcommand = Str('start')
    # The number of engines ipcluster should start.
    ipcluster_n = Int(2)

    def find_args(self):
        """The full argv used to launch ipcluster."""
        argv = list(self.ipcluster_cmd)
        argv.append(self.ipcluster_subcommand)
        argv.extend(['-n', repr(self.ipcluster_n)])
        argv.extend(self.ipcluster_args)
        return argv

    def start(self):
        """Start ipcluster, logging the full command line first."""
        log.msg("Starting ipcluster: %r" % self.args)
        return super(IPClusterLauncher, self).start()
628 700
General Comments 0
You need to be logged in to leave comments. Login now