##// END OF EJS Templates
Update PBS/SGE launchers with 0.10.1 options and defaults
MinRK -
Show More
@@ -1,227 +1,238 b''
1 import os
1 import os
2
2
3 c = get_config()
3 c = get_config()
4
4
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Select which launchers to use
6 # Select which launchers to use
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 # This allows you to control what method is used to start the controller
9 # This allows you to control what method is used to start the controller
10 # and engines. The following methods are currently supported:
10 # and engines. The following methods are currently supported:
11 # - Start as a regular process on localhost.
11 # - Start as a regular process on localhost.
12 # - Start using mpiexec.
12 # - Start using mpiexec.
13 # - Start using the Windows HPC Server 2008 scheduler
13 # - Start using the Windows HPC Server 2008 scheduler
14 # - Start using PBS
14 # - Start using PBS/SGE
15 # - Start using SSH
15 # - Start using SSH
16
16
17
17
18 # The selected launchers can be configured below.
18 # The selected launchers can be configured below.
19
19
20 # Options are:
20 # Options are:
21 # - LocalControllerLauncher
21 # - LocalControllerLauncher
22 # - MPIExecControllerLauncher
22 # - MPIExecControllerLauncher
23 # - PBSControllerLauncher
23 # - PBSControllerLauncher
24 # - SGEControllerLauncher
24 # - WindowsHPCControllerLauncher
25 # - WindowsHPCControllerLauncher
25 # c.Global.controller_launcher = 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
26 # c.Global.controller_launcher = 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
27 c.Global.controller_launcher = 'IPython.zmq.parallel.launcher.PBSControllerLauncher'
26
28
27 # Options are:
29 # Options are:
28 # - LocalEngineSetLauncher
30 # - LocalEngineSetLauncher
29 # - MPIExecEngineSetLauncher
31 # - MPIExecEngineSetLauncher
30 # - PBSEngineSetLauncher
32 # - PBSEngineSetLauncher
33 # - SGEEngineSetLauncher
31 # - WindowsHPCEngineSetLauncher
34 # - WindowsHPCEngineSetLauncher
32 # c.Global.engine_launcher = 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
35 # c.Global.engine_launcher = 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
33
36
34 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
35 # Global configuration
38 # Global configuration
36 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
37
40
38 # The default number of engines that will be started. This is overridden by
41 # The default number of engines that will be started. This is overridden by
39 # the -n command line option: "ipcluster start -n 4"
42 # the -n command line option: "ipcluster start -n 4"
40 # c.Global.n = 2
43 # c.Global.n = 2
41
44
42 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
45 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
43 # c.Global.log_to_file = False
46 # c.Global.log_to_file = False
44
47
45 # Remove old logs from cluster_dir/log before starting.
48 # Remove old logs from cluster_dir/log before starting.
46 # c.Global.clean_logs = True
49 # c.Global.clean_logs = True
47
50
48 # The working directory for the process. The application will use os.chdir
51 # The working directory for the process. The application will use os.chdir
49 # to change to this directory before starting.
52 # to change to this directory before starting.
50 # c.Global.work_dir = os.getcwd()
53 # c.Global.work_dir = os.getcwd()
51
54
52
55
53 #-----------------------------------------------------------------------------
56 #-----------------------------------------------------------------------------
54 # Local process launchers
57 # Local process launchers
55 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
56
59
57 # The command line arguments to call the controller with.
60 # The command line arguments to call the controller with.
58 # c.LocalControllerLauncher.controller_args = \
61 # c.LocalControllerLauncher.controller_args = \
59 # ['--log-to-file','--log-level', '40']
62 # ['--log-to-file','--log-level', '40']
60
63
61 # The working directory for the controller
64 # The working directory for the controller
62 # c.LocalEngineSetLauncher.work_dir = u''
65 # c.LocalEngineSetLauncher.work_dir = u''
63
66
64 # Command line argument passed to the engines.
67 # Command line argument passed to the engines.
65 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
68 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
66
69
67 #-----------------------------------------------------------------------------
70 #-----------------------------------------------------------------------------
68 # MPIExec launchers
71 # MPIExec launchers
69 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
70
73
71 # The mpiexec/mpirun command to use in both the controller and engines.
74 # The mpiexec/mpirun command to use in both the controller and engines.
72 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
75 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
73
76
74 # Additional arguments to pass to the actual mpiexec command.
77 # Additional arguments to pass to the actual mpiexec command.
75 # c.MPIExecLauncher.mpi_args = []
78 # c.MPIExecLauncher.mpi_args = []
76
79
77 # The mpiexec/mpirun command and args can be overridden if they should be different
80 # The mpiexec/mpirun command and args can be overridden if they should be different
78 # for controller and engines.
81 # for controller and engines.
79 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
82 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
80 # c.MPIExecControllerLauncher.mpi_args = []
83 # c.MPIExecControllerLauncher.mpi_args = []
81 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
84 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
82 # c.MPIExecEngineSetLauncher.mpi_args = []
85 # c.MPIExecEngineSetLauncher.mpi_args = []
83
86
84 # The command line argument to call the controller with.
87 # The command line argument to call the controller with.
85 # c.MPIExecControllerLauncher.controller_args = \
88 # c.MPIExecControllerLauncher.controller_args = \
86 # ['--log-to-file','--log-level', '40']
89 # ['--log-to-file','--log-level', '40']
87
90
88 # Command line argument passed to the engines.
91 # Command line argument passed to the engines.
89 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
92 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
90
93
91 # The default number of engines to start if not given elsewhere.
94 # The default number of engines to start if not given elsewhere.
92 # c.MPIExecEngineSetLauncher.n = 1
95 # c.MPIExecEngineSetLauncher.n = 1
93
96
94 #-----------------------------------------------------------------------------
97 #-----------------------------------------------------------------------------
95 # SSH launchers
98 # SSH launchers
96 #-----------------------------------------------------------------------------
99 #-----------------------------------------------------------------------------
97
100
98 # ipclusterz can be used to launch controller and engines remotely via ssh.
101 # ipclusterz can be used to launch controller and engines remotely via ssh.
99 # Note that currently ipclusterz does not do any file distribution, so if
102 # Note that currently ipclusterz does not do any file distribution, so if
100 # machines are not on a shared filesystem, config and json files must be
103 # machines are not on a shared filesystem, config and json files must be
101 # distributed. For this reason, the reuse_files defaults to True on an
104 # distributed. For this reason, the reuse_files defaults to True on an
102 # ssh-launched Controller. This flag can be overridden by the program_args
105 # ssh-launched Controller. This flag can be overridden by the program_args
103 # attribute of c.SSHControllerLauncher.
106 # attribute of c.SSHControllerLauncher.
104
107
105 # set the ssh cmd for launching remote commands. The default is ['ssh']
108 # set the ssh cmd for launching remote commands. The default is ['ssh']
106 # c.SSHLauncher.ssh_cmd = ['ssh']
109 # c.SSHLauncher.ssh_cmd = ['ssh']
107
110
108 # set extra command-line arguments to pass to the ssh command.
111 # set extra command-line arguments to pass to the ssh command.
109 # c.SSHLauncher.ssh_args = ['tt']
112 # c.SSHLauncher.ssh_args = ['tt']
110
113
111 # Set the user and hostname for the controller
114 # Set the user and hostname for the controller
112 # c.SSHControllerLauncher.hostname = 'controller.example.com'
115 # c.SSHControllerLauncher.hostname = 'controller.example.com'
113 # c.SSHControllerLauncher.user = os.environ.get('USER','username')
116 # c.SSHControllerLauncher.user = os.environ.get('USER','username')
114
117
115 # Set the arguments to be passed to ipcontrollerz
118 # Set the arguments to be passed to ipcontrollerz
116 # note that remotely launched ipcontrollerz will not get the contents of
119 # note that remotely launched ipcontrollerz will not get the contents of
117 # the local ipcontrollerz_config.py unless it resides on the *remote host*
120 # the local ipcontrollerz_config.py unless it resides on the *remote host*
118 # in the location specified by the --cluster_dir argument.
121 # in the location specified by the --cluster_dir argument.
119 # c.SSHControllerLauncher.program_args = ['-r', '-ip', '0.0.0.0', '--cluster_dir', '/path/to/cd']
122 # c.SSHControllerLauncher.program_args = ['-r', '-ip', '0.0.0.0', '--cluster_dir', '/path/to/cd']
120
123
121 # Set the default args passed to ipenginez for SSH launched engines
124 # Set the default args passed to ipenginez for SSH launched engines
122 # c.SSHEngineSetLauncher.engine_args = ['--mpi', 'mpi4py']
125 # c.SSHEngineSetLauncher.engine_args = ['--mpi', 'mpi4py']
123
126
124 # SSH engines are launched as a dict of locations/n-engines.
127 # SSH engines are launched as a dict of locations/n-engines.
125 # if a value is a tuple instead of an int, it is assumed to be of the form
128 # if a value is a tuple instead of an int, it is assumed to be of the form
126 # (n, [args]), setting the arguments to be passed to ipenginez on `host`.
129 # (n, [args]), setting the arguments to be passed to ipenginez on `host`.
127 # otherwise, c.SSHEngineSetLauncher.engine_args will be used as the default.
130 # otherwise, c.SSHEngineSetLauncher.engine_args will be used as the default.
128
131
129 # In this case, there will be 3 engines at my.example.com, and
132 # In this case, there will be 3 engines at my.example.com, and
130 # 2 at you@ipython.scipy.org with a special json connector location.
133 # 2 at you@ipython.scipy.org with a special json connector location.
131 # c.SSHEngineSetLauncher.engines = {'my.example.com' : 3,
134 # c.SSHEngineSetLauncher.engines = {'my.example.com' : 3,
132 # 'you@ipython.scipy.org' : (2, ['-f', '/path/to/ipcontroller-engine.json'])
135 # 'you@ipython.scipy.org' : (2, ['-f', '/path/to/ipcontroller-engine.json'])
133 # }
136 # }
134
137
135 #-----------------------------------------------------------------------------
138 #-----------------------------------------------------------------------------
136 # Unix batch (PBS) schedulers launchers
139 # Unix batch (PBS) schedulers launchers
137 #-----------------------------------------------------------------------------
140 #-----------------------------------------------------------------------------
138
141
142 # SGE and PBS are very similar. All configurables in this section called 'PBS*'
143 # also exist as 'SGE*'.
144
139 # The command line program to use to submit a PBS job.
145 # The command line program to use to submit a PBS job.
140 # c.PBSControllerLauncher.submit_command = ['qsub']
146 # c.PBSLauncher.submit_command = ['qsub']
141
147
142 # The command line program to use to delete a PBS job.
148 # The command line program to use to delete a PBS job.
143 # c.PBSControllerLauncher.delete_command = ['qdel']
149 # c.PBSLauncher.delete_command = ['qdel']
150
151 # The PBS queue in which the job should run
152 # c.PBSLauncher.queue = 'myqueue'
144
153
145 # A regular expression that takes the output of qsub and finds the job id.
154 # A regular expression that takes the output of qsub and finds the job id.
146 # c.PBSControllerLauncher.job_id_regexp = r'\d+'
155 # c.PBSLauncher.job_id_regexp = r'\d+'
156
157 # If for some reason the Controller and Engines have different options above, they
158 # can be set as c.PBSControllerLauncher.<option> etc.
147
159
148 # The batch submission script used to start the controller. This is where
160 # The batch submission script used to start the controller. This is where
149 # environment variables would be setup, etc. This string is interpreted using
161 # environment variables would be setup, etc. This string is interpreted using
150 # the Itpl module in IPython.external. Basically, you can use ${n} for the
162 # the Itpl module in IPython.external. Basically, you can use ${n} for the
151 # number of engines and ${cluster_dir} for the cluster_dir.
163 # number of engines and ${cluster_dir} for the cluster_dir.
152 # c.PBSControllerLauncher.batch_template = """
164 # c.PBSControllerLauncher.batch_template = """
153 # #PBS -N ipcontroller
165 # #PBS -N ipcontroller
166 # #PBS -q $queue
154 #
167 #
155 # ipcontrollerz --cluster-dir $cluster_dir
168 # ipcontrollerz --cluster-dir $cluster_dir
156 # """
169 # """
157
170
171 # You can also load this template from a file
172 # c.PBSControllerLauncher.batch_template_file = u"/path/to/my/template.sh"
173
158 # The name of the instantiated batch script that will actually be used to
174 # The name of the instantiated batch script that will actually be used to
159 # submit the job. This will be written to the cluster directory.
175 # submit the job. This will be written to the cluster directory.
160 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
176 # c.PBSControllerLauncher.batch_file_name = u'pbs_controller'
161
162
163 # The command line program to use to submit a PBS job.
164 # c.PBSEngineSetLauncher.submit_command = 'qsub'
165
166 # The command line program to use to delete a PBS job.
167 # c.PBSEngineSetLauncher.delete_command = 'qdel'
168
169 # A regular expression that takes the output of qsub and find the job id.
170 # c.PBSEngineSetLauncher.job_id_regexp = r'\d+'
171
177
172 # The batch submission script used to start the engines. This is where
178 # The batch submission script used to start the engines. This is where
173 # environment variables would be setup, etc. This string is interpreted using
179 # environment variables would be setup, etc. This string is interpreted using
174 # the Itpl module in IPython.external. Basically, you can use ${n} for the
180 # the Itpl module in IPython.external. Basically, you can use ${n} for the
175 # number of engines and ${cluster_dir} for the cluster_dir.
181 # number of engines and ${cluster_dir} for the cluster_dir.
176 # c.PBSEngineSetLauncher.batch_template = """
182 # c.PBSEngineSetLauncher.batch_template = """
177 # #PBS -N ipengine
183 # #PBS -N ipengine
178 # #PBS -l nprocs=$n
184 # #PBS -l nprocs=$n
179 #
185 #
180 # ipenginez --cluster-dir $cluster_dir$s
186 # ipenginez --cluster-dir $cluster_dir$s
181 # """
187 # """
182
188
189 # You can also load this template from a file
190 # c.PBSEngineSetLauncher.batch_template_file = u"/path/to/my/template.sh"
191
183 # The name of the instantiated batch script that will actually be used to
192 # The name of the instantiated batch script that will actually be used to
184 # submit the job. This will be written to the cluster directory.
193 # submit the job. This will be written to the cluster directory.
185 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'
194 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_engines'
195
196
186
197
187 #-----------------------------------------------------------------------------
198 #-----------------------------------------------------------------------------
188 # Windows HPC Server 2008 launcher configuration
199 # Windows HPC Server 2008 launcher configuration
189 #-----------------------------------------------------------------------------
200 #-----------------------------------------------------------------------------
190
201
191 # c.IPControllerJob.job_name = 'IPController'
202 # c.IPControllerJob.job_name = 'IPController'
192 # c.IPControllerJob.is_exclusive = False
203 # c.IPControllerJob.is_exclusive = False
193 # c.IPControllerJob.username = r'USERDOMAIN\USERNAME'
204 # c.IPControllerJob.username = r'USERDOMAIN\USERNAME'
194 # c.IPControllerJob.priority = 'Highest'
205 # c.IPControllerJob.priority = 'Highest'
195 # c.IPControllerJob.requested_nodes = ''
206 # c.IPControllerJob.requested_nodes = ''
196 # c.IPControllerJob.project = 'MyProject'
207 # c.IPControllerJob.project = 'MyProject'
197
208
198 # c.IPControllerTask.task_name = 'IPController'
209 # c.IPControllerTask.task_name = 'IPController'
199 # c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
210 # c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
200 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
211 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
201 # c.IPControllerTask.environment_variables = {}
212 # c.IPControllerTask.environment_variables = {}
202
213
203 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
214 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
204 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
215 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
205
216
206
217
207 # c.IPEngineSetJob.job_name = 'IPEngineSet'
218 # c.IPEngineSetJob.job_name = 'IPEngineSet'
208 # c.IPEngineSetJob.is_exclusive = False
219 # c.IPEngineSetJob.is_exclusive = False
209 # c.IPEngineSetJob.username = r'USERDOMAIN\USERNAME'
220 # c.IPEngineSetJob.username = r'USERDOMAIN\USERNAME'
210 # c.IPEngineSetJob.priority = 'Highest'
221 # c.IPEngineSetJob.priority = 'Highest'
211 # c.IPEngineSetJob.requested_nodes = ''
222 # c.IPEngineSetJob.requested_nodes = ''
212 # c.IPEngineSetJob.project = 'MyProject'
223 # c.IPEngineSetJob.project = 'MyProject'
213
224
214 # c.IPEngineTask.task_name = 'IPEngine'
225 # c.IPEngineTask.task_name = 'IPEngine'
215 # c.IPEngineTask.engine_cmd = [u'ipengine.exe']
226 # c.IPEngineTask.engine_cmd = [u'ipengine.exe']
216 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
227 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
217 # c.IPEngineTask.environment_variables = {}
228 # c.IPEngineTask.environment_variables = {}
218
229
219 # c.WindowsHPCEngineSetLauncher.scheduler = 'HEADNODE'
230 # c.WindowsHPCEngineSetLauncher.scheduler = 'HEADNODE'
220 # c.WindowsHPCEngineSetLauncher.job_file_name = u'ipengineset_job.xml'
231 # c.WindowsHPCEngineSetLauncher.job_file_name = u'ipengineset_job.xml'
221
232
222
233
223
234
224
235
225
236
226
237
227
238
@@ -1,879 +1,971 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import copy
18 import copy
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import stat
22
23
23 from signal import SIGINT, SIGTERM
24 from signal import SIGINT, SIGTERM
24 try:
25 try:
25 from signal import SIGKILL
26 from signal import SIGKILL
26 except ImportError:
27 except ImportError:
27 SIGKILL=SIGTERM
28 SIGKILL=SIGTERM
28
29
29 from subprocess import Popen, PIPE, STDOUT
30 from subprocess import Popen, PIPE, STDOUT
30 try:
31 try:
31 from subprocess import check_output
32 from subprocess import check_output
32 except ImportError:
33 except ImportError:
33 # pre-2.7, define check_output with Popen
34 # pre-2.7, define check_output with Popen
34 def check_output(*args, **kwargs):
35 def check_output(*args, **kwargs):
35 kwargs.update(dict(stdout=PIPE))
36 kwargs.update(dict(stdout=PIPE))
36 p = Popen(*args, **kwargs)
37 p = Popen(*args, **kwargs)
37 out,err = p.communicate()
38 out,err = p.communicate()
38 return out
39 return out
39
40
40 from zmq.eventloop import ioloop
41 from zmq.eventloop import ioloop
41
42
42 from IPython.external import Itpl
43 from IPython.external import Itpl
43 # from IPython.config.configurable import Configurable
44 # from IPython.config.configurable import Configurable
44 from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance
45 from IPython.utils.traitlets import Any, Str, Int, List, Unicode, Dict, Instance, CUnicode
45 from IPython.utils.path import get_ipython_module_path
46 from IPython.utils.path import get_ipython_module_path
46 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
47 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
47
48
48 from .factory import LoggingFactory
49 from .factory import LoggingFactory
49
50
50 # load winhpcjob from IPython.kernel
51 # load winhpcjob from IPython.kernel
51 try:
52 try:
52 from IPython.kernel.winhpcjob import (
53 from IPython.kernel.winhpcjob import (
53 IPControllerTask, IPEngineTask,
54 IPControllerTask, IPEngineTask,
54 IPControllerJob, IPEngineSetJob
55 IPControllerJob, IPEngineSetJob
55 )
56 )
56 except ImportError:
57 except ImportError:
57 pass
58 pass
58
59
59
60
60 #-----------------------------------------------------------------------------
61 #-----------------------------------------------------------------------------
61 # Paths to the kernel apps
62 # Paths to the kernel apps
62 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
63
64
64
65
65 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
66 ipclusterz_cmd_argv = pycmd2argv(get_ipython_module_path(
66 'IPython.zmq.parallel.ipclusterapp'
67 'IPython.zmq.parallel.ipclusterapp'
67 ))
68 ))
68
69
69 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
70 ipenginez_cmd_argv = pycmd2argv(get_ipython_module_path(
70 'IPython.zmq.parallel.ipengineapp'
71 'IPython.zmq.parallel.ipengineapp'
71 ))
72 ))
72
73
73 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
74 ipcontrollerz_cmd_argv = pycmd2argv(get_ipython_module_path(
74 'IPython.zmq.parallel.ipcontrollerapp'
75 'IPython.zmq.parallel.ipcontrollerapp'
75 ))
76 ))
76
77
77 #-----------------------------------------------------------------------------
78 #-----------------------------------------------------------------------------
78 # Base launchers and errors
79 # Base launchers and errors
79 #-----------------------------------------------------------------------------
80 #-----------------------------------------------------------------------------
80
81
81
82
82 class LauncherError(Exception):
83 class LauncherError(Exception):
83 pass
84 pass
84
85
85
86
86 class ProcessStateError(LauncherError):
87 class ProcessStateError(LauncherError):
87 pass
88 pass
88
89
89
90
90 class UnknownStatus(LauncherError):
91 class UnknownStatus(LauncherError):
91 pass
92 pass
92
93
93
94
94 class BaseLauncher(LoggingFactory):
95 class BaseLauncher(LoggingFactory):
95 """An abstraction for starting, stopping and signaling a process."""
96 """An abstraction for starting, stopping and signaling a process."""
96
97
97 # In all of the launchers, the work_dir is where child processes will be
98 # In all of the launchers, the work_dir is where child processes will be
98 # run. This will usually be the cluster_dir, but may not be. any work_dir
99 # run. This will usually be the cluster_dir, but may not be. any work_dir
99 # passed into the __init__ method will override the config value.
100 # passed into the __init__ method will override the config value.
100 # This should not be used to set the work_dir for the actual engine
101 # This should not be used to set the work_dir for the actual engine
101 # and controller. Instead, use their own config files or the
102 # and controller. Instead, use their own config files or the
102 # controller_args, engine_args attributes of the launchers to add
103 # controller_args, engine_args attributes of the launchers to add
103 # the --work-dir option.
104 # the --work-dir option.
104 work_dir = Unicode(u'.')
105 work_dir = Unicode(u'.')
105 loop = Instance('zmq.eventloop.ioloop.IOLoop')
106 loop = Instance('zmq.eventloop.ioloop.IOLoop')
106
107
107 start_data = Any()
108 start_data = Any()
108 stop_data = Any()
109 stop_data = Any()
109
110
110 def _loop_default(self):
111 def _loop_default(self):
111 return ioloop.IOLoop.instance()
112 return ioloop.IOLoop.instance()
112
113
113 def __init__(self, work_dir=u'.', config=None, **kwargs):
114 def __init__(self, work_dir=u'.', config=None, **kwargs):
114 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
115 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
115 self.state = 'before' # can be before, running, after
116 self.state = 'before' # can be before, running, after
116 self.stop_callbacks = []
117 self.stop_callbacks = []
117 self.start_data = None
118 self.start_data = None
118 self.stop_data = None
119 self.stop_data = None
119
120
120 @property
121 @property
121 def args(self):
122 def args(self):
122 """A list of cmd and args that will be used to start the process.
123 """A list of cmd and args that will be used to start the process.
123
124
124 This is what is passed to :func:`spawnProcess` and the first element
125 This is what is passed to :func:`spawnProcess` and the first element
125 will be the process name.
126 will be the process name.
126 """
127 """
127 return self.find_args()
128 return self.find_args()
128
129
129 def find_args(self):
130 def find_args(self):
130 """The ``.args`` property calls this to find the args list.
131 """The ``.args`` property calls this to find the args list.
131
132
132 Subclasses should implement this to construct the cmd and args.
133 Subclasses should implement this to construct the cmd and args.
133 """
134 """
134 raise NotImplementedError('find_args must be implemented in a subclass')
135 raise NotImplementedError('find_args must be implemented in a subclass')
135
136
136 @property
137 @property
137 def arg_str(self):
138 def arg_str(self):
138 """The string form of the program arguments."""
139 """The string form of the program arguments."""
139 return ' '.join(self.args)
140 return ' '.join(self.args)
140
141
141 @property
142 @property
142 def running(self):
143 def running(self):
143 """Am I running."""
144 """Am I running."""
144 if self.state == 'running':
145 if self.state == 'running':
145 return True
146 return True
146 else:
147 else:
147 return False
148 return False
148
149
149 def start(self):
150 def start(self):
150 """Start the process.
151 """Start the process.
151
152
152 This must return a deferred that fires with information about the
153 This must return a deferred that fires with information about the
153 process starting (like a pid, job id, etc.).
154 process starting (like a pid, job id, etc.).
154 """
155 """
155 raise NotImplementedError('start must be implemented in a subclass')
156 raise NotImplementedError('start must be implemented in a subclass')
156
157
157 def stop(self):
158 def stop(self):
158 """Stop the process and notify observers of stopping.
159 """Stop the process and notify observers of stopping.
159
160
160 This must return a deferred that fires with information about the
161 This must return a deferred that fires with information about the
161 processing stopping, like errors that occur while the process is
162 processing stopping, like errors that occur while the process is
162 attempting to be shut down. This deferred won't fire when the process
163 attempting to be shut down. This deferred won't fire when the process
163 actually stops. To observe the actual process stopping, see
164 actually stops. To observe the actual process stopping, see
164 :func:`observe_stop`.
165 :func:`observe_stop`.
165 """
166 """
166 raise NotImplementedError('stop must be implemented in a subclass')
167 raise NotImplementedError('stop must be implemented in a subclass')
167
168
168 def on_stop(self, f):
169 def on_stop(self, f):
169 """Register a callback to be called when the process stops.
170 """Register a callback to be called when the process stops.
170
171
171 The callback `f` is called with the stop data. If the process has
172 The callback `f` is called with the stop data. If the process has
172 already stopped, `f` is called immediately with the stored stop data.
173 already stopped, `f` is called immediately with the stored stop data.
173 """
174 """
174 if self.state=='after':
175 if self.state=='after':
175 return f(self.stop_data)
176 return f(self.stop_data)
176 else:
177 else:
177 self.stop_callbacks.append(f)
178 self.stop_callbacks.append(f)
178
179
179 def notify_start(self, data):
180 def notify_start(self, data):
180 """Call this to trigger startup actions.
181 """Call this to trigger startup actions.
181
182
182 This logs the process startup and sets the state to 'running'. It is
183 This logs the process startup and sets the state to 'running'. It is
183 a pass-through so it can be used as a callback.
184 a pass-through so it can be used as a callback.
184 """
185 """
185
186
186 self.log.info('Process %r started: %r' % (self.args[0], data))
187 self.log.info('Process %r started: %r' % (self.args[0], data))
187 self.start_data = data
188 self.start_data = data
188 self.state = 'running'
189 self.state = 'running'
189 return data
190 return data
190
191
191 def notify_stop(self, data):
192 def notify_stop(self, data):
192 """Call this to trigger process stop actions.
193 """Call this to trigger process stop actions.
193
194
194 This logs the process stopping and sets the state to 'after'. Call
195 This logs the process stopping and sets the state to 'after'. Call
195 this to trigger all the callbacks registered with :func:`on_stop`."""
196 this to trigger all the callbacks registered with :func:`on_stop`."""
196
197
197 self.log.info('Process %r stopped: %r' % (self.args[0], data))
198 self.log.info('Process %r stopped: %r' % (self.args[0], data))
198 self.stop_data = data
199 self.stop_data = data
199 self.state = 'after'
200 self.state = 'after'
200 for i in range(len(self.stop_callbacks)):
201 for i in range(len(self.stop_callbacks)):
201 d = self.stop_callbacks.pop()
202 d = self.stop_callbacks.pop()
202 d(data)
203 d(data)
203 return data
204 return data
204
205
205 def signal(self, sig):
206 def signal(self, sig):
206 """Signal the process.
207 """Signal the process.
207
208
208 Return a semi-meaningless deferred after signaling the process.
209 Return a semi-meaningless deferred after signaling the process.
209
210
210 Parameters
211 Parameters
211 ----------
212 ----------
212 sig : str or int
213 sig : str or int
213 'KILL', 'INT', etc., or any signal number
214 'KILL', 'INT', etc., or any signal number
214 """
215 """
215 raise NotImplementedError('signal must be implemented in a subclass')
216 raise NotImplementedError('signal must be implemented in a subclass')
216
217
217
218
218 #-----------------------------------------------------------------------------
219 #-----------------------------------------------------------------------------
219 # Local process launchers
220 # Local process launchers
220 #-----------------------------------------------------------------------------
221 #-----------------------------------------------------------------------------
221
222
222
223
class LocalProcessLauncher(BaseLauncher):
    """Run an external process and watch it through the event loop.

    The child process is started with ``self.work_dir`` as its working
    directory.  Lines it writes to stdout/stderr are forwarded to the
    launcher's log, and its exit status is checked periodically.
    """

    # The command and its arguments; this is what find_args() returns.
    cmd_and_args = List([])
    # Interval between exit-status polls, in ms.
    poll_frequency = Int(100)

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.process = None
        self.start_deferred = None
        self.poller = None

    def find_args(self):
        """Return the command line (a list) to execute."""
        return self.cmd_and_args

    def start(self):
        """Spawn the process and hook its pipes into the event loop.

        Raises ProcessStateError unless the launcher is still in the
        'before' state.
        """
        if self.state != 'before':
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)

        self.process = Popen(
            self.args,
            stdout=PIPE, stderr=PIPE, stdin=PIPE,
            env=os.environ,
            cwd=self.work_dir
        )

        loop = self.loop
        loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, loop.READ)
        loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, loop.READ)
        self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, loop)
        self.poller.start()
        self.notify_start(self.process.pid)

    def stop(self):
        """Stop the process by interrupting it and then killing it."""
        return self.interrupt_then_kill()

    def signal(self, sig):
        """Send ``sig`` to the process; a no-op unless the state is 'running'."""
        if self.state != 'running':
            return
        self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL."""
        self.signal(SIGINT)
        # NOTE(review): delay is presumably in seconds; it is scaled by 1000
        # before being handed to DelayedCallback.
        delay_ms = delay*1000
        self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay_ms, self.loop)
        self.killer.start()

    # callbacks, etc:

    def handle_stdout(self, fd, events):
        """Log one line of the child's stdout at INFO level."""
        line = self.process.stdout.readline()
        if not line:
            # a stopped process will be readable but return empty strings
            self.poll()
            return
        self.log.info(line[:-1])

    def handle_stderr(self, fd, events):
        """Log one line of the child's stderr at ERROR level."""
        line = self.process.stderr.readline()
        if not line:
            # a stopped process will be readable but return empty strings
            self.poll()
            return
        self.log.error(line[:-1])

    def poll(self):
        """Check whether the process has exited and return its status.

        Returns None while the process is still running.  Once an exit
        status is available, the periodic poller is stopped, both pipe
        handlers are removed from the loop and notify_stop() is called
        with a dict holding ``exit_code`` and ``pid``.
        """
        status = self.process.poll()
        if status is None:
            return status
        self.poller.stop()
        for stream in (self.process.stdout, self.process.stderr):
            self.loop.remove_handler(stream.fileno())
        self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
302
303
class LocalControllerLauncher(LocalProcessLauncher):
    """Launch a controller as a regular external process."""

    controller_cmd = List(ipcontrollerz_cmd_argv, config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)

    def find_args(self):
        """Return the controller command followed by its arguments."""
        command, arguments = self.controller_cmd, self.controller_args
        return command + arguments

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        cluster_dir_option = ['--cluster-dir', cluster_dir]
        self.controller_args.extend(cluster_dir_option)
        self.cluster_dir = unicode(cluster_dir)
        self.log.info("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()
319
320
320
321
class LocalEngineLauncher(LocalProcessLauncher):
    """Launch a single engine as a regular external process."""

    engine_cmd = List(ipenginez_cmd_argv, config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', str(logging.INFO)], config=True
    )

    def find_args(self):
        """Return the engine command followed by its arguments."""
        command, arguments = self.engine_cmd, self.engine_args
        return command + arguments

    def start(self, cluster_dir):
        """Start the engine by cluster_dir."""
        cluster_dir_option = ['--cluster-dir', cluster_dir]
        self.engine_args.extend(cluster_dir_option)
        self.cluster_dir = unicode(cluster_dir)
        return super(LocalEngineLauncher, self).start()
338
339
339
340
class LocalEngineSetLauncher(BaseLauncher):
    """Launch a set of engines as regular external processes."""

    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', str(logging.INFO)], config=True
    )
    # launcher class
    launcher_class = LocalEngineLauncher

    # Running engine launchers, keyed by the index assigned in start().
    launchers = Dict()
    # Stop data of the engines that have already exited, by the same key.
    stop_data = Dict()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.stop_data = {}

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir.

        Returns the list of values returned by each engine launcher's
        ``start``.
        """
        self.cluster_dir = unicode(cluster_dir)
        dlist = []
        for i in range(n):
            el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
            # Copy the engine args over to each engine launcher.
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start(cluster_dir)
            if i==0:
                self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        return dlist

    def find_args(self):
        return ['engine set']

    def signal(self, sig):
        """Signal every engine; return the list of per-engine results."""
        return [el.signal(sig) for el in self.launchers.itervalues()]

    def interrupt_then_kill(self, delay=1.0):
        """Interrupt, then kill, every engine; return the per-engine results."""
        return [el.interrupt_then_kill(delay) for el in self.launchers.itervalues()]

    def stop(self):
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """Stop callback for the engines: record stop data by launcher key.

        ``data`` must carry the stopped process's ``pid``.  When the last
        tracked engine has stopped, notify_stop() is called with the
        collected stop data.
        """
        pid = data['pid']
        for idx, el in self.launchers.iteritems():
            if el.process.pid == pid:
                break
        else:
            # No tracked launcher owns this pid.  Without this guard the code
            # below would pop whichever launcher was iterated last (or hit a
            # NameError when there are no launchers at all).
            self.log.warning("Ignoring stop notice for untracked pid: %r" % pid)
            return
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
410
411
411
412
412 #-----------------------------------------------------------------------------
413 #-----------------------------------------------------------------------------
413 # MPIExec launchers
414 # MPIExec launchers
414 #-----------------------------------------------------------------------------
415 #-----------------------------------------------------------------------------
415
416
416
417
class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    # The mpiexec command to use in starting the process.
    mpi_cmd = List(['mpiexec'], config=True)
    # The command line arguments to pass to mpiexec.
    mpi_args = List([], config=True)
    # The program to start using mpiexec.
    program = List(['date'], config=True)
    # The command line argument to the program.
    program_args = List([], config=True)
    # The number of instances of the program to start.
    n = Int(1, config=True)

    def find_args(self):
        """Build self.args using all the fields."""
        instance_count = ['-n', str(self.n)]
        mpiexec_part = self.mpi_cmd + instance_count + self.mpi_args
        program_part = self.program + self.program_args
        return mpiexec_part + program_part

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        parent = super(MPIExecLauncher, self)
        return parent.start()
440
441
441
442
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    controller_cmd = List(ipcontrollerz_cmd_argv, config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
    # Number of instances; not configurable, start() always requests 1.
    n = Int(1, config=False)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        self.controller_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        """Build the mpiexec command line for the controller.

        The instance count is converted with ``str`` (as the parent class
        does) because every element of the argument list handed to Popen
        must be a string.
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.controller_cmd + self.controller_args
460
461
461
462
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Launch a set of engines using mpiexec."""

    program = List(ipenginez_cmd_argv, config=True)
    # Command line arguments for ipengine.
    program_args = List(
        ['--log-to-file','--log-level', str(logging.INFO)], config=True
    )
    n = Int(1, config=True)

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        cluster_dir_option = ['--cluster-dir', cluster_dir]
        self.program_args.extend(cluster_dir_option)
        self.cluster_dir = unicode(cluster_dir)
        self.n = n
        self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
        parent = super(MPIExecEngineSetLauncher, self)
        return parent.start(n)
478
479
479 #-----------------------------------------------------------------------------
480 #-----------------------------------------------------------------------------
480 # SSH launchers
481 # SSH launchers
481 #-----------------------------------------------------------------------------
482 #-----------------------------------------------------------------------------
482
483
483 # TODO: Get SSH Launcher working again.
484 # TODO: Get SSH Launcher working again.
484
485
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables.  There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True)
    ssh_args = List(['-tt'], config=True)
    program = List(['date'], config=True)
    program_args = List([], config=True)
    hostname = CUnicode('', config=True)
    user = CUnicode('', config=True)
    # The ssh destination, kept in sync by the two *_changed handlers below.
    location = CUnicode('')

    def _hostname_changed(self, name, old, new):
        # With a user set the destination is user@host, otherwise just host.
        self.location = (u'%s@%s' % (self.user, new)) if self.user else new

    def _user_changed(self, name, old, new):
        destination = u'%s@%s' % (new, self.hostname)
        self.location = destination

    def find_args(self):
        """Return the ssh command line: ssh, its args, destination, program."""
        ssh_part = self.ssh_cmd + self.ssh_args + [self.location]
        program_part = self.program + self.program_args
        return ssh_part + program_part

    def start(self, cluster_dir, hostname=None, user=None):
        """Start the program over ssh.

        ``hostname`` and ``user`` replace the configured values when they
        are not None.
        """
        self.cluster_dir = unicode(cluster_dir)
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user

        return super(SSHLauncher, self).start()

    def signal(self, sig):
        """Close the ssh connection; ``sig`` itself is not used."""
        if self.state != 'running':
            return
        # send escaped ssh connection-closer
        stdin = self.process.stdin
        stdin.write('~.')
        stdin.flush()
528
529
529
530
530
531
class SSHControllerLauncher(SSHLauncher):
    """Launch a controller over ssh."""

    program = List(ipcontrollerz_cmd_argv, config=True)
    # Command line arguments to ipcontroller.
    program_args = List(
        ['-r', '--log-to-file','--log-level', str(logging.INFO)],
        config=True
    )
536
537
537
538
class SSHEngineLauncher(SSHLauncher):
    """Launch a single engine over ssh."""

    program = List(ipenginez_cmd_argv, config=True)
    # Command line arguments for ipengine.
    program_args = List(
        ['--log-to-file','--log-level', str(logging.INFO)],
        config=True
    )
544
545
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Launch sets of engines on remote hosts over ssh."""

    launcher_class = SSHEngineLauncher
    # Maps 'host' or 'user@host' to either an engine count or a
    # (count, program_args) pair.
    engines = Dict(config=True)

    def start(self, n, cluster_dir):
        """Start engines by profile or cluster_dir.
        `n` is ignored, and the `engines` config property is used instead.

        Returns the list of values returned by each engine launcher's
        ``start``.
        """

        self.cluster_dir = unicode(cluster_dir)
        dlist = []
        for location, count in self.engines.iteritems():
            if isinstance(count, (tuple, list)):
                count, args = count
            else:
                args = copy.deepcopy(self.engine_args)

            if '@' in location:
                user, host = location.split('@', 1)
            else:
                user, host = None, location
            for i in range(count):
                el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)

                # Hand the engine args to each engine launcher.
                el.program_args = args
                el.on_stop(self._notice_engine_stopped)
                d = el.start(cluster_dir, user=user, hostname=host)
                if i==0:
                    self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
                # Key on the full configured location, so that two entries
                # for the same host with different users do not overwrite
                # each other.  Entries without a user keep the host-only key.
                self.launchers[location+str(i)] = el
                dlist.append(d)
        self.notify_start(dlist)
        return dlist
580
581
581
582
582
583
583 #-----------------------------------------------------------------------------
584 #-----------------------------------------------------------------------------
584 # Windows HPC Server 2008 scheduler launchers
585 # Windows HPC Server 2008 scheduler launchers
585 #-----------------------------------------------------------------------------
586 #-----------------------------------------------------------------------------
586
587
587
588
588 # This is only used on Windows.
589 # This is only used on Windows.
def find_job_cmd():
    """Return the command used to invoke the HPC ``job`` tool.

    On Windows (``os.name == 'nt'``) the result of ``find_cmd('job')`` is
    returned when the lookup succeeds.  In every other case the plain
    name 'job' is returned.
    """
    if os.name != 'nt':
        return 'job'
    try:
        return find_cmd('job')
    except FindCmdError:
        return 'job'
597
598
598
599
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher that submits and cancels jobs through ``job_cmd``.

    Subclasses must implement write_job_file(), which writes the job
    description that start() submits.
    """

    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str(r'\d+', config=True)
    # The filename of the instantiated job script.
    job_file_name = CUnicode(u'ipython_job.xml', config=True)
    # The hostname of the scheduler to submit the job to
    scheduler = CUnicode('', config=True)
    job_cmd = CUnicode(find_job_cmd(), config=True)

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    # The full path to the instantiated job script.  It is computed on each
    # access by combining the work_dir with the job_file_name; a trait of the
    # same name declared in the class body would only be shadowed by this
    # property, so none is declared.
    @property
    def job_file(self):
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The id is also stored as ``self.job_id``.  Raises LauncherError
        when ``job_id_regexp`` does not match ``output``.
        """
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job started with job id: %r' % job_id)
        return job_id

    def _run_job_cmd(self, args):
        """Run ``job_cmd`` with ``args`` in work_dir; return its output (stderr included)."""
        return check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler.

        Writes the job file, submits it and returns the parsed job id.
        """
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        output = self._run_job_cmd(args)
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the job and return the output of the cancel command.

        A failing cancel command is treated as "job already stopped"; in
        both cases notify_stop() is called with the job id and the output.
        """
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = self._run_job_cmd(args)
        except Exception:
            # Best effort: any failure of the cancel command is reported as
            # the job being gone.  KeyboardInterrupt and SystemExit are
            # deliberately not swallowed here.
            output = 'The job already appears to be stoppped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
675
676
676
677
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Launch a controller through the Win HPC job scheduler."""

    job_file_name = CUnicode(u'ipcontroller_job.xml', config=True)
    extra_args = List([], config=False)

    def write_job_file(self, n):
        """Write the controller job description to ``self.job_file``.

        ``n`` is accepted for interface compatibility and is not used.
        """
        task = IPControllerTask(config=self.config)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        task.work_directory = self.cluster_dir
        # Add the --cluster-dir arguments stored by self.start().
        task.controller_args.extend(self.extra_args)

        job = IPControllerJob(config=self.config)
        job.add_task(task)

        self.log.info("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # The controller's job file lives in the cluster directory.
        return os.path.join(self.cluster_dir, self.job_file_name)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        self.extra_args = ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        parent = super(WindowsHPCControllerLauncher, self)
        return parent.start(1)
706
707
707
708
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Launch a set of engines through a job file with one task per engine."""

    # Name of the job description file, created inside cluster_dir.
    job_file_name = CUnicode(u'ipengineset_job.xml', config=True)
    # Extra engine arguments; populated by start(), not configurable.
    extra_args = List([], config=False)

    @property
    def job_file(self):
        """Full path of the job description file inside cluster_dir."""
        return os.path.join(self.cluster_dir, self.job_file_name)

    def write_job_file(self, n):
        """Build a job holding ``n`` engine tasks and write it to job_file."""
        engine_job = IPEngineSetJob(config=self.config)

        for _ in range(n):
            task = IPEngineTask(config=self.config)
            # This is *not* the engine's real working directory; it is the
            # base path for the stdout/stderr files the scheduler redirects to.
            task.work_directory = self.cluster_dir
            # Forward the --cluster-dir arguments recorded by start().
            task.engine_args.extend(self.extra_args)
            engine_job.add_task(task)

        self.log.info("Writing job description file: %s" % self.job_file)
        engine_job.write(self.job_file)

    def start(self, n, cluster_dir):
        """Start ``n`` engines for the given cluster_dir."""
        self.extra_args = ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        parent = super(WindowsHPCEngineSetLauncher, self)
        return parent.start(n)
738
739
739
740
740 #-----------------------------------------------------------------------------
741 #-----------------------------------------------------------------------------
741 # Batch (PBS) system launchers
742 # Batch (PBS) system launchers
742 #-----------------------------------------------------------------------------
743 #-----------------------------------------------------------------------------
743
744
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc.  The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using Itpl. Thus the template can use
    ${n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in.  See PBSEngineSet
    # The command line (as an argv list) used to submit jobs.
    submit_command = List([''], config=True)
    # The command line (as an argv list) used to delete jobs.
    delete_command = List([''], config=True)
    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = CUnicode('', config=True)
    # The string that is the batch script template itself.
    batch_template = CUnicode('', config=True)
    # The file that contains the batch template
    batch_template_file = CUnicode(u'', config=True)
    # The filename of the instantiated batch script.
    batch_file_name = CUnicode(u'batch_script', config=True)
    # The PBS Queue
    queue = CUnicode(u'', config=True)

    # not configurable, override in subclasses
    # PBS Job Array regex
    job_array_regexp = CUnicode('')
    job_array_template = CUnicode('')
    # PBS Queue regex
    queue_regexp = CUnicode('')
    queue_template = CUnicode('')
    # The default batch template, override in subclasses
    default_template = CUnicode('')
    # The full path to the instantiated batch script.
    batch_file = CUnicode(u'')
    # the format dict used with batch_template:
    context = Dict()

    def find_args(self):
        """Return the submit command line: submit_command plus batch_file."""
        return self.submit_command + [self.batch_file]

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BatchSystemLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        Raises LauncherError when job_id_regexp does not match ``output``.
        """
        m = re.search(self.job_id_regexp, output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        self.log.info('Job submitted with job id: %r' % job_id)
        return job_id

    def _insert_directive(self, directive):
        """Insert ``directive`` as the second line of batch_template."""
        # partition() never raises, unlike split('\n', 1) unpacking, which
        # fails with ValueError on a template that has no newline at all.
        firstline, _, rest = self.batch_template.partition('\n')
        self.batch_template = u'\n'.join([firstline, directive, rest])

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir."""
        self.context['n'] = n
        self.context['queue'] = self.queue
        # first priority is batch_template if set
        if self.batch_template_file and not self.batch_template:
            # second priority is batch_template_file
            with open(self.batch_template_file) as f:
                self.batch_template = f.read()
        if not self.batch_template:
            # third (last) priority is default_template
            self.batch_template = self.default_template

        # Add the job array directive unless the template already has one.
        if not re.search(self.job_array_regexp, self.batch_template):
            self.log.info("adding job array settings to batch script")
            self._insert_directive(self.job_array_template)

        # Add the queue directive only when a queue was requested and the
        # template does not already select one.
        if self.queue and not re.search(self.queue_regexp, self.batch_template):
            self.log.info("adding PBS queue settings to batch script")
            self._insert_directive(self.queue_template)

        script_as_string = Itpl.itplns(self.batch_template, self.context)
        self.log.info('Writing instantiated batch script: %s' % self.batch_file)

        with open(self.batch_file, 'w') as f:
            f.write(script_as_string)
        # Make the script readable, writable and executable by its owner.
        os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

    def start(self, n, cluster_dir):
        """Start n copies of the process using a batch system."""
        # Here we save profile and cluster_dir in the context so they
        # can be used in the batch script template as ${profile} and
        # ${cluster_dir}
        self.context['cluster_dir'] = cluster_dir
        self.cluster_dir = unicode(cluster_dir)
        self.write_batch_script(n)
        output = check_output(self.args, env=os.environ)

        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Run delete_command on the stored job id and return its output."""
        output = check_output(self.delete_command+[self.job_id], env=os.environ)
        self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
        return output
823
864
824
865
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = List(['qsub'], config=True)
    delete_command = List(['qdel'], config=True)
    # The job id is taken to be the first run of digits in the qsub output.
    job_id_regexp = CUnicode(r'\d+', config=True)

    batch_file = CUnicode(u'')
    # Regex literals are raw strings so the backslash escapes reach the re
    # module untouched instead of relying on Python passing unknown string
    # escapes through.
    # Detects an existing '#PBS -t ...' job array directive in a template.
    job_array_regexp = CUnicode(r'#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = CUnicode('#PBS -t 1-$n')
    # Detects an existing '#PBS -q ...' queue directive in a template.
    queue_regexp = CUnicode(r'#PBS\W+-q\W+\$?\w+')
    queue_template = CUnicode('#PBS -q $queue')
834
878
835
879
class PBSControllerLauncher(PBSLauncher):
    """Launch a controller using PBS."""

    batch_file_name = CUnicode(u'pbs_controller', config=True)
    # Built-in batch script for the controller; the controller command line
    # is substituted in once, when the class body is executed.
    default_template = CUnicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontrollerz
%s --log-to-file --cluster-dir $cluster_dir
""" % ' '.join(ipcontrollerz_cmd_argv))

    def start(self, cluster_dir):
        """Submit a single controller job for the given cluster_dir."""
        launch_args = self.args
        self.log.info("Starting PBSControllerLauncher: %r" % launch_args)
        parent = super(PBSControllerLauncher, self)
        return parent.start(1, cluster_dir)
845
894
846
895
class PBSEngineSetLauncher(PBSLauncher):
    """Launch Engines using PBS"""

    batch_file_name = CUnicode(u'pbs_engines', config=True)
    # Built-in batch script for the engines; the engine command line is
    # substituted in once, when the class body is executed.
    default_template= CUnicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipenginez
%s --cluster-dir $cluster_dir
"""%(' '.join(ipenginez_cmd_argv)))

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        # '%i' rather than '%n': 'n' is not a valid printf-style conversion,
        # so the original message raised ValueError before anything launched.
        self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
        return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
855
909
910 #SGE is very similar to PBS
911
class SGELauncher(PBSLauncher):
    """Sun GridEngine is a PBS clone with slightly different syntax"""

    # The dollar sign of the SGE directive prefix must be escaped in the
    # regexps: a bare '$' is an end-of-string anchor, so the unescaped
    # patterns could never match a directive line.  The '\W+' that follows
    # also absorbs a second '$' (the templates in this file spell the prefix
    # '#$$') along with the whitespace before the option.
    job_array_regexp = CUnicode(r'#\$\W+-t\W+[\w\d\-\$]+')
    job_array_template = CUnicode('#$$ -t 1-$n')
    queue_regexp = CUnicode(r'#\$\W+-q\W+\$?\w+')
    queue_template = CUnicode('#$$ -q $queue')
918
class SGEControllerLauncher(SGELauncher):
    """Launch a controller using SGE."""

    batch_file_name = CUnicode(u'sge_controller', config=True)
    # Built-in batch script for the controller; the controller command line
    # is substituted in once, when the class body is executed.
    default_template= CUnicode(u"""#$$ -V
#$$ -S /bin/sh
#$$ -N ipcontrollerz
%s --log-to-file --cluster-dir $cluster_dir
"""%(' '.join(ipcontrollerz_cmd_argv)))

    def start(self, cluster_dir):
        """Start the controller by profile or cluster_dir."""
        self.log.info("Starting SGEControllerLauncher: %r" % self.args)
        # super() must name this class: it does not derive from
        # PBSControllerLauncher, so super(PBSControllerLauncher, self)
        # raised TypeError.
        return super(SGEControllerLauncher, self).start(1, cluster_dir)
933
class SGEEngineSetLauncher(SGELauncher):
    """Launch Engines with SGE"""

    batch_file_name = CUnicode(u'sge_engines', config=True)
    # Built-in batch script for the engines; the engine command line is
    # substituted in once, when the class body is executed.
    default_template = CUnicode("""#$$ -V
#$$ -S /bin/sh
#$$ -N ipenginez
%s --cluster-dir $cluster_dir
"""%(' '.join(ipenginez_cmd_argv)))

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        # '%i' rather than '%n': 'n' is not a valid printf-style conversion,
        # so the original message raised ValueError before anything launched.
        self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
        return super(SGEEngineSetLauncher, self).start(n, cluster_dir)
947
856
948
857 #-----------------------------------------------------------------------------
949 #-----------------------------------------------------------------------------
858 # A launcher for ipcluster itself!
950 # A launcher for ipcluster itself!
859 #-----------------------------------------------------------------------------
951 #-----------------------------------------------------------------------------
860
952
861
953
class IPClusterLauncher(LocalProcessLauncher):
    """Run the ipcluster program as an external process."""

    # Base command line (argv list) for ipcluster.
    ipcluster_cmd = List(ipclusterz_cmd_argv, config=True)
    # Command line arguments to pass to ipcluster.
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)],
        config=True)
    # Subcommand placed right after the base command.
    ipcluster_subcommand = Str('start')
    # Value passed to ipcluster's -n option.
    ipcluster_n = Int(2)

    def find_args(self):
        """Assemble the argv list: command, subcommand, -n <n>, extra args."""
        argv = list(self.ipcluster_cmd)
        argv.append(self.ipcluster_subcommand)
        argv.extend(['-n', repr(self.ipcluster_n)])
        argv.extend(self.ipcluster_args)
        return argv

    def start(self):
        """Log the command line and delegate to the parent launcher."""
        launch_args = self.args
        self.log.info("Starting ipcluster: %r" % launch_args)
        parent = super(IPClusterLauncher, self)
        return parent.start()
971
General Comments 0
You need to be logged in to leave comments. Login now