##// END OF EJS Templates
Replace Itpl with str.format in parallel launcher....
Thomas Kluyver -
Show More
@@ -1,241 +1,242 b''
1 import os
1 import os
2
2
3 c = get_config()
3 c = get_config()
4
4
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Select which launchers to use
6 # Select which launchers to use
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 # This allows you to control what method is used to start the controller
9 # This allows you to control what method is used to start the controller
10 # and engines. The following methods are currently supported:
10 # and engines. The following methods are currently supported:
11 # - Start as a regular process on localhost.
11 # - Start as a regular process on localhost.
12 # - Start using mpiexec.
12 # - Start using mpiexec.
13 # - Start using the Windows HPC Server 2008 scheduler
13 # - Start using the Windows HPC Server 2008 scheduler
14 # - Start using PBS/SGE
14 # - Start using PBS/SGE
15 # - Start using SSH
15 # - Start using SSH
16
16
17
17
18 # The selected launchers can be configured below.
18 # The selected launchers can be configured below.
19
19
20 # Options are:
20 # Options are:
21 # - LocalControllerLauncher
21 # - LocalControllerLauncher
22 # - MPIExecControllerLauncher
22 # - MPIExecControllerLauncher
23 # - PBSControllerLauncher
23 # - PBSControllerLauncher
24 # - SGEControllerLauncher
24 # - SGEControllerLauncher
25 # - WindowsHPCControllerLauncher
25 # - WindowsHPCControllerLauncher
26 # c.IPClusterStartApp.controller_launcher = 'IPython.parallel.apps.launcher.LocalControllerLauncher'
26 # c.IPClusterStartApp.controller_launcher = 'IPython.parallel.apps.launcher.LocalControllerLauncher'
27 # c.IPClusterStartApp.controller_launcher = 'IPython.parallel.apps.launcher.PBSControllerLauncher'
27 # c.IPClusterStartApp.controller_launcher = 'IPython.parallel.apps.launcher.PBSControllerLauncher'
28
28
29 # Options are:
29 # Options are:
30 # - LocalEngineSetLauncher
30 # - LocalEngineSetLauncher
31 # - MPIExecEngineSetLauncher
31 # - MPIExecEngineSetLauncher
32 # - PBSEngineSetLauncher
32 # - PBSEngineSetLauncher
33 # - SGEEngineSetLauncher
33 # - SGEEngineSetLauncher
34 # - WindowsHPCEngineSetLauncher
34 # - WindowsHPCEngineSetLauncher
35 # c.IPClusterEnginesApp.engine_launcher = 'IPython.parallel.apps.launcher.LocalEngineSetLauncher'
35 # c.IPClusterEnginesApp.engine_launcher = 'IPython.parallel.apps.launcher.LocalEngineSetLauncher'
36
36
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38 # Application configuration
38 # Application configuration
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40
40
41 # The default number of engines that will be started. This is overridden by
41 # The default number of engines that will be started. This is overridden by
42 # the -n command line option: "ipcluster start -n 4"
42 # the -n command line option: "ipcluster start -n 4"
43 # c.IPClusterEnginesApp.n = 2
43 # c.IPClusterEnginesApp.n = 2
44
44
45 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
45 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
46 # c.BaseParallelApp.log_to_file = False
46 # c.BaseParallelApp.log_to_file = False
47
47
48 # Remove old logs from cluster_dir/log before starting.
48 # Remove old logs from cluster_dir/log before starting.
49 # c.BaseParallelApp.clean_logs = True
49 # c.BaseParallelApp.clean_logs = True
50
50
51 # The working directory for the process. The application will use os.chdir
51 # The working directory for the process. The application will use os.chdir
52 # to change to this directory before starting.
52 # to change to this directory before starting.
53 # c.BaseParallelApp.work_dir = os.getcwd()
53 # c.BaseParallelApp.work_dir = os.getcwd()
54
54
55
55
56 #-----------------------------------------------------------------------------
56 #-----------------------------------------------------------------------------
57 # Local process launchers
57 # Local process launchers
58 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
59
59
60 # The command line arguments to call the controller with.
60 # The command line arguments to call the controller with.
61 # c.LocalControllerLauncher.controller_args = \
61 # c.LocalControllerLauncher.controller_args = \
62 # ['--log-to-file','--log-level', '40']
62 # ['--log-to-file','--log-level', '40']
63
63
64 # The working directory for the controller
64 # The working directory for the controller
65 # c.LocalEngineSetLauncher.work_dir = u''
65 # c.LocalEngineSetLauncher.work_dir = u''
66
66
67 # Command line argument passed to the engines.
67 # Command line argument passed to the engines.
68 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
68 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
69
69
70 #-----------------------------------------------------------------------------
70 #-----------------------------------------------------------------------------
71 # MPIExec launchers
71 # MPIExec launchers
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73
73
74 # The mpiexec/mpirun command to use in both the controller and engines.
74 # The mpiexec/mpirun command to use in both the controller and engines.
75 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
75 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
76
76
77 # Additional arguments to pass to the actual mpiexec command.
77 # Additional arguments to pass to the actual mpiexec command.
78 # c.MPIExecLauncher.mpi_args = []
78 # c.MPIExecLauncher.mpi_args = []
79
79
80 # The mpiexec/mpirun command and args can be overridden if they should be different
80 # The mpiexec/mpirun command and args can be overridden if they should be different
81 # for controller and engines.
81 # for controller and engines.
82 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
82 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
83 # c.MPIExecControllerLauncher.mpi_args = []
83 # c.MPIExecControllerLauncher.mpi_args = []
84 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
84 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
85 # c.MPIExecEngineSetLauncher.mpi_args = []
85 # c.MPIExecEngineSetLauncher.mpi_args = []
86
86
87 # The command line argument to call the controller with.
87 # The command line argument to call the controller with.
88 # c.MPIExecControllerLauncher.controller_args = \
88 # c.MPIExecControllerLauncher.controller_args = \
89 # ['--log-to-file','--log-level', '40']
89 # ['--log-to-file','--log-level', '40']
90
90
91 # Command line argument passed to the engines.
91 # Command line argument passed to the engines.
92 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
92 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
93
93
94 # The default number of engines to start if not given elsewhere.
94 # The default number of engines to start if not given elsewhere.
95 # c.MPIExecEngineSetLauncher.n = 1
95 # c.MPIExecEngineSetLauncher.n = 1
96
96
97 #-----------------------------------------------------------------------------
97 #-----------------------------------------------------------------------------
98 # SSH launchers
98 # SSH launchers
99 #-----------------------------------------------------------------------------
99 #-----------------------------------------------------------------------------
100
100
101 # ipclusterz can be used to launch controller and engines remotely via ssh.
101 # ipclusterz can be used to launch controller and engines remotely via ssh.
102 # Note that currently ipclusterz does not do any file distribution, so if
102 # Note that currently ipclusterz does not do any file distribution, so if
103 # machines are not on a shared filesystem, config and json files must be
103 # machines are not on a shared filesystem, config and json files must be
104 # distributed. For this reason, the reuse_files defaults to True on an
104 # distributed. For this reason, the reuse_files defaults to True on an
105 # ssh-launched Controller. This flag can be overridded by the program_args
105 # ssh-launched Controller. This flag can be overridded by the program_args
106 # attribute of c.SSHControllerLauncher.
106 # attribute of c.SSHControllerLauncher.
107
107
108 # set the ssh cmd for launching remote commands. The default is ['ssh']
108 # set the ssh cmd for launching remote commands. The default is ['ssh']
109 # c.SSHLauncher.ssh_cmd = ['ssh']
109 # c.SSHLauncher.ssh_cmd = ['ssh']
110
110
111 # set the ssh cmd for launching remote commands. The default is ['ssh']
111 # set the ssh cmd for launching remote commands. The default is ['ssh']
112 # c.SSHLauncher.ssh_args = ['tt']
112 # c.SSHLauncher.ssh_args = ['tt']
113
113
114 # Set the user and hostname for the controller
114 # Set the user and hostname for the controller
115 # c.SSHControllerLauncher.hostname = 'controller.example.com'
115 # c.SSHControllerLauncher.hostname = 'controller.example.com'
116 # c.SSHControllerLauncher.user = os.environ.get('USER','username')
116 # c.SSHControllerLauncher.user = os.environ.get('USER','username')
117
117
118 # Set the arguments to be passed to ipcontrollerz
118 # Set the arguments to be passed to ipcontrollerz
119 # note that remotely launched ipcontrollerz will not get the contents of
119 # note that remotely launched ipcontrollerz will not get the contents of
120 # the local ipcontrollerz_config.py unless it resides on the *remote host*
120 # the local ipcontrollerz_config.py unless it resides on the *remote host*
121 # in the location specified by the --cluster_dir argument.
121 # in the location specified by the --cluster_dir argument.
122 # c.SSHControllerLauncher.program_args = ['-r', '-ip', '0.0.0.0', '--cluster_dir', '/path/to/cd']
122 # c.SSHControllerLauncher.program_args = ['-r', '-ip', '0.0.0.0', '--cluster_dir', '/path/to/cd']
123
123
124 # Set the default args passed to ipenginez for SSH launched engines
124 # Set the default args passed to ipengine for SSH launched engines
125 # c.SSHEngineSetLauncher.engine_args = ['--mpi', 'mpi4py']
125 # c.SSHEngineSetLauncher.engine_args = ['--mpi', 'mpi4py']
126
126
127 # SSH engines are launched as a dict of locations/n-engines.
127 # SSH engines are launched as a dict of locations/n-engines.
128 # if a value is a tuple instead of an int, it is assumed to be of the form
128 # if a value is a tuple instead of an int, it is assumed to be of the form
129 # (n, [args]), setting the arguments to passed to ipenginez on `host`.
129 # (n, [args]), setting the arguments to passed to ipengine on `host`.
130 # otherwise, c.SSHEngineSetLauncher.engine_args will be used as the default.
130 # otherwise, c.SSHEngineSetLauncher.engine_args will be used as the default.
131
131
132 # In this case, there will be 3 engines at my.example.com, and
132 # In this case, there will be 3 engines at my.example.com, and
133 # 2 at you@ipython.scipy.org with a special json connector location.
133 # 2 at you@ipython.scipy.org with a special json connector location.
134 # c.SSHEngineSetLauncher.engines = {'my.example.com' : 3,
134 # c.SSHEngineSetLauncher.engines = {'my.example.com' : 3,
135 # 'you@ipython.scipy.org' : (2, ['-f', '/path/to/ipcontroller-engine.json']}
135 # 'you@ipython.scipy.org' : (2, ['-f', '/path/to/ipcontroller-engine.json']}
136 # }
136 # }
137
137
138 #-----------------------------------------------------------------------------
138 #-----------------------------------------------------------------------------
139 # Unix batch (PBS) schedulers launchers
139 # Unix batch (PBS) schedulers launchers
140 #-----------------------------------------------------------------------------
140 #-----------------------------------------------------------------------------
141
141
142 # SGE and PBS are very similar. All configurables in this section called 'PBS*'
142 # SGE and PBS are very similar. All configurables in this section called 'PBS*'
143 # also exist as 'SGE*'.
143 # also exist as 'SGE*'.
144
144
145 # The command line program to use to submit a PBS job.
145 # The command line program to use to submit a PBS job.
146 # c.PBSLauncher.submit_command = ['qsub']
146 # c.PBSLauncher.submit_command = ['qsub']
147
147
148 # The command line program to use to delete a PBS job.
148 # The command line program to use to delete a PBS job.
149 # c.PBSLauncher.delete_command = ['qdel']
149 # c.PBSLauncher.delete_command = ['qdel']
150
150
151 # The PBS queue in which the job should run
151 # The PBS queue in which the job should run
152 # c.PBSLauncher.queue = 'myqueue'
152 # c.PBSLauncher.queue = 'myqueue'
153
153
154 # A regular expression that takes the output of qsub and find the job id.
154 # A regular expression that takes the output of qsub and find the job id.
155 # c.PBSLauncher.job_id_regexp = r'\d+'
155 # c.PBSLauncher.job_id_regexp = r'\d+'
156
156
157 # If for some reason the Controller and Engines have different options above, they
157 # If for some reason the Controller and Engines have different options above, they
158 # can be set as c.PBSControllerLauncher.<option> etc.
158 # can be set as c.PBSControllerLauncher.<option> etc.
159
159
160 # PBS and SGE have default templates, but you can specify your own, either as strings
160 # PBS and SGE have default templates, but you can specify your own, either as strings
161 # or from files, as described here:
161 # or from files, as described here:
162
162
163 # The batch submission script used to start the controller. This is where
163 # The batch submission script used to start the controller. This is where
164 # environment variables would be setup, etc. This string is interpreted using
164 # environment variables would be setup, etc. This string is interpreted using
165 # the Itpl module in IPython.external. Basically, you can use ${n} for the
165 # Python's string formatting. Basically, you can use {queue} for the name
166 # number of engine and ${cluster_dir} for the cluster_dir.
166 # of the PBS queue, and {profile_dir} for the profile_dir.
167 # c.PBSControllerLauncher.batch_template = """
167 # c.PBSControllerLauncher.batch_template = """
168 # #PBS -N ipcontroller
168 # #PBS -N ipcontroller
169 # #PBS -q $queue
169 # #PBS -q {queue}
170 #
170 #
171 # ipcontrollerz --cluster-dir $cluster_dir
171 # ipcontroller profile_dir={profile_dir}
172 # """
172 # """
173
173
174 # You can also load this template from a file
174 # You can also load this template from a file
175 # c.PBSControllerLauncher.batch_template_file = u"/path/to/my/template.sh"
175 # c.PBSControllerLauncher.batch_template_file = u"/path/to/my/template.sh"
176
176
177 # The name of the instantiated batch script that will actually be used to
177 # The name of the instantiated batch script that will actually be used to
178 # submit the job. This will be written to the cluster directory.
178 # submit the job. This will be written to the cluster directory.
179 # c.PBSControllerLauncher.batch_file_name = u'pbs_controller'
179 # c.PBSControllerLauncher.batch_file_name = u'pbs_controller'
180
180
181 # The batch submission script used to start the engines. This is where
181 # The batch submission script used to start the engines. This is where
182 # environment variables would be setup, etc. This string is interpreted using
182 # environment variables would be setup, etc. This string is interpreted using
183 # the Itpl module in IPython.external. Basically, you can use ${n} for the
183 # Python's string formatting. Basically, you can use {queue} for the name
184 # number of engine and ${cluster_dir} for the cluster_dir.
184 # of the PBS queue, and {profile_dir} for the profile_dir, and {n}
185 # for the number of engines.
185 # c.PBSEngineSetLauncher.batch_template = """
186 # c.PBSEngineSetLauncher.batch_template = """
186 # #PBS -N ipcontroller
187 # #PBS -N ipengine
187 # #PBS -l nprocs=$n
188 # #PBS -l nprocs={n}
188 #
189 #
189 # ipenginez --cluster-dir $cluster_dir$s
190 # ipengine profile_dir={profile_dir}
190 # """
191 # """
191
192
192 # You can also load this template from a file
193 # You can also load this template from a file
193 # c.PBSControllerLauncher.batch_template_file = u"/path/to/my/template.sh"
194 # c.PBSControllerLauncher.batch_template_file = u"/path/to/my/template.sh"
194
195
195 # The name of the instantiated batch script that will actually be used to
196 # The name of the instantiated batch script that will actually be used to
196 # submit the job. This will be written to the cluster directory.
197 # submit the job. This will be written to the cluster directory.
197 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_engines'
198 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_engines'
198
199
199
200
200
201
201 #-----------------------------------------------------------------------------
202 #-----------------------------------------------------------------------------
202 # Windows HPC Server 2008 launcher configuration
203 # Windows HPC Server 2008 launcher configuration
203 #-----------------------------------------------------------------------------
204 #-----------------------------------------------------------------------------
204
205
205 # c.IPControllerJob.job_name = 'IPController'
206 # c.IPControllerJob.job_name = 'IPController'
206 # c.IPControllerJob.is_exclusive = False
207 # c.IPControllerJob.is_exclusive = False
207 # c.IPControllerJob.username = r'USERDOMAIN\USERNAME'
208 # c.IPControllerJob.username = r'USERDOMAIN\USERNAME'
208 # c.IPControllerJob.priority = 'Highest'
209 # c.IPControllerJob.priority = 'Highest'
209 # c.IPControllerJob.requested_nodes = ''
210 # c.IPControllerJob.requested_nodes = ''
210 # c.IPControllerJob.project = 'MyProject'
211 # c.IPControllerJob.project = 'MyProject'
211
212
212 # c.IPControllerTask.task_name = 'IPController'
213 # c.IPControllerTask.task_name = 'IPController'
213 # c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
214 # c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
214 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
215 # c.IPControllerTask.controller_args = ['--log-to-file', 'log_level=40']
215 # c.IPControllerTask.environment_variables = {}
216 # c.IPControllerTask.environment_variables = {}
216
217
217 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
218 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
218 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
219 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
219
220
220
221
221 # c.IPEngineSetJob.job_name = 'IPEngineSet'
222 # c.IPEngineSetJob.job_name = 'IPEngineSet'
222 # c.IPEngineSetJob.is_exclusive = False
223 # c.IPEngineSetJob.is_exclusive = False
223 # c.IPEngineSetJob.username = r'USERDOMAIN\USERNAME'
224 # c.IPEngineSetJob.username = r'USERDOMAIN\USERNAME'
224 # c.IPEngineSetJob.priority = 'Highest'
225 # c.IPEngineSetJob.priority = 'Highest'
225 # c.IPEngineSetJob.requested_nodes = ''
226 # c.IPEngineSetJob.requested_nodes = ''
226 # c.IPEngineSetJob.project = 'MyProject'
227 # c.IPEngineSetJob.project = 'MyProject'
227
228
228 # c.IPEngineTask.task_name = 'IPEngine'
229 # c.IPEngineTask.task_name = 'IPEngine'
229 # c.IPEngineTask.engine_cmd = [u'ipengine.exe']
230 # c.IPEngineTask.engine_cmd = [u'ipengine.exe']
230 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
231 # c.IPEngineTask.engine_args = ['--log-to-file', 'log_level=40']
231 # c.IPEngineTask.environment_variables = {}
232 # c.IPEngineTask.environment_variables = {}
232
233
233 # c.WindowsHPCEngineSetLauncher.scheduler = 'HEADNODE'
234 # c.WindowsHPCEngineSetLauncher.scheduler = 'HEADNODE'
234 # c.WindowsHPCEngineSetLauncher.job_file_name = u'ipengineset_job.xml'
235 # c.WindowsHPCEngineSetLauncher.job_file_name = u'ipengineset_job.xml'
235
236
236
237
237
238
238
239
239
240
240
241
241
242
@@ -1,1070 +1,1067 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import copy
18 import copy
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import stat
22 import stat
23
23
24 # signal imports, handling various platforms, versions
24 # signal imports, handling various platforms, versions
25
25
26 from signal import SIGINT, SIGTERM
26 from signal import SIGINT, SIGTERM
27 try:
27 try:
28 from signal import SIGKILL
28 from signal import SIGKILL
29 except ImportError:
29 except ImportError:
30 # Windows
30 # Windows
31 SIGKILL=SIGTERM
31 SIGKILL=SIGTERM
32
32
33 try:
33 try:
34 # Windows >= 2.7, 3.2
34 # Windows >= 2.7, 3.2
35 from signal import CTRL_C_EVENT as SIGINT
35 from signal import CTRL_C_EVENT as SIGINT
36 except ImportError:
36 except ImportError:
37 pass
37 pass
38
38
39 from subprocess import Popen, PIPE, STDOUT
39 from subprocess import Popen, PIPE, STDOUT
40 try:
40 try:
41 from subprocess import check_output
41 from subprocess import check_output
42 except ImportError:
42 except ImportError:
43 # pre-2.7, define check_output with Popen
43 # pre-2.7, define check_output with Popen
44 def check_output(*args, **kwargs):
44 def check_output(*args, **kwargs):
45 kwargs.update(dict(stdout=PIPE))
45 kwargs.update(dict(stdout=PIPE))
46 p = Popen(*args, **kwargs)
46 p = Popen(*args, **kwargs)
47 out,err = p.communicate()
47 out,err = p.communicate()
48 return out
48 return out
49
49
50 from zmq.eventloop import ioloop
50 from zmq.eventloop import ioloop
51
51
52 from IPython.external import Itpl
53 # from IPython.config.configurable import Configurable
52 # from IPython.config.configurable import Configurable
54 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
53 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
55 from IPython.utils.path import get_ipython_module_path
54 from IPython.utils.path import get_ipython_module_path
56 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
55 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
57
56
58 from IPython.parallel.factory import LoggingFactory
57 from IPython.parallel.factory import LoggingFactory
59
58
60 from .win32support import forward_read_events
59 from .win32support import forward_read_events
61
60
62 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
61 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
63
62
64 WINDOWS = os.name == 'nt'
63 WINDOWS = os.name == 'nt'
65
64
66 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
67 # Paths to the kernel apps
66 # Paths to the kernel apps
68 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
69
68
70
69
71 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
70 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
72 'IPython.parallel.apps.ipclusterapp'
71 'IPython.parallel.apps.ipclusterapp'
73 ))
72 ))
74
73
75 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
74 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
76 'IPython.parallel.apps.ipengineapp'
75 'IPython.parallel.apps.ipengineapp'
77 ))
76 ))
78
77
79 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
78 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
80 'IPython.parallel.apps.ipcontrollerapp'
79 'IPython.parallel.apps.ipcontrollerapp'
81 ))
80 ))
82
81
83 #-----------------------------------------------------------------------------
82 #-----------------------------------------------------------------------------
84 # Base launchers and errors
83 # Base launchers and errors
85 #-----------------------------------------------------------------------------
84 #-----------------------------------------------------------------------------
86
85
87
86
88 class LauncherError(Exception):
87 class LauncherError(Exception):
89 pass
88 pass
90
89
91
90
92 class ProcessStateError(LauncherError):
91 class ProcessStateError(LauncherError):
93 pass
92 pass
94
93
95
94
96 class UnknownStatus(LauncherError):
95 class UnknownStatus(LauncherError):
97 pass
96 pass
98
97
99
98
100 class BaseLauncher(LoggingFactory):
99 class BaseLauncher(LoggingFactory):
101 """An asbtraction for starting, stopping and signaling a process."""
100 """An asbtraction for starting, stopping and signaling a process."""
102
101
103 # In all of the launchers, the work_dir is where child processes will be
102 # In all of the launchers, the work_dir is where child processes will be
104 # run. This will usually be the profile_dir, but may not be. any work_dir
103 # run. This will usually be the profile_dir, but may not be. any work_dir
105 # passed into the __init__ method will override the config value.
104 # passed into the __init__ method will override the config value.
106 # This should not be used to set the work_dir for the actual engine
105 # This should not be used to set the work_dir for the actual engine
107 # and controller. Instead, use their own config files or the
106 # and controller. Instead, use their own config files or the
108 # controller_args, engine_args attributes of the launchers to add
107 # controller_args, engine_args attributes of the launchers to add
109 # the work_dir option.
108 # the work_dir option.
110 work_dir = Unicode(u'.')
109 work_dir = Unicode(u'.')
111 loop = Instance('zmq.eventloop.ioloop.IOLoop')
110 loop = Instance('zmq.eventloop.ioloop.IOLoop')
112
111
113 start_data = Any()
112 start_data = Any()
114 stop_data = Any()
113 stop_data = Any()
115
114
116 def _loop_default(self):
115 def _loop_default(self):
117 return ioloop.IOLoop.instance()
116 return ioloop.IOLoop.instance()
118
117
119 def __init__(self, work_dir=u'.', config=None, **kwargs):
118 def __init__(self, work_dir=u'.', config=None, **kwargs):
120 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
119 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
121 self.state = 'before' # can be before, running, after
120 self.state = 'before' # can be before, running, after
122 self.stop_callbacks = []
121 self.stop_callbacks = []
123 self.start_data = None
122 self.start_data = None
124 self.stop_data = None
123 self.stop_data = None
125
124
126 @property
125 @property
127 def args(self):
126 def args(self):
128 """A list of cmd and args that will be used to start the process.
127 """A list of cmd and args that will be used to start the process.
129
128
130 This is what is passed to :func:`spawnProcess` and the first element
129 This is what is passed to :func:`spawnProcess` and the first element
131 will be the process name.
130 will be the process name.
132 """
131 """
133 return self.find_args()
132 return self.find_args()
134
133
135 def find_args(self):
134 def find_args(self):
136 """The ``.args`` property calls this to find the args list.
135 """The ``.args`` property calls this to find the args list.
137
136
138 Subcommand should implement this to construct the cmd and args.
137 Subcommand should implement this to construct the cmd and args.
139 """
138 """
140 raise NotImplementedError('find_args must be implemented in a subclass')
139 raise NotImplementedError('find_args must be implemented in a subclass')
141
140
142 @property
141 @property
143 def arg_str(self):
142 def arg_str(self):
144 """The string form of the program arguments."""
143 """The string form of the program arguments."""
145 return ' '.join(self.args)
144 return ' '.join(self.args)
146
145
147 @property
146 @property
148 def running(self):
147 def running(self):
149 """Am I running."""
148 """Am I running."""
150 if self.state == 'running':
149 if self.state == 'running':
151 return True
150 return True
152 else:
151 else:
153 return False
152 return False
154
153
155 def start(self):
154 def start(self):
156 """Start the process.
155 """Start the process.
157
156
158 This must return a deferred that fires with information about the
157 This must return a deferred that fires with information about the
159 process starting (like a pid, job id, etc.).
158 process starting (like a pid, job id, etc.).
160 """
159 """
161 raise NotImplementedError('start must be implemented in a subclass')
160 raise NotImplementedError('start must be implemented in a subclass')
162
161
163 def stop(self):
162 def stop(self):
164 """Stop the process and notify observers of stopping.
163 """Stop the process and notify observers of stopping.
165
164
166 This must return a deferred that fires with information about the
165 This must return a deferred that fires with information about the
167 processing stopping, like errors that occur while the process is
166 processing stopping, like errors that occur while the process is
168 attempting to be shut down. This deferred won't fire when the process
167 attempting to be shut down. This deferred won't fire when the process
169 actually stops. To observe the actual process stopping, see
168 actually stops. To observe the actual process stopping, see
170 :func:`observe_stop`.
169 :func:`observe_stop`.
171 """
170 """
172 raise NotImplementedError('stop must be implemented in a subclass')
171 raise NotImplementedError('stop must be implemented in a subclass')
173
172
174 def on_stop(self, f):
173 def on_stop(self, f):
175 """Get a deferred that will fire when the process stops.
174 """Get a deferred that will fire when the process stops.
176
175
177 The deferred will fire with data that contains information about
176 The deferred will fire with data that contains information about
178 the exit status of the process.
177 the exit status of the process.
179 """
178 """
180 if self.state=='after':
179 if self.state=='after':
181 return f(self.stop_data)
180 return f(self.stop_data)
182 else:
181 else:
183 self.stop_callbacks.append(f)
182 self.stop_callbacks.append(f)
184
183
185 def notify_start(self, data):
184 def notify_start(self, data):
186 """Call this to trigger startup actions.
185 """Call this to trigger startup actions.
187
186
188 This logs the process startup and sets the state to 'running'. It is
187 This logs the process startup and sets the state to 'running'. It is
189 a pass-through so it can be used as a callback.
188 a pass-through so it can be used as a callback.
190 """
189 """
191
190
192 self.log.info('Process %r started: %r' % (self.args[0], data))
191 self.log.info('Process %r started: %r' % (self.args[0], data))
193 self.start_data = data
192 self.start_data = data
194 self.state = 'running'
193 self.state = 'running'
195 return data
194 return data
196
195
197 def notify_stop(self, data):
196 def notify_stop(self, data):
198 """Call this to trigger process stop actions.
197 """Call this to trigger process stop actions.
199
198
200 This logs the process stopping and sets the state to 'after'. Call
199 This logs the process stopping and sets the state to 'after'. Call
201 this to trigger all the deferreds from :func:`observe_stop`."""
200 this to trigger all the deferreds from :func:`observe_stop`."""
202
201
203 self.log.info('Process %r stopped: %r' % (self.args[0], data))
202 self.log.info('Process %r stopped: %r' % (self.args[0], data))
204 self.stop_data = data
203 self.stop_data = data
205 self.state = 'after'
204 self.state = 'after'
206 for i in range(len(self.stop_callbacks)):
205 for i in range(len(self.stop_callbacks)):
207 d = self.stop_callbacks.pop()
206 d = self.stop_callbacks.pop()
208 d(data)
207 d(data)
209 return data
208 return data
210
209
211 def signal(self, sig):
210 def signal(self, sig):
212 """Signal the process.
211 """Signal the process.
213
212
214 Return a semi-meaningless deferred after signaling the process.
213 Return a semi-meaningless deferred after signaling the process.
215
214
216 Parameters
215 Parameters
217 ----------
216 ----------
218 sig : str or int
217 sig : str or int
219 'KILL', 'INT', etc., or any signal number
218 'KILL', 'INT', etc., or any signal number
220 """
219 """
221 raise NotImplementedError('signal must be implemented in a subclass')
220 raise NotImplementedError('signal must be implemented in a subclass')
222
221
223
222
224 #-----------------------------------------------------------------------------
223 #-----------------------------------------------------------------------------
225 # Local process launchers
224 # Local process launchers
226 #-----------------------------------------------------------------------------
225 #-----------------------------------------------------------------------------
227
226
228
227
229 class LocalProcessLauncher(BaseLauncher):
228 class LocalProcessLauncher(BaseLauncher):
230 """Start and stop an external process in an asynchronous manner.
229 """Start and stop an external process in an asynchronous manner.
231
230
232 This will launch the external process with a working directory of
231 This will launch the external process with a working directory of
233 ``self.work_dir``.
232 ``self.work_dir``.
234 """
233 """
235
234
236 # This is used to to construct self.args, which is passed to
235 # This is used to to construct self.args, which is passed to
237 # spawnProcess.
236 # spawnProcess.
238 cmd_and_args = List([])
237 cmd_and_args = List([])
239 poll_frequency = Int(100) # in ms
238 poll_frequency = Int(100) # in ms
240
239
241 def __init__(self, work_dir=u'.', config=None, **kwargs):
240 def __init__(self, work_dir=u'.', config=None, **kwargs):
242 super(LocalProcessLauncher, self).__init__(
241 super(LocalProcessLauncher, self).__init__(
243 work_dir=work_dir, config=config, **kwargs
242 work_dir=work_dir, config=config, **kwargs
244 )
243 )
245 self.process = None
244 self.process = None
246 self.start_deferred = None
245 self.start_deferred = None
247 self.poller = None
246 self.poller = None
248
247
249 def find_args(self):
248 def find_args(self):
250 return self.cmd_and_args
249 return self.cmd_and_args
251
250
252 def start(self):
251 def start(self):
253 if self.state == 'before':
252 if self.state == 'before':
254 self.process = Popen(self.args,
253 self.process = Popen(self.args,
255 stdout=PIPE,stderr=PIPE,stdin=PIPE,
254 stdout=PIPE,stderr=PIPE,stdin=PIPE,
256 env=os.environ,
255 env=os.environ,
257 cwd=self.work_dir
256 cwd=self.work_dir
258 )
257 )
259 if WINDOWS:
258 if WINDOWS:
260 self.stdout = forward_read_events(self.process.stdout)
259 self.stdout = forward_read_events(self.process.stdout)
261 self.stderr = forward_read_events(self.process.stderr)
260 self.stderr = forward_read_events(self.process.stderr)
262 else:
261 else:
263 self.stdout = self.process.stdout.fileno()
262 self.stdout = self.process.stdout.fileno()
264 self.stderr = self.process.stderr.fileno()
263 self.stderr = self.process.stderr.fileno()
265 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
264 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
266 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
265 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
267 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
266 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
268 self.poller.start()
267 self.poller.start()
269 self.notify_start(self.process.pid)
268 self.notify_start(self.process.pid)
270 else:
269 else:
271 s = 'The process was already started and has state: %r' % self.state
270 s = 'The process was already started and has state: %r' % self.state
272 raise ProcessStateError(s)
271 raise ProcessStateError(s)
273
272
274 def stop(self):
273 def stop(self):
275 return self.interrupt_then_kill()
274 return self.interrupt_then_kill()
276
275
277 def signal(self, sig):
276 def signal(self, sig):
278 if self.state == 'running':
277 if self.state == 'running':
279 if WINDOWS and sig != SIGINT:
278 if WINDOWS and sig != SIGINT:
280 # use Windows tree-kill for better child cleanup
279 # use Windows tree-kill for better child cleanup
281 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
280 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
282 else:
281 else:
283 self.process.send_signal(sig)
282 self.process.send_signal(sig)
284
283
285 def interrupt_then_kill(self, delay=2.0):
284 def interrupt_then_kill(self, delay=2.0):
286 """Send INT, wait a delay and then send KILL."""
285 """Send INT, wait a delay and then send KILL."""
287 try:
286 try:
288 self.signal(SIGINT)
287 self.signal(SIGINT)
289 except Exception:
288 except Exception:
290 self.log.debug("interrupt failed")
289 self.log.debug("interrupt failed")
291 pass
290 pass
292 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
291 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
293 self.killer.start()
292 self.killer.start()
294
293
295 # callbacks, etc:
294 # callbacks, etc:
296
295
297 def handle_stdout(self, fd, events):
296 def handle_stdout(self, fd, events):
298 if WINDOWS:
297 if WINDOWS:
299 line = self.stdout.recv()
298 line = self.stdout.recv()
300 else:
299 else:
301 line = self.process.stdout.readline()
300 line = self.process.stdout.readline()
302 # a stopped process will be readable but return empty strings
301 # a stopped process will be readable but return empty strings
303 if line:
302 if line:
304 self.log.info(line[:-1])
303 self.log.info(line[:-1])
305 else:
304 else:
306 self.poll()
305 self.poll()
307
306
308 def handle_stderr(self, fd, events):
307 def handle_stderr(self, fd, events):
309 if WINDOWS:
308 if WINDOWS:
310 line = self.stderr.recv()
309 line = self.stderr.recv()
311 else:
310 else:
312 line = self.process.stderr.readline()
311 line = self.process.stderr.readline()
313 # a stopped process will be readable but return empty strings
312 # a stopped process will be readable but return empty strings
314 if line:
313 if line:
315 self.log.error(line[:-1])
314 self.log.error(line[:-1])
316 else:
315 else:
317 self.poll()
316 self.poll()
318
317
319 def poll(self):
318 def poll(self):
320 status = self.process.poll()
319 status = self.process.poll()
321 if status is not None:
320 if status is not None:
322 self.poller.stop()
321 self.poller.stop()
323 self.loop.remove_handler(self.stdout)
322 self.loop.remove_handler(self.stdout)
324 self.loop.remove_handler(self.stderr)
323 self.loop.remove_handler(self.stderr)
325 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
324 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
326 return status
325 return status
327
326
328 class LocalControllerLauncher(LocalProcessLauncher):
327 class LocalControllerLauncher(LocalProcessLauncher):
329 """Launch a controller as a regular external process."""
328 """Launch a controller as a regular external process."""
330
329
331 controller_cmd = List(ipcontroller_cmd_argv, config=True,
330 controller_cmd = List(ipcontroller_cmd_argv, config=True,
332 help="""Popen command to launch ipcontroller.""")
331 help="""Popen command to launch ipcontroller.""")
333 # Command line arguments to ipcontroller.
332 # Command line arguments to ipcontroller.
334 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
333 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
335 help="""command-line args to pass to ipcontroller""")
334 help="""command-line args to pass to ipcontroller""")
336
335
337 def find_args(self):
336 def find_args(self):
338 return self.controller_cmd + self.controller_args
337 return self.controller_cmd + self.controller_args
339
338
340 def start(self, profile_dir):
339 def start(self, profile_dir):
341 """Start the controller by profile_dir."""
340 """Start the controller by profile_dir."""
342 self.controller_args.extend(['profile_dir=%s'%profile_dir])
341 self.controller_args.extend(['profile_dir=%s'%profile_dir])
343 self.profile_dir = unicode(profile_dir)
342 self.profile_dir = unicode(profile_dir)
344 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
343 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
345 return super(LocalControllerLauncher, self).start()
344 return super(LocalControllerLauncher, self).start()
346
345
347
346
348 class LocalEngineLauncher(LocalProcessLauncher):
347 class LocalEngineLauncher(LocalProcessLauncher):
349 """Launch a single engine as a regular externall process."""
348 """Launch a single engine as a regular externall process."""
350
349
351 engine_cmd = List(ipengine_cmd_argv, config=True,
350 engine_cmd = List(ipengine_cmd_argv, config=True,
352 help="""command to launch the Engine.""")
351 help="""command to launch the Engine.""")
353 # Command line arguments for ipengine.
352 # Command line arguments for ipengine.
354 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
353 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
355 help="command-line arguments to pass to ipengine"
354 help="command-line arguments to pass to ipengine"
356 )
355 )
357
356
358 def find_args(self):
357 def find_args(self):
359 return self.engine_cmd + self.engine_args
358 return self.engine_cmd + self.engine_args
360
359
361 def start(self, profile_dir):
360 def start(self, profile_dir):
362 """Start the engine by profile_dir."""
361 """Start the engine by profile_dir."""
363 self.engine_args.extend(['profile_dir=%s'%profile_dir])
362 self.engine_args.extend(['profile_dir=%s'%profile_dir])
364 self.profile_dir = unicode(profile_dir)
363 self.profile_dir = unicode(profile_dir)
365 return super(LocalEngineLauncher, self).start()
364 return super(LocalEngineLauncher, self).start()
366
365
367
366
368 class LocalEngineSetLauncher(BaseLauncher):
367 class LocalEngineSetLauncher(BaseLauncher):
369 """Launch a set of engines as regular external processes."""
368 """Launch a set of engines as regular external processes."""
370
369
371 # Command line arguments for ipengine.
370 # Command line arguments for ipengine.
372 engine_args = List(
371 engine_args = List(
373 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
372 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
374 help="command-line arguments to pass to ipengine"
373 help="command-line arguments to pass to ipengine"
375 )
374 )
376 # launcher class
375 # launcher class
377 launcher_class = LocalEngineLauncher
376 launcher_class = LocalEngineLauncher
378
377
379 launchers = Dict()
378 launchers = Dict()
380 stop_data = Dict()
379 stop_data = Dict()
381
380
382 def __init__(self, work_dir=u'.', config=None, **kwargs):
381 def __init__(self, work_dir=u'.', config=None, **kwargs):
383 super(LocalEngineSetLauncher, self).__init__(
382 super(LocalEngineSetLauncher, self).__init__(
384 work_dir=work_dir, config=config, **kwargs
383 work_dir=work_dir, config=config, **kwargs
385 )
384 )
386 self.stop_data = {}
385 self.stop_data = {}
387
386
388 def start(self, n, profile_dir):
387 def start(self, n, profile_dir):
389 """Start n engines by profile or profile_dir."""
388 """Start n engines by profile or profile_dir."""
390 self.profile_dir = unicode(profile_dir)
389 self.profile_dir = unicode(profile_dir)
391 dlist = []
390 dlist = []
392 for i in range(n):
391 for i in range(n):
393 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
392 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
394 # Copy the engine args over to each engine launcher.
393 # Copy the engine args over to each engine launcher.
395 el.engine_args = copy.deepcopy(self.engine_args)
394 el.engine_args = copy.deepcopy(self.engine_args)
396 el.on_stop(self._notice_engine_stopped)
395 el.on_stop(self._notice_engine_stopped)
397 d = el.start(profile_dir)
396 d = el.start(profile_dir)
398 if i==0:
397 if i==0:
399 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
398 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
400 self.launchers[i] = el
399 self.launchers[i] = el
401 dlist.append(d)
400 dlist.append(d)
402 self.notify_start(dlist)
401 self.notify_start(dlist)
403 # The consumeErrors here could be dangerous
402 # The consumeErrors here could be dangerous
404 # dfinal = gatherBoth(dlist, consumeErrors=True)
403 # dfinal = gatherBoth(dlist, consumeErrors=True)
405 # dfinal.addCallback(self.notify_start)
404 # dfinal.addCallback(self.notify_start)
406 return dlist
405 return dlist
407
406
408 def find_args(self):
407 def find_args(self):
409 return ['engine set']
408 return ['engine set']
410
409
411 def signal(self, sig):
410 def signal(self, sig):
412 dlist = []
411 dlist = []
413 for el in self.launchers.itervalues():
412 for el in self.launchers.itervalues():
414 d = el.signal(sig)
413 d = el.signal(sig)
415 dlist.append(d)
414 dlist.append(d)
416 # dfinal = gatherBoth(dlist, consumeErrors=True)
415 # dfinal = gatherBoth(dlist, consumeErrors=True)
417 return dlist
416 return dlist
418
417
419 def interrupt_then_kill(self, delay=1.0):
418 def interrupt_then_kill(self, delay=1.0):
420 dlist = []
419 dlist = []
421 for el in self.launchers.itervalues():
420 for el in self.launchers.itervalues():
422 d = el.interrupt_then_kill(delay)
421 d = el.interrupt_then_kill(delay)
423 dlist.append(d)
422 dlist.append(d)
424 # dfinal = gatherBoth(dlist, consumeErrors=True)
423 # dfinal = gatherBoth(dlist, consumeErrors=True)
425 return dlist
424 return dlist
426
425
427 def stop(self):
426 def stop(self):
428 return self.interrupt_then_kill()
427 return self.interrupt_then_kill()
429
428
430 def _notice_engine_stopped(self, data):
429 def _notice_engine_stopped(self, data):
431 pid = data['pid']
430 pid = data['pid']
432 for idx,el in self.launchers.iteritems():
431 for idx,el in self.launchers.iteritems():
433 if el.process.pid == pid:
432 if el.process.pid == pid:
434 break
433 break
435 self.launchers.pop(idx)
434 self.launchers.pop(idx)
436 self.stop_data[idx] = data
435 self.stop_data[idx] = data
437 if not self.launchers:
436 if not self.launchers:
438 self.notify_stop(self.stop_data)
437 self.notify_stop(self.stop_data)
439
438
440
439
441 #-----------------------------------------------------------------------------
440 #-----------------------------------------------------------------------------
442 # MPIExec launchers
441 # MPIExec launchers
443 #-----------------------------------------------------------------------------
442 #-----------------------------------------------------------------------------
444
443
445
444
446 class MPIExecLauncher(LocalProcessLauncher):
445 class MPIExecLauncher(LocalProcessLauncher):
447 """Launch an external process using mpiexec."""
446 """Launch an external process using mpiexec."""
448
447
449 mpi_cmd = List(['mpiexec'], config=True,
448 mpi_cmd = List(['mpiexec'], config=True,
450 help="The mpiexec command to use in starting the process."
449 help="The mpiexec command to use in starting the process."
451 )
450 )
452 mpi_args = List([], config=True,
451 mpi_args = List([], config=True,
453 help="The command line arguments to pass to mpiexec."
452 help="The command line arguments to pass to mpiexec."
454 )
453 )
455 program = List(['date'], config=True,
454 program = List(['date'], config=True,
456 help="The program to start via mpiexec.")
455 help="The program to start via mpiexec.")
457 program_args = List([], config=True,
456 program_args = List([], config=True,
458 help="The command line argument to the program."
457 help="The command line argument to the program."
459 )
458 )
460 n = Int(1)
459 n = Int(1)
461
460
462 def find_args(self):
461 def find_args(self):
463 """Build self.args using all the fields."""
462 """Build self.args using all the fields."""
464 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
463 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
465 self.program + self.program_args
464 self.program + self.program_args
466
465
467 def start(self, n):
466 def start(self, n):
468 """Start n instances of the program using mpiexec."""
467 """Start n instances of the program using mpiexec."""
469 self.n = n
468 self.n = n
470 return super(MPIExecLauncher, self).start()
469 return super(MPIExecLauncher, self).start()
471
470
472
471
473 class MPIExecControllerLauncher(MPIExecLauncher):
472 class MPIExecControllerLauncher(MPIExecLauncher):
474 """Launch a controller using mpiexec."""
473 """Launch a controller using mpiexec."""
475
474
476 controller_cmd = List(ipcontroller_cmd_argv, config=True,
475 controller_cmd = List(ipcontroller_cmd_argv, config=True,
477 help="Popen command to launch the Contropper"
476 help="Popen command to launch the Contropper"
478 )
477 )
479 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
478 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
480 help="Command line arguments to pass to ipcontroller."
479 help="Command line arguments to pass to ipcontroller."
481 )
480 )
482 n = Int(1)
481 n = Int(1)
483
482
484 def start(self, profile_dir):
483 def start(self, profile_dir):
485 """Start the controller by profile_dir."""
484 """Start the controller by profile_dir."""
486 self.controller_args.extend(['profile_dir=%s'%profile_dir])
485 self.controller_args.extend(['profile_dir=%s'%profile_dir])
487 self.profile_dir = unicode(profile_dir)
486 self.profile_dir = unicode(profile_dir)
488 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
487 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
489 return super(MPIExecControllerLauncher, self).start(1)
488 return super(MPIExecControllerLauncher, self).start(1)
490
489
491 def find_args(self):
490 def find_args(self):
492 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
491 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
493 self.controller_cmd + self.controller_args
492 self.controller_cmd + self.controller_args
494
493
495
494
496 class MPIExecEngineSetLauncher(MPIExecLauncher):
495 class MPIExecEngineSetLauncher(MPIExecLauncher):
497
496
498 program = List(ipengine_cmd_argv, config=True,
497 program = List(ipengine_cmd_argv, config=True,
499 help="Popen command for ipengine"
498 help="Popen command for ipengine"
500 )
499 )
501 program_args = List(
500 program_args = List(
502 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
501 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
503 help="Command line arguments for ipengine."
502 help="Command line arguments for ipengine."
504 )
503 )
505 n = Int(1)
504 n = Int(1)
506
505
507 def start(self, n, profile_dir):
506 def start(self, n, profile_dir):
508 """Start n engines by profile or profile_dir."""
507 """Start n engines by profile or profile_dir."""
509 self.program_args.extend(['profile_dir=%s'%profile_dir])
508 self.program_args.extend(['profile_dir=%s'%profile_dir])
510 self.profile_dir = unicode(profile_dir)
509 self.profile_dir = unicode(profile_dir)
511 self.n = n
510 self.n = n
512 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
511 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
513 return super(MPIExecEngineSetLauncher, self).start(n)
512 return super(MPIExecEngineSetLauncher, self).start(n)
514
513
515 #-----------------------------------------------------------------------------
514 #-----------------------------------------------------------------------------
516 # SSH launchers
515 # SSH launchers
517 #-----------------------------------------------------------------------------
516 #-----------------------------------------------------------------------------
518
517
519 # TODO: Get SSH Launcher working again.
518 # TODO: Get SSH Launcher working again.
520
519
521 class SSHLauncher(LocalProcessLauncher):
520 class SSHLauncher(LocalProcessLauncher):
522 """A minimal launcher for ssh.
521 """A minimal launcher for ssh.
523
522
524 To be useful this will probably have to be extended to use the ``sshx``
523 To be useful this will probably have to be extended to use the ``sshx``
525 idea for environment variables. There could be other things this needs
524 idea for environment variables. There could be other things this needs
526 as well.
525 as well.
527 """
526 """
528
527
529 ssh_cmd = List(['ssh'], config=True,
528 ssh_cmd = List(['ssh'], config=True,
530 help="command for starting ssh")
529 help="command for starting ssh")
531 ssh_args = List(['-tt'], config=True,
530 ssh_args = List(['-tt'], config=True,
532 help="args to pass to ssh")
531 help="args to pass to ssh")
533 program = List(['date'], config=True,
532 program = List(['date'], config=True,
534 help="Program to launch via ssh")
533 help="Program to launch via ssh")
535 program_args = List([], config=True,
534 program_args = List([], config=True,
536 help="args to pass to remote program")
535 help="args to pass to remote program")
537 hostname = Unicode('', config=True,
536 hostname = Unicode('', config=True,
538 help="hostname on which to launch the program")
537 help="hostname on which to launch the program")
539 user = Unicode('', config=True,
538 user = Unicode('', config=True,
540 help="username for ssh")
539 help="username for ssh")
541 location = Unicode('', config=True,
540 location = Unicode('', config=True,
542 help="user@hostname location for ssh in one setting")
541 help="user@hostname location for ssh in one setting")
543
542
544 def _hostname_changed(self, name, old, new):
543 def _hostname_changed(self, name, old, new):
545 if self.user:
544 if self.user:
546 self.location = u'%s@%s' % (self.user, new)
545 self.location = u'%s@%s' % (self.user, new)
547 else:
546 else:
548 self.location = new
547 self.location = new
549
548
550 def _user_changed(self, name, old, new):
549 def _user_changed(self, name, old, new):
551 self.location = u'%s@%s' % (new, self.hostname)
550 self.location = u'%s@%s' % (new, self.hostname)
552
551
553 def find_args(self):
552 def find_args(self):
554 return self.ssh_cmd + self.ssh_args + [self.location] + \
553 return self.ssh_cmd + self.ssh_args + [self.location] + \
555 self.program + self.program_args
554 self.program + self.program_args
556
555
557 def start(self, profile_dir, hostname=None, user=None):
556 def start(self, profile_dir, hostname=None, user=None):
558 self.profile_dir = unicode(profile_dir)
557 self.profile_dir = unicode(profile_dir)
559 if hostname is not None:
558 if hostname is not None:
560 self.hostname = hostname
559 self.hostname = hostname
561 if user is not None:
560 if user is not None:
562 self.user = user
561 self.user = user
563
562
564 return super(SSHLauncher, self).start()
563 return super(SSHLauncher, self).start()
565
564
566 def signal(self, sig):
565 def signal(self, sig):
567 if self.state == 'running':
566 if self.state == 'running':
568 # send escaped ssh connection-closer
567 # send escaped ssh connection-closer
569 self.process.stdin.write('~.')
568 self.process.stdin.write('~.')
570 self.process.stdin.flush()
569 self.process.stdin.flush()
571
570
572
571
573
572
574 class SSHControllerLauncher(SSHLauncher):
573 class SSHControllerLauncher(SSHLauncher):
575
574
576 program = List(ipcontroller_cmd_argv, config=True,
575 program = List(ipcontroller_cmd_argv, config=True,
577 help="remote ipcontroller command.")
576 help="remote ipcontroller command.")
578 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
577 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
579 help="Command line arguments to ipcontroller.")
578 help="Command line arguments to ipcontroller.")
580
579
581
580
582 class SSHEngineLauncher(SSHLauncher):
581 class SSHEngineLauncher(SSHLauncher):
583 program = List(ipengine_cmd_argv, config=True,
582 program = List(ipengine_cmd_argv, config=True,
584 help="remote ipengine command.")
583 help="remote ipengine command.")
585 # Command line arguments for ipengine.
584 # Command line arguments for ipengine.
586 program_args = List(
585 program_args = List(
587 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
586 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
588 help="Command line arguments to ipengine."
587 help="Command line arguments to ipengine."
589 )
588 )
590
589
591 class SSHEngineSetLauncher(LocalEngineSetLauncher):
590 class SSHEngineSetLauncher(LocalEngineSetLauncher):
592 launcher_class = SSHEngineLauncher
591 launcher_class = SSHEngineLauncher
593 engines = Dict(config=True,
592 engines = Dict(config=True,
594 help="""dict of engines to launch. This is a dict by hostname of ints,
593 help="""dict of engines to launch. This is a dict by hostname of ints,
595 corresponding to the number of engines to start on that host.""")
594 corresponding to the number of engines to start on that host.""")
596
595
597 def start(self, n, profile_dir):
596 def start(self, n, profile_dir):
598 """Start engines by profile or profile_dir.
597 """Start engines by profile or profile_dir.
599 `n` is ignored, and the `engines` config property is used instead.
598 `n` is ignored, and the `engines` config property is used instead.
600 """
599 """
601
600
602 self.profile_dir = unicode(profile_dir)
601 self.profile_dir = unicode(profile_dir)
603 dlist = []
602 dlist = []
604 for host, n in self.engines.iteritems():
603 for host, n in self.engines.iteritems():
605 if isinstance(n, (tuple, list)):
604 if isinstance(n, (tuple, list)):
606 n, args = n
605 n, args = n
607 else:
606 else:
608 args = copy.deepcopy(self.engine_args)
607 args = copy.deepcopy(self.engine_args)
609
608
610 if '@' in host:
609 if '@' in host:
611 user,host = host.split('@',1)
610 user,host = host.split('@',1)
612 else:
611 else:
613 user=None
612 user=None
614 for i in range(n):
613 for i in range(n):
615 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
614 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
616
615
617 # Copy the engine args over to each engine launcher.
616 # Copy the engine args over to each engine launcher.
618 i
617 i
619 el.program_args = args
618 el.program_args = args
620 el.on_stop(self._notice_engine_stopped)
619 el.on_stop(self._notice_engine_stopped)
621 d = el.start(profile_dir, user=user, hostname=host)
620 d = el.start(profile_dir, user=user, hostname=host)
622 if i==0:
621 if i==0:
623 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
622 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
624 self.launchers[host+str(i)] = el
623 self.launchers[host+str(i)] = el
625 dlist.append(d)
624 dlist.append(d)
626 self.notify_start(dlist)
625 self.notify_start(dlist)
627 return dlist
626 return dlist
628
627
629
628
630
629
631 #-----------------------------------------------------------------------------
630 #-----------------------------------------------------------------------------
632 # Windows HPC Server 2008 scheduler launchers
631 # Windows HPC Server 2008 scheduler launchers
633 #-----------------------------------------------------------------------------
632 #-----------------------------------------------------------------------------
634
633
635
634
636 # This is only used on Windows.
635 # This is only used on Windows.
637 def find_job_cmd():
636 def find_job_cmd():
638 if WINDOWS:
637 if WINDOWS:
639 try:
638 try:
640 return find_cmd('job')
639 return find_cmd('job')
641 except (FindCmdError, ImportError):
640 except (FindCmdError, ImportError):
642 # ImportError will be raised if win32api is not installed
641 # ImportError will be raised if win32api is not installed
643 return 'job'
642 return 'job'
644 else:
643 else:
645 return 'job'
644 return 'job'
646
645
647
646
648 class WindowsHPCLauncher(BaseLauncher):
647 class WindowsHPCLauncher(BaseLauncher):
649
648
650 job_id_regexp = Unicode(r'\d+', config=True,
649 job_id_regexp = Unicode(r'\d+', config=True,
651 help="""A regular expression used to get the job id from the output of the
650 help="""A regular expression used to get the job id from the output of the
652 submit_command. """
651 submit_command. """
653 )
652 )
654 job_file_name = Unicode(u'ipython_job.xml', config=True,
653 job_file_name = Unicode(u'ipython_job.xml', config=True,
655 help="The filename of the instantiated job script.")
654 help="The filename of the instantiated job script.")
656 # The full path to the instantiated job script. This gets made dynamically
655 # The full path to the instantiated job script. This gets made dynamically
657 # by combining the work_dir with the job_file_name.
656 # by combining the work_dir with the job_file_name.
658 job_file = Unicode(u'')
657 job_file = Unicode(u'')
659 scheduler = Unicode('', config=True,
658 scheduler = Unicode('', config=True,
660 help="The hostname of the scheduler to submit the job to.")
659 help="The hostname of the scheduler to submit the job to.")
661 job_cmd = Unicode(find_job_cmd(), config=True,
660 job_cmd = Unicode(find_job_cmd(), config=True,
662 help="The command for submitting jobs.")
661 help="The command for submitting jobs.")
663
662
664 def __init__(self, work_dir=u'.', config=None, **kwargs):
663 def __init__(self, work_dir=u'.', config=None, **kwargs):
665 super(WindowsHPCLauncher, self).__init__(
664 super(WindowsHPCLauncher, self).__init__(
666 work_dir=work_dir, config=config, **kwargs
665 work_dir=work_dir, config=config, **kwargs
667 )
666 )
668
667
669 @property
668 @property
670 def job_file(self):
669 def job_file(self):
671 return os.path.join(self.work_dir, self.job_file_name)
670 return os.path.join(self.work_dir, self.job_file_name)
672
671
673 def write_job_file(self, n):
672 def write_job_file(self, n):
674 raise NotImplementedError("Implement write_job_file in a subclass.")
673 raise NotImplementedError("Implement write_job_file in a subclass.")
675
674
676 def find_args(self):
675 def find_args(self):
677 return [u'job.exe']
676 return [u'job.exe']
678
677
679 def parse_job_id(self, output):
678 def parse_job_id(self, output):
680 """Take the output of the submit command and return the job id."""
679 """Take the output of the submit command and return the job id."""
681 m = re.search(self.job_id_regexp, output)
680 m = re.search(self.job_id_regexp, output)
682 if m is not None:
681 if m is not None:
683 job_id = m.group()
682 job_id = m.group()
684 else:
683 else:
685 raise LauncherError("Job id couldn't be determined: %s" % output)
684 raise LauncherError("Job id couldn't be determined: %s" % output)
686 self.job_id = job_id
685 self.job_id = job_id
687 self.log.info('Job started with job id: %r' % job_id)
686 self.log.info('Job started with job id: %r' % job_id)
688 return job_id
687 return job_id
689
688
690 def start(self, n):
689 def start(self, n):
691 """Start n copies of the process using the Win HPC job scheduler."""
690 """Start n copies of the process using the Win HPC job scheduler."""
692 self.write_job_file(n)
691 self.write_job_file(n)
693 args = [
692 args = [
694 'submit',
693 'submit',
695 '/jobfile:%s' % self.job_file,
694 '/jobfile:%s' % self.job_file,
696 '/scheduler:%s' % self.scheduler
695 '/scheduler:%s' % self.scheduler
697 ]
696 ]
698 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
697 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
699 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
698 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
700 output = check_output([self.job_cmd]+args,
699 output = check_output([self.job_cmd]+args,
701 env=os.environ,
700 env=os.environ,
702 cwd=self.work_dir,
701 cwd=self.work_dir,
703 stderr=STDOUT
702 stderr=STDOUT
704 )
703 )
705 job_id = self.parse_job_id(output)
704 job_id = self.parse_job_id(output)
706 self.notify_start(job_id)
705 self.notify_start(job_id)
707 return job_id
706 return job_id
708
707
709 def stop(self):
708 def stop(self):
710 args = [
709 args = [
711 'cancel',
710 'cancel',
712 self.job_id,
711 self.job_id,
713 '/scheduler:%s' % self.scheduler
712 '/scheduler:%s' % self.scheduler
714 ]
713 ]
715 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
714 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
716 try:
715 try:
717 output = check_output([self.job_cmd]+args,
716 output = check_output([self.job_cmd]+args,
718 env=os.environ,
717 env=os.environ,
719 cwd=self.work_dir,
718 cwd=self.work_dir,
720 stderr=STDOUT
719 stderr=STDOUT
721 )
720 )
722 except:
721 except:
723 output = 'The job already appears to be stoppped: %r' % self.job_id
722 output = 'The job already appears to be stoppped: %r' % self.job_id
724 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
723 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
725 return output
724 return output
726
725
727
726
728 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
727 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
729
728
730 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
729 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
731 help="WinHPC xml job file.")
730 help="WinHPC xml job file.")
732 extra_args = List([], config=False,
731 extra_args = List([], config=False,
733 help="extra args to pass to ipcontroller")
732 help="extra args to pass to ipcontroller")
734
733
735 def write_job_file(self, n):
734 def write_job_file(self, n):
736 job = IPControllerJob(config=self.config)
735 job = IPControllerJob(config=self.config)
737
736
738 t = IPControllerTask(config=self.config)
737 t = IPControllerTask(config=self.config)
739 # The tasks work directory is *not* the actual work directory of
738 # The tasks work directory is *not* the actual work directory of
740 # the controller. It is used as the base path for the stdout/stderr
739 # the controller. It is used as the base path for the stdout/stderr
741 # files that the scheduler redirects to.
740 # files that the scheduler redirects to.
742 t.work_directory = self.profile_dir
741 t.work_directory = self.profile_dir
743 # Add the profile_dir and from self.start().
742 # Add the profile_dir and from self.start().
744 t.controller_args.extend(self.extra_args)
743 t.controller_args.extend(self.extra_args)
745 job.add_task(t)
744 job.add_task(t)
746
745
747 self.log.info("Writing job description file: %s" % self.job_file)
746 self.log.info("Writing job description file: %s" % self.job_file)
748 job.write(self.job_file)
747 job.write(self.job_file)
749
748
750 @property
749 @property
751 def job_file(self):
750 def job_file(self):
752 return os.path.join(self.profile_dir, self.job_file_name)
751 return os.path.join(self.profile_dir, self.job_file_name)
753
752
754 def start(self, profile_dir):
753 def start(self, profile_dir):
755 """Start the controller by profile_dir."""
754 """Start the controller by profile_dir."""
756 self.extra_args = ['profile_dir=%s'%profile_dir]
755 self.extra_args = ['profile_dir=%s'%profile_dir]
757 self.profile_dir = unicode(profile_dir)
756 self.profile_dir = unicode(profile_dir)
758 return super(WindowsHPCControllerLauncher, self).start(1)
757 return super(WindowsHPCControllerLauncher, self).start(1)
759
758
760
759
761 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
760 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
762
761
763 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
762 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
764 help="jobfile for ipengines job")
763 help="jobfile for ipengines job")
765 extra_args = List([], config=False,
764 extra_args = List([], config=False,
766 help="extra args to pas to ipengine")
765 help="extra args to pas to ipengine")
767
766
768 def write_job_file(self, n):
767 def write_job_file(self, n):
769 job = IPEngineSetJob(config=self.config)
768 job = IPEngineSetJob(config=self.config)
770
769
771 for i in range(n):
770 for i in range(n):
772 t = IPEngineTask(config=self.config)
771 t = IPEngineTask(config=self.config)
773 # The tasks work directory is *not* the actual work directory of
772 # The tasks work directory is *not* the actual work directory of
774 # the engine. It is used as the base path for the stdout/stderr
773 # the engine. It is used as the base path for the stdout/stderr
775 # files that the scheduler redirects to.
774 # files that the scheduler redirects to.
776 t.work_directory = self.profile_dir
775 t.work_directory = self.profile_dir
777 # Add the profile_dir and from self.start().
776 # Add the profile_dir and from self.start().
778 t.engine_args.extend(self.extra_args)
777 t.engine_args.extend(self.extra_args)
779 job.add_task(t)
778 job.add_task(t)
780
779
781 self.log.info("Writing job description file: %s" % self.job_file)
780 self.log.info("Writing job description file: %s" % self.job_file)
782 job.write(self.job_file)
781 job.write(self.job_file)
783
782
784 @property
783 @property
785 def job_file(self):
784 def job_file(self):
786 return os.path.join(self.profile_dir, self.job_file_name)
785 return os.path.join(self.profile_dir, self.job_file_name)
787
786
788 def start(self, n, profile_dir):
787 def start(self, n, profile_dir):
789 """Start the controller by profile_dir."""
788 """Start the controller by profile_dir."""
790 self.extra_args = ['profile_dir=%s'%profile_dir]
789 self.extra_args = ['profile_dir=%s'%profile_dir]
791 self.profile_dir = unicode(profile_dir)
790 self.profile_dir = unicode(profile_dir)
792 return super(WindowsHPCEngineSetLauncher, self).start(n)
791 return super(WindowsHPCEngineSetLauncher, self).start(n)
793
792
794
793
795 #-----------------------------------------------------------------------------
794 #-----------------------------------------------------------------------------
796 # Batch (PBS) system launchers
795 # Batch (PBS) system launchers
797 #-----------------------------------------------------------------------------
796 #-----------------------------------------------------------------------------
798
797
799 class BatchSystemLauncher(BaseLauncher):
798 class BatchSystemLauncher(BaseLauncher):
800 """Launch an external process using a batch system.
799 """Launch an external process using a batch system.
801
800
802 This class is designed to work with UNIX batch systems like PBS, LSF,
801 This class is designed to work with UNIX batch systems like PBS, LSF,
803 GridEngine, etc. The overall model is that there are different commands
802 GridEngine, etc. The overall model is that there are different commands
804 like qsub, qdel, etc. that handle the starting and stopping of the process.
803 like qsub, qdel, etc. that handle the starting and stopping of the process.
805
804
806 This class also has the notion of a batch script. The ``batch_template``
805 This class also has the notion of a batch script. The ``batch_template``
807 attribute can be set to a string that is a template for the batch script.
806 attribute can be set to a string that is a template for the batch script.
808 This template is instantiated using Itpl. Thus the template can use
807 This template is instantiated using string formatting. Thus the template can
809 ${n} fot the number of instances. Subclasses can add additional variables
808 use {n} fot the number of instances. Subclasses can add additional variables
810 to the template dict.
809 to the template dict.
811 """
810 """
812
811
813 # Subclasses must fill these in. See PBSEngineSet
812 # Subclasses must fill these in. See PBSEngineSet
814 submit_command = List([''], config=True,
813 submit_command = List([''], config=True,
815 help="The name of the command line program used to submit jobs.")
814 help="The name of the command line program used to submit jobs.")
816 delete_command = List([''], config=True,
815 delete_command = List([''], config=True,
817 help="The name of the command line program used to delete jobs.")
816 help="The name of the command line program used to delete jobs.")
818 job_id_regexp = Unicode('', config=True,
817 job_id_regexp = Unicode('', config=True,
819 help="""A regular expression used to get the job id from the output of the
818 help="""A regular expression used to get the job id from the output of the
820 submit_command.""")
819 submit_command.""")
821 batch_template = Unicode('', config=True,
820 batch_template = Unicode('', config=True,
822 help="The string that is the batch script template itself.")
821 help="The string that is the batch script template itself.")
823 batch_template_file = Unicode(u'', config=True,
822 batch_template_file = Unicode(u'', config=True,
824 help="The file that contains the batch template.")
823 help="The file that contains the batch template.")
825 batch_file_name = Unicode(u'batch_script', config=True,
824 batch_file_name = Unicode(u'batch_script', config=True,
826 help="The filename of the instantiated batch script.")
825 help="The filename of the instantiated batch script.")
827 queue = Unicode(u'', config=True,
826 queue = Unicode(u'', config=True,
828 help="The PBS Queue.")
827 help="The PBS Queue.")
829
828
830 # not configurable, override in subclasses
829 # not configurable, override in subclasses
831 # PBS Job Array regex
830 # PBS Job Array regex
832 job_array_regexp = Unicode('')
831 job_array_regexp = Unicode('')
833 job_array_template = Unicode('')
832 job_array_template = Unicode('')
834 # PBS Queue regex
833 # PBS Queue regex
835 queue_regexp = Unicode('')
834 queue_regexp = Unicode('')
836 queue_template = Unicode('')
835 queue_template = Unicode('')
837 # The default batch template, override in subclasses
836 # The default batch template, override in subclasses
838 default_template = Unicode('')
837 default_template = Unicode('')
839 # The full path to the instantiated batch script.
838 # The full path to the instantiated batch script.
840 batch_file = Unicode(u'')
839 batch_file = Unicode(u'')
841 # the format dict used with batch_template:
840 # the format dict used with batch_template:
842 context = Dict()
841 context = Dict()
843
842
844
843
845 def find_args(self):
844 def find_args(self):
846 return self.submit_command + [self.batch_file]
845 return self.submit_command + [self.batch_file]
847
846
848 def __init__(self, work_dir=u'.', config=None, **kwargs):
847 def __init__(self, work_dir=u'.', config=None, **kwargs):
849 super(BatchSystemLauncher, self).__init__(
848 super(BatchSystemLauncher, self).__init__(
850 work_dir=work_dir, config=config, **kwargs
849 work_dir=work_dir, config=config, **kwargs
851 )
850 )
852 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
851 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
853
852
854 def parse_job_id(self, output):
853 def parse_job_id(self, output):
855 """Take the output of the submit command and return the job id."""
854 """Take the output of the submit command and return the job id."""
856 m = re.search(self.job_id_regexp, output)
855 m = re.search(self.job_id_regexp, output)
857 if m is not None:
856 if m is not None:
858 job_id = m.group()
857 job_id = m.group()
859 else:
858 else:
860 raise LauncherError("Job id couldn't be determined: %s" % output)
859 raise LauncherError("Job id couldn't be determined: %s" % output)
861 self.job_id = job_id
860 self.job_id = job_id
862 self.log.info('Job submitted with job id: %r' % job_id)
861 self.log.info('Job submitted with job id: %r' % job_id)
863 return job_id
862 return job_id
864
863
865 def write_batch_script(self, n):
864 def write_batch_script(self, n):
866 """Instantiate and write the batch script to the work_dir."""
865 """Instantiate and write the batch script to the work_dir."""
867 self.context['n'] = n
866 self.context['n'] = n
868 self.context['queue'] = self.queue
867 self.context['queue'] = self.queue
869 print self.context
870 # first priority is batch_template if set
868 # first priority is batch_template if set
871 if self.batch_template_file and not self.batch_template:
869 if self.batch_template_file and not self.batch_template:
872 # second priority is batch_template_file
870 # second priority is batch_template_file
873 with open(self.batch_template_file) as f:
871 with open(self.batch_template_file) as f:
874 self.batch_template = f.read()
872 self.batch_template = f.read()
875 if not self.batch_template:
873 if not self.batch_template:
876 # third (last) priority is default_template
874 # third (last) priority is default_template
877 self.batch_template = self.default_template
875 self.batch_template = self.default_template
878
876
879 regex = re.compile(self.job_array_regexp)
877 regex = re.compile(self.job_array_regexp)
880 # print regex.search(self.batch_template)
878 # print regex.search(self.batch_template)
881 if not regex.search(self.batch_template):
879 if not regex.search(self.batch_template):
882 self.log.info("adding job array settings to batch script")
880 self.log.info("adding job array settings to batch script")
883 firstline, rest = self.batch_template.split('\n',1)
881 firstline, rest = self.batch_template.split('\n',1)
884 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
882 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
885
883
886 regex = re.compile(self.queue_regexp)
884 regex = re.compile(self.queue_regexp)
887 # print regex.search(self.batch_template)
885 # print regex.search(self.batch_template)
888 if self.queue and not regex.search(self.batch_template):
886 if self.queue and not regex.search(self.batch_template):
889 self.log.info("adding PBS queue settings to batch script")
887 self.log.info("adding PBS queue settings to batch script")
890 firstline, rest = self.batch_template.split('\n',1)
888 firstline, rest = self.batch_template.split('\n',1)
891 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
889 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
892
890
893 script_as_string = Itpl.itplns(self.batch_template, self.context)
891 script_as_string = self.batch_template.format(**self.context)
894 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
892 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
895
893
896 with open(self.batch_file, 'w') as f:
894 with open(self.batch_file, 'w') as f:
897 f.write(script_as_string)
895 f.write(script_as_string)
898 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
896 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
899
897
900 def start(self, n, profile_dir):
898 def start(self, n, profile_dir):
901 """Start n copies of the process using a batch system."""
899 """Start n copies of the process using a batch system."""
902 # Here we save profile and profile_dir in the context so they
900 # Here we save profile_dir in the context so they
903 # can be used in the batch script template as ${profile} and
901 # can be used in the batch script template as {profile_dir}
904 # ${profile_dir}
905 self.context['profile_dir'] = profile_dir
902 self.context['profile_dir'] = profile_dir
906 self.profile_dir = unicode(profile_dir)
903 self.profile_dir = unicode(profile_dir)
907 self.write_batch_script(n)
904 self.write_batch_script(n)
908 output = check_output(self.args, env=os.environ)
905 output = check_output(self.args, env=os.environ)
909
906
910 job_id = self.parse_job_id(output)
907 job_id = self.parse_job_id(output)
911 self.notify_start(job_id)
908 self.notify_start(job_id)
912 return job_id
909 return job_id
913
910
914 def stop(self):
911 def stop(self):
915 output = check_output(self.delete_command+[self.job_id], env=os.environ)
912 output = check_output(self.delete_command+[self.job_id], env=os.environ)
916 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
913 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
917 return output
914 return output
918
915
919
916
920 class PBSLauncher(BatchSystemLauncher):
917 class PBSLauncher(BatchSystemLauncher):
921 """A BatchSystemLauncher subclass for PBS."""
918 """A BatchSystemLauncher subclass for PBS."""
922
919
923 submit_command = List(['qsub'], config=True,
920 submit_command = List(['qsub'], config=True,
924 help="The PBS submit command ['qsub']")
921 help="The PBS submit command ['qsub']")
925 delete_command = List(['qdel'], config=True,
922 delete_command = List(['qdel'], config=True,
926 help="The PBS delete command ['qsub']")
923 help="The PBS delete command ['qsub']")
927 job_id_regexp = Unicode(r'\d+', config=True,
924 job_id_regexp = Unicode(r'\d+', config=True,
928 help="Regular expresion for identifying the job ID [r'\d+']")
925 help="Regular expresion for identifying the job ID [r'\d+']")
929
926
930 batch_file = Unicode(u'')
927 batch_file = Unicode(u'')
931 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
928 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
932 job_array_template = Unicode('#PBS -t 1-$n')
929 job_array_template = Unicode('#PBS -t 1-{n}')
933 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
930 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
934 queue_template = Unicode('#PBS -q $queue')
931 queue_template = Unicode('#PBS -q {queue}')
935
932
936
933
937 class PBSControllerLauncher(PBSLauncher):
934 class PBSControllerLauncher(PBSLauncher):
938 """Launch a controller using PBS."""
935 """Launch a controller using PBS."""
939
936
940 batch_file_name = Unicode(u'pbs_controller', config=True,
937 batch_file_name = Unicode(u'pbs_controller', config=True,
941 help="batch file name for the controller job.")
938 help="batch file name for the controller job.")
942 default_template= Unicode("""#!/bin/sh
939 default_template= Unicode("""#!/bin/sh
943 #PBS -V
940 #PBS -V
944 #PBS -N ipcontroller
941 #PBS -N ipcontroller
945 %s --log-to-file profile_dir $profile_dir
942 %s --log-to-file profile_dir={profile_dir}
946 """%(' '.join(ipcontroller_cmd_argv)))
943 """%(' '.join(ipcontroller_cmd_argv)))
947
944
948 def start(self, profile_dir):
945 def start(self, profile_dir):
949 """Start the controller by profile or profile_dir."""
946 """Start the controller by profile or profile_dir."""
950 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
947 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
951 return super(PBSControllerLauncher, self).start(1, profile_dir)
948 return super(PBSControllerLauncher, self).start(1, profile_dir)
952
949
953
950
954 class PBSEngineSetLauncher(PBSLauncher):
951 class PBSEngineSetLauncher(PBSLauncher):
955 """Launch Engines using PBS"""
952 """Launch Engines using PBS"""
956 batch_file_name = Unicode(u'pbs_engines', config=True,
953 batch_file_name = Unicode(u'pbs_engines', config=True,
957 help="batch file name for the engine(s) job.")
954 help="batch file name for the engine(s) job.")
958 default_template= Unicode(u"""#!/bin/sh
955 default_template= Unicode(u"""#!/bin/sh
959 #PBS -V
956 #PBS -V
960 #PBS -N ipengine
957 #PBS -N ipengine
961 %s profile_dir $profile_dir
958 %s profile_dir={profile_dir}
962 """%(' '.join(ipengine_cmd_argv)))
959 """%(' '.join(ipengine_cmd_argv)))
963
960
964 def start(self, n, profile_dir):
961 def start(self, n, profile_dir):
965 """Start n engines by profile or profile_dir."""
962 """Start n engines by profile or profile_dir."""
966 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
963 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
967 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
964 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
968
965
969 #SGE is very similar to PBS
966 #SGE is very similar to PBS
970
967
971 class SGELauncher(PBSLauncher):
968 class SGELauncher(PBSLauncher):
972 """Sun GridEngine is a PBS clone with slightly different syntax"""
969 """Sun GridEngine is a PBS clone with slightly different syntax"""
973 job_array_regexp = Unicode('#\$\$\W+\-t')
970 job_array_regexp = Unicode('#\$\W+\-t')
974 job_array_template = Unicode('#$$ -t 1-$n')
971 job_array_template = Unicode('#$ -t 1-{n}')
975 queue_regexp = Unicode('#$$\W+-q\W+\$?\w+')
972 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
976 queue_template = Unicode('#$$ -q $queue')
973 queue_template = Unicode('#$ -q $queue')
977
974
978 class SGEControllerLauncher(SGELauncher):
975 class SGEControllerLauncher(SGELauncher):
979 """Launch a controller using SGE."""
976 """Launch a controller using SGE."""
980
977
981 batch_file_name = Unicode(u'sge_controller', config=True,
978 batch_file_name = Unicode(u'sge_controller', config=True,
982 help="batch file name for the ipontroller job.")
979 help="batch file name for the ipontroller job.")
983 default_template= Unicode(u"""#$$ -V
980 default_template= Unicode(u"""#$ -V
984 #$$ -S /bin/sh
981 #$ -S /bin/sh
985 #$$ -N ipcontroller
982 #$ -N ipcontroller
986 %s --log-to-file profile_dir=$profile_dir
983 %s --log-to-file profile_dir={profile_dir}
987 """%(' '.join(ipcontroller_cmd_argv)))
984 """%(' '.join(ipcontroller_cmd_argv)))
988
985
989 def start(self, profile_dir):
986 def start(self, profile_dir):
990 """Start the controller by profile or profile_dir."""
987 """Start the controller by profile or profile_dir."""
991 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
988 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
992 return super(SGEControllerLauncher, self).start(1, profile_dir)
989 return super(SGEControllerLauncher, self).start(1, profile_dir)
993
990
994 class SGEEngineSetLauncher(SGELauncher):
991 class SGEEngineSetLauncher(SGELauncher):
995 """Launch Engines with SGE"""
992 """Launch Engines with SGE"""
996 batch_file_name = Unicode(u'sge_engines', config=True,
993 batch_file_name = Unicode(u'sge_engines', config=True,
997 help="batch file name for the engine(s) job.")
994 help="batch file name for the engine(s) job.")
998 default_template = Unicode("""#$$ -V
995 default_template = Unicode("""#$ -V
999 #$$ -S /bin/sh
996 #$ -S /bin/sh
1000 #$$ -N ipengine
997 #$ -N ipengine
1001 %s profile_dir=$profile_dir
998 %s profile_dir={profile_dir}
1002 """%(' '.join(ipengine_cmd_argv)))
999 """%(' '.join(ipengine_cmd_argv)))
1003
1000
1004 def start(self, n, profile_dir):
1001 def start(self, n, profile_dir):
1005 """Start n engines by profile or profile_dir."""
1002 """Start n engines by profile or profile_dir."""
1006 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1003 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1007 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1004 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1008
1005
1009
1006
1010 #-----------------------------------------------------------------------------
1007 #-----------------------------------------------------------------------------
1011 # A launcher for ipcluster itself!
1008 # A launcher for ipcluster itself!
1012 #-----------------------------------------------------------------------------
1009 #-----------------------------------------------------------------------------
1013
1010
1014
1011
1015 class IPClusterLauncher(LocalProcessLauncher):
1012 class IPClusterLauncher(LocalProcessLauncher):
1016 """Launch the ipcluster program in an external process."""
1013 """Launch the ipcluster program in an external process."""
1017
1014
1018 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1015 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1019 help="Popen command for ipcluster")
1016 help="Popen command for ipcluster")
1020 ipcluster_args = List(
1017 ipcluster_args = List(
1021 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1018 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1022 help="Command line arguments to pass to ipcluster.")
1019 help="Command line arguments to pass to ipcluster.")
1023 ipcluster_subcommand = Unicode('start')
1020 ipcluster_subcommand = Unicode('start')
1024 ipcluster_n = Int(2)
1021 ipcluster_n = Int(2)
1025
1022
1026 def find_args(self):
1023 def find_args(self):
1027 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
1024 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
1028 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
1025 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
1029
1026
1030 def start(self):
1027 def start(self):
1031 self.log.info("Starting ipcluster: %r" % self.args)
1028 self.log.info("Starting ipcluster: %r" % self.args)
1032 return super(IPClusterLauncher, self).start()
1029 return super(IPClusterLauncher, self).start()
1033
1030
1034 #-----------------------------------------------------------------------------
1031 #-----------------------------------------------------------------------------
1035 # Collections of launchers
1032 # Collections of launchers
1036 #-----------------------------------------------------------------------------
1033 #-----------------------------------------------------------------------------
1037
1034
1038 local_launchers = [
1035 local_launchers = [
1039 LocalControllerLauncher,
1036 LocalControllerLauncher,
1040 LocalEngineLauncher,
1037 LocalEngineLauncher,
1041 LocalEngineSetLauncher,
1038 LocalEngineSetLauncher,
1042 ]
1039 ]
1043 mpi_launchers = [
1040 mpi_launchers = [
1044 MPIExecLauncher,
1041 MPIExecLauncher,
1045 MPIExecControllerLauncher,
1042 MPIExecControllerLauncher,
1046 MPIExecEngineSetLauncher,
1043 MPIExecEngineSetLauncher,
1047 ]
1044 ]
1048 ssh_launchers = [
1045 ssh_launchers = [
1049 SSHLauncher,
1046 SSHLauncher,
1050 SSHControllerLauncher,
1047 SSHControllerLauncher,
1051 SSHEngineLauncher,
1048 SSHEngineLauncher,
1052 SSHEngineSetLauncher,
1049 SSHEngineSetLauncher,
1053 ]
1050 ]
1054 winhpc_launchers = [
1051 winhpc_launchers = [
1055 WindowsHPCLauncher,
1052 WindowsHPCLauncher,
1056 WindowsHPCControllerLauncher,
1053 WindowsHPCControllerLauncher,
1057 WindowsHPCEngineSetLauncher,
1054 WindowsHPCEngineSetLauncher,
1058 ]
1055 ]
1059 pbs_launchers = [
1056 pbs_launchers = [
1060 PBSLauncher,
1057 PBSLauncher,
1061 PBSControllerLauncher,
1058 PBSControllerLauncher,
1062 PBSEngineSetLauncher,
1059 PBSEngineSetLauncher,
1063 ]
1060 ]
1064 sge_launchers = [
1061 sge_launchers = [
1065 SGELauncher,
1062 SGELauncher,
1066 SGEControllerLauncher,
1063 SGEControllerLauncher,
1067 SGEEngineSetLauncher,
1064 SGEEngineSetLauncher,
1068 ]
1065 ]
1069 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1066 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1070 + pbs_launchers + sge_launchers No newline at end of file
1067 + pbs_launchers + sge_launchers
General Comments 0
You need to be logged in to leave comments. Login now