add ipcluster engines; fix ssh process shutdown
MinRK
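The diff below adds an 'engines' subcommand to ipclusterz, so that additional engines can be attached to a controller that is already running, e.g. 'ipclusterz engines -n 4 -p mycluster'. As a rough illustration only (the module path and application class are taken from this diff; driving the app through sys.argv is an assumption of this sketch, not something the patch itself does), the same invocation could be made programmatically:

    import sys
    from IPython.zmq.parallel.ipclusterapp import IPClusterApp

    # Roughly equivalent to the command line 'ipclusterz engines -n 4 -p mycluster'.
    # The app's command-line config loader consumes sys.argv.
    sys.argv = ['ipclusterz', 'engines', '-n', '4', '-p', 'mycluster']
    IPClusterApp().start()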
IPython/zmq/parallel/ipclusterapp.py
@@ -1,505 +1,580 @@
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import re
18 import re
19 import logging
19 import logging
20 import os
20 import os
21 import signal
21 import signal
22 import logging
22 import logging
23
23
24 from zmq.eventloop import ioloop
24 from zmq.eventloop import ioloop
25
25
26 from IPython.external.argparse import ArgumentParser, SUPPRESS
26 from IPython.external.argparse import ArgumentParser, SUPPRESS
27 from IPython.utils.importstring import import_item
27 from IPython.utils.importstring import import_item
28 from IPython.zmq.parallel.clusterdir import (
28 from IPython.zmq.parallel.clusterdir import (
29 ApplicationWithClusterDir, ClusterDirConfigLoader,
29 ApplicationWithClusterDir, ClusterDirConfigLoader,
30 ClusterDirError, PIDFileError
30 ClusterDirError, PIDFileError
31 )
31 )
32
32
33
33
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35 # Module level variables
35 # Module level variables
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37
37
38
38
39 default_config_file_name = u'ipcluster_config.py'
39 default_config_file_name = u'ipcluster_config.py'
40
40
41
41
42 _description = """\
42 _description = """\
43 Start an IPython cluster for parallel computing.\n\n
43 Start an IPython cluster for parallel computing.\n\n
44
44
45 An IPython cluster consists of 1 controller and 1 or more engines.
45 An IPython cluster consists of 1 controller and 1 or more engines.
46 This command automates the startup of these processes using a wide
46 This command automates the startup of these processes using a wide
47 range of startup methods (SSH, local processes, PBS, mpiexec,
47 range of startup methods (SSH, local processes, PBS, mpiexec,
48 Windows HPC Server 2008). To start a cluster with 4 engines on your
48 Windows HPC Server 2008). To start a cluster with 4 engines on your
49 local host simply do 'ipclusterz start -n 4'. For more complex usage
49 local host simply do 'ipclusterz start -n 4'. For more complex usage
50 you will typically do 'ipclusterz create -p mycluster', then edit
50 you will typically do 'ipclusterz create -p mycluster', then edit
51 configuration files, followed by 'ipclusterz start -p mycluster -n 4'.
51 configuration files, followed by 'ipclusterz start -p mycluster -n 4'.
52 """
52 """
53
53
54
54
55 # Exit codes for ipcluster
55 # Exit codes for ipcluster
56
56
57 # This will be the exit code if the ipcluster appears to be running because
57 # This will be the exit code if the ipcluster appears to be running because
58 # a .pid file exists
58 # a .pid file exists
59 ALREADY_STARTED = 10
59 ALREADY_STARTED = 10
60
60
61
61
62 # This will be the exit code if ipcluster stop is run, but there is no .pid
62 # This will be the exit code if ipcluster stop is run, but there is no .pid
63 # file to be found.
63 # file to be found.
64 ALREADY_STOPPED = 11
64 ALREADY_STOPPED = 11
65
65
66 # This will be the exit code if ipcluster engines is run, but there is no .pid
67 # file to be found.
68 NO_CLUSTER = 12
69
66
70
67 #-----------------------------------------------------------------------------
71 #-----------------------------------------------------------------------------
68 # Command line options
72 # Command line options
69 #-----------------------------------------------------------------------------
73 #-----------------------------------------------------------------------------
70
74
71
75
72 class IPClusterAppConfigLoader(ClusterDirConfigLoader):
76 class IPClusterAppConfigLoader(ClusterDirConfigLoader):
73
77
74 def _add_arguments(self):
78 def _add_arguments(self):
75 # Don't call ClusterDirConfigLoader._add_arguments as we don't want
79 # Don't call ClusterDirConfigLoader._add_arguments as we don't want
76 # its defaults on self.parser. Instead, we will put those
80 # its defaults on self.parser. Instead, we will put those
77 # default options on our subparsers.
81 # default options on our subparsers.
78
82
79 # This has all the common options that all subcommands use
83 # This has all the common options that all subcommands use
80 parent_parser1 = ArgumentParser(
84 parent_parser1 = ArgumentParser(
81 add_help=False,
85 add_help=False,
82 argument_default=SUPPRESS
86 argument_default=SUPPRESS
83 )
87 )
84 self._add_ipython_dir(parent_parser1)
88 self._add_ipython_dir(parent_parser1)
85 self._add_log_level(parent_parser1)
89 self._add_log_level(parent_parser1)
86
90
87 # This has all the common options that other subcommands use
91 # This has all the common options that other subcommands use
88 parent_parser2 = ArgumentParser(
92 parent_parser2 = ArgumentParser(
89 add_help=False,
93 add_help=False,
90 argument_default=SUPPRESS
94 argument_default=SUPPRESS
91 )
95 )
92 self._add_cluster_profile(parent_parser2)
96 self._add_cluster_profile(parent_parser2)
93 self._add_cluster_dir(parent_parser2)
97 self._add_cluster_dir(parent_parser2)
94 self._add_work_dir(parent_parser2)
98 self._add_work_dir(parent_parser2)
95 paa = parent_parser2.add_argument
99 paa = parent_parser2.add_argument
96 paa('--log-to-file',
100 paa('--log-to-file',
97 action='store_true', dest='Global.log_to_file',
101 action='store_true', dest='Global.log_to_file',
98 help='Log to a file in the log directory (default is stdout)')
102 help='Log to a file in the log directory (default is stdout)')
99
103
100 # Create the object used to create the subparsers.
104 # Create the object used to create the subparsers.
101 subparsers = self.parser.add_subparsers(
105 subparsers = self.parser.add_subparsers(
102 dest='Global.subcommand',
106 dest='Global.subcommand',
103 title='ipcluster subcommands',
107 title='ipcluster subcommands',
104 description=
108 description=
105 """ipcluster has a variety of subcommands. The general way of
109 """ipcluster has a variety of subcommands. The general way of
106 running ipcluster is 'ipclusterz <cmd> [options]'. To get help
110 running ipcluster is 'ipclusterz <cmd> [options]'. To get help
107 on a particular subcommand do 'ipclusterz <cmd> -h'."""
111 on a particular subcommand do 'ipclusterz <cmd> -h'."""
108 # help="For more help, type 'ipclusterz <cmd> -h'",
112 # help="For more help, type 'ipclusterz <cmd> -h'",
109 )
113 )
110
114
111 # The "list" subcommand parser
115 # The "list" subcommand parser
112 parser_list = subparsers.add_parser(
116 parser_list = subparsers.add_parser(
113 'list',
117 'list',
114 parents=[parent_parser1],
118 parents=[parent_parser1],
115 argument_default=SUPPRESS,
119 argument_default=SUPPRESS,
116 help="List all clusters in cwd and ipython_dir.",
120 help="List all clusters in cwd and ipython_dir.",
117 description=
121 description=
118 """List all available clusters, by cluster directory, that can
122 """List all available clusters, by cluster directory, that can
119 be found in the current working directory or in the ipython
123 be found in the current working directory or in the ipython
120 directory. Cluster directories are named using the convention
124 directory. Cluster directories are named using the convention
121 'cluster_<profile>'."""
125 'cluster_<profile>'."""
122 )
126 )
123
127
124 # The "create" subcommand parser
128 # The "create" subcommand parser
125 parser_create = subparsers.add_parser(
129 parser_create = subparsers.add_parser(
126 'create',
130 'create',
127 parents=[parent_parser1, parent_parser2],
131 parents=[parent_parser1, parent_parser2],
128 argument_default=SUPPRESS,
132 argument_default=SUPPRESS,
129 help="Create a new cluster directory.",
133 help="Create a new cluster directory.",
130 description=
134 description=
131 """Create an ipython cluster directory by its profile name or
135 """Create an ipython cluster directory by its profile name or
132 cluster directory path. Cluster directories contain
136 cluster directory path. Cluster directories contain
133 configuration, log and security related files and are named
137 configuration, log and security related files and are named
134 using the convention 'cluster_<profile>'. By default they are
138 using the convention 'cluster_<profile>'. By default they are
135 located in your ipython directory. Once created, you will
139 located in your ipython directory. Once created, you will
136 probably need to edit the configuration files in the cluster
140 probably need to edit the configuration files in the cluster
137 directory to configure your cluster. Most users will create a
141 directory to configure your cluster. Most users will create a
138 cluster directory by profile name,
142 cluster directory by profile name,
139 'ipclusterz create -p mycluster', which will put the directory
143 'ipclusterz create -p mycluster', which will put the directory
140 in '<ipython_dir>/cluster_mycluster'.
144 in '<ipython_dir>/cluster_mycluster'.
141 """
145 """
142 )
146 )
143 paa = parser_create.add_argument
147 paa = parser_create.add_argument
144 paa('--reset-config',
148 paa('--reset-config',
145 dest='Global.reset_config', action='store_true',
149 dest='Global.reset_config', action='store_true',
146 help=
150 help=
147 """Recopy the default config files to the cluster directory.
151 """Recopy the default config files to the cluster directory.
148 You will lose any modifications you have made to these files.""")
152 You will lose any modifications you have made to these files.""")
149
153
150 # The "start" subcommand parser
154 # The "start" subcommand parser
151 parser_start = subparsers.add_parser(
155 parser_start = subparsers.add_parser(
152 'start',
156 'start',
153 parents=[parent_parser1, parent_parser2],
157 parents=[parent_parser1, parent_parser2],
154 argument_default=SUPPRESS,
158 argument_default=SUPPRESS,
155 help="Start a cluster.",
159 help="Start a cluster.",
156 description=
160 description=
157 """Start an ipython cluster by its profile name or cluster
161 """Start an ipython cluster by its profile name or cluster
158 directory. Cluster directories contain configuration, log and
162 directory. Cluster directories contain configuration, log and
159 security related files and are named using the convention
163 security related files and are named using the convention
160 'cluster_<profile>' and should be created using the 'create'
164 'cluster_<profile>' and should be created using the 'create'
161 subcommand of 'ipcluster'. If your cluster directory is in
165 subcommand of 'ipcluster'. If your cluster directory is in
162 the cwd or the ipython directory, you can simply refer to it
166 the cwd or the ipython directory, you can simply refer to it
163 using its profile name, 'ipclusterz start -n 4 -p <profile>`,
167 using its profile name, 'ipclusterz start -n 4 -p <profile>`,
164 otherwise use the '--cluster-dir' option.
168 otherwise use the '--cluster-dir' option.
165 """
169 """
166 )
170 )
171
167 paa = parser_start.add_argument
172 paa = parser_start.add_argument
168 paa('-n', '--number',
173 paa('-n', '--number',
169 type=int, dest='Global.n',
174 type=int, dest='Global.n',
170 help='The number of engines to start.',
175 help='The number of engines to start.',
171 metavar='Global.n')
176 metavar='Global.n')
172 paa('--clean-logs',
177 paa('--clean-logs',
173 dest='Global.clean_logs', action='store_true',
178 dest='Global.clean_logs', action='store_true',
174 help='Delete old log files before starting.')
179 help='Delete old log files before starting.')
175 paa('--no-clean-logs',
180 paa('--no-clean-logs',
176 dest='Global.clean_logs', action='store_false',
181 dest='Global.clean_logs', action='store_false',
177 help="Don't delete old log flies before starting.")
182 help="Don't delete old log flies before starting.")
178 paa('--daemon',
183 paa('--daemon',
179 dest='Global.daemonize', action='store_true',
184 dest='Global.daemonize', action='store_true',
180 help='Daemonize the ipcluster program. This implies --log-to-file')
185 help='Daemonize the ipcluster program. This implies --log-to-file')
181 paa('--no-daemon',
186 paa('--no-daemon',
182 dest='Global.daemonize', action='store_false',
187 dest='Global.daemonize', action='store_false',
183 help="Dont't daemonize the ipcluster program.")
188 help="Dont't daemonize the ipcluster program.")
184 paa('--delay',
189 paa('--delay',
185 type=float, dest='Global.delay',
190 type=float, dest='Global.delay',
186 help="Specify the delay (in seconds) between starting the controller and starting the engine(s).")
191 help="Specify the delay (in seconds) between starting the controller and starting the engine(s).")
187
192
188 # The "stop" subcommand parser
193 # The "stop" subcommand parser
189 parser_stop = subparsers.add_parser(
194 parser_stop = subparsers.add_parser(
190 'stop',
195 'stop',
191 parents=[parent_parser1, parent_parser2],
196 parents=[parent_parser1, parent_parser2],
192 argument_default=SUPPRESS,
197 argument_default=SUPPRESS,
193 help="Stop a running cluster.",
198 help="Stop a running cluster.",
194 description=
199 description=
195 """Stop a running ipython cluster by its profile name or cluster
200 """Stop a running ipython cluster by its profile name or cluster
196 directory. Cluster directories are named using the convention
201 directory. Cluster directories are named using the convention
197 'cluster_<profile>'. If your cluster directory is in
202 'cluster_<profile>'. If your cluster directory is in
198 the cwd or the ipython directory, you can simply refer to it
203 the cwd or the ipython directory, you can simply refer to it
199 using its profile name, 'ipclusterz stop -p <profile>`, otherwise
204 using its profile name, 'ipclusterz stop -p <profile>`, otherwise
200 use the '--cluster-dir' option.
205 use the '--cluster-dir' option.
201 """
206 """
202 )
207 )
203 paa = parser_stop.add_argument
208 paa = parser_stop.add_argument
204 paa('--signal',
209 paa('--signal',
205 dest='Global.signal', type=int,
210 dest='Global.signal', type=int,
206 help="The signal number to use in stopping the cluster (default=2).",
211 help="The signal number to use in stopping the cluster (default=2).",
207 metavar="Global.signal")
212 metavar="Global.signal")
208
213
214 # the "engines" subcommand parser
215 parser_engines = subparsers.add_parser(
216 'engines',
217 parents=[parent_parser1, parent_parser2],
218 argument_default=SUPPRESS,
219 help="Attach some engines to an existing controller or cluster.",
220 description=
221 """Start one or more engines to connect to an existing Cluster
222 by profile name or cluster directory.
223 Cluster directories contain configuration, log and
224 security related files and are named using the convention
225 'cluster_<profile>' and should be created using the 'create'
226 subcommand of 'ipcluster'. If your cluster directory is in
227 the cwd or the ipython directory, you can simply refer to it
228 using its profile name, 'ipclusterz engines -n 4 -p <profile>`,
229 otherwise use the '--cluster-dir' option.
230 """
231 )
232 paa = parser_engines.add_argument
233 paa('-n', '--number',
234 type=int, dest='Global.n',
235 help='The number of engines to start.',
236 metavar='Global.n')
237 paa('--daemon',
238 dest='Global.daemonize', action='store_true',
239 help='Daemonize the ipcluster program. This implies --log-to-file')
240 paa('--no-daemon',
241 dest='Global.daemonize', action='store_false',
242 help="Dont't daemonize the ipcluster program.")
209
243
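All of the subparsers above follow the same argparse pattern: shared options live on 'parent' parsers that each subcommand inherits, and argument_default=SUPPRESS keeps options the user did not pass out of the parsed namespace, so they cannot clobber values coming from the config files. A minimal standalone sketch of that pattern (standard-library argparse, illustrative names only):

    from argparse import ArgumentParser, SUPPRESS

    common = ArgumentParser(add_help=False, argument_default=SUPPRESS)
    common.add_argument('--log-level', dest='Global.log_level')

    parser = ArgumentParser(prog='demo')
    subparsers = parser.add_subparsers(dest='Global.subcommand')
    start = subparsers.add_parser('start', parents=[common], argument_default=SUPPRESS)
    start.add_argument('-n', type=int, dest='Global.n')

    # Only the options actually given end up in the namespace.
    print parser.parse_args(['start', '-n', '4'])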
210 #-----------------------------------------------------------------------------
244 #-----------------------------------------------------------------------------
211 # Main application
245 # Main application
212 #-----------------------------------------------------------------------------
246 #-----------------------------------------------------------------------------
213
247
214
248
215 class IPClusterApp(ApplicationWithClusterDir):
249 class IPClusterApp(ApplicationWithClusterDir):
216
250
217 name = u'ipclusterz'
251 name = u'ipclusterz'
218 description = _description
252 description = _description
219 usage = None
253 usage = None
220 command_line_loader = IPClusterAppConfigLoader
254 command_line_loader = IPClusterAppConfigLoader
221 default_config_file_name = default_config_file_name
255 default_config_file_name = default_config_file_name
222 default_log_level = logging.INFO
256 default_log_level = logging.INFO
223 auto_create_cluster_dir = False
257 auto_create_cluster_dir = False
224
258
225 def create_default_config(self):
259 def create_default_config(self):
226 super(IPClusterApp, self).create_default_config()
260 super(IPClusterApp, self).create_default_config()
227 self.default_config.Global.controller_launcher = \
261 self.default_config.Global.controller_launcher = \
228 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
262 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
229 self.default_config.Global.engine_launcher = \
263 self.default_config.Global.engine_launcher = \
230 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
264 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
231 self.default_config.Global.n = 2
265 self.default_config.Global.n = 2
232 self.default_config.Global.delay = 1
266 self.default_config.Global.delay = 1
233 self.default_config.Global.reset_config = False
267 self.default_config.Global.reset_config = False
234 self.default_config.Global.clean_logs = True
268 self.default_config.Global.clean_logs = True
235 self.default_config.Global.signal = 2
269 self.default_config.Global.signal = signal.SIGINT
236 self.default_config.Global.daemonize = False
270 self.default_config.Global.daemonize = False
237
271
238 def find_resources(self):
272 def find_resources(self):
239 subcommand = self.command_line_config.Global.subcommand
273 subcommand = self.command_line_config.Global.subcommand
240 if subcommand=='list':
274 if subcommand=='list':
241 self.list_cluster_dirs()
275 self.list_cluster_dirs()
242 # Exit immediately because there is nothing left to do.
276 # Exit immediately because there is nothing left to do.
243 self.exit()
277 self.exit()
244 elif subcommand=='create':
278 elif subcommand=='create':
245 self.auto_create_cluster_dir = True
279 self.auto_create_cluster_dir = True
246 super(IPClusterApp, self).find_resources()
280 super(IPClusterApp, self).find_resources()
247 elif subcommand=='start' or subcommand=='stop':
281 elif subcommand=='start' or subcommand=='stop':
248 self.auto_create_cluster_dir = True
282 self.auto_create_cluster_dir = True
249 try:
283 try:
250 super(IPClusterApp, self).find_resources()
284 super(IPClusterApp, self).find_resources()
251 except ClusterDirError:
285 except ClusterDirError:
252 raise ClusterDirError(
286 raise ClusterDirError(
253 "Could not find a cluster directory. A cluster dir must "
287 "Could not find a cluster directory. A cluster dir must "
254 "be created before running 'ipclusterz start'. Do "
288 "be created before running 'ipclusterz start'. Do "
255 "'ipclusterz create -h' or 'ipclusterz list -h' for more "
289 "'ipclusterz create -h' or 'ipclusterz list -h' for more "
256 "information about creating and listing cluster dirs."
290 "information about creating and listing cluster dirs."
257 )
291 )
292 elif subcommand=='engines':
293 self.auto_create_cluster_dir = False
294 try:
295 super(IPClusterApp, self).find_resources()
296 except ClusterDirError:
297 raise ClusterDirError(
298 "Could not find a cluster directory. A cluster dir must "
299 "be created before running 'ipclusterz start'. Do "
300 "'ipclusterz create -h' or 'ipclusterz list -h' for more "
301 "information about creating and listing cluster dirs."
302 )
258
303
259 def list_cluster_dirs(self):
304 def list_cluster_dirs(self):
260 # Find the search paths
305 # Find the search paths
261 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
306 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
262 if cluster_dir_paths:
307 if cluster_dir_paths:
263 cluster_dir_paths = cluster_dir_paths.split(':')
308 cluster_dir_paths = cluster_dir_paths.split(':')
264 else:
309 else:
265 cluster_dir_paths = []
310 cluster_dir_paths = []
266 try:
311 try:
267 ipython_dir = self.command_line_config.Global.ipython_dir
312 ipython_dir = self.command_line_config.Global.ipython_dir
268 except AttributeError:
313 except AttributeError:
269 ipython_dir = self.default_config.Global.ipython_dir
314 ipython_dir = self.default_config.Global.ipython_dir
270 paths = [os.getcwd(), ipython_dir] + \
315 paths = [os.getcwd(), ipython_dir] + \
271 cluster_dir_paths
316 cluster_dir_paths
272 paths = list(set(paths))
317 paths = list(set(paths))
273
318
274 self.log.info('Searching for cluster dirs in paths: %r' % paths)
319 self.log.info('Searching for cluster dirs in paths: %r' % paths)
275 for path in paths:
320 for path in paths:
276 files = os.listdir(path)
321 files = os.listdir(path)
277 for f in files:
322 for f in files:
278 full_path = os.path.join(path, f)
323 full_path = os.path.join(path, f)
279 if os.path.isdir(full_path) and f.startswith('cluster_'):
324 if os.path.isdir(full_path) and f.startswith('cluster_'):
280 profile = full_path.split('_')[-1]
325 profile = full_path.split('_')[-1]
281 start_cmd = 'ipclusterz start -p %s -n 4' % profile
326 start_cmd = 'ipclusterz start -p %s -n 4' % profile
282 print start_cmd + " ==> " + full_path
327 print start_cmd + " ==> " + full_path
283
328
284 def pre_construct(self):
329 def pre_construct(self):
285 # IPClusterApp.pre_construct() is where we cd to the working directory.
330 # IPClusterApp.pre_construct() is where we cd to the working directory.
286 super(IPClusterApp, self).pre_construct()
331 super(IPClusterApp, self).pre_construct()
287 config = self.master_config
332 config = self.master_config
288 try:
333 try:
289 daemon = config.Global.daemonize
334 daemon = config.Global.daemonize
290 if daemon:
335 if daemon:
291 config.Global.log_to_file = True
336 config.Global.log_to_file = True
292 except AttributeError:
337 except AttributeError:
293 pass
338 pass
294
339
295 def construct(self):
340 def construct(self):
296 config = self.master_config
341 config = self.master_config
297 subcmd = config.Global.subcommand
342 subcmd = config.Global.subcommand
298 reset = config.Global.reset_config
343 reset = config.Global.reset_config
299 if subcmd == 'list':
344 if subcmd == 'list':
300 return
345 return
301 if subcmd == 'create':
346 if subcmd == 'create':
302 self.log.info('Copying default config files to cluster directory '
347 self.log.info('Copying default config files to cluster directory '
303 '[overwrite=%r]' % (reset,))
348 '[overwrite=%r]' % (reset,))
304 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
349 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
305 if subcmd =='start':
350 if subcmd =='start':
306 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
351 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
307 self.start_logging()
352 self.start_logging()
308 self.loop = ioloop.IOLoop.instance()
353 self.loop = ioloop.IOLoop.instance()
309 # reactor.callWhenRunning(self.start_launchers)
354 # reactor.callWhenRunning(self.start_launchers)
310 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
355 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
311 dc.start()
356 dc.start()
357 if subcmd == 'engines':
358 self.start_logging()
359 self.loop = ioloop.IOLoop.instance()
360 # reactor.callWhenRunning(self.start_launchers)
361 engine_only = lambda : self.start_launchers(controller=False)
362 dc = ioloop.DelayedCallback(engine_only, 0, self.loop)
363 dc.start()
312
364
313 def start_launchers(self):
365 def start_launchers(self, controller=True):
314 config = self.master_config
366 config = self.master_config
315
367
316 # Create the launchers. In both cases, we set the work_dir of
368 # Create the launchers. In both cases, we set the work_dir of
317 # the launcher to the cluster_dir. This is where the launcher's
369 # the launcher to the cluster_dir. This is where the launcher's
318 # subprocesses will be launched. It is not where the controller
370 # subprocesses will be launched. It is not where the controller
319 # and engine will be launched.
371 # and engine will be launched.
372 if controller:
373 cl_class = import_item(config.Global.controller_launcher)
374 self.controller_launcher = cl_class(
375 work_dir=self.cluster_dir, config=config,
376 logname=self.log.name
377 )
378 # Setup the observing of stopping. If the controller dies, shut
379 # everything down as that will be completely fatal for the engines.
380 self.controller_launcher.on_stop(self.stop_launchers)
381 # But, we don't monitor the stopping of engines. An engine dying
382 # is just fine and in principle a user could start a new engine.
383 # Also, if we did monitor engine stopping, it is difficult to
384 # know what to do when only some engines die. Currently, the
385 # observing of engine stopping is inconsistent. Some launchers
386 # might trigger on a single engine stopping, others wait until
387 # all stop. TODO: think more about how to handle this.
388
389
320 el_class = import_item(config.Global.engine_launcher)
390 el_class = import_item(config.Global.engine_launcher)
321 self.engine_launcher = el_class(
391 self.engine_launcher = el_class(
322 work_dir=self.cluster_dir, config=config, logname=self.log.name
392 work_dir=self.cluster_dir, config=config, logname=self.log.name
323 )
393 )
324 cl_class = import_item(config.Global.controller_launcher)
325 self.controller_launcher = cl_class(
326 work_dir=self.cluster_dir, config=config,
327 logname=self.log.name
328 )
329
394
330 # Setup signals
395 # Setup signals
331 signal.signal(signal.SIGINT, self.sigint_handler)
396 signal.signal(signal.SIGINT, self.sigint_handler)
332
397
333 # Setup the observing of stopping. If the controller dies, shut
334 # everything down as that will be completely fatal for the engines.
335 self.controller_launcher.on_stop(self.stop_launchers)
336 # d1.addCallback(self.stop_launchers)
337 # But, we don't monitor the stopping of engines. An engine dying
338 # is just fine and in principle a user could start a new engine.
339 # Also, if we did monitor engine stopping, it is difficult to
340 # know what to do when only some engines die. Currently, the
341 # observing of engine stopping is inconsistent. Some launchers
342 # might trigger on a single engine stopping, others wait until
343 # all stop. TODO: think more about how to handle this.
344
345 # Start the controller and engines
398 # Start the controller and engines
346 self._stopping = False # Make sure stop_launchers is not called 2x.
399 self._stopping = False # Make sure stop_launchers is not called 2x.
347 d = self.start_controller()
400 if controller:
348 dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay, self.loop)
401 self.start_controller()
402 dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop)
349 dc.start()
403 dc.start()
350 self.startup_message()
404 self.startup_message()
351 # d.addCallback(self.start_engines)
352 # d.addCallback(self.startup_message)
353 # If the controller or engines fail to start, stop everything
354 # d.addErrback(self.stop_launchers)
355 return d
356
405
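One subtle change in start_launchers above: the engine-start delay is now multiplied by the controller flag, so an engines-only run (controller=False) schedules the engines immediately, while a full start still waits Global.delay seconds for the controller to come up first. This works because a Python bool behaves as an int:

    delay = 1.0                   # Global.delay, in seconds
    print 1000 * delay * True     # 1000.0 ms delay when the controller is also started
    print 1000 * delay * False    # 0.0 ms delay for 'ipclusterz engines'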
357 def startup_message(self, r=None):
406 def startup_message(self, r=None):
358 self.log.info("IPython cluster: started")
407 self.log.info("IPython cluster: started")
359 return r
408 return r
360
409
361 def start_controller(self, r=None):
410 def start_controller(self, r=None):
362 # self.log.info("In start_controller")
411 # self.log.info("In start_controller")
363 config = self.master_config
412 config = self.master_config
364 d = self.controller_launcher.start(
413 d = self.controller_launcher.start(
365 cluster_dir=config.Global.cluster_dir
414 cluster_dir=config.Global.cluster_dir
366 )
415 )
367 return d
416 return d
368
417
369 def start_engines(self, r=None):
418 def start_engines(self, r=None):
370 # self.log.info("In start_engines")
419 # self.log.info("In start_engines")
371 config = self.master_config
420 config = self.master_config
421
372 d = self.engine_launcher.start(
422 d = self.engine_launcher.start(
373 config.Global.n,
423 config.Global.n,
374 cluster_dir=config.Global.cluster_dir
424 cluster_dir=config.Global.cluster_dir
375 )
425 )
376 return d
426 return d
377
427
378 def stop_controller(self, r=None):
428 def stop_controller(self, r=None):
379 # self.log.info("In stop_controller")
429 # self.log.info("In stop_controller")
380 if self.controller_launcher.running:
430 if self.controller_launcher.running:
381 return self.controller_launcher.stop()
431 return self.controller_launcher.stop()
382
432
383 def stop_engines(self, r=None):
433 def stop_engines(self, r=None):
384 # self.log.info("In stop_engines")
434 # self.log.info("In stop_engines")
385 if self.engine_launcher.running:
435 if self.engine_launcher.running:
386 d = self.engine_launcher.stop()
436 d = self.engine_launcher.stop()
387 # d.addErrback(self.log_err)
437 # d.addErrback(self.log_err)
388 return d
438 return d
389 else:
439 else:
390 return None
440 return None
391
441
392 def log_err(self, f):
442 def log_err(self, f):
393 self.log.error(f.getTraceback())
443 self.log.error(f.getTraceback())
394 return None
444 return None
395
445
396 def stop_launchers(self, r=None):
446 def stop_launchers(self, r=None):
397 if not self._stopping:
447 if not self._stopping:
398 self._stopping = True
448 self._stopping = True
399 # if isinstance(r, failure.Failure):
449 # if isinstance(r, failure.Failure):
400 # self.log.error('Unexpected error in ipcluster:')
450 # self.log.error('Unexpected error in ipcluster:')
401 # self.log.info(r.getTraceback())
451 # self.log.info(r.getTraceback())
402 self.log.error("IPython cluster: stopping")
452 self.log.error("IPython cluster: stopping")
403 # These return deferreds. We are not doing anything with them
453 # These return deferreds. We are not doing anything with them
404 # but we are holding refs to them as a reminder that they
454 # but we are holding refs to them as a reminder that they
405 # do return deferreds.
455 # do return deferreds.
406 d1 = self.stop_engines()
456 d1 = self.stop_engines()
407 d2 = self.stop_controller()
457 d2 = self.stop_controller()
408 # Wait a few seconds to let things shut down.
458 # Wait a few seconds to let things shut down.
409 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
459 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
410 dc.start()
460 dc.start()
411 # reactor.callLater(4.0, reactor.stop)
461 # reactor.callLater(4.0, reactor.stop)
412
462
413 def sigint_handler(self, signum, frame):
463 def sigint_handler(self, signum, frame):
414 self.stop_launchers()
464 self.stop_launchers()
415
465
416 def start_logging(self):
466 def start_logging(self):
417 # Remove old log files of the controller and engine
467 # Remove old log files of the controller and engine
418 if self.master_config.Global.clean_logs:
468 if self.master_config.Global.clean_logs:
419 log_dir = self.master_config.Global.log_dir
469 log_dir = self.master_config.Global.log_dir
420 for f in os.listdir(log_dir):
470 for f in os.listdir(log_dir):
421 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
471 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
422 os.remove(os.path.join(log_dir, f))
472 os.remove(os.path.join(log_dir, f))
423 # This will remove old log files for ipcluster itself
473 # This will remove old log files for ipcluster itself
424 super(IPClusterApp, self).start_logging()
474 super(IPClusterApp, self).start_logging()
425
475
426 def start_app(self):
476 def start_app(self):
427 """Start the application, depending on what subcommand is used."""
477 """Start the application, depending on what subcommand is used."""
428 subcmd = self.master_config.Global.subcommand
478 subcmd = self.master_config.Global.subcommand
429 if subcmd=='create' or subcmd=='list':
479 if subcmd=='create' or subcmd=='list':
430 return
480 return
431 elif subcmd=='start':
481 elif subcmd=='start':
432 self.start_app_start()
482 self.start_app_start()
433 elif subcmd=='stop':
483 elif subcmd=='stop':
434 self.start_app_stop()
484 self.start_app_stop()
485 elif subcmd=='engines':
486 self.start_app_engines()
435
487
436 def start_app_start(self):
488 def start_app_start(self):
437 """Start the app for the start subcommand."""
489 """Start the app for the start subcommand."""
438 config = self.master_config
490 config = self.master_config
439 # First see if the cluster is already running
491 # First see if the cluster is already running
440 try:
492 try:
441 pid = self.get_pid_from_file()
493 pid = self.get_pid_from_file()
442 except PIDFileError:
494 except PIDFileError:
443 pass
495 pass
444 else:
496 else:
445 self.log.critical(
497 self.log.critical(
446 'Cluster is already running with [pid=%s]. '
498 'Cluster is already running with [pid=%s]. '
447 'use "ipclusterz stop" to stop the cluster.' % pid
499 'use "ipclusterz stop" to stop the cluster.' % pid
448 )
500 )
449 # Here I exit with an unusual exit status that other processes
501 # Here I exit with an unusual exit status that other processes
450 # can watch for to learn how I exited.
502 # can watch for to learn how I exited.
451 self.exit(ALREADY_STARTED)
503 self.exit(ALREADY_STARTED)
452
504
453 # Now log and daemonize
505 # Now log and daemonize
454 self.log.info(
506 self.log.info(
455 'Starting ipclusterz with [daemon=%r]' % config.Global.daemonize
507 'Starting ipclusterz with [daemon=%r]' % config.Global.daemonize
456 )
508 )
457 # TODO: Get daemonize working on Windows or as a Windows Server.
509 # TODO: Get daemonize working on Windows or as a Windows Server.
458 if config.Global.daemonize:
510 if config.Global.daemonize:
459 if os.name=='posix':
511 if os.name=='posix':
460 from twisted.scripts._twistd_unix import daemonize
512 from twisted.scripts._twistd_unix import daemonize
461 daemonize()
513 daemonize()
462
514
463 # Now write the new pid file AFTER our new forked pid is active.
515 # Now write the new pid file AFTER our new forked pid is active.
464 self.write_pid_file()
516 self.write_pid_file()
465 try:
517 try:
466 self.loop.start()
518 self.loop.start()
467 except:
519 except:
468 self.log.info("stopping...")
520 self.log.info("stopping...")
469 self.remove_pid_file()
521 self.remove_pid_file()
470
522
523 def start_app_engines(self):
524 """Start the app for the start subcommand."""
525 config = self.master_config
526 # First see if the cluster is already running
527
528 # Now log and daemonize
529 self.log.info(
530 'Starting engines with [daemon=%r]' % config.Global.daemonize
531 )
532 # TODO: Get daemonize working on Windows or as a Windows Server.
533 if config.Global.daemonize:
534 if os.name=='posix':
535 from twisted.scripts._twistd_unix import daemonize
536 daemonize()
537
538 # Now write the new pid file AFTER our new forked pid is active.
539 # self.write_pid_file()
540 try:
541 self.loop.start()
542 except:
543 self.log.fatal("stopping...")
544 # self.remove_pid_file()
545
471 def start_app_stop(self):
546 def start_app_stop(self):
472 """Start the app for the stop subcommand."""
547 """Start the app for the stop subcommand."""
473 config = self.master_config
548 config = self.master_config
474 try:
549 try:
475 pid = self.get_pid_from_file()
550 pid = self.get_pid_from_file()
476 except PIDFileError:
551 except PIDFileError:
477 self.log.critical(
552 self.log.critical(
478 'Problem reading pid file, cluster is probably not running.'
553 'Problem reading pid file, cluster is probably not running.'
479 )
554 )
480 # Here I exit with an unusual exit status that other processes
555 # Here I exit with an unusual exit status that other processes
481 # can watch for to learn how I exited.
556 # can watch for to learn how I exited.
482 self.exit(ALREADY_STOPPED)
557 self.exit(ALREADY_STOPPED)
483 else:
558 else:
484 if os.name=='posix':
559 if os.name=='posix':
485 sig = config.Global.signal
560 sig = config.Global.signal
486 self.log.info(
561 self.log.info(
487 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
562 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
488 )
563 )
489 os.kill(pid, sig)
564 os.kill(pid, sig)
490 elif os.name=='nt':
565 elif os.name=='nt':
491 # As of right now, we don't support daemonize on Windows, so
566 # As of right now, we don't support daemonize on Windows, so
492 # stop will not do anything. Minimally, it should clean up the
567 # stop will not do anything. Minimally, it should clean up the
493 # old .pid files.
568 # old .pid files.
494 self.remove_pid_file()
569 self.remove_pid_file()
495
570
496
571
497 def launch_new_instance():
572 def launch_new_instance():
498 """Create and run the IPython cluster."""
573 """Create and run the IPython cluster."""
499 app = IPClusterApp()
574 app = IPClusterApp()
500 app.start()
575 app.start()
501
576
502
577
503 if __name__ == '__main__':
578 if __name__ == '__main__':
504 launch_new_instance()
579 launch_new_instance()
505
580
IPython/zmq/parallel/launcher.py
@@ -1,835 +1,844 @@
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import re
19 import re
20 import sys
20 import sys
21 import logging
21 import logging
22
22
23 from signal import SIGINT, SIGTERM
23 from signal import SIGINT, SIGTERM
24 try:
24 try:
25 from signal import SIGKILL
25 from signal import SIGKILL
26 except ImportError:
26 except ImportError:
27 SIGKILL=SIGTERM
27 SIGKILL=SIGTERM
28
28
29 from subprocess import Popen, PIPE, STDOUT
29 from subprocess import Popen, PIPE, STDOUT
30 try:
30 try:
31 from subprocess import check_open
31 from subprocess import check_output
32 except ImportError:
32 except ImportError:
33 # pre-2.7:
33 # pre-2.7:
34 from StringIO import StringIO
34 from StringIO import StringIO
35
35
36 def check_open(*args, **kwargs):
36 def check_output(*args, **kwargs):
37 sio = StringIO()
37 sio = StringIO()
38 kwargs.update(dict(stdout=PIPE, stderr=STDOUT))
38 kwargs.update(dict(stdout=PIPE, stderr=STDOUT))
39 p = Popen(*args, **kwargs)
39 p = Popen(*args, **kwargs)
40 out,err = p.communicate()
40 out,err = p.communicate()
41 return out
41 return out
42
42
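The fallback above (renamed from check_open to match subprocess.check_output, which first appeared in Python 2.7) simply captures the child's combined stdout/stderr; unlike the real check_output it does not raise on a non-zero exit status. A hedged usage sketch, assuming a POSIX command is available:

    # Works with the shim above on pre-2.7, or with subprocess.check_output on 2.7+.
    out = check_output(['uname', '-a'])
    print out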
43 from zmq.eventloop import ioloop
43 from zmq.eventloop import ioloop
44
44
45 from IPython.external import Itpl
45 from IPython.external import Itpl
46 # from IPython.config.configurable import Configurable
46 # from IPython.config.configurable import Configurable
47 from IPython.utils.traitlets import Str, Int, List, Unicode, Instance
47 from IPython.utils.traitlets import Str, Int, List, Unicode, Instance
48 from IPython.utils.path import get_ipython_module_path
48 from IPython.utils.path import get_ipython_module_path
49 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
49 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
50
50
51 from factory import LoggingFactory
51 from factory import LoggingFactory
52
52
53 # load winhpcjob from IPython.kernel
53 # load winhpcjob from IPython.kernel
54 try:
54 try:
55 from IPython.kernel.winhpcjob import (
55 from IPython.kernel.winhpcjob import (
56 IPControllerTask, IPEngineTask,
56 IPControllerTask, IPEngineTask,
57 IPControllerJob, IPEngineSetJob
57 IPControllerJob, IPEngineSetJob
58 )
58 )
59 except ImportError:
59 except ImportError:
60 pass
60 pass
61
61
62
62
63 #-----------------------------------------------------------------------------
63 #-----------------------------------------------------------------------------
64 # Paths to the kernel apps
64 # Paths to the kernel apps
65 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
66
66
67
67
68 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
68 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
69 'IPython.zmq.parallel.ipclusterapp'
69 'IPython.zmq.parallel.ipclusterapp'
70 ))
70 ))
71
71
72 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
72 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
73 'IPython.zmq.parallel.ipengineapp'
73 'IPython.zmq.parallel.ipengineapp'
74 ))
74 ))
75
75
76 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
76 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
77 'IPython.zmq.parallel.ipcontrollerapp'
77 'IPython.zmq.parallel.ipcontrollerapp'
78 ))
78 ))
79
79
80 #-----------------------------------------------------------------------------
80 #-----------------------------------------------------------------------------
81 # Base launchers and errors
81 # Base launchers and errors
82 #-----------------------------------------------------------------------------
82 #-----------------------------------------------------------------------------
83
83
84
84
85 class LauncherError(Exception):
85 class LauncherError(Exception):
86 pass
86 pass
87
87
88
88
89 class ProcessStateError(LauncherError):
89 class ProcessStateError(LauncherError):
90 pass
90 pass
91
91
92
92
93 class UnknownStatus(LauncherError):
93 class UnknownStatus(LauncherError):
94 pass
94 pass
95
95
96
96
97 class BaseLauncher(LoggingFactory):
97 class BaseLauncher(LoggingFactory):
98 """An asbtraction for starting, stopping and signaling a process."""
98 """An asbtraction for starting, stopping and signaling a process."""
99
99
100 # In all of the launchers, the work_dir is where child processes will be
100 # In all of the launchers, the work_dir is where child processes will be
101 # run. This will usually be the cluster_dir, but may not be. Any work_dir
101 # run. This will usually be the cluster_dir, but may not be. Any work_dir
102 # passed into the __init__ method will override the config value.
102 # passed into the __init__ method will override the config value.
103 # This should not be used to set the work_dir for the actual engine
103 # This should not be used to set the work_dir for the actual engine
104 # and controller. Instead, use their own config files or the
104 # and controller. Instead, use their own config files or the
105 # controller_args, engine_args attributes of the launchers to add
105 # controller_args, engine_args attributes of the launchers to add
106 # the --work-dir option.
106 # the --work-dir option.
107 work_dir = Unicode(u'.')
107 work_dir = Unicode(u'.')
108 loop = Instance('zmq.eventloop.ioloop.IOLoop')
108 loop = Instance('zmq.eventloop.ioloop.IOLoop')
109 def _loop_default(self):
109 def _loop_default(self):
110 return ioloop.IOLoop.instance()
110 return ioloop.IOLoop.instance()
111
111
112 def __init__(self, work_dir=u'.', config=None, **kwargs):
112 def __init__(self, work_dir=u'.', config=None, **kwargs):
113 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
113 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
114 self.state = 'before' # can be before, running, after
114 self.state = 'before' # can be before, running, after
115 self.stop_callbacks = []
115 self.stop_callbacks = []
116 self.start_data = None
116 self.start_data = None
117 self.stop_data = None
117 self.stop_data = None
118
118
119 @property
119 @property
120 def args(self):
120 def args(self):
121 """A list of cmd and args that will be used to start the process.
121 """A list of cmd and args that will be used to start the process.
122
122
123 This is what is passed to :func:`spawnProcess` and the first element
123 This is what is passed to :func:`spawnProcess` and the first element
124 will be the process name.
124 will be the process name.
125 """
125 """
126 return self.find_args()
126 return self.find_args()
127
127
128 def find_args(self):
128 def find_args(self):
129 """The ``.args`` property calls this to find the args list.
129 """The ``.args`` property calls this to find the args list.
130
130
131 Subclasses should implement this to construct the cmd and args.
131 Subclasses should implement this to construct the cmd and args.
132 """
132 """
133 raise NotImplementedError('find_args must be implemented in a subclass')
133 raise NotImplementedError('find_args must be implemented in a subclass')
134
134
135 @property
135 @property
136 def arg_str(self):
136 def arg_str(self):
137 """The string form of the program arguments."""
137 """The string form of the program arguments."""
138 return ' '.join(self.args)
138 return ' '.join(self.args)
139
139
140 @property
140 @property
141 def running(self):
141 def running(self):
142 """Am I running."""
142 """Am I running."""
143 if self.state == 'running':
143 if self.state == 'running':
144 return True
144 return True
145 else:
145 else:
146 return False
146 return False
147
147
148 def start(self):
148 def start(self):
149 """Start the process.
149 """Start the process.
150
150
151 This must return a deferred that fires with information about the
151 This must return a deferred that fires with information about the
152 process starting (like a pid, job id, etc.).
152 process starting (like a pid, job id, etc.).
153 """
153 """
154 raise NotImplementedError('start must be implemented in a subclass')
154 raise NotImplementedError('start must be implemented in a subclass')
155
155
156 def stop(self):
156 def stop(self):
157 """Stop the process and notify observers of stopping.
157 """Stop the process and notify observers of stopping.
158
158
159 This must return a deferred that fires with information about the
159 This must return a deferred that fires with information about the
160 processing stopping, like errors that occur while the process is
160 processing stopping, like errors that occur while the process is
161 attempting to be shut down. This deferred won't fire when the process
161 attempting to be shut down. This deferred won't fire when the process
162 actually stops. To observe the actual process stopping, see
162 actually stops. To observe the actual process stopping, see
163 :func:`on_stop`.
163 :func:`on_stop`.
164 """
164 """
165 raise NotImplementedError('stop must be implemented in a subclass')
165 raise NotImplementedError('stop must be implemented in a subclass')
166
166
167 def on_stop(self, f):
167 def on_stop(self, f):
168 """Get a deferred that will fire when the process stops.
168 """Get a deferred that will fire when the process stops.
169
169
170 The deferred will fire with data that contains information about
170 The deferred will fire with data that contains information about
171 the exit status of the process.
171 the exit status of the process.
172 """
172 """
173 if self.state=='after':
173 if self.state=='after':
174 return f(self.stop_data)
174 return f(self.stop_data)
175 else:
175 else:
176 self.stop_callbacks.append(f)
176 self.stop_callbacks.append(f)
177
177
178 def notify_start(self, data):
178 def notify_start(self, data):
179 """Call this to trigger startup actions.
179 """Call this to trigger startup actions.
180
180
181 This logs the process startup and sets the state to 'running'. It is
181 This logs the process startup and sets the state to 'running'. It is
182 a pass-through so it can be used as a callback.
182 a pass-through so it can be used as a callback.
183 """
183 """
184
184
185 self.log.info('Process %r started: %r' % (self.args[0], data))
185 self.log.info('Process %r started: %r' % (self.args[0], data))
186 self.start_data = data
186 self.start_data = data
187 self.state = 'running'
187 self.state = 'running'
188 return data
188 return data
189
189
190 def notify_stop(self, data):
190 def notify_stop(self, data):
191 """Call this to trigger process stop actions.
191 """Call this to trigger process stop actions.
192
192
193 This logs the process stopping and sets the state to 'after'. Call
193 This logs the process stopping and sets the state to 'after'. Call
194 this to trigger all the callbacks registered with :func:`on_stop`."""
194 this to trigger all the callbacks registered with :func:`on_stop`."""
195
195
196 self.log.info('Process %r stopped: %r' % (self.args[0], data))
196 self.log.info('Process %r stopped: %r' % (self.args[0], data))
197 self.stop_data = data
197 self.stop_data = data
198 self.state = 'after'
198 self.state = 'after'
199 for i in range(len(self.stop_callbacks)):
199 for i in range(len(self.stop_callbacks)):
200 d = self.stop_callbacks.pop()
200 d = self.stop_callbacks.pop()
201 d(data)
201 d(data)
202 return data
202 return data
203
203
204 def signal(self, sig):
204 def signal(self, sig):
205 """Signal the process.
205 """Signal the process.
206
206
207 Return a semi-meaningless deferred after signaling the process.
207 Return a semi-meaningless deferred after signaling the process.
208
208
209 Parameters
209 Parameters
210 ----------
210 ----------
211 sig : str or int
211 sig : str or int
212 'KILL', 'INT', etc., or any signal number
212 'KILL', 'INT', etc., or any signal number
213 """
213 """
214 raise NotImplementedError('signal must be implemented in a subclass')
214 raise NotImplementedError('signal must be implemented in a subclass')
215
215
216
216
217 #-----------------------------------------------------------------------------
217 #-----------------------------------------------------------------------------
218 # Local process launchers
218 # Local process launchers
219 #-----------------------------------------------------------------------------
219 #-----------------------------------------------------------------------------
220
220
221
221
222 class LocalProcessLauncher(BaseLauncher):
222 class LocalProcessLauncher(BaseLauncher):
223 """Start and stop an external process in an asynchronous manner.
223 """Start and stop an external process in an asynchronous manner.
224
224
225 This will launch the external process with a working directory of
225 This will launch the external process with a working directory of
226 ``self.work_dir``.
226 ``self.work_dir``.
227 """
227 """
228
228
229 # This is used to construct self.args, which is passed to
229 # This is used to construct self.args, which is passed to
230 # spawnProcess.
230 # spawnProcess.
231 cmd_and_args = List([])
231 cmd_and_args = List([])
232 poll_frequency = Int(100) # in ms
232 poll_frequency = Int(100) # in ms
233
233
234 def __init__(self, work_dir=u'.', config=None, **kwargs):
234 def __init__(self, work_dir=u'.', config=None, **kwargs):
235 super(LocalProcessLauncher, self).__init__(
235 super(LocalProcessLauncher, self).__init__(
236 work_dir=work_dir, config=config, **kwargs
236 work_dir=work_dir, config=config, **kwargs
237 )
237 )
238 self.process = None
238 self.process = None
239 self.start_deferred = None
239 self.start_deferred = None
240 self.poller = None
240 self.poller = None
241
241
242 def find_args(self):
242 def find_args(self):
243 return self.cmd_and_args
243 return self.cmd_and_args
244
244
245 def start(self):
245 def start(self):
246 if self.state == 'before':
246 if self.state == 'before':
247 self.process = Popen(self.args,
247 self.process = Popen(self.args,
248 stdout=PIPE,stderr=PIPE,stdin=PIPE,
248 stdout=PIPE,stderr=PIPE,stdin=PIPE,
249 env=os.environ,
249 env=os.environ,
250 cwd=self.work_dir
250 cwd=self.work_dir
251 )
251 )
252
252
253 self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
253 self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
254 self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
254 self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
255 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
255 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
256 self.poller.start()
256 self.poller.start()
257 self.notify_start(self.process.pid)
257 self.notify_start(self.process.pid)
258 else:
258 else:
259 s = 'The process was already started and has state: %r' % self.state
259 s = 'The process was already started and has state: %r' % self.state
260 raise ProcessStateError(s)
260 raise ProcessStateError(s)
261
261
262 def stop(self):
262 def stop(self):
263 return self.interrupt_then_kill()
263 return self.interrupt_then_kill()
264
264
265 def signal(self, sig):
265 def signal(self, sig):
266 if self.state == 'running':
266 if self.state == 'running':
267 self.process.send_signal(sig)
267 self.process.send_signal(sig)
268
268
269 def interrupt_then_kill(self, delay=2.0):
269 def interrupt_then_kill(self, delay=2.0):
270 """Send INT, wait a delay and then send KILL."""
270 """Send INT, wait a delay and then send KILL."""
271 self.signal(SIGINT)
271 self.signal(SIGINT)
272 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
272 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
273 self.killer.start()
273 self.killer.start()
274
274
275 # callbacks, etc:
275 # callbacks, etc:
276
276
277 def handle_stdout(self, fd, events):
277 def handle_stdout(self, fd, events):
278 line = self.process.stdout.readline()
278 line = self.process.stdout.readline()
279 # a stopped process will be readable but return empty strings
279 # a stopped process will be readable but return empty strings
280 if line:
280 if line:
281 self.log.info(line[:-1])
281 self.log.info(line[:-1])
282 else:
282 else:
283 self.poll()
283 self.poll()
284
284
285 def handle_stderr(self, fd, events):
285 def handle_stderr(self, fd, events):
286 line = self.process.stderr.readline()
286 line = self.process.stderr.readline()
287 # a stopped process will be readable but return empty strings
287 # a stopped process will be readable but return empty strings
288 if line:
288 if line:
289 self.log.error(line[:-1])
289 self.log.error(line[:-1])
290 else:
290 else:
291 self.poll()
291 self.poll()
292
292
293 def poll(self):
293 def poll(self):
294 status = self.process.poll()
294 status = self.process.poll()
295 if status is not None:
295 if status is not None:
296 self.poller.stop()
296 self.poller.stop()
297 self.loop.remove_handler(self.process.stdout.fileno())
297 self.loop.remove_handler(self.process.stdout.fileno())
298 self.loop.remove_handler(self.process.stderr.fileno())
298 self.loop.remove_handler(self.process.stderr.fileno())
299 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
299 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
300 return status
300 return status
301
301
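For reference, the stdout/stderr forwarding and exit polling used by LocalProcessLauncher above reduce to the following standalone pattern. This is only an illustrative sketch: the child command is a throwaway placeholder, and the zmq/tornado ioloop API is assumed to match the version this module already imports.

    from subprocess import Popen, PIPE
    from zmq.eventloop import ioloop

    loop = ioloop.IOLoop.instance()
    child = Popen(['python', '-c', 'print "hello from the child"'], stdout=PIPE)

    def forward_stdout(fd, events):
        # a finished process stays readable but returns empty strings
        line = child.stdout.readline()
        if line:
            print 'child:', line.rstrip()

    def poll_child():
        # stop watching once the child has exited
        if child.poll() is not None:
            poller.stop()
            loop.remove_handler(child.stdout.fileno())
            loop.stop()

    loop.add_handler(child.stdout.fileno(), forward_stdout, loop.READ)
    poller = ioloop.PeriodicCallback(poll_child, 100, loop)  # poll every 100 ms
    poller.start()
    loop.start()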
302 class LocalControllerLauncher(LocalProcessLauncher):
302 class LocalControllerLauncher(LocalProcessLauncher):
303 """Launch a controller as a regular external process."""
303 """Launch a controller as a regular external process."""
304
304
305 controller_cmd = List(ipcontroller_cmd_argv, config=True)
305 controller_cmd = List(ipcontroller_cmd_argv, config=True)
306 # Command line arguments to ipcontroller.
306 # Command line arguments to ipcontroller.
307 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
307 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
308
308
309 def find_args(self):
309 def find_args(self):
310 return self.controller_cmd + self.controller_args
310 return self.controller_cmd + self.controller_args
311
311
312 def start(self, cluster_dir):
312 def start(self, cluster_dir):
313 """Start the controller by cluster_dir."""
313 """Start the controller by cluster_dir."""
314 self.controller_args.extend(['--cluster-dir', cluster_dir])
314 self.controller_args.extend(['--cluster-dir', cluster_dir])
315 self.cluster_dir = unicode(cluster_dir)
315 self.cluster_dir = unicode(cluster_dir)
316 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
316 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
317 return super(LocalControllerLauncher, self).start()
317 return super(LocalControllerLauncher, self).start()
318
318
319
319
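Because controller_cmd and controller_args are declared with config=True, they are meant to be overridden from the cluster's config file rather than edited here. A minimal sketch, assuming the usual get_config() hook available in IPython config files (the log level is just an example value):

    # ipcluster_config.py -- sketch only
    import logging
    c = get_config()
    c.LocalControllerLauncher.controller_args = [
        '--log-to-file',
        '--log-level', str(logging.DEBUG),  # more verbose than the default INFO
    ]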
320 class LocalEngineLauncher(LocalProcessLauncher):
320 class LocalEngineLauncher(LocalProcessLauncher):
321 """Launch a single engine as a regular external process."""
321 """Launch a single engine as a regular external process."""
322
322
323 engine_cmd = List(ipengine_cmd_argv, config=True)
323 engine_cmd = List(ipengine_cmd_argv, config=True)
324 # Command line arguments for ipengine.
324 # Command line arguments for ipengine.
325 engine_args = List(
325 engine_args = List(
326 ['--log-to-file','--log-level', str(logging.INFO)], config=True
326 ['--log-to-file','--log-level', str(logging.INFO)], config=True
327 )
327 )
328
328
329 def find_args(self):
329 def find_args(self):
330 return self.engine_cmd + self.engine_args
330 return self.engine_cmd + self.engine_args
331
331
332 def start(self, cluster_dir):
332 def start(self, cluster_dir):
333 """Start the engine by cluster_dir."""
333 """Start the engine by cluster_dir."""
334 self.engine_args.extend(['--cluster-dir', cluster_dir])
334 self.engine_args.extend(['--cluster-dir', cluster_dir])
335 self.cluster_dir = unicode(cluster_dir)
335 self.cluster_dir = unicode(cluster_dir)
336 return super(LocalEngineLauncher, self).start()
336 return super(LocalEngineLauncher, self).start()
337
337
338
338
339 class LocalEngineSetLauncher(BaseLauncher):
339 class LocalEngineSetLauncher(BaseLauncher):
340 """Launch a set of engines as regular external processes."""
340 """Launch a set of engines as regular external processes."""
341
341
342 # Command line arguments for ipengine.
342 # Command line arguments for ipengine.
343 engine_args = List(
343 engine_args = List(
344 ['--log-to-file','--log-level', str(logging.INFO)], config=True
344 ['--log-to-file','--log-level', str(logging.INFO)], config=True
345 )
345 )
346 # launcher class
346 # launcher class
347 launcher_class = LocalEngineLauncher
347 launcher_class = LocalEngineLauncher
348
348
349 def __init__(self, work_dir=u'.', config=None, **kwargs):
349 def __init__(self, work_dir=u'.', config=None, **kwargs):
350 super(LocalEngineSetLauncher, self).__init__(
350 super(LocalEngineSetLauncher, self).__init__(
351 work_dir=work_dir, config=config, **kwargs
351 work_dir=work_dir, config=config, **kwargs
352 )
352 )
353 self.launchers = {}
353 self.launchers = {}
354 self.stop_data = {}
354 self.stop_data = {}
355
355
356 def start(self, n, cluster_dir):
356 def start(self, n, cluster_dir):
357 """Start n engines by profile or cluster_dir."""
357 """Start n engines by profile or cluster_dir."""
358 self.cluster_dir = unicode(cluster_dir)
358 self.cluster_dir = unicode(cluster_dir)
359 dlist = []
359 dlist = []
360 for i in range(n):
360 for i in range(n):
361 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
361 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
362 # Copy the engine args over to each engine launcher.
362 # Copy the engine args over to each engine launcher.
363 import copy
363 import copy
364 el.engine_args = copy.deepcopy(self.engine_args)
364 el.engine_args = copy.deepcopy(self.engine_args)
365 el.on_stop(self._notice_engine_stopped)
365 el.on_stop(self._notice_engine_stopped)
366 d = el.start(cluster_dir)
366 d = el.start(cluster_dir)
367 if i==0:
367 if i==0:
368 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
368 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
369 self.launchers[i] = el
369 self.launchers[i] = el
370 dlist.append(d)
370 dlist.append(d)
371 self.notify_start(dlist)
371 self.notify_start(dlist)
372 # The consumeErrors here could be dangerous
372 # The consumeErrors here could be dangerous
373 # dfinal = gatherBoth(dlist, consumeErrors=True)
373 # dfinal = gatherBoth(dlist, consumeErrors=True)
374 # dfinal.addCallback(self.notify_start)
374 # dfinal.addCallback(self.notify_start)
375 return dlist
375 return dlist
376
376
377 def find_args(self):
377 def find_args(self):
378 return ['engine set']
378 return ['engine set']
379
379
380 def signal(self, sig):
380 def signal(self, sig):
381 dlist = []
381 dlist = []
382 for el in self.launchers.itervalues():
382 for el in self.launchers.itervalues():
383 d = el.signal(sig)
383 d = el.signal(sig)
384 dlist.append(d)
384 dlist.append(d)
385 # dfinal = gatherBoth(dlist, consumeErrors=True)
385 # dfinal = gatherBoth(dlist, consumeErrors=True)
386 return dlist
386 return dlist
387
387
388 def interrupt_then_kill(self, delay=1.0):
388 def interrupt_then_kill(self, delay=1.0):
389 dlist = []
389 dlist = []
390 for el in self.launchers.itervalues():
390 for el in self.launchers.itervalues():
391 d = el.interrupt_then_kill(delay)
391 d = el.interrupt_then_kill(delay)
392 dlist.append(d)
392 dlist.append(d)
393 # dfinal = gatherBoth(dlist, consumeErrors=True)
393 # dfinal = gatherBoth(dlist, consumeErrors=True)
394 return dlist
394 return dlist
395
395
396 def stop(self):
396 def stop(self):
397 return self.interrupt_then_kill()
397 return self.interrupt_then_kill()
398
398
399 def _notice_engine_stopped(self, data):
399 def _notice_engine_stopped(self, data):
400 self.log.info("Engine stopped: %r" % data)
400 self.log.info("Engine stopped: %r" % data)
401 pid = data['pid']
401 pid = data['pid']
402 for idx,el in self.launchers.iteritems():
402 for idx,el in self.launchers.iteritems():
403 if el.process.pid == pid:
403 if el.process.pid == pid:
404 break
404 break
405 self.launchers.pop(idx)
405 self.launchers.pop(idx)
406 self.stop_data[idx] = data
406 self.stop_data[idx] = data
407 if not self.launchers:
407 if not self.launchers:
408 self.notify_stop(self.stop_data)
408 self.notify_stop(self.stop_data)
409
409
410
410
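For readers wiring this up by hand, a rough sketch of driving the engine-set launcher directly follows; ipclusterz normally does this itself, the on_stop hook comes from BaseLauncher as used above, the cluster-dir path is a placeholder, and error handling is omitted.

    from zmq.eventloop import ioloop

    def engines_stopped(data):
        print 'all engines stopped:', data
        ioloop.IOLoop.instance().stop()

    launchers = LocalEngineSetLauncher(work_dir=u'.')
    launchers.on_stop(engines_stopped)
    launchers.start(4, u'/path/to/cluster_default')  # placeholder cluster dir
    ioloop.IOLoop.instance().start()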
411 #-----------------------------------------------------------------------------
411 #-----------------------------------------------------------------------------
412 # MPIExec launchers
412 # MPIExec launchers
413 #-----------------------------------------------------------------------------
413 #-----------------------------------------------------------------------------
414
414
415
415
416 class MPIExecLauncher(LocalProcessLauncher):
416 class MPIExecLauncher(LocalProcessLauncher):
417 """Launch an external process using mpiexec."""
417 """Launch an external process using mpiexec."""
418
418
419 # The mpiexec command to use in starting the process.
419 # The mpiexec command to use in starting the process.
420 mpi_cmd = List(['mpiexec'], config=True)
420 mpi_cmd = List(['mpiexec'], config=True)
421 # The command line arguments to pass to mpiexec.
421 # The command line arguments to pass to mpiexec.
422 mpi_args = List([], config=True)
422 mpi_args = List([], config=True)
423 # The program to start using mpiexec.
423 # The program to start using mpiexec.
424 program = List(['date'], config=True)
424 program = List(['date'], config=True)
425 # The command line arguments to pass to the program.
425 # The command line arguments to pass to the program.
426 program_args = List([], config=True)
426 program_args = List([], config=True)
427 # The number of instances of the program to start.
427 # The number of instances of the program to start.
428 n = Int(1, config=True)
428 n = Int(1, config=True)
429
429
430 def find_args(self):
430 def find_args(self):
431 """Build self.args using all the fields."""
431 """Build self.args using all the fields."""
432 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
432 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
433 self.program + self.program_args
433 self.program + self.program_args
434
434
435 def start(self, n):
435 def start(self, n):
436 """Start n instances of the program using mpiexec."""
436 """Start n instances of the program using mpiexec."""
437 self.n = n
437 self.n = n
438 return super(MPIExecLauncher, self).start()
438 return super(MPIExecLauncher, self).start()
439
439
440
440
441 class MPIExecControllerLauncher(MPIExecLauncher):
441 class MPIExecControllerLauncher(MPIExecLauncher):
442 """Launch a controller using mpiexec."""
442 """Launch a controller using mpiexec."""
443
443
444 controller_cmd = List(ipcontroller_cmd_argv, config=True)
444 controller_cmd = List(ipcontroller_cmd_argv, config=True)
445 # Command line arguments to ipcontroller.
445 # Command line arguments to ipcontroller.
446 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
446 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
447 n = Int(1, config=False)
447 n = Int(1, config=False)
448
448
449 def start(self, cluster_dir):
449 def start(self, cluster_dir):
450 """Start the controller by cluster_dir."""
450 """Start the controller by cluster_dir."""
451 self.controller_args.extend(['--cluster-dir', cluster_dir])
451 self.controller_args.extend(['--cluster-dir', cluster_dir])
452 self.cluster_dir = unicode(cluster_dir)
452 self.cluster_dir = unicode(cluster_dir)
453 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
453 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
454 return super(MPIExecControllerLauncher, self).start(1)
454 return super(MPIExecControllerLauncher, self).start(1)
455
455
456 def find_args(self):
456 def find_args(self):
457 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
457 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
458 self.controller_cmd + self.controller_args
458 self.controller_cmd + self.controller_args
459
459
460
460
461 class MPIExecEngineSetLauncher(MPIExecLauncher):
461 class MPIExecEngineSetLauncher(MPIExecLauncher):
462
462
463 engine_cmd = List(ipengine_cmd_argv, config=True)
463 engine_cmd = List(ipengine_cmd_argv, config=True)
464 # Command line arguments for ipengine.
464 # Command line arguments for ipengine.
465 engine_args = List(
465 engine_args = List(
466 ['--log-to-file','--log-level', str(logging.INFO)], config=True
466 ['--log-to-file','--log-level', str(logging.INFO)], config=True
467 )
467 )
468 n = Int(1, config=True)
468 n = Int(1, config=True)
469
469
470 def start(self, n, cluster_dir):
470 def start(self, n, cluster_dir):
471 """Start n engines by profile or cluster_dir."""
471 """Start n engines by profile or cluster_dir."""
472 self.engine_args.extend(['--cluster-dir', cluster_dir])
472 self.engine_args.extend(['--cluster-dir', cluster_dir])
473 self.cluster_dir = unicode(cluster_dir)
473 self.cluster_dir = unicode(cluster_dir)
474 self.n = n
474 self.n = n
475 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
475 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
476 return super(MPIExecEngineSetLauncher, self).start(n)
476 return super(MPIExecEngineSetLauncher, self).start(n)
477
477
478 def find_args(self):
478 def find_args(self):
479 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
479 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
480 self.engine_cmd + self.engine_args
480 self.engine_cmd + self.engine_args
481
481
482
482
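As a usage note, the mpi_cmd/mpi_args traits above are configurable, so site-specific mpiexec options belong in the cluster config rather than in code. A sketch follows; the machine file path is a placeholder and the exact flag spelling depends on the local MPI implementation.

    # ipcluster_config.py -- sketch only
    c = get_config()
    c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
    c.MPIExecEngineSetLauncher.mpi_args = ['-machinefile', '/path/to/hostfile']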
483 #-----------------------------------------------------------------------------
483 #-----------------------------------------------------------------------------
484 # SSH launchers
484 # SSH launchers
485 #-----------------------------------------------------------------------------
485 #-----------------------------------------------------------------------------
486
486
487 # TODO: Get SSH Launcher working again.
487 # TODO: Get SSH Launcher working again.
488
488
489 class SSHLauncher(LocalProcessLauncher):
489 class SSHLauncher(LocalProcessLauncher):
490 """A minimal launcher for ssh.
490 """A minimal launcher for ssh.
491
491
492 To be useful this will probably have to be extended to use the ``sshx``
492 To be useful this will probably have to be extended to use the ``sshx``
493 idea for environment variables. There could be other things this needs
493 idea for environment variables. There could be other things this needs
494 as well.
494 as well.
495 """
495 """
496
496
497 ssh_cmd = List(['ssh'], config=True)
497 ssh_cmd = List(['ssh'], config=True)
498 ssh_args = List([], config=True)
498 ssh_args = List(['-tt'], config=True)
499 program = List(['date'], config=True)
499 program = List(['date'], config=True)
500 program_args = List([], config=True)
500 program_args = List([], config=True)
501 hostname = Str('', config=True)
501 hostname = Str('', config=True)
502 user = Str(os.environ.get('USER','username'), config=True)
502 user = Str(os.environ.get('USER','username'), config=True)
503 location = Str('')
503 location = Str('')
504
504
505 def _hostname_changed(self, name, old, new):
505 def _hostname_changed(self, name, old, new):
506 self.location = '%s@%s' % (self.user, new)
506 self.location = '%s@%s' % (self.user, new)
507
507
508 def _user_changed(self, name, old, new):
508 def _user_changed(self, name, old, new):
509 self.location = '%s@%s' % (new, self.hostname)
509 self.location = '%s@%s' % (new, self.hostname)
510
510
511 def find_args(self):
511 def find_args(self):
512 return self.ssh_cmd + self.ssh_args + [self.location] + \
512 return self.ssh_cmd + self.ssh_args + [self.location] + \
513 self.program + self.program_args
513 self.program + self.program_args
514
514
515 def start(self, cluster_dir, hostname=None, user=None):
515 def start(self, cluster_dir, hostname=None, user=None):
516 print self.config
516 if hostname is not None:
517 if hostname is not None:
517 self.hostname = hostname
518 self.hostname = hostname
518 if user is not None:
519 if user is not None:
519 self.user = user
520 self.user = user
521 print (self.location, hostname, user)
520 return super(SSHLauncher, self).start()
522 return super(SSHLauncher, self).start()
523
524 def signal(self, sig):
525 if self.state == 'running':
526 # send escaped ssh connection-closer
527 self.process.stdin.write('~.')
528 self.process.stdin.flush()
529
521
530
522
531
523 class SSHControllerLauncher(SSHLauncher):
532 class SSHControllerLauncher(SSHLauncher):
524
533
525 program = List(ipcontroller_cmd_argv, config=True)
534 program = List(ipcontroller_cmd_argv, config=True)
526 # Command line arguments to ipcontroller.
535 # Command line arguments to ipcontroller.
527 program_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
536 program_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
528
537
529
538
530 class SSHEngineLauncher(SSHLauncher):
539 class SSHEngineLauncher(SSHLauncher):
531 program = List(ipengine_cmd_argv, config=True)
540 program = List(ipengine_cmd_argv, config=True)
532 # Command line arguments for ipengine.
541 # Command line arguments for ipengine.
533 program_args = List(
542 program_args = List(
534 ['--log-to-file','--log-level', str(logging.INFO)], config=True
543 ['--log-to-file','--log-level', str(logging.INFO)], config=True
535 )
544 )
536
545
537 class SSHEngineSetLauncher(LocalEngineSetLauncher):
546 class SSHEngineSetLauncher(LocalEngineSetLauncher):
538 launcher_class = SSHEngineLauncher
547 launcher_class = SSHEngineLauncher
539
548
540
549
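Given the TODO above, treat the following as a sketch of how the hostname/user traits are intended to be set from the cluster config; the host names and user name are placeholders.

    # ipcluster_config.py -- sketch only
    c = get_config()
    c.SSHControllerLauncher.hostname = 'head.example.com'
    c.SSHControllerLauncher.user = 'ipuser'
    c.SSHEngineLauncher.hostname = 'node01.example.com'
    c.SSHEngineLauncher.user = 'ipuser'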
541 #-----------------------------------------------------------------------------
550 #-----------------------------------------------------------------------------
542 # Windows HPC Server 2008 scheduler launchers
551 # Windows HPC Server 2008 scheduler launchers
543 #-----------------------------------------------------------------------------
552 #-----------------------------------------------------------------------------
544
553
545
554
546 # This is only used on Windows.
555 # This is only used on Windows.
547 def find_job_cmd():
556 def find_job_cmd():
548 if os.name=='nt':
557 if os.name=='nt':
549 try:
558 try:
550 return find_cmd('job')
559 return find_cmd('job')
551 except FindCmdError:
560 except FindCmdError:
552 return 'job'
561 return 'job'
553 else:
562 else:
554 return 'job'
563 return 'job'
555
564
556
565
557 class WindowsHPCLauncher(BaseLauncher):
566 class WindowsHPCLauncher(BaseLauncher):
558
567
559 # A regular expression used to get the job id from the output of the
568 # A regular expression used to get the job id from the output of the
560 # submit_command.
569 # submit_command.
561 job_id_regexp = Str(r'\d+', config=True)
570 job_id_regexp = Str(r'\d+', config=True)
562 # The filename of the instantiated job script.
571 # The filename of the instantiated job script.
563 job_file_name = Unicode(u'ipython_job.xml', config=True)
572 job_file_name = Unicode(u'ipython_job.xml', config=True)
564 # The full path to the instantiated job script. This gets made dynamically
573 # The full path to the instantiated job script. This gets made dynamically
565 # by combining the work_dir with the job_file_name.
574 # by combining the work_dir with the job_file_name.
566 job_file = Unicode(u'')
575 job_file = Unicode(u'')
567 # The hostname of the scheduler to submit the job to
576 # The hostname of the scheduler to submit the job to
568 scheduler = Str('', config=True)
577 scheduler = Str('', config=True)
569 job_cmd = Str(find_job_cmd(), config=True)
578 job_cmd = Str(find_job_cmd(), config=True)
570
579
571 def __init__(self, work_dir=u'.', config=None):
580 def __init__(self, work_dir=u'.', config=None, **kwargs):
572 super(WindowsHPCLauncher, self).__init__(
581 super(WindowsHPCLauncher, self).__init__(
573 work_dir=work_dir, config=config
582 work_dir=work_dir, config=config, **kwargs
574 )
583 )
575
584
576 @property
585 @property
577 def job_file(self):
586 def job_file(self):
578 return os.path.join(self.work_dir, self.job_file_name)
587 return os.path.join(self.work_dir, self.job_file_name)
579
588
580 def write_job_file(self, n):
589 def write_job_file(self, n):
581 raise NotImplementedError("Implement write_job_file in a subclass.")
590 raise NotImplementedError("Implement write_job_file in a subclass.")
582
591
583 def find_args(self):
592 def find_args(self):
584 return ['job.exe']
593 return ['job.exe']
585
594
586 def parse_job_id(self, output):
595 def parse_job_id(self, output):
587 """Take the output of the submit command and return the job id."""
596 """Take the output of the submit command and return the job id."""
588 m = re.search(self.job_id_regexp, output)
597 m = re.search(self.job_id_regexp, output)
589 if m is not None:
598 if m is not None:
590 job_id = m.group()
599 job_id = m.group()
591 else:
600 else:
592 raise LauncherError("Job id couldn't be determined: %s" % output)
601 raise LauncherError("Job id couldn't be determined: %s" % output)
593 self.job_id = job_id
602 self.job_id = job_id
594 self.log.info('Job started with job id: %r' % job_id)
603 self.log.info('Job started with job id: %r' % job_id)
595 return job_id
604 return job_id
596
605
597 def start(self, n):
606 def start(self, n):
598 """Start n copies of the process using the Win HPC job scheduler."""
607 """Start n copies of the process using the Win HPC job scheduler."""
599 self.write_job_file(n)
608 self.write_job_file(n)
600 args = [
609 args = [
601 'submit',
610 'submit',
602 '/jobfile:%s' % self.job_file,
611 '/jobfile:%s' % self.job_file,
603 '/scheduler:%s' % self.scheduler
612 '/scheduler:%s' % self.scheduler
604 ]
613 ]
605 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
614 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
606 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
615 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
607 output = check_output([self.job_cmd]+args,
616 output = check_output([self.job_cmd]+args,
608 env=os.environ,
617 env=os.environ,
609 cwd=self.work_dir,
618 cwd=self.work_dir,
610 stderr=STDOUT
619 stderr=STDOUT
611 )
620 )
612 job_id = self.parse_job_id(output)
621 job_id = self.parse_job_id(output)
613 # self.notify_start(job_id)
622 # self.notify_start(job_id)
614 return job_id
623 return job_id
615
624
616 def stop(self):
625 def stop(self):
617 args = [
626 args = [
618 'cancel',
627 'cancel',
619 self.job_id,
628 self.job_id,
620 '/scheduler:%s' % self.scheduler
629 '/scheduler:%s' % self.scheduler
621 ]
630 ]
622 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
631 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
623 try:
632 try:
624 output = check_output([self.job_cmd]+args,
633 output = check_output([self.job_cmd]+args,
625 env=os.environ,
634 env=os.environ,
626 cwd=self.work_dir,
635 cwd=self.work_dir,
627 stderr=STDOUT
636 stderr=STDOUT
628 )
637 )
629 except:
638 except:
630 output = 'The job already appears to be stopped: %r' % self.job_id
639 output = 'The job already appears to be stopped: %r' % self.job_id
631 self.notify_stop(output) # Pass the output of the kill cmd
640 self.notify_stop(output) # Pass the output of the kill cmd
632 return output
641 return output
633
642
634
643
635 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
644 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
636
645
637 job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
646 job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
638 extra_args = List([], config=False)
647 extra_args = List([], config=False)
639
648
640 def write_job_file(self, n):
649 def write_job_file(self, n):
641 job = IPControllerJob(config=self.config)
650 job = IPControllerJob(config=self.config)
642
651
643 t = IPControllerTask(config=self.config)
652 t = IPControllerTask(config=self.config)
644 # The task's work directory is *not* the actual work directory of
653 # The task's work directory is *not* the actual work directory of
645 # the controller. It is used as the base path for the stdout/stderr
654 # the controller. It is used as the base path for the stdout/stderr
646 # files that the scheduler redirects to.
655 # files that the scheduler redirects to.
647 t.work_directory = self.cluster_dir
656 t.work_directory = self.cluster_dir
648 # Add the --cluster-dir arguments from self.start().
657 # Add the --cluster-dir arguments from self.start().
649 t.controller_args.extend(self.extra_args)
658 t.controller_args.extend(self.extra_args)
650 job.add_task(t)
659 job.add_task(t)
651
660
652 self.log.info("Writing job description file: %s" % self.job_file)
661 self.log.info("Writing job description file: %s" % self.job_file)
653 job.write(self.job_file)
662 job.write(self.job_file)
654
663
655 @property
664 @property
656 def job_file(self):
665 def job_file(self):
657 return os.path.join(self.cluster_dir, self.job_file_name)
666 return os.path.join(self.cluster_dir, self.job_file_name)
658
667
659 def start(self, cluster_dir):
668 def start(self, cluster_dir):
660 """Start the controller by cluster_dir."""
669 """Start the controller by cluster_dir."""
661 self.extra_args = ['--cluster-dir', cluster_dir]
670 self.extra_args = ['--cluster-dir', cluster_dir]
662 self.cluster_dir = unicode(cluster_dir)
671 self.cluster_dir = unicode(cluster_dir)
663 return super(WindowsHPCControllerLauncher, self).start(1)
672 return super(WindowsHPCControllerLauncher, self).start(1)
664
673
665
674
666 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
675 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
667
676
668 job_file_name = Unicode(u'ipengineset_job.xml', config=True)
677 job_file_name = Unicode(u'ipengineset_job.xml', config=True)
669 extra_args = List([], config=False)
678 extra_args = List([], config=False)
670
679
671 def write_job_file(self, n):
680 def write_job_file(self, n):
672 job = IPEngineSetJob(config=self.config)
681 job = IPEngineSetJob(config=self.config)
673
682
674 for i in range(n):
683 for i in range(n):
675 t = IPEngineTask(config=self.config)
684 t = IPEngineTask(config=self.config)
676 # The task's work directory is *not* the actual work directory of
685 # The task's work directory is *not* the actual work directory of
677 # the engine. It is used as the base path for the stdout/stderr
686 # the engine. It is used as the base path for the stdout/stderr
678 # files that the scheduler redirects to.
687 # files that the scheduler redirects to.
679 t.work_directory = self.cluster_dir
688 t.work_directory = self.cluster_dir
680 # Add the --cluster-dir arguments from self.start().
689 # Add the --cluster-dir arguments from self.start().
681 t.engine_args.extend(self.extra_args)
690 t.engine_args.extend(self.extra_args)
682 job.add_task(t)
691 job.add_task(t)
683
692
684 self.log.info("Writing job description file: %s" % self.job_file)
693 self.log.info("Writing job description file: %s" % self.job_file)
685 job.write(self.job_file)
694 job.write(self.job_file)
686
695
687 @property
696 @property
688 def job_file(self):
697 def job_file(self):
689 return os.path.join(self.cluster_dir, self.job_file_name)
698 return os.path.join(self.cluster_dir, self.job_file_name)
690
699
691 def start(self, n, cluster_dir):
700 def start(self, n, cluster_dir):
692 """Start n engines by cluster_dir."""
701 """Start n engines by cluster_dir."""
693 self.extra_args = ['--cluster-dir', cluster_dir]
702 self.extra_args = ['--cluster-dir', cluster_dir]
694 self.cluster_dir = unicode(cluster_dir)
703 self.cluster_dir = unicode(cluster_dir)
695 return super(WindowsHPCEngineSetLauncher, self).start(n)
704 return super(WindowsHPCEngineSetLauncher, self).start(n)
696
705
697
706
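The job-id extraction above is just a regular-expression search over whatever the submit command prints; a tiny self-contained illustration follows (the output string is invented for the example).

    import re

    output = 'Job has been submitted. ID: 4096'  # invented example output
    match = re.search(r'\d+', output)            # same default job_id_regexp as above
    assert match is not None and match.group() == '4096'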
698 #-----------------------------------------------------------------------------
707 #-----------------------------------------------------------------------------
699 # Batch (PBS) system launchers
708 # Batch (PBS) system launchers
700 #-----------------------------------------------------------------------------
709 #-----------------------------------------------------------------------------
701
710
702 # TODO: Get PBS launcher working again.
711 # TODO: Get PBS launcher working again.
703
712
704 class BatchSystemLauncher(BaseLauncher):
713 class BatchSystemLauncher(BaseLauncher):
705 """Launch an external process using a batch system.
714 """Launch an external process using a batch system.
706
715
707 This class is designed to work with UNIX batch systems like PBS, LSF,
716 This class is designed to work with UNIX batch systems like PBS, LSF,
708 GridEngine, etc. The overall model is that there are different commands
717 GridEngine, etc. The overall model is that there are different commands
709 like qsub, qdel, etc. that handle the starting and stopping of the process.
718 like qsub, qdel, etc. that handle the starting and stopping of the process.
710
719
711 This class also has the notion of a batch script. The ``batch_template``
720 This class also has the notion of a batch script. The ``batch_template``
712 attribute can be set to a string that is a template for the batch script.
721 attribute can be set to a string that is a template for the batch script.
713 This template is instantiated using Itpl. Thus the template can use
722 This template is instantiated using Itpl. Thus the template can use
714 ${n} for the number of instances. Subclasses can add additional variables
723 ${n} for the number of instances. Subclasses can add additional variables
715 to the template dict.
724 to the template dict.
716 """
725 """
717
726
718 # Subclasses must fill these in. See PBSEngineSet
727 # Subclasses must fill these in. See PBSEngineSet
719 # The name of the command line program used to submit jobs.
728 # The name of the command line program used to submit jobs.
720 submit_command = Str('', config=True)
729 submit_command = Str('', config=True)
721 # The name of the command line program used to delete jobs.
730 # The name of the command line program used to delete jobs.
722 delete_command = Str('', config=True)
731 delete_command = Str('', config=True)
723 # A regular expression used to get the job id from the output of the
732 # A regular expression used to get the job id from the output of the
724 # submit_command.
733 # submit_command.
725 job_id_regexp = Str('', config=True)
734 job_id_regexp = Str('', config=True)
726 # The string that is the batch script template itself.
735 # The string that is the batch script template itself.
727 batch_template = Str('', config=True)
736 batch_template = Str('', config=True)
728 # The filename of the instantiated batch script.
737 # The filename of the instantiated batch script.
729 batch_file_name = Unicode(u'batch_script', config=True)
738 batch_file_name = Unicode(u'batch_script', config=True)
730 # The full path to the instantiated batch script.
739 # The full path to the instantiated batch script.
731 batch_file = Unicode(u'')
740 batch_file = Unicode(u'')
732
741
733 def __init__(self, work_dir=u'.', config=None):
742 def __init__(self, work_dir=u'.', config=None, **kwargs):
734 super(BatchSystemLauncher, self).__init__(
743 super(BatchSystemLauncher, self).__init__(
735 work_dir=work_dir, config=config
744 work_dir=work_dir, config=config, **kwargs
736 )
745 )
737 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
746 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
738 self.context = {}
747 self.context = {}
739
748
740 def parse_job_id(self, output):
749 def parse_job_id(self, output):
741 """Take the output of the submit command and return the job id."""
750 """Take the output of the submit command and return the job id."""
742 m = re.match(self.job_id_regexp, output)
751 m = re.match(self.job_id_regexp, output)
743 if m is not None:
752 if m is not None:
744 job_id = m.group()
753 job_id = m.group()
745 else:
754 else:
746 raise LauncherError("Job id couldn't be determined: %s" % output)
755 raise LauncherError("Job id couldn't be determined: %s" % output)
747 self.job_id = job_id
756 self.job_id = job_id
748 self.log.info('Job started with job id: %r' % job_id)
757 self.log.info('Job started with job id: %r' % job_id)
749 return job_id
758 return job_id
750
759
751 def write_batch_script(self, n):
760 def write_batch_script(self, n):
752 """Instantiate and write the batch script to the work_dir."""
761 """Instantiate and write the batch script to the work_dir."""
753 self.context['n'] = n
762 self.context['n'] = n
754 script_as_string = Itpl.itplns(self.batch_template, self.context)
763 script_as_string = Itpl.itplns(self.batch_template, self.context)
755 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
764 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
756 f = open(self.batch_file, 'w')
765 f = open(self.batch_file, 'w')
757 f.write(script_as_string)
766 f.write(script_as_string)
758 f.close()
767 f.close()
759
768
760 def start(self, n):
769 def start(self, n):
761 """Start n copies of the process using a batch system."""
770 """Start n copies of the process using a batch system."""
762 self.write_batch_script(n)
771 self.write_batch_script(n)
763 output = check_output([self.submit_command, self.batch_file], env=os.environ, stderr=STDOUT)
772 output = check_output([self.submit_command, self.batch_file], env=os.environ, stderr=STDOUT)
764 job_id = self.parse_job_id(output)
773 job_id = self.parse_job_id(output)
765 # self.notify_start(job_id)
774 # self.notify_start(job_id)
766 return job_id
775 return job_id
767
776
768 def stop(self):
777 def stop(self):
769 output = Popen([self.delete_command, self.job_id], env=os.environ, stderr=STDOUT)
778 output = check_output([self.delete_command, self.job_id], env=os.environ, stderr=STDOUT)
770 self.notify_stop(output) # Pass the output of the kill cmd
779 self.notify_stop(output) # Pass the output of the kill cmd
771 return output
780 return output
772
781
773
782
774 class PBSLauncher(BatchSystemLauncher):
783 class PBSLauncher(BatchSystemLauncher):
775 """A BatchSystemLauncher subclass for PBS."""
784 """A BatchSystemLauncher subclass for PBS."""
776
785
777 submit_command = Str('qsub', config=True)
786 submit_command = Str('qsub', config=True)
778 delete_command = Str('qdel', config=True)
787 delete_command = Str('qdel', config=True)
779 job_id_regexp = Str(r'\d+', config=True)
788 job_id_regexp = Str(r'\d+', config=True)
780 batch_template = Str('', config=True)
789 batch_template = Str('', config=True)
781 batch_file_name = Unicode(u'pbs_batch_script', config=True)
790 batch_file_name = Unicode(u'pbs_batch_script', config=True)
782 batch_file = Unicode(u'')
791 batch_file = Unicode(u'')
783
792
784
793
785 class PBSControllerLauncher(PBSLauncher):
794 class PBSControllerLauncher(PBSLauncher):
786 """Launch a controller using PBS."""
795 """Launch a controller using PBS."""
787
796
788 batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
797 batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
789
798
790 def start(self, cluster_dir):
799 def start(self, cluster_dir):
791 """Start the controller by profile or cluster_dir."""
800 """Start the controller by profile or cluster_dir."""
792 # Here we save profile and cluster_dir in the context so they
801 # Here we save profile and cluster_dir in the context so they
793 # can be used in the batch script template as ${profile} and
802 # can be used in the batch script template as ${profile} and
794 # ${cluster_dir}
803 # ${cluster_dir}
795 self.context['cluster_dir'] = cluster_dir
804 self.context['cluster_dir'] = cluster_dir
796 self.cluster_dir = unicode(cluster_dir)
805 self.cluster_dir = unicode(cluster_dir)
797 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
806 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
798 return super(PBSControllerLauncher, self).start(1)
807 return super(PBSControllerLauncher, self).start(1)
799
808
800
809
801 class PBSEngineSetLauncher(PBSLauncher):
810 class PBSEngineSetLauncher(PBSLauncher):
802
811
803 batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
812 batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
804
813
805 def start(self, n, cluster_dir):
814 def start(self, n, cluster_dir):
806 """Start n engines by profile or cluster_dir."""
815 """Start n engines by profile or cluster_dir."""
807 self.context['cluster_dir'] = cluster_dir
816 self.context['cluster_dir'] = cluster_dir
808 self.cluster_dir = unicode(cluster_dir)
817 self.cluster_dir = unicode(cluster_dir)
809 self.log.info('Starting PBSEngineSetLauncher: %r' % self.args)
818 self.log.info('Starting PBSEngineSetLauncher: %r' % self.args)
810 return super(PBSEngineSetLauncher, self).start(n)
819 return super(PBSEngineSetLauncher, self).start(n)
811
820
812
821
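To make the ${n}/${cluster_dir} substitution described in BatchSystemLauncher concrete, here is a sketch of PBS templates in the cluster config. The queue/resource lines are placeholders for the local site, the ipcontrollerz/ipenginez command names follow this branch's z-suffixed naming and should be adjusted to whatever the cmd_argv values resolve to, and the engine template assumes cluster_dir is placed in the template context as the controller launcher does.

    # ipcluster_config.py -- sketch only
    c = get_config()
    c.PBSControllerLauncher.batch_template = """#!/bin/sh
    #PBS -N ipcontroller
    ipcontrollerz --cluster-dir ${cluster_dir} --log-to-file
    """
    c.PBSEngineSetLauncher.batch_template = """#!/bin/sh
    #PBS -N ipengines
    #PBS -l nodes=${n}:ppn=1
    ipenginez --cluster-dir ${cluster_dir} --log-to-file
    """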
813 #-----------------------------------------------------------------------------
822 #-----------------------------------------------------------------------------
814 # A launcher for ipcluster itself!
823 # A launcher for ipcluster itself!
815 #-----------------------------------------------------------------------------
824 #-----------------------------------------------------------------------------
816
825
817
826
818 class IPClusterLauncher(LocalProcessLauncher):
827 class IPClusterLauncher(LocalProcessLauncher):
819 """Launch the ipcluster program in an external process."""
828 """Launch the ipcluster program in an external process."""
820
829
821 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
830 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
822 # Command line arguments to pass to ipcluster.
831 # Command line arguments to pass to ipcluster.
823 ipcluster_args = List(
832 ipcluster_args = List(
824 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
833 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
825 ipcluster_subcommand = Str('start')
834 ipcluster_subcommand = Str('start')
826 ipcluster_n = Int(2)
835 ipcluster_n = Int(2)
827
836
828 def find_args(self):
837 def find_args(self):
829 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
838 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
830 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
839 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
831
840
832 def start(self):
841 def start(self):
833 self.log.info("Starting ipcluster: %r" % self.args)
842 self.log.info("Starting ipcluster: %r" % self.args)
834 return super(IPClusterLauncher, self).start()
843 return super(IPClusterLauncher, self).start()
835
844
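For reference, a quick sketch of the argv this launcher assembles; the leading command comes from ipcluster_cmd_argv defined earlier in this module, so the exact spelling shown in the comment is illustrative only.

    launcher = IPClusterLauncher(work_dir=u'.')
    launcher.ipcluster_n = 4
    print launcher.find_args()
    # roughly: ['ipclusterz', 'start', '-n', '4',
    #           '--clean-logs', '--log-to-file', '--log-level', '20']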