Work on default config files and docstrings....
Brian Granger
@@ -1,67 +1,155 b''
1 import os
1 import os
2
2
3 c = get_config()
3 c = get_config()
4
4
5 # Options are:
5 #-----------------------------------------------------------------------------
6 # * LocalControllerLauncher
6 # Select which launchers to use
7 # * PBSControllerLauncher
7 #-----------------------------------------------------------------------------
8
9 # This allows you to control what method is used to start the controller
10 # and engines. The following methods are currently supported:
11 # * Start as a regular process on localhost.
12 # * Start using mpiexec.
13 # * Start using PBS
14 # * Start using SSH (currently broken)
15
16 # The selected launchers can be configured below.
17
18 # Options are (LocalControllerLauncher, MPIExecControllerLauncher,
19 # PBSControllerLauncher)
8 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
20 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
9
21
10 # Options are:
22 # Options are (LocalEngineSetLauncher, MPIExecEngineSetLauncher,
11 # * LocalEngineSetLauncher
23 # PBSEngineSetLauncher)
12 # * MPIExecEngineSetLauncher
13 # * PBSEngineSetLauncher
14 # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
24 # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
15
25
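# For example (an illustrative sketch that only uses the launcher classes
# listed above; the full class paths mirror the Local* defaults and are an
# assumption), to start both the controller and the engines with mpiexec you
# could set:
# c.Global.controller_launcher = 'IPython.kernel.launcher.MPIExecControllerLauncher'
# c.Global.engine_launcher = 'IPython.kernel.launcher.MPIExecEngineSetLauncher'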
16 # c.Global.log_to_file = False
26 #-----------------------------------------------------------------------------
27 # Global configuration
28 #-----------------------------------------------------------------------------
29
30 # The default number of engines that will be started. This is overridden by
31 # the -n command line option: "ipcluster start -n 4"
17 # c.Global.n = 2
32 # c.Global.n = 2
18 # c.Global.reset_config = False
19 # c.Global.clean_logs = True
20
33
21 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
34 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
22 # c.MPIExecLauncher.mpi_args = []
35 # c.Global.log_to_file = False
23 # c.MPIExecLauncher.program = []
24 # c.MPIExecLauncher.program_args = []
25 # c.MPIExecLauncher.n = 1
26
36
27 # c.SSHLauncher.ssh_cmd = ['ssh']
37 # Remove old logs from cluster_dir/log before starting.
28 # c.SSHLauncher.ssh_args = []
38 # c.Global.clean_logs = True
29 # c.SSHLauncher.program = []
30 # s.SSHLauncher.program_args = []
31 # c.SSHLauncher.hostname = ''
32 # c.SSHLauncher.user = os.environ['USER']
33
39
34 # c.PBSLauncher.submit_command = 'qsub'
40 #-----------------------------------------------------------------------------
35 # c.PBSLauncher.delete_command = 'qdel'
41 # Controller launcher configuration
36 # c.PBSLauncher.job_id_regexp = '\d+'
42 #-----------------------------------------------------------------------------
37 # c.PBSLauncher.batch_template = """"""
38 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
39
43
40 # c.LocalControllerLauncher.controller_args = []
44 # Configure how the controller is started. The configuration of the controller
45 # can also be set up by editing the controller config file:
46 # ipcontroller_config.py
41
47
48 # The command line arguments to call the controller with.
49 # c.LocalControllerLauncher.controller_args = \
50 # ['--log-to-file','--log-level', '40']
51
52 # The mpiexec/mpirun command to use to start the controller.
42 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
53 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
54
55 # Additional arguments to pass to the actual mpiexec command.
43 # c.MPIExecControllerLauncher.mpi_args = []
56 # c.MPIExecControllerLauncher.mpi_args = []
44 # c.MPIExecControllerLauncher.controller_args = []
45 # c.MPIExecControllerLauncher.n = 1
46
57
58 # The command line arguments to call the controller with.
59 # c.MPIExecControllerLauncher.controller_args = \
60 # ['--log-to-file','--log-level', '40']
61
62 # The command line program to use to submit a PBS job.
47 # c.PBSControllerLauncher.submit_command = 'qsub'
63 # c.PBSControllerLauncher.submit_command = 'qsub'
64
65 # The command line program to use to delete a PBS job.
48 # c.PBSControllerLauncher.delete_command = 'qdel'
66 # c.PBSControllerLauncher.delete_command = 'qdel'
67
68 # A regular expression that takes the output of qsub and finds the job id.
49 # c.PBSControllerLauncher.job_id_regexp = '\d+'
69 # c.PBSControllerLauncher.job_id_regexp = '\d+'
70
71 # The batch submission script used to start the controller. This is where
72 # environment variables would be set up, etc. This string is interpolated using
73 # the Itpl module in IPython.external. Basically, you can use ${profile} for
74 # the controller profile or ${cluster_dir} for the cluster_dir.
50 # c.PBSControllerLauncher.batch_template = """"""
75 # c.PBSControllerLauncher.batch_template = """"""
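# As an illustration only (the PBS directives and the command line below are
# assumptions, not tested defaults; adapt them to your site), a minimal
# controller template might look like:
# c.PBSControllerLauncher.batch_template = """#!/bin/sh
# #PBS -N ipcontroller-${profile}
# cd ${cluster_dir}
# ipcontroller --log-to-file
# """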
51 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
52
76
53 # c.LocalEngineLauncher.engine_args = []
77 # The name of the instantiated batch script that will actually be used to
78 # submit the job. This will be written to the cluster directory.
79 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
80
81 #-----------------------------------------------------------------------------
82 # Engine launcher configuration
83 #-----------------------------------------------------------------------------
54
84
55 # c.LocalEngineSetLauncher.engine_args = []
85 # Command line arguments passed to the engines.
86 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
56
87
88 # The mpiexec/mpirun command to use to start the engines.
57 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
89 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
90
91 # Additional arguments to pass to the actual mpiexec command.
58 # c.MPIExecEngineSetLauncher.mpi_args = []
92 # c.MPIExecEngineSetLauncher.mpi_args = []
59 # c.MPIExecEngineSetLauncher.controller_args = []
93
94 # Command line arguments passed to the engines.
95 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
96
97 # The default number of engines to start if not given elsewhere.
60 # c.MPIExecEngineSetLauncher.n = 1
98 # c.MPIExecEngineSetLauncher.n = 1
61
99
100 # The command line program to use to submit a PBS job.
62 # c.PBSEngineSetLauncher.submit_command = 'qsub'
101 # c.PBSEngineSetLauncher.submit_command = 'qsub'
102
103 # The command line program to use to delete a PBS job.
63 # c.PBSEngineSetLauncher.delete_command = 'qdel'
104 # c.PBSEngineSetLauncher.delete_command = 'qdel'
105
106 # A regular expression that takes the output of qsub and finds the job id.
64 # c.PBSEngineSetLauncher.job_id_regexp = '\d+'
107 # c.PBSEngineSetLauncher.job_id_regexp = '\d+'
108
109 # The batch submission script used to start the engines. This is where
110 # environment variables would be set up, etc. This string is interpolated using
111 # the Itpl module in IPython.external. Basically, you can use ${n} for the
112 # number of engines, ${profile} for the engine profile and ${cluster_dir}
113 # for the cluster_dir.
65 # c.PBSEngineSetLauncher.batch_template = """"""
114 # c.PBSEngineSetLauncher.batch_template = """"""
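# A similar illustrative sketch for the engines (again an assumption, adapt to
# your site), using ${n} to size the MPI job:
# c.PBSEngineSetLauncher.batch_template = """#!/bin/sh
# #PBS -N ipengines-${profile}
# cd ${cluster_dir}
# mpiexec -n ${n} ipengine --log-to-file
# """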
66 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script'
115
116 # The name of the instantiated batch script that will actually be used to
117 # submit the job. This will be written to the cluster directory.
118 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'
119
120 #-----------------------------------------------------------------------------
121 # Base launcher configuration
122 #-----------------------------------------------------------------------------
123
124 # The various launchers are organized into an inheritance hierarchy.
125 # The configurations can also be inherited and the following attributes
126 # allow you to configure the base classes.
127
128 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
129 # c.MPIExecLauncher.mpi_args = []
130 # c.MPIExecLauncher.program = []
131 # c.MPIExecLauncher.program_args = []
132 # c.MPIExecLauncher.n = 1
133
134 # c.SSHLauncher.ssh_cmd = ['ssh']
135 # c.SSHLauncher.ssh_args = []
136 # c.SSHLauncher.program = []
137 # c.SSHLauncher.program_args = []
138 # c.SSHLauncher.hostname = ''
139 # c.SSHLauncher.user = os.environ['USER']
140
141 # c.BatchSystemLauncher.submit_command
142 # c.BatchSystemLauncher.delete_command
143 # c.BatchSystemLauncher.job_id_regexp
144 # c.BatchSystemLauncher.batch_template
145 # c.BatchSystemLauncher.batch_file_name
146
147 # c.PBSLauncher.submit_command = 'qsub'
148 # c.PBSLauncher.delete_command = 'qdel'
149 # c.PBSLauncher.job_id_regexp = '\d+'
150 # c.PBSLauncher.batch_template = """"""
151 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
152
153
154
67
155
@@ -1,68 +1,132 b''
1 from IPython.config.loader import Config
1 from IPython.config.loader import Config
2
2
3 c = get_config()
3 c = get_config()
4
4
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Global configuration
6 # Global configuration
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 # Basic Global config attributes
9 # Basic Global config attributes
10
11 # Startup messages are logged to stdout using the logging module.
12 # These all happen before the twisted reactor is started and are
13 # useful for debugging purposes. The level can be 10=DEBUG, 20=INFO,
14 # 30=WARN, 40=ERROR or 50=CRITICAL, and smaller is more verbose.
15 # c.Global.log_level = 20
16
17 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
10 # c.Global.log_to_file = False
18 # c.Global.log_to_file = False
19
20 # Remove old logs from cluster_dir/log before starting.
11 # c.Global.clean_logs = True
21 # c.Global.clean_logs = True
22
23 # A list of Python statements that will be run before starting the
24 # controller. This is provided because occasionally certain things need to
25 # be imported in the controller for pickling to work.
12 # c.Global.import_statements = ['import math']
26 # c.Global.import_statements = ['import math']
27
28 # Reuse the controller's FURL files. If False, FURL files are regenerated
29 # each time the controller is run. If True, they will be reused, *but*, you
30 # also must set the network ports by hand. If set, this will override the
31 # values set for the client and engine connections below.
13 # c.Global.reuse_furls = True
32 # c.Global.reuse_furls = True
33
34 # Enable SSL encryption on all connections to the controller. If set, this
35 # will override the values set for the client and engine connections below.
14 # c.Global.secure = True
36 # c.Global.secure = True
15
37
16 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
17 # Configure the client services
39 # Configure the client services
18 #-----------------------------------------------------------------------------
40 #-----------------------------------------------------------------------------
19
41
20 # Basic client service config attributes
42 # Basic client service config attributes
43
44 # The network interface the controller will listen on for client connections.
45 # This should be an IP address or hostname of the controller's host. The empty
46 # string means listen on all interfaces.
21 # c.FCClientServiceFactory.ip = ''
47 # c.FCClientServiceFactory.ip = ''
48
49 # The TCP/IP port the controller will listen on for client connections. If 0
50 # a random port will be used. If the controller's host has a firewall running
51 # it must allow incoming traffic on this port.
22 # c.FCClientServiceFactory.port = 0
52 # c.FCClientServiceFactory.port = 0
53
54 # The client learns how to connect to the controller by looking at the
55 # location field embedded in the FURL. If this field is empty, all network
56 # interfaces that the controller is listening on will be listed. To have the
57 # client connect on a particular interface, list it here.
23 # c.FCClientServiceFactory.location = ''
58 # c.FCClientServiceFactory.location = ''
59
60 # Use SSL encryption for the client connection.
24 # c.FCClientServiceFactory.secure = True
61 # c.FCClientServiceFactory.secure = True
62
63 # Reuse the client FURL each time the controller is started. If set, you must
64 # also pick a specific network port above (FCClientServiceFactory.port).
25 # c.FCClientServiceFactory.reuse_furls = False
65 # c.FCClientServiceFactory.reuse_furls = False
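# For example (a sketch; the port number is an arbitrary illustration), to keep
# the same client FURL across controller restarts you would pin the port and
# enable reuse:
# c.FCClientServiceFactory.port = 10105
# c.FCClientServiceFactory.reuse_furls = True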
26
66
27 # You shouldn't have to modify the rest of this section
67 #-----------------------------------------------------------------------------
68 # Configure the engine services
69 #-----------------------------------------------------------------------------
70
71 # Basic config attributes for the engine services.
72
73 # The network interface the controller will listen on for engine connections.
74 # This should be an IP address or hostname of the controller's host. The empty
75 # string means listen on all interfaces.
76 # c.FCEngineServiceFactory.ip = ''
77
78 # The TCP/IP port the controller will listen on for engine connections. If 0
79 # a random port will be used. If the controller's host has a firewall running
80 # it must allow incoming traffic on this port.
81 # c.FCEngineServiceFactory.port = 0
82
83 # The engine learns how to connect to the controller by looking at the
84 # location field embedded in the FURL. If this field is empty, all network
85 # interfaces that the controller is listening on will be listed. To have the
86 # engine connect on a particular interface, list it here.
87 # c.FCEngineServiceFactory.location = ''
88
89 # Use SSL encryption for the engine connection.
90 # c.FCEngineServiceFactory.secure = True
91
92 # Reuse the engine FURL each time the controller is started. If set, you must
93 # also pick a specific network port above (FCEngineServiceFactory.port).
94 # c.FCEngineServiceFactory.reuse_furls = False
95
96 #-----------------------------------------------------------------------------
97 # Developer level configuration attributes
98 #-----------------------------------------------------------------------------
99
100 # You shouldn't have to modify anything in this section. These attributes
101 # are more for developers who want to change the behavior of the controller
102 # at a fundamental level.
103
28 # c.FCClientServiceFactory.cert_file = 'ipcontroller-client.pem'
104 # c.FCClientServiceFactory.cert_file = 'ipcontroller-client.pem'
29
105
30 # default_client_interfaces = Config()
106 # default_client_interfaces = Config()
31 # default_client_interfaces.Task.interface_chain = [
107 # default_client_interfaces.Task.interface_chain = [
32 # 'IPython.kernel.task.ITaskController',
108 # 'IPython.kernel.task.ITaskController',
33 # 'IPython.kernel.taskfc.IFCTaskController'
109 # 'IPython.kernel.taskfc.IFCTaskController'
34 # ]
110 # ]
35 #
111 #
36 # default_client_interfaces.Task.furl_file = 'ipcontroller-tc.furl'
112 # default_client_interfaces.Task.furl_file = 'ipcontroller-tc.furl'
37 #
113 #
38 # default_client_interfaces.MultiEngine.interface_chain = [
114 # default_client_interfaces.MultiEngine.interface_chain = [
39 # 'IPython.kernel.multiengine.IMultiEngine',
115 # 'IPython.kernel.multiengine.IMultiEngine',
40 # 'IPython.kernel.multienginefc.IFCSynchronousMultiEngine'
116 # 'IPython.kernel.multienginefc.IFCSynchronousMultiEngine'
41 # ]
117 # ]
42 #
118 #
43 # default_client_interfaces.MultiEngine.furl_file = 'ipcontroller-mec.furl'
119 # default_client_interfaces.MultiEngine.furl_file = 'ipcontroller-mec.furl'
44 #
120 #
45 # c.FCClientServiceFactory.interfaces = default_client_interfaces
121 # c.FCClientServiceFactory.interfaces = default_client_interfaces
46
122
47 #-----------------------------------------------------------------------------
48 # Configure the engine services
49 #-----------------------------------------------------------------------------
50
51 # Basic config attributes for the engine services
52 # c.FCEngineServiceFactory.ip = ''
53 # c.FCEngineServiceFactory.port = 0
54 # c.FCEngineServiceFactory.location = ''
55 # c.FCEngineServiceFactory.secure = True
56 # c.FCEngineServiceFactory.reuse_furls = False
57
58 # You shouldn't have to modify the rest of this section
59 # c.FCEngineServiceFactory.cert_file = 'ipcontroller-engine.pem'
123 # c.FCEngineServiceFactory.cert_file = 'ipcontroller-engine.pem'
60
124
61 # default_engine_interfaces = Config()
125 # default_engine_interfaces = Config()
62 # default_engine_interfaces.Default.interface_chain = [
126 # default_engine_interfaces.Default.interface_chain = [
63 # 'IPython.kernel.enginefc.IFCControllerBase'
127 # 'IPython.kernel.enginefc.IFCControllerBase'
64 # ]
128 # ]
65 #
129 #
66 # default_engine_interfaces.Default.furl_file = 'ipcontroller-engine.furl'
130 # default_engine_interfaces.Default.furl_file = 'ipcontroller-engine.furl'
67 #
131 #
68 # c.FCEngineServiceFactory.interfaces = default_engine_interfaces
132 # c.FCEngineServiceFactory.interfaces = default_engine_interfaces
@@ -1,28 +1,86 b''
1 c = get_config()
1 c = get_config()
2
2
3 #-----------------------------------------------------------------------------
4 # Global configuration
5 #-----------------------------------------------------------------------------
6
7 # Startup messages are logged to stdout using the logging module.
8 # These all happen before the twisted reactor is started and are
9 # useful for debugging purposes. The level can be 10=DEBUG, 20=INFO,
10 # 30=WARN, 40=ERROR or 50=CRITICAL, and smaller is more verbose.
11 # c.Global.log_level = 20
12
13 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
3 # c.Global.log_to_file = False
14 # c.Global.log_to_file = False
4 # c.Global.clean_logs = False
15
16 # Remove old logs from cluster_dir/log before starting.
17 # c.Global.clean_logs = True
18
19 # A list of strings that will be executed in the user's namespace on the engine
20 # before it connects to the controller.
5 # c.Global.exec_lines = ['import numpy']
21 # c.Global.exec_lines = ['import numpy']
6 # c.Global.log_level = 10
22
7 # c.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
23 # The engine will try to connect to the controller multiple times, to allow
8 # c.Global.furl_file_name = 'ipcontroller-engine.furl'
24 # the controller time to start up and write its FURL file. These parameters
9 # c.Global.furl_file = ''
25 # control the number of retries (connect_max_tries) and the initial delay
10 # The max number of connection attemps and the initial delay between
26 # (connect_delay) between attempts. The actual delay between attempts gets
27 # longer each time by a factor of 1.5 (delay[i] = 1.5*delay[i-1]).
11 # those attemps.
12 # c.Global.connect_delay = 0.1
29 # c.Global.connect_delay = 0.1
13 # c.Global.connect_max_tries = 15
30 # c.Global.connect_max_tries = 15
14
31
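# As a rough worked example (assuming the 1.5 growth factor described above
# applies to every retry), the defaults connect_delay = 0.1 and
# connect_max_tries = 15 give waits of 0.1, 0.15, 0.225, ... seconds, which
# adds up to on the order of a minute of total waiting before the engine
# gives up.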
32 # By default, the engine will look for the controller's FURL file in its own
33 # cluster directory. Sometimes, the FURL file will be elsewhere and this
34 # attribute can be set to the full path of the FURL file.
35 # c.Global.furl_file = ''
36
37 #-----------------------------------------------------------------------------
38 # MPI configuration
39 #-----------------------------------------------------------------------------
15
40
41 # Upon starting, the engine can be configured to call MPI_Init. This section
42 # configures that.
43
44 # Select which MPI section to execute to set up MPI. The value of this
45 # attribute must match the name of another attribute in the MPI config
46 # section (mpi4py, pytrilinos, etc.). This can also be set by the --mpi
47 # command line option.
16 # c.MPI.use = ''
48 # c.MPI.use = ''
49
50 # Initialize MPI using mpi4py. To use this, set c.MPI.use = 'mpi4py' or use
51 # --mpi=mpi4py at the command line.
17 # c.MPI.mpi4py = """from mpi4py import MPI as mpi
52 # c.MPI.mpi4py = """from mpi4py import MPI as mpi
18 # mpi.size = mpi.COMM_WORLD.Get_size()
53 # mpi.size = mpi.COMM_WORLD.Get_size()
19 # mpi.rank = mpi.COMM_WORLD.Get_rank()
54 # mpi.rank = mpi.COMM_WORLD.Get_rank()
20 # """
55 # """
56
57 # Initialize MPI using pytrilinos. To use this, set c.MPI.use = 'pytrilinos'
58 # or use --mpi=pytrilinos at the command line.
21 # c.MPI.pytrilinos = """from PyTrilinos import Epetra
59 # c.MPI.pytrilinos = """from PyTrilinos import Epetra
22 # class SimpleStruct:
60 # class SimpleStruct:
23 # pass
61 # pass
24 # mpi = SimpleStruct()
62 # mpi = SimpleStruct()
25 # mpi.rank = 0
63 # mpi.rank = 0
26 # mpi.size = 0
64 # mpi.size = 0
27 # """
65 # """
28
66
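# For example, to have the engine call MPI_Init via the mpi4py section above,
# you would uncomment:
# c.MPI.use = 'mpi4py'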
67 #-----------------------------------------------------------------------------
68 # Developer level configuration attributes
69 #-----------------------------------------------------------------------------
70
71 # You shouldn't have to modify anything in this section. These attributes
72 # are more for developers who want to change the behavior of the engine
73 # at a fundamental level.
74
75 # You should not have to change these attributes.
76
77 # c.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
78
79 # c.Global.furl_file_name = 'ipcontroller-engine.furl'
80
81
82
83
84
85
86
@@ -1,713 +1,766 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Facilities for handling client connections to the controller."""
4 """Facilities for handling client connections to the controller."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008-2009 The IPython Development Team
7 # Copyright (C) 2008-2009 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 from __future__ import with_statement
17 from __future__ import with_statement
18 import os
18 import os
19
19
20 from IPython.kernel.fcutil import (
20 from IPython.kernel.fcutil import (
21 Tub,
21 Tub,
22 find_furl,
22 find_furl,
23 is_valid_furl_or_file,
23 is_valid_furl_or_file,
24 validate_furl_or_file,
24 validate_furl_or_file,
25 FURLError
25 FURLError
26 )
26 )
27 from IPython.kernel.clusterdir import ClusterDir, ClusterDirError
27 from IPython.kernel.clusterdir import ClusterDir, ClusterDirError
28 from IPython.kernel.launcher import IPClusterLauncher
28 from IPython.kernel.launcher import IPClusterLauncher
29 from IPython.kernel.twistedutil import (
29 from IPython.kernel.twistedutil import (
30 gatherBoth,
30 gatherBoth,
31 make_deferred,
31 make_deferred,
32 blockingCallFromThread,
32 blockingCallFromThread,
33 sleep_deferred
33 sleep_deferred
34 )
34 )
35 from IPython.utils.importstring import import_item
35 from IPython.utils.importstring import import_item
36 from IPython.utils.genutils import get_ipython_dir
36 from IPython.utils.genutils import get_ipython_dir
37
37
38 from twisted.internet import defer
38 from twisted.internet import defer
39 from twisted.internet.defer import inlineCallbacks, returnValue
39 from twisted.internet.defer import inlineCallbacks, returnValue
40 from twisted.python import failure, log
40 from twisted.python import failure, log
41
41
42 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
43 # The ClientConnector class
43 # The ClientConnector class
44 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
45
45
46 DELAY = 0.2
46 DELAY = 0.2
47 MAX_TRIES = 9
47 MAX_TRIES = 9
48
48
49
49
50 class ClientConnectorError(Exception):
50 class ClientConnectorError(Exception):
51 pass
51 pass
52
52
53
53
54 class AsyncClientConnector(object):
54 class AsyncClientConnector(object):
55 """A class for getting remote references and clients from furls.
55 """A class for getting remote references and clients from furls.
56
56
57 This starts a single :class:`Tub` for all remote references and caches
57 This starts a single :class:`Tub` for all remote references and caches
58 them.
58 them.
59 """
59 """
60
60
61 def __init__(self):
61 def __init__(self):
62 self._remote_refs = {}
62 self._remote_refs = {}
63 self.tub = Tub()
63 self.tub = Tub()
64 self.tub.startService()
64 self.tub.startService()
65
65
66 def _find_furl(self, profile='default', cluster_dir=None,
66 def _find_furl(self, profile='default', cluster_dir=None,
67 furl_or_file=None, furl_file_name=None,
67 furl_or_file=None, furl_file_name=None,
68 ipythondir=None):
68 ipythondir=None):
69 """Find a FURL file by profile+ipythondir or cluster dir.
69 """Find a FURL file by profile+ipythondir or cluster dir.
70
70
71 This raises an :exc:`~IPython.kernel.fcutil.FURLError` exception
71 This raises an :exc:`~IPython.kernel.fcutil.FURLError` exception
72 if a FURL file can't be found.
72 if a FURL file can't be found.
73 """
73 """
74 # Try by furl_or_file
74 # Try by furl_or_file
75 if furl_or_file is not None:
75 if furl_or_file is not None:
76 validate_furl_or_file(furl_or_file)
76 validate_furl_or_file(furl_or_file)
77 return furl_or_file
77 return furl_or_file
78
78
79 if furl_file_name is None:
79 if furl_file_name is None:
80 raise FURLError('A furl_file_name must be provided')
80 raise FURLError('A furl_file_name must be provided')
81
81
82 # Try by cluster_dir
82 # Try by cluster_dir
83 if cluster_dir is not None:
83 if cluster_dir is not None:
84 cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
84 cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
85 sdir = cluster_dir_obj.security_dir
85 sdir = cluster_dir_obj.security_dir
86 furl_file = os.path.join(sdir, furl_file_name)
86 furl_file = os.path.join(sdir, furl_file_name)
87 validate_furl_or_file(furl_file)
87 validate_furl_or_file(furl_file)
88 return furl_file
88 return furl_file
89
89
90 # Try by profile
90 # Try by profile
91 if ipythondir is None:
91 if ipythondir is None:
92 ipythondir = get_ipython_dir()
92 ipythondir = get_ipython_dir()
93 if profile is not None:
93 if profile is not None:
94 cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
94 cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
95 ipythondir, profile)
95 ipythondir, profile)
96 sdir = cluster_dir_obj.security_dir
96 sdir = cluster_dir_obj.security_dir
97 furl_file = os.path.join(sdir, furl_file_name)
97 furl_file = os.path.join(sdir, furl_file_name)
98 validate_furl_or_file(furl_file)
98 validate_furl_or_file(furl_file)
99 return furl_file
99 return furl_file
100
100
101 raise FURLError('Could not find a valid FURL file.')
101 raise FURLError('Could not find a valid FURL file.')
102
102
103 def get_reference(self, furl_or_file):
103 def get_reference(self, furl_or_file):
104 """Get a remote reference using a furl or a file containing a furl.
104 """Get a remote reference using a furl or a file containing a furl.
105
105
106 Remote references are cached locally so once a remote reference
106 Remote references are cached locally so once a remote reference
107 has been retrieved for a given furl, the cached version is
107 has been retrieved for a given furl, the cached version is
108 returned.
108 returned.
109
109
110 Parameters
110 Parameters
111 ----------
111 ----------
112 furl_or_file : str
112 furl_or_file : str
113 A furl or a filename containing a furl. This should already be
113 A furl or a filename containing a furl. This should already be
114 validated, but might not yet exist.
114 validated, but might not yet exist.
115
115
116 Returns
116 Returns
117 -------
117 -------
118 A deferred to a remote reference
118 A deferred to a remote reference
119 """
119 """
120 furl = furl_or_file
120 furl = furl_or_file
121 if furl in self._remote_refs:
121 if furl in self._remote_refs:
122 d = defer.succeed(self._remote_refs[furl])
122 d = defer.succeed(self._remote_refs[furl])
123 else:
123 else:
124 d = self.tub.getReference(furl)
124 d = self.tub.getReference(furl)
125 d.addCallback(self._save_ref, furl)
125 d.addCallback(self._save_ref, furl)
126 return d
126 return d
127
127
128 def _save_ref(self, ref, furl):
128 def _save_ref(self, ref, furl):
129 """Cache a remote reference by its furl."""
129 """Cache a remote reference by its furl."""
130 self._remote_refs[furl] = ref
130 self._remote_refs[furl] = ref
131 return ref
131 return ref
132
132
133 def get_task_client(self, profile='default', cluster_dir=None,
133 def get_task_client(self, profile='default', cluster_dir=None,
134 furl_or_file=None, ipythondir=None,
134 furl_or_file=None, ipythondir=None,
135 delay=DELAY, max_tries=MAX_TRIES):
135 delay=DELAY, max_tries=MAX_TRIES):
136 """Get the task controller client.
136 """Get the task controller client.
137
137
138 This method is a simple wrapper around `get_client` that passes in
138 This method is a simple wrapper around `get_client` that passes in
139 the default name of the task client FURL file. Usually only
139 the default name of the task client FURL file. Usually only
140 the ``profile`` option will be needed. If a FURL file can't be
140 the ``profile`` option will be needed. If a FURL file can't be
141 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
141 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
142
142
143 Parameters
143 Parameters
144 ----------
144 ----------
145 profile : str
145 profile : str
146 The name of a cluster directory profile (default="default"). The
146 The name of a cluster directory profile (default="default"). The
147 cluster directory "cluster_<profile>" will be searched for
147 cluster directory "cluster_<profile>" will be searched for
148 in ``os.getcwd()``, the ipythondir and then in the directories
148 in ``os.getcwd()``, the ipythondir and then in the directories
149 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
149 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
150 cluster_dir : str
150 cluster_dir : str
151 The full path to a cluster directory. This is useful if profiles
151 The full path to a cluster directory. This is useful if profiles
152 are not being used.
152 are not being used.
153 furl_or_file : str
153 furl_or_file : str
154 A furl or a filename containing a FURL. This is useful if you
154 A furl or a filename containing a FURL. This is useful if you
155 simply know the location of the FURL file.
155 simply know the location of the FURL file.
156 ipythondir : str
156 ipythondir : str
157 The location of the ipythondir if different from the default.
157 The location of the ipythondir if different from the default.
158 This is used if the cluster directory is being found by profile.
158 This is used if the cluster directory is being found by profile.
159 delay : float
160 The initial delay between re-connection attempts. Subsequent delays
161 get longer according to ``delay[i] = 1.5*delay[i-1]``.
162 max_tries : int
163 The max number of re-connection attempts.
159
164
160 Returns
165 Returns
161 -------
166 -------
162 A deferred to the actual client class.
167 A deferred to the actual client class.
163 """
168 """
164 return self.get_client(
169 return self.get_client(
165 profile, cluster_dir, furl_or_file,
170 profile, cluster_dir, furl_or_file,
166 'ipcontroller-tc.furl', ipythondir,
171 'ipcontroller-tc.furl', ipythondir,
167 delay, max_tries
172 delay, max_tries
168 )
173 )
169
174
170 def get_multiengine_client(self, profile='default', cluster_dir=None,
175 def get_multiengine_client(self, profile='default', cluster_dir=None,
171 furl_or_file=None, ipythondir=None,
176 furl_or_file=None, ipythondir=None,
172 delay=DELAY, max_tries=MAX_TRIES):
177 delay=DELAY, max_tries=MAX_TRIES):
173 """Get the multiengine controller client.
178 """Get the multiengine controller client.
174
179
175 This method is a simple wrapper around `get_client` that passes in
180 This method is a simple wrapper around `get_client` that passes in
176 the default name of the task client FURL file. Usually only
176 the default name of the multiengine client FURL file. Usually only
181 the default name of the multiengine client FURL file. Usually only
182 the ``profile`` option will be needed. If a FURL file can't be
178 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
183 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
179
184
180 Parameters
185 Parameters
181 ----------
186 ----------
182 profile : str
187 profile : str
183 The name of a cluster directory profile (default="default"). The
188 The name of a cluster directory profile (default="default"). The
184 cluster directory "cluster_<profile>" will be searched for
189 cluster directory "cluster_<profile>" will be searched for
185 in ``os.getcwd()``, the ipythondir and then in the directories
190 in ``os.getcwd()``, the ipythondir and then in the directories
186 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
191 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
187 cluster_dir : str
192 cluster_dir : str
188 The full path to a cluster directory. This is useful if profiles
193 The full path to a cluster directory. This is useful if profiles
189 are not being used.
194 are not being used.
190 furl_or_file : str
195 furl_or_file : str
191 A furl or a filename containing a FURLK. This is useful if you
196 A furl or a filename containing a FURLK. This is useful if you
192 simply know the location of the FURL file.
197 simply know the location of the FURL file.
193 ipythondir : str
198 ipythondir : str
194 The location of the ipythondir if different from the default.
199 The location of the ipythondir if different from the default.
195 This is used if the cluster directory is being found by profile.
200 This is used if the cluster directory is being found by profile.
196
201 delay : float
202 The initial delay between re-connection attempts. Subsequent delays
203 get longer according to ``delay[i] = 1.5*delay[i-1]``.
204 max_tries : int
205 The max number of re-connection attempts.
206
197 Returns
207 Returns
198 -------
208 -------
199 A deferred to the actual client class.
209 A deferred to the actual client class.
200 """
210 """
201 return self.get_client(
211 return self.get_client(
202 profile, cluster_dir, furl_or_file,
212 profile, cluster_dir, furl_or_file,
203 'ipcontroller-mec.furl', ipythondir,
213 'ipcontroller-mec.furl', ipythondir,
204 delay, max_tries
214 delay, max_tries
205 )
215 )
206
216
207 def get_client(self, profile='default', cluster_dir=None,
217 def get_client(self, profile='default', cluster_dir=None,
208 furl_or_file=None, furl_file_name=None, ipythondir=None,
218 furl_or_file=None, furl_file_name=None, ipythondir=None,
209 delay=DELAY, max_tries=MAX_TRIES):
219 delay=DELAY, max_tries=MAX_TRIES):
210 """Get a remote reference and wrap it in a client by furl.
220 """Get a remote reference and wrap it in a client by furl.
211
221
212 This is the method that `get_task_client` and `get_multiengine_client`
222 This is the method that `get_task_client` and `get_multiengine_client`
213 call with the name of the FURL file to use. Usually only
223 call with the name of the FURL file to use. Usually only
214 the ``profile`` option will be needed. If a FURL file can't be
224 the ``profile`` option will be needed. If a FURL file can't be
215 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
225 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
216
226
217 Parameters
227 Parameters
218 ----------
228 ----------
219 profile : str
229 profile : str
220 The name of a cluster directory profile (default="default"). The
230 The name of a cluster directory profile (default="default"). The
221 cluster directory "cluster_<profile>" will be searched for
231 cluster directory "cluster_<profile>" will be searched for
222 in ``os.getcwd()``, the ipythondir and then in the directories
232 in ``os.getcwd()``, the ipythondir and then in the directories
223 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
233 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
224 cluster_dir : str
234 cluster_dir : str
225 The full path to a cluster directory. This is useful if profiles
235 The full path to a cluster directory. This is useful if profiles
226 are not being used.
236 are not being used.
227 furl_or_file : str
237 furl_or_file : str
228 A furl or a filename containing a FURL. This is useful if you
238 A furl or a filename containing a FURL. This is useful if you
229 simply know the location of the FURL file.
239 simply know the location of the FURL file.
230 furl_file_name : str
240 furl_file_name : str
231 The filename (not the full path) of the FURL. This must be
241 The filename (not the full path) of the FURL. This must be
232 provided if ``furl_or_file`` is not.
242 provided if ``furl_or_file`` is not.
233 ipythondir : str
243 ipythondir : str
234 The location of the ipythondir if different from the default.
244 The location of the ipythondir if different from the default.
235 This is used if the cluster directory is being found by profile.
245 This is used if the cluster directory is being found by profile.
246 delay : float
247 The initial delay between re-connection attempts. Subsequent delays
248 get longer according to ``delay[i] = 1.5*delay[i-1]``.
249 max_tries : int
250 The max number of re-connection attempts.
236
251
237 Returns
252 Returns
238 -------
253 -------
239 A deferred to the actual client class. Or a failure to a
254 A deferred to the actual client class. Or a failure to a
240 :exc:`FURLError`.
255 :exc:`FURLError`.
241 """
256 """
242 try:
257 try:
243 furl_file = self._find_furl(
258 furl_file = self._find_furl(
244 profile, cluster_dir, furl_or_file,
259 profile, cluster_dir, furl_or_file,
245 furl_file_name, ipythondir
260 furl_file_name, ipythondir
246 )
261 )
247 except FURLError:
262 except FURLError:
248 return defer.fail(failure.Failure())
263 return defer.fail(failure.Failure())
249
264
250 def _wrap_remote_reference(rr):
265 def _wrap_remote_reference(rr):
251 d = rr.callRemote('get_client_name')
266 d = rr.callRemote('get_client_name')
252 d.addCallback(lambda name: import_item(name))
267 d.addCallback(lambda name: import_item(name))
253 def adapt(client_interface):
268 def adapt(client_interface):
254 client = client_interface(rr)
269 client = client_interface(rr)
255 client.tub = self.tub
270 client.tub = self.tub
256 return client
271 return client
257 d.addCallback(adapt)
272 d.addCallback(adapt)
258
273
259 return d
274 return d
260
275
261 d = self._try_to_connect(furl_file, delay, max_tries, attempt=0)
276 d = self._try_to_connect(furl_file, delay, max_tries, attempt=0)
262 d.addCallback(_wrap_remote_reference)
277 d.addCallback(_wrap_remote_reference)
263 return d
278 return d
264
279
265 @inlineCallbacks
280 @inlineCallbacks
266 def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
281 def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
267 """Try to connect to the controller with retry logic."""
282 """Try to connect to the controller with retry logic."""
268 if attempt < max_tries:
283 if attempt < max_tries:
269 log.msg("Connecting to controller [%r]: %s" % \
284 log.msg("Connecting to controller [%r]: %s" % \
270 (attempt, furl_or_file))
285 (attempt, furl_or_file))
271 try:
286 try:
272 self.furl = find_furl(furl_or_file)
287 self.furl = find_furl(furl_or_file)
273 # Uncomment this to see the FURL being tried.
288 # Uncomment this to see the FURL being tried.
274 # log.msg("FURL: %s" % self.furl)
289 # log.msg("FURL: %s" % self.furl)
275 rr = yield self.get_reference(self.furl)
290 rr = yield self.get_reference(self.furl)
276 except:
291 except:
277 if attempt==max_tries-1:
292 if attempt==max_tries-1:
278 # This will propagate the exception all the way to the top
293 # This will propagate the exception all the way to the top
279 # where it can be handled.
294 # where it can be handled.
280 raise
295 raise
281 else:
296 else:
282 yield sleep_deferred(delay)
297 yield sleep_deferred(delay)
283 rr = yield self._try_to_connect(
298 rr = yield self._try_to_connect(
284 furl_or_file, 1.5*delay, max_tries, attempt+1
299 furl_or_file, 1.5*delay, max_tries, attempt+1
285 )
300 )
286 returnValue(rr)
301 returnValue(rr)
287 else:
302 else:
288 returnValue(rr)
303 returnValue(rr)
289 else:
304 else:
290 raise ClientConnectorError(
305 raise ClientConnectorError(
291 'Could not connect to controller, max_tries (%r) exceeded. '
306 'Could not connect to controller, max_tries (%r) exceeded. '
292 'This usually means that i) the controller was not started, '
307 'This usually means that i) the controller was not started, '
293 'or ii) a firewall was blocking the client from connecting '
308 'or ii) a firewall was blocking the client from connecting '
294 'to the controller.' % max_tries
309 'to the controller.' % max_tries
295 )
310 )
296
311
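# With the module defaults above (DELAY = 0.2, MAX_TRIES = 9) the successive
# waits between attempts grow as 0.2, 0.3, 0.45, 0.675, ... seconds
# (delay * 1.5**attempt), so a failed connection is given up on after roughly
# ten seconds of total waiting.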
297
312
298 class ClientConnector(object):
313 class ClientConnector(object):
299 """A blocking version of a client connector.
314 """A blocking version of a client connector.
300
315
301 This class creates a single :class:`Tub` instance and allows remote
316 This class creates a single :class:`Tub` instance and allows remote
302 references and clients to be retrieved by their FURLs. Remote references
317 references and clients to be retrieved by their FURLs. Remote references
303 are cached locally and FURL files can be found using profiles and cluster
318 are cached locally and FURL files can be found using profiles and cluster
304 directories.
319 directories.
305 """
320 """
306
321
307 def __init__(self):
322 def __init__(self):
308 self.async_cc = AsyncClientConnector()
323 self.async_cc = AsyncClientConnector()
309
324
310 def get_task_client(self, profile='default', cluster_dir=None,
325 def get_task_client(self, profile='default', cluster_dir=None,
311 furl_or_file=None, ipythondir=None,
326 furl_or_file=None, ipythondir=None,
312 delay=DELAY, max_tries=MAX_TRIES):
327 delay=DELAY, max_tries=MAX_TRIES):
313 """Get the task client.
328 """Get the task client.
314
329
315 Usually only the ``profile`` option will be needed. If a FURL file
330 Usually only the ``profile`` option will be needed. If a FURL file
316 can't be found by its profile, use ``cluster_dir`` or
331 can't be found by its profile, use ``cluster_dir`` or
317 ``furl_or_file``.
332 ``furl_or_file``.
318
333
319 Parameters
334 Parameters
320 ----------
335 ----------
321 profile : str
336 profile : str
322 The name of a cluster directory profile (default="default"). The
337 The name of a cluster directory profile (default="default"). The
323 cluster directory "cluster_<profile>" will be searched for
338 cluster directory "cluster_<profile>" will be searched for
324 in ``os.getcwd()``, the ipythondir and then in the directories
339 in ``os.getcwd()``, the ipythondir and then in the directories
325 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
340 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
326 cluster_dir : str
341 cluster_dir : str
327 The full path to a cluster directory. This is useful if profiles
342 The full path to a cluster directory. This is useful if profiles
328 are not being used.
343 are not being used.
329 furl_or_file : str
344 furl_or_file : str
330 A furl or a filename containing a FURL. This is useful if you
345 A furl or a filename containing a FURL. This is useful if you
331 simply know the location of the FURL file.
346 simply know the location of the FURL file.
332 ipythondir : str
347 ipythondir : str
333 The location of the ipythondir if different from the default.
348 The location of the ipythondir if different from the default.
334 This is used if the cluster directory is being found by profile.
349 This is used if the cluster directory is being found by profile.
350 delay : float
351 The initial delay between re-connection attempts. Subsequent delays
352 get longer according to ``delay[i] = 1.5*delay[i-1]``.
353 max_tries : int
354 The max number of re-connection attempts.
335
355
336 Returns
356 Returns
337 -------
357 -------
338 The task client instance.
358 The task client instance.
339 """
359 """
340 client = blockingCallFromThread(
360 client = blockingCallFromThread(
341 self.async_cc.get_task_client, profile, cluster_dir,
361 self.async_cc.get_task_client, profile, cluster_dir,
342 furl_or_file, ipythondir, delay, max_tries
362 furl_or_file, ipythondir, delay, max_tries
343 )
363 )
344 return client.adapt_to_blocking_client()
364 return client.adapt_to_blocking_client()
345
365
346 def get_multiengine_client(self, profile='default', cluster_dir=None,
366 def get_multiengine_client(self, profile='default', cluster_dir=None,
347 furl_or_file=None, ipythondir=None,
367 furl_or_file=None, ipythondir=None,
348 delay=DELAY, max_tries=MAX_TRIES):
368 delay=DELAY, max_tries=MAX_TRIES):
349 """Get the multiengine client.
369 """Get the multiengine client.
350
370
351 Usually only the ``profile`` option will be needed. If a FURL file
371 Usually only the ``profile`` option will be needed. If a FURL file
352 can't be found by its profile, use ``cluster_dir`` or
372 can't be found by its profile, use ``cluster_dir`` or
353 ``furl_or_file``.
373 ``furl_or_file``.
354
374
355 Parameters
375 Parameters
356 ----------
376 ----------
357 profile : str
377 profile : str
358 The name of a cluster directory profile (default="default"). The
378 The name of a cluster directory profile (default="default"). The
359 cluster directory "cluster_<profile>" will be searched for
379 cluster directory "cluster_<profile>" will be searched for
360 in ``os.getcwd()``, the ipythondir and then in the directories
380 in ``os.getcwd()``, the ipythondir and then in the directories
361 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
381 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
362 cluster_dir : str
382 cluster_dir : str
363 The full path to a cluster directory. This is useful if profiles
383 The full path to a cluster directory. This is useful if profiles
364 are not being used.
384 are not being used.
365 furl_or_file : str
385 furl_or_file : str
366 A furl or a filename containing a FURL. This is useful if you
386 A furl or a filename containing a FURL. This is useful if you
367 simply know the location of the FURL file.
387 simply know the location of the FURL file.
368 ipythondir : str
388 ipythondir : str
369 The location of the ipythondir if different from the default.
389 The location of the ipythondir if different from the default.
370 This is used if the cluster directory is being found by profile.
390 This is used if the cluster directory is being found by profile.
391 delay : float
392 The initial delay between re-connection attempts. Subsequent delays
393 get longer according to ``delay[i] = 1.5*delay[i-1]``.
394 max_tries : int
395 The max number of re-connection attempts.
371
396
372 Returns
397 Returns
373 -------
398 -------
374 The multiengine client instance.
399 The multiengine client instance.
375 """
400 """
376 client = blockingCallFromThread(
401 client = blockingCallFromThread(
377 self.async_cc.get_multiengine_client, profile, cluster_dir,
402 self.async_cc.get_multiengine_client, profile, cluster_dir,
378 furl_or_file, ipythondir, delay, max_tries
403 furl_or_file, ipythondir, delay, max_tries
379 )
404 )
380 return client.adapt_to_blocking_client()
405 return client.adapt_to_blocking_client()
381
406
382 def get_client(self, profile='default', cluster_dir=None,
407 def get_client(self, profile='default', cluster_dir=None,
383 furl_or_file=None, ipythondir=None,
408 furl_or_file=None, ipythondir=None,
384 delay=DELAY, max_tries=MAX_TRIES):
409 delay=DELAY, max_tries=MAX_TRIES):
385 client = blockingCallFromThread(
410 client = blockingCallFromThread(
386 self.async_cc.get_client, profile, cluster_dir,
411 self.async_cc.get_client, profile, cluster_dir,
387 furl_or_file, ipythondir,
412 furl_or_file, ipythondir,
388 delay, max_tries
413 delay, max_tries
389 )
414 )
390 return client.adapt_to_blocking_client()
415 return client.adapt_to_blocking_client()
391
416
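# A usage sketch (the import path below is an assumption for illustration; it
# presumes a cluster has already been started with the default profile):
#
# from IPython.kernel.clientconnector import ClientConnector
# cc = ClientConnector()
# mec = cc.get_multiengine_client(profile='default')
# tc = cc.get_task_client(profile='default')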
392
417
393 class ClusterStateError(Exception):
418 class ClusterStateError(Exception):
394 pass
419 pass
395
420
396
421
397 class AsyncCluster(object):
422 class AsyncCluster(object):
398 """An class that wraps the :command:`ipcluster` script."""
423 """An class that wraps the :command:`ipcluster` script."""
399
424
400 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
425 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
401 auto_create=False, auto_stop=True):
426 auto_create=False, auto_stop=True):
402 """Create a class to manage an IPython cluster.
427 """Create a class to manage an IPython cluster.
403
428
404 This class calls the :command:`ipcluster` command with the right
429 This class calls the :command:`ipcluster` command with the right
405 options to start an IPython cluster. Typically a cluster directory
430 options to start an IPython cluster. Typically a cluster directory
406 must be created (:command:`ipcluster create`) and configured before
431 must be created (:command:`ipcluster create`) and configured before
407 using this class. Configuration is done by editing the
432 using this class. Configuration is done by editing the
408 configuration files in the top level of the cluster directory.
433 configuration files in the top level of the cluster directory.
409
434
410 Parameters
435 Parameters
411 ----------
436 ----------
412 profile : str
437 profile : str
413 The name of a cluster directory profile (default="default"). The
438 The name of a cluster directory profile (default="default"). The
414 cluster directory "cluster_<profile>" will be searched for
439 cluster directory "cluster_<profile>" will be searched for
415 in ``os.getcwd()``, the ipythondir and then in the directories
440 in ``os.getcwd()``, the ipythondir and then in the directories
416 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
441 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
417 cluster_dir : str
442 cluster_dir : str
418 The full path to a cluster directory. This is useful if profiles
443 The full path to a cluster directory. This is useful if profiles
419 are not being used.
444 are not being used.
420 ipythondir : str
445 ipythondir : str
421 The location of the ipythondir if different from the default.
446 The location of the ipythondir if different from the default.
422 This is used if the cluster directory is being found by profile.
447 This is used if the cluster directory is being found by profile.
423 auto_create : bool
448 auto_create : bool
424 Automatically create the cluster directory if it doesn't exist.
449 Automatically create the cluster directory if it doesn't exist.
425 This will usually only make sense if using a local cluster
450 This will usually only make sense if using a local cluster
426 (default=False).
451 (default=False).
427 auto_stop : bool
452 auto_stop : bool
428 Automatically stop the cluster when this instance is garbage
453 Automatically stop the cluster when this instance is garbage
429 collected (default=True). Set this to False if you want the cluster
454 collected (default=True). Set this to False if you want the cluster
430 to live beyond your current process. There is also an instance
455 to live beyond your current process. There is also an instance
431 attribute ``auto_stop`` to change this behavior.
456 attribute ``auto_stop`` to change this behavior.
432 """
457 """
433 self._setup_cluster_dir(profile, cluster_dir, ipythondir, auto_create)
458 self._setup_cluster_dir(profile, cluster_dir, ipythondir, auto_create)
434 self.state = 'before'
459 self.state = 'before'
435 self.launcher = None
460 self.launcher = None
436 self.client_connector = None
461 self.client_connector = None
437 self.auto_stop = auto_stop
462 self.auto_stop = auto_stop
438
463
439 def __del__(self):
464 def __del__(self):
440 if self.auto_stop and self.state=='running':
465 if self.auto_stop and self.state=='running':
441 print "Auto stopping the cluster..."
466 print "Auto stopping the cluster..."
442 self.stop()
467 self.stop()
443
468
444 @property
469 @property
445 def location(self):
470 def location(self):
446 if hasattr(self, 'cluster_dir_obj'):
471 if hasattr(self, 'cluster_dir_obj'):
447 return self.cluster_dir_obj.location
472 return self.cluster_dir_obj.location
448 else:
473 else:
449 return ''
474 return ''
450
475
451 @property
476 @property
452 def running(self):
477 def running(self):
453 if self.state=='running':
478 if self.state=='running':
454 return True
479 return True
455 else:
480 else:
456 return False
481 return False
457
482
458 def _setup_cluster_dir(self, profile, cluster_dir, ipythondir, auto_create):
483 def _setup_cluster_dir(self, profile, cluster_dir, ipythondir, auto_create):
459 if ipythondir is None:
484 if ipythondir is None:
460 ipythondir = get_ipython_dir()
485 ipythondir = get_ipython_dir()
461 if cluster_dir is not None:
486 if cluster_dir is not None:
462 try:
487 try:
463 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
488 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
464 except ClusterDirError:
489 except ClusterDirError:
465 pass
490 pass
466 if profile is not None:
491 if profile is not None:
467 try:
492 try:
468 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
493 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
469 ipythondir, profile)
494 ipythondir, profile)
470 except ClusterDirError:
495 except ClusterDirError:
471 pass
496 pass
472 if auto_create or profile=='default':
497 if auto_create or profile=='default':
473 # This should call 'ipcluster create --profile default'
498 # This should call 'ipcluster create --profile default'
474 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
499 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
475 ipythondir, profile)
500 ipythondir, profile)
476 else:
501 else:
477 raise ClusterDirError('Cluster dir not found.')
502 raise ClusterDirError('Cluster dir not found.')
478
503
479 @make_deferred
504 @make_deferred
480 def start(self, n=2):
505 def start(self, n=2):
481 """Start the IPython cluster with n engines.
506 """Start the IPython cluster with n engines.
482
507
483 Parameters
508 Parameters
484 ----------
509 ----------
485 n : int
510 n : int
486 The number of engines to start.
511 The number of engines to start.
487 """
512 """
488 # We might want to add logic to test if the cluster has started
513 # We might want to add logic to test if the cluster has started
489 # by another process....
514 # by another process....
490 if not self.state=='running':
515 if not self.state=='running':
491 self.launcher = IPClusterLauncher(os.getcwd())
516 self.launcher = IPClusterLauncher(os.getcwd())
492 self.launcher.ipcluster_n = n
517 self.launcher.ipcluster_n = n
493 self.launcher.ipcluster_subcommand = 'start'
518 self.launcher.ipcluster_subcommand = 'start'
494 d = self.launcher.start()
519 d = self.launcher.start()
495 d.addCallback(self._handle_start)
520 d.addCallback(self._handle_start)
496 return d
521 return d
497 else:
522 else:
498 raise ClusterStateError('Cluster is already running')
523 raise ClusterStateError('Cluster is already running')
499
524
500 @make_deferred
525 @make_deferred
501 def stop(self):
526 def stop(self):
502 """Stop the IPython cluster if it is running."""
527 """Stop the IPython cluster if it is running."""
503 if self.state=='running':
528 if self.state=='running':
504 d1 = self.launcher.observe_stop()
529 d1 = self.launcher.observe_stop()
505 d1.addCallback(self._handle_stop)
530 d1.addCallback(self._handle_stop)
506 d2 = self.launcher.stop()
531 d2 = self.launcher.stop()
507 return gatherBoth([d1, d2], consumeErrors=True)
532 return gatherBoth([d1, d2], consumeErrors=True)
508 else:
533 else:
509 raise ClusterStateError("Cluster not running")
534 raise ClusterStateError("Cluster not running")
510
535
511 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
536 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
512 """Get the multiengine client for the running cluster.
537 """Get the multiengine client for the running cluster.
513
538
514 If this fails, it means that the cluster has not finished starting.
539 If this fails, it means that the cluster has not finished starting.
515 Usually waiting a few seconds and re-trying will solve this.
540 Usually waiting a few seconds and re-trying will solve this.
516 """
541 """
517 if self.client_connector is None:
542 if self.client_connector is None:
518 self.client_connector = AsyncClientConnector()
543 self.client_connector = AsyncClientConnector()
519 return self.client_connector.get_multiengine_client(
544 return self.client_connector.get_multiengine_client(
520 cluster_dir=self.cluster_dir_obj.location,
545 cluster_dir=self.cluster_dir_obj.location,
521 delay=delay, max_tries=max_tries
546 delay=delay, max_tries=max_tries
522 )
547 )
523
548
524 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
549 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
525 """Get the task client for the running cluster.
550 """Get the task client for the running cluster.
526
551
527 If this fails, it means that the cluster has not finished starting.
552 If this fails, it means that the cluster has not finished starting.
528 Usually waiting a few seconds and re-trying will solve this.
553 Usually waiting a few seconds and re-trying will solve this.
529 """
554 """
530 if self.client_connector is None:
555 if self.client_connector is None:
531 self.client_connector = AsyncClientConnector()
556 self.client_connector = AsyncClientConnector()
532 return self.client_connector.get_task_client(
557 return self.client_connector.get_task_client(
533 cluster_dir=self.cluster_dir_obj.location,
558 cluster_dir=self.cluster_dir_obj.location,
534 delay=delay, max_tries=max_tries
559 delay=delay, max_tries=max_tries
535 )
560 )
536
561
537 def get_ipengine_logs(self):
562 def get_ipengine_logs(self):
538 return self.get_logs_by_name('ipengine')
563 return self.get_logs_by_name('ipengine')
539
564
540 def get_ipcontroller_logs(self):
565 def get_ipcontroller_logs(self):
541 return self.get_logs_by_name('ipcontroller')
566 return self.get_logs_by_name('ipcontroller')
542
567
543 def get_ipcluster_logs(self):
568 def get_ipcluster_logs(self):
544 return self.get_logs_by_name('ipcluster')
569 return self.get_logs_by_name('ipcluster')
545
570
546 def get_logs_by_name(self, name='ipcluster'):
571 def get_logs_by_name(self, name='ipcluster'):
547 log_dir = self.cluster_dir_obj.log_dir
572 log_dir = self.cluster_dir_obj.log_dir
548 logs = {}
573 logs = {}
549 for log in os.listdir(log_dir):
574 for log in os.listdir(log_dir):
550 if log.startswith(name + '-') and log.endswith('.log'):
575 if log.startswith(name + '-') and log.endswith('.log'):
551 with open(os.path.join(log_dir, log), 'r') as f:
576 with open(os.path.join(log_dir, log), 'r') as f:
552 logs[log] = f.read()
577 logs[log] = f.read()
553 return logs
578 return logs
554
579
555 def get_logs(self):
580 def get_logs(self):
556 d = self.get_ipcluster_logs()
581 d = self.get_ipcluster_logs()
557 d.update(self.get_ipengine_logs())
582 d.update(self.get_ipengine_logs())
558 d.update(self.get_ipcontroller_logs())
583 d.update(self.get_ipcontroller_logs())
559 return d
584 return d
560
585
561 def _handle_start(self, r):
586 def _handle_start(self, r):
562 self.state = 'running'
587 self.state = 'running'
563
588
564 def _handle_stop(self, r):
589 def _handle_stop(self, r):
565 self.state = 'after'
590 self.state = 'after'
566
591
567
592
568 class Cluster(object):
593 class Cluster(object):
569
594
570
595
571 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
596 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
572 auto_create=False, auto_stop=True):
597 auto_create=False, auto_stop=True):
573 """Create a class to manage an IPython cluster.
598 """Create a class to manage an IPython cluster.
574
599
575 This class calls the :command:`ipcluster` command with the right
600 This class calls the :command:`ipcluster` command with the right
576 options to start an IPython cluster. Typically a cluster directory
601 options to start an IPython cluster. Typically a cluster directory
577 must be created (:command:`ipcluster create`) and configured before
602 must be created (:command:`ipcluster create`) and configured before
578 using this class. Configuration is done by editing the
603 using this class. Configuration is done by editing the
579 configuration files in the top level of the cluster directory.
604 configuration files in the top level of the cluster directory.
580
605
581 Parameters
606 Parameters
582 ----------
607 ----------
583 profile : str
608 profile : str
584 The name of a cluster directory profile (default="default"). The
609 The name of a cluster directory profile (default="default"). The
585 cluster directory "cluster_<profile>" will be searched for
610 cluster directory "cluster_<profile>" will be searched for
586 in ``os.getcwd()``, the ipythondir and then in the directories
611 in ``os.getcwd()``, the ipythondir and then in the directories
587 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
612 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
588 cluster_dir : str
613 cluster_dir : str
589 The full path to a cluster directory. This is useful if profiles
614 The full path to a cluster directory. This is useful if profiles
590 are not being used.
615 are not being used.
591 ipythondir : str
616 ipythondir : str
592 The location of the ipythondir if different from the default.
617 The location of the ipythondir if different from the default.
593 This is used if the cluster directory is being found by profile.
618 This is used if the cluster directory is being found by profile.
594 auto_create : bool
619 auto_create : bool
595 Automatically create the cluster directory if it doesn't exist.
620 Automatically create the cluster directory if it doesn't exist.
596 This will usually only make sense if using a local cluster
621 This will usually only make sense if using a local cluster
597 (default=False).
622 (default=False).
598 auto_stop : bool
623 auto_stop : bool
599 Automatically stop the cluster when this instance is garbage
624 Automatically stop the cluster when this instance is garbage
600 collected (default=True). Set this to False if you want the cluster
625 collected (default=True). Set this to False if you want the cluster
601 to live beyond your current process. There is also an instance
626 to live beyond your current process. There is also an instance
602 attribute ``auto_stop`` to change this behavior.
627 attribute ``auto_stop`` to change this behavior.
603 """
628 """
604 self.async_cluster = AsyncCluster(
629 self.async_cluster = AsyncCluster(
605 profile, cluster_dir, ipythondir, auto_create, auto_stop
630 profile, cluster_dir, ipythondir, auto_create, auto_stop
606 )
631 )
607 self.cluster_dir_obj = self.async_cluster.cluster_dir_obj
632 self.cluster_dir_obj = self.async_cluster.cluster_dir_obj
608 self.client_connector = None
633 self.client_connector = None
609
634
610 def _set_auto_stop(self, value):
635 def _set_auto_stop(self, value):
611 self.async_cluster.auto_stop = value
636 self.async_cluster.auto_stop = value
612
637
613 def _get_auto_stop(self):
638 def _get_auto_stop(self):
614 return self.async_cluster.auto_stop
639 return self.async_cluster.auto_stop
615
640
616 auto_stop = property(_get_auto_stop, _set_auto_stop)
641 auto_stop = property(_get_auto_stop, _set_auto_stop)
617
642
618 @property
643 @property
619 def location(self):
644 def location(self):
620 return self.async_cluster.location
645 return self.async_cluster.location
621
646
622 @property
647 @property
623 def running(self):
648 def running(self):
624 return self.async_cluster.running
649 return self.async_cluster.running
625
650
626 def start(self, n=2):
651 def start(self, n=2):
627 """Start the IPython cluster with n engines.
652 """Start the IPython cluster with n engines.
628
653
629 Parameters
654 Parameters
630 ----------
655 ----------
631 n : int
656 n : int
632 The number of engines to start.
657 The number of engines to start.
633 """
658 """
634 return blockingCallFromThread(self.async_cluster.start, n)
659 return blockingCallFromThread(self.async_cluster.start, n)
635
660
636 def stop(self):
661 def stop(self):
637 """Stop the IPython cluster if it is running."""
662 """Stop the IPython cluster if it is running."""
638 return blockingCallFromThread(self.async_cluster.stop)
663 return blockingCallFromThread(self.async_cluster.stop)
639
664
640 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
665 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
641 """Get the multiengine client for the running cluster.
666 """Get the multiengine client for the running cluster.
642
667
643 If this fails, it means that the cluster has not finished starting.
668 This will attempt to connect to the controller multiple times. If this
644 Usually waiting a few seconds are re-trying will solve this.
669 fails altogether, try looking at the following:
670 * Make sure the controller is starting properly by looking at its
671 log files.
672 * Make sure the controller is writing its FURL file in the location
673 expected by the client.
674 * Make sure a firewall on the controller's host is not blocking the
675 client from connecting.
676
677 Parameters
678 ----------
679 delay : float
680 The initial delay between re-connection attempts. Subsequent delays
681 get longer according to ``delay[i] = 1.5*delay[i-1]``.
682 max_tries : int
683 The max number of re-connection attempts.
645 """
684 """
646 if self.client_connector is None:
685 if self.client_connector is None:
647 self.client_connector = ClientConnector()
686 self.client_connector = ClientConnector()
648 return self.client_connector.get_multiengine_client(
687 return self.client_connector.get_multiengine_client(
649 cluster_dir=self.cluster_dir_obj.location,
688 cluster_dir=self.cluster_dir_obj.location,
650 delay=delay, max_tries=max_tries
689 delay=delay, max_tries=max_tries
651 )
690 )
652
691
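# A concrete reading of the backoff described above (illustrative only; the
# actual retry loop lives in the client connector, and the DELAY/MAX_TRIES
# defaults are defined earlier in this file, outside this hunk): with
# hypothetical values delay=0.1 and max_tries=5, the successive waits between
# connection attempts would be 0.1, 0.15, 0.225, 0.3375 and 0.50625 seconds,
# i.e. each delay is 1.5 times the previous one.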
653 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
692 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
654 """Get the task client for the running cluster.
693 """Get the task client for the running cluster.
655
694
656 If this fails, it means that the cluster has not finished starting.
695 This will attempt to connect to the controller multiple times. If this
657 Usually waiting a few seconds are re-trying will solve this.
696 fails altogether, try looking at the following:
697 * Make sure the controller is starting properly by looking at its
698 log files.
699 * Make sure the controller is writing its FURL file in the location
700 expected by the client.
701 * Make sure a firewall on the controller's host is not blocking the
702 client from connecting.
703
704 Parameters
705 ----------
706 delay : float
707 The initial delay between re-connection attempts. Subsequent delays
708 get longer according to ``delay[i] = 1.5*delay[i-1]``.
709 max_tries : int
710 The max number of re-connection attempts.
658 """
711 """
659 if self.client_connector is None:
712 if self.client_connector is None:
660 self.client_connector = ClientConnector()
713 self.client_connector = ClientConnector()
661 return self.client_connector.get_task_client(
714 return self.client_connector.get_task_client(
662 cluster_dir=self.cluster_dir_obj.location,
715 cluster_dir=self.cluster_dir_obj.location,
663 delay=delay, max_tries=max_tries
716 delay=delay, max_tries=max_tries
664 )
717 )
665
718
666 def __repr__(self):
719 def __repr__(self):
667 s = "<Cluster(running=%r, location=%s)>" % (self.running, self.location)
720 s = "<Cluster(running=%r, location=%s)>" % (self.running, self.location)
668 return s
721 return s
669
722
670 def get_logs_by_name(self, name='ipcluster'):
723 def get_logs_by_name(self, name='ipcluster'):
671 """Get a dict of logs by process name (ipcluster, ipengine, etc.)"""
724 """Get a dict of logs by process name (ipcluster, ipengine, etc.)"""
672 return self.async_cluster.get_logs_by_name(name)
725 return self.async_cluster.get_logs_by_name(name)
673
726
674 def get_ipengine_logs(self):
727 def get_ipengine_logs(self):
675 """Get a dict of logs for all engines in this cluster."""
728 """Get a dict of logs for all engines in this cluster."""
676 return self.async_cluster.get_ipengine_logs()
729 return self.async_cluster.get_ipengine_logs()
677
730
678 def get_ipcontroller_logs(self):
731 def get_ipcontroller_logs(self):
679 """Get a dict of logs for the controller in this cluster."""
732 """Get a dict of logs for the controller in this cluster."""
680 return self.async_cluster.get_ipcontroller_logs()
733 return self.async_cluster.get_ipcontroller_logs()
681
734
682 def get_ipcluster_logs(self):
735 def get_ipcluster_logs(self):
683 """Get a dict of the ipcluster logs for this cluster."""
736 """Get a dict of the ipcluster logs for this cluster."""
684 return self.async_cluster.get_ipcluster_logs()
737 return self.async_cluster.get_ipcluster_logs()
685
738
686 def get_logs(self):
739 def get_logs(self):
687 """Get a dict of all logs for this cluster."""
740 """Get a dict of all logs for this cluster."""
688 return self.async_cluster.get_logs()
741 return self.async_cluster.get_logs()
689
742
690 def _print_logs(self, logs):
743 def _print_logs(self, logs):
691 for k, v in logs.iteritems():
744 for k, v in logs.iteritems():
692 print "==================================="
745 print "==================================="
693 print "Logfile: %s" % k
746 print "Logfile: %s" % k
694 print "==================================="
747 print "==================================="
695 print v
748 print v
696 print
749 print
697
750
698 def print_ipengine_logs(self):
751 def print_ipengine_logs(self):
699 """Print the ipengine logs for this cluster to stdout."""
752 """Print the ipengine logs for this cluster to stdout."""
700 self._print_logs(self.get_ipengine_logs())
753 self._print_logs(self.get_ipengine_logs())
701
754
702 def print_ipcontroller_logs(self):
755 def print_ipcontroller_logs(self):
703 """Print the ipcontroller logs for this cluster to stdout."""
756 """Print the ipcontroller logs for this cluster to stdout."""
704 self._print_logs(self.get_ipcontroller_logs())
757 self._print_logs(self.get_ipcontroller_logs())
705
758
706 def print_ipcluster_logs(self):
759 def print_ipcluster_logs(self):
707 """Print the ipcluster logs for this cluster to stdout."""
760 """Print the ipcluster logs for this cluster to stdout."""
708 self._print_logs(self.get_ipcluster_logs())
761 self._print_logs(self.get_ipcluster_logs())
709
762
710 def print_logs(self):
763 def print_logs(self):
711 """Print all the logs for this cluster to stdout."""
764 """Print all the logs for this cluster to stdout."""
712 self._print_logs(self.get_logs())
765 self._print_logs(self.get_logs())
713
766
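As a brief illustration of the Cluster class reviewed above, here is a minimal usage sketch; the import path and the profile/engine count are assumptions for the example, not something introduced by this changeset:

from IPython.kernel.cluster import Cluster   # import path assumed for the example

cluster = Cluster(profile='default', auto_create=True)
cluster.start(n=4)                       # spawns roughly "ipcluster start -n 4"
mec = cluster.get_multiengine_client()   # retries while the controller comes up
cluster.print_ipcontroller_logs()        # dump the controller log to stdout
cluster.stop()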
@@ -1,628 +1,700 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching processes asynchronously.
4 Facilities for launching processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import re
19 import re
20 import sys
20 import sys
21
21
22 from IPython.core.component import Component
22 from IPython.core.component import Component
23 from IPython.external import Itpl
23 from IPython.external import Itpl
24 from IPython.utils.traitlets import Str, Int, List, Unicode
24 from IPython.utils.traitlets import Str, Int, List, Unicode
25 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
25 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
26
26
27 from twisted.internet import reactor, defer
27 from twisted.internet import reactor, defer
28 from twisted.internet.defer import inlineCallbacks
28 from twisted.internet.defer import inlineCallbacks
29 from twisted.internet.protocol import ProcessProtocol
29 from twisted.internet.protocol import ProcessProtocol
30 from twisted.internet.utils import getProcessOutput
30 from twisted.internet.utils import getProcessOutput
31 from twisted.internet.error import ProcessDone, ProcessTerminated
31 from twisted.internet.error import ProcessDone, ProcessTerminated
32 from twisted.python import log
32 from twisted.python import log
33 from twisted.python.failure import Failure
33 from twisted.python.failure import Failure
34
34
35 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
36 # Generic launchers
36 # Generic launchers
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38
38
39
39
40 class LauncherError(Exception):
40 class LauncherError(Exception):
41 pass
41 pass
42
42
43
43
44 class ProcessStateError(LauncherError):
44 class ProcessStateError(LauncherError):
45 pass
45 pass
46
46
47
47
48 class UnknownStatus(LauncherError):
48 class UnknownStatus(LauncherError):
49 pass
49 pass
50
50
51
51
52 class BaseLauncher(Component):
52 class BaseLauncher(Component):
53 """An abstraction for starting, stopping and signaling a process."""
53 """An abstraction for starting, stopping and signaling a process."""
54
54
55 # A directory for files related to the process. Note that we don't cd
56 # to this directory.
55 working_dir = Unicode(u'')
57 working_dir = Unicode(u'')
56
58
57 def __init__(self, working_dir, parent=None, name=None, config=None):
59 def __init__(self, working_dir, parent=None, name=None, config=None):
58 super(BaseLauncher, self).__init__(parent, name, config)
60 super(BaseLauncher, self).__init__(parent, name, config)
59 self.working_dir = working_dir
61 self.working_dir = working_dir
60 self.state = 'before' # can be before, running, after
62 self.state = 'before' # can be before, running, after
61 self.stop_deferreds = []
63 self.stop_deferreds = []
62 self.start_data = None
64 self.start_data = None
63 self.stop_data = None
65 self.stop_data = None
64
66
65 @property
67 @property
66 def args(self):
68 def args(self):
67 """A list of cmd and args that will be used to start the process."""
69 """A list of cmd and args that will be used to start the process.
70
71 This is what is passed to :func:`spawnProcess` and the first element
72 will be the process name.
73 """
68 return self.find_args()
74 return self.find_args()
69
75
70 def find_args(self):
76 def find_args(self):
71 """The ``.args`` property calls this to find the args list."""
77 """The ``.args`` property calls this to find the args list.
78
79 Subclasses should implement this to construct the cmd and args.
80 """
72 raise NotImplementedError('find_args must be implemented in a subclass')
81 raise NotImplementedError('find_args must be implemented in a subclass')
73
82
74 @property
83 @property
75 def arg_str(self):
84 def arg_str(self):
76 """The string form of the program arguments."""
85 """The string form of the program arguments."""
77 return ' '.join(self.args)
86 return ' '.join(self.args)
78
87
79 @property
88 @property
80 def running(self):
89 def running(self):
90 """Am I running?"""
81 if self.state == 'running':
91 if self.state == 'running':
82 return True
92 return True
83 else:
93 else:
84 return False
94 return False
85
95
86 def start(self):
96 def start(self):
87 """Start the process.
97 """Start the process.
88
98
89 This must return a deferred that fires with information about the
99 This must return a deferred that fires with information about the
90 process starting (like a pid, job id, etc.)
100 process starting (like a pid, job id, etc.).
91 """
101 """
92 return defer.fail(
102 return defer.fail(
93 Failure(NotImplementedError(
103 Failure(NotImplementedError(
94 'start must be implemented in a subclass')
104 'start must be implemented in a subclass')
95 )
105 )
96 )
106 )
97
107
98 def stop(self):
108 def stop(self):
99 """Stop the process and notify observers of ProcessStopped.
109 """Stop the process and notify observers of stopping.
100
110
101 This must return a deferred that fires with any errors that occur
111 This must return a deferred that fires with information about the
102 while the process is attempting to be shut down. This deferred
112 process stopping, like errors that occur while the process is
103 won't fire when the process actually stops. These events are
113 attempting to be shut down. This deferred won't fire when the process
104 handled by calling :func:`observe_stop`.
114 actually stops. To observe the actual process stopping, see
115 :func:`observe_stop`.
105 """
116 """
106 return defer.fail(
117 return defer.fail(
107 Failure(NotImplementedError(
118 Failure(NotImplementedError(
108 'stop must be implemented in a subclass')
119 'stop must be implemented in a subclass')
109 )
120 )
110 )
121 )
111
122
112 def observe_stop(self):
123 def observe_stop(self):
113 """Get a deferred that will fire when the process stops.
124 """Get a deferred that will fire when the process stops.
114
125
115 The deferred will fire with data that contains information about
126 The deferred will fire with data that contains information about
116 the exit status of the process.
127 the exit status of the process.
117 """
128 """
118 if self.state=='after':
129 if self.state=='after':
119 return defer.succeed(self.stop_data)
130 return defer.succeed(self.stop_data)
120 else:
131 else:
121 d = defer.Deferred()
132 d = defer.Deferred()
122 self.stop_deferreds.append(d)
133 self.stop_deferreds.append(d)
123 return d
134 return d
124
135
125 def notify_start(self, data):
136 def notify_start(self, data):
126 """Call this to tigger startup actions.
137 """Call this to trigger startup actions.
127
138
128 This logs the process startup and sets the state to running. It is
139 This logs the process startup and sets the state to 'running'. It is
129 a pass-through so it can be used as a callback.
140 a pass-through so it can be used as a callback.
130 """
141 """
131
142
132 log.msg('Process %r started: %r' % (self.args[0], data))
143 log.msg('Process %r started: %r' % (self.args[0], data))
133 self.start_data = data
144 self.start_data = data
134 self.state = 'running'
145 self.state = 'running'
135 return data
146 return data
136
147
137 def notify_stop(self, data):
148 def notify_stop(self, data):
138 """Call this to trigger all the deferreds from :func:`observe_stop`."""
149 """Call this to trigger process stop actions.
150
151 This logs the process stopping and sets the state to 'after'. Call
152 this to trigger all the deferreds from :func:`observe_stop`."""
139
153
140 log.msg('Process %r stopped: %r' % (self.args[0], data))
154 log.msg('Process %r stopped: %r' % (self.args[0], data))
141 self.stop_data = data
155 self.stop_data = data
142 self.state = 'after'
156 self.state = 'after'
143 for i in range(len(self.stop_deferreds)):
157 for i in range(len(self.stop_deferreds)):
144 d = self.stop_deferreds.pop()
158 d = self.stop_deferreds.pop()
145 d.callback(data)
159 d.callback(data)
146 return data
160 return data
147
161
148 def signal(self, sig):
162 def signal(self, sig):
149 """Signal the process.
163 """Signal the process.
150
164
151 Return a semi-meaningless deferred after signaling the process.
165 Return a semi-meaningless deferred after signaling the process.
152
166
153 Parameters
167 Parameters
154 ----------
168 ----------
155 sig : str or int
169 sig : str or int
156 'KILL', 'INT', etc., or any signal number
170 'KILL', 'INT', etc., or any signal number
157 """
171 """
158 return defer.fail(
172 return defer.fail(
159 Failure(NotImplementedError(
173 Failure(NotImplementedError(
160 'signal must be implemented in a subclass')
174 'signal must be implemented in a subclass')
161 )
175 )
162 )
176 )
163
177
164
178
165 class LocalProcessLauncherProtocol(ProcessProtocol):
179 class LocalProcessLauncherProtocol(ProcessProtocol):
166 """A ProcessProtocol to go with the LocalProcessLauncher."""
180 """A ProcessProtocol to go with the LocalProcessLauncher."""
167
181
168 def __init__(self, process_launcher):
182 def __init__(self, process_launcher):
169 self.process_launcher = process_launcher
183 self.process_launcher = process_launcher
170 self.pid = None
184 self.pid = None
171
185
172 def connectionMade(self):
186 def connectionMade(self):
173 self.pid = self.transport.pid
187 self.pid = self.transport.pid
174 self.process_launcher.notify_start(self.transport.pid)
188 self.process_launcher.notify_start(self.transport.pid)
175
189
176 def processEnded(self, status):
190 def processEnded(self, status):
177 value = status.value
191 value = status.value
178 if isinstance(value, ProcessDone):
192 if isinstance(value, ProcessDone):
179 self.process_launcher.notify_stop(
193 self.process_launcher.notify_stop(
180 {'exit_code':0,
194 {'exit_code':0,
181 'signal':None,
195 'signal':None,
182 'status':None,
196 'status':None,
183 'pid':self.pid
197 'pid':self.pid
184 }
198 }
185 )
199 )
186 elif isinstance(value, ProcessTerminated):
200 elif isinstance(value, ProcessTerminated):
187 self.process_launcher.notify_stop(
201 self.process_launcher.notify_stop(
188 {'exit_code':value.exitCode,
202 {'exit_code':value.exitCode,
189 'signal':value.signal,
203 'signal':value.signal,
190 'status':value.status,
204 'status':value.status,
191 'pid':self.pid
205 'pid':self.pid
192 }
206 }
193 )
207 )
194 else:
208 else:
195 raise UnknownStatus("Unknown exit status, this is probably a "
209 raise UnknownStatus("Unknown exit status, this is probably a "
196 "bug in Twisted")
210 "bug in Twisted")
197
211
198 def outReceived(self, data):
212 def outReceived(self, data):
199 log.msg(data)
213 log.msg(data)
200
214
201 def errReceived(self, data):
215 def errReceived(self, data):
202 log.err(data)
216 log.err(data)
203
217
204
218
205 class LocalProcessLauncher(BaseLauncher):
219 class LocalProcessLauncher(BaseLauncher):
206 """Start and stop an external process in an asynchronous manner."""
220 """Start and stop an external process in an asynchronous manner."""
207
221
222 # This is used to construct self.args, which is passed to
223 # spawnProcess.
208 cmd_and_args = List([])
224 cmd_and_args = List([])
209
225
210 def __init__(self, working_dir, parent=None, name=None, config=None):
226 def __init__(self, working_dir, parent=None, name=None, config=None):
211 super(LocalProcessLauncher, self).__init__(
227 super(LocalProcessLauncher, self).__init__(
212 working_dir, parent, name, config
228 working_dir, parent, name, config
213 )
229 )
214 self.process_protocol = None
230 self.process_protocol = None
215 self.start_deferred = None
231 self.start_deferred = None
216
232
217 def find_args(self):
233 def find_args(self):
218 return self.cmd_and_args
234 return self.cmd_and_args
219
235
220 def start(self):
236 def start(self):
221 if self.state == 'before':
237 if self.state == 'before':
222 self.process_protocol = LocalProcessLauncherProtocol(self)
238 self.process_protocol = LocalProcessLauncherProtocol(self)
223 self.start_deferred = defer.Deferred()
239 self.start_deferred = defer.Deferred()
224 self.process_transport = reactor.spawnProcess(
240 self.process_transport = reactor.spawnProcess(
225 self.process_protocol,
241 self.process_protocol,
226 str(self.args[0]),
242 str(self.args[0]),
227 [str(a) for a in self.args],
243 [str(a) for a in self.args],
228 env=os.environ
244 env=os.environ
229 )
245 )
230 return self.start_deferred
246 return self.start_deferred
231 else:
247 else:
232 s = 'The process was already started and has state: %r' % self.state
248 s = 'The process was already started and has state: %r' % self.state
233 return defer.fail(ProcessStateError(s))
249 return defer.fail(ProcessStateError(s))
234
250
235 def notify_start(self, data):
251 def notify_start(self, data):
236 super(LocalProcessLauncher, self).notify_start(data)
252 super(LocalProcessLauncher, self).notify_start(data)
237 self.start_deferred.callback(data)
253 self.start_deferred.callback(data)
238
254
239 def stop(self):
255 def stop(self):
240 return self.interrupt_then_kill()
256 return self.interrupt_then_kill()
241
257
242 @make_deferred
258 @make_deferred
243 def signal(self, sig):
259 def signal(self, sig):
244 if self.state == 'running':
260 if self.state == 'running':
245 self.process_transport.signalProcess(sig)
261 self.process_transport.signalProcess(sig)
246
262
247 @inlineCallbacks
263 @inlineCallbacks
248 def interrupt_then_kill(self, delay=2.0):
264 def interrupt_then_kill(self, delay=2.0):
265 """Send INT, wait a delay and then send KILL."""
249 yield self.signal('INT')
266 yield self.signal('INT')
250 yield sleep_deferred(delay)
267 yield sleep_deferred(delay)
251 yield self.signal('KILL')
268 yield self.signal('KILL')
252
269
253
270
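# To make the deferred-based start/observe_stop lifecycle above concrete,
# here is a small sketch of driving a LocalProcessLauncher (the class defined
# just above) under a Twisted reactor; the command being launched and the
# surrounding script are illustrative, not part of the patch.

import os

from twisted.internet import reactor
from twisted.python import log

def demo():
    launcher = LocalProcessLauncher(os.getcwd())
    launcher.cmd_and_args = ['sleep', '3']       # any external command
    d = launcher.start()                         # fires with the pid on startup
    d.addCallback(lambda pid: log.msg('started with pid %r' % pid))
    # observe_stop() returns a separate deferred that fires only when the
    # process actually exits, with the exit_code/signal dict built by the
    # protocol above.
    launcher.observe_stop().addCallback(
        lambda data: log.msg('process exited: %r' % data))

reactor.callWhenRunning(demo)
reactor.run()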
254 class MPIExecLauncher(LocalProcessLauncher):
271 class MPIExecLauncher(LocalProcessLauncher):
272 """Launch an external process using mpiexec."""
255
273
274 # The mpiexec command to use in starting the process.
256 mpi_cmd = List(['mpiexec'], config=True)
275 mpi_cmd = List(['mpiexec'], config=True)
276 # The command line arguments to pass to mpiexec.
257 mpi_args = List([], config=True)
277 mpi_args = List([], config=True)
278 # The program to start using mpiexec.
258 program = List(['date'], config=True)
279 program = List(['date'], config=True)
280 # The command line arguments to the program.
259 program_args = List([], config=True)
281 program_args = List([], config=True)
282 # The number of instances of the program to start.
260 n = Int(1, config=True)
283 n = Int(1, config=True)
261
284
262 def find_args(self):
285 def find_args(self):
286 """Build self.args using all the fields."""
263 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
287 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
264 self.program + self.program_args
288 self.program + self.program_args
265
289
266 def start(self, n):
290 def start(self, n):
291 """Start n instances of the program using mpiexec."""
267 self.n = n
292 self.n = n
268 return super(MPIExecLauncher, self).start()
293 return super(MPIExecLauncher, self).start()
269
294
270
295
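# For illustration (not from the patch): with the traits above, e.g.
#     launcher = MPIExecLauncher(os.getcwd())
#     launcher.program = ['ipengine']
#     launcher.n = 4
# find_args() returns ['mpiexec', '-n', 4, 'ipengine']; the integer is
# converted with str() in LocalProcessLauncher.start before being handed
# to spawnProcess.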
271 class SSHLauncher(BaseLauncher):
296 class SSHLauncher(BaseLauncher):
272 """A minimal launcher for ssh.
297 """A minimal launcher for ssh.
273
298
274 To be useful this will probably have to be extended to use the ``sshx``
299 To be useful this will probably have to be extended to use the ``sshx``
275 idea for environment variables. There could be other things this needs
300 idea for environment variables. There could be other things this needs
276 as well.
301 as well.
277 """
302 """
278
303
279 ssh_cmd = List(['ssh'], config=True)
304 ssh_cmd = List(['ssh'], config=True)
280 ssh_args = List([], config=True)
305 ssh_args = List([], config=True)
281 program = List(['date'], config=True)
306 program = List(['date'], config=True)
282 program_args = List([], config=True)
307 program_args = List([], config=True)
283 hostname = Str('', config=True)
308 hostname = Str('', config=True)
284 user = Str(os.environ['USER'], config=True)
309 user = Str(os.environ['USER'], config=True)
285 location = Str('')
310 location = Str('')
286
311
287 def _hostname_changed(self, name, old, new):
312 def _hostname_changed(self, name, old, new):
288 self.location = '%s@%s' % (self.user, new)
313 self.location = '%s@%s' % (self.user, new)
289
314
290 def _user_changed(self, name, old, new):
315 def _user_changed(self, name, old, new):
291 self.location = '%s@%s' % (new, self.hostname)
316 self.location = '%s@%s' % (new, self.hostname)
292
317
293 def find_args(self):
318 def find_args(self):
294 return self.ssh_cmd + self.ssh_args + [self.location] + \
319 return self.ssh_cmd + self.ssh_args + [self.location] + \
295 self.program + self.program_args
320 self.program + self.program_args
296
321
297 def start(self, n, hostname=None, user=None):
322 def start(self, n, hostname=None, user=None):
298 if hostname is not None:
323 if hostname is not None:
299 self.hostname = hostname
324 self.hostname = hostname
300 if user is not None:
325 if user is not None:
301 self.user = user
326 self.user = user
302 return super(SSHLauncher, self).start()
327 return super(SSHLauncher, self).start()
303
328
304
329
305 class WindowsHPCLauncher(BaseLauncher):
330 class WindowsHPCLauncher(BaseLauncher):
306 pass
331 pass
307
332
308
333
309 class BatchSystemLauncher(BaseLauncher):
334 class BatchSystemLauncher(BaseLauncher):
335 """Launch an external process using a batch system.
336
337 This class is designed to work with UNIX batch systems like PBS, LSF,
338 GridEngine, etc. The overall model is that there are different commands
339 like qsub, qdel, etc. that handle the starting and stopping of the process.
340
341 This class also has the notion of a batch script. The ``batch_template``
342 attribute can be set to a string that is a template for the batch script.
343 This template is instantiated using Itpl. Thus the template can use
344 ${n} for the number of instances. Subclasses can add additional variables
345 to the template dict.
346 """
310
347
311 # Subclasses must fill these in. See PBSEngineSet
348 # Subclasses must fill these in. See PBSEngineSet
349 # The name of the command line program used to submit jobs.
312 submit_command = Str('', config=True)
350 submit_command = Str('', config=True)
351 # The name of the command line program used to delete jobs.
313 delete_command = Str('', config=True)
352 delete_command = Str('', config=True)
353 # A regular expression used to get the job id from the output of the
354 # submit_command.
314 job_id_regexp = Str('', config=True)
355 job_id_regexp = Str('', config=True)
356 # The string that is the batch script template itself.
315 batch_template = Str('', config=True)
357 batch_template = Str('', config=True)
358 # The filename of the instantiated batch script.
316 batch_file_name = Unicode(u'batch_script', config=True)
359 batch_file_name = Unicode(u'batch_script', config=True)
360 # The full path to the instantiated batch script.
317 batch_file = Unicode(u'')
361 batch_file = Unicode(u'')
318
362
319 def __init__(self, working_dir, parent=None, name=None, config=None):
363 def __init__(self, working_dir, parent=None, name=None, config=None):
320 super(BatchSystemLauncher, self).__init__(
364 super(BatchSystemLauncher, self).__init__(
321 working_dir, parent, name, config
365 working_dir, parent, name, config
322 )
366 )
323 self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
367 self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
324 self.context = {}
368 self.context = {}
325
369
326 def parse_job_id(self, output):
370 def parse_job_id(self, output):
371 """Take the output of the submit command and return the job id."""
327 m = re.match(self.job_id_regexp, output)
372 m = re.match(self.job_id_regexp, output)
328 if m is not None:
373 if m is not None:
329 job_id = m.group()
374 job_id = m.group()
330 else:
375 else:
331 raise LauncherError("Job id couldn't be determined: %s" % output)
376 raise LauncherError("Job id couldn't be determined: %s" % output)
332 self.job_id = job_id
377 self.job_id = job_id
333 log.msg('Job started with job id: %r' % job_id)
378 log.msg('Job started with job id: %r' % job_id)
334 return job_id
379 return job_id
335
380
336 def write_batch_script(self, n):
381 def write_batch_script(self, n):
382 """Instantiate and write the batch script to the working_dir."""
337 self.context['n'] = n
383 self.context['n'] = n
338 script_as_string = Itpl.itplns(self.batch_template, self.context)
384 script_as_string = Itpl.itplns(self.batch_template, self.context)
339 log.msg('Writing instantiated batch script: %s' % self.batch_file)
385 log.msg('Writing instantiated batch script: %s' % self.batch_file)
340 f = open(self.batch_file, 'w')
386 f = open(self.batch_file, 'w')
341 f.write(script_as_string)
387 f.write(script_as_string)
342 f.close()
388 f.close()
343
389
344 @inlineCallbacks
390 @inlineCallbacks
345 def start(self, n):
391 def start(self, n):
346 """Start n copies of the process using a batch system."""
392 """Start n copies of the process using a batch system."""
347 self.write_batch_script(n)
393 self.write_batch_script(n)
348 output = yield getProcessOutput(self.submit_command,
394 output = yield getProcessOutput(self.submit_command,
349 [self.batch_file], env=os.environ)
395 [self.batch_file], env=os.environ)
350 job_id = self.parse_job_id(output)
396 job_id = self.parse_job_id(output)
351 self.notify_start(job_id)
397 self.notify_start(job_id)
352 defer.returnValue(job_id)
398 defer.returnValue(job_id)
353
399
354 @inlineCallbacks
400 @inlineCallbacks
355 def stop(self):
401 def stop(self):
356 output = yield getProcessOutput(self.delete_command,
402 output = yield getProcessOutput(self.delete_command,
357 [self.job_id], env=os.environ
403 [self.job_id], env=os.environ
358 )
404 )
359 self.notify_stop(output) # Pass the output of the kill cmd
405 self.notify_stop(output) # Pass the output of the kill cmd
360 defer.returnValue(output)
406 defer.returnValue(output)
361
407
362
408
363 class PBSLauncher(BatchSystemLauncher):
409 class PBSLauncher(BatchSystemLauncher):
410 """A BatchSystemLauncher subclass for PBS."""
364
411
365 submit_command = Str('qsub', config=True)
412 submit_command = Str('qsub', config=True)
366 delete_command = Str('qdel', config=True)
413 delete_command = Str('qdel', config=True)
367 job_id_regexp = Str('\d+', config=True)
414 job_id_regexp = Str('\d+', config=True)
368 batch_template = Str('', config=True)
415 batch_template = Str('', config=True)
369 batch_file_name = Unicode(u'pbs_batch_script', config=True)
416 batch_file_name = Unicode(u'pbs_batch_script', config=True)
370 batch_file = Unicode(u'')
417 batch_file = Unicode(u'')
371
418
372
419
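# A short sketch (not part of the patch) of the templating mechanism described
# in BatchSystemLauncher, using the same Itpl call as write_batch_script; the
# PBS directives in the template are purely illustrative.

from IPython.external import Itpl

template = """#!/bin/sh
#PBS -N ipython_job
#PBS -l nodes=${n}:ppn=1
echo "starting ${n} processes"
"""
print Itpl.itplns(template, {'n': 8})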
373 #-----------------------------------------------------------------------------
420 #-----------------------------------------------------------------------------
374 # Controller launchers
421 # Controller launchers
375 #-----------------------------------------------------------------------------
422 #-----------------------------------------------------------------------------
376
423
377 def find_controller_cmd():
424 def find_controller_cmd():
425 """Find the command line ipcontroller program in a cross platform way."""
378 if sys.platform == 'win32':
426 if sys.platform == 'win32':
379 # This logic is needed because the ipcontroller script doesn't
427 # This logic is needed because the ipcontroller script doesn't
380 # always get installed in the same way or in the same location.
428 # always get installed in the same way or in the same location.
381 from IPython.kernel import ipcontrollerapp
429 from IPython.kernel import ipcontrollerapp
382 script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
430 script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
383 # The -u option here turns on unbuffered output, which is required
431 # The -u option here turns on unbuffered output, which is required
384 # on Win32 to prevent weird conflicts and problems with Twisted.
432 # on Win32 to prevent weird conflicts and problems with Twisted.
385 # Also, use sys.executable to make sure we are picking up the
433 # Also, use sys.executable to make sure we are picking up the
386 # right python exe.
434 # right python exe.
387 cmd = [sys.executable, '-u', script_location]
435 cmd = [sys.executable, '-u', script_location]
388 else:
436 else:
389 # ipcontroller has to be on the PATH in this case.
437 # ipcontroller has to be on the PATH in this case.
390 cmd = ['ipcontroller']
438 cmd = ['ipcontroller']
391 return cmd
439 return cmd
392
440
393
441
394 class LocalControllerLauncher(LocalProcessLauncher):
442 class LocalControllerLauncher(LocalProcessLauncher):
443 """Launch a controller as a regular external process."""
395
444
396 controller_cmd = List(find_controller_cmd())
445 controller_cmd = List(find_controller_cmd())
446 # Command line arguments to ipcontroller.
397 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
447 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
398
448
399 def find_args(self):
449 def find_args(self):
400 return self.controller_cmd + self.controller_args
450 return self.controller_cmd + self.controller_args
401
451
402 def start(self, profile=None, cluster_dir=None):
452 def start(self, profile=None, cluster_dir=None):
453 """Start the controller by profile or cluster_dir."""
403 if cluster_dir is not None:
454 if cluster_dir is not None:
404 self.controller_args.extend(['--cluster-dir', cluster_dir])
455 self.controller_args.extend(['--cluster-dir', cluster_dir])
405 if profile is not None:
456 if profile is not None:
406 self.controller_args.extend(['--profile', profile])
457 self.controller_args.extend(['--profile', profile])
407 log.msg("Starting LocalControllerLauncher: %r" % self.args)
458 log.msg("Starting LocalControllerLauncher: %r" % self.args)
408 return super(LocalControllerLauncher, self).start()
459 return super(LocalControllerLauncher, self).start()
409
460
410
461
411 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
462 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
412 pass
463 pass
413
464
414
465
415 class MPIExecControllerLauncher(MPIExecLauncher):
466 class MPIExecControllerLauncher(MPIExecLauncher):
467 """Launch a controller using mpiexec."""
416
468
417 controller_cmd = List(find_controller_cmd(), config=False)
469 controller_cmd = List(find_controller_cmd(), config=False)
470 # Command line arguments to ipcontroller.
418 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
471 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
419 n = Int(1, config=False)
472 n = Int(1, config=False)
420
473
421 def start(self, profile=None, cluster_dir=None):
474 def start(self, profile=None, cluster_dir=None):
475 """Start the controller by profile or cluster_dir."""
422 if cluster_dir is not None:
476 if cluster_dir is not None:
423 self.controller_args.extend(['--cluster-dir', cluster_dir])
477 self.controller_args.extend(['--cluster-dir', cluster_dir])
424 if profile is not None:
478 if profile is not None:
425 self.controller_args.extend(['--profile', profile])
479 self.controller_args.extend(['--profile', profile])
426 log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
480 log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
427 return super(MPIExecControllerLauncher, self).start(1)
481 return super(MPIExecControllerLauncher, self).start(1)
428
482
429
430 def find_args(self):
483 def find_args(self):
431 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
484 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
432 self.controller_cmd + self.controller_args
485 self.controller_cmd + self.controller_args
433
486
434
487
435 class PBSControllerLauncher(PBSLauncher):
488 class PBSControllerLauncher(PBSLauncher):
489 """Launch a controller using PBS."""
490
491 batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
436
492
437 def start(self, profile=None, cluster_dir=None):
493 def start(self, profile=None, cluster_dir=None):
494 """Start the controller by profile or cluster_dir."""
438 # Here we save profile and cluster_dir in the context so they
495 # Here we save profile and cluster_dir in the context so they
439 # can be used in the batch script template as ${profile} and
496 # can be used in the batch script template as ${profile} and
440 # ${cluster_dir}
497 # ${cluster_dir}
441 if cluster_dir is not None:
498 if cluster_dir is not None:
442 self.context['cluster_dir'] = cluster_dir
499 self.context['cluster_dir'] = cluster_dir
443 if profile is not None:
500 if profile is not None:
444 self.context['profile'] = profile
501 self.context['profile'] = profile
445 log.msg("Starting PBSControllerLauncher: %r" % self.args)
502 log.msg("Starting PBSControllerLauncher: %r" % self.args)
446 return super(PBSControllerLauncher, self).start(1)
503 return super(PBSControllerLauncher, self).start(1)
447
504
448
505
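# A hypothetical batch_template a user might set in their cluster config for
# this launcher, using the ${cluster_dir} and ${profile} variables noted in
# the comment above; the config attribute name and PBS directives are
# illustrative assumptions, not taken from this patch.
c.PBSControllerLauncher.batch_template = """#!/bin/sh
#PBS -N ipcontroller
ipcontroller --cluster-dir ${cluster_dir} --profile ${profile} --log-to-file
"""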
449 class SSHControllerLauncher(SSHLauncher):
506 class SSHControllerLauncher(SSHLauncher):
450 pass
507 pass
451
508
452
509
453 #-----------------------------------------------------------------------------
510 #-----------------------------------------------------------------------------
454 # Engine launchers
511 # Engine launchers
455 #-----------------------------------------------------------------------------
512 #-----------------------------------------------------------------------------
456
513
457
514
458 def find_engine_cmd():
515 def find_engine_cmd():
516 """Find the command line ipengine program in a cross platform way."""
459 if sys.platform == 'win32':
517 if sys.platform == 'win32':
460 # This logic is needed because the ipengine script doesn't
518 # This logic is needed because the ipengine script doesn't
461 # always get installed in the same way or in the same location.
519 # always get installed in the same way or in the same location.
462 from IPython.kernel import ipengineapp
520 from IPython.kernel import ipengineapp
463 script_location = ipengineapp.__file__.replace('.pyc', '.py')
521 script_location = ipengineapp.__file__.replace('.pyc', '.py')
464 # The -u option here turns on unbuffered output, which is required
522 # The -u option here turns on unbuffered output, which is required
465 # on Win32 to prevent weird conflicts and problems with Twisted.
523 # on Win32 to prevent weird conflicts and problems with Twisted.
466 # Also, use sys.executable to make sure we are picking up the
524 # Also, use sys.executable to make sure we are picking up the
467 # right python exe.
525 # right python exe.
468 cmd = [sys.executable, '-u', script_location]
526 cmd = [sys.executable, '-u', script_location]
469 else:
527 else:
470 # ipengine has to be on the PATH in this case.
528 # ipengine has to be on the PATH in this case.
471 cmd = ['ipengine']
529 cmd = ['ipengine']
472 return cmd
530 return cmd
473
531
474
532
475 class LocalEngineLauncher(LocalProcessLauncher):
533 class LocalEngineLauncher(LocalProcessLauncher):
534 """Launch a single engine as a regular external process."""
476
535
477 engine_cmd = List(find_engine_cmd())
536 engine_cmd = List(find_engine_cmd())
537 # Command line arguments for ipengine.
478 engine_args = List(
538 engine_args = List(
479 ['--log-to-file','--log-level', '40'], config=True
539 ['--log-to-file','--log-level', '40'], config=True
480 )
540 )
481
541
482 def find_args(self):
542 def find_args(self):
483 return self.engine_cmd + self.engine_args
543 return self.engine_cmd + self.engine_args
484
544
485 def start(self, profile=None, cluster_dir=None):
545 def start(self, profile=None, cluster_dir=None):
546 """Start the engine by profile or cluster_dir."""
486 if cluster_dir is not None:
547 if cluster_dir is not None:
487 self.engine_args.extend(['--cluster-dir', cluster_dir])
548 self.engine_args.extend(['--cluster-dir', cluster_dir])
488 if profile is not None:
549 if profile is not None:
489 self.engine_args.extend(['--profile', profile])
550 self.engine_args.extend(['--profile', profile])
490 return super(LocalEngineLauncher, self).start()
551 return super(LocalEngineLauncher, self).start()
491
552
492
553
493 class LocalEngineSetLauncher(BaseLauncher):
554 class LocalEngineSetLauncher(BaseLauncher):
555 """Launch a set of engines as regular external processes."""
494
556
557 # Command line arguments for ipengine.
495 engine_args = List(
558 engine_args = List(
496 ['--log-to-file','--log-level', '40'], config=True
559 ['--log-to-file','--log-level', '40'], config=True
497 )
560 )
498
561
499 def __init__(self, working_dir, parent=None, name=None, config=None):
562 def __init__(self, working_dir, parent=None, name=None, config=None):
500 super(LocalEngineSetLauncher, self).__init__(
563 super(LocalEngineSetLauncher, self).__init__(
501 working_dir, parent, name, config
564 working_dir, parent, name, config
502 )
565 )
503 self.launchers = []
566 self.launchers = []
504
567
505 def start(self, n, profile=None, cluster_dir=None):
568 def start(self, n, profile=None, cluster_dir=None):
569 """Start n engines by profile or cluster_dir."""
506 dlist = []
570 dlist = []
507 for i in range(n):
571 for i in range(n):
508 el = LocalEngineLauncher(self.working_dir, self)
572 el = LocalEngineLauncher(self.working_dir, self)
509 # Copy the engine args over to each engine launcher.
573 # Copy the engine args over to each engine launcher.
510 import copy
574 import copy
511 el.engine_args = copy.deepcopy(self.engine_args)
575 el.engine_args = copy.deepcopy(self.engine_args)
512 d = el.start(profile, cluster_dir)
576 d = el.start(profile, cluster_dir)
513 if i==0:
577 if i==0:
514 log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
578 log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
515 self.launchers.append(el)
579 self.launchers.append(el)
516 dlist.append(d)
580 dlist.append(d)
517 # The consumeErrors here could be dangerous
581 # The consumeErrors here could be dangerous
518 dfinal = gatherBoth(dlist, consumeErrors=True)
582 dfinal = gatherBoth(dlist, consumeErrors=True)
519 dfinal.addCallback(self.notify_start)
583 dfinal.addCallback(self.notify_start)
520 return dfinal
584 return dfinal
521
585
522 def find_args(self):
586 def find_args(self):
523 return ['engine set']
587 return ['engine set']
524
588
525 def signal(self, sig):
589 def signal(self, sig):
526 dlist = []
590 dlist = []
527 for el in self.launchers:
591 for el in self.launchers:
528 d = el.signal(sig)
592 d = el.signal(sig)
529 dlist.append(d)
593 dlist.append(d)
530 dfinal = gatherBoth(dlist, consumeErrors=True)
594 dfinal = gatherBoth(dlist, consumeErrors=True)
531 return dfinal
595 return dfinal
532
596
533 def interrupt_then_kill(self, delay=1.0):
597 def interrupt_then_kill(self, delay=1.0):
534 dlist = []
598 dlist = []
535 for el in self.launchers:
599 for el in self.launchers:
536 d = el.interrupt_then_kill(delay)
600 d = el.interrupt_then_kill(delay)
537 dlist.append(d)
601 dlist.append(d)
538 dfinal = gatherBoth(dlist, consumeErrors=True)
602 dfinal = gatherBoth(dlist, consumeErrors=True)
539 return dfinal
603 return dfinal
540
604
541 def stop(self):
605 def stop(self):
542 return self.interrupt_then_kill()
606 return self.interrupt_then_kill()
543
607
544 def observe_stop(self):
608 def observe_stop(self):
545 dlist = [el.observe_stop() for el in self.launchers]
609 dlist = [el.observe_stop() for el in self.launchers]
546 dfinal = gatherBoth(dlist, consumeErrors=False)
610 dfinal = gatherBoth(dlist, consumeErrors=False)
547 dfinal.addCallback(self.notify_stop)
611 dfinal.addCallback(self.notify_stop)
548 return dfinal
612 return dfinal
549
613
550
614
551 class MPIExecEngineSetLauncher(MPIExecLauncher):
615 class MPIExecEngineSetLauncher(MPIExecLauncher):
552
616
553 engine_cmd = List(find_engine_cmd(), config=False)
617 engine_cmd = List(find_engine_cmd(), config=False)
618 # Command line arguments for ipengine.
554 engine_args = List(
619 engine_args = List(
555 ['--log-to-file','--log-level', '40'], config=True
620 ['--log-to-file','--log-level', '40'], config=True
556 )
621 )
557 n = Int(1, config=True)
622 n = Int(1, config=True)
558
623
559 def start(self, n, profile=None, cluster_dir=None):
624 def start(self, n, profile=None, cluster_dir=None):
625 """Start n engines by profile or cluster_dir."""
560 if cluster_dir is not None:
626 if cluster_dir is not None:
561 self.engine_args.extend(['--cluster-dir', cluster_dir])
627 self.engine_args.extend(['--cluster-dir', cluster_dir])
562 if profile is not None:
628 if profile is not None:
563 self.engine_args.extend(['--profile', profile])
629 self.engine_args.extend(['--profile', profile])
564 log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
630 log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
565 return super(MPIExecEngineSetLauncher, self).start(n)
631 return super(MPIExecEngineSetLauncher, self).start(n)
566
632
567 def find_args(self):
633 def find_args(self):
568 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
634 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
569 self.engine_cmd + self.engine_args
635 self.engine_cmd + self.engine_args
570
636
571
637
572 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
638 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
573 pass
639 pass
574
640
575
641
576 class PBSEngineSetLauncher(PBSLauncher):
642 class PBSEngineSetLauncher(PBSLauncher):
577
643
644 batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
645
578 def start(self, n, profile=None, cluster_dir=None):
646 def start(self, n, profile=None, cluster_dir=None):
647 """Start n engines by profile or cluster_dir."""
579 if cluster_dir is not None:
648 if cluster_dir is not None:
580 self.program_args.extend(['--cluster-dir', cluster_dir])
649 self.program_args.extend(['--cluster-dir', cluster_dir])
581 if profile is not None:
650 if profile is not None:
582 self.program_args.extend(['-p', profile])
651 self.program_args.extend(['-p', profile])
583 log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
652 log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
584 return super(PBSEngineSetLauncher, self).start(n)
653 return super(PBSEngineSetLauncher, self).start(n)
585
654
586
655
587 class SSHEngineSetLauncher(BaseLauncher):
656 class SSHEngineSetLauncher(BaseLauncher):
588 pass
657 pass
589
658
590
659
591 #-----------------------------------------------------------------------------
660 #-----------------------------------------------------------------------------
592 # A launcher for ipcluster itself!
661 # A launcher for ipcluster itself!
593 #-----------------------------------------------------------------------------
662 #-----------------------------------------------------------------------------
594
663
595
664
596 def find_ipcluster_cmd():
665 def find_ipcluster_cmd():
666 """Find the command line ipcluster program in a cross platform way."""
597 if sys.platform == 'win32':
667 if sys.platform == 'win32':
598 # This logic is needed because the ipcluster script doesn't
668 # This logic is needed because the ipcluster script doesn't
599 # always get installed in the same way or in the same location.
669 # always get installed in the same way or in the same location.
600 from IPython.kernel import ipclusterapp
670 from IPython.kernel import ipclusterapp
601 script_location = ipclusterapp.__file__.replace('.pyc', '.py')
671 script_location = ipclusterapp.__file__.replace('.pyc', '.py')
602 # The -u option here turns on unbuffered output, which is required
672 # The -u option here turns on unbuffered output, which is required
603 # on Win32 to prevent weird conflicts and problems with Twisted.
673 # on Win32 to prevent weird conflicts and problems with Twisted.
604 # Also, use sys.executable to make sure we are picking up the
674 # Also, use sys.executable to make sure we are picking up the
605 # right python exe.
675 # right python exe.
606 cmd = [sys.executable, '-u', script_location]
676 cmd = [sys.executable, '-u', script_location]
607 else:
677 else:
608 # ipcluster has to be on the PATH in this case.
678 # ipcluster has to be on the PATH in this case.
609 cmd = ['ipcluster']
679 cmd = ['ipcluster']
610 return cmd
680 return cmd
611
681
612
682
613 class IPClusterLauncher(LocalProcessLauncher):
683 class IPClusterLauncher(LocalProcessLauncher):
684 """Launch the ipcluster program in an external process."""
614
685
615 ipcluster_cmd = List(find_ipcluster_cmd())
686 ipcluster_cmd = List(find_ipcluster_cmd())
687 # Command line arguments to pass to ipcluster.
616 ipcluster_args = List(
688 ipcluster_args = List(
617 ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
689 ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
618 ipcluster_subcommand = Str('start')
690 ipcluster_subcommand = Str('start')
619 ipcluster_n = Int(2)
691 ipcluster_n = Int(2)
620
692
621 def find_args(self):
693 def find_args(self):
622 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
694 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
623 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
695 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
624
696
625 def start(self):
697 def start(self):
626 log.msg("Starting ipcluster: %r" % self.args)
698 log.msg("Starting ipcluster: %r" % self.args)
627 return super(IPClusterLauncher, self).start()
699 return super(IPClusterLauncher, self).start()
628
700
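As a quick sanity check on the defaults above (illustrative, not from the patch): on a non-Windows machine IPClusterLauncher.find_args() evaluates to ['ipcluster', 'start', '-n', '2', '--clean-logs', '--log-to-file', '--log-level', '40'], which is the command line the AsyncCluster class in the previous file spawns when its start() method runs.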