##// END OF EJS Templates
Minor fixes to get Win HPC support working fully.
bgranger -
Show More
@@ -1,412 +1,412 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import logging
18 import logging
19 import os
19 import os
20 import signal
20 import signal
21 import sys
21 import sys
22
22
23 if os.name=='posix':
23 if os.name=='posix':
24 from twisted.scripts._twistd_unix import daemonize
24 from twisted.scripts._twistd_unix import daemonize
25
25
26 from IPython.core import release
26 from IPython.core import release
27 from IPython.external import argparse
27 from IPython.external import argparse
28 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
28 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
29 from IPython.utils.importstring import import_item
29 from IPython.utils.importstring import import_item
30
30
31 from IPython.kernel.clusterdir import (
31 from IPython.kernel.clusterdir import (
32 ApplicationWithClusterDir, ClusterDirError, PIDFileError
32 ApplicationWithClusterDir, ClusterDirError, PIDFileError
33 )
33 )
34
34
35 from twisted.internet import reactor
35 from twisted.internet import reactor
36 from twisted.python import log
36 from twisted.python import log
37
37
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # The ipcluster application
40 # The ipcluster application
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43
43
44 # Exit codes for ipcluster
44 # Exit codes for ipcluster
45
45
46 # This will be the exit code if the ipcluster appears to be running because
46 # This will be the exit code if the ipcluster appears to be running because
47 # a .pid file exists
47 # a .pid file exists
48 ALREADY_STARTED = 10
48 ALREADY_STARTED = 10
49
49
50 # This will be the exit code if ipcluster stop is run, but there is no .pid
50 # This will be the exit code if ipcluster stop is run, but there is no .pid
51 # file to be found.
51 # file to be found.
52 ALREADY_STOPPED = 11
52 ALREADY_STOPPED = 11
53
53
54
54
class IPClusterCLLoader(ArgParseConfigLoader):
    """Command line config loader that defines the ``ipcluster`` subcommands.

    Registers the ``list``, ``create``, ``start`` and ``stop`` subcommands on
    ``self.parser``.  Every option stores into a ``Global.<name>`` destination
    and uses ``NoConfigDefault`` as its default.
    """

    def _add_arguments(self):
        """Add the shared option groups and the four subcommand parsers."""
        # This has all the common options that all subcommands use
        parent_parser1 = argparse.ArgumentParser(add_help=False)
        parent_parser1.add_argument('--ipython-dir',
            dest='Global.ipython_dir',type=unicode,
            help='Set to override default location of Global.ipython_dir.',
            default=NoConfigDefault,
            metavar='Global.ipython_dir')
        parent_parser1.add_argument('--log-level',
            dest="Global.log_level",type=int,
            help='Set the log level (0,10,20,30,40,50). Default is 30.',
            default=NoConfigDefault,
            metavar='Global.log_level')

        # This has all the common options that other subcommands use
        # (everything except ``list``).
        parent_parser2 = argparse.ArgumentParser(add_help=False)
        # NOTE: ``default`` must be given exactly once per call; repeating a
        # keyword argument is a SyntaxError on Python >= 2.6.
        parent_parser2.add_argument('-p','--profile',
            dest='Global.profile',type=unicode,
            help='The string name of the profile to be used. This determines '
            'the name of the cluster dir as: cluster_<profile>. The default profile '
            'is named "default". The cluster directory is resolve this way '
            'if the --cluster-dir option is not used.',
            default=NoConfigDefault,
            metavar='Global.profile')
        parent_parser2.add_argument('--cluster-dir',
            dest='Global.cluster_dir',type=unicode,
            help='Set the cluster dir. This overrides the logic used by the '
            '--profile option.',
            default=NoConfigDefault,
            metavar='Global.cluster_dir')
        parent_parser2.add_argument('--working-dir',
            dest='Global.working_dir',type=unicode,
            help='Set the working dir for the process.',
            default=NoConfigDefault,
            metavar='Global.working_dir')
        parent_parser2.add_argument('--log-to-file',
            action='store_true', dest='Global.log_to_file',
            default=NoConfigDefault,
            help='Log to a file in the log directory (default is stdout)'
        )

        subparsers = self.parser.add_subparsers(
            dest='Global.subcommand',
            title='ipcluster subcommands',
            description='ipcluster has a variety of subcommands. '
            'The general way of running ipcluster is "ipcluster <cmd> '
            ' [options]""',
            help='For more help, type "ipcluster <cmd> -h"')

        # ``list`` takes only the options shared by every subcommand.
        subparsers.add_parser(
            'list',
            help='List all clusters in cwd and ipython_dir.',
            parents=[parent_parser1]
        )

        parser_create = subparsers.add_parser(
            'create',
            help='Create a new cluster directory.',
            parents=[parent_parser1, parent_parser2]
        )
        parser_create.add_argument(
            '--reset-config',
            dest='Global.reset_config', action='store_true',
            default=NoConfigDefault,
            help='Recopy the default config files to the cluster directory. '
            'You will loose any modifications you have made to these files.'
        )

        parser_start = subparsers.add_parser(
            'start',
            help='Start a cluster.',
            parents=[parent_parser1, parent_parser2]
        )
        parser_start.add_argument(
            '-n', '--number',
            type=int, dest='Global.n',
            default=NoConfigDefault,
            help='The number of engines to start.',
            metavar='Global.n'
        )
        # --clean-logs / --no-clean-logs write the same destination.
        parser_start.add_argument('--clean-logs',
            dest='Global.clean_logs', action='store_true',
            help='Delete old log flies before starting.',
            default=NoConfigDefault
        )
        parser_start.add_argument('--no-clean-logs',
            dest='Global.clean_logs', action='store_false',
            help="Don't delete old log flies before starting.",
            default=NoConfigDefault
        )
        # --daemon / --no-daemon write the same destination.
        parser_start.add_argument('--daemon',
            dest='Global.daemonize', action='store_true',
            help='Daemonize the ipcluster program. This implies --log-to-file',
            default=NoConfigDefault
        )
        parser_start.add_argument('--no-daemon',
            dest='Global.daemonize', action='store_false',
            help="Dont't daemonize the ipcluster program.",
            default=NoConfigDefault
        )

        parser_stop = subparsers.add_parser(
            'stop',
            help='Stop a cluster.',
            parents=[parent_parser1, parent_parser2]
        )
        parser_stop.add_argument('--signal',
            dest='Global.signal', type=int,
            help="The signal number to use in stopping the cluster (default=2).",
            metavar="Global.signal",
            default=NoConfigDefault
        )
171
171
172
172
173 default_config_file_name = u'ipcluster_config.py'
173 default_config_file_name = u'ipcluster_config.py'
174
174
175
175
class IPClusterApp(ApplicationWithClusterDir):
    """Application object behind the ``ipcluster`` command.

    The behaviour is selected by ``Global.subcommand``: ``list`` prints the
    cluster directories that can be found, ``create`` copies the default
    config files into a cluster directory, ``start`` launches a controller
    and a set of engines under the Twisted reactor, and ``stop`` signals a
    running ``ipcluster`` through its pid file.
    """

    name = u'ipcluster'
    description = 'Start an IPython cluster (controller and engines).'
    config_file_name = default_config_file_name
    default_log_level = logging.INFO
    auto_create_cluster_dir = False

    def create_default_config(self):
        """Extend the inherited default config with ipcluster's defaults."""
        super(IPClusterApp, self).create_default_config()
        self.default_config.Global.controller_launcher = \
            'IPython.kernel.launcher.LocalControllerLauncher'
        self.default_config.Global.engine_launcher = \
            'IPython.kernel.launcher.LocalEngineSetLauncher'
        self.default_config.Global.n = 2
        self.default_config.Global.reset_config = False
        self.default_config.Global.clean_logs = True
        self.default_config.Global.signal = 2
        self.default_config.Global.daemonize = False

    def create_command_line_config(self):
        """Create and return a command line config loader."""
        return IPClusterCLLoader(
            description=self.description,
            version=release.version
        )

    def find_resources(self):
        """Locate (or, for ``create``, allow creation of) the cluster dir.

        ``list`` prints the known cluster dirs and exits.  For ``start`` and
        ``stop`` a missing cluster dir is reported with a ClusterDirError
        that explains how to create one.
        """
        subcommand = self.command_line_config.Global.subcommand
        if subcommand=='list':
            self.list_cluster_dirs()
            # Exit immediately because there is nothing left to do.
            self.exit()
        elif subcommand=='create':
            self.auto_create_cluster_dir = True
            super(IPClusterApp, self).find_resources()
        elif subcommand=='start' or subcommand=='stop':
            self.auto_create_cluster_dir = False
            try:
                super(IPClusterApp, self).find_resources()
            except ClusterDirError:
                raise ClusterDirError(
                    "Could not find a cluster directory. A cluster dir must "
                    "be created before running 'ipcluster start'. Do "
                    "'ipcluster create -h' or 'ipcluster list -h' for more "
                    "information about creating and listing cluster dirs."
                )

    def list_cluster_dirs(self):
        """Print a start command for every ``cluster_<profile>`` directory.

        The search covers the current working directory, the IPython
        directory (command line value if given, otherwise the default) and
        the entries of the ``IPCLUSTER_DIR_PATH`` environment variable.
        """
        # Find the search paths
        cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
        if cluster_dir_paths:
            # os.pathsep is ':' on posix and ';' on Windows, so Windows
            # entries such as "C:\clusters" are not cut at the drive letter.
            cluster_dir_paths = cluster_dir_paths.split(os.pathsep)
        else:
            cluster_dir_paths = []
        try:
            ipython_dir = self.command_line_config.Global.ipython_dir
        except AttributeError:
            ipython_dir = self.default_config.Global.ipython_dir
        paths = [os.getcwd(), ipython_dir] + \
            cluster_dir_paths
        # Drop duplicate search paths.
        paths = list(set(paths))

        self.log.info('Searching for cluster dirs in paths: %r' % paths)
        prefix = 'cluster_'
        for path in paths:
            if not os.path.isdir(path):
                # A missing search path must not abort the whole listing.
                continue
            for f in os.listdir(path):
                full_path = os.path.join(path, f)
                if os.path.isdir(full_path) and f.startswith(prefix):
                    # Strip only the leading prefix from the directory name,
                    # so profiles (or parent dirs) containing '_' survive.
                    profile = f[len(prefix):]
                    start_cmd = '"ipcluster start -n 4 -p %s"' % profile
                    # Single-argument form: identical output on Python 2.
                    print(start_cmd + " ==> " + full_path)

    def pre_construct(self):
        """Run the inherited pre-construct step, then force file logging
        when daemonizing."""
        # This is where we cd to the working directory.
        super(IPClusterApp, self).pre_construct()
        config = self.master_config
        try:
            daemon = config.Global.daemonize
            if daemon:
                config.Global.log_to_file = True
        except AttributeError:
            # daemonize is not set for this configuration; nothing to force.
            pass

    def construct(self):
        """Do the per-subcommand setup that precedes ``start_app``."""
        config = self.master_config
        if config.Global.subcommand=='list':
            pass
        elif config.Global.subcommand=='create':
            self.log.info('Copying default config files to cluster directory '
            '[overwrite=%r]' % (config.Global.reset_config,))
            self.cluster_dir_obj.copy_all_config_files(overwrite=config.Global.reset_config)
        elif config.Global.subcommand=='start':
            self.start_logging()
            # The launchers are created once the reactor is running.
            reactor.callWhenRunning(self.start_launchers)

    def start_launchers(self):
        """Create both launchers, hook up SIGINT, and start the controller
        followed by the engines."""
        config = self.master_config

        # Create the launchers
        el_class = import_item(config.Global.engine_launcher)
        self.engine_launcher = el_class(
            self.cluster_dir, config=config
        )
        cl_class = import_item(config.Global.controller_launcher)
        self.controller_launcher = cl_class(
            self.cluster_dir, config=config
        )

        # Setup signals
        signal.signal(signal.SIGINT, self.stop_launchers)

        # Setup the observing of stopping: when the controller stops, the
        # engines are stopped as well.
        d1 = self.controller_launcher.observe_stop()
        d1.addCallback(self.stop_engines)
        d1.addErrback(self.err_and_stop)
        # If this triggers, just let them die
        # d2 = self.engine_launcher.observe_stop()

        # Start the controller and engines
        d = self.controller_launcher.start(
            cluster_dir=config.Global.cluster_dir
        )
        d.addCallback(lambda _: self.start_engines())
        d.addErrback(self.err_and_stop)

    def err_and_stop(self, f):
        """Errback: log the failure ``f`` and stop the reactor."""
        log.msg('Unexpected error in ipcluster:')
        log.err(f)
        reactor.stop()

    def stop_engines(self, r):
        """Callback: stop the engine launcher; ``r`` is ignored."""
        return self.engine_launcher.stop()

    def start_engines(self):
        """Start ``Global.n`` engines and return the launcher's result."""
        config = self.master_config
        d = self.engine_launcher.start(
            config.Global.n,
            cluster_dir=config.Global.cluster_dir
        )
        return d

    def stop_launchers(self, signum, frame):
        """SIGINT handler: stop both launchers, then stop the reactor."""
        log.msg("Stopping cluster")
        d1 = self.engine_launcher.stop()
        d2 = self.controller_launcher.stop()
        # d1.addCallback(lambda _: self.controller_launcher.stop)
        d1.addErrback(self.err_and_stop)
        d2.addErrback(self.err_and_stop)
        # Stop the reactor two seconds after the stop requests were issued.
        reactor.callLater(2.0, reactor.stop)

    def start_logging(self):
        """Optionally delete old engine/controller logs, then start logging."""
        # Remove old log files
        if self.master_config.Global.clean_logs:
            log_dir = self.master_config.Global.log_dir
            # One directory scan covers both families of log files.
            old_log_prefixes = ('ipengine' + '-', 'ipcontroller' + '-')
            for f in os.listdir(log_dir):
                if f.startswith(old_log_prefixes) and f.endswith('.log'):
                    os.remove(os.path.join(log_dir, f))
        super(IPClusterApp, self).start_logging()

    def start_app(self):
        """Start the application, depending on what subcommand is used."""
        subcmd = self.master_config.Global.subcommand
        if subcmd=='create' or subcmd=='list':
            return
        elif subcmd=='start':
            self.start_app_start()
        elif subcmd=='stop':
            self.start_app_stop()

    def start_app_start(self):
        """Start the app for the start subcommand."""
        config = self.master_config
        # First see if the cluster is already running
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            # No readable pid file: treat the cluster as not running.
            pass
        else:
            self.log.critical(
                'Cluster is already running with [pid=%s]. '
                'use "ipcluster stop" to stop the cluster.' % pid
            )
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STARTED)

        # Now log and daemonize
        self.log.info(
            'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
        )
        if config.Global.daemonize:
            # daemonize is only imported (and only called) on posix.
            if os.name=='posix':
                daemonize()

        # Now write the new pid file AFTER our new forked pid is active.
        self.write_pid_file()
        reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
        reactor.run()

    def start_app_stop(self):
        """Start the app for the stop subcommand."""
        config = self.master_config
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            self.log.critical(
                'Problem reading pid file, cluster is probably not running.'
            )
            # Here I exit with an unusual exit status that other processes
            # can watch for to learn how I exited.
            self.exit(ALREADY_STOPPED)
        else:
            if os.name=='posix':
                sig = config.Global.signal
                self.log.info(
                    "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
                )
                os.kill(pid, sig)
            elif os.name=='nt':
                # As of right now, we don't support daemonize on Windows, so
                # stop will not do anything. Minimally, it should clean up the
                # old .pid files.
                self.remove_pid_file()
403
403
def launch_new_instance():
    """Build an IPClusterApp and start it (the ipcluster entry point)."""
    IPClusterApp().start()
408
408
409
409
410 if __name__ == '__main__':
410 if __name__ == '__main__':
411 launch_new_instance()
411 launch_new_instance()
412
412
@@ -1,866 +1,867 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import re
19 import re
20 import sys
20 import sys
21
21
22 from IPython.core.component import Component
22 from IPython.core.component import Component
23 from IPython.external import Itpl
23 from IPython.external import Itpl
24 from IPython.utils.traitlets import Str, Int, List, Unicode, Enum
24 from IPython.utils.traitlets import Str, Int, List, Unicode, Enum
25 from IPython.utils.platutils import find_cmd
25 from IPython.utils.platutils import find_cmd
26 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
26 from IPython.kernel.twistedutil import gatherBoth, make_deferred, sleep_deferred
27 from IPython.kernel.winhpcjob import (
27 from IPython.kernel.winhpcjob import (
28 WinHPCJob, WinHPCTask,
28 WinHPCJob, WinHPCTask,
29 IPControllerTask, IPEngineTask,
29 IPControllerTask, IPEngineTask,
30 IPControllerJob, IPEngineSetJob
30 IPControllerJob, IPEngineSetJob
31 )
31 )
32
32
33 from twisted.internet import reactor, defer
33 from twisted.internet import reactor, defer
34 from twisted.internet.defer import inlineCallbacks
34 from twisted.internet.defer import inlineCallbacks
35 from twisted.internet.protocol import ProcessProtocol
35 from twisted.internet.protocol import ProcessProtocol
36 from twisted.internet.utils import getProcessOutput
36 from twisted.internet.utils import getProcessOutput
37 from twisted.internet.error import ProcessDone, ProcessTerminated
37 from twisted.internet.error import ProcessDone, ProcessTerminated
38 from twisted.python import log
38 from twisted.python import log
39 from twisted.python.failure import Failure
39 from twisted.python.failure import Failure
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Utilities
42 # Utilities
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45
45
def find_controller_cmd():
    """Find the command line ipcontroller program in a cross platform way.

    Returns
    -------
    list of str
        The command (and leading arguments) used to start ipcontroller.
    """
    if sys.platform == 'win32':
        # This logic is needed because the ipcontroller script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipcontrollerapp
        # Point at the source file even when the module was loaded from
        # byte code.  Only the suffix is rewritten, so a '.pyc' occurring
        # elsewhere in the path is left alone and '.pyo' is handled too.
        root, ext = os.path.splitext(ipcontrollerapp.__file__)
        if ext in ('.pyc', '.pyo'):
            script_location = root + '.py'
        else:
            script_location = ipcontrollerapp.__file__
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflicts and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipcontroller has to be on the PATH in this case.
        cmd = ['ipcontroller']
    return cmd
62
62
63
63
def find_engine_cmd():
    """Find the command line ipengine program in a cross platform way.

    Returns
    -------
    list of str
        The command (and leading arguments) used to start ipengine.
    """
    if sys.platform == 'win32':
        # This logic is needed because the ipengine script doesn't
        # always get installed in the same way or in the same location.
        from IPython.kernel import ipengineapp
        # Point at the source file even when the module was loaded from
        # byte code.  Only the suffix is rewritten, so a '.pyc' occurring
        # elsewhere in the path is left alone and '.pyo' is handled too.
        root, ext = os.path.splitext(ipengineapp.__file__)
        if ext in ('.pyc', '.pyo'):
            script_location = root + '.py'
        else:
            script_location = ipengineapp.__file__
        # The -u option here turns on unbuffered output, which is required
        # on Win32 to prevent weird conflicts and problems with Twisted.
        # Also, use sys.executable to make sure we are picking up the
        # right python exe.
        cmd = [sys.executable, '-u', script_location]
    else:
        # ipengine has to be on the PATH in this case.
        cmd = ['ipengine']
    return cmd
80
80
81
81
82 #-----------------------------------------------------------------------------
82 #-----------------------------------------------------------------------------
83 # Base launchers and errors
83 # Base launchers and errors
84 #-----------------------------------------------------------------------------
84 #-----------------------------------------------------------------------------
85
85
86
86
class LauncherError(Exception):
    """Base class for the errors raised by the launchers in this module."""
89
89
90
90
class ProcessStateError(LauncherError):
    """A launcher operation was requested in the wrong process state."""
93
93
94
94
class UnknownStatus(LauncherError):
    """A process ended with an exit status that was not recognized."""
97
97
98
98
class BaseLauncher(Component):
    """An abstraction for starting, stopping and signaling a process."""

    # The directory the launched process is associated with.
    working_dir = Unicode(u'')

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(BaseLauncher, self).__init__(parent, name, config)
        self.working_dir = working_dir
        # One of 'before', 'running' or 'after'.
        self.state = 'before'
        # Deferreds handed out by observe_stop that are waiting to fire.
        self.stop_deferreds = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """A list of cmd and args that will be used to start the process.

        This is what is passed to :func:`spawnProcess` and the first element
        will be the process name.
        """
        return self.find_args()

    def find_args(self):
        """The ``.args`` property calls this to find the args list.

        Subclasses should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The string form of the program arguments."""
        return ' '.join(self.args)

    @property
    def running(self):
        """Am I running."""
        return self.state == 'running'

    def start(self):
        """Start the process.

        This must return a deferred that fires with information about the
        process starting (like a pid, job id, etc.).
        """
        error = NotImplementedError('start must be implemented in a subclass')
        return defer.fail(Failure(error))

    def stop(self):
        """Stop the process and notify observers of stopping.

        This must return a deferred that fires with information about the
        process stopping, like errors that occur while the process is
        being shut down. This deferred won't fire when the process
        actually stops. To observe the actual process stopping, see
        :func:`observe_stop`.
        """
        error = NotImplementedError('stop must be implemented in a subclass')
        return defer.fail(Failure(error))

    def observe_stop(self):
        """Get a deferred that will fire when the process stops.

        The deferred will fire with data that contains information about
        the exit status of the process.
        """
        if self.state == 'after':
            # Already stopped: hand back the recorded stop data right away.
            return defer.succeed(self.stop_data)
        waiting = defer.Deferred()
        self.stop_deferreds.append(waiting)
        return waiting

    def notify_start(self, data):
        """Call this to trigger startup actions.

        This logs the process startup and sets the state to 'running'. It is
        a pass-through so it can be used as a callback.
        """
        log.msg('Process %r started: %r' % (self.args[0], data))
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        This logs the process stopping and sets the state to 'after'. Call
        this to trigger all the deferreds from :func:`observe_stop`.
        """
        log.msg('Process %r stopped: %r' % (self.args[0], data))
        self.stop_data = data
        self.state = 'after'
        # Fire the waiting observers, most recently registered first.
        while self.stop_deferreds:
            self.stop_deferreds.pop().callback(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Return a semi-meaningless deferred after signaling the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        error = NotImplementedError('signal must be implemented in a subclass')
        return defer.fail(Failure(error))
222
222
223
223
224 #-----------------------------------------------------------------------------
224 #-----------------------------------------------------------------------------
225 # Local process launchers
225 # Local process launchers
226 #-----------------------------------------------------------------------------
226 #-----------------------------------------------------------------------------
227
227
228
228
class LocalProcessLauncherProtocol(ProcessProtocol):
    """A ProcessProtocol to go with the LocalProcessLauncher.

    It forwards process start and end events to the launcher and sends the
    child's stdout/stderr to the Twisted log.
    """

    def __init__(self, process_launcher):
        self.process_launcher = process_launcher
        self.pid = None

    def connectionMade(self):
        """Record the child's pid and tell the launcher it has started."""
        self.pid = self.transport.pid
        self.process_launcher.notify_start(self.transport.pid)

    def processEnded(self, status):
        """Translate the exit status into a dict and notify the launcher.

        Raises :class:`UnknownStatus` if the status is neither a
        ``ProcessDone`` nor a ``ProcessTerminated``.
        """
        value = status.value
        if isinstance(value, ProcessDone):
            exit_code, sig, exit_status = 0, None, None
        elif isinstance(value, ProcessTerminated):
            exit_code = value.exitCode
            sig = value.signal
            exit_status = value.status
        else:
            raise UnknownStatus("Unknown exit status, this is probably a "
                                "bug in Twisted")
        self.process_launcher.notify_stop(
            {'exit_code': exit_code,
             'signal': sig,
             'status': exit_status,
             'pid': self.pid
            }
        )

    def outReceived(self, data):
        """Log data the child wrote to stdout."""
        log.msg(data)

    def errReceived(self, data):
        """Log data the child wrote to stderr as an error."""
        log.err(data)
267
267
268
268
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    This will launch the external process with a working directory of
    ``self.working_dir``.
    """

    # This is used to construct self.args, which is passed to
    # spawnProcess.
    cmd_and_args = List([])

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(LocalProcessLauncher, self).__init__(
            working_dir, parent, name, config
        )
        self.process_protocol = None
        # Set by start(); initialised here so the attribute always exists.
        self.process_transport = None
        self.start_deferred = None

    def find_args(self):
        """Return the command and arguments stored in ``cmd_and_args``."""
        return self.cmd_and_args

    def start(self):
        """Spawn the process and return a deferred for its start data.

        The returned deferred is fired by :meth:`notify_start`.  If the
        launcher is not in the 'before' state, a deferred that has already
        failed with :class:`ProcessStateError` is returned instead.
        """
        if self.state != 'before':
            s = 'The process was already started and has state: %r' % self.state
            return defer.fail(ProcessStateError(s))
        self.process_protocol = LocalProcessLauncherProtocol(self)
        # Create the deferred before spawning: notify_start fires it.
        self.start_deferred = defer.Deferred()
        self.process_transport = reactor.spawnProcess(
            self.process_protocol,
            str(self.args[0]),  # twisted expects these to be str, not unicode
            [str(a) for a in self.args],  # str expected, not unicode
            env=os.environ,
            path=self.working_dir  # start in the working_dir
        )
        return self.start_deferred

    def notify_start(self, data):
        """Record the start, fire the start deferred and pass ``data`` on."""
        super(LocalProcessLauncher, self).notify_start(data)
        self.start_deferred.callback(data)
        # Keep the pass-through contract of BaseLauncher.notify_start.
        return data

    def stop(self):
        """Stop the process by interrupting and then killing it."""
        return self.interrupt_then_kill()

    @make_deferred
    def signal(self, sig):
        """Send ``sig`` to the process if it is currently running."""
        if self.state == 'running':
            self.process_transport.signalProcess(sig)

    @inlineCallbacks
    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL."""
        yield self.signal('INT')
        yield sleep_deferred(delay)
        yield self.signal('KILL')
324
324
325
325
class LocalControllerLauncher(LocalProcessLauncher):
    """Launch a controller as a regular external process."""

    controller_cmd = List(find_controller_cmd(), config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)

    def find_args(self):
        """Return the controller command, its args and the working dir."""
        return self.controller_cmd + self.controller_args + \
            ['--working-dir', self.working_dir]

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        # Rebind to a new list instead of extending in place, so a list
        # object that may be shared with other launchers is not mutated.
        self.controller_args = self.controller_args + \
            ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        log.msg("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()
343
343
344
344
class LocalEngineLauncher(LocalProcessLauncher):
    """Launch a single engine as a regular external process."""

    engine_cmd = List(find_engine_cmd(), config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )

    def find_args(self):
        """Return the engine command, its args and the working dir."""
        return self.engine_cmd + self.engine_args + \
            ['--working-dir', self.working_dir]

    def start(self, cluster_dir):
        """Start the engine by cluster_dir."""
        # Rebind to a new list instead of extending in place, so a list
        # object that may be shared with other launchers is not mutated.
        self.engine_args = self.engine_args + ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        return super(LocalEngineLauncher, self).start()
363
363
364
364
class LocalEngineSetLauncher(BaseLauncher):
    """Launch a set of engines as regular external processes."""

    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(LocalEngineSetLauncher, self).__init__(
            working_dir, parent, name, config
        )
        # One LocalEngineLauncher per engine started by start().
        self.launchers = []

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        import copy
        self.cluster_dir = unicode(cluster_dir)
        start_deferreds = []
        for index in range(n):
            engine = LocalEngineLauncher(self.working_dir, self)
            # Copy the engine args over to each engine launcher.
            engine.engine_args = copy.deepcopy(self.engine_args)
            started = engine.start(cluster_dir)
            if index == 0:
                # Log the command line once, for the first engine only.
                log.msg("Starting LocalEngineSetLauncher: %r" % engine.args)
            self.launchers.append(engine)
            start_deferreds.append(started)
        # The consumeErrors here could be dangerous
        dfinal = gatherBoth(start_deferreds, consumeErrors=True)
        dfinal.addCallback(self.notify_start)
        return dfinal

    def find_args(self):
        """Return a placeholder name; the set has no command of its own."""
        return ['engine set']

    def signal(self, sig):
        """Send ``sig`` to every engine and gather the results."""
        pending = [engine.signal(sig) for engine in self.launchers]
        return gatherBoth(pending, consumeErrors=True)

    def interrupt_then_kill(self, delay=1.0):
        """Interrupt and then kill every engine, gathering the results."""
        pending = [
            engine.interrupt_then_kill(delay) for engine in self.launchers
        ]
        return gatherBoth(pending, consumeErrors=True)

    def stop(self):
        """Stop all engines by interrupting and then killing them."""
        return self.interrupt_then_kill()

    def observe_stop(self):
        """Return a deferred that fires once every engine has stopped."""
        pending = [engine.observe_stop() for engine in self.launchers]
        dfinal = gatherBoth(pending, consumeErrors=False)
        dfinal.addCallback(self.notify_stop)
        return dfinal
425
425
426
426
427 #-----------------------------------------------------------------------------
427 #-----------------------------------------------------------------------------
428 # MPIExec launchers
428 # MPIExec launchers
429 #-----------------------------------------------------------------------------
429 #-----------------------------------------------------------------------------
430
430
431
431
class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    # The mpiexec command to use in starting the process.
    mpi_cmd = List(['mpiexec'], config=True)
    # The command line arguments to pass to mpiexec.
    mpi_args = List([], config=True)
    # The program to start using mpiexec.
    program = List(['date'], config=True)
    # The command line argument to the program.
    program_args = List([], config=True)
    # The number of instances of the program to start.
    n = Int(1, config=True)

    def find_args(self):
        """Build self.args using all the fields."""
        # self.n is an int; convert it so every element of the args list
        # is a string and ``arg_str`` can join them.
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
            self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPIExecLauncher, self).start()
455
455
456
456
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    controller_cmd = List(find_controller_cmd(), config=True)
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','--log-level', '40'], config=True)
    # Exactly one controller is started; start() always passes 1.
    n = Int(1, config=False)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        # Rebind to a new list instead of extending in place, so a list
        # object that may be shared with other launchers is not mutated.
        self.controller_args = self.controller_args + \
            ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        log.msg("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        """Return the mpiexec command line that runs the controller."""
        # self.n is an int; convert it so every element of the args list
        # is a string and ``arg_str`` can join them.
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
            self.controller_cmd + self.controller_args + \
            ['--working-dir', self.working_dir]
476
476
477
477
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Launch a set of engines using mpiexec."""

    engine_cmd = List(find_engine_cmd(), config=True)
    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','--log-level', '40'], config=True
    )
    n = Int(1, config=True)

    def start(self, n, cluster_dir):
        """Start n engines by profile or cluster_dir."""
        # Rebind to a new list instead of extending in place, so a list
        # object that may be shared with other launchers is not mutated.
        self.engine_args = self.engine_args + ['--cluster-dir', cluster_dir]
        self.cluster_dir = unicode(cluster_dir)
        log.msg('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)

    def find_args(self):
        """Return the mpiexec command line that runs the engines."""
        # self.n is an int; convert it so every element of the args list
        # is a string and ``arg_str`` can join them.
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
            self.engine_cmd + self.engine_args + \
            ['--working-dir', self.working_dir]
498
498
499
499
500 #-----------------------------------------------------------------------------
500 #-----------------------------------------------------------------------------
501 # SSH launchers
501 # SSH launchers
502 #-----------------------------------------------------------------------------
502 #-----------------------------------------------------------------------------
503
503
504
504
class SSHLauncher(BaseLauncher):
    """A bare-bones launcher that runs a program on another host via ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables.  There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True)
    ssh_args = List([], config=True)
    program = List(['date'], config=True)
    program_args = List([], config=True)
    hostname = Str('', config=True)
    user = Str('', config=True)
    # The ``user@hostname`` target, rebuilt by the two handlers below.
    location = Str('')

    # NOTE(review): the ``_<name>_changed`` methods are presumably invoked by
    # the trait machinery when ``hostname``/``user`` are assigned -- confirm.
    def _hostname_changed(self, name, old, new):
        self.location = '%s@%s' % (self.user, new)

    def _user_changed(self, name, old, new):
        self.location = '%s@%s' % (new, self.hostname)

    def find_args(self):
        """Return the ssh invocation followed by the remote command."""
        ssh_part = self.ssh_cmd + self.ssh_args + [self.location]
        remote_part = self.program + self.program_args
        return ssh_part + remote_part

    def start(self, n, hostname=None, user=None):
        """Optionally update hostname/user, then start via the parent class.

        ``n`` is accepted but not used by this method.
        """
        overrides = (('hostname', hostname), ('user', user))
        for attr, value in overrides:
            if value is not None:
                setattr(self, attr, value)
        return super(SSHLauncher, self).start()
537
537
538
538
class SSHControllerLauncher(SSHLauncher):
    # Adds nothing to SSHLauncher; it only provides a distinct class name.
    pass
541
541
542
542
class SSHEngineSetLauncher(BaseLauncher):
    # Empty subclass of BaseLauncher (note: not of SSHLauncher).
    # NOTE(review): looks like a placeholder for a future implementation --
    # confirm.
    pass
545
545
546
546
547 #-----------------------------------------------------------------------------
547 #-----------------------------------------------------------------------------
548 # Windows HPC Server 2008 scheduler launchers
548 # Windows HPC Server 2008 scheduler launchers
549 #-----------------------------------------------------------------------------
549 #-----------------------------------------------------------------------------
550
550
551
551
# This is only used on Windows.
def find_job_cmd():
    """Return the scheduler ``job`` command.

    On Windows (``os.name == 'nt'``) the result of ``find_cmd('job')`` is
    returned; everywhere else the plain name ``'job'`` is returned.
    """
    if os.name != 'nt':
        return 'job'
    return find_cmd('job')
558
558
559
559
class WindowsHPCLauncher(BaseLauncher):
    """Launch processes by submitting a job file to a Windows HPC scheduler.

    ``start`` writes a job description file (subclasses implement
    ``write_job_file``) and submits it with the ``job`` command; ``stop``
    cancels the submitted job by its id.
    """

    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str(r'\d+', config=True)
    # The filename of the instantiated job script.
    job_file_name = Unicode(u'ipython_job.xml', config=True)
    # The hostname of the scheduler to submit the job to
    scheduler = Str('', config=True)
    # The ``job`` executable; looked up once when the class is defined.
    job_cmd = Str(find_job_cmd(), config=True)

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(WindowsHPCLauncher, self).__init__(
            working_dir, parent, name, config
        )

    # The full path to the instantiated job script.  This is computed on
    # every access by combining the working_dir with the job_file_name, so
    # there is no separate ``job_file`` trait (it would be shadowed by this
    # property anyway).
    @property
    def job_file(self):
        return os.path.join(self.working_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the job description file for n processes (abstract)."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        # The real command lines are assembled in start() and stop().
        return ['job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The id is also stored as ``self.job_id``.  Raises LauncherError if
        ``job_id_regexp`` does not match anywhere in ``output``.
        """
        m = re.search(self.job_id_regexp, output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def _run_job_cmd(self, args):
        """Run ``job_cmd`` with ``args`` and return the getProcessOutput result."""
        # Twisted will raise DeprecationWarnings if we try to pass unicode to
        # this, so the command, its arguments and the environment are all
        # coerced to str.
        return getProcessOutput(
            str(self.job_cmd),
            [str(a) for a in args],
            env=dict((str(k), str(v)) for k, v in os.environ.items()),
            path=self.working_dir
        )

    @inlineCallbacks
    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        log.msg("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        output = yield self._run_job_cmd(args)
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        defer.returnValue(job_id)

    @inlineCallbacks
    def stop(self):
        """Cancel the job recorded in ``self.job_id`` and report the output."""
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        log.msg("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = yield self._run_job_cmd(args)
        except Exception:
            # Best effort: a failing cancel is reported as "already stopped".
            # Only Exception is caught so that generator shutdown and
            # interpreter exit signals are not swallowed.
            output = 'The job already appears to be stoppped: %r' % self.job_id
        self.notify_stop(output)  # Pass the output of the kill cmd
        defer.returnValue(output)
638
639
639
640
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Submit a one-task Win HPC job that runs the controller."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
    # Filled in by start() and appended to the task's controller_args.
    extra_args = List([], config=False)

    def write_job_file(self, n):
        """Build the controller job with one task and write it to job_file.

        ``n`` is not used here; exactly one task is added.
        """
        controller_job = IPControllerJob(self)

        task = IPControllerTask(self)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        task.work_directory = self.cluster_dir
        # Add the --cluster-dir and --working-dir from self.start().
        task.controller_args.extend(self.extra_args)
        controller_job.add_task(task)

        log.msg("Writing job description file: %s" % self.job_file)
        controller_job.write(self.job_file)

    # The job file lives in the cluster directory rather than working_dir.
    @property
    def job_file(self):
        return os.path.join(self.cluster_dir, self.job_file_name)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        dir_options = ['--cluster-dir', cluster_dir]
        dir_options += ['--working-dir', self.working_dir]
        self.extra_args = dir_options
        self.cluster_dir = unicode(cluster_dir)
        return super(WindowsHPCControllerLauncher, self).start(1)
671
672
672
673
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Submit a Win HPC job containing one task per engine."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True)
    # Filled in by start() and appended to every task's engine_args.
    extra_args = List([], config=False)

    def write_job_file(self, n):
        """Build an engine-set job with n engine tasks and write it to job_file."""
        job = IPEngineSetJob(self)

        for _ in range(n):
            t = IPEngineTask(self)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.cluster_dir
            # Add the --cluster-dir and --working-dir from self.start().
            t.engine_args.extend(self.extra_args)
            job.add_task(t)

        log.msg("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    # The job file lives in the cluster directory rather than working_dir.
    @property
    def job_file(self):
        return os.path.join(self.cluster_dir, self.job_file_name)

    def start(self, n, cluster_dir):
        """Start n engines by cluster_dir."""
        self.extra_args = [
            '--cluster-dir', cluster_dir, '--working-dir', self.working_dir
        ]
        self.cluster_dir = unicode(cluster_dir)
        return super(WindowsHPCEngineSetLauncher, self).start(n)
705
706
706
707
707 #-----------------------------------------------------------------------------
708 #-----------------------------------------------------------------------------
708 # Batch (PBS) system launchers
709 # Batch (PBS) system launchers
709 #-----------------------------------------------------------------------------
710 #-----------------------------------------------------------------------------
710
711
711
712
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc.  The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script.  The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using Itpl.  Thus the template can use
    ${n} for the number of instances.  Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in.  See PBSEngineSet
    # The name of the command line program used to submit jobs.
    submit_command = Str('', config=True)
    # The name of the command line program used to delete jobs.
    delete_command = Str('', config=True)
    # A regular expression used to get the job id from the output of the
    # submit_command.
    job_id_regexp = Str('', config=True)
    # The string that is the batch script template itself.
    batch_template = Str('', config=True)
    # The filename of the instantiated batch script.
    batch_file_name = Unicode(u'batch_script', config=True)
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')

    def __init__(self, working_dir, parent=None, name=None, config=None):
        super(BatchSystemLauncher, self).__init__(
            working_dir, parent, name, config
        )
        self.batch_file = os.path.join(self.working_dir, self.batch_file_name)
        # Namespace handed to Itpl when the batch template is instantiated.
        self.context = {}

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id.

        The id is also stored as ``self.job_id``.  Raises LauncherError if
        ``job_id_regexp`` does not match at the start of ``output``.
        """
        m = re.match(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        log.msg('Job started with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the working_dir."""
        self.context['n'] = n
        script_as_string = Itpl.itplns(self.batch_template, self.context)
        log.msg('Writing instantiated batch script: %s' % self.batch_file)
        f = open(self.batch_file, 'w')
        try:
            f.write(script_as_string)
        finally:
            # Always release the handle, even when the write fails.
            f.close()

    @inlineCallbacks
    def start(self, n):
        """Start n copies of the process using a batch system."""
        self.write_batch_script(n)
        output = yield getProcessOutput(self.submit_command,
            [self.batch_file], env=os.environ)
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        defer.returnValue(job_id)

    @inlineCallbacks
    def stop(self):
        """Run the delete command on ``self.job_id`` and report its output."""
        output = yield getProcessOutput(self.delete_command,
            [self.job_id], env=os.environ
        )
        self.notify_stop(output)  # Pass the output of the kill cmd
        defer.returnValue(output)
785
786
786
787
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = Str('qsub', config=True)
    delete_command = Str('qdel', config=True)
    # Raw string so the backslash is taken literally by the regex engine
    # instead of relying on an unrecognised string escape.
    job_id_regexp = Str(r'\d+', config=True)
    batch_template = Str('', config=True)
    batch_file_name = Unicode(u'pbs_batch_script', config=True)
    batch_file = Unicode(u'')
796
797
797
798
class PBSControllerLauncher(PBSLauncher):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)

    def start(self, cluster_dir):
        """Start the controller by cluster_dir."""
        # Expose cluster_dir to the batch script template, where it can be
        # referenced as ${cluster_dir}.
        self.context.update({'cluster_dir': cluster_dir})
        self.cluster_dir = unicode(cluster_dir)
        log.msg("Starting PBSControllerLauncher: %r" % self.args)
        parent = super(PBSControllerLauncher, self)
        return parent.start(1)
812
813
813
814
class PBSEngineSetLauncher(PBSLauncher):
    """Launch a set of engines using PBS."""

    batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)

    def start(self, n, cluster_dir):
        """Start n engines by cluster_dir."""
        # Make cluster_dir available to the batch script template as
        # ${cluster_dir}, the same way PBSControllerLauncher does.
        self.context['cluster_dir'] = cluster_dir
        # NOTE(review): program_args is not defined on any PBS/batch class
        # in this part of the file -- confirm it exists on BaseLauncher.
        self.program_args.extend(['--cluster-dir', cluster_dir])
        self.cluster_dir = unicode(cluster_dir)
        log.msg('Starting PBSEngineSetLauncher: %r' % self.args)
        return super(PBSEngineSetLauncher, self).start(n)
824
825
825
826
826 #-----------------------------------------------------------------------------
827 #-----------------------------------------------------------------------------
827 # A launcher for ipcluster itself!
828 # A launcher for ipcluster itself!
828 #-----------------------------------------------------------------------------
829 #-----------------------------------------------------------------------------
829
830
830
831
def find_ipcluster_cmd():
    """Find the command line ipcluster program in a cross platform way."""
    if sys.platform != 'win32':
        # ipcluster has to be on the PATH in this case.
        return ['ipcluster']

    # This logic is needed because the ipcluster script doesn't
    # always get installed in the same way or in the same location.
    from IPython.kernel import ipclusterapp
    script_location = ipclusterapp.__file__.replace('.pyc', '.py')
    # The -u option here turns on unbuffered output, which is required
    # on Win32 to prevent weird conflicts and problems with Twisted.
    # Also, use sys.executable to make sure we are picking up the
    # right python exe.
    return [sys.executable, '-u', script_location]
847
848
848
849
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    ipcluster_cmd = List(find_ipcluster_cmd(), config=True)
    # Command line arguments to pass to ipcluster.
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', '--log-level', '40'], config=True)
    # The ipcluster subcommand placed right after the program name.
    ipcluster_subcommand = Str('start')
    # The value passed to ipcluster with ``-n``.
    ipcluster_n = Int(2)

    def find_args(self):
        """Return the full ipcluster command line as a list."""
        command = self.ipcluster_cmd + [self.ipcluster_subcommand]
        count_option = ['-n', repr(self.ipcluster_n)]
        return command + count_option + self.ipcluster_args

    def start(self):
        """Log the command line and start the process via the parent class."""
        log.msg("Starting ipcluster: %r" % self.args)
        parent = super(IPClusterLauncher, self)
        return parent.start()
866
867
General Comments 0
You need to be logged in to leave comments. Login now