##// END OF EJS Templates
Initial version of Win HPC job scheduler support.
bgranger -
Show More
@@ -1,155 +1,194 b''
1 import os
1 import os
2
2
3 c = get_config()
3 c = get_config()
4
4
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Select which launchers to use
6 # Select which launchers to use
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 # This allows you to control what method is used to start the controller
9 # This allows you to control what method is used to start the controller
10 # and engines. The following methods are currently supported:
10 # and engines. The following methods are currently supported:
11 # * Start as a regular process on localhost.
11 # * Start as a regular process on localhost.
12 # * Start using mpiexec.
12 # * Start using mpiexec.
13 # * Start using PBS
13 # * Start using PBS
14 # * Start using SSH (currently broken)
14 # * Start using SSH (currently broken)
15
15
16 # The selected launchers can be configured below.
16 # The selected launchers can be configured below.
17
17
18 # Options are (LocalControllerLauncher, MPIExecControllerLauncher,
18 # Options are (LocalControllerLauncher, MPIExecControllerLauncher,
19 # PBSControllerLauncher)
19 # PBSControllerLauncher, WindowsHPCControllerLauncher)
20 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
20 # c.Global.controller_launcher = 'IPython.kernel.launcher.LocalControllerLauncher'
21
21
22 # Options are (LocalEngineSetLauncher, MPIExecEngineSetLauncher,
22 # Options are (LocalEngineSetLauncher, MPIExecEngineSetLauncher,
23 # PBSEngineSetLauncher)
23 # PBSEngineSetLauncher)
24 # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
24 # c.Global.engine_launcher = 'IPython.kernel.launcher.LocalEngineSetLauncher'
25
25
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27 # Global configuration
27 # Global configuration
28 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
29
29
30 # The default number of engines that will be started. This is overridden by
30 # The default number of engines that will be started. This is overridden by
31 # the -n command line option: "ipcluster start -n 4"
31 # the -n command line option: "ipcluster start -n 4"
32 # c.Global.n = 2
32 # c.Global.n = 2
33
33
34 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
34 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
35 # c.Global.log_to_file = False
35 # c.Global.log_to_file = False
36
36
37 # Remove old logs from cluster_dir/log before starting.
37 # Remove old logs from cluster_dir/log before starting.
38 # c.Global.clean_logs = True
38 # c.Global.clean_logs = True
39
39
40 #-----------------------------------------------------------------------------
40 #-----------------------------------------------------------------------------
41 # Controller launcher configuration
41 # Controller launcher configuration
42 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
43
43
44 # Configure how the controller is started. The configuration of the controller
44 # Configure how the controller is started. The configuration of the controller
45 # can also be set up by editing the controller config file:
45 # can also be set up by editing the controller config file:
46 # ipcontroller_config.py
46 # ipcontroller_config.py
47
47
48 # The command line arguments to call the controller with.
48 # The command line arguments to call the controller with.
49 # c.LocalControllerLauncher.controller_args = \
49 # c.LocalControllerLauncher.controller_args = \
50 # ['--log-to-file','--log-level', '40']
50 # ['--log-to-file','--log-level', '40']
51
51
52 # The mpiexec/mpirun command to use in starting the controller.
52 # The mpiexec/mpirun command to use in starting the controller.
53 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
53 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
54
54
55 # Additional arguments to pass to the actual mpiexec command.
55 # Additional arguments to pass to the actual mpiexec command.
56 # c.MPIExecControllerLauncher.mpi_args = []
56 # c.MPIExecControllerLauncher.mpi_args = []
57
57
58 # The command line argument to call the controller with.
58 # The command line argument to call the controller with.
59 # c.MPIExecControllerLauncher.controller_args = \
59 # c.MPIExecControllerLauncher.controller_args = \
60 # ['--log-to-file','--log-level', '40']
60 # ['--log-to-file','--log-level', '40']
61
61
62 # The command line program to use to submit a PBS job.
62 # The command line program to use to submit a PBS job.
63 # c.PBSControllerLauncher.submit_command = 'qsub'
63 # c.PBSControllerLauncher.submit_command = 'qsub'
64
64
65 # The command line program to use to delete a PBS job.
65 # The command line program to use to delete a PBS job.
66 # c.PBSControllerLauncher.delete_command = 'qdel'
66 # c.PBSControllerLauncher.delete_command = 'qdel'
67
67
68 # A regular expression that takes the output of qsub and finds the job id.
68 # A regular expression that takes the output of qsub and finds the job id.
69 # c.PBSControllerLauncher.job_id_regexp = '\d+'
69 # c.PBSControllerLauncher.job_id_regexp = '\d+'
70
70
71 # The batch submission script used to start the controller. This is where
71 # The batch submission script used to start the controller. This is where
72 # environment variables would be setup, etc. This string is interpolated using
72 # environment variables would be setup, etc. This string is interpolated using
73 # the Itpl module in IPython.external. Basically, you can use ${profile} for
73 # the Itpl module in IPython.external. Basically, you can use ${profile} for
74 # the controller profile or ${cluster_dir} for the cluster_dir.
74 # the controller profile or ${cluster_dir} for the cluster_dir.
75 # c.PBSControllerLauncher.batch_template = """"""
75 # c.PBSControllerLauncher.batch_template = """"""
76
76
77 # The name of the instantiated batch script that will actually be used to
77 # The name of the instantiated batch script that will actually be used to
78 # submit the job. This will be written to the cluster directory.
78 # submit the job. This will be written to the cluster directory.
79 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
79 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
80
80
81 #-----------------------------------------------------------------------------
81 #-----------------------------------------------------------------------------
82 # Windows HPC Server 2008 launcher configuration
83 #-----------------------------------------------------------------------------
84
85 # c.WinHPCJob.username = 'DOMAIN\\user'
86 # c.WinHPCJob.priority = 'Highest'
87 # c.WinHPCJob.requested_nodes = ''
88 # c.WinHPCJob.project = ''
89 # c.WinHPCJob.is_exclusive = False
90
91 # c.WinHPCTask.environment_variables = {}
92 # c.WinHPCTask.work_directory = ''
93 # c.WinHPCTask.is_rerunnable = True
94
95 # c.IPControllerTask.task_name = 'IPController'
96 # c.IPControllerTask.controller_cmd = ['ipcontroller.exe']
97 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
98 # c.IPControllerTask.environment_variables = {}
99
100 # c.IPEngineTask.task_name = 'IPEngine'
101 # c.IPEngineTask.engine_cmd = ['ipengine.exe']
102 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
103 # c.IPEngineTask.environment_variables = {}
104
105 # c.WindowsHPCLauncher.scheduler = 'HEADNODE'
106 # c.WindowsHPCLauncher.username = 'DOMAIN\\USERNAME'
107 # c.WindowsHPCLauncher.priority = 'Highest'
108 # c.WindowsHPCLauncher.requested_nodes = ''
109 # c.WindowsHPCLauncher.job_file_name = u'ipython_job.xml'
110 # c.WindowsHPCLauncher.project = 'MyProject'
111
112 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
113 # c.WindowsHPCControllerLauncher.username = 'DOMAIN\\USERNAME'
114 # c.WindowsHPCControllerLauncher.priority = 'Highest'
115 # c.WindowsHPCControllerLauncher.requested_nodes = ''
116 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
117 # c.WindowsHPCControllerLauncher.project = 'MyProject'
118
119
120 #-----------------------------------------------------------------------------
82 # Engine launcher configuration
121 # Engine launcher configuration
83 #-----------------------------------------------------------------------------
122 #-----------------------------------------------------------------------------
84
123
85 # Command line argument passed to the engines.
124 # Command line argument passed to the engines.
86 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
125 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
87
126
88 # The mpiexec/mpirun command to use in starting the engines.
127 # The mpiexec/mpirun command to use in starting the engines.
89 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
128 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
90
129
91 # Additional arguments to pass to the actual mpiexec command.
130 # Additional arguments to pass to the actual mpiexec command.
92 # c.MPIExecEngineSetLauncher.mpi_args = []
131 # c.MPIExecEngineSetLauncher.mpi_args = []
93
132
94 # Command line argument passed to the engines.
133 # Command line argument passed to the engines.
95 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
134 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
96
135
97 # The default number of engines to start if not given elsewhere.
136 # The default number of engines to start if not given elsewhere.
98 # c.MPIExecEngineSetLauncher.n = 1
137 # c.MPIExecEngineSetLauncher.n = 1
99
138
100 # The command line program to use to submit a PBS job.
139 # The command line program to use to submit a PBS job.
101 # c.PBSEngineSetLauncher.submit_command = 'qsub'
140 # c.PBSEngineSetLauncher.submit_command = 'qsub'
102
141
103 # The command line program to use to delete a PBS job.
142 # The command line program to use to delete a PBS job.
104 # c.PBSEngineSetLauncher.delete_command = 'qdel'
143 # c.PBSEngineSetLauncher.delete_command = 'qdel'
105
144
106 # A regular expression that takes the output of qsub and finds the job id.
145 # A regular expression that takes the output of qsub and finds the job id.
107 # c.PBSEngineSetLauncher.job_id_regexp = '\d+'
146 # c.PBSEngineSetLauncher.job_id_regexp = '\d+'
108
147
109 # The batch submission script used to start the engines. This is where
148 # The batch submission script used to start the engines. This is where
110 # environment variables would be setup, etc. This string is interpolated using
149 # environment variables would be setup, etc. This string is interpolated using
111 # the Itpl module in IPython.external. Basically, you can use ${n} for the
150 # the Itpl module in IPython.external. Basically, you can use ${n} for the
112 # number of engines, ${profile} for the engine profile and ${cluster_dir}
151 # number of engines, ${profile} for the engine profile and ${cluster_dir}
113 # for the cluster_dir.
152 # for the cluster_dir.
114 # c.PBSEngineSetLauncher.batch_template = """"""
153 # c.PBSEngineSetLauncher.batch_template = """"""
115
154
116 # The name of the instantiated batch script that will actually be used to
155 # The name of the instantiated batch script that will actually be used to
117 # submit the job. This will be written to the cluster directory.
156 # submit the job. This will be written to the cluster directory.
118 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'
157 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'
119
158
120 #-----------------------------------------------------------------------------
159 #-----------------------------------------------------------------------------
121 # Base launcher configuration
160 # Base launcher configuration
122 #-----------------------------------------------------------------------------
161 #-----------------------------------------------------------------------------
123
162
124 # The various launchers are organized into an inheritance hierarchy.
163 # The various launchers are organized into an inheritance hierarchy.
125 # The configurations can also be inherited and the following attributes
164 # The configurations can also be inherited and the following attributes
126 # allow you to configure the base classes.
165 # allow you to configure the base classes.
127
166
128 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
167 # c.MPIExecLauncher.mpi_cmd = ['mpiexec']
129 # c.MPIExecLauncher.mpi_args = []
168 # c.MPIExecLauncher.mpi_args = []
130 # c.MPIExecLauncher.program = []
169 # c.MPIExecLauncher.program = []
131 # c.MPIExecLauncher.program_args = []
170 # c.MPIExecLauncher.program_args = []
132 # c.MPIExecLauncher.n = 1
171 # c.MPIExecLauncher.n = 1
133
172
134 # c.SSHLauncher.ssh_cmd = ['ssh']
173 # c.SSHLauncher.ssh_cmd = ['ssh']
135 # c.SSHLauncher.ssh_args = []
174 # c.SSHLauncher.ssh_args = []
136 # c.SSHLauncher.program = []
175 # c.SSHLauncher.program = []
137 # c.SSHLauncher.program_args = []
176 # c.SSHLauncher.program_args = []
138 # c.SSHLauncher.hostname = ''
177 # c.SSHLauncher.hostname = ''
139 # c.SSHLauncher.user = os.environ['USER']
178 # c.SSHLauncher.user = os.environ['USER']
140
179
141 # c.BatchSystemLauncher.submit_command
180 # c.BatchSystemLauncher.submit_command
142 # c.BatchSystemLauncher.delete_command
181 # c.BatchSystemLauncher.delete_command
143 # c.BatchSystemLauncher.job_id_regexp
182 # c.BatchSystemLauncher.job_id_regexp
144 # c.BatchSystemLauncher.batch_template
183 # c.BatchSystemLauncher.batch_template
145 # c.BatchSystemLauncher.batch_file_name
184 # c.BatchSystemLauncher.batch_file_name
146
185
147 # c.PBSLauncher.submit_command = 'qsub'
186 # c.PBSLauncher.submit_command = 'qsub'
148 # c.PBSLauncher.delete_command = 'qdel'
187 # c.PBSLauncher.delete_command = 'qdel'
149 # c.PBSLauncher.job_id_regexp = '\d+'
188 # c.PBSLauncher.job_id_regexp = '\d+'
150 # c.PBSLauncher.batch_template = """"""
189 # c.PBSLauncher.batch_template = """"""
151 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
190 # c.PBSLauncher.batch_file_name = u'pbs_batch_script'
152
191
153
192
154
193
155
194
@@ -1,700 +1,808 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching processing asynchronously.
4 Facilities for launching processing asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import re
19 import re
20 import sys
20 import sys
21
21
22 from IPython.core.component import Component
22 from IPython.core.component import Component
23 from IPython.external import Itpl
23 from IPython.external import Itpl
24 from IPython.utils.traitlets import Str, Int, List, Unicode
24 from IPython.utils.traitlets import Str, Int, List, Unicode, Enum
25 from IPython.utils.platutils import find_cmd
25 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
26 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
27 from IPython.kernel.winhpcjob import (
28 WinHPCJob, WinHPCTask,
29 IPControllerTask, IPEngineTask
30 )
26
31
27 from twisted.internet import reactor, defer
32 from twisted.internet import reactor, defer
28 from twisted.internet.defer import inlineCallbacks
33 from twisted.internet.defer import inlineCallbacks
29 from twisted.internet.protocol import ProcessProtocol
34 from twisted.internet.protocol import ProcessProtocol
30 from twisted.internet.utils import getProcessOutput
35 from twisted.internet.utils import getProcessOutput
31 from twisted.internet.error import ProcessDone, ProcessTerminated
36 from twisted.internet.error import ProcessDone, ProcessTerminated
32 from twisted.python import log
37 from twisted.python import log
33 from twisted.python.failure import Failure
38 from twisted.python.failure import Failure
34
39
35 #-----------------------------------------------------------------------------
40 #-----------------------------------------------------------------------------
36 # Generic launchers
41 # Generic launchers
37 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
38
43
39
44
40 class LauncherError(Exception):
45 class LauncherError(Exception):
41 pass
46 pass
42
47
43
48
44 class ProcessStateError(LauncherError):
49 class ProcessStateError(LauncherError):
45 pass
50 pass
46
51
47
52
48 class UnknownStatus(LauncherError):
53 class UnknownStatus(LauncherError):
49 pass
54 pass
50
55
51
56
52 class BaseLauncher(Component):
57 class BaseLauncher(Component):
53 """An asbtraction for starting, stopping and signaling a process."""
58 """An asbtraction for starting, stopping and signaling a process."""
54
59
55 # A directory for files related to the process. But, we don't cd to
60 # A directory for files related to the process. But, we don't cd to
56 # this directory,
61 # this directory,
57 working_dir = Unicode(u'')
62 working_dir = Unicode(u'')
58
63
59 def __init__(self, working_dir, parent=None, name=None, config=None):
64 def __init__(self, working_dir, parent=None, name=None, config=None):
60 super(BaseLauncher, self).__init__(parent, name, config)
65 super(BaseLauncher, self).__init__(parent, name, config)
61 self.working_dir = working_dir
66 self.working_dir = working_dir
62 self.state = 'before' # can be before, running, after
67 self.state = 'before' # can be before, running, after
63 self.stop_deferreds = []
68 self.stop_deferreds = []
64 self.start_data = None
69 self.start_data = None
65 self.stop_data = None
70 self.stop_data = None
66
71
67 @property
72 @property
68 def args(self):
73 def args(self):
69 """A list of cmd and args that will be used to start the process.
74 """A list of cmd and args that will be used to start the process.
70
75
71 This is what is passed to :func:`spawnProcess` and the first element
76 This is what is passed to :func:`spawnProcess` and the first element
72 will be the process name.
77 will be the process name.
73 """
78 """
74 return self.find_args()
79 return self.find_args()
75
80
76 def find_args(self):
81 def find_args(self):
77 """The ``.args`` property calls this to find the args list.
82 """The ``.args`` property calls this to find the args list.
78
83
79 Subcommand should implement this to construct the cmd and args.
84 Subcommand should implement this to construct the cmd and args.
80 """
85 """
81 raise NotImplementedError('find_args must be implemented in a subclass')
86 raise NotImplementedError('find_args must be implemented in a subclass')
82
87
83 @property
88 @property
84 def arg_str(self):
89 def arg_str(self):
85 """The string form of the program arguments."""
90 """The string form of the program arguments."""
86 return ' '.join(self.args)
91 return ' '.join(self.args)
87
92
88 @property
93 @property
89 def running(self):
94 def running(self):
90 """Am I running."""
95 """Am I running."""
91 if self.state == 'running':
96 if self.state == 'running':
92 return True
97 return True
93 else:
98 else:
94 return False
99 return False
95
100
96 def start(self):
101 def start(self):
97 """Start the process.
102 """Start the process.
98
103
99 This must return a deferred that fires with information about the
104 This must return a deferred that fires with information about the
100 process starting (like a pid, job id, etc.).
105 process starting (like a pid, job id, etc.).
101 """
106 """
102 return defer.fail(
107 return defer.fail(
103 Failure(NotImplementedError(
108 Failure(NotImplementedError(
104 'start must be implemented in a subclass')
109 'start must be implemented in a subclass')
105 )
110 )
106 )
111 )
107
112
108 def stop(self):
113 def stop(self):
109 """Stop the process and notify observers of stopping.
114 """Stop the process and notify observers of stopping.
110
115
111 This must return a deferred that fires with information about the
116 This must return a deferred that fires with information about the
112 processing stopping, like errors that occur while the process is
117 processing stopping, like errors that occur while the process is
113 attempting to be shut down. This deferred won't fire when the process
118 attempting to be shut down. This deferred won't fire when the process
114 actually stops. To observe the actual process stopping, see
119 actually stops. To observe the actual process stopping, see
115 :func:`observe_stop`.
120 :func:`observe_stop`.
116 """
121 """
117 return defer.fail(
122 return defer.fail(
118 Failure(NotImplementedError(
123 Failure(NotImplementedError(
119 'stop must be implemented in a subclass')
124 'stop must be implemented in a subclass')
120 )
125 )
121 )
126 )
122
127
123 def observe_stop(self):
128 def observe_stop(self):
124 """Get a deferred that will fire when the process stops.
129 """Get a deferred that will fire when the process stops.
125
130
126 The deferred will fire with data that contains information about
131 The deferred will fire with data that contains information about
127 the exit status of the process.
132 the exit status of the process.
128 """
133 """
129 if self.state=='after':
134 if self.state=='after':
130 return defer.succeed(self.stop_data)
135 return defer.succeed(self.stop_data)
131 else:
136 else:
132 d = defer.Deferred()
137 d = defer.Deferred()
133 self.stop_deferreds.append(d)
138 self.stop_deferreds.append(d)
134 return d
139 return d
135
140
136 def notify_start(self, data):
141 def notify_start(self, data):
137 """Call this to trigger startup actions.
142 """Call this to trigger startup actions.
138
143
139 This logs the process startup and sets the state to 'running'. It is
144 This logs the process startup and sets the state to 'running'. It is
140 a pass-through so it can be used as a callback.
145 a pass-through so it can be used as a callback.
141 """
146 """
142
147
143 log.msg('Process %r started: %r' % (self.args[0], data))
148 log.msg('Process %r started: %r' % (self.args[0], data))
144 self.start_data = data
149 self.start_data = data
145 self.state = 'running'
150 self.state = 'running'
146 return data
151 return data
147
152
148 def notify_stop(self, data):
153 def notify_stop(self, data):
149 """Call this to trigger process stop actions.
154 """Call this to trigger process stop actions.
150
155
151 This logs the process stopping and sets the state to 'after'. Call
156 This logs the process stopping and sets the state to 'after'. Call
152 this to trigger all the deferreds from :func:`observe_stop`."""
157 this to trigger all the deferreds from :func:`observe_stop`."""
153
158
154 log.msg('Process %r stopped: %r' % (self.args[0], data))
159 log.msg('Process %r stopped: %r' % (self.args[0], data))
155 self.stop_data = data
160 self.stop_data = data
156 self.state = 'after'
161 self.state = 'after'
157 for i in range(len(self.stop_deferreds)):
162 for i in range(len(self.stop_deferreds)):
158 d = self.stop_deferreds.pop()
163 d = self.stop_deferreds.pop()
159 d.callback(data)
164 d.callback(data)
160 return data
165 return data
161
166
162 def signal(self, sig):
167 def signal(self, sig):
163 """Signal the process.
168 """Signal the process.
164
169
165 Return a semi-meaningless deferred after signaling the process.
170 Return a semi-meaningless deferred after signaling the process.
166
171
167 Parameters
172 Parameters
168 ----------
173 ----------
169 sig : str or int
174 sig : str or int
170 'KILL', 'INT', etc., or any signal number
175 'KILL', 'INT', etc., or any signal number
171 """
176 """
172 return defer.fail(
177 return defer.fail(
173 Failure(NotImplementedError(
178 Failure(NotImplementedError(
174 'signal must be implemented in a subclass')
179 'signal must be implemented in a subclass')
175 )
180 )
176 )
181 )
177
182
178
183
179 class LocalProcessLauncherProtocol(ProcessProtocol):
184 class LocalProcessLauncherProtocol(ProcessProtocol):
180 """A ProcessProtocol to go with the LocalProcessLauncher."""
185 """A ProcessProtocol to go with the LocalProcessLauncher."""
181
186
182 def __init__(self, process_launcher):
187 def __init__(self, process_launcher):
183 self.process_launcher = process_launcher
188 self.process_launcher = process_launcher
184 self.pid = None
189 self.pid = None
185
190
186 def connectionMade(self):
191 def connectionMade(self):
187 self.pid = self.transport.pid
192 self.pid = self.transport.pid
188 self.process_launcher.notify_start(self.transport.pid)
193 self.process_launcher.notify_start(self.transport.pid)
189
194
190 def processEnded(self, status):
195 def processEnded(self, status):
191 value = status.value
196 value = status.value
192 if isinstance(value, ProcessDone):
197 if isinstance(value, ProcessDone):
193 self.process_launcher.notify_stop(
198 self.process_launcher.notify_stop(
194 {'exit_code':0,
199 {'exit_code':0,
195 'signal':None,
200 'signal':None,
196 'status':None,
201 'status':None,
197 'pid':self.pid
202 'pid':self.pid
198 }
203 }
199 )
204 )
200 elif isinstance(value, ProcessTerminated):
205 elif isinstance(value, ProcessTerminated):
201 self.process_launcher.notify_stop(
206 self.process_launcher.notify_stop(
202 {'exit_code':value.exitCode,
207 {'exit_code':value.exitCode,
203 'signal':value.signal,
208 'signal':value.signal,
204 'status':value.status,
209 'status':value.status,
205 'pid':self.pid
210 'pid':self.pid
206 }
211 }
207 )
212 )
208 else:
213 else:
209 raise UnknownStatus("Unknown exit status, this is probably a "
214 raise UnknownStatus("Unknown exit status, this is probably a "
210 "bug in Twisted")
215 "bug in Twisted")
211
216
212 def outReceived(self, data):
217 def outReceived(self, data):
213 log.msg(data)
218 log.msg(data)
214
219
215 def errReceived(self, data):
220 def errReceived(self, data):
216 log.err(data)
221 log.err(data)
217
222
218
223
219 class LocalProcessLauncher(BaseLauncher):
224 class LocalProcessLauncher(BaseLauncher):
220 """Start and stop an external process in an asynchronous manner."""
225 """Start and stop an external process in an asynchronous manner."""
221
226
222 # This is used to to construct self.args, which is passed to
227 # This is used to to construct self.args, which is passed to
223 # spawnProcess.
228 # spawnProcess.
224 cmd_and_args = List([])
229 cmd_and_args = List([])
225
230
226 def __init__(self, working_dir, parent=None, name=None, config=None):
231 def __init__(self, working_dir, parent=None, name=None, config=None):
227 super(LocalProcessLauncher, self).__init__(
232 super(LocalProcessLauncher, self).__init__(
228 working_dir, parent, name, config
233 working_dir, parent, name, config
229 )
234 )
230 self.process_protocol = None
235 self.process_protocol = None
231 self.start_deferred = None
236 self.start_deferred = None
232
237
233 def find_args(self):
238 def find_args(self):
234 return self.cmd_and_args
239 return self.cmd_and_args
235
240
236 def start(self):
241 def start(self):
237 if self.state == 'before':
242 if self.state == 'before':
238 self.process_protocol = LocalProcessLauncherProtocol(self)
243 self.process_protocol = LocalProcessLauncherProtocol(self)
239 self.start_deferred = defer.Deferred()
244 self.start_deferred = defer.Deferred()
240 self.process_transport = reactor.spawnProcess(
245 self.process_transport = reactor.spawnProcess(
241 self.process_protocol,
246 self.process_protocol,
242 str(self.args[0]),
247 str(self.args[0]),
243 [str(a) for a in self.args],
248 [str(a) for a in self.args],
244 env=os.environ
249 env=os.environ
245 )
250 )
246 return self.start_deferred
251 return self.start_deferred
247 else:
252 else:
248 s = 'The process was already started and has state: %r' % self.state
253 s = 'The process was already started and has state: %r' % self.state
249 return defer.fail(ProcessStateError(s))
254 return defer.fail(ProcessStateError(s))
250
255
251 def notify_start(self, data):
256 def notify_start(self, data):
252 super(LocalProcessLauncher, self).notify_start(data)
257 super(LocalProcessLauncher, self).notify_start(data)
253 self.start_deferred.callback(data)
258 self.start_deferred.callback(data)
254
259
255 def stop(self):
260 def stop(self):
256 return self.interrupt_then_kill()
261 return self.interrupt_then_kill()
257
262
258 @make_deferred
263 @make_deferred
259 def signal(self, sig):
264 def signal(self, sig):
260 if self.state == 'running':
265 if self.state == 'running':
261 self.process_transport.signalProcess(sig)
266 self.process_transport.signalProcess(sig)
262
267
263 @inlineCallbacks
268 @inlineCallbacks
264 def interrupt_then_kill(self, delay=2.0):
269 def interrupt_then_kill(self, delay=2.0):
265 """Send INT, wait a delay and then send KILL."""
270 """Send INT, wait a delay and then send KILL."""
266 yield self.signal('INT')
271 yield self.signal('INT')
267 yield sleep_deferred(delay)
272 yield sleep_deferred(delay)
268 yield self.signal('KILL')
273 yield self.signal('KILL')
269
274
270
275
class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    # The mpiexec command to use in starting the process.
    mpi_cmd = List(['mpiexec'], config=True)
    # The command line arguments to pass to mpiexec.
    mpi_args = List([], config=True)
    # The program to start using mpiexec.
    program = List(['date'], config=True)
    # The command line argument to the program.
    program_args = List([], config=True)
    # The number of instances of the program to start.
    n = Int(1, config=True)

    def find_args(self):
        """Build the mpiexec command line from all the fields.

        Returns a list of the form
        ``mpi_cmd + ['-n', '<n>'] + mpi_args + program + program_args``.
        The instance count is converted with ``str`` so that every element
        of the argument list is a string.
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
            self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        # Record n first; find_args reads it when the command is built.
        self.n = n
        return super(MPIExecLauncher, self).start()
294
299
295
300
class SSHLauncher(BaseLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables.  There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True)
    ssh_args = List([], config=True)
    program = List(['date'], config=True)
    program_args = List([], config=True)
    hostname = Str('', config=True)
    user = Str('', config=True)
    # The ``user@hostname`` target; rebuilt by the two handlers below.
    location = Str('')

    def _hostname_changed(self, name, old, new):
        # Presumably invoked by the trait machinery when ``hostname`` is
        # assigned -- confirm against the traitlets implementation.
        target = (self.user, new)
        self.location = '%s@%s' % target

    def _user_changed(self, name, old, new):
        # Counterpart of _hostname_changed for the ``user`` trait.
        target = (new, self.hostname)
        self.location = '%s@%s' % target

    def find_args(self):
        """Return the ssh command, its args, the target and the program."""
        ssh_part = self.ssh_cmd + self.ssh_args + [self.location]
        program_part = self.program + self.program_args
        return ssh_part + program_part

    def start(self, n, hostname=None, user=None):
        """Optionally override hostname/user, then start via the base class.

        ``n`` is accepted but not used by this method.
        """
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user
        return super(SSHLauncher, self).start()
328
333
329
334
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher that submits a job file to the Win HPC job scheduler.

    ``start`` writes the job file (subclasses must implement
    ``write_job_file``) and runs ``job_cmd submit``; ``stop`` runs
    ``job_cmd cancel`` for the job id recorded by ``parse_job_id``.
    """

    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str(r'\d+', config=True)
    # The filename of the instantiated job script.
    job_file_name = Unicode(u'ipython_job.xml', config=True)
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the working_dir with the job_file_name.
    job_file = Unicode(u'')
    # The hostname of the scheduler to submit the job to
    scheduler = Str('HEADNODE', config=True)
    username = Str(os.environ.get('USERNAME', ''), config=True)
    priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
        default_value='Highest', config=True)
    requested_nodes = Str('', config=True)
    project = Str('MyProject', config=True)
    job_cmd = Str(find_cmd('job'), config=True)

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(WindowsHPCLauncher, self).__init__(
            working_dir, parent, name, config
        )
        self.job_file = os.path.join(self.working_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the job description file; must be provided by subclasses."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return ['job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The first match of ``job_id_regexp`` anywhere in ``output`` is used
        and also stored on ``self.job_id``.  Raises LauncherError if there
        is no match.
        """
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    @inlineCallbacks
    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        log.msg("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        output = yield getProcessOutput(self.job_cmd,
            args,
            env=os.environ,
            path=self.working_dir
        )
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        defer.returnValue(job_id)

    @inlineCallbacks
    def stop(self):
        """Cancel the job recorded by ``parse_job_id`` and notify observers.

        A failure of the cancel command is treated as "already stopped"
        rather than propagated.
        """
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        log.msg("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = yield getProcessOutput(self.job_cmd,
                args,
                env=os.environ,
                path=self.working_dir
            )
        except Exception:
            # Only ordinary errors are swallowed here; a bare except would
            # also catch GeneratorExit/KeyboardInterrupt delivered at the
            # yield above.
            output = 'The job already appears to be stoppped: %r' % self.job_id
        self.notify_stop(output)  # Pass the output of the kill cmd
        defer.returnValue(output)
414
333
415
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc.  The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using Itpl.  Thus the template can use
    ${n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in.  See PBSEngineSet
    # The name of the command line program used to submit jobs.
    submit_command = Str('', config=True)
    # The name of the command line program used to delete jobs.
    delete_command = Str('', config=True)
    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str('', config=True)
    # The string that is the batch script template itself.
    batch_template = Str('', config=True)
    # The filename of the instantiated batch script.
    batch_file_name = Unicode(u'batch_script', config=True)
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(BatchSystemLauncher, self).__init__(
            working_dir, parent, name, config
        )
        self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
        # Namespace used to instantiate batch_template.
        self.context = {}

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        ``job_id_regexp`` must match at the start of ``output``.  The match
        is also stored on ``self.job_id``.  Raises LauncherError otherwise.
        """
        m = re.match(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the working_dir."""
        self.context['n'] = n
        script_as_string = Itpl.itplns(self.batch_template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file, 'w')
        try:
            f.write(script_as_string)
        finally:
            # Close the handle even if the write fails.
            f.close()

    @inlineCallbacks
    def start(self, n):
        """Start n copies of the process using a batch system."""
        self.write_batch_script(n)
        output = yield getProcessOutput(self.submit_command,
            [self.batch_file], env=os.environ)
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        defer.returnValue(job_id)

    @inlineCallbacks
    def stop(self):
        """Run delete_command on the recorded job id and notify observers."""
        output = yield getProcessOutput(self.delete_command,
            [self.job_id], env=os.environ
        )
        self.notify_stop(output)  # Pass the output of the kill cmd
        defer.returnValue(output)
407
489
408
490
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = Str('qsub', config=True)
    delete_command = Str('qdel', config=True)
    # Raw string so the backslash reaches the regex engine unchanged.
    job_id_regexp = Str(r'\d+', config=True)
    batch_template = Str('', config=True)
    batch_file_name = Unicode(u'pbs_batch_script', config=True)
    batch_file = Unicode(u'')
418
500
419
501
420 #-----------------------------------------------------------------------------
502 #-----------------------------------------------------------------------------
421 # Controller launchers
503 # Controller launchers
422 #-----------------------------------------------------------------------------
504 #-----------------------------------------------------------------------------
423
505
def find_controller_cmd():
    """Find the command line ipcontroller program in a cross platform way.

    Returns the command as a list of argv elements.
    """
    if sys.platform == 'win32':
        # This logic is needed because the ipcontroller script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipcontrollerapp
        # Map whatever file was imported (.py, .pyc or .pyo) back to the
        # .py source.  Only the final extension is rewritten, so a '.pyc'
        # appearing elsewhere in the path is left alone.
        module_file = ipcontrollerapp.__file__
        script_location = os.path.splitext(module_file)[0] + '.py'
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflict and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipcontroller has to be on the PATH in this case.
        cmd = ['ipcontroller']
    return cmd
440
522
441
523
class LocalControllerLauncher(LocalProcessLauncher):
    """Run the controller as an ordinary external process."""

    controller_cmd = List(find_controller_cmd(), config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)

    def find_args(self):
        """Return the controller command followed by its arguments."""
        argv = self.controller_cmd + self.controller_args
        return argv

    def start(self, profile=None, cluster_dir=None):
        """Start the controller by profile or cluster_dir."""
        # Append whichever options were supplied, cluster dir first.
        options = (('--cluster-dir', cluster_dir), ('--profile', profile))
        for flag, value in options:
            if value is not None:
                self.controller_args.extend([flag, value])
        log.msg("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()
460
542
461
543
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Launch a controller through the Win HPC job scheduler."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
    # Extra ipcontroller arguments collected by start() and consumed by
    # write_job_file().
    extra_args = List([], config=False)

    def write_job_file(self, n):
        """Write a job description containing a single controller task.

        ``n`` is accepted to satisfy the base class call but is not used.
        """
        job = WinHPCJob(self)
        job.job_name = "IPController"
        job.username = self.username
        job.priority = self.priority
        job.requested_nodes = self.requested_nodes
        job.project = self.project

        t = IPControllerTask(self)
        t.work_directory = self.working_dir
        # Add the --profile and --cluster-dir args from start.
        t.controller_args.extend(self.extra_args)
        job.add_task(t)
        log.msg("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    def start(self, profile=None, cluster_dir=None):
        """Start the controller by profile or cluster_dir."""
        # Accumulate both options; assigning extra_args once per option
        # would drop --cluster-dir whenever a profile is also given.
        extra_args = []
        if cluster_dir is not None:
            extra_args.extend(['--cluster-dir', cluster_dir])
        if profile is not None:
            extra_args.extend(['--profile', profile])
        if extra_args:
            # Leave any existing extra_args untouched when nothing was passed.
            self.extra_args = extra_args
        return super(WindowsHPCControllerLauncher, self).start(1)
464
572
465
573
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    controller_cmd = List(find_controller_cmd(), config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)
    n = Int(1, config=False)

    def start(self, profile=None, cluster_dir=None):
        """Start the controller by profile or cluster_dir."""
        if cluster_dir is not None:
            self.controller_args.extend(['--cluster-dir', cluster_dir])
        if profile is not None:
            self.controller_args.extend(['--profile', profile])
        log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
        # Exactly one controller instance is started.
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        """Build the mpiexec command line that runs the controller.

        The instance count is converted with ``str`` so that every element
        of the argument list is a string.
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
            self.controller_cmd + self.controller_args
486
594
487
595
class PBSControllerLauncher(PBSLauncher):
    """Run the controller as a PBS batch job."""

    batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)

    def start(self, profile=None, cluster_dir=None):
        """Start the controller by profile or cluster_dir."""
        # Whatever was supplied is stored in the template context, making it
        # available to the batch script template as ${cluster_dir} and
        # ${profile}.
        supplied = (('cluster_dir', cluster_dir), ('profile', profile))
        for key, value in supplied:
            if value is not None:
                self.context[key] = value
        log.msg("Starting PBSControllerLauncher: %r" % self.args)
        return super(PBSControllerLauncher, self).start(1)
504
612
505
613
class SSHControllerLauncher(SSHLauncher):
    """Placeholder: adds nothing to SSHLauncher yet."""
508
616
509
617
510 #-----------------------------------------------------------------------------
618 #-----------------------------------------------------------------------------
511 # Engine launchers
619 # Engine launchers
512 #-----------------------------------------------------------------------------
620 #-----------------------------------------------------------------------------
513
621
514
622
def find_engine_cmd():
    """Find the command line ipengine program in a cross platform way.

    Returns the command as a list of argv elements.
    """
    if sys.platform == 'win32':
        # This logic is needed because the ipengine script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipengineapp
        # Map whatever file was imported (.py, .pyc or .pyo) back to the
        # .py source.  Only the final extension is rewritten, so a '.pyc'
        # appearing elsewhere in the path is left alone.
        module_file = ipengineapp.__file__
        script_location = os.path.splitext(module_file)[0] + '.py'
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflict and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipengine has to be on the PATH in this case.
        cmd = ['ipengine']
    return cmd
531
639
532
640
class LocalEngineLauncher(LocalProcessLauncher):
    """Run one engine as an ordinary external process."""

    engine_cmd = List(find_engine_cmd(), config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )

    def find_args(self):
        """Return the engine command followed by its arguments."""
        argv = self.engine_cmd + self.engine_args
        return argv

    def start(self, profile=None, cluster_dir=None):
        """Start the engine by profile or cluster_dir."""
        # Append whichever options were supplied, cluster dir first.
        options = (('--cluster-dir', cluster_dir), ('--profile', profile))
        for flag, value in options:
            if value is not None:
                self.engine_args.extend([flag, value])
        return super(LocalEngineLauncher, self).start()
552
660
553
661
class LocalEngineSetLauncher(BaseLauncher):
    """Launch a set of engines as regular external processes."""

    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(LocalEngineSetLauncher, self).__init__(
            working_dir, parent, name, config
        )
        # One LocalEngineLauncher per started engine.
        self.launchers = []

    def start(self, n, profile=None, cluster_dir=None):
        """Start n engines by profile or cluster_dir."""
        # Imported once here rather than on every loop iteration.
        import copy
        dlist = []
        for i in range(n):
            el = LocalEngineLauncher(self.working_dir, self)
            # Copy the engine args over to each engine launcher, so that
            # the options start() appends do not accumulate on the shared
            # list.
            el.engine_args = copy.deepcopy(self.engine_args)
            d = el.start(profile, cluster_dir)
            if i==0:
                log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
            self.launchers.append(el)
            dlist.append(d)
        # The consumeErrors here could be dangerous
        dfinal = gatherBoth(dlist, consumeErrors=True)
        dfinal.addCallback(self.notify_start)
        return dfinal

    def find_args(self):
        return ['engine set']

    def signal(self, sig):
        """Send ``sig`` to every engine launcher and gather the results."""
        dlist = [el.signal(sig) for el in self.launchers]
        dfinal = gatherBoth(dlist, consumeErrors=True)
        return dfinal

    def interrupt_then_kill(self, delay=1.0):
        """Call interrupt_then_kill(delay) on every engine launcher."""
        dlist = [el.interrupt_then_kill(delay) for el in self.launchers]
        dfinal = gatherBoth(dlist, consumeErrors=True)
        return dfinal

    def stop(self):
        """Stop all engines via interrupt_then_kill with its default delay."""
        return self.interrupt_then_kill()

    def observe_stop(self):
        """Gather every launcher's observe_stop(), then call notify_stop."""
        dlist = [el.observe_stop() for el in self.launchers]
        dfinal = gatherBoth(dlist, consumeErrors=False)
        dfinal.addCallback(self.notify_stop)
        return dfinal
613
721
614
722
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Launch a set of engines using mpiexec."""

    engine_cmd = List(find_engine_cmd(), config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )
    n = Int(1, config=True)

    def start(self, n, profile=None, cluster_dir=None):
        """Start n engines by profile or cluster_dir."""
        if cluster_dir is not None:
            self.engine_args.extend(['--cluster-dir', cluster_dir])
        if profile is not None:
            self.engine_args.extend(['--profile', profile])
        log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)

    def find_args(self):
        """Build the mpiexec command line that runs the engines.

        The instance count is converted with ``str`` so that every element
        of the argument list is a string.
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
            self.engine_cmd + self.engine_args
636
744
637
745
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Placeholder: adds nothing to WindowsHPCLauncher yet."""
640
748
641
749
class PBSEngineSetLauncher(PBSLauncher):
    """Run a set of engines as a PBS batch job."""

    batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)

    def start(self, n, profile=None, cluster_dir=None):
        """Start n engines by profile or cluster_dir."""
        # NOTE(review): PBSControllerLauncher stores these values in
        # self.context and other launchers spell the flag '--profile';
        # here they go to self.program_args with '-p'.  Confirm that
        # program_args exists on the base class and that this is intended.
        options = (('--cluster-dir', cluster_dir), ('-p', profile))
        for flag, value in options:
            if value is not None:
                self.program_args.extend([flag, value])
        log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
        return super(PBSEngineSetLauncher, self).start(n)
654
762
655
763
class SSHEngineSetLauncher(BaseLauncher):
    """Placeholder: adds nothing to BaseLauncher yet."""
658
766
659
767
660 #-----------------------------------------------------------------------------
768 #-----------------------------------------------------------------------------
661 # A launcher for ipcluster itself!
769 # A launcher for ipcluster itself!
662 #-----------------------------------------------------------------------------
770 #-----------------------------------------------------------------------------
663
771
664
772
def find_ipcluster_cmd():
    """Find the command line ipcluster program in a cross platform way.

    Returns the command as a list of argv elements.
    """
    if sys.platform == 'win32':
        # This logic is needed because the ipcluster script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipclusterapp
        # Map whatever file was imported (.py, .pyc or .pyo) back to the
        # .py source.  Only the final extension is rewritten, so a '.pyc'
        # appearing elsewhere in the path is left alone.
        module_file = ipclusterapp.__file__
        script_location = os.path.splitext(module_file)[0] + '.py'
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflict and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipcluster has to be on the PATH in this case.
        cmd = ['ipcluster']
    return cmd
681
789
682
790
class IPClusterLauncher(LocalProcessLauncher):
    """Run the ipcluster program itself as an external process."""

    ipcluster_cmd = List(find_ipcluster_cmd(), config=True)
    # Command line arguments to pass to ipcluster.
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
    ipcluster_subcommand = Str('start')
    ipcluster_n = Int(2)

    def find_args(self):
        """Return command + subcommand + ``-n <count>`` + extra arguments."""
        count_args = ['-n', repr(self.ipcluster_n)]
        head = self.ipcluster_cmd + [self.ipcluster_subcommand]
        return head + count_args + self.ipcluster_args

    def start(self):
        """Log the command line, then start through the base class."""
        log.msg("Starting ipcluster: %r" % self.args)
        return super(IPClusterLauncher, self).start()
700
808
@@ -1,229 +1,277 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Job and task components for writing .xml files that the Windows HPC Server
4 Job and task components for writing .xml files that the Windows HPC Server
5 2008 can use to start jobs.
5 2008 can use to start jobs.
6 """
6 """
7
7
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008-2009 The IPython Development Team
9 # Copyright (C) 2008-2009 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 from __future__ import with_statement
19 from __future__ import with_statement
20
20
21 import os
21 import os
22 import re
22 import re
23 import uuid
23
24
24 from xml.etree import ElementTree as ET
25 from xml.etree import ElementTree as ET
25 from xml.dom import minidom
26 from xml.dom import minidom
26
27
27 from IPython.core.component import Component
28 from IPython.core.component import Component
28 from IPython.external import Itpl
29 from IPython.external import Itpl
29 from IPython.utils.traitlets import (
30 from IPython.utils.traitlets import (
30 Str, Int, List, Unicode, Instance,
31 Str, Int, List, Unicode, Instance,
31 Enum, Bool
32 Enum, Bool, CStr
32 )
33 )
33
34
34 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
35 # Job and Task Component
36 # Job and Task Component
36 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
37
38
38
39
39 def as_str(value):
40 def as_str(value):
40 if isinstance(value, str):
41 if isinstance(value, str):
41 return value
42 return value
42 elif isinstance(value, bool):
43 elif isinstance(value, bool):
43 if value:
44 if value:
44 return 'true'
45 return 'true'
45 else:
46 else:
46 return 'false'
47 return 'false'
47 elif isinstance(value, (int, float)):
48 elif isinstance(value, (int, float)):
48 return repr(value)
49 return repr(value)
49 else:
50 else:
50 return value
51 return value
51
52
52
53
53 def indent(elem, level=0):
54 def indent(elem, level=0):
54 i = "\n" + level*" "
55 i = "\n" + level*" "
55 if len(elem):
56 if len(elem):
56 if not elem.text or not elem.text.strip():
57 if not elem.text or not elem.text.strip():
57 elem.text = i + " "
58 elem.text = i + " "
58 if not elem.tail or not elem.tail.strip():
59 if not elem.tail or not elem.tail.strip():
59 elem.tail = i
60 elem.tail = i
60 for elem in elem:
61 for elem in elem:
61 indent(elem, level+1)
62 indent(elem, level+1)
62 if not elem.tail or not elem.tail.strip():
63 if not elem.tail or not elem.tail.strip():
63 elem.tail = i
64 elem.tail = i
64 else:
65 else:
65 if level and (not elem.tail or not elem.tail.strip()):
66 if level and (not elem.tail or not elem.tail.strip()):
66 elem.tail = i
67 elem.tail = i
67
68
68
69
69 class WinHPCJob(Component):
70 class WinHPCJob(Component):
70
71
71 job_id = Str('')
72 job_id = Str('')
72 job_name = Str('MyJob', config=True)
73 job_name = Str('MyJob', config=True)
73 min_cores = Int(1, config=True)
74 min_cores = Int(1, config=True)
74 max_cores = Int(1, config=True)
75 max_cores = Int(1, config=True)
75 min_sockets = Int(1, config=True)
76 min_sockets = Int(1, config=True)
76 max_sockets = Int(1, config=True)
77 max_sockets = Int(1, config=True)
77 min_nodes = Int(1, config=True)
78 min_nodes = Int(1, config=True)
78 max_nodes = Int(1, config=True)
79 max_nodes = Int(1, config=True)
79 unit_type = Str("Core", config=True)
80 unit_type = Str("Core", config=True)
80 auto_calculate_min = Bool(True, config=True)
81 auto_calculate_min = Bool(True, config=True)
81 auto_calculate_max = Bool(True, config=True)
82 auto_calculate_max = Bool(True, config=True)
82 run_until_canceled = Bool(False, config=True)
83 run_until_canceled = Bool(False, config=True)
83 is_exclusive = Bool(False, config=True)
84 is_exclusive = Bool(False, config=True)
84 username = Str(os.environ.get('USERNAME', ''), config=True)
85 username = Str(os.environ.get('USERNAME', ''), config=True)
85 owner = Str('', config=True)
86 job_type = Str('Batch', config=True)
86 job_type = Str('Batch', config=True)
87 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
87 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
88 default_value='Highest', config=True)
88 default_value='Highest', config=True)
89 requested_nodes = Str('', config=True)
89 requested_nodes = Str('', config=True)
90 project = Str('IPython', config=True)
90 project = Str('IPython', config=True)
91 xmlns = Str('http://schemas.microsoft.com/HPCS2008/scheduler/')
91 xmlns = Str('http://schemas.microsoft.com/HPCS2008/scheduler/')
92 version = Str("2.000")
92 version = Str("2.000")
93 tasks = List([])
93 tasks = List([])
94
94
95 def _username_changed(self, name, old, new):
95 @property
96 self.owner = new
96 def owner(self):
97 return self.username
97
98
98 def _write_attr(self, root, attr, key):
99 def _write_attr(self, root, attr, key):
99 s = as_str(getattr(self, attr, ''))
100 s = as_str(getattr(self, attr, ''))
100 if s:
101 if s:
101 root.set(key, s)
102 root.set(key, s)
102
103
103 def as_element(self):
104 def as_element(self):
104 # We have to add _A_ type prefixes to get the right order that
105 # We have to add _A_ type prefixes to get the right order that
105 # the MSFT XML parser expects.
106 # the MSFT XML parser expects.
106 root = ET.Element('Job')
107 root = ET.Element('Job')
107 self._write_attr(root, 'version', '_A_Version')
108 self._write_attr(root, 'version', '_A_Version')
108 self._write_attr(root, 'job_name', '_B_Name')
109 self._write_attr(root, 'job_name', '_B_Name')
109 self._write_attr(root, 'unit_type', '_C_UnitType')
110 self._write_attr(root, 'unit_type', '_C_UnitType')
110 self._write_attr(root, 'min_cores', '_D_MinCores')
111 self._write_attr(root, 'min_cores', '_D_MinCores')
111 self._write_attr(root, 'max_cores', '_E_MaxCores')
112 self._write_attr(root, 'max_cores', '_E_MaxCores')
112 self._write_attr(root, 'min_sockets', '_F_MinSockets')
113 self._write_attr(root, 'min_sockets', '_F_MinSockets')
113 self._write_attr(root, 'max_sockets', '_G_MaxSockets')
114 self._write_attr(root, 'max_sockets', '_G_MaxSockets')
114 self._write_attr(root, 'min_nodes', '_H_MinNodes')
115 self._write_attr(root, 'min_nodes', '_H_MinNodes')
115 self._write_attr(root, 'max_nodes', '_I_MaxNodes')
116 self._write_attr(root, 'max_nodes', '_I_MaxNodes')
116 self._write_attr(root, 'run_until_canceled', '_J_RunUntilCanceled')
117 self._write_attr(root, 'run_until_canceled', '_J_RunUntilCanceled')
117 self._write_attr(root, 'is_exclusive', '_K_IsExclusive')
118 self._write_attr(root, 'is_exclusive', '_K_IsExclusive')
118 self._write_attr(root, 'username', '_L_UserName')
119 self._write_attr(root, 'username', '_L_UserName')
119 self._write_attr(root, 'job_type', '_M_JobType')
120 self._write_attr(root, 'job_type', '_M_JobType')
120 self._write_attr(root, 'priority', '_N_Priority')
121 self._write_attr(root, 'priority', '_N_Priority')
121 self._write_attr(root, 'requested_nodes', '_O_RequestedNodes')
122 self._write_attr(root, 'requested_nodes', '_O_RequestedNodes')
122 self._write_attr(root, 'auto_calculate_max', '_P_AutoCalculateMax')
123 self._write_attr(root, 'auto_calculate_max', '_P_AutoCalculateMax')
123 self._write_attr(root, 'auto_calculate_min', '_Q_AutoCalculateMin')
124 self._write_attr(root, 'auto_calculate_min', '_Q_AutoCalculateMin')
124 self._write_attr(root, 'project', '_R_Project')
125 self._write_attr(root, 'project', '_R_Project')
125 self._write_attr(root, 'owner', '_S_Owner')
126 self._write_attr(root, 'owner', '_S_Owner')
126 self._write_attr(root, 'xmlns', '_T_xmlns')
127 self._write_attr(root, 'xmlns', '_T_xmlns')
127 dependencies = ET.SubElement(root, "Dependencies")
128 dependencies = ET.SubElement(root, "Dependencies")
128 etasks = ET.SubElement(root, "Tasks")
129 etasks = ET.SubElement(root, "Tasks")
129 for t in self.tasks:
130 for t in self.tasks:
130 etasks.append(t.as_element())
131 etasks.append(t.as_element())
131 return root
132 return root
132
133
133 def tostring(self):
134 def tostring(self):
134 """Return the string representation of the job description XML."""
135 """Return the string representation of the job description XML."""
135 root = self.as_element()
136 root = self.as_element()
136 indent(root)
137 indent(root)
137 txt = ET.tostring(root, encoding="utf-8")
138 txt = ET.tostring(root, encoding="utf-8")
138 # Now remove the tokens used to order the attributes.
139 # Now remove the tokens used to order the attributes.
139 txt = re.sub(r'_[A-Z]_','',txt)
140 txt = re.sub(r'_[A-Z]_','',txt)
140 txt = '<?xml version="1.0" encoding="utf-8"?>\n' + txt
141 txt = '<?xml version="1.0" encoding="utf-8"?>\n' + txt
141 return txt
142 return txt
142
143
143 def write(self, filename):
144 def write(self, filename):
144 """Write the XML job description to a file."""
145 """Write the XML job description to a file."""
145 txt = self.tostring()
146 txt = self.tostring()
146 with open(filename, 'w') as f:
147 with open(filename, 'w') as f:
147 f.write(txt)
148 f.write(txt)
148
149
149 def add_task(self, task):
150 def add_task(self, task):
150 """Add a task to the job.
151 """Add a task to the job.
151
152
152 Parameters
153 Parameters
153 ----------
154 ----------
154 task : :class:`WinHPCTask`
155 task : :class:`WinHPCTask`
155 The task object to add.
156 The task object to add.
156 """
157 """
157 self.tasks.append(task)
158 self.tasks.append(task)
158
159
159
160
160 class WinHPCTask(Component):
161 class WinHPCTask(Component):
161
162
162 task_id = Str('')
163 task_id = Str('')
163 task_name = Str('')
164 task_name = Str('')
164 version = Str("2.000")
165 version = Str("2.000")
165 min_cores = Int(1, config=True)
166 min_cores = Int(1, config=True)
166 max_cores = Int(1, config=True)
167 max_cores = Int(1, config=True)
167 min_sockets = Int(1, config=True)
168 min_sockets = Int(1, config=True)
168 max_sockets = Int(1, config=True)
169 max_sockets = Int(1, config=True)
169 min_nodes = Int(1, config=True)
170 min_nodes = Int(1, config=True)
170 max_nodes = Int(1, config=True)
171 max_nodes = Int(1, config=True)
171 unit_type = Str("Core", config=True)
172 unit_type = Str("Core", config=True)
172 command_line = Str('', config=True)
173 command_line = CStr('', config=True)
173 work_directory = Str('', config=True)
174 work_directory = CStr('', config=True)
174 is_rerunnaable = Bool(True, config=True)
175 is_rerunnaable = Bool(True, config=True)
175 std_out_file_path = Str('', config=True)
176 std_out_file_path = CStr('', config=True)
176 std_err_file_path = Str('', config=True)
177 std_err_file_path = CStr('', config=True)
177 is_parametric = Bool(False, config=True)
178 is_parametric = Bool(False, config=True)
178 environment_variables = Instance(dict, args=())
179 environment_variables = Instance(dict, args=(), config=True)
179
180
180 def _write_attr(self, root, attr, key):
181 def _write_attr(self, root, attr, key):
181 s = as_str(getattr(self, attr, ''))
182 s = as_str(getattr(self, attr, ''))
182 if s:
183 if s:
183 root.set(key, s)
184 root.set(key, s)
184
185
185 def as_element(self):
186 def as_element(self):
186 root = ET.Element('Task')
187 root = ET.Element('Task')
187 self._write_attr(root, 'version', '_A_Version')
188 self._write_attr(root, 'version', '_A_Version')
188 self._write_attr(root, 'task_name', '_B_Name')
189 self._write_attr(root, 'task_name', '_B_Name')
189 self._write_attr(root, 'min_cores', '_C_MinCores')
190 self._write_attr(root, 'min_cores', '_C_MinCores')
190 self._write_attr(root, 'max_cores', '_D_MaxCores')
191 self._write_attr(root, 'max_cores', '_D_MaxCores')
191 self._write_attr(root, 'min_sockets', '_E_MinSockets')
192 self._write_attr(root, 'min_sockets', '_E_MinSockets')
192 self._write_attr(root, 'max_sockets', '_F_MaxSockets')
193 self._write_attr(root, 'max_sockets', '_F_MaxSockets')
193 self._write_attr(root, 'min_nodes', '_G_MinNodes')
194 self._write_attr(root, 'min_nodes', '_G_MinNodes')
194 self._write_attr(root, 'max_nodes', '_H_MaxNodes')
195 self._write_attr(root, 'max_nodes', '_H_MaxNodes')
195 self._write_attr(root, 'command_line', '_I_CommandLine')
196 self._write_attr(root, 'command_line', '_I_CommandLine')
196 self._write_attr(root, 'work_directory', '_J_WorkDirectory')
197 self._write_attr(root, 'work_directory', '_J_WorkDirectory')
197 self._write_attr(root, 'is_rerunnaable', '_K_IsRerunnable')
198 self._write_attr(root, 'is_rerunnaable', '_K_IsRerunnable')
198 self._write_attr(root, 'std_out_file_path', '_L_StdOutFilePath')
199 self._write_attr(root, 'std_out_file_path', '_L_StdOutFilePath')
199 self._write_attr(root, 'std_err_file_path', '_M_StdErrFilePath')
200 self._write_attr(root, 'std_err_file_path', '_M_StdErrFilePath')
200 self._write_attr(root, 'is_parametric', '_N_IsParametric')
201 self._write_attr(root, 'is_parametric', '_N_IsParametric')
201 self._write_attr(root, 'unit_type', '_O_UnitType')
202 self._write_attr(root, 'unit_type', '_O_UnitType')
202 root.append(self.get_env_vars())
203 root.append(self.get_env_vars())
203 return root
204 return root
204
205
205 def get_env_vars(self):
206 def get_env_vars(self):
206 env_vars = ET.Element('EnvironmentVariables')
207 env_vars = ET.Element('EnvironmentVariables')
207 for k, v in self.environment_variables.items():
208 for k, v in self.environment_variables.items():
208 variable = ET.SubElement(env_vars, "Variable")
209 variable = ET.SubElement(env_vars, "Variable")
209 name = ET.SubElement(variable, "Name")
210 name = ET.SubElement(variable, "Name")
210 name.text = k
211 name.text = k
211 value = ET.SubElement(variable, "Value")
212 value = ET.SubElement(variable, "Value")
212 value.text = v
213 value.text = v
213 return env_vars
214 return env_vars
214
215
215
216
217
218 # By declaring these, we can configure the controller and engine separately!
219
220 class IPControllerTask(WinHPCTask):
221
222 task_name = Str('IPController', config=True)
223 controller_cmd = List(['ipcontroller.exe'], config=True)
224 controller_args = List(['--log-to-file', '--log-level', '40'], config=True)
225 # I don't want these to be configurable
226 std_out_file_path = CStr(os.path.join('log','ipcontroller-out.txt'), config=False)
227 std_err_file_path = CStr(os.path.join('log','ipcontroller-err.txt'), config=False)
228 min_cores = Int(1, config=False)
229 max_cores = Int(1, config=False)
230 min_sockets = Int(1, config=False)
231 max_sockets = Int(1, config=False)
232 min_nodes = Int(1, config=False)
233 max_nodes = Int(1, config=False)
234 unit_type = Str("Core", config=False)
235 work_directory = CStr('', config=False)
236
237 @property
238 def command_line(self):
239 return ' '.join(self.controller_cmd + self.controller_args)
240
241
242 class IPEngineTask(WinHPCTask):
243
244 task_name = Str('IPEngine', config=True)
245 engine_cmd = List(['ipengine.exe'], config=True)
246 engine_args = List(['--log-to-file', '--log-level', '40'], config=True)
247 # I don't want these to be configurable
248 std_out_file_path = CStr(os.path.join('log','ipengine-out-%s.txt' % uuid.uuid1()), config=False)
249 std_err_file_path = CStr(os.path.join('log','ipengine-err-%s.txt' % uuid.uuid1()), config=False)
250 min_cores = Int(1, config=False)
251 max_cores = Int(1, config=False)
252 min_sockets = Int(1, config=False)
253 max_sockets = Int(1, config=False)
254 min_nodes = Int(1, config=False)
255 max_nodes = Int(1, config=False)
256 unit_type = Str("Core", config=False)
257 work_directory = CStr('', config=False)
258
259 @property
260 def command_line(self):
261 return ' '.join(self.engine_cmd + self.engine_args)
262
263
216 # j = WinHPCJob(None)
264 # j = WinHPCJob(None)
217 # j.job_name = 'IPCluster'
265 # j.job_name = 'IPCluster'
218 # j.username = 'GNET\\bgranger'
266 # j.username = 'GNET\\bgranger'
219 # j.requested_nodes = 'GREEN'
267 # j.requested_nodes = 'GREEN'
220 #
268 #
221 # t = WinHPCTask(None)
269 # t = WinHPCTask(None)
222 # t.task_name = 'Controller'
270 # t.task_name = 'Controller'
223 # t.command_line = r"\\blue\domainusers$\bgranger\Python\Python25\Scripts\ipcontroller.exe --log-to-file -p default --log-level 10"
271 # t.command_line = r"\\blue\domainusers$\bgranger\Python\Python25\Scripts\ipcontroller.exe --log-to-file -p default --log-level 10"
224 # t.work_directory = r"\\blue\domainusers$\bgranger\.ipython\cluster_default"
272 # t.work_directory = r"\\blue\domainusers$\bgranger\.ipython\cluster_default"
225 # t.std_out_file_path = 'controller-out.txt'
273 # t.std_out_file_path = 'controller-out.txt'
226 # t.std_err_file_path = 'controller-err.txt'
274 # t.std_err_file_path = 'controller-err.txt'
227 # t.environment_variables['PYTHONPATH'] = r"\\blue\domainusers$\bgranger\Python\Python25\Lib\site-packages"
275 # t.environment_variables['PYTHONPATH'] = r"\\blue\domainusers$\bgranger\Python\Python25\Lib\site-packages"
228 # j.add_task(t)
276 # j.add_task(t)
229
277
General Comments 0
You need to be logged in to leave comments. Login now