##// END OF EJS Templates
Fixing a few bugs in the unicode path changes.
Brian Granger -
Show More
@@ -1,268 +1,268 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import copy
21 21 import os
22 22 import sys
23 23
24 24 from twisted.application import service
25 25 from twisted.internet import reactor
26 26 from twisted.python import log
27 27
28 28 from IPython.config.loader import Config, NoConfigDefault
29 29
30 30 from IPython.kernel.clusterdir import (
31 31 ApplicationWithClusterDir,
32 32 AppWithClusterDirArgParseConfigLoader
33 33 )
34 34
35 35 from IPython.core import release
36 36
37 from IPython.utils.traitlets import Str, Instance
37 from IPython.utils.traitlets import Str, Instance, Unicode
38 38
39 39 from IPython.kernel import controllerservice
40 40
41 41 from IPython.kernel.fcutil import FCServiceFactory
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Default interfaces
45 45 #-----------------------------------------------------------------------------
46 46
47 47
# The default client interfaces for FCClientServiceFactory.interfaces.
# Each section (Task, MultiEngine) carries an ``interface_chain`` list and
# the name of its ``furl_file``.
default_client_interfaces = Config()
default_client_interfaces.Task.interface_chain = [
    'IPython.kernel.task.ITaskController',
    'IPython.kernel.taskfc.IFCTaskController'
]

# File names are unicode literals, like the other file names in this module.
default_client_interfaces.Task.furl_file = u'ipcontroller-tc.furl'

default_client_interfaces.MultiEngine.interface_chain = [
    'IPython.kernel.multiengine.IMultiEngine',
    'IPython.kernel.multienginefc.IFCSynchronousMultiEngine'
]

default_client_interfaces.MultiEngine.furl_file = u'ipcontroller-mec.furl'

# Make this a dict we can pass to Config.__init__ for the default
default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
66 66
67 67
68 68
# Defaults used for FCEngineServiceFactory.interfaces: a single ``Default``
# section holding an ``interface_chain`` and a ``furl_file``.
default_engine_interfaces = Config()
default_engine_interfaces.Default.interface_chain = [
    'IPython.kernel.enginefc.IFCControllerBase'
]
default_engine_interfaces.Default.furl_file = u'ipcontroller-engine.furl'

# Turn the Config into a plain dict (built from a deep copy of its items)
# so it can be handed to Config.__init__ as the default.
default_engine_interfaces = dict(
    copy.deepcopy(default_engine_interfaces.items())
)
79 79
80 80
81 81 #-----------------------------------------------------------------------------
82 82 # Service factories
83 83 #-----------------------------------------------------------------------------
84 84
85 85
class FCClientServiceFactory(FCServiceFactory):
    """Foolscap service factory for the client side of the controller."""

    # Name of the certificate file; configurable.
    cert_file = Unicode(u'ipcontroller-client.pem', config=True)
    # Interface configuration; the default instance is a Config built with
    # the keyword arguments in default_client_interfaces.
    interfaces = Instance(
        klass=Config,
        kw=default_client_interfaces,
        allow_none=False,
        config=True
    )
92 92
93 93
class FCEngineServiceFactory(FCServiceFactory):
    """Foolscap service factory for the engine side of the controller."""

    # Name of the certificate file; configurable.
    cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
    # Interface configuration; the default instance is a dict built with
    # the keyword arguments in default_engine_interfaces.
    # NOTE(review): the client factory uses klass=Config here -- confirm
    # that klass=dict is intentional.
    interfaces = Instance(
        klass=dict,
        kw=default_engine_interfaces,
        allow_none=False,
        config=True
    )
100 100
101 101
102 102 #-----------------------------------------------------------------------------
103 103 # The main application
104 104 #-----------------------------------------------------------------------------
105 105
106 106
# Command line options for the controller.  Each entry is a pair of
# (option flags, keyword arguments for the argument parser); ``dest`` names
# the config attribute the parsed value is stored under.
cl_args = (
    # Client config
    (('--client-ip',), dict(
        type=str, dest='FCClientServiceFactory.ip', default=NoConfigDefault,
        help='The IP address or hostname the controller will listen on for '
        'client connections.',
        metavar='FCClientServiceFactory.ip')
    ),
    (('--client-port',), dict(
        type=int, dest='FCClientServiceFactory.port', default=NoConfigDefault,
        help='The port the controller will listen on for client connections. '
        'The default is to use 0, which will autoselect an open port.',
        metavar='FCClientServiceFactory.port')
    ),
    (('--client-location',), dict(
        type=str, dest='FCClientServiceFactory.location', default=NoConfigDefault,
        help='The hostname or IP that clients should connect to. This does '
        'not control which interface the controller listens on. Instead, this '
        'determines the hostname/IP that is listed in the FURL, which is how '
        'clients know where to connect. Useful if the controller is listening '
        'on multiple interfaces.',
        metavar='FCClientServiceFactory.location')
    ),
    # Engine config
    (('--engine-ip',), dict(
        type=str, dest='FCEngineServiceFactory.ip', default=NoConfigDefault,
        help='The IP address or hostname the controller will listen on for '
        'engine connections.',
        metavar='FCEngineServiceFactory.ip')
    ),
    (('--engine-port',), dict(
        type=int, dest='FCEngineServiceFactory.port', default=NoConfigDefault,
        help='The port the controller will listen on for engine connections. '
        'The default is to use 0, which will autoselect an open port.',
        metavar='FCEngineServiceFactory.port')
    ),
    (('--engine-location',), dict(
        type=str, dest='FCEngineServiceFactory.location', default=NoConfigDefault,
        help='The hostname or IP that engines should connect to. This does '
        'not control which interface the controller listens on. Instead, this '
        'determines the hostname/IP that is listed in the FURL, which is how '
        'engines know where to connect. Useful if the controller is listening '
        'on multiple interfaces.',
        metavar='FCEngineServiceFactory.location')
    ),
    # Global config
    (('--log-to-file',), dict(
        action='store_true', dest='Global.log_to_file', default=NoConfigDefault,
        help='Log to a file in the log directory (default is stdout)')
    ),
    (('-r','--reuse-furls'), dict(
        action='store_true', dest='Global.reuse_furls', default=NoConfigDefault,
        help='Try to reuse all FURL files. If this is not set all FURL files '
        'are deleted before the controller starts. This must be set if '
        'specific ports are specified by --engine-port or --client-port.')
    ),
    (('--no-secure',), dict(
        action='store_false', dest='Global.secure', default=NoConfigDefault,
        help='Turn off SSL encryption for all connections.')
    ),
    # --secure stores True, so its help must say "on" (it previously
    # repeated the --no-secure text).
    (('--secure',), dict(
        action='store_true', dest='Global.secure', default=NoConfigDefault,
        help='Turn on SSL encryption for all connections.')
    )
)
172 172
173 173
class IPControllerAppCLConfigLoader(AppWithClusterDirArgParseConfigLoader):
    """Command line config loader that uses the controller's ``cl_args``."""

    arguments = cl_args
177 177
178 178
# Name of the controller's config file; used as
# IPControllerApp.config_file_name.
default_config_file_name = u'ipcontroller_config.py'
180 180
181 181
182 182 class IPControllerApp(ApplicationWithClusterDir):
183 183
184 184 name = u'ipcontroller'
185 185 description = 'Start the IPython controller for parallel computing.'
186 186 config_file_name = default_config_file_name
187 187 auto_create_cluster_dir = True
188 188
189 189 def create_default_config(self):
190 190 super(IPControllerApp, self).create_default_config()
191 191 self.default_config.Global.reuse_furls = False
192 192 self.default_config.Global.secure = True
193 193 self.default_config.Global.import_statements = []
194 194 self.default_config.Global.clean_logs = True
195 195
196 196 def create_command_line_config(self):
197 197 """Create and return a command line config loader."""
198 198 return IPControllerAppCLConfigLoader(
199 199 description=self.description,
200 200 version=release.version
201 201 )
202 202
203 203 def post_load_command_line_config(self):
204 204 # Now setup reuse_furls
205 205 c = self.command_line_config
206 206 if hasattr(c.Global, 'reuse_furls'):
207 207 c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
208 208 c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
209 209 del c.Global.reuse_furls
210 210 if hasattr(c.Global, 'secure'):
211 211 c.FCClientServiceFactory.secure = c.Global.secure
212 212 c.FCEngineServiceFactory.secure = c.Global.secure
213 213 del c.Global.secure
214 214
215 215 def construct(self):
216 216 # I am a little hesitant to put these into InteractiveShell itself.
217 217 # But that might be the place for them
218 218 sys.path.insert(0, '')
219 219
220 220 self.start_logging()
221 221 self.import_statements()
222 222
223 223 # Create the service hierarchy
224 224 self.main_service = service.MultiService()
225 225 # The controller service
226 226 controller_service = controllerservice.ControllerService()
227 227 controller_service.setServiceParent(self.main_service)
228 228 # The client tub and all its refereceables
229 229 csfactory = FCClientServiceFactory(self.master_config, controller_service)
230 230 client_service = csfactory.create()
231 231 client_service.setServiceParent(self.main_service)
232 232 # The engine tub
233 233 esfactory = FCEngineServiceFactory(self.master_config, controller_service)
234 234 engine_service = esfactory.create()
235 235 engine_service.setServiceParent(self.main_service)
236 236
237 237 def import_statements(self):
238 238 statements = self.master_config.Global.import_statements
239 239 for s in statements:
240 240 try:
241 241 log.msg("Executing statement: '%s'" % s)
242 242 exec s in globals(), locals()
243 243 except:
244 244 log.msg("Error running statement: %s" % s)
245 245
246 246 def start_app(self):
247 247 # Start the controller service.
248 248 self.main_service.startService()
249 249 # Write the .pid file overwriting old ones. This allow multiple
250 250 # controllers to clober each other. But Windows is not cleaning
251 251 # these up properly.
252 252 self.write_pid_file(overwrite=True)
253 253 # cd to the cluster_dir as our working directory.
254 254 os.chdir(self.master_config.Global.cluster_dir)
255 255 # Add a trigger to delete the .pid file upon shutting down.
256 256 reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
257 257 reactor.run()
258 258
259 259
def launch_new_instance():
    """Instantiate the IPython controller application and start it."""
    IPControllerApp().start()
264 264
265 265
# Launch the controller when this module is executed as a script.
if __name__ == '__main__':
    launch_new_instance()
268 268
@@ -1,808 +1,815 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Facilities for launching processes asynchronously.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import re
20 20 import sys
21 21
22 22 from IPython.core.component import Component
23 23 from IPython.external import Itpl
24 24 from IPython.utils.traitlets import Str, Int, List, Unicode, Enum
25 25 from IPython.utils.platutils import find_cmd
26 26 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
27 27 from IPython.kernel.winhpcjob import (
28 28 WinHPCJob, WinHPCTask,
29 29 IPControllerTask, IPEngineTask
30 30 )
31 31
32 32 from twisted.internet import reactor, defer
33 33 from twisted.internet.defer import inlineCallbacks
34 34 from twisted.internet.protocol import ProcessProtocol
35 35 from twisted.internet.utils import getProcessOutput
36 36 from twisted.internet.error import ProcessDone, ProcessTerminated
37 37 from twisted.python import log
38 38 from twisted.python.failure import Failure
39 39
40 40 #-----------------------------------------------------------------------------
41 41 # Generic launchers
42 42 #-----------------------------------------------------------------------------
43 43
44 44
class LauncherError(Exception):
    """Base class for the launcher errors defined in this module."""
47 47
48 48
class ProcessStateError(LauncherError):
    """A launcher was asked to act while in the wrong state."""
51 51
52 52
class UnknownStatus(LauncherError):
    """A process ended with an exit status that was not recognized."""
55 55
56 56
class BaseLauncher(Component):
    """An abstraction for starting, stopping and signaling a process."""

    # A directory for files related to the process.  The launcher does not
    # cd to this directory.
    working_dir = Unicode(u'')

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(BaseLauncher, self).__init__(parent, name, config)
        self.working_dir = working_dir
        # Lifecycle state: one of 'before', 'running' or 'after'.
        self.state = 'before'
        # Deferreds handed out by observe_stop() that have not fired yet.
        self.stop_deferreds = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """The command and arguments used to start the process, as a list.

        This is what is passed to :func:`spawnProcess`; the first element
        is the process name.
        """
        return self.find_args()

    def find_args(self):
        """Build the list returned by the ``.args`` property.

        Subclasses should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The program arguments joined into one space-separated string."""
        return ' '.join(self.args)

    @property
    def running(self):
        """True while the state is 'running', False otherwise."""
        return self.state == 'running'

    def start(self):
        """Start the process.

        This must return a deferred that fires with information about the
        process starting (like a pid, job id, etc.).  The base
        implementation returns a deferred that fails with
        NotImplementedError.
        """
        error = NotImplementedError('start must be implemented in a subclass')
        return defer.fail(Failure(error))

    def stop(self):
        """Stop the process and notify observers of stopping.

        This must return a deferred that fires with information about the
        process being stopped, like errors that occur while the process is
        attempting to be shut down.  This deferred won't fire when the
        process actually stops; to observe that, see :func:`observe_stop`.
        The base implementation returns a deferred that fails with
        NotImplementedError.
        """
        error = NotImplementedError('stop must be implemented in a subclass')
        return defer.fail(Failure(error))

    def observe_stop(self):
        """Get a deferred that will fire when the process stops.

        The deferred fires with the data passed to :func:`notify_stop`.
        If the process has already stopped, an already-fired deferred
        carrying the saved stop data is returned.
        """
        if self.state != 'after':
            pending = defer.Deferred()
            self.stop_deferreds.append(pending)
            return pending
        return defer.succeed(self.stop_data)

    def notify_start(self, data):
        """Call this to trigger startup actions.

        This logs the process startup, saves ``data`` and sets the state to
        'running'.  It returns ``data`` so it can be used as a callback.
        """
        log.msg('Process %r started: %r' % (self.args[0], data))
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        This logs the process stopping, saves ``data``, sets the state to
        'after' and fires every deferred handed out by
        :func:`observe_stop`.  It returns ``data``.
        """
        log.msg('Process %r stopped: %r' % (self.args[0], data))
        self.stop_data = data
        self.state = 'after'
        # Fire the waiting deferreds, most recently registered first,
        # removing each one from the list as it fires.
        for _ in range(len(self.stop_deferreds)):
            self.stop_deferreds.pop().callback(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Return a semi-meaningless deferred after signaling the process.
        The base implementation returns a deferred that fails with
        NotImplementedError.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        error = NotImplementedError('signal must be implemented in a subclass')
        return defer.fail(Failure(error))
182 182
183 183
class LocalProcessLauncherProtocol(ProcessProtocol):
    """The ProcessProtocol that reports back to a LocalProcessLauncher."""

    def __init__(self, process_launcher):
        self.process_launcher = process_launcher
        self.pid = None

    def connectionMade(self):
        """Remember the pid and tell the launcher the process started."""
        self.pid = self.transport.pid
        self.process_launcher.notify_start(self.transport.pid)

    def processEnded(self, status):
        """Translate the exit status into a dict and notify the launcher.

        Raises UnknownStatus when ``status.value`` is neither a ProcessDone
        nor a ProcessTerminated.
        """
        reason = status.value
        if isinstance(reason, ProcessDone):
            exit_info = {
                'exit_code': 0,
                'signal': None,
                'status': None,
                'pid': self.pid
            }
        elif isinstance(reason, ProcessTerminated):
            exit_info = {
                'exit_code': reason.exitCode,
                'signal': reason.signal,
                'status': reason.status,
                'pid': self.pid
            }
        else:
            raise UnknownStatus("Unknown exit status, this is probably a "
                                "bug in Twisted")
        self.process_launcher.notify_stop(exit_info)

    def outReceived(self, data):
        """Log data received on the process's stdout."""
        log.msg(data)

    def errReceived(self, data):
        """Log data received on the process's stderr as an error."""
        log.err(data)
222 222
223 223
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner."""

    # This is used to construct self.args, which is passed to
    # spawnProcess.
    cmd_and_args = List([])

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(LocalProcessLauncher, self).__init__(
            working_dir, parent, name, config
        )
        self.process_protocol = None
        # Set by start(); initialised here so the attribute always exists.
        self.process_transport = None
        self.start_deferred = None

    def find_args(self):
        """Return ``cmd_and_args`` as the argument list."""
        return self.cmd_and_args

    def start(self):
        """Spawn the process.

        Returns a deferred that fires with the data passed to
        :func:`notify_start` (the pid, as reported by the process
        protocol).  If the launcher is not in the 'before' state, the
        deferred fails with ProcessStateError.
        """
        if self.state == 'before':
            self.process_protocol = LocalProcessLauncherProtocol(self)
            self.start_deferred = defer.Deferred()
            self.process_transport = reactor.spawnProcess(
                self.process_protocol,
                str(self.args[0]), # twisted expects these to be str, not unicode
                [str(a) for a in self.args], # str expected, not unicode
                env=os.environ
            )
            return self.start_deferred
        else:
            s = 'The process was already started and has state: %r' % self.state
            return defer.fail(ProcessStateError(s))

    def notify_start(self, data):
        """Record the start, fire the deferred from start(), return data."""
        super(LocalProcessLauncher, self).notify_start(data)
        self.start_deferred.callback(data)
        # Stay a pass-through, as the base class promises, so this can be
        # used as a callback.
        return data

    def stop(self):
        """Stop the process by interrupting it and then killing it."""
        return self.interrupt_then_kill()

    @make_deferred
    def signal(self, sig):
        """Send ``sig`` to the process; does nothing unless it is running."""
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    @inlineCallbacks
    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL."""
        yield self.signal('INT')
        yield sleep_deferred(delay)
        yield self.signal('KILL')
274 274
275 275
class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    # The mpiexec command to use in starting the process.
    mpi_cmd = List(['mpiexec'], config=True)
    # The command line arguments to pass to mpiexec.
    mpi_args = List([], config=True)
    # The program to start using mpiexec.
    program = List(['date'], config=True)
    # The command line argument to the program.
    program_args = List([], config=True)
    # The number of instances of the program to start.
    n = Int(1, config=True)

    def find_args(self):
        """Build self.args using all the fields.

        ``n`` is converted to a string so that every element of the list is
        a string; ``arg_str`` joins the elements and would otherwise fail
        on the integer.
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPIExecLauncher, self).start()
299 299
300 300
class SSHLauncher(BaseLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables. There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True)
    ssh_args = List([], config=True)
    program = List(['date'], config=True)
    program_args = List([], config=True)
    hostname = Str('', config=True)
    user = Str('', config=True)
    # "user@hostname"; rebuilt whenever user or hostname changes.
    location = Str('')

    def _hostname_changed(self, name, old, new):
        """Rebuild ``location`` from the current user and the new hostname."""
        self.location = '%s@%s' % (self.user, new)

    def _user_changed(self, name, old, new):
        """Rebuild ``location`` from the new user and the current hostname."""
        self.location = '%s@%s' % (new, self.hostname)

    def find_args(self):
        """Return the ssh command, its args, the location and the program."""
        ssh_part = self.ssh_cmd + self.ssh_args + [self.location]
        program_part = self.program + self.program_args
        return ssh_part + program_part

    def start(self, n, hostname=None, user=None):
        """Update hostname/user when given, then call the parent's start().

        ``n`` is accepted but not used here.
        """
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user
        return super(SSHLauncher, self).start()
333 333
334 334
# This is only used on Windows.  On other platforms the command is not
# looked up and the bare name 'job' is used.
job_cmd = find_cmd('job') if os.name == 'nt' else 'job'
340
341
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher that submits a job file to the Win HPC job scheduler.

    Subclasses must implement :meth:`write_job_file`.
    """

    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str(r'\d+', config=True)
    # The filename of the instantiated job script.
    job_file_name = Unicode(u'ipython_job.xml', config=True)
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the working_dir with the job_file_name.
    job_file = Unicode(u'')
    # The hostname of the scheduler to submit the job to
    scheduler = Str('HEADNODE', config=True)
    username = Str(os.environ.get('USERNAME', ''), config=True)
    priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
                    default_value='Highest', config=True)
    requested_nodes = Str('', config=True)
    project = Str('MyProject', config=True)
    # Defaults to the module-level job_cmd.
    job_cmd = Str(job_cmd, config=True)

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(WindowsHPCLauncher, self).__init__(
            working_dir, parent, name, config
        )
        self.job_file = os.path.join(self.working_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the job description file; must be provided by subclasses."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        """Return the argument list whose first element names the process."""
        return ['job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The first match of ``job_id_regexp`` anywhere in ``output`` is
        stored as ``self.job_id``.  Raises LauncherError if nothing matches.
        """
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    @inlineCallbacks
    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        log.msg("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        output = yield getProcessOutput(self.job_cmd,
            args,
            env=os.environ,
            path=self.working_dir
        )
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        defer.returnValue(job_id)

    @inlineCallbacks
    def stop(self):
        """Cancel the job and notify the stop observers.

        If the cancel command fails, the job is assumed to be stopped
        already and a message saying so is used as the stop data.
        """
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        log.msg("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = yield getProcessOutput(self.job_cmd,
                args,
                env=os.environ,
                path=self.working_dir
            )
        except Exception:
            # A failing cancel command is treated as "already stopped".
            output = 'The job already appears to be stoppped: %r' % self.job_id
        self.notify_stop(output)  # Pass the output of the kill cmd
        defer.returnValue(output)
414 421
415 422
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc.  The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using Itpl. Thus the template can use
    ${n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in.  See PBSEngineSet
    # The name of the command line program used to submit jobs.
    submit_command = Str('', config=True)
    # The name of the command line program used to delete jobs.
    delete_command = Str('', config=True)
    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str('', config=True)
    # The string that is the batch script template itself.
    batch_template = Str('', config=True)
    # The filename of the instantiated batch script.
    batch_file_name = Unicode(u'batch_script', config=True)
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(BatchSystemLauncher, self).__init__(
            working_dir, parent, name, config
        )
        self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
        # Variables made available to the batch script template.
        self.context = {}

    def find_args(self):
        """Return the submit command followed by the batch file path.

        The inherited ``notify_start`` and ``notify_stop`` log
        ``self.args[0]``; without this method they would raise
        NotImplementedError as soon as a job was started or stopped.
        """
        return [self.submit_command, self.batch_file]

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        ``job_id_regexp`` is matched at the start of ``output`` and the
        match is stored as ``self.job_id``.  Raises LauncherError if it
        does not match.
        """
        m = re.match(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the working_dir."""
        self.context['n'] = n
        script_as_string = Itpl.itplns(self.batch_template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file, 'w')
        try:
            f.write(script_as_string)
        finally:
            # Close the file even when the write fails.
            f.close()

    @inlineCallbacks
    def start(self, n):
        """Start n copies of the process using a batch system."""
        self.write_batch_script(n)
        output = yield getProcessOutput(self.submit_command,
            [self.batch_file], env=os.environ)
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        defer.returnValue(job_id)

    @inlineCallbacks
    def stop(self):
        """Run the delete command on the job and notify the stop observers."""
        output = yield getProcessOutput(self.delete_command,
            [self.job_id], env=os.environ
        )
        self.notify_stop(output)  # Pass the output of the kill cmd
        defer.returnValue(output)
489 496
490 497
class PBSLauncher(BatchSystemLauncher):
    """BatchSystemLauncher configured with the PBS commands qsub and qdel."""

    # Program used to submit jobs.
    submit_command = Str('qsub', config=True)
    # Program used to delete jobs.
    delete_command = Str('qdel', config=True)
    # The job id is taken to be a run of digits in the submit output.
    job_id_regexp = Str(r'\d+', config=True)
    # Batch script template; empty by default.
    batch_template = Str('', config=True)
    # File name of the instantiated batch script.
    batch_file_name = Unicode(u'pbs_batch_script', config=True)
    # Full path to the instantiated batch script.
    batch_file = Unicode(u'')
500 507
501 508
502 509 #-----------------------------------------------------------------------------
503 510 # Controller launchers
504 511 #-----------------------------------------------------------------------------
505 512
def find_controller_cmd():
    """Find the command line ipcontroller program in a cross platform way.

    Returns
    -------
    list
        The command as a list of arguments.  On win32 this is the current
        Python executable, ``-u`` and the path of the ipcontrollerapp
        source file; elsewhere it is ``['ipcontroller']``.
    """
    if sys.platform == 'win32':
        # This logic is needed because the ipcontroller script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipcontrollerapp
        script_location = ipcontrollerapp.__file__
        # Point at the source file when the module was loaded from compiled
        # bytecode.  Only the extension is rewritten, and both .pyc and
        # .pyo are handled.
        root, ext = os.path.splitext(script_location)
        if ext in ('.pyc', '.pyo'):
            script_location = root + '.py'
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflicts and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipcontroller has to be on the PATH in this case.
        cmd = ['ipcontroller']
    return cmd
522 529
523 530
class LocalControllerLauncher(LocalProcessLauncher):
    """Launch a controller as a regular external process."""

    # The command used to run ipcontroller.
    controller_cmd = List(find_controller_cmd(), config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)

    def find_args(self):
        """Return the controller command followed by its arguments."""
        return self.controller_cmd + self.controller_args

    def start(self, profile=None, cluster_dir=None):
        """Start the controller by profile or cluster_dir.

        Whichever of the two is given is appended to ``controller_args``
        (cluster_dir first) before the process is started.
        """
        extra = []
        if cluster_dir is not None:
            extra += ['--cluster-dir', cluster_dir]
        if profile is not None:
            extra += ['--profile', profile]
        self.controller_args.extend(extra)
        log.msg("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()
542 549
543 550
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Launch a controller as a job written to a Win HPC job file."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
    # Extra controller arguments collected by start().
    extra_args = List([],config=False)

    def write_job_file(self, n):
        """Write a job file containing a single controller task.

        ``n`` is not used here.
        """
        job = WinHPCJob(self)
        job.job_name = "IPController"
        job.username = self.username
        job.priority = self.priority
        job.requested_nodes = self.requested_nodes
        job.project = self.project

        t = IPControllerTask(self)
        t.work_directory = self.working_dir
        # Add the --profile and --cluster-dir args from start.
        t.controller_args.extend(self.extra_args)
        job.add_task(t)
        log.msg("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    def start(self, profile=None, cluster_dir=None):
        """Start the controller by profile or cluster_dir.

        When both are given, both are passed on, as the other controller
        launchers do; previously the profile arguments replaced the
        cluster_dir arguments.
        """
        extra_args = []
        if cluster_dir is not None:
            extra_args.extend(['--cluster-dir', cluster_dir])
        if profile is not None:
            extra_args.extend(['--profile', profile])
        # Leave extra_args untouched when neither argument was given.
        if extra_args:
            self.extra_args = extra_args
        return super(WindowsHPCControllerLauncher, self).start(1)
572 579
573 580
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    # Command tokens used to run ipcontroller (see find_controller_cmd).
    controller_cmd = List(find_controller_cmd(), config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)
    # Value passed to the ``-n`` option in find_args(); not configurable.
    n = Int(1, config=False)

    def start(self, profile=None, cluster_dir=None):
        """Start the controller by profile or cluster_dir.

        Each value that is not None is appended, with its flag, to
        ``controller_args`` before the parent ``start`` is called with 1.
        """
        if cluster_dir is not None:
            self.controller_args.extend(['--cluster-dir', cluster_dir])
        if profile is not None:
            self.controller_args.extend(['--profile', profile])
        log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        """Return the full mpiexec command line as a list of strings."""
        # self.n is an Int trait; command line arguments must be strings,
        # so convert it explicitly instead of embedding the integer.
        return (self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args +
                self.controller_cmd + self.controller_args)
594 601
595 602
class PBSControllerLauncher(PBSLauncher):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)

    def start(self, profile=None, cluster_dir=None):
        """Start the controller by profile or cluster_dir.

        Values that are not None are stored in ``self.context`` and the
        parent ``start`` is then called with 1.
        """
        # Here we save profile and cluster_dir in the context so they
        # can be used in the batch script template as ${profile} and
        # ${cluster_dir}
        supplied = (('cluster_dir', cluster_dir), ('profile', profile))
        for key, value in supplied:
            if value is not None:
                self.context[key] = value
        log.msg("Starting PBSControllerLauncher: %r" % self.args)
        return super(PBSControllerLauncher, self).start(1)
612 619
613 620
class SSHControllerLauncher(SSHLauncher):
    """Controller launcher that currently adds nothing to SSHLauncher."""
616 623
617 624
618 625 #-----------------------------------------------------------------------------
619 626 # Engine launchers
620 627 #-----------------------------------------------------------------------------
621 628
622 629
def find_engine_cmd():
    """Find the command line ipengine program in a cross platform way.

    Returns a list of command tokens.  On win32 this is the current
    Python executable, run unbuffered, on the ipengineapp source file;
    on every other platform it is simply ``['ipengine']``.
    """
    if sys.platform == 'win32':
        # This logic is needed because the ipengine script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipengineapp
        script_location = ipengineapp.__file__
        # Point at the .py file instead of the compiled .pyc.  Only a
        # trailing '.pyc' is rewritten, so a '.pyc' substring elsewhere in
        # the path (for example in a directory name) is left untouched.
        if script_location.endswith('.pyc'):
            script_location = script_location[:-1]
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflict and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipengine has to be on the PATH in this case.
        cmd = ['ipengine']
    return cmd
639 646
640 647
class LocalEngineLauncher(LocalProcessLauncher):
    """Launch a single engine as a regular external process."""

    # Command tokens used to run ipengine (see find_engine_cmd).
    engine_cmd = List(find_engine_cmd(), config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )

    def find_args(self):
        """Return the engine command followed by its arguments."""
        command, arguments = self.engine_cmd, self.engine_args
        return command + arguments

    def start(self, profile=None, cluster_dir=None):
        """Start the engine by profile or cluster_dir.

        Each value that is not None is appended, together with its flag,
        to ``engine_args`` (``--cluster-dir`` first, then ``--profile``)
        before the parent class's ``start`` is called.
        """
        selectors = (('--cluster-dir', cluster_dir), ('--profile', profile))
        for flag, value in selectors:
            if value is not None:
                self.engine_args.extend([flag, value])
        return super(LocalEngineLauncher, self).start()
660 667
661 668
class LocalEngineSetLauncher(BaseLauncher):
    """Launch a set of engines as regular external processes."""

    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )

    def __init__(self, working_dir, parent=None, name=None, config=None):
        """Initialize the base launcher and an empty list of launchers."""
        super(LocalEngineSetLauncher, self).__init__(
            working_dir, parent, name, config
        )
        # One LocalEngineLauncher per engine started by start().
        self.launchers = []

    def start(self, n, profile=None, cluster_dir=None):
        """Start n engines by profile or cluster_dir.

        Creates one LocalEngineLauncher per engine, records it in
        ``self.launchers`` and returns the combined result of all the
        individual ``start`` calls, with ``notify_start`` attached as a
        callback.
        """
        import copy
        started = []
        for index in range(n):
            launcher = LocalEngineLauncher(self.working_dir, self)
            # Copy the engine args over to each engine launcher.
            launcher.engine_args = copy.deepcopy(self.engine_args)
            outcome = launcher.start(profile, cluster_dir)
            # Only the first engine's command line is logged.
            if index == 0:
                log.msg("Starting LocalEngineSetLauncher: %r" % launcher.args)
            self.launchers.append(launcher)
            started.append(outcome)
        # The consumeErrors here could be dangerous
        combined = gatherBoth(started, consumeErrors=True)
        combined.addCallback(self.notify_start)
        return combined

    def find_args(self):
        """Return a fixed label; there is no single command for the set."""
        return ['engine set']

    def signal(self, sig):
        """Send ``sig`` to every launcher and gather the results."""
        outcomes = [launcher.signal(sig) for launcher in self.launchers]
        return gatherBoth(outcomes, consumeErrors=True)

    def interrupt_then_kill(self, delay=1.0):
        """Call ``interrupt_then_kill(delay)`` on every launcher."""
        outcomes = [
            launcher.interrupt_then_kill(delay) for launcher in self.launchers
        ]
        return gatherBoth(outcomes, consumeErrors=True)

    def stop(self):
        """Stop all engines via ``interrupt_then_kill`` with its default."""
        return self.interrupt_then_kill()

    def observe_stop(self):
        """Gather every launcher's ``observe_stop`` result.

        ``notify_stop`` is attached as a callback on the combined result.
        """
        outcomes = []
        for launcher in self.launchers:
            outcomes.append(launcher.observe_stop())
        combined = gatherBoth(outcomes, consumeErrors=False)
        combined.addCallback(self.notify_stop)
        return combined
721 728
722 729
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Engine-set launcher built on MPIExecLauncher."""

    # Command tokens used to run ipengine (see find_engine_cmd).
    engine_cmd = List(find_engine_cmd(), config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )
    # Value passed to the ``-n`` option in find_args().
    n = Int(1, config=True)

    def start(self, n, profile=None, cluster_dir=None):
        """Start n engines by profile or cluster_dir.

        Each value that is not None is appended, with its flag, to
        ``engine_args`` before the parent ``start`` is called with ``n``.
        """
        if cluster_dir is not None:
            self.engine_args.extend(['--cluster-dir', cluster_dir])
        if profile is not None:
            self.engine_args.extend(['--profile', profile])
        log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)

    def find_args(self):
        """Return the full mpiexec command line as a list of strings."""
        # self.n is an Int trait; command line arguments must be strings,
        # so convert it explicitly instead of embedding the integer.
        return (self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args +
                self.engine_cmd + self.engine_args)
744 751
745 752
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Engine-set launcher that currently adds nothing to WindowsHPCLauncher."""
748 755
749 756
class PBSEngineSetLauncher(PBSLauncher):
    """Engine-set launcher built on PBSLauncher."""

    batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)

    def start(self, n, profile=None, cluster_dir=None):
        """Start n engines by profile or cluster_dir.

        Each value that is not None is appended, with its flag, to
        ``program_args`` (``--cluster-dir`` first, then ``-p``) before
        the parent ``start`` is called with ``n``.
        """
        selectors = (('--cluster-dir', cluster_dir), ('-p', profile))
        for flag, value in selectors:
            if value is not None:
                self.program_args.extend([flag, value])
        log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
        return super(PBSEngineSetLauncher, self).start(n)
762 769
763 770
class SSHEngineSetLauncher(BaseLauncher):
    """Engine-set launcher that currently adds nothing to BaseLauncher."""
766 773
767 774
768 775 #-----------------------------------------------------------------------------
769 776 # A launcher for ipcluster itself!
770 777 #-----------------------------------------------------------------------------
771 778
772 779
def find_ipcluster_cmd():
    """Find the command line ipcluster program in a cross platform way.

    Returns a list of command tokens.  On win32 this is the current
    Python executable, run unbuffered, on the ipclusterapp source file;
    on every other platform it is simply ``['ipcluster']``.
    """
    if sys.platform == 'win32':
        # This logic is needed because the ipcluster script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipclusterapp
        script_location = ipclusterapp.__file__
        # Point at the .py file instead of the compiled .pyc.  Only a
        # trailing '.pyc' is rewritten, so a '.pyc' substring elsewhere in
        # the path (for example in a directory name) is left untouched.
        if script_location.endswith('.pyc'):
            script_location = script_location[:-1]
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflict and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipcluster has to be on the PATH in this case.
        cmd = ['ipcluster']
    return cmd
789 796
790 797
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    # Command tokens used to run ipcluster (see find_ipcluster_cmd).
    ipcluster_cmd = List(find_ipcluster_cmd(), config=True)
    # Command line arguments to pass to ipcluster.
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
    # Subcommand placed directly after the ipcluster command.
    ipcluster_subcommand = Str('start')
    # Value passed to the ``-n`` option.
    ipcluster_n = Int(2)

    def find_args(self):
        """Return command, subcommand, ``-n <n>`` and the extra arguments."""
        subcommand = [self.ipcluster_subcommand]
        count_option = ['-n', repr(self.ipcluster_n)]
        return (self.ipcluster_cmd + subcommand +
                count_option + self.ipcluster_args)

    def start(self):
        """Log the command line and delegate to the parent ``start``."""
        log.msg("Starting ipcluster: %r" % self.args)
        return super(IPClusterLauncher, self).start()
808 815
General Comments 0
You need to be logged in to leave comments. Login now