##// END OF EJS Templates
Fixing a few bugs in the unicode path changes.
Brian Granger -
Show More
@@ -1,268 +1,268 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import copy
20 import copy
21 import os
21 import os
22 import sys
22 import sys
23
23
24 from twisted.application import service
24 from twisted.application import service
25 from twisted.internet import reactor
25 from twisted.internet import reactor
26 from twisted.python import log
26 from twisted.python import log
27
27
28 from IPython.config.loader import Config, NoConfigDefault
28 from IPython.config.loader import Config, NoConfigDefault
29
29
30 from IPython.kernel.clusterdir import (
30 from IPython.kernel.clusterdir import (
31 ApplicationWithClusterDir,
31 ApplicationWithClusterDir,
32 AppWithClusterDirArgParseConfigLoader
32 AppWithClusterDirArgParseConfigLoader
33 )
33 )
34
34
35 from IPython.core import release
35 from IPython.core import release
36
36
37 from IPython.utils.traitlets import Str, Instance
37 from IPython.utils.traitlets import Str, Instance, Unicode
38
38
39 from IPython.kernel import controllerservice
39 from IPython.kernel import controllerservice
40
40
41 from IPython.kernel.fcutil import FCServiceFactory
41 from IPython.kernel.fcutil import FCServiceFactory
42
42
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44 # Default interfaces
44 # Default interfaces
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46
46
47
47
48 # The default client interfaces for FCClientServiceFactory.interfaces
48 # The default client interfaces for FCClientServiceFactory.interfaces
49 default_client_interfaces = Config()
49 default_client_interfaces = Config()
50 default_client_interfaces.Task.interface_chain = [
50 default_client_interfaces.Task.interface_chain = [
51 'IPython.kernel.task.ITaskController',
51 'IPython.kernel.task.ITaskController',
52 'IPython.kernel.taskfc.IFCTaskController'
52 'IPython.kernel.taskfc.IFCTaskController'
53 ]
53 ]
54
54
55 default_client_interfaces.Task.furl_file = 'ipcontroller-tc.furl'
55 default_client_interfaces.Task.furl_file = 'ipcontroller-tc.furl'
56
56
57 default_client_interfaces.MultiEngine.interface_chain = [
57 default_client_interfaces.MultiEngine.interface_chain = [
58 'IPython.kernel.multiengine.IMultiEngine',
58 'IPython.kernel.multiengine.IMultiEngine',
59 'IPython.kernel.multienginefc.IFCSynchronousMultiEngine'
59 'IPython.kernel.multienginefc.IFCSynchronousMultiEngine'
60 ]
60 ]
61
61
62 default_client_interfaces.MultiEngine.furl_file = u'ipcontroller-mec.furl'
62 default_client_interfaces.MultiEngine.furl_file = u'ipcontroller-mec.furl'
63
63
64 # Make this a dict we can pass to Config.__init__ for the default
64 # Make this a dict we can pass to Config.__init__ for the default
65 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
65 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
66
66
67
67
68
68
69 # The default engine interfaces for FCEngineServiceFactory.interfaces
69 # The default engine interfaces for FCEngineServiceFactory.interfaces
70 default_engine_interfaces = Config()
70 default_engine_interfaces = Config()
71 default_engine_interfaces.Default.interface_chain = [
71 default_engine_interfaces.Default.interface_chain = [
72 'IPython.kernel.enginefc.IFCControllerBase'
72 'IPython.kernel.enginefc.IFCControllerBase'
73 ]
73 ]
74
74
75 default_engine_interfaces.Default.furl_file = u'ipcontroller-engine.furl'
75 default_engine_interfaces.Default.furl_file = u'ipcontroller-engine.furl'
76
76
77 # Make this a dict we can pass to Config.__init__ for the default
77 # Make this a dict we can pass to Config.__init__ for the default
78 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
78 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
79
79
80
80
81 #-----------------------------------------------------------------------------
81 #-----------------------------------------------------------------------------
82 # Service factories
82 # Service factories
83 #-----------------------------------------------------------------------------
83 #-----------------------------------------------------------------------------
84
84
85
85
86 class FCClientServiceFactory(FCServiceFactory):
86 class FCClientServiceFactory(FCServiceFactory):
87 """A Foolscap implementation of the client services."""
87 """A Foolscap implementation of the client services."""
88
88
89 cert_file = Unicode(u'ipcontroller-client.pem', config=True)
89 cert_file = Unicode(u'ipcontroller-client.pem', config=True)
90 interfaces = Instance(klass=Config, kw=default_client_interfaces,
90 interfaces = Instance(klass=Config, kw=default_client_interfaces,
91 allow_none=False, config=True)
91 allow_none=False, config=True)
92
92
93
93
94 class FCEngineServiceFactory(FCServiceFactory):
94 class FCEngineServiceFactory(FCServiceFactory):
95 """A Foolscap implementation of the engine services."""
95 """A Foolscap implementation of the engine services."""
96
96
97 cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
97 cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
98 interfaces = Instance(klass=dict, kw=default_engine_interfaces,
98 interfaces = Instance(klass=dict, kw=default_engine_interfaces,
99 allow_none=False, config=True)
99 allow_none=False, config=True)
100
100
101
101
102 #-----------------------------------------------------------------------------
102 #-----------------------------------------------------------------------------
103 # The main application
103 # The main application
104 #-----------------------------------------------------------------------------
104 #-----------------------------------------------------------------------------
105
105
106
106
107 cl_args = (
107 cl_args = (
108 # Client config
108 # Client config
109 (('--client-ip',), dict(
109 (('--client-ip',), dict(
110 type=str, dest='FCClientServiceFactory.ip', default=NoConfigDefault,
110 type=str, dest='FCClientServiceFactory.ip', default=NoConfigDefault,
111 help='The IP address or hostname the controller will listen on for '
111 help='The IP address or hostname the controller will listen on for '
112 'client connections.',
112 'client connections.',
113 metavar='FCClientServiceFactory.ip')
113 metavar='FCClientServiceFactory.ip')
114 ),
114 ),
115 (('--client-port',), dict(
115 (('--client-port',), dict(
116 type=int, dest='FCClientServiceFactory.port', default=NoConfigDefault,
116 type=int, dest='FCClientServiceFactory.port', default=NoConfigDefault,
117 help='The port the controller will listen on for client connections. '
117 help='The port the controller will listen on for client connections. '
118 'The default is to use 0, which will autoselect an open port.',
118 'The default is to use 0, which will autoselect an open port.',
119 metavar='FCClientServiceFactory.port')
119 metavar='FCClientServiceFactory.port')
120 ),
120 ),
121 (('--client-location',), dict(
121 (('--client-location',), dict(
122 type=str, dest='FCClientServiceFactory.location', default=NoConfigDefault,
122 type=str, dest='FCClientServiceFactory.location', default=NoConfigDefault,
123 help='The hostname or IP that clients should connect to. This does '
123 help='The hostname or IP that clients should connect to. This does '
124 'not control which interface the controller listens on. Instead, this '
124 'not control which interface the controller listens on. Instead, this '
125 'determines the hostname/IP that is listed in the FURL, which is how '
125 'determines the hostname/IP that is listed in the FURL, which is how '
126 'clients know where to connect. Useful if the controller is listening '
126 'clients know where to connect. Useful if the controller is listening '
127 'on multiple interfaces.',
127 'on multiple interfaces.',
128 metavar='FCClientServiceFactory.location')
128 metavar='FCClientServiceFactory.location')
129 ),
129 ),
130 # Engine config
130 # Engine config
131 (('--engine-ip',), dict(
131 (('--engine-ip',), dict(
132 type=str, dest='FCEngineServiceFactory.ip', default=NoConfigDefault,
132 type=str, dest='FCEngineServiceFactory.ip', default=NoConfigDefault,
133 help='The IP address or hostname the controller will listen on for '
133 help='The IP address or hostname the controller will listen on for '
134 'engine connections.',
134 'engine connections.',
135 metavar='FCEngineServiceFactory.ip')
135 metavar='FCEngineServiceFactory.ip')
136 ),
136 ),
137 (('--engine-port',), dict(
137 (('--engine-port',), dict(
138 type=int, dest='FCEngineServiceFactory.port', default=NoConfigDefault,
138 type=int, dest='FCEngineServiceFactory.port', default=NoConfigDefault,
139 help='The port the controller will listen on for engine connections. '
139 help='The port the controller will listen on for engine connections. '
140 'The default is to use 0, which will autoselect an open port.',
140 'The default is to use 0, which will autoselect an open port.',
141 metavar='FCEngineServiceFactory.port')
141 metavar='FCEngineServiceFactory.port')
142 ),
142 ),
143 (('--engine-location',), dict(
143 (('--engine-location',), dict(
144 type=str, dest='FCEngineServiceFactory.location', default=NoConfigDefault,
144 type=str, dest='FCEngineServiceFactory.location', default=NoConfigDefault,
145 help='The hostname or IP that engines should connect to. This does '
145 help='The hostname or IP that engines should connect to. This does '
146 'not control which interface the controller listens on. Instead, this '
146 'not control which interface the controller listens on. Instead, this '
147 'determines the hostname/IP that is listed in the FURL, which is how '
147 'determines the hostname/IP that is listed in the FURL, which is how '
148 'engines know where to connect. Useful if the controller is listening '
148 'engines know where to connect. Useful if the controller is listening '
149 'on multiple interfaces.',
149 'on multiple interfaces.',
150 metavar='FCEngineServiceFactory.location')
150 metavar='FCEngineServiceFactory.location')
151 ),
151 ),
152 # Global config
152 # Global config
153 (('--log-to-file',), dict(
153 (('--log-to-file',), dict(
154 action='store_true', dest='Global.log_to_file', default=NoConfigDefault,
154 action='store_true', dest='Global.log_to_file', default=NoConfigDefault,
155 help='Log to a file in the log directory (default is stdout)')
155 help='Log to a file in the log directory (default is stdout)')
156 ),
156 ),
157 (('-r','--reuse-furls'), dict(
157 (('-r','--reuse-furls'), dict(
158 action='store_true', dest='Global.reuse_furls', default=NoConfigDefault,
158 action='store_true', dest='Global.reuse_furls', default=NoConfigDefault,
159 help='Try to reuse all FURL files. If this is not set all FURL files '
159 help='Try to reuse all FURL files. If this is not set all FURL files '
160 'are deleted before the controller starts. This must be set if '
160 'are deleted before the controller starts. This must be set if '
161 'specific ports are specified by --engine-port or --client-port.')
161 'specific ports are specified by --engine-port or --client-port.')
162 ),
162 ),
163 (('--no-secure',), dict(
163 (('--no-secure',), dict(
164 action='store_false', dest='Global.secure', default=NoConfigDefault,
164 action='store_false', dest='Global.secure', default=NoConfigDefault,
165 help='Turn off SSL encryption for all connections.')
165 help='Turn off SSL encryption for all connections.')
166 ),
166 ),
167 (('--secure',), dict(
167 (('--secure',), dict(
168 action='store_true', dest='Global.secure', default=NoConfigDefault,
168 action='store_true', dest='Global.secure', default=NoConfigDefault,
169 help='Turn off SSL encryption for all connections.')
169 help='Turn off SSL encryption for all connections.')
170 )
170 )
171 )
171 )
172
172
173
173
174 class IPControllerAppCLConfigLoader(AppWithClusterDirArgParseConfigLoader):
174 class IPControllerAppCLConfigLoader(AppWithClusterDirArgParseConfigLoader):
175
175
176 arguments = cl_args
176 arguments = cl_args
177
177
178
178
179 default_config_file_name = u'ipcontroller_config.py'
179 default_config_file_name = u'ipcontroller_config.py'
180
180
181
181
182 class IPControllerApp(ApplicationWithClusterDir):
182 class IPControllerApp(ApplicationWithClusterDir):
183
183
184 name = u'ipcontroller'
184 name = u'ipcontroller'
185 description = 'Start the IPython controller for parallel computing.'
185 description = 'Start the IPython controller for parallel computing.'
186 config_file_name = default_config_file_name
186 config_file_name = default_config_file_name
187 auto_create_cluster_dir = True
187 auto_create_cluster_dir = True
188
188
189 def create_default_config(self):
189 def create_default_config(self):
190 super(IPControllerApp, self).create_default_config()
190 super(IPControllerApp, self).create_default_config()
191 self.default_config.Global.reuse_furls = False
191 self.default_config.Global.reuse_furls = False
192 self.default_config.Global.secure = True
192 self.default_config.Global.secure = True
193 self.default_config.Global.import_statements = []
193 self.default_config.Global.import_statements = []
194 self.default_config.Global.clean_logs = True
194 self.default_config.Global.clean_logs = True
195
195
196 def create_command_line_config(self):
196 def create_command_line_config(self):
197 """Create and return a command line config loader."""
197 """Create and return a command line config loader."""
198 return IPControllerAppCLConfigLoader(
198 return IPControllerAppCLConfigLoader(
199 description=self.description,
199 description=self.description,
200 version=release.version
200 version=release.version
201 )
201 )
202
202
203 def post_load_command_line_config(self):
203 def post_load_command_line_config(self):
204 # Now setup reuse_furls
204 # Now setup reuse_furls
205 c = self.command_line_config
205 c = self.command_line_config
206 if hasattr(c.Global, 'reuse_furls'):
206 if hasattr(c.Global, 'reuse_furls'):
207 c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
207 c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
208 c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
208 c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
209 del c.Global.reuse_furls
209 del c.Global.reuse_furls
210 if hasattr(c.Global, 'secure'):
210 if hasattr(c.Global, 'secure'):
211 c.FCClientServiceFactory.secure = c.Global.secure
211 c.FCClientServiceFactory.secure = c.Global.secure
212 c.FCEngineServiceFactory.secure = c.Global.secure
212 c.FCEngineServiceFactory.secure = c.Global.secure
213 del c.Global.secure
213 del c.Global.secure
214
214
215 def construct(self):
215 def construct(self):
216 # I am a little hesitant to put these into InteractiveShell itself.
216 # I am a little hesitant to put these into InteractiveShell itself.
217 # But that might be the place for them
217 # But that might be the place for them
218 sys.path.insert(0, '')
218 sys.path.insert(0, '')
219
219
220 self.start_logging()
220 self.start_logging()
221 self.import_statements()
221 self.import_statements()
222
222
223 # Create the service hierarchy
223 # Create the service hierarchy
224 self.main_service = service.MultiService()
224 self.main_service = service.MultiService()
225 # The controller service
225 # The controller service
226 controller_service = controllerservice.ControllerService()
226 controller_service = controllerservice.ControllerService()
227 controller_service.setServiceParent(self.main_service)
227 controller_service.setServiceParent(self.main_service)
228 # The client tub and all its refereceables
228 # The client tub and all its refereceables
229 csfactory = FCClientServiceFactory(self.master_config, controller_service)
229 csfactory = FCClientServiceFactory(self.master_config, controller_service)
230 client_service = csfactory.create()
230 client_service = csfactory.create()
231 client_service.setServiceParent(self.main_service)
231 client_service.setServiceParent(self.main_service)
232 # The engine tub
232 # The engine tub
233 esfactory = FCEngineServiceFactory(self.master_config, controller_service)
233 esfactory = FCEngineServiceFactory(self.master_config, controller_service)
234 engine_service = esfactory.create()
234 engine_service = esfactory.create()
235 engine_service.setServiceParent(self.main_service)
235 engine_service.setServiceParent(self.main_service)
236
236
237 def import_statements(self):
237 def import_statements(self):
238 statements = self.master_config.Global.import_statements
238 statements = self.master_config.Global.import_statements
239 for s in statements:
239 for s in statements:
240 try:
240 try:
241 log.msg("Executing statement: '%s'" % s)
241 log.msg("Executing statement: '%s'" % s)
242 exec s in globals(), locals()
242 exec s in globals(), locals()
243 except:
243 except:
244 log.msg("Error running statement: %s" % s)
244 log.msg("Error running statement: %s" % s)
245
245
246 def start_app(self):
246 def start_app(self):
247 # Start the controller service.
247 # Start the controller service.
248 self.main_service.startService()
248 self.main_service.startService()
249 # Write the .pid file overwriting old ones. This allow multiple
249 # Write the .pid file overwriting old ones. This allow multiple
250 # controllers to clober each other. But Windows is not cleaning
250 # controllers to clober each other. But Windows is not cleaning
251 # these up properly.
251 # these up properly.
252 self.write_pid_file(overwrite=True)
252 self.write_pid_file(overwrite=True)
253 # cd to the cluster_dir as our working directory.
253 # cd to the cluster_dir as our working directory.
254 os.chdir(self.master_config.Global.cluster_dir)
254 os.chdir(self.master_config.Global.cluster_dir)
255 # Add a trigger to delete the .pid file upon shutting down.
255 # Add a trigger to delete the .pid file upon shutting down.
256 reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
256 reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
257 reactor.run()
257 reactor.run()
258
258
259
259
260 def launch_new_instance():
260 def launch_new_instance():
261 """Create and run the IPython controller"""
261 """Create and run the IPython controller"""
262 app = IPControllerApp()
262 app = IPControllerApp()
263 app.start()
263 app.start()
264
264
265
265
266 if __name__ == '__main__':
266 if __name__ == '__main__':
267 launch_new_instance()
267 launch_new_instance()
268
268
@@ -1,808 +1,815 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching processing asynchronously.
4 Facilities for launching processing asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import re
19 import re
20 import sys
20 import sys
21
21
22 from IPython.core.component import Component
22 from IPython.core.component import Component
23 from IPython.external import Itpl
23 from IPython.external import Itpl
24 from IPython.utils.traitlets import Str, Int, List, Unicode, Enum
24 from IPython.utils.traitlets import Str, Int, List, Unicode, Enum
25 from IPython.utils.platutils import find_cmd
25 from IPython.utils.platutils import find_cmd
26 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
26 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
27 from IPython.kernel.winhpcjob import (
27 from IPython.kernel.winhpcjob import (
28 WinHPCJob, WinHPCTask,
28 WinHPCJob, WinHPCTask,
29 IPControllerTask, IPEngineTask
29 IPControllerTask, IPEngineTask
30 )
30 )
31
31
32 from twisted.internet import reactor, defer
32 from twisted.internet import reactor, defer
33 from twisted.internet.defer import inlineCallbacks
33 from twisted.internet.defer import inlineCallbacks
34 from twisted.internet.protocol import ProcessProtocol
34 from twisted.internet.protocol import ProcessProtocol
35 from twisted.internet.utils import getProcessOutput
35 from twisted.internet.utils import getProcessOutput
36 from twisted.internet.error import ProcessDone, ProcessTerminated
36 from twisted.internet.error import ProcessDone, ProcessTerminated
37 from twisted.python import log
37 from twisted.python import log
38 from twisted.python.failure import Failure
38 from twisted.python.failure import Failure
39
39
40 #-----------------------------------------------------------------------------
40 #-----------------------------------------------------------------------------
41 # Generic launchers
41 # Generic launchers
42 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
43
43
44
44
45 class LauncherError(Exception):
45 class LauncherError(Exception):
46 pass
46 pass
47
47
48
48
49 class ProcessStateError(LauncherError):
49 class ProcessStateError(LauncherError):
50 pass
50 pass
51
51
52
52
53 class UnknownStatus(LauncherError):
53 class UnknownStatus(LauncherError):
54 pass
54 pass
55
55
56
56
57 class BaseLauncher(Component):
57 class BaseLauncher(Component):
58 """An asbtraction for starting, stopping and signaling a process."""
58 """An asbtraction for starting, stopping and signaling a process."""
59
59
60 # A directory for files related to the process. But, we don't cd to
60 # A directory for files related to the process. But, we don't cd to
61 # this directory,
61 # this directory,
62 working_dir = Unicode(u'')
62 working_dir = Unicode(u'')
63
63
64 def __init__(self, working_dir, parent=None, name=None, config=None):
64 def __init__(self, working_dir, parent=None, name=None, config=None):
65 super(BaseLauncher, self).__init__(parent, name, config)
65 super(BaseLauncher, self).__init__(parent, name, config)
66 self.working_dir = working_dir
66 self.working_dir = working_dir
67 self.state = 'before' # can be before, running, after
67 self.state = 'before' # can be before, running, after
68 self.stop_deferreds = []
68 self.stop_deferreds = []
69 self.start_data = None
69 self.start_data = None
70 self.stop_data = None
70 self.stop_data = None
71
71
72 @property
72 @property
73 def args(self):
73 def args(self):
74 """A list of cmd and args that will be used to start the process.
74 """A list of cmd and args that will be used to start the process.
75
75
76 This is what is passed to :func:`spawnProcess` and the first element
76 This is what is passed to :func:`spawnProcess` and the first element
77 will be the process name.
77 will be the process name.
78 """
78 """
79 return self.find_args()
79 return self.find_args()
80
80
81 def find_args(self):
81 def find_args(self):
82 """The ``.args`` property calls this to find the args list.
82 """The ``.args`` property calls this to find the args list.
83
83
84 Subcommand should implement this to construct the cmd and args.
84 Subcommand should implement this to construct the cmd and args.
85 """
85 """
86 raise NotImplementedError('find_args must be implemented in a subclass')
86 raise NotImplementedError('find_args must be implemented in a subclass')
87
87
88 @property
88 @property
89 def arg_str(self):
89 def arg_str(self):
90 """The string form of the program arguments."""
90 """The string form of the program arguments."""
91 return ' '.join(self.args)
91 return ' '.join(self.args)
92
92
93 @property
93 @property
94 def running(self):
94 def running(self):
95 """Am I running."""
95 """Am I running."""
96 if self.state == 'running':
96 if self.state == 'running':
97 return True
97 return True
98 else:
98 else:
99 return False
99 return False
100
100
101 def start(self):
101 def start(self):
102 """Start the process.
102 """Start the process.
103
103
104 This must return a deferred that fires with information about the
104 This must return a deferred that fires with information about the
105 process starting (like a pid, job id, etc.).
105 process starting (like a pid, job id, etc.).
106 """
106 """
107 return defer.fail(
107 return defer.fail(
108 Failure(NotImplementedError(
108 Failure(NotImplementedError(
109 'start must be implemented in a subclass')
109 'start must be implemented in a subclass')
110 )
110 )
111 )
111 )
112
112
113 def stop(self):
113 def stop(self):
114 """Stop the process and notify observers of stopping.
114 """Stop the process and notify observers of stopping.
115
115
116 This must return a deferred that fires with information about the
116 This must return a deferred that fires with information about the
117 processing stopping, like errors that occur while the process is
117 processing stopping, like errors that occur while the process is
118 attempting to be shut down. This deferred won't fire when the process
118 attempting to be shut down. This deferred won't fire when the process
119 actually stops. To observe the actual process stopping, see
119 actually stops. To observe the actual process stopping, see
120 :func:`observe_stop`.
120 :func:`observe_stop`.
121 """
121 """
122 return defer.fail(
122 return defer.fail(
123 Failure(NotImplementedError(
123 Failure(NotImplementedError(
124 'stop must be implemented in a subclass')
124 'stop must be implemented in a subclass')
125 )
125 )
126 )
126 )
127
127
128 def observe_stop(self):
128 def observe_stop(self):
129 """Get a deferred that will fire when the process stops.
129 """Get a deferred that will fire when the process stops.
130
130
131 The deferred will fire with data that contains information about
131 The deferred will fire with data that contains information about
132 the exit status of the process.
132 the exit status of the process.
133 """
133 """
134 if self.state=='after':
134 if self.state=='after':
135 return defer.succeed(self.stop_data)
135 return defer.succeed(self.stop_data)
136 else:
136 else:
137 d = defer.Deferred()
137 d = defer.Deferred()
138 self.stop_deferreds.append(d)
138 self.stop_deferreds.append(d)
139 return d
139 return d
140
140
141 def notify_start(self, data):
141 def notify_start(self, data):
142 """Call this to trigger startup actions.
142 """Call this to trigger startup actions.
143
143
144 This logs the process startup and sets the state to 'running'. It is
144 This logs the process startup and sets the state to 'running'. It is
145 a pass-through so it can be used as a callback.
145 a pass-through so it can be used as a callback.
146 """
146 """
147
147
148 log.msg('Process %r started: %r' % (self.args[0], data))
148 log.msg('Process %r started: %r' % (self.args[0], data))
149 self.start_data = data
149 self.start_data = data
150 self.state = 'running'
150 self.state = 'running'
151 return data
151 return data
152
152
153 def notify_stop(self, data):
153 def notify_stop(self, data):
154 """Call this to trigger process stop actions.
154 """Call this to trigger process stop actions.
155
155
156 This logs the process stopping and sets the state to 'after'. Call
156 This logs the process stopping and sets the state to 'after'. Call
157 this to trigger all the deferreds from :func:`observe_stop`."""
157 this to trigger all the deferreds from :func:`observe_stop`."""
158
158
159 log.msg('Process %r stopped: %r' % (self.args[0], data))
159 log.msg('Process %r stopped: %r' % (self.args[0], data))
160 self.stop_data = data
160 self.stop_data = data
161 self.state = 'after'
161 self.state = 'after'
162 for i in range(len(self.stop_deferreds)):
162 for i in range(len(self.stop_deferreds)):
163 d = self.stop_deferreds.pop()
163 d = self.stop_deferreds.pop()
164 d.callback(data)
164 d.callback(data)
165 return data
165 return data
166
166
167 def signal(self, sig):
167 def signal(self, sig):
168 """Signal the process.
168 """Signal the process.
169
169
170 Return a semi-meaningless deferred after signaling the process.
170 Return a semi-meaningless deferred after signaling the process.
171
171
172 Parameters
172 Parameters
173 ----------
173 ----------
174 sig : str or int
174 sig : str or int
175 'KILL', 'INT', etc., or any signal number
175 'KILL', 'INT', etc., or any signal number
176 """
176 """
177 return defer.fail(
177 return defer.fail(
178 Failure(NotImplementedError(
178 Failure(NotImplementedError(
179 'signal must be implemented in a subclass')
179 'signal must be implemented in a subclass')
180 )
180 )
181 )
181 )
182
182
183
183
184 class LocalProcessLauncherProtocol(ProcessProtocol):
184 class LocalProcessLauncherProtocol(ProcessProtocol):
185 """A ProcessProtocol to go with the LocalProcessLauncher."""
185 """A ProcessProtocol to go with the LocalProcessLauncher."""
186
186
187 def __init__(self, process_launcher):
187 def __init__(self, process_launcher):
188 self.process_launcher = process_launcher
188 self.process_launcher = process_launcher
189 self.pid = None
189 self.pid = None
190
190
191 def connectionMade(self):
191 def connectionMade(self):
192 self.pid = self.transport.pid
192 self.pid = self.transport.pid
193 self.process_launcher.notify_start(self.transport.pid)
193 self.process_launcher.notify_start(self.transport.pid)
194
194
195 def processEnded(self, status):
195 def processEnded(self, status):
196 value = status.value
196 value = status.value
197 if isinstance(value, ProcessDone):
197 if isinstance(value, ProcessDone):
198 self.process_launcher.notify_stop(
198 self.process_launcher.notify_stop(
199 {'exit_code':0,
199 {'exit_code':0,
200 'signal':None,
200 'signal':None,
201 'status':None,
201 'status':None,
202 'pid':self.pid
202 'pid':self.pid
203 }
203 }
204 )
204 )
205 elif isinstance(value, ProcessTerminated):
205 elif isinstance(value, ProcessTerminated):
206 self.process_launcher.notify_stop(
206 self.process_launcher.notify_stop(
207 {'exit_code':value.exitCode,
207 {'exit_code':value.exitCode,
208 'signal':value.signal,
208 'signal':value.signal,
209 'status':value.status,
209 'status':value.status,
210 'pid':self.pid
210 'pid':self.pid
211 }
211 }
212 )
212 )
213 else:
213 else:
214 raise UnknownStatus("Unknown exit status, this is probably a "
214 raise UnknownStatus("Unknown exit status, this is probably a "
215 "bug in Twisted")
215 "bug in Twisted")
216
216
217 def outReceived(self, data):
217 def outReceived(self, data):
218 log.msg(data)
218 log.msg(data)
219
219
220 def errReceived(self, data):
220 def errReceived(self, data):
221 log.err(data)
221 log.err(data)
222
222
223
223
224 class LocalProcessLauncher(BaseLauncher):
224 class LocalProcessLauncher(BaseLauncher):
225 """Start and stop an external process in an asynchronous manner."""
225 """Start and stop an external process in an asynchronous manner."""
226
226
227 # This is used to to construct self.args, which is passed to
227 # This is used to to construct self.args, which is passed to
228 # spawnProcess.
228 # spawnProcess.
229 cmd_and_args = List([])
229 cmd_and_args = List([])
230
230
231 def __init__(self, working_dir, parent=None, name=None, config=None):
231 def __init__(self, working_dir, parent=None, name=None, config=None):
232 super(LocalProcessLauncher, self).__init__(
232 super(LocalProcessLauncher, self).__init__(
233 working_dir, parent, name, config
233 working_dir, parent, name, config
234 )
234 )
235 self.process_protocol = None
235 self.process_protocol = None
236 self.start_deferred = None
236 self.start_deferred = None
237
237
238 def find_args(self):
238 def find_args(self):
239 return self.cmd_and_args
239 return self.cmd_and_args
240
240
241 def start(self):
241 def start(self):
242 if self.state == 'before':
242 if self.state == 'before':
243 self.process_protocol = LocalProcessLauncherProtocol(self)
243 self.process_protocol = LocalProcessLauncherProtocol(self)
244 self.start_deferred = defer.Deferred()
244 self.start_deferred = defer.Deferred()
245 self.process_transport = reactor.spawnProcess(
245 self.process_transport = reactor.spawnProcess(
246 self.process_protocol,
246 self.process_protocol,
247 str(self.args[0]), # twisted expects these to be str, not unicode
247 str(self.args[0]), # twisted expects these to be str, not unicode
248 [str(a) for a in self.args], # str expected, not unicode
248 [str(a) for a in self.args], # str expected, not unicode
249 env=os.environ
249 env=os.environ
250 )
250 )
251 return self.start_deferred
251 return self.start_deferred
252 else:
252 else:
253 s = 'The process was already started and has state: %r' % self.state
253 s = 'The process was already started and has state: %r' % self.state
254 return defer.fail(ProcessStateError(s))
254 return defer.fail(ProcessStateError(s))
255
255
256 def notify_start(self, data):
256 def notify_start(self, data):
257 super(LocalProcessLauncher, self).notify_start(data)
257 super(LocalProcessLauncher, self).notify_start(data)
258 self.start_deferred.callback(data)
258 self.start_deferred.callback(data)
259
259
260 def stop(self):
260 def stop(self):
261 return self.interrupt_then_kill()
261 return self.interrupt_then_kill()
262
262
263 @make_deferred
263 @make_deferred
264 def signal(self, sig):
264 def signal(self, sig):
265 if self.state == 'running':
265 if self.state == 'running':
266 self.process_transport.signalProcess(sig)
266 self.process_transport.signalProcess(sig)
267
267
268 @inlineCallbacks
268 @inlineCallbacks
269 def interrupt_then_kill(self, delay=2.0):
269 def interrupt_then_kill(self, delay=2.0):
270 """Send INT, wait a delay and then send KILL."""
270 """Send INT, wait a delay and then send KILL."""
271 yield self.signal('INT')
271 yield self.signal('INT')
272 yield sleep_deferred(delay)
272 yield sleep_deferred(delay)
273 yield self.signal('KILL')
273 yield self.signal('KILL')
274
274
275
275
276 class MPIExecLauncher(LocalProcessLauncher):
276 class MPIExecLauncher(LocalProcessLauncher):
277 """Launch an external process using mpiexec."""
277 """Launch an external process using mpiexec."""
278
278
279 # The mpiexec command to use in starting the process.
279 # The mpiexec command to use in starting the process.
280 mpi_cmd = List(['mpiexec'], config=True)
280 mpi_cmd = List(['mpiexec'], config=True)
281 # The command line arguments to pass to mpiexec.
281 # The command line arguments to pass to mpiexec.
282 mpi_args = List([], config=True)
282 mpi_args = List([], config=True)
283 # The program to start using mpiexec.
283 # The program to start using mpiexec.
284 program = List(['date'], config=True)
284 program = List(['date'], config=True)
285 # The command line argument to the program.
285 # The command line argument to the program.
286 program_args = List([], config=True)
286 program_args = List([], config=True)
287 # The number of instances of the program to start.
287 # The number of instances of the program to start.
288 n = Int(1, config=True)
288 n = Int(1, config=True)
289
289
290 def find_args(self):
290 def find_args(self):
291 """Build self.args using all the fields."""
291 """Build self.args using all the fields."""
292 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
292 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
293 self.program + self.program_args
293 self.program + self.program_args
294
294
295 def start(self, n):
295 def start(self, n):
296 """Start n instances of the program using mpiexec."""
296 """Start n instances of the program using mpiexec."""
297 self.n = n
297 self.n = n
298 return super(MPIExecLauncher, self).start()
298 return super(MPIExecLauncher, self).start()
299
299
300
300
301 class SSHLauncher(BaseLauncher):
301 class SSHLauncher(BaseLauncher):
302 """A minimal launcher for ssh.
302 """A minimal launcher for ssh.
303
303
304 To be useful this will probably have to be extended to use the ``sshx``
304 To be useful this will probably have to be extended to use the ``sshx``
305 idea for environment variables. There could be other things this needs
305 idea for environment variables. There could be other things this needs
306 as well.
306 as well.
307 """
307 """
308
308
309 ssh_cmd = List(['ssh'], config=True)
309 ssh_cmd = List(['ssh'], config=True)
310 ssh_args = List([], config=True)
310 ssh_args = List([], config=True)
311 program = List(['date'], config=True)
311 program = List(['date'], config=True)
312 program_args = List([], config=True)
312 program_args = List([], config=True)
313 hostname = Str('', config=True)
313 hostname = Str('', config=True)
314 user = Str('', config=True)
314 user = Str('', config=True)
315 location = Str('')
315 location = Str('')
316
316
317 def _hostname_changed(self, name, old, new):
317 def _hostname_changed(self, name, old, new):
318 self.location = '%s@%s' % (self.user, new)
318 self.location = '%s@%s' % (self.user, new)
319
319
320 def _user_changed(self, name, old, new):
320 def _user_changed(self, name, old, new):
321 self.location = '%s@%s' % (new, self.hostname)
321 self.location = '%s@%s' % (new, self.hostname)
322
322
323 def find_args(self):
323 def find_args(self):
324 return self.ssh_cmd + self.ssh_args + [self.location] + \
324 return self.ssh_cmd + self.ssh_args + [self.location] + \
325 self.program + self.program_args
325 self.program + self.program_args
326
326
327 def start(self, n, hostname=None, user=None):
327 def start(self, n, hostname=None, user=None):
328 if hostname is not None:
328 if hostname is not None:
329 self.hostname = hostname
329 self.hostname = hostname
330 if user is not None:
330 if user is not None:
331 self.user = user
331 self.user = user
332 return super(SSHLauncher, self).start()
332 return super(SSHLauncher, self).start()
333
333
334
334
335 # This is only used on Windows.
336 if os.name=='nt':
337 job_cmd = find_cmd('job')
338 else:
339 job_cmd = 'job'
340
341
335 class WindowsHPCLauncher(BaseLauncher):
342 class WindowsHPCLauncher(BaseLauncher):
336
343
337 # A regular expression used to get the job id from the output of the
344 # A regular expression used to get the job id from the output of the
338 # submit_command.
345 # submit_command.
339 job_id_regexp = Str('\d+', config=True)
346 job_id_regexp = Str('\d+', config=True)
340 # The filename of the instantiated job script.
347 # The filename of the instantiated job script.
341 job_file_name = Unicode(u'ipython_job.xml', config=True)
348 job_file_name = Unicode(u'ipython_job.xml', config=True)
342 # The full path to the instantiated job script. This gets made dynamically
349 # The full path to the instantiated job script. This gets made dynamically
343 # by combining the working_dir with the job_file_name.
350 # by combining the working_dir with the job_file_name.
344 job_file = Unicode(u'')
351 job_file = Unicode(u'')
345 # The hostname of the scheduler to submit the job to
352 # The hostname of the scheduler to submit the job to
346 scheduler = Str('HEADNODE', config=True)
353 scheduler = Str('HEADNODE', config=True)
347 username = Str(os.environ.get('USERNAME', ''), config=True)
354 username = Str(os.environ.get('USERNAME', ''), config=True)
348 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
355 priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
349 default_value='Highest', config=True)
356 default_value='Highest', config=True)
350 requested_nodes = Str('', config=True)
357 requested_nodes = Str('', config=True)
351 project = Str('MyProject', config=True)
358 project = Str('MyProject', config=True)
352 job_cmd = Str(find_cmd('job'), config=True)
359 job_cmd = Str(job_cmd, config=True)
353
360
354 def __init__(self, working_dir, parent=None, name=None, config=None):
361 def __init__(self, working_dir, parent=None, name=None, config=None):
355 super(WindowsHPCLauncher, self).__init__(
362 super(WindowsHPCLauncher, self).__init__(
356 working_dir, parent, name, config
363 working_dir, parent, name, config
357 )
364 )
358 self.job_file = os.path.join(self.working_dir, self.job_file_name)
365 self.job_file = os.path.join(self.working_dir, self.job_file_name)
359
366
360 def write_job_file(self, n):
367 def write_job_file(self, n):
361 raise NotImplementedError("Implement write_job_file in a subclass.")
368 raise NotImplementedError("Implement write_job_file in a subclass.")
362
369
363 def find_args(self):
370 def find_args(self):
364 return ['job.exe']
371 return ['job.exe']
365
372
366 def parse_job_id(self, output):
373 def parse_job_id(self, output):
367 """Take the output of the submit command and return the job id."""
374 """Take the output of the submit command and return the job id."""
368 m = re.search(self.job_id_regexp, output)
375 m = re.search(self.job_id_regexp, output)
369 if m is not None:
376 if m is not None:
370 job_id = m.group()
377 job_id = m.group()
371 else:
378 else:
372 raise LauncherError("Job id couldn't be determined: %s" % output)
379 raise LauncherError("Job id couldn't be determined: %s" % output)
373 self.job_id = job_id
380 self.job_id = job_id
374 log.msg('Job started with job id: %r' % job_id)
381 log.msg('Job started with job id: %r' % job_id)
375 return job_id
382 return job_id
376
383
377 @inlineCallbacks
384 @inlineCallbacks
378 def start(self, n):
385 def start(self, n):
379 """Start n copies of the process using the Win HPC job scheduler."""
386 """Start n copies of the process using the Win HPC job scheduler."""
380 self.write_job_file(n)
387 self.write_job_file(n)
381 args = [
388 args = [
382 'submit',
389 'submit',
383 '/jobfile:%s' % self.job_file,
390 '/jobfile:%s' % self.job_file,
384 '/scheduler:%s' % self.scheduler
391 '/scheduler:%s' % self.scheduler
385 ]
392 ]
386 log.msg("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
393 log.msg("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
387 output = yield getProcessOutput(self.job_cmd,
394 output = yield getProcessOutput(self.job_cmd,
388 args,
395 args,
389 env=os.environ,
396 env=os.environ,
390 path=self.working_dir
397 path=self.working_dir
391 )
398 )
392 job_id = self.parse_job_id(output)
399 job_id = self.parse_job_id(output)
393 self.notify_start(job_id)
400 self.notify_start(job_id)
394 defer.returnValue(job_id)
401 defer.returnValue(job_id)
395
402
396 @inlineCallbacks
403 @inlineCallbacks
397 def stop(self):
404 def stop(self):
398 args = [
405 args = [
399 'cancel',
406 'cancel',
400 self.job_id,
407 self.job_id,
401 '/scheduler:%s' % self.scheduler
408 '/scheduler:%s' % self.scheduler
402 ]
409 ]
403 log.msg("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
410 log.msg("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
404 try:
411 try:
405 output = yield getProcessOutput(self.job_cmd,
412 output = yield getProcessOutput(self.job_cmd,
406 args,
413 args,
407 env=os.environ,
414 env=os.environ,
408 path=self.working_dir
415 path=self.working_dir
409 )
416 )
410 except:
417 except:
411 output = 'The job already appears to be stoppped: %r' % self.job_id
418 output = 'The job already appears to be stoppped: %r' % self.job_id
412 self.notify_stop(output) # Pass the output of the kill cmd
419 self.notify_stop(output) # Pass the output of the kill cmd
413 defer.returnValue(output)
420 defer.returnValue(output)
414
421
415
422
416 class BatchSystemLauncher(BaseLauncher):
423 class BatchSystemLauncher(BaseLauncher):
417 """Launch an external process using a batch system.
424 """Launch an external process using a batch system.
418
425
419 This class is designed to work with UNIX batch systems like PBS, LSF,
426 This class is designed to work with UNIX batch systems like PBS, LSF,
420 GridEngine, etc. The overall model is that there are different commands
427 GridEngine, etc. The overall model is that there are different commands
421 like qsub, qdel, etc. that handle the starting and stopping of the process.
428 like qsub, qdel, etc. that handle the starting and stopping of the process.
422
429
423 This class also has the notion of a batch script. The ``batch_template``
430 This class also has the notion of a batch script. The ``batch_template``
424 attribute can be set to a string that is a template for the batch script.
431 attribute can be set to a string that is a template for the batch script.
425 This template is instantiated using Itpl. Thus the template can use
432 This template is instantiated using Itpl. Thus the template can use
426 ${n} fot the number of instances. Subclasses can add additional variables
433 ${n} fot the number of instances. Subclasses can add additional variables
427 to the template dict.
434 to the template dict.
428 """
435 """
429
436
430 # Subclasses must fill these in. See PBSEngineSet
437 # Subclasses must fill these in. See PBSEngineSet
431 # The name of the command line program used to submit jobs.
438 # The name of the command line program used to submit jobs.
432 submit_command = Str('', config=True)
439 submit_command = Str('', config=True)
433 # The name of the command line program used to delete jobs.
440 # The name of the command line program used to delete jobs.
434 delete_command = Str('', config=True)
441 delete_command = Str('', config=True)
435 # A regular expression used to get the job id from the output of the
442 # A regular expression used to get the job id from the output of the
436 # submit_command.
443 # submit_command.
437 job_id_regexp = Str('', config=True)
444 job_id_regexp = Str('', config=True)
438 # The string that is the batch script template itself.
445 # The string that is the batch script template itself.
439 batch_template = Str('', config=True)
446 batch_template = Str('', config=True)
440 # The filename of the instantiated batch script.
447 # The filename of the instantiated batch script.
441 batch_file_name = Unicode(u'batch_script', config=True)
448 batch_file_name = Unicode(u'batch_script', config=True)
442 # The full path to the instantiated batch script.
449 # The full path to the instantiated batch script.
443 batch_file = Unicode(u'')
450 batch_file = Unicode(u'')
444
451
445 def __init__(self, working_dir, parent=None, name=None, config=None):
452 def __init__(self, working_dir, parent=None, name=None, config=None):
446 super(BatchSystemLauncher, self).__init__(
453 super(BatchSystemLauncher, self).__init__(
447 working_dir, parent, name, config
454 working_dir, parent, name, config
448 )
455 )
449 self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
456 self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
450 self.context = {}
457 self.context = {}
451
458
452 def parse_job_id(self, output):
459 def parse_job_id(self, output):
453 """Take the output of the submit command and return the job id."""
460 """Take the output of the submit command and return the job id."""
454 m = re.match(self.job_id_regexp, output)
461 m = re.match(self.job_id_regexp, output)
455 if m is not None:
462 if m is not None:
456 job_id = m.group()
463 job_id = m.group()
457 else:
464 else:
458 raise LauncherError("Job id couldn't be determined: %s" % output)
465 raise LauncherError("Job id couldn't be determined: %s" % output)
459 self.job_id = job_id
466 self.job_id = job_id
460 log.msg('Job started with job id: %r' % job_id)
467 log.msg('Job started with job id: %r' % job_id)
461 return job_id
468 return job_id
462
469
463 def write_batch_script(self, n):
470 def write_batch_script(self, n):
464 """Instantiate and write the batch script to the working_dir."""
471 """Instantiate and write the batch script to the working_dir."""
465 self.context['n'] = n
472 self.context['n'] = n
466 script_as_string = Itpl.itplns(self.batch_template, self.context)
473 script_as_string = Itpl.itplns(self.batch_template, self.context)
467 log.msg('Writing instantiated batch script: %s' % self.batch_file)
474 log.msg('Writing instantiated batch script: %s' % self.batch_file)
468 f = open(self.batch_file, 'w')
475 f = open(self.batch_file, 'w')
469 f.write(script_as_string)
476 f.write(script_as_string)
470 f.close()
477 f.close()
471
478
472 @inlineCallbacks
479 @inlineCallbacks
473 def start(self, n):
480 def start(self, n):
474 """Start n copies of the process using a batch system."""
481 """Start n copies of the process using a batch system."""
475 self.write_batch_script(n)
482 self.write_batch_script(n)
476 output = yield getProcessOutput(self.submit_command,
483 output = yield getProcessOutput(self.submit_command,
477 [self.batch_file], env=os.environ)
484 [self.batch_file], env=os.environ)
478 job_id = self.parse_job_id(output)
485 job_id = self.parse_job_id(output)
479 self.notify_start(job_id)
486 self.notify_start(job_id)
480 defer.returnValue(job_id)
487 defer.returnValue(job_id)
481
488
482 @inlineCallbacks
489 @inlineCallbacks
483 def stop(self):
490 def stop(self):
484 output = yield getProcessOutput(self.delete_command,
491 output = yield getProcessOutput(self.delete_command,
485 [self.job_id], env=os.environ
492 [self.job_id], env=os.environ
486 )
493 )
487 self.notify_stop(output) # Pass the output of the kill cmd
494 self.notify_stop(output) # Pass the output of the kill cmd
488 defer.returnValue(output)
495 defer.returnValue(output)
489
496
490
497
491 class PBSLauncher(BatchSystemLauncher):
498 class PBSLauncher(BatchSystemLauncher):
492 """A BatchSystemLauncher subclass for PBS."""
499 """A BatchSystemLauncher subclass for PBS."""
493
500
494 submit_command = Str('qsub', config=True)
501 submit_command = Str('qsub', config=True)
495 delete_command = Str('qdel', config=True)
502 delete_command = Str('qdel', config=True)
496 job_id_regexp = Str('\d+', config=True)
503 job_id_regexp = Str('\d+', config=True)
497 batch_template = Str('', config=True)
504 batch_template = Str('', config=True)
498 batch_file_name = Unicode(u'pbs_batch_script', config=True)
505 batch_file_name = Unicode(u'pbs_batch_script', config=True)
499 batch_file = Unicode(u'')
506 batch_file = Unicode(u'')
500
507
501
508
502 #-----------------------------------------------------------------------------
509 #-----------------------------------------------------------------------------
503 # Controller launchers
510 # Controller launchers
504 #-----------------------------------------------------------------------------
511 #-----------------------------------------------------------------------------
505
512
506 def find_controller_cmd():
513 def find_controller_cmd():
507 """Find the command line ipcontroller program in a cross platform way."""
514 """Find the command line ipcontroller program in a cross platform way."""
508 if sys.platform == 'win32':
515 if sys.platform == 'win32':
509 # This logic is needed because the ipcontroller script doesn't
516 # This logic is needed because the ipcontroller script doesn't
510 # always get installed in the same way or in the same location.
517 # always get installed in the same way or in the same location.
511 from IPython.kernel import ipcontrollerapp
518 from IPython.kernel import ipcontrollerapp
512 script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
519 script_location = ipcontrollerapp.__file__.replace('.pyc', '.py')
513 # The -u option here turns on unbuffered output, which is required
520 # The -u option here turns on unbuffered output, which is required
514 # on Win32 to prevent wierd conflict and problems with Twisted.
521 # on Win32 to prevent wierd conflict and problems with Twisted.
515 # Also, use sys.executable to make sure we are picking up the
522 # Also, use sys.executable to make sure we are picking up the
516 # right python exe.
523 # right python exe.
517 cmd = [sys.executable, '-u', script_location]
524 cmd = [sys.executable, '-u', script_location]
518 else:
525 else:
519 # ipcontroller has to be on the PATH in this case.
526 # ipcontroller has to be on the PATH in this case.
520 cmd = ['ipcontroller']
527 cmd = ['ipcontroller']
521 return cmd
528 return cmd
522
529
523
530
524 class LocalControllerLauncher(LocalProcessLauncher):
531 class LocalControllerLauncher(LocalProcessLauncher):
525 """Launch a controller as a regular external process."""
532 """Launch a controller as a regular external process."""
526
533
527 controller_cmd = List(find_controller_cmd(), config=True)
534 controller_cmd = List(find_controller_cmd(), config=True)
528 # Command line arguments to ipcontroller.
535 # Command line arguments to ipcontroller.
529 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
536 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
530
537
531 def find_args(self):
538 def find_args(self):
532 return self.controller_cmd + self.controller_args
539 return self.controller_cmd + self.controller_args
533
540
534 def start(self, profile=None, cluster_dir=None):
541 def start(self, profile=None, cluster_dir=None):
535 """Start the controller by profile or cluster_dir."""
542 """Start the controller by profile or cluster_dir."""
536 if cluster_dir is not None:
543 if cluster_dir is not None:
537 self.controller_args.extend(['--cluster-dir', cluster_dir])
544 self.controller_args.extend(['--cluster-dir', cluster_dir])
538 if profile is not None:
545 if profile is not None:
539 self.controller_args.extend(['--profile', profile])
546 self.controller_args.extend(['--profile', profile])
540 log.msg("Starting LocalControllerLauncher: %r" % self.args)
547 log.msg("Starting LocalControllerLauncher: %r" % self.args)
541 return super(LocalControllerLauncher, self).start()
548 return super(LocalControllerLauncher, self).start()
542
549
543
550
544 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
551 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
545
552
546 job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
553 job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
547 extra_args = List([],config=False)
554 extra_args = List([],config=False)
548
555
549 def write_job_file(self, n):
556 def write_job_file(self, n):
550 job = WinHPCJob(self)
557 job = WinHPCJob(self)
551 job.job_name = "IPController"
558 job.job_name = "IPController"
552 job.username = self.username
559 job.username = self.username
553 job.priority = self.priority
560 job.priority = self.priority
554 job.requested_nodes = self.requested_nodes
561 job.requested_nodes = self.requested_nodes
555 job.project = self.project
562 job.project = self.project
556
563
557 t = IPControllerTask(self)
564 t = IPControllerTask(self)
558 t.work_directory = self.working_dir
565 t.work_directory = self.working_dir
559 # Add the --profile and --cluster-dir args from start.
566 # Add the --profile and --cluster-dir args from start.
560 t.controller_args.extend(self.extra_args)
567 t.controller_args.extend(self.extra_args)
561 job.add_task(t)
568 job.add_task(t)
562 log.msg("Writing job description file: %s" % self.job_file)
569 log.msg("Writing job description file: %s" % self.job_file)
563 job.write(self.job_file)
570 job.write(self.job_file)
564
571
565 def start(self, profile=None, cluster_dir=None):
572 def start(self, profile=None, cluster_dir=None):
566 """Start the controller by profile or cluster_dir."""
573 """Start the controller by profile or cluster_dir."""
567 if cluster_dir is not None:
574 if cluster_dir is not None:
568 self.extra_args = ['--cluster-dir', cluster_dir]
575 self.extra_args = ['--cluster-dir', cluster_dir]
569 if profile is not None:
576 if profile is not None:
570 self.extra_args = ['--profile', profile]
577 self.extra_args = ['--profile', profile]
571 return super(WindowsHPCControllerLauncher, self).start(1)
578 return super(WindowsHPCControllerLauncher, self).start(1)
572
579
573
580
574 class MPIExecControllerLauncher(MPIExecLauncher):
581 class MPIExecControllerLauncher(MPIExecLauncher):
575 """Launch a controller using mpiexec."""
582 """Launch a controller using mpiexec."""
576
583
577 controller_cmd = List(find_controller_cmd(), config=True)
584 controller_cmd = List(find_controller_cmd(), config=True)
578 # Command line arguments to ipcontroller.
585 # Command line arguments to ipcontroller.
579 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
586 controller_args = List(['--log-to-file','--log-level', '40'], config=True)
580 n = Int(1, config=False)
587 n = Int(1, config=False)
581
588
582 def start(self, profile=None, cluster_dir=None):
589 def start(self, profile=None, cluster_dir=None):
583 """Start the controller by profile or cluster_dir."""
590 """Start the controller by profile or cluster_dir."""
584 if cluster_dir is not None:
591 if cluster_dir is not None:
585 self.controller_args.extend(['--cluster-dir', cluster_dir])
592 self.controller_args.extend(['--cluster-dir', cluster_dir])
586 if profile is not None:
593 if profile is not None:
587 self.controller_args.extend(['--profile', profile])
594 self.controller_args.extend(['--profile', profile])
588 log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
595 log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
589 return super(MPIExecControllerLauncher, self).start(1)
596 return super(MPIExecControllerLauncher, self).start(1)
590
597
591 def find_args(self):
598 def find_args(self):
592 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
599 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
593 self.controller_cmd + self.controller_args
600 self.controller_cmd + self.controller_args
594
601
595
602
596 class PBSControllerLauncher(PBSLauncher):
603 class PBSControllerLauncher(PBSLauncher):
597 """Launch a controller using PBS."""
604 """Launch a controller using PBS."""
598
605
599 batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
606 batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
600
607
601 def start(self, profile=None, cluster_dir=None):
608 def start(self, profile=None, cluster_dir=None):
602 """Start the controller by profile or cluster_dir."""
609 """Start the controller by profile or cluster_dir."""
603 # Here we save profile and cluster_dir in the context so they
610 # Here we save profile and cluster_dir in the context so they
604 # can be used in the batch script template as ${profile} and
611 # can be used in the batch script template as ${profile} and
605 # ${cluster_dir}
612 # ${cluster_dir}
606 if cluster_dir is not None:
613 if cluster_dir is not None:
607 self.context['cluster_dir'] = cluster_dir
614 self.context['cluster_dir'] = cluster_dir
608 if profile is not None:
615 if profile is not None:
609 self.context['profile'] = profile
616 self.context['profile'] = profile
610 log.msg("Starting PBSControllerLauncher: %r" % self.args)
617 log.msg("Starting PBSControllerLauncher: %r" % self.args)
611 return super(PBSControllerLauncher, self).start(1)
618 return super(PBSControllerLauncher, self).start(1)
612
619
613
620
614 class SSHControllerLauncher(SSHLauncher):
621 class SSHControllerLauncher(SSHLauncher):
615 pass
622 pass
616
623
617
624
618 #-----------------------------------------------------------------------------
625 #-----------------------------------------------------------------------------
619 # Engine launchers
626 # Engine launchers
620 #-----------------------------------------------------------------------------
627 #-----------------------------------------------------------------------------
621
628
622
629
623 def find_engine_cmd():
630 def find_engine_cmd():
624 """Find the command line ipengine program in a cross platform way."""
631 """Find the command line ipengine program in a cross platform way."""
625 if sys.platform == 'win32':
632 if sys.platform == 'win32':
626 # This logic is needed because the ipengine script doesn't
633 # This logic is needed because the ipengine script doesn't
627 # always get installed in the same way or in the same location.
634 # always get installed in the same way or in the same location.
628 from IPython.kernel import ipengineapp
635 from IPython.kernel import ipengineapp
629 script_location = ipengineapp.__file__.replace('.pyc', '.py')
636 script_location = ipengineapp.__file__.replace('.pyc', '.py')
630 # The -u option here turns on unbuffered output, which is required
637 # The -u option here turns on unbuffered output, which is required
631 # on Win32 to prevent wierd conflict and problems with Twisted.
638 # on Win32 to prevent wierd conflict and problems with Twisted.
632 # Also, use sys.executable to make sure we are picking up the
639 # Also, use sys.executable to make sure we are picking up the
633 # right python exe.
640 # right python exe.
634 cmd = [sys.executable, '-u', script_location]
641 cmd = [sys.executable, '-u', script_location]
635 else:
642 else:
636 # ipcontroller has to be on the PATH in this case.
643 # ipcontroller has to be on the PATH in this case.
637 cmd = ['ipengine']
644 cmd = ['ipengine']
638 return cmd
645 return cmd
639
646
640
647
641 class LocalEngineLauncher(LocalProcessLauncher):
648 class LocalEngineLauncher(LocalProcessLauncher):
642 """Launch a single engine as a regular externall process."""
649 """Launch a single engine as a regular externall process."""
643
650
644 engine_cmd = List(find_engine_cmd(), config=True)
651 engine_cmd = List(find_engine_cmd(), config=True)
645 # Command line arguments for ipengine.
652 # Command line arguments for ipengine.
646 engine_args = List(
653 engine_args = List(
647 ['--log-to-file','--log-level', '40'], config=True
654 ['--log-to-file','--log-level', '40'], config=True
648 )
655 )
649
656
650 def find_args(self):
657 def find_args(self):
651 return self.engine_cmd + self.engine_args
658 return self.engine_cmd + self.engine_args
652
659
653 def start(self, profile=None, cluster_dir=None):
660 def start(self, profile=None, cluster_dir=None):
654 """Start the engine by profile or cluster_dir."""
661 """Start the engine by profile or cluster_dir."""
655 if cluster_dir is not None:
662 if cluster_dir is not None:
656 self.engine_args.extend(['--cluster-dir', cluster_dir])
663 self.engine_args.extend(['--cluster-dir', cluster_dir])
657 if profile is not None:
664 if profile is not None:
658 self.engine_args.extend(['--profile', profile])
665 self.engine_args.extend(['--profile', profile])
659 return super(LocalEngineLauncher, self).start()
666 return super(LocalEngineLauncher, self).start()
660
667
661
668
662 class LocalEngineSetLauncher(BaseLauncher):
669 class LocalEngineSetLauncher(BaseLauncher):
663 """Launch a set of engines as regular external processes."""
670 """Launch a set of engines as regular external processes."""
664
671
665 # Command line arguments for ipengine.
672 # Command line arguments for ipengine.
666 engine_args = List(
673 engine_args = List(
667 ['--log-to-file','--log-level', '40'], config=True
674 ['--log-to-file','--log-level', '40'], config=True
668 )
675 )
669
676
670 def __init__(self, working_dir, parent=None, name=None, config=None):
677 def __init__(self, working_dir, parent=None, name=None, config=None):
671 super(LocalEngineSetLauncher, self).__init__(
678 super(LocalEngineSetLauncher, self).__init__(
672 working_dir, parent, name, config
679 working_dir, parent, name, config
673 )
680 )
674 self.launchers = []
681 self.launchers = []
675
682
676 def start(self, n, profile=None, cluster_dir=None):
683 def start(self, n, profile=None, cluster_dir=None):
677 """Start n engines by profile or cluster_dir."""
684 """Start n engines by profile or cluster_dir."""
678 dlist = []
685 dlist = []
679 for i in range(n):
686 for i in range(n):
680 el = LocalEngineLauncher(self.working_dir, self)
687 el = LocalEngineLauncher(self.working_dir, self)
681 # Copy the engine args over to each engine launcher.
688 # Copy the engine args over to each engine launcher.
682 import copy
689 import copy
683 el.engine_args = copy.deepcopy(self.engine_args)
690 el.engine_args = copy.deepcopy(self.engine_args)
684 d = el.start(profile, cluster_dir)
691 d = el.start(profile, cluster_dir)
685 if i==0:
692 if i==0:
686 log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
693 log.msg("Starting LocalEngineSetLauncher: %r" % el.args)
687 self.launchers.append(el)
694 self.launchers.append(el)
688 dlist.append(d)
695 dlist.append(d)
689 # The consumeErrors here could be dangerous
696 # The consumeErrors here could be dangerous
690 dfinal = gatherBoth(dlist, consumeErrors=True)
697 dfinal = gatherBoth(dlist, consumeErrors=True)
691 dfinal.addCallback(self.notify_start)
698 dfinal.addCallback(self.notify_start)
692 return dfinal
699 return dfinal
693
700
694 def find_args(self):
701 def find_args(self):
695 return ['engine set']
702 return ['engine set']
696
703
697 def signal(self, sig):
704 def signal(self, sig):
698 dlist = []
705 dlist = []
699 for el in self.launchers:
706 for el in self.launchers:
700 d = el.signal(sig)
707 d = el.signal(sig)
701 dlist.append(d)
708 dlist.append(d)
702 dfinal = gatherBoth(dlist, consumeErrors=True)
709 dfinal = gatherBoth(dlist, consumeErrors=True)
703 return dfinal
710 return dfinal
704
711
705 def interrupt_then_kill(self, delay=1.0):
712 def interrupt_then_kill(self, delay=1.0):
706 dlist = []
713 dlist = []
707 for el in self.launchers:
714 for el in self.launchers:
708 d = el.interrupt_then_kill(delay)
715 d = el.interrupt_then_kill(delay)
709 dlist.append(d)
716 dlist.append(d)
710 dfinal = gatherBoth(dlist, consumeErrors=True)
717 dfinal = gatherBoth(dlist, consumeErrors=True)
711 return dfinal
718 return dfinal
712
719
713 def stop(self):
720 def stop(self):
714 return self.interrupt_then_kill()
721 return self.interrupt_then_kill()
715
722
716 def observe_stop(self):
723 def observe_stop(self):
717 dlist = [el.observe_stop() for el in self.launchers]
724 dlist = [el.observe_stop() for el in self.launchers]
718 dfinal = gatherBoth(dlist, consumeErrors=False)
725 dfinal = gatherBoth(dlist, consumeErrors=False)
719 dfinal.addCallback(self.notify_stop)
726 dfinal.addCallback(self.notify_stop)
720 return dfinal
727 return dfinal
721
728
722
729
723 class MPIExecEngineSetLauncher(MPIExecLauncher):
730 class MPIExecEngineSetLauncher(MPIExecLauncher):
724
731
725 engine_cmd = List(find_engine_cmd(), config=True)
732 engine_cmd = List(find_engine_cmd(), config=True)
726 # Command line arguments for ipengine.
733 # Command line arguments for ipengine.
727 engine_args = List(
734 engine_args = List(
728 ['--log-to-file','--log-level', '40'], config=True
735 ['--log-to-file','--log-level', '40'], config=True
729 )
736 )
730 n = Int(1, config=True)
737 n = Int(1, config=True)
731
738
732 def start(self, n, profile=None, cluster_dir=None):
739 def start(self, n, profile=None, cluster_dir=None):
733 """Start n engines by profile or cluster_dir."""
740 """Start n engines by profile or cluster_dir."""
734 if cluster_dir is not None:
741 if cluster_dir is not None:
735 self.engine_args.extend(['--cluster-dir', cluster_dir])
742 self.engine_args.extend(['--cluster-dir', cluster_dir])
736 if profile is not None:
743 if profile is not None:
737 self.engine_args.extend(['--profile', profile])
744 self.engine_args.extend(['--profile', profile])
738 log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
745 log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
739 return super(MPIExecEngineSetLauncher, self).start(n)
746 return super(MPIExecEngineSetLauncher, self).start(n)
740
747
741 def find_args(self):
748 def find_args(self):
742 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
749 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
743 self.engine_cmd + self.engine_args
750 self.engine_cmd + self.engine_args
744
751
745
752
746 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
753 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
747 pass
754 pass
748
755
749
756
750 class PBSEngineSetLauncher(PBSLauncher):
757 class PBSEngineSetLauncher(PBSLauncher):
751
758
752 batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
759 batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
753
760
754 def start(self, n, profile=None, cluster_dir=None):
761 def start(self, n, profile=None, cluster_dir=None):
755 """Start n engines by profile or cluster_dir."""
762 """Start n engines by profile or cluster_dir."""
756 if cluster_dir is not None:
763 if cluster_dir is not None:
757 self.program_args.extend(['--cluster-dir', cluster_dir])
764 self.program_args.extend(['--cluster-dir', cluster_dir])
758 if profile is not None:
765 if profile is not None:
759 self.program_args.extend(['-p', profile])
766 self.program_args.extend(['-p', profile])
760 log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
767 log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
761 return super(PBSEngineSetLauncher, self).start(n)
768 return super(PBSEngineSetLauncher, self).start(n)
762
769
763
770
764 class SSHEngineSetLauncher(BaseLauncher):
771 class SSHEngineSetLauncher(BaseLauncher):
765 pass
772 pass
766
773
767
774
768 #-----------------------------------------------------------------------------
775 #-----------------------------------------------------------------------------
769 # A launcher for ipcluster itself!
776 # A launcher for ipcluster itself!
770 #-----------------------------------------------------------------------------
777 #-----------------------------------------------------------------------------
771
778
772
779
773 def find_ipcluster_cmd():
780 def find_ipcluster_cmd():
774 """Find the command line ipcluster program in a cross platform way."""
781 """Find the command line ipcluster program in a cross platform way."""
775 if sys.platform == 'win32':
782 if sys.platform == 'win32':
776 # This logic is needed because the ipcluster script doesn't
783 # This logic is needed because the ipcluster script doesn't
777 # always get installed in the same way or in the same location.
784 # always get installed in the same way or in the same location.
778 from IPython.kernel import ipclusterapp
785 from IPython.kernel import ipclusterapp
779 script_location = ipclusterapp.__file__.replace('.pyc', '.py')
786 script_location = ipclusterapp.__file__.replace('.pyc', '.py')
780 # The -u option here turns on unbuffered output, which is required
787 # The -u option here turns on unbuffered output, which is required
781 # on Win32 to prevent wierd conflict and problems with Twisted.
788 # on Win32 to prevent wierd conflict and problems with Twisted.
782 # Also, use sys.executable to make sure we are picking up the
789 # Also, use sys.executable to make sure we are picking up the
783 # right python exe.
790 # right python exe.
784 cmd = [sys.executable, '-u', script_location]
791 cmd = [sys.executable, '-u', script_location]
785 else:
792 else:
786 # ipcontroller has to be on the PATH in this case.
793 # ipcontroller has to be on the PATH in this case.
787 cmd = ['ipcluster']
794 cmd = ['ipcluster']
788 return cmd
795 return cmd
789
796
790
797
791 class IPClusterLauncher(LocalProcessLauncher):
798 class IPClusterLauncher(LocalProcessLauncher):
792 """Launch the ipcluster program in an external process."""
799 """Launch the ipcluster program in an external process."""
793
800
794 ipcluster_cmd = List(find_ipcluster_cmd(), config=True)
801 ipcluster_cmd = List(find_ipcluster_cmd(), config=True)
795 # Command line arguments to pass to ipcluster.
802 # Command line arguments to pass to ipcluster.
796 ipcluster_args = List(
803 ipcluster_args = List(
797 ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
804 ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
798 ipcluster_subcommand = Str('start')
805 ipcluster_subcommand = Str('start')
799 ipcluster_n = Int(2)
806 ipcluster_n = Int(2)
800
807
801 def find_args(self):
808 def find_args(self):
802 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
809 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
803 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
810 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
804
811
805 def start(self):
812 def start(self):
806 log.msg("Starting ipcluster: %r" % self.args)
813 log.msg("Starting ipcluster: %r" % self.args)
807 return super(IPClusterLauncher, self).start()
814 return super(IPClusterLauncher, self).start()
808
815
General Comments 0
You need to be logged in to leave comments. Login now