launcher updates for PBS
MinRK
@@ -1,184 +1,192
1 1 import os
2 2
3 3 c = get_config()
4 4
5 5 #-----------------------------------------------------------------------------
6 6 # Select which launchers to use
7 7 #-----------------------------------------------------------------------------
8 8
9 9 # This allows you to control what method is used to start the controller
10 10 # and engines. The following methods are currently supported:
11 11 # - Start as a regular process on localhost.
12 12 # - Start using mpiexec.
13 13 # - Start using the Windows HPC Server 2008 scheduler
14 14 # - Start using PBS
15 15 # - Start using SSH
16 16
17 17
18 18 # The selected launchers can be configured below.
19 19
20 20 # Options are:
21 21 # - LocalControllerLauncher
22 22 # - MPIExecControllerLauncher
23 23 # - PBSControllerLauncher
24 24 # - WindowsHPCControllerLauncher
25 25 # c.Global.controller_launcher = 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
26 26
27 27 # Options are:
28 28 # - LocalEngineSetLauncher
29 29 # - MPIExecEngineSetLauncher
30 30 # - PBSEngineSetLauncher
31 31 # - WindowsHPCEngineSetLauncher
32 32 # c.Global.engine_launcher = 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
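# For example, to launch both the controller and the engines with PBS,
# point these at the PBS launcher classes listed above (a sketch; adjust
# the PBS batch templates further down in this file for your site):
# c.Global.controller_launcher = 'IPython.zmq.parallel.launcher.PBSControllerLauncher'
# c.Global.engine_launcher = 'IPython.zmq.parallel.launcher.PBSEngineSetLauncher'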
33 33
34 34 #-----------------------------------------------------------------------------
35 35 # Global configuration
36 36 #-----------------------------------------------------------------------------
37 37
38 38 # The default number of engines that will be started. This is overridden by
39 39 # the -n command line option: "ipcluster start -n 4"
40 40 # c.Global.n = 2
41 41
42 42 # Log to a file in cluster_dir/log, otherwise just log to sys.stdout.
43 43 # c.Global.log_to_file = False
44 44
45 45 # Remove old logs from cluster_dir/log before starting.
46 46 # c.Global.clean_logs = True
47 47
48 48 # The working directory for the process. The application will use os.chdir
49 49 # to change to this directory before starting.
50 50 # c.Global.work_dir = os.getcwd()
51 51
52 52
53 53 #-----------------------------------------------------------------------------
54 54 # Local process launchers
55 55 #-----------------------------------------------------------------------------
56 56
57 57 # The command line arguments to call the controller with.
58 58 # c.LocalControllerLauncher.controller_args = \
59 59 # ['--log-to-file','--log-level', '40']
60 60
61 61 # The working directory for the engines
62 62 # c.LocalEngineSetLauncher.work_dir = u''
63 63
64 64 # Command line arguments passed to the engines.
65 65 # c.LocalEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
66 66
67 67 #-----------------------------------------------------------------------------
68 68 # MPIExec launchers
69 69 #-----------------------------------------------------------------------------
70 70
71 71 # The mpiexec/mpirun command to use to start the controller.
72 72 # c.MPIExecControllerLauncher.mpi_cmd = ['mpiexec']
73 73
74 74 # Additional arguments to pass to the actual mpiexec command.
75 75 # c.MPIExecControllerLauncher.mpi_args = []
76 76
77 77 # The command line arguments to call the controller with.
78 78 # c.MPIExecControllerLauncher.controller_args = \
79 79 # ['--log-to-file','--log-level', '40']
80 80
81 81
82 82 # The mpiexec/mpirun command to use to start the engines.
83 83 # c.MPIExecEngineSetLauncher.mpi_cmd = ['mpiexec']
84 84
85 85 # Additional arguments to pass to the actual mpiexec command.
86 86 # c.MPIExecEngineSetLauncher.mpi_args = []
87 87
88 88 # Command line arguments passed to the engines.
89 89 # c.MPIExecEngineSetLauncher.engine_args = ['--log-to-file','--log-level', '40']
90 90
91 91 # The default number of engines to start if not given elsewhere.
92 92 # c.MPIExecEngineSetLauncher.n = 1
93 93
94 94 #-----------------------------------------------------------------------------
95 95 # SSH launchers
96 96 #-----------------------------------------------------------------------------
97 97
98 98 # Todo
99 99
100 100
101 101 #-----------------------------------------------------------------------------
102 102 # Unix batch (PBS) scheduler launchers
103 103 #-----------------------------------------------------------------------------
104 104
105 105 # The command line program to use to submit a PBS job.
106 106 # c.PBSControllerLauncher.submit_command = 'qsub'
107 107
108 108 # The command line program to use to delete a PBS job.
109 109 # c.PBSControllerLauncher.delete_command = 'qdel'
110 110
111 111 # A regular expression that takes the output of qsub and finds the job id.
112 112 # c.PBSControllerLauncher.job_id_regexp = r'\d+'
113 113
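# qsub's output varies between PBS installations, but it typically begins
# with a numeric job id (e.g. "12345.headnode"); the default pattern pulls
# out that leading number. A minimal sketch of how such a pattern is applied
# (the sample output string is illustrative):
# import re
# out = '12345.headnode'
# job_id = re.search(r'\d+', out).group()   # -> '12345'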
114 114 # The batch submission script used to start the controller. This is where
115 115 # environment variables would be setup, etc. This string is interpolated using
116 116 # the Itpl module in IPython.external. Basically, you can use ${n} for the
117 117 # number of engines and ${cluster_dir} for the cluster_dir.
118 # c.PBSControllerLauncher.batch_template = """"""
118 # c.PBSControllerLauncher.batch_template = """
119 # #PBS -l nprocs=$n
120 #
121 # ipcontrollerz --cluster-dir $cluster_dir
122 # """
119 123
120 124 # The name of the instantiated batch script that will actually be used to
121 125 # submit the job. This will be written to the cluster directory.
122 126 # c.PBSControllerLauncher.batch_file_name = u'pbs_batch_script_controller'
123 127
124 128
125 129 # The command line program to use to submit a PBS job.
126 130 # c.PBSEngineSetLauncher.submit_command = 'qsub'
127 131
128 132 # The command line program to use to delete a PBS job.
129 133 # c.PBSEngineSetLauncher.delete_command = 'qdel'
130 134
131 135 # A regular expression that takes the output of qsub and finds the job id.
132 136 # c.PBSEngineSetLauncher.job_id_regexp = r'\d+'
133 137
134 138 # The batch submission script used to start the engines. This is where
135 139 # environment variables would be setup, etc. This string is interpolated using
136 140 # the Itpl module in IPython.external. Basically, you can use ${n} for the
137 141 # number of engines and ${cluster_dir} for the cluster_dir.
138 # c.PBSEngineSetLauncher.batch_template = """"""
142 # c.PBSEngineSetLauncher.batch_template = """
143 # #PBS -l nprocs=$n
144 #
145 # ipenginez --cluster-dir $cluster_dir
146 # """
139 147
140 148 # The name of the instantiated batch script that will actually be used to
141 149 # submit the job. This will be written to the cluster directory.
142 150 # c.PBSEngineSetLauncher.batch_file_name = u'pbs_batch_script_engines'
143 151
144 152 #-----------------------------------------------------------------------------
145 153 # Windows HPC Server 2008 launcher configuration
146 154 #-----------------------------------------------------------------------------
147 155
148 156 # c.IPControllerJob.job_name = 'IPController'
149 157 # c.IPControllerJob.is_exclusive = False
150 158 # c.IPControllerJob.username = r'USERDOMAIN\USERNAME'
151 159 # c.IPControllerJob.priority = 'Highest'
152 160 # c.IPControllerJob.requested_nodes = ''
153 161 # c.IPControllerJob.project = 'MyProject'
154 162
155 163 # c.IPControllerTask.task_name = 'IPController'
156 164 # c.IPControllerTask.controller_cmd = [u'ipcontroller.exe']
157 165 # c.IPControllerTask.controller_args = ['--log-to-file', '--log-level', '40']
158 166 # c.IPControllerTask.environment_variables = {}
159 167
160 168 # c.WindowsHPCControllerLauncher.scheduler = 'HEADNODE'
161 169 # c.WindowsHPCControllerLauncher.job_file_name = u'ipcontroller_job.xml'
162 170
163 171
164 172 # c.IPEngineSetJob.job_name = 'IPEngineSet'
165 173 # c.IPEngineSetJob.is_exclusive = False
166 174 # c.IPEngineSetJob.username = r'USERDOMAIN\USERNAME'
167 175 # c.IPEngineSetJob.priority = 'Highest'
168 176 # c.IPEngineSetJob.requested_nodes = ''
169 177 # c.IPEngineSetJob.project = 'MyProject'
170 178
171 179 # c.IPEngineTask.task_name = 'IPEngine'
172 180 # c.IPEngineTask.engine_cmd = [u'ipengine.exe']
173 181 # c.IPEngineTask.engine_args = ['--log-to-file', '--log-level', '40']
174 182 # c.IPEngineTask.environment_variables = {}
175 183
176 184 # c.WindowsHPCEngineSetLauncher.scheduler = 'HEADNODE'
177 185 # c.WindowsHPCEngineSetLauncher.job_file_name = u'ipengineset_job.xml'
178 186
179 187
180 188
181 189
182 190
183 191
184 192
@@ -1,592 +1,592
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import errno
19 19 import logging
20 20 import os
21 21 import re
22 22 import signal
23 23
24 24 import zmq
25 25 from zmq.eventloop import ioloop
26 26
27 27 from IPython.external.argparse import ArgumentParser, SUPPRESS
28 28 from IPython.utils.importstring import import_item
29 29 from IPython.zmq.parallel.clusterdir import (
30 30 ApplicationWithClusterDir, ClusterDirConfigLoader,
31 31 ClusterDirError, PIDFileError
32 32 )
33 33
34 34
35 35 #-----------------------------------------------------------------------------
36 36 # Module level variables
37 37 #-----------------------------------------------------------------------------
38 38
39 39
40 40 default_config_file_name = u'ipclusterz_config.py'
41 41
42 42
43 43 _description = """\
44 44 Start an IPython cluster for parallel computing.\n\n
45 45
46 46 An IPython cluster consists of 1 controller and 1 or more engines.
47 47 This command automates the startup of these processes using a wide
48 48 range of startup methods (SSH, local processes, PBS, mpiexec,
49 49 Windows HPC Server 2008). To start a cluster with 4 engines on your
50 50 local host simply do 'ipclusterz start -n 4'. For more complex usage
51 51 you will typically do 'ipclusterz create -p mycluster', then edit
52 52 configuration files, followed by 'ipclusterz start -p mycluster -n 4'.
53 53 """
54 54
55 55
56 56 # Exit codes for ipcluster
57 57
58 58 # This will be the exit code if the ipcluster appears to be running because
59 59 # a .pid file exists
60 60 ALREADY_STARTED = 10
61 61
62 62
63 63 # This will be the exit code if ipcluster stop is run, but there is no .pid
64 64 # file to be found.
65 65 ALREADY_STOPPED = 11
66 66
67 67 # This will be the exit code if ipcluster engines is run, but there is no .pid
68 68 # file to be found.
69 69 NO_CLUSTER = 12
70 70
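# A wrapper script can branch on these codes; a minimal sketch (using
# subprocess here is an assumption for illustration, not part of this app):
#
# import subprocess
# rc = subprocess.call(['ipclusterz', 'start', '-p', 'mycluster', '-n', '4'])
# if rc == ALREADY_STARTED:
#     print 'cluster appears to be running already; use ipclusterz stop first'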
71 71
72 72 #-----------------------------------------------------------------------------
73 73 # Command line options
74 74 #-----------------------------------------------------------------------------
75 75
76 76
77 77 class IPClusterAppConfigLoader(ClusterDirConfigLoader):
78 78
79 79 def _add_arguments(self):
80 80 # Don't call ClusterDirConfigLoader._add_arguments as we don't want
81 81 # its defaults on self.parser. Instead, we will put those
82 82 # default options on our subparsers.
83 83
84 84 # This has all the common options that all subcommands use
85 85 parent_parser1 = ArgumentParser(
86 86 add_help=False,
87 87 argument_default=SUPPRESS
88 88 )
89 89 self._add_ipython_dir(parent_parser1)
90 90 self._add_log_level(parent_parser1)
91 91
92 92 # This has all the common options that other subcommands use
93 93 parent_parser2 = ArgumentParser(
94 94 add_help=False,
95 95 argument_default=SUPPRESS
96 96 )
97 97 self._add_cluster_profile(parent_parser2)
98 98 self._add_cluster_dir(parent_parser2)
99 99 self._add_work_dir(parent_parser2)
100 100 paa = parent_parser2.add_argument
101 101 paa('--log-to-file',
102 102 action='store_true', dest='Global.log_to_file',
103 103 help='Log to a file in the log directory (default is stdout)')
104 104
105 105 # Create the object used to create the subparsers.
106 106 subparsers = self.parser.add_subparsers(
107 107 dest='Global.subcommand',
108 108 title='ipcluster subcommands',
109 109 description=
110 110 """ipcluster has a variety of subcommands. The general way of
111 111 running ipcluster is 'ipclusterz <cmd> [options]'. To get help
112 112 on a particular subcommand do 'ipclusterz <cmd> -h'."""
113 113 # help="For more help, type 'ipclusterz <cmd> -h'",
114 114 )
115 115
116 116 # The "list" subcommand parser
117 117 parser_list = subparsers.add_parser(
118 118 'list',
119 119 parents=[parent_parser1],
120 120 argument_default=SUPPRESS,
121 121 help="List all clusters in cwd and ipython_dir.",
122 122 description=
123 123 """List all available clusters, by cluster directory, that can
124 124 be found in the current working directory or in the ipython
125 125 directory. Cluster directories are named using the convention
126 'cluster_<profile>'."""
126 'clusterz_<profile>'."""
127 127 )
128 128
129 129 # The "create" subcommand parser
130 130 parser_create = subparsers.add_parser(
131 131 'create',
132 132 parents=[parent_parser1, parent_parser2],
133 133 argument_default=SUPPRESS,
134 134 help="Create a new cluster directory.",
135 135 description=
136 136 """Create an ipython cluster directory by its profile name or
137 137 cluster directory path. Cluster directories contain
138 138 configuration, log and security related files and are named
139 using the convention 'cluster_<profile>'. By default they are
139 using the convention 'clusterz_<profile>'. By default they are
140 140 located in your ipython directory. Once created, you will
141 141 probably need to edit the configuration files in the cluster
142 142 directory to configure your cluster. Most users will create a
143 143 cluster directory by profile name,
144 144 'ipclusterz create -p mycluster', which will put the directory
145 in '<ipython_dir>/cluster_mycluster'.
145 in '<ipython_dir>/clusterz_mycluster'.
146 146 """
147 147 )
148 148 paa = parser_create.add_argument
149 149 paa('--reset-config',
150 150 dest='Global.reset_config', action='store_true',
151 151 help=
152 152 """Recopy the default config files to the cluster directory.
153 153 You will lose any modifications you have made to these files.""")
154 154
155 155 # The "start" subcommand parser
156 156 parser_start = subparsers.add_parser(
157 157 'start',
158 158 parents=[parent_parser1, parent_parser2],
159 159 argument_default=SUPPRESS,
160 160 help="Start a cluster.",
161 161 description=
162 162 """Start an ipython cluster by its profile name or cluster
163 163 directory. Cluster directories contain configuration, log and
164 164 security related files and are named using the convention
165 'cluster_<profile>' and should be creating using the 'start'
165 'clusterz_<profile>' and should be created using the 'create'
166 166 subcommand of 'ipcluster'. If your cluster directory is in
167 167 the cwd or the ipython directory, you can simply refer to it
168 168 using its profile name, 'ipclusterz start -n 4 -p <profile>`,
169 169 otherwise use the '--cluster-dir' option.
170 170 """
171 171 )
172 172
173 173 paa = parser_start.add_argument
174 174 paa('-n', '--number',
175 175 type=int, dest='Global.n',
176 176 help='The number of engines to start.',
177 177 metavar='Global.n')
178 178 paa('--clean-logs',
179 179 dest='Global.clean_logs', action='store_true',
180 180 help='Delete old log files before starting.')
181 181 paa('--no-clean-logs',
182 182 dest='Global.clean_logs', action='store_false',
183 183 help="Don't delete old log flies before starting.")
184 184 paa('--daemon',
185 185 dest='Global.daemonize', action='store_true',
186 186 help='Daemonize the ipcluster program. This implies --log-to-file')
187 187 paa('--no-daemon',
188 188 dest='Global.daemonize', action='store_false',
189 189 help="Dont't daemonize the ipcluster program.")
190 190 paa('--delay',
191 191 type=float, dest='Global.delay',
192 192 help="Specify the delay (in seconds) between starting the controller and starting the engine(s).")
193 193
194 194 # The "stop" subcommand parser
195 195 parser_stop = subparsers.add_parser(
196 196 'stop',
197 197 parents=[parent_parser1, parent_parser2],
198 198 argument_default=SUPPRESS,
199 199 help="Stop a running cluster.",
200 200 description=
201 201 """Stop a running ipython cluster by its profile name or cluster
202 202 directory. Cluster directories are named using the convention
203 'cluster_<profile>'. If your cluster directory is in
203 'clusterz_<profile>'. If your cluster directory is in
204 204 the cwd or the ipython directory, you can simply refer to it
205 205 using its profile name, 'ipclusterz stop -p <profile>`, otherwise
206 206 use the '--cluster-dir' option.
207 207 """
208 208 )
209 209 paa = parser_stop.add_argument
210 210 paa('--signal',
211 211 dest='Global.signal', type=int,
212 212 help="The signal number to use in stopping the cluster (default=2).",
213 213 metavar="Global.signal")
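# For example, `ipclusterz stop -p mycluster --signal 15` would send
# SIGTERM instead of the default SIGINT (signal 2).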
214 214
215 215 # the "engines" subcommand parser
216 216 parser_engines = subparsers.add_parser(
217 217 'engines',
218 218 parents=[parent_parser1, parent_parser2],
219 219 argument_default=SUPPRESS,
220 220 help="Attach some engines to an existing controller or cluster.",
221 221 description=
222 222 """Start one or more engines to connect to an existing Cluster
223 223 by profile name or cluster directory.
224 224 Cluster directories contain configuration, log and
225 225 security related files and are named using the convention
226 'cluster_<profile>' and should be creating using the 'start'
226 'clusterz_<profile>' and should be created using the 'create'
227 227 subcommand of 'ipcluster'. If your cluster directory is in
228 228 the cwd or the ipython directory, you can simply refer to it
229 229 using its profile name, 'ipclusterz engines -n 4 -p <profile>`,
230 230 otherwise use the '--cluster-dir' option.
231 231 """
232 232 )
233 233 paa = parser_engines.add_argument
234 234 paa('-n', '--number',
235 235 type=int, dest='Global.n',
236 236 help='The number of engines to start.',
237 237 metavar='Global.n')
238 238 paa('--daemon',
239 239 dest='Global.daemonize', action='store_true',
240 240 help='Daemonize the ipcluster program. This implies --log-to-file')
241 241 paa('--no-daemon',
242 242 dest='Global.daemonize', action='store_false',
243 243 help="Dont't daemonize the ipcluster program.")
244 244
245 245 #-----------------------------------------------------------------------------
246 246 # Main application
247 247 #-----------------------------------------------------------------------------
248 248
249 249
250 250 class IPClusterApp(ApplicationWithClusterDir):
251 251
252 252 name = u'ipclusterz'
253 253 description = _description
254 254 usage = None
255 255 command_line_loader = IPClusterAppConfigLoader
256 256 default_config_file_name = default_config_file_name
257 257 default_log_level = logging.INFO
258 258 auto_create_cluster_dir = False
259 259
260 260 def create_default_config(self):
261 261 super(IPClusterApp, self).create_default_config()
262 262 self.default_config.Global.controller_launcher = \
263 263 'IPython.zmq.parallel.launcher.LocalControllerLauncher'
264 264 self.default_config.Global.engine_launcher = \
265 265 'IPython.zmq.parallel.launcher.LocalEngineSetLauncher'
266 266 self.default_config.Global.n = 2
267 267 self.default_config.Global.delay = 2
268 268 self.default_config.Global.reset_config = False
269 269 self.default_config.Global.clean_logs = True
270 270 self.default_config.Global.signal = signal.SIGINT
271 271 self.default_config.Global.daemonize = False
272 272
273 273 def find_resources(self):
274 274 subcommand = self.command_line_config.Global.subcommand
275 275 if subcommand=='list':
276 276 self.list_cluster_dirs()
277 277 # Exit immediately because there is nothing left to do.
278 278 self.exit()
279 279 elif subcommand=='create':
280 280 self.auto_create_cluster_dir = True
281 281 super(IPClusterApp, self).find_resources()
282 282 elif subcommand=='start' or subcommand=='stop':
283 283 self.auto_create_cluster_dir = True
284 284 try:
285 285 super(IPClusterApp, self).find_resources()
286 286 except ClusterDirError:
287 287 raise ClusterDirError(
288 288 "Could not find a cluster directory. A cluster dir must "
289 289 "be created before running 'ipclusterz start'. Do "
290 290 "'ipclusterz create -h' or 'ipclusterz list -h' for more "
291 291 "information about creating and listing cluster dirs."
292 292 )
293 293 elif subcommand=='engines':
294 294 self.auto_create_cluster_dir = False
295 295 try:
296 296 super(IPClusterApp, self).find_resources()
297 297 except ClusterDirError:
298 298 raise ClusterDirError(
299 299 "Could not find a cluster directory. A cluster dir must "
300 300 "be created before running 'ipclusterz start'. Do "
301 301 "'ipclusterz create -h' or 'ipclusterz list -h' for more "
302 302 "information about creating and listing cluster dirs."
303 303 )
304 304
305 305 def list_cluster_dirs(self):
306 306 # Find the search paths
307 307 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
308 308 if cluster_dir_paths:
309 309 cluster_dir_paths = cluster_dir_paths.split(':')
310 310 else:
311 311 cluster_dir_paths = []
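# e.g. IPCLUSTER_DIR_PATH=/scratch/clusters:/home/me/clusters (illustrative
# paths) would add both directories to the search list built below.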
312 312 try:
313 313 ipython_dir = self.command_line_config.Global.ipython_dir
314 314 except AttributeError:
315 315 ipython_dir = self.default_config.Global.ipython_dir
316 316 paths = [os.getcwd(), ipython_dir] + \
317 317 cluster_dir_paths
318 318 paths = list(set(paths))
319 319
320 320 self.log.info('Searching for cluster dirs in paths: %r' % paths)
321 321 for path in paths:
322 322 files = os.listdir(path)
323 323 for f in files:
324 324 full_path = os.path.join(path, f)
325 if os.path.isdir(full_path) and f.startswith('cluster_'):
325 if os.path.isdir(full_path) and f.startswith('clusterz_'):
326 326 profile = full_path.split('_')[-1]
327 327 start_cmd = 'ipclusterz start -p %s -n 4' % profile
328 328 print start_cmd + " ==> " + full_path
329 329
330 330 def pre_construct(self):
331 331 # IPClusterApp.pre_construct() is where we cd to the working directory.
332 332 super(IPClusterApp, self).pre_construct()
333 333 config = self.master_config
334 334 try:
335 335 daemon = config.Global.daemonize
336 336 if daemon:
337 337 config.Global.log_to_file = True
338 338 except AttributeError:
339 339 pass
340 340
341 341 def construct(self):
342 342 config = self.master_config
343 343 subcmd = config.Global.subcommand
344 344 reset = config.Global.reset_config
345 345 if subcmd == 'list':
346 346 return
347 347 if subcmd == 'create':
348 348 self.log.info('Copying default config files to cluster directory '
349 349 '[overwrite=%r]' % (reset,))
350 350 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
351 351 if subcmd =='start':
352 352 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
353 353 self.start_logging()
354 354 self.loop = ioloop.IOLoop.instance()
355 355 # reactor.callWhenRunning(self.start_launchers)
356 356 dc = ioloop.DelayedCallback(self.start_launchers, 0, self.loop)
357 357 dc.start()
358 358 if subcmd == 'engines':
359 359 self.start_logging()
360 360 self.loop = ioloop.IOLoop.instance()
361 361 # reactor.callWhenRunning(self.start_launchers)
362 362 engine_only = lambda : self.start_launchers(controller=False)
363 363 dc = ioloop.DelayedCallback(engine_only, 0, self.loop)
364 364 dc.start()
365 365
366 366 def start_launchers(self, controller=True):
367 367 config = self.master_config
368 368
369 369 # Create the launchers. In both cases, we set the work_dir of
370 370 # the launcher to the cluster_dir. This is where the launcher's
371 371 # subprocesses will be launched. It is not where the controller
372 372 # and engine will be launched.
373 373 if controller:
374 374 cl_class = import_item(config.Global.controller_launcher)
375 375 self.controller_launcher = cl_class(
376 376 work_dir=self.cluster_dir, config=config,
377 377 logname=self.log.name
378 378 )
379 379 # Setup the observing of stopping. If the controller dies, shut
380 380 # everything down as that will be completely fatal for the engines.
381 381 self.controller_launcher.on_stop(self.stop_launchers)
382 382 # But, we don't monitor the stopping of engines. An engine dying
383 383 # is just fine and in principle a user could start a new engine.
384 384 # Also, if we did monitor engine stopping, it is difficult to
385 385 # know what to do when only some engines die. Currently, the
386 386 # observing of engine stopping is inconsistent. Some launchers
387 387 # might trigger on a single engine stopping, others wait until
388 388 # all stop. TODO: think more about how to handle this.
389 389 else:
390 390 self.controller_launcher = None
391 391
392 392 el_class = import_item(config.Global.engine_launcher)
393 393 self.engine_launcher = el_class(
394 394 work_dir=self.cluster_dir, config=config, logname=self.log.name
395 395 )
396 396
397 397 # Setup signals
398 398 signal.signal(signal.SIGINT, self.sigint_handler)
399 399
400 400 # Start the controller and engines
401 401 self._stopping = False # Make sure stop_launchers is not called 2x.
402 402 if controller:
403 403 self.start_controller()
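# Global.delay is in seconds while DelayedCallback takes milliseconds;
# multiplying by the boolean `controller` zeroes the delay when only
# engines are being started (the 'engines' subcommand).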
404 404 dc = ioloop.DelayedCallback(self.start_engines, 1000*config.Global.delay*controller, self.loop)
405 405 dc.start()
406 406 self.startup_message()
407 407
408 408 def startup_message(self, r=None):
409 409 self.log.info("IPython cluster: started")
410 410 return r
411 411
412 412 def start_controller(self, r=None):
413 413 # self.log.info("In start_controller")
414 414 config = self.master_config
415 415 d = self.controller_launcher.start(
416 416 cluster_dir=config.Global.cluster_dir
417 417 )
418 418 return d
419 419
420 420 def start_engines(self, r=None):
421 421 # self.log.info("In start_engines")
422 422 config = self.master_config
423 423
424 424 d = self.engine_launcher.start(
425 425 config.Global.n,
426 426 cluster_dir=config.Global.cluster_dir
427 427 )
428 428 return d
429 429
430 430 def stop_controller(self, r=None):
431 431 # self.log.info("In stop_controller")
432 432 if self.controller_launcher and self.controller_launcher.running:
433 433 return self.controller_launcher.stop()
434 434
435 435 def stop_engines(self, r=None):
436 436 # self.log.info("In stop_engines")
437 437 if self.engine_launcher.running:
438 438 d = self.engine_launcher.stop()
439 439 # d.addErrback(self.log_err)
440 440 return d
441 441 else:
442 442 return None
443 443
444 444 def log_err(self, f):
445 445 self.log.error(f.getTraceback())
446 446 return None
447 447
448 448 def stop_launchers(self, r=None):
449 449 if not self._stopping:
450 450 self._stopping = True
451 451 # if isinstance(r, failure.Failure):
452 452 # self.log.error('Unexpected error in ipcluster:')
453 453 # self.log.info(r.getTraceback())
454 454 self.log.error("IPython cluster: stopping")
455 455 # These return deferreds. We are not doing anything with them
456 456 # but we are holding refs to them as a reminder that they
457 457 # do return deferreds.
458 458 d1 = self.stop_engines()
459 459 d2 = self.stop_controller()
460 460 # Wait a few seconds to let things shut down.
461 461 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
462 462 dc.start()
463 463 # reactor.callLater(4.0, reactor.stop)
464 464
465 465 def sigint_handler(self, signum, frame):
466 466 self.stop_launchers()
467 467
468 468 def start_logging(self):
469 469 # Remove old log files of the controller and engine
470 470 if self.master_config.Global.clean_logs:
471 471 log_dir = self.master_config.Global.log_dir
472 472 for f in os.listdir(log_dir):
473 473 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
474 474 os.remove(os.path.join(log_dir, f))
475 475 # This will remove old log files for ipcluster itself
476 476 super(IPClusterApp, self).start_logging()
477 477
478 478 def start_app(self):
479 479 """Start the application, depending on what subcommand is used."""
480 480 subcmd = self.master_config.Global.subcommand
481 481 if subcmd=='create' or subcmd=='list':
482 482 return
483 483 elif subcmd=='start':
484 484 self.start_app_start()
485 485 elif subcmd=='stop':
486 486 self.start_app_stop()
487 487 elif subcmd=='engines':
488 488 self.start_app_engines()
489 489
490 490 def start_app_start(self):
491 491 """Start the app for the start subcommand."""
492 492 config = self.master_config
493 493 # First see if the cluster is already running
494 494 try:
495 495 pid = self.get_pid_from_file()
496 496 except PIDFileError:
497 497 pass
498 498 else:
499 499 self.log.critical(
500 500 'Cluster is already running with [pid=%s]. '
501 501 'use "ipclusterz stop" to stop the cluster.' % pid
502 502 )
503 503 # Here I exit with an unusual exit status that other processes
504 504 # can watch for to learn how I exited.
505 505 self.exit(ALREADY_STARTED)
506 506
507 507 # Now log and daemonize
508 508 self.log.info(
509 509 'Starting ipclusterz with [daemon=%r]' % config.Global.daemonize
510 510 )
511 511 # TODO: Get daemonize working on Windows or as a Windows Server.
512 512 if config.Global.daemonize:
513 513 if os.name=='posix':
514 514 from twisted.scripts._twistd_unix import daemonize
515 515 daemonize()
516 516
517 517 # Now write the new pid file AFTER our new forked pid is active.
518 518 self.write_pid_file()
519 519 try:
520 520 self.loop.start()
521 521 except KeyboardInterrupt:
522 522 pass
523 523 except zmq.ZMQError as e:
524 524 if e.errno == errno.EINTR:
525 525 pass
526 526 else:
527 527 raise
528 528 self.remove_pid_file()
529 529
530 530 def start_app_engines(self):
531 531 """Start the app for the start subcommand."""
532 532 config = self.master_config
533 533 # First see if the cluster is already running
534 534
535 535 # Now log and daemonize
536 536 self.log.info(
537 537 'Starting engines with [daemon=%r]' % config.Global.daemonize
538 538 )
539 539 # TODO: Get daemonize working on Windows or as a Windows Server.
540 540 if config.Global.daemonize:
541 541 if os.name=='posix':
542 542 from twisted.scripts._twistd_unix import daemonize
543 543 daemonize()
544 544
545 545 # Now write the new pid file AFTER our new forked pid is active.
546 546 # self.write_pid_file()
547 547 try:
548 548 self.loop.start()
549 549 except KeyboardInterrupt:
550 550 pass
551 551 except zmq.ZMQError as e:
552 552 if e.errno == errno.EINTR:
553 553 pass
554 554 else:
555 555 raise
556 556 # self.remove_pid_file()
557 557
558 558 def start_app_stop(self):
559 559 """Start the app for the stop subcommand."""
560 560 config = self.master_config
561 561 try:
562 562 pid = self.get_pid_from_file()
563 563 except PIDFileError:
564 564 self.log.critical(
565 565 'Problem reading pid file, cluster is probably not running.'
566 566 )
567 567 # Here I exit with an unusual exit status that other processes
568 568 # can watch for to learn how I exited.
569 569 self.exit(ALREADY_STOPPED)
570 570 else:
571 571 if os.name=='posix':
572 572 sig = config.Global.signal
573 573 self.log.info(
574 574 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
575 575 )
576 576 os.kill(pid, sig)
577 577 elif os.name=='nt':
578 578 # As of right now, we don't support daemonize on Windows, so
579 579 # stop will not do anything. Minimally, it should clean up the
580 580 # old .pid files.
581 581 self.remove_pid_file()
582 582
583 583
584 584 def launch_new_instance():
585 585 """Create and run the IPython cluster."""
586 586 app = IPClusterApp()
587 587 app.start()
588 588
589 589
590 590 if __name__ == '__main__':
591 591 launch_new_instance()
592 592
@@ -1,427 +1,427
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import copy
21 21 import os
22 22 import logging
23 23 import socket
24 24 import stat
25 25 import sys
26 26 import uuid
27 27
28 28 import zmq
29 29 from zmq.log.handlers import PUBHandler
30 30 from zmq.utils import jsonapi as json
31 31
32 32 from IPython.config.loader import Config
33 33 from IPython.zmq.parallel import factory
34 34 from IPython.zmq.parallel.controller import ControllerFactory
35 35 from IPython.zmq.parallel.clusterdir import (
36 36 ApplicationWithClusterDir,
37 37 ClusterDirConfigLoader
38 38 )
39 39 from IPython.zmq.parallel.util import disambiguate_ip_address, split_url
40 40 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
41 41 from IPython.utils.traitlets import Instance, Unicode
42 42
43 43
44 44
45 45 #-----------------------------------------------------------------------------
46 46 # Module level variables
47 47 #-----------------------------------------------------------------------------
48 48
49 49
50 50 #: The default config file name for this application
51 51 default_config_file_name = u'ipcontrollerz_config.py'
52 52
53 53
54 54 _description = """Start the IPython controller for parallel computing.
55 55
56 56 The IPython controller provides a gateway between the IPython engines and
57 57 clients. The controller needs to be started before the engines and can be
58 58 configured using command line options or using a cluster directory. Cluster
59 59 directories contain config, log and security files and are usually located in
60 your ipython directory and named as "cluster_<profile>". See the --profile
60 your ipython directory and named as "clusterz_<profile>". See the --profile
61 61 and --cluster-dir options for details.
62 62 """
63 63
64 64 #-----------------------------------------------------------------------------
65 65 # Default interfaces
66 66 #-----------------------------------------------------------------------------
67 67
68 68 # The default client interfaces for FCClientServiceFactory.interfaces
69 69 default_client_interfaces = Config()
70 70 default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
71 71
72 72 # Make this a dict we can pass to Config.__init__ for the default
73 73 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
74 74
75 75
76 76
77 77 # The default engine interfaces for FCEngineServiceFactory.interfaces
78 78 default_engine_interfaces = Config()
79 79 default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
80 80
81 81 # Make this a dict we can pass to Config.__init__ for the default
82 82 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
83 83
84 84
85 85 #-----------------------------------------------------------------------------
86 86 # Service factories
87 87 #-----------------------------------------------------------------------------
88 88
89 89 #
90 90 # class FCClientServiceFactory(FCServiceFactory):
91 91 # """A Foolscap implementation of the client services."""
92 92 #
93 93 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
94 94 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
95 95 # allow_none=False, config=True)
96 96 #
97 97 #
98 98 # class FCEngineServiceFactory(FCServiceFactory):
99 99 # """A Foolscap implementation of the engine services."""
100 100 #
101 101 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
102 102 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
103 103 # allow_none=False, config=True)
104 104 #
105 105
106 106 #-----------------------------------------------------------------------------
107 107 # Command line options
108 108 #-----------------------------------------------------------------------------
109 109
110 110
111 111 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
112 112
113 113 def _add_arguments(self):
114 114 super(IPControllerAppConfigLoader, self)._add_arguments()
115 115 paa = self.parser.add_argument
116 116
117 117 ## Hub Config:
118 118 paa('--mongodb',
119 119 dest='HubFactory.db_class', action='store_const',
120 120 const='IPython.zmq.parallel.mongodb.MongoDB',
121 121 help='Use MongoDB task storage [default: in-memory]')
122 122 paa('--hb',
123 123 type=int, dest='HubFactory.hb', nargs=2,
124 124 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
125 125 'connections [default: random]',
126 126 metavar='Hub.hb_ports')
127 127 paa('--ping',
128 128 type=int, dest='HubFactory.ping',
129 129 help='The frequency at which the Hub pings the engines for heartbeats '
130 130 ' (in ms) [default: 100]',
131 131 metavar='Hub.ping')
132 132
133 133 # Client config
134 134 paa('--client-ip',
135 135 type=str, dest='HubFactory.client_ip',
136 136 help='The IP address or hostname the Hub will listen on for '
137 137 'client connections. Both engine-ip and client-ip can be set simultaneously '
138 138 'via --ip [default: loopback]',
139 139 metavar='Hub.client_ip')
140 140 paa('--client-transport',
141 141 type=str, dest='HubFactory.client_transport',
142 142 help='The ZeroMQ transport the Hub will use for '
143 143 'client connections. Both engine-transport and client-transport can be set simultaneously '
144 144 'via --transport [default: tcp]',
145 145 metavar='Hub.client_transport')
146 146 paa('--query',
147 147 type=int, dest='HubFactory.query_port',
148 148 help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
149 149 metavar='Hub.query_port')
150 150 paa('--notifier',
151 151 type=int, dest='HubFactory.notifier_port',
152 152 help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
153 153 metavar='Hub.notifier_port')
154 154
155 155 # Engine config
156 156 paa('--engine-ip',
157 157 type=str, dest='HubFactory.engine_ip',
158 158 help='The IP address or hostname the Hub will listen on for '
159 159 'engine connections. This applies to the Hub and its schedulers. '
160 160 'Both engine-ip and client-ip can be set simultaneously '
161 161 'via --ip [default: loopback]',
162 162 metavar='Hub.engine_ip')
163 163 paa('--engine-transport',
164 164 type=str, dest='HubFactory.engine_transport',
165 165 help='The ZeroMQ transport the Hub will use for '
166 166 'engine connections. Both engine-transport and client-transport can be set simultaneously '
167 167 'via --transport [default: tcp]',
168 168 metavar='Hub.engine_transport')
169 169
170 170 # Scheduler config
171 171 paa('--mux',
172 172 type=int, dest='ControllerFactory.mux', nargs=2,
173 173 help='The (2) ports the MUX scheduler will listen on for client,engine '
174 174 'connections, respectively [default: random]',
175 175 metavar='Scheduler.mux_ports')
176 176 paa('--task',
177 177 type=int, dest='ControllerFactory.task', nargs=2,
178 178 help='The (2) ports the Task scheduler will listen on for client,engine '
179 179 'connections, respectively [default: random]',
180 180 metavar='Scheduler.task_ports')
181 181 paa('--control',
182 182 type=int, dest='ControllerFactory.control', nargs=2,
183 183 help='The (2) ports the Control scheduler will listen on for client,engine '
184 184 'connections, respectively [default: random]',
185 185 metavar='Scheduler.control_ports')
186 186 paa('--iopub',
187 187 type=int, dest='ControllerFactory.iopub', nargs=2,
188 188 help='The (2) ports the IOPub scheduler will listen on for client,engine '
189 189 'connections, respectively [default: random]',
190 190 metavar='Scheduler.iopub_ports')
191 191
192 192 paa('--scheme',
193 193 type=str, dest='HubFactory.scheme',
194 194 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
195 195 help='select the task scheduler scheme [default: Python LRU]',
196 196 metavar='Scheduler.scheme')
197 197 paa('--usethreads',
198 198 dest='ControllerFactory.usethreads', action="store_true",
199 199 help='Use threads instead of processes for the schedulers',
200 200 )
201 201 paa('--hwm',
202 202 dest='ControllerFactory.hwm', type=int,
203 203 help='specify the High Water Mark (HWM) for the downstream '
204 204 'socket in the pure ZMQ scheduler. This is the maximum number '
205 205 'of allowed outstanding tasks on each engine.',
206 206 )
207 207
208 208 ## Global config
209 209 paa('--log-to-file',
210 210 action='store_true', dest='Global.log_to_file',
211 211 help='Log to a file in the log directory (default is stdout)')
212 212 paa('--log-url',
213 213 type=str, dest='Global.log_url',
214 214 help='Broadcast logs to an iploggerz process [default: disabled]')
215 215 paa('-r','--reuse-files',
216 216 action='store_true', dest='Global.reuse_files',
217 217 help='Try to reuse existing json connection files.')
218 218 paa('--no-secure',
219 219 action='store_false', dest='Global.secure',
220 220 help='Turn off execution keys.')
221 221 paa('--secure',
222 222 action='store_true', dest='Global.secure',
223 223 help='Turn on execution keys (default).')
224 224 paa('--execkey',
225 225 type=str, dest='Global.exec_key',
226 226 help='path to a file containing an execution key.',
227 227 metavar='keyfile')
228 228 paa('--ssh',
229 229 type=str, dest='Global.sshserver',
230 230 help='ssh url for clients to use when connecting to the Controller '
231 231 'processes. It should be of the form: [user@]server[:port]. The '
232 232 'Controller\'s listening addresses must be accessible from the ssh server',
233 233 metavar='Global.sshserver')
234 234 paa('--location',
235 235 type=str, dest='Global.location',
236 236 help="The external IP or domain name of this machine, used for disambiguating "
237 237 "engine and client connections.",
238 238 metavar='Global.location')
239 239 factory.add_session_arguments(self.parser)
240 240 factory.add_registration_arguments(self.parser)
241 241
242 242
243 243 #-----------------------------------------------------------------------------
244 244 # The main application
245 245 #-----------------------------------------------------------------------------
246 246
247 247
248 248 class IPControllerApp(ApplicationWithClusterDir):
249 249
250 250 name = u'ipcontrollerz'
251 251 description = _description
252 252 command_line_loader = IPControllerAppConfigLoader
253 253 default_config_file_name = default_config_file_name
254 254 auto_create_cluster_dir = True
255 255
256 256
257 257 def create_default_config(self):
258 258 super(IPControllerApp, self).create_default_config()
259 259 # Don't set defaults for Global.secure or Global.reuse_furls
260 260 # as those are set in a component.
261 261 self.default_config.Global.import_statements = []
262 262 self.default_config.Global.clean_logs = True
263 263 self.default_config.Global.secure = True
264 264 self.default_config.Global.reuse_files = False
265 265 self.default_config.Global.exec_key = "exec_key.key"
266 266 self.default_config.Global.sshserver = None
267 267 self.default_config.Global.location = None
268 268
269 269 def pre_construct(self):
270 270 super(IPControllerApp, self).pre_construct()
271 271 c = self.master_config
272 272 # The defaults for these are set in FCClientServiceFactory and
273 273 # FCEngineServiceFactory, so we only set them here if the global
274 274 # options have be set to override the class level defaults.
275 275
276 276 # if hasattr(c.Global, 'reuse_furls'):
277 277 # c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
278 278 # c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
279 279 # del c.Global.reuse_furls
280 280 # if hasattr(c.Global, 'secure'):
281 281 # c.FCClientServiceFactory.secure = c.Global.secure
282 282 # c.FCEngineServiceFactory.secure = c.Global.secure
283 283 # del c.Global.secure
284 284
285 285 def save_connection_dict(self, fname, cdict):
286 286 """save a connection dict to json file."""
287 287 c = self.master_config
288 288 url = cdict['url']
289 289 location = cdict['location']
290 290 if not location:
291 291 try:
292 292 proto,ip,port = split_url(url)
293 293 except AssertionError:
294 294 pass
295 295 else:
296 296 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
297 297 cdict['location'] = location
298 298 fname = os.path.join(c.Global.security_dir, fname)
299 299 with open(fname, 'w') as f:
300 300 f.write(json.dumps(cdict, indent=2))
301 301 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
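# The file is written with mode 0600; its contents look roughly like the
# following (values are illustrative; compare load_config_from_json below):
# {
#   "exec_key": "02ee873e-0000-0000-0000-000000000000",
#   "ssh": null,
#   "url": "tcp://127.0.0.1:10101",
#   "location": "10.0.0.5"
# }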
302 302
303 303 def load_config_from_json(self):
304 304 """load config from existing json connector files."""
305 305 c = self.master_config
306 306 # load from engine config
307 307 with open(os.path.join(c.Global.security_dir, 'ipcontroller-engine.json')) as f:
308 308 cfg = json.loads(f.read())
309 309 key = c.SessionFactory.exec_key = cfg['exec_key']
310 310 xport,addr = cfg['url'].split('://')
311 311 c.HubFactory.engine_transport = xport
312 312 ip,ports = addr.split(':')
313 313 c.HubFactory.engine_ip = ip
314 314 c.HubFactory.regport = int(ports)
315 315 c.Global.location = cfg['location']
316 316
317 317 # load client config
318 318 with open(os.path.join(c.Global.security_dir, 'ipcontroller-client.json')) as f:
319 319 cfg = json.loads(f.read())
320 320 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
321 321 xport,addr = cfg['url'].split('://')
322 322 c.HubFactory.client_transport = xport
323 323 ip,ports = addr.split(':')
324 324 c.HubFactory.client_ip = ip
325 325 c.Global.sshserver = cfg['ssh']
326 326 assert int(ports) == c.HubFactory.regport, "regport mismatch"
327 327
328 328 def construct(self):
329 329 # This is the working dir by now.
330 330 sys.path.insert(0, '')
331 331 c = self.master_config
332 332
333 333 self.import_statements()
334 334 reusing = c.Global.reuse_files
335 335 if reusing:
336 336 try:
337 337 self.load_config_from_json()
338 338 except (AssertionError,IOError):
339 339 reusing=False
340 340 # check again, because reusing may have failed:
341 341 if reusing:
342 342 pass
343 343 elif c.Global.secure:
344 344 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
345 345 key = str(uuid.uuid4())
346 346 with open(keyfile, 'w') as f:
347 347 f.write(key)
348 348 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
349 349 c.SessionFactory.exec_key = key
350 350 else:
351 351 c.SessionFactory.exec_key = ''
352 352 key = None
353 353
354 354 try:
355 355 self.factory = ControllerFactory(config=c, logname=self.log.name)
356 356 self.start_logging()
357 357 self.factory.construct()
358 358 except:
359 359 self.log.error("Couldn't construct the Controller", exc_info=True)
360 360 self.exit(1)
361 361
362 362 if not reusing:
363 363 # save to new json config files
364 364 f = self.factory
365 365 cdict = {'exec_key' : key,
366 366 'ssh' : c.Global.sshserver,
367 367 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
368 368 'location' : c.Global.location
369 369 }
370 370 self.save_connection_dict('ipcontroller-client.json', cdict)
371 371 edict = cdict
372 372 edict['url']="%s://%s:%s"%(f.engine_transport, f.engine_ip, f.regport)
373 373 self.save_connection_dict('ipcontroller-engine.json', edict)
374 374
375 375
376 376 def save_urls(self):
377 377 """save the registration urls to files."""
378 378 c = self.master_config
379 379
380 380 sec_dir = c.Global.security_dir
381 381 cf = self.factory
382 382
383 383 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
384 384 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
385 385
386 386 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
387 387 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
388 388
389 389
390 390 def import_statements(self):
391 391 statements = self.master_config.Global.import_statements
392 392 for s in statements:
393 393 try:
394 394 self.log.info("Executing statement: '%s'" % s)
395 395 exec s in globals(), locals()
396 396 except:
397 397 self.log.error("Error running statement: %s" % s)
398 398
399 399 def start_logging(self):
400 400 super(IPControllerApp, self).start_logging()
401 401 if self.master_config.Global.log_url:
402 402 context = self.factory.context
403 403 lsock = context.socket(zmq.PUB)
404 404 lsock.connect(self.master_config.Global.log_url)
405 405 handler = PUBHandler(lsock)
406 406 handler.root_topic = 'controller'
407 407 handler.setLevel(self.log_level)
408 408 self.log.addHandler(handler)
409 409 #
410 410 def start_app(self):
411 411 # Start the subprocesses:
412 412 self.factory.start()
413 413 self.write_pid_file(overwrite=True)
414 414 try:
415 415 self.factory.loop.start()
416 416 except KeyboardInterrupt:
417 417 self.log.critical("Interrupted, Exiting...\n")
418 418
419 419
420 420 def launch_new_instance():
421 421 """Create and run the IPython controller"""
422 422 app = IPControllerApp()
423 423 app.start()
424 424
425 425
426 426 if __name__ == '__main__':
427 427 launch_new_instance()
@@ -1,294 +1,294
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import json
19 19 import os
20 20 import sys
21 21
22 22 import zmq
23 23 from zmq.eventloop import ioloop
24 24
25 25 from IPython.zmq.parallel.clusterdir import (
26 26 ApplicationWithClusterDir,
27 27 ClusterDirConfigLoader
28 28 )
29 29 from IPython.zmq.log import EnginePUBHandler
30 30
31 31 from IPython.zmq.parallel import factory
32 32 from IPython.zmq.parallel.engine import EngineFactory
33 33 from IPython.zmq.parallel.streamkernel import Kernel
34 34 from IPython.zmq.parallel.util import disambiguate_url
35 35 from IPython.utils.importstring import import_item
36 36
37 37
38 38 #-----------------------------------------------------------------------------
39 39 # Module level variables
40 40 #-----------------------------------------------------------------------------
41 41
42 42 #: The default config file name for this application
43 43 default_config_file_name = u'ipenginez_config.py'
44 44
45 45
46 46 mpi4py_init = """from mpi4py import MPI as mpi
47 47 mpi.size = mpi.COMM_WORLD.Get_size()
48 48 mpi.rank = mpi.COMM_WORLD.Get_rank()
49 49 """
50 50
51 51
52 52 pytrilinos_init = """from PyTrilinos import Epetra
53 53 class SimpleStruct:
54 54 pass
55 55 mpi = SimpleStruct()
56 56 mpi.rank = 0
57 57 mpi.size = 0
58 58 """
59 59
60 60
61 61 _description = """Start an IPython engine for parallel computing.\n\n
62 62
63 63 IPython engines run in parallel and perform computations on behalf of a client
64 64 and controller. A controller needs to be started before the engines. The
65 65 engine can be configured using command line options or using a cluster
66 66 directory. Cluster directories contain config, log and security files and are
67 usually located in your ipython directory and named as "cluster_<profile>".
67 usually located in your ipython directory and named as "clusterz_<profile>".
68 68 See the --profile and --cluster-dir options for details.
69 69 """
70 70
71 71 #-----------------------------------------------------------------------------
72 72 # Command line options
73 73 #-----------------------------------------------------------------------------
74 74
75 75
76 76 class IPEngineAppConfigLoader(ClusterDirConfigLoader):
77 77
78 78 def _add_arguments(self):
79 79 super(IPEngineAppConfigLoader, self)._add_arguments()
80 80 paa = self.parser.add_argument
81 81 # Controller config
82 82 paa('--file', '-f',
83 83 type=unicode, dest='Global.url_file',
84 84 help='The full location of the file containing the connection information for '
85 85 'the controller. If this is not given, the file must be in the '
86 86 'security directory of the cluster directory. This location is '
87 87 'resolved using the --profile and --app-dir options.',
88 88 metavar='Global.url_file')
89 89 # MPI
90 90 paa('--mpi',
91 91 type=str, dest='MPI.use',
92 92 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
93 93 metavar='MPI.use')
94 94 # Global config
95 95 paa('--log-to-file',
96 96 action='store_true', dest='Global.log_to_file',
97 97 help='Log to a file in the log directory (default is stdout)')
98 98 paa('--log-url',
99 99 dest='Global.log_url',
100 100 help="url of ZMQ logger, as started with iploggerz")
101 101 # paa('--execkey',
102 102 # type=str, dest='Global.exec_key',
103 103 # help='path to a file containing an execution key.',
104 104 # metavar='keyfile')
105 105 # paa('--no-secure',
106 106 # action='store_false', dest='Global.secure',
107 107 # help='Turn off execution keys.')
108 108 # paa('--secure',
109 109 # action='store_true', dest='Global.secure',
110 110 # help='Turn on execution keys (default).')
111 111 # init command
112 112 paa('-c',
113 113 type=str, dest='Global.extra_exec_lines',
114 114 help='specify a command to be run at startup')
115 115
116 116 factory.add_session_arguments(self.parser)
117 117 factory.add_registration_arguments(self.parser)
118 118
119 119
120 120 #-----------------------------------------------------------------------------
121 121 # Main application
122 122 #-----------------------------------------------------------------------------
123 123
124 124
125 125 class IPEngineApp(ApplicationWithClusterDir):
126 126
127 127 name = u'ipenginez'
128 128 description = _description
129 129 command_line_loader = IPEngineAppConfigLoader
130 130 default_config_file_name = default_config_file_name
131 131 auto_create_cluster_dir = True
132 132
133 133 def create_default_config(self):
134 134 super(IPEngineApp, self).create_default_config()
135 135
136 136 # The engine should not clean logs as we don't want to remove the
137 137 # active log files of other running engines.
138 138 self.default_config.Global.clean_logs = False
139 139 self.default_config.Global.secure = True
140 140
141 141 # Global config attributes
142 142 self.default_config.Global.exec_lines = []
143 143 self.default_config.Global.extra_exec_lines = ''
144 144
145 145 # Configuration related to the controller
146 146 # This must match the filename (path not included) that the controller
147 147 # used for the FURL file.
148 148 self.default_config.Global.url_file = u''
149 149 self.default_config.Global.url_file_name = u'ipcontroller-engine.json'
150 150 # If given, this is the actual location of the controller's FURL file.
151 151 # If not, this is computed using the profile, app_dir and furl_file_name
152 152 # self.default_config.Global.key_file_name = u'exec_key.key'
153 153 # self.default_config.Global.key_file = u''
154 154
155 155 # MPI related config attributes
156 156 self.default_config.MPI.use = ''
157 157 self.default_config.MPI.mpi4py = mpi4py_init
158 158 self.default_config.MPI.pytrilinos = pytrilinos_init
159 159
160 160 def post_load_command_line_config(self):
161 161 pass
162 162
163 163 def pre_construct(self):
164 164 super(IPEngineApp, self).pre_construct()
165 165 # self.find_cont_url_file()
166 166 self.find_url_file()
167 167 if self.master_config.Global.extra_exec_lines:
168 168 self.master_config.Global.exec_lines.append(self.master_config.Global.extra_exec_lines)
169 169
170 170 # def find_key_file(self):
171 171 # """Set the key file.
172 172 #
173 173 # Here we don't try to actually see if it exists or is valid, as that
174 174 # is handled by the connection logic.
175 175 # """
176 176 # config = self.master_config
177 177 # # Find the actual controller key file
178 178 # if not config.Global.key_file:
179 179 # try_this = os.path.join(
180 180 # config.Global.cluster_dir,
181 181 # config.Global.security_dir,
182 182 # config.Global.key_file_name
183 183 # )
184 184 # config.Global.key_file = try_this
185 185
186 186 def find_url_file(self):
187 187 """Set the key file.
188 188
189 189 Here we don't try to actually see if it exists or is valid, as that
190 190 is handled by the connection logic.
191 191 """
192 192 config = self.master_config
193 193 # Find the actual controller url file
194 194 if not config.Global.url_file:
195 195 try_this = os.path.join(
196 196 config.Global.cluster_dir,
197 197 config.Global.security_dir,
198 198 config.Global.url_file_name
199 199 )
200 200 config.Global.url_file = try_this
201 201
202 202 def construct(self):
203 203 # This is the working dir by now.
204 204 sys.path.insert(0, '')
205 205 config = self.master_config
206 206 # if os.path.exists(config.Global.key_file) and config.Global.secure:
207 207 # config.SessionFactory.exec_key = config.Global.key_file
208 208 if os.path.exists(config.Global.url_file):
209 209 with open(config.Global.url_file) as f:
210 210 d = json.loads(f.read())
211 211 for k,v in d.iteritems():
212 212 if isinstance(v, unicode):
213 213 d[k] = v.encode()
214 214 if d['exec_key']:
215 215 config.SessionFactory.exec_key = d['exec_key']
216 216 d['url'] = disambiguate_url(d['url'], d['location'])
217 217 config.RegistrationFactory.url=d['url']
218 218 config.EngineFactory.location = d['location']
219 219
220 220
221 221
222 222 config.Kernel.exec_lines = config.Global.exec_lines
223 223
224 224 self.start_mpi()
225 225
226 226 # Create the underlying shell class and EngineService
227 227 # shell_class = import_item(self.master_config.Global.shell_class)
228 228 try:
229 229 self.engine = EngineFactory(config=config, logname=self.log.name)
230 230 except:
231 231 self.log.error("Couldn't start the Engine", exc_info=True)
232 232 self.exit(1)
233 233
234 234 self.start_logging()
235 235
236 236 # Create the service hierarchy
237 237 # self.main_service = service.MultiService()
238 238 # self.engine_service.setServiceParent(self.main_service)
239 239 # self.tub_service = Tub()
240 240 # self.tub_service.setServiceParent(self.main_service)
241 241 # # This needs to be called before the connection is initiated
242 242 # self.main_service.startService()
243 243
244 244 # This initiates the connection to the controller and calls
245 245 # register_engine to tell the controller we are ready to do work
246 246 # self.engine_connector = EngineConnector(self.tub_service)
247 247
248 248 # self.log.info("Using furl file: %s" % self.master_config.Global.furl_file)
249 249
250 250 # reactor.callWhenRunning(self.call_connect)
251 251
252 252
253 253 def start_logging(self):
254 254 super(IPEngineApp, self).start_logging()
255 255 if self.master_config.Global.log_url:
256 256 context = self.engine.context
257 257 lsock = context.socket(zmq.PUB)
258 258 lsock.connect(self.master_config.Global.log_url)
259 259 handler = EnginePUBHandler(self.engine, lsock)
260 260 handler.setLevel(self.log_level)
261 261 self.log.addHandler(handler)
262 262
263 263 def start_mpi(self):
264 264 global mpi
265 265 mpikey = self.master_config.MPI.use
266 266 mpi_import_statement = self.master_config.MPI.get(mpikey, None)
267 267 if mpi_import_statement is not None:
268 268 try:
269 269 self.log.info("Initializing MPI:")
270 270 self.log.info(mpi_import_statement)
271 271 exec mpi_import_statement in globals()
272 272 except:
273 273 mpi = None
274 274 else:
275 275 mpi = None
276 276
277 277
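A hedged sketch of driving start_mpi from a cluster config file: MPI.use names the entry that gets exec'd, and any key in the MPI section can hold an import snippet, since start_mpi looks it up via master_config.MPI.get(mpikey, None). The custom snippet below is hypothetical:

    c = get_config()
    # start_mpi() exec's whichever snippet this key names, in globals().
    c.MPI.use = 'mpi4py'
    # A custom entry works the same way (hypothetical snippet):
    c.MPI.custom = 'from mpi4py import MPI as mpi'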
278 278 def start_app(self):
279 279 self.engine.start()
280 280 try:
281 281 self.engine.loop.start()
282 282 except KeyboardInterrupt:
283 283 self.log.critical("Engine Interrupted, shutting down...\n")
284 284
285 285
286 286 def launch_new_instance():
287 287 """Create and run the IPython controller"""
288 288 app = IPEngineApp()
289 289 app.start()
290 290
291 291
292 292 if __name__ == '__main__':
293 293 launch_new_instance()
294 294
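For reference, a minimal sketch of the connection file that construct() reads above: the keys 'exec_key', 'url', and 'location' are the ones the code uses, while the concrete values are made up for illustration.

    import json

    # Hypothetical contents of security/ipcontroller-engine.json.
    d = {
        'exec_key': 'a-secret-key',        # forwarded to SessionFactory.exec_key
        'url': 'tcp://127.0.0.1:10101',    # passed through disambiguate_url()
        'location': '10.0.0.5',            # the controller's address
    }
    with open('ipcontroller-engine.json', 'w') as f:
        f.write(json.dumps(d))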
@@ -1,132 +1,132
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 A simple IPython logger application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import sys
20 20
21 21 import zmq
22 22
23 23 from IPython.zmq.parallel.clusterdir import (
24 24 ApplicationWithClusterDir,
25 25 ClusterDirConfigLoader
26 26 )
27 27 from .logwatcher import LogWatcher
28 28
29 29 #-----------------------------------------------------------------------------
30 30 # Module level variables
31 31 #-----------------------------------------------------------------------------
32 32
33 33 #: The default config file name for this application
34 34 default_config_file_name = u'iplogger_config.py'
35 35
36 36 _description = """Start an IPython logger for parallel computing.\n\n
37 37
38 38 IPython controllers and engines (and your own processes) can broadcast log messages
39 39 by registering a `zmq.log.handlers.PUBHandler` with the `logging` module. The
40 40 logger can be configured using command line options or using a cluster
41 41 directory. Cluster directories contain config, log and security files and are
42 usually located in your ipython directory and named as "cluster_<profile>".
42 usually located in your ipython directory and named as "clusterz_<profile>".
43 43 See the --profile and --cluster-dir options for details.
44 44 """
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Command line options
48 48 #-----------------------------------------------------------------------------
49 49
50 50
51 51 class IPLoggerAppConfigLoader(ClusterDirConfigLoader):
52 52
53 53 def _add_arguments(self):
54 54 super(IPLoggerAppConfigLoader, self)._add_arguments()
55 55 paa = self.parser.add_argument
56 56 # LogWatcher config
57 57 paa('--url',
58 58 type=str, dest='LogWatcher.url',
59 59 help='The url the LogWatcher will listen on',
60 60 )
61 61 # LogWatcher topics
62 62 paa('--topics',
63 63 type=str, dest='LogWatcher.topics', nargs='+',
64 64 help='What topics to subscribe to',
65 65 metavar='topics')
66 66 # Global config
67 67 paa('--log-to-file',
68 68 action='store_true', dest='Global.log_to_file',
69 69 help='Log to a file in the log directory (default is stdout)')
70 70
71 71
72 72 #-----------------------------------------------------------------------------
73 73 # Main application
74 74 #-----------------------------------------------------------------------------
75 75
76 76
77 77 class IPLoggerApp(ApplicationWithClusterDir):
78 78
79 79 name = u'iploggerz'
80 80 description = _description
81 81 command_line_loader = IPLoggerAppConfigLoader
82 82 default_config_file_name = default_config_file_name
83 83 auto_create_cluster_dir = True
84 84
85 85 def create_default_config(self):
86 86 super(IPLoggerApp, self).create_default_config()
87 87
88 88 # The logger should not clean logs as we don't want to remove the
89 89 # active log files of other running engines.
90 90 self.default_config.Global.clean_logs = False
91 91
92 92 # If given, this is the actual location of the logger's URL file.
93 93 # If not, this is computed using the profile, app_dir and url_file_name
94 94 self.default_config.Global.url_file_name = u'iplogger.url'
95 95 self.default_config.Global.url_file = u''
96 96
97 97 def post_load_command_line_config(self):
98 98 pass
99 99
100 100 def pre_construct(self):
101 101 super(IPLoggerApp, self).pre_construct()
102 102
103 103 def construct(self):
104 104 # This is the working dir by now.
105 105 sys.path.insert(0, '')
106 106
107 107 self.start_logging()
108 108
109 109 try:
110 110 self.watcher = LogWatcher(config=self.master_config, logname=self.log.name)
111 111 except:
112 112 self.log.error("Couldn't start the LogWatcher", exc_info=True)
113 113 self.exit(1)
114 114
115 115
116 116 def start_app(self):
117 117 try:
118 118 self.watcher.start()
119 119 self.watcher.loop.start()
120 120 except KeyboardInterrupt:
121 121 self.log.critical("Logging Interrupted, shutting down...\n")
122 122
123 123
124 124 def launch_new_instance():
125 125 """Create and run the IPython LogWatcher"""
126 126 app = IPLoggerApp()
127 127 app.start()
128 128
129 129
130 130 if __name__ == '__main__':
131 131 launch_new_instance()
132 132
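Since the description above mentions broadcasting via zmq.log.handlers.PUBHandler, here is a minimal sketch of a process publishing to a running log watcher; the url is an assumption and must match whatever LogWatcher.url the watcher listens on.

    import logging
    import zmq
    from zmq.log.handlers import PUBHandler

    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.connect('tcp://127.0.0.1:20202')  # hypothetical LogWatcher url

    handler = PUBHandler(sock)
    handler.setLevel(logging.INFO)
    logging.getLogger().addHandler(handler)
    logging.info('this message is published to the log watcher')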
@@ -1,844 +1,847
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Facilities for launching IPython processes asynchronously.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import logging
19 19 import os
20 20 import re
21 21 import sys
22 22
23 23 from signal import SIGINT, SIGTERM
24 24 try:
25 25 from signal import SIGKILL
26 26 except ImportError:
27 27 SIGKILL=SIGTERM
28 28
29 29 from subprocess import Popen, PIPE, STDOUT
30 30 try:
31 31 from subprocess import check_output
32 32 except ImportError:
33 33 # pre-2.7:
34 34 from StringIO import StringIO
35 35
36 36 def check_output(*args, **kwargs):
37 37 sio = StringIO()
38 38 kwargs.update(dict(stdout=PIPE, stderr=STDOUT))
39 39 p = Popen(*args, **kwargs)
40 40 out,err = p.communicate()
41 41 return out
42 42
43 43 from zmq.eventloop import ioloop
44 44
45 45 from IPython.external import Itpl
46 46 # from IPython.config.configurable import Configurable
47 from IPython.utils.traitlets import Str, Int, List, Unicode, Instance
47 from IPython.utils.traitlets import Str, Int, List, Unicode, Dict, Instance
48 48 from IPython.utils.path import get_ipython_module_path
49 49 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
50 50
51 51 from .factory import LoggingFactory
52 52
53 53 # load winhpcjob from IPython.kernel
54 54 try:
55 55 from IPython.kernel.winhpcjob import (
56 56 IPControllerTask, IPEngineTask,
57 57 IPControllerJob, IPEngineSetJob
58 58 )
59 59 except ImportError:
60 60 pass
61 61
62 62
63 63 #-----------------------------------------------------------------------------
64 64 # Paths to the kernel apps
65 65 #-----------------------------------------------------------------------------
66 66
67 67
68 68 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
69 69 'IPython.zmq.parallel.ipclusterapp'
70 70 ))
71 71
72 72 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
73 73 'IPython.zmq.parallel.ipengineapp'
74 74 ))
75 75
76 76 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
77 77 'IPython.zmq.parallel.ipcontrollerapp'
78 78 ))
79 79
80 80 #-----------------------------------------------------------------------------
81 81 # Base launchers and errors
82 82 #-----------------------------------------------------------------------------
83 83
84 84
85 85 class LauncherError(Exception):
86 86 pass
87 87
88 88
89 89 class ProcessStateError(LauncherError):
90 90 pass
91 91
92 92
93 93 class UnknownStatus(LauncherError):
94 94 pass
95 95
96 96
97 97 class BaseLauncher(LoggingFactory):
98 98 """An asbtraction for starting, stopping and signaling a process."""
99 99
100 100 # In all of the launchers, the work_dir is where child processes will be
101 101 # run. This will usually be the cluster_dir, but may not be. any work_dir
102 102 # passed into the __init__ method will override the config value.
103 103 # This should not be used to set the work_dir for the actual engine
104 104 # and controller. Instead, use their own config files or the
105 105 # controller_args, engine_args attributes of the launchers to add
106 106 # the --work-dir option.
107 107 work_dir = Unicode(u'.')
108 108 loop = Instance('zmq.eventloop.ioloop.IOLoop')
109 109 def _loop_default(self):
110 110 return ioloop.IOLoop.instance()
111 111
112 112 def __init__(self, work_dir=u'.', config=None, **kwargs):
113 113 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
114 114 self.state = 'before' # can be before, running, after
115 115 self.stop_callbacks = []
116 116 self.start_data = None
117 117 self.stop_data = None
118 118
119 119 @property
120 120 def args(self):
121 121 """A list of cmd and args that will be used to start the process.
122 122
123 123 This is what is used to start the process, and the first element
124 124 will be the process name.
125 125 """
126 126 return self.find_args()
127 127
128 128 def find_args(self):
129 129 """The ``.args`` property calls this to find the args list.
130 130
131 131 Subclasses should implement this to construct the cmd and args.
132 132 """
133 133 raise NotImplementedError('find_args must be implemented in a subclass')
134 134
135 135 @property
136 136 def arg_str(self):
137 137 """The string form of the program arguments."""
138 138 return ' '.join(self.args)
139 139
140 140 @property
141 141 def running(self):
142 142 """Am I running."""
143 143 if self.state == 'running':
144 144 return True
145 145 else:
146 146 return False
147 147
148 148 def start(self):
149 149 """Start the process.
150 150
151 151 This must return a deferred that fires with information about the
152 152 process starting (like a pid, job id, etc.).
153 153 """
154 154 raise NotImplementedError('start must be implemented in a subclass')
155 155
156 156 def stop(self):
157 157 """Stop the process and notify observers of stopping.
158 158
159 159 This must return a deferred that fires with information about the
160 160 processing stopping, like errors that occur while the process is
161 161 attempting to be shut down. This deferred won't fire when the process
162 162 actually stops. To observe the actual process stopping, see
163 163 :func:`on_stop`.
164 164 """
165 165 raise NotImplementedError('stop must be implemented in a subclass')
166 166
167 167 def on_stop(self, f):
168 168 """Get a deferred that will fire when the process stops.
169 169
170 170 The deferred will fire with data that contains information about
171 171 the exit status of the process.
172 172 """
173 173 if self.state=='after':
174 174 return f(self.stop_data)
175 175 else:
176 176 self.stop_callbacks.append(f)
177 177
178 178 def notify_start(self, data):
179 179 """Call this to trigger startup actions.
180 180
181 181 This logs the process startup and sets the state to 'running'. It is
182 182 a pass-through so it can be used as a callback.
183 183 """
184 184
185 185 self.log.info('Process %r started: %r' % (self.args[0], data))
186 186 self.start_data = data
187 187 self.state = 'running'
188 188 return data
189 189
190 190 def notify_stop(self, data):
191 191 """Call this to trigger process stop actions.
192 192
193 193 This logs the process stopping and sets the state to 'after'. Call
194 194 this to trigger all the deferreds from :func:`on_stop`."""
195 195
196 196 self.log.info('Process %r stopped: %r' % (self.args[0], data))
197 197 self.stop_data = data
198 198 self.state = 'after'
199 199 for i in range(len(self.stop_callbacks)):
200 200 d = self.stop_callbacks.pop()
201 201 d(data)
202 202 return data
203 203
204 204 def signal(self, sig):
205 205 """Signal the process.
206 206
207 207 Return a semi-meaningless deferred after signaling the process.
208 208
209 209 Parameters
210 210 ----------
211 211 sig : str or int
212 212 'KILL', 'INT', etc., or any signal number
213 213 """
214 214 raise NotImplementedError('signal must be implemented in a subclass')
215 215
216 216
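A toy sketch of the contract BaseLauncher defines: subclasses implement find_args, start, and stop, and route state changes through notify_start/notify_stop so that on_stop callbacks fire. The pid below is hypothetical:

    class EchoLauncher(BaseLauncher):
        def find_args(self):
            return ['echo', 'hello']

        def start(self):
            # A real launcher would spawn self.args here and return
            # identifying data (a pid, a job id, ...).
            return self.notify_start({'pid': 12345})

        def stop(self):
            return self.notify_stop({'exit_code': 0, 'pid': 12345})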
217 217 #-----------------------------------------------------------------------------
218 218 # Local process launchers
219 219 #-----------------------------------------------------------------------------
220 220
221 221
222 222 class LocalProcessLauncher(BaseLauncher):
223 223 """Start and stop an external process in an asynchronous manner.
224 224
225 225 This will launch the external process with a working directory of
226 226 ``self.work_dir``.
227 227 """
228 228
229 229 # This is used to construct self.args, which is passed to
230 230 # Popen.
231 231 cmd_and_args = List([])
232 232 poll_frequency = Int(100) # in ms
233 233
234 234 def __init__(self, work_dir=u'.', config=None, **kwargs):
235 235 super(LocalProcessLauncher, self).__init__(
236 236 work_dir=work_dir, config=config, **kwargs
237 237 )
238 238 self.process = None
239 239 self.start_deferred = None
240 240 self.poller = None
241 241
242 242 def find_args(self):
243 243 return self.cmd_and_args
244 244
245 245 def start(self):
246 246 if self.state == 'before':
247 247 self.process = Popen(self.args,
248 248 stdout=PIPE,stderr=PIPE,stdin=PIPE,
249 249 env=os.environ,
250 250 cwd=self.work_dir
251 251 )
252 252
253 253 self.loop.add_handler(self.process.stdout.fileno(), self.handle_stdout, self.loop.READ)
254 254 self.loop.add_handler(self.process.stderr.fileno(), self.handle_stderr, self.loop.READ)
255 255 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
256 256 self.poller.start()
257 257 self.notify_start(self.process.pid)
258 258 else:
259 259 s = 'The process was already started and has state: %r' % self.state
260 260 raise ProcessStateError(s)
261 261
262 262 def stop(self):
263 263 return self.interrupt_then_kill()
264 264
265 265 def signal(self, sig):
266 266 if self.state == 'running':
267 267 self.process.send_signal(sig)
268 268
269 269 def interrupt_then_kill(self, delay=2.0):
270 270 """Send INT, wait a delay and then send KILL."""
271 271 self.signal(SIGINT)
272 272 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
273 273 self.killer.start()
274 274
275 275 # callbacks, etc:
276 276
277 277 def handle_stdout(self, fd, events):
278 278 line = self.process.stdout.readline()
279 279 # a stopped process will be readable but return empty strings
280 280 if line:
281 281 self.log.info(line[:-1])
282 282 else:
283 283 self.poll()
284 284
285 285 def handle_stderr(self, fd, events):
286 286 line = self.process.stderr.readline()
287 287 # a stopped process will be readable but return empty strings
288 288 if line:
289 289 self.log.error(line[:-1])
290 290 else:
291 291 self.poll()
292 292
293 293 def poll(self):
294 294 status = self.process.poll()
295 295 if status is not None:
296 296 self.poller.stop()
297 297 self.loop.remove_handler(self.process.stdout.fileno())
298 298 self.loop.remove_handler(self.process.stderr.fileno())
299 299 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
300 300 return status
301 301
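A minimal usage sketch of the class above: run a short command under the IOLoop and stop the loop once poll() notices the exit.

    launcher = LocalProcessLauncher()
    launcher.cmd_and_args = ['echo', 'hello']
    launcher.on_stop(lambda data: launcher.loop.stop())
    launcher.start()       # registers stdout/stderr handlers, starts the poller
    launcher.loop.start()  # handle_stdout logs output; poll() fires notify_stop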
302 302 class LocalControllerLauncher(LocalProcessLauncher):
303 303 """Launch a controller as a regular external process."""
304 304
305 305 controller_cmd = List(ipcontroller_cmd_argv, config=True)
306 306 # Command line arguments to ipcontroller.
307 307 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
308 308
309 309 def find_args(self):
310 310 return self.controller_cmd + self.controller_args
311 311
312 312 def start(self, cluster_dir):
313 313 """Start the controller by cluster_dir."""
314 314 self.controller_args.extend(['--cluster-dir', cluster_dir])
315 315 self.cluster_dir = unicode(cluster_dir)
316 316 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
317 317 return super(LocalControllerLauncher, self).start()
318 318
319 319
320 320 class LocalEngineLauncher(LocalProcessLauncher):
321 321 """Launch a single engine as a regular externall process."""
322 322
323 323 engine_cmd = List(ipengine_cmd_argv, config=True)
324 324 # Command line arguments for ipengine.
325 325 engine_args = List(
326 326 ['--log-to-file','--log-level', str(logging.INFO)], config=True
327 327 )
328 328
329 329 def find_args(self):
330 330 return self.engine_cmd + self.engine_args
331 331
332 332 def start(self, cluster_dir):
333 333 """Start the engine by cluster_dir."""
334 334 self.engine_args.extend(['--cluster-dir', cluster_dir])
335 335 self.cluster_dir = unicode(cluster_dir)
336 336 return super(LocalEngineLauncher, self).start()
337 337
338 338
339 339 class LocalEngineSetLauncher(BaseLauncher):
340 340 """Launch a set of engines as regular external processes."""
341 341
342 342 # Command line arguments for ipengine.
343 343 engine_args = List(
344 344 ['--log-to-file','--log-level', str(logging.INFO)], config=True
345 345 )
346 346 # launcher class
347 347 launcher_class = LocalEngineLauncher
348 348
349 349 def __init__(self, work_dir=u'.', config=None, **kwargs):
350 350 super(LocalEngineSetLauncher, self).__init__(
351 351 work_dir=work_dir, config=config, **kwargs
352 352 )
353 353 self.launchers = {}
354 354 self.stop_data = {}
355 355
356 356 def start(self, n, cluster_dir):
357 357 """Start n engines by profile or cluster_dir."""
358 358 self.cluster_dir = unicode(cluster_dir)
359 359 dlist = []
360 360 for i in range(n):
361 361 el = self.launcher_class(work_dir=self.work_dir, config=self.config, logname=self.log.name)
362 362 # Copy the engine args over to each engine launcher.
363 363 import copy
364 364 el.engine_args = copy.deepcopy(self.engine_args)
365 365 el.on_stop(self._notice_engine_stopped)
366 366 d = el.start(cluster_dir)
367 367 if i==0:
368 368 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
369 369 self.launchers[i] = el
370 370 dlist.append(d)
371 371 self.notify_start(dlist)
372 372 # The consumeErrors here could be dangerous
373 373 # dfinal = gatherBoth(dlist, consumeErrors=True)
374 374 # dfinal.addCallback(self.notify_start)
375 375 return dlist
376 376
377 377 def find_args(self):
378 378 return ['engine set']
379 379
380 380 def signal(self, sig):
381 381 dlist = []
382 382 for el in self.launchers.itervalues():
383 383 d = el.signal(sig)
384 384 dlist.append(d)
385 385 # dfinal = gatherBoth(dlist, consumeErrors=True)
386 386 return dlist
387 387
388 388 def interrupt_then_kill(self, delay=1.0):
389 389 dlist = []
390 390 for el in self.launchers.itervalues():
391 391 d = el.interrupt_then_kill(delay)
392 392 dlist.append(d)
393 393 # dfinal = gatherBoth(dlist, consumeErrors=True)
394 394 return dlist
395 395
396 396 def stop(self):
397 397 return self.interrupt_then_kill()
398 398
399 399 def _notice_engine_stopped(self, data):
400 400 print "notice", data
401 401 pid = data['pid']
402 402 for idx,el in self.launchers.iteritems():
403 403 if el.process.pid == pid:
404 404 break
405 405 self.launchers.pop(idx)
406 406 self.stop_data[idx] = data
407 407 if not self.launchers:
408 408 self.notify_stop(self.stop_data)
409 409
410 410
411 411 #-----------------------------------------------------------------------------
412 412 # MPIExec launchers
413 413 #-----------------------------------------------------------------------------
414 414
415 415
416 416 class MPIExecLauncher(LocalProcessLauncher):
417 417 """Launch an external process using mpiexec."""
418 418
419 419 # The mpiexec command to use in starting the process.
420 420 mpi_cmd = List(['mpiexec'], config=True)
421 421 # The command line arguments to pass to mpiexec.
422 422 mpi_args = List([], config=True)
423 423 # The program to start using mpiexec.
424 424 program = List(['date'], config=True)
425 425 # The command line argument to the program.
426 426 program_args = List([], config=True)
427 427 # The number of instances of the program to start.
428 428 n = Int(1, config=True)
429 429
430 430 def find_args(self):
431 431 """Build self.args using all the fields."""
432 432 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
433 433 self.program + self.program_args
434 434
435 435 def start(self, n):
436 436 """Start n instances of the program using mpiexec."""
437 437 self.n = n
438 438 return super(MPIExecLauncher, self).start()
439 439
440 440
441 441 class MPIExecControllerLauncher(MPIExecLauncher):
442 442 """Launch a controller using mpiexec."""
443 443
444 444 controller_cmd = List(ipcontroller_cmd_argv, config=True)
445 445 # Command line arguments to ipcontroller.
446 446 controller_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
447 447 n = Int(1, config=False)
448 448
449 449 def start(self, cluster_dir):
450 450 """Start the controller by cluster_dir."""
451 451 self.controller_args.extend(['--cluster-dir', cluster_dir])
452 452 self.cluster_dir = unicode(cluster_dir)
453 453 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
454 454 return super(MPIExecControllerLauncher, self).start(1)
455 455
456 456 def find_args(self):
457 457 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
458 458 self.controller_cmd + self.controller_args
459 459
460 460
461 461 class MPIExecEngineSetLauncher(MPIExecLauncher):
462 462
463 463 engine_cmd = List(ipengine_cmd_argv, config=True)
464 464 # Command line arguments for ipengine.
465 465 engine_args = List(
466 466 ['--log-to-file','--log-level', str(logging.INFO)], config=True
467 467 )
468 468 n = Int(1, config=True)
469 469
470 470 def start(self, n, cluster_dir):
471 471 """Start n engines by profile or cluster_dir."""
472 472 self.engine_args.extend(['--cluster-dir', cluster_dir])
473 473 self.cluster_dir = unicode(cluster_dir)
474 474 self.n = n
475 475 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
476 476 return super(MPIExecEngineSetLauncher, self).start(n)
477 477
478 478 def find_args(self):
479 479 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
480 480 self.engine_cmd + self.engine_args
481 481
482 482
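A sketch of the argv the MPIExec launchers assemble (note the str(self.n) above, since n is an Int trait and Popen needs strings); with the defaults this is roughly: mpiexec -n 4 <python> <ipengineapp path> --log-to-file --log-level 20.

    launcher = MPIExecEngineSetLauncher()
    launcher.n = 4
    print launcher.arg_str  # mpi_cmd + ['-n', '4'] + mpi_args + engine_cmd + engine_args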
483 483 #-----------------------------------------------------------------------------
484 484 # SSH launchers
485 485 #-----------------------------------------------------------------------------
486 486
487 487 # TODO: Get SSH Launcher working again.
488 488
489 489 class SSHLauncher(LocalProcessLauncher):
490 490 """A minimal launcher for ssh.
491 491
492 492 To be useful this will probably have to be extended to use the ``sshx``
493 493 idea for environment variables. There could be other things this needs
494 494 as well.
495 495 """
496 496
497 497 ssh_cmd = List(['ssh'], config=True)
498 498 ssh_args = List(['-tt'], config=True)
499 499 program = List(['date'], config=True)
500 500 program_args = List([], config=True)
501 501 hostname = Str('', config=True)
502 502 user = Str(os.environ.get('USER','username'), config=True)
503 503 location = Str('')
504 504
505 505 def _hostname_changed(self, name, old, new):
506 506 self.location = '%s@%s' % (self.user, new)
507 507
508 508 def _user_changed(self, name, old, new):
509 509 self.location = '%s@%s' % (new, self.hostname)
510 510
511 511 def find_args(self):
512 512 return self.ssh_cmd + self.ssh_args + [self.location] + \
513 513 self.program + self.program_args
514 514
515 515 def start(self, cluster_dir, hostname=None, user=None):
516 516 print self.config
517 517 if hostname is not None:
518 518 self.hostname = hostname
519 519 if user is not None:
520 520 self.user = user
521 521 print (self.location, hostname, user)
522 522 return super(SSHLauncher, self).start()
523 523
524 524 def signal(self, sig):
525 525 if self.state == 'running':
526 526 # send escaped ssh connection-closer
527 527 self.process.stdin.write('~.')
528 528 self.process.stdin.flush()
529 529
530 530
531 531
532 532 class SSHControllerLauncher(SSHLauncher):
533 533
534 534 program = List(ipcontroller_cmd_argv, config=True)
535 535 # Command line arguments to ipcontroller.
536 536 program_args = List(['--log-to-file','--log-level', str(logging.INFO)], config=True)
537 537
538 538
539 539 class SSHEngineLauncher(SSHLauncher):
540 540 program = List(ipengine_cmd_argv, config=True)
541 541 # Command line arguments for ipengine.
542 542 program_args = List(
543 543 ['--log-to-file','--log-level', str(logging.INFO)], config=True
544 544 )
545 545
546 546 class SSHEngineSetLauncher(LocalEngineSetLauncher):
547 547 launcher_class = SSHEngineLauncher
548 548
549 549
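A sketch of what the (still TODO) SSH launcher would run; host and user are hypothetical, and assigning them triggers the _hostname_changed/_user_changed handlers that maintain location.

    launcher = SSHLauncher()
    launcher.hostname = 'node1'  # location becomes '<user>@node1'
    launcher.user = 'me'         # location becomes 'me@node1'
    print launcher.args          # ['ssh', '-tt', 'me@node1', 'date']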
550 550 #-----------------------------------------------------------------------------
551 551 # Windows HPC Server 2008 scheduler launchers
552 552 #-----------------------------------------------------------------------------
553 553
554 554
555 555 # This is only used on Windows.
556 556 def find_job_cmd():
557 557 if os.name=='nt':
558 558 try:
559 559 return find_cmd('job')
560 560 except FindCmdError:
561 561 return 'job'
562 562 else:
563 563 return 'job'
564 564
565 565
566 566 class WindowsHPCLauncher(BaseLauncher):
567 567
568 568 # A regular expression used to get the job id from the output of the
569 569 # submit_command.
570 570 job_id_regexp = Str(r'\d+', config=True)
571 571 # The filename of the instantiated job script.
572 572 job_file_name = Unicode(u'ipython_job.xml', config=True)
573 573 # The full path to the instantiated job script. This gets made dynamically
574 574 # by combining the work_dir with the job_file_name.
575 575 job_file = Unicode(u'')
576 576 # The hostname of the scheduler to submit the job to
577 577 scheduler = Str('', config=True)
578 578 job_cmd = Str(find_job_cmd(), config=True)
579 579
580 580 def __init__(self, work_dir=u'.', config=None, **kwargs):
581 581 super(WindowsHPCLauncher, self).__init__(
582 582 work_dir=work_dir, config=config, **kwargs
583 583 )
584 584
585 585 @property
586 586 def job_file(self):
587 587 return os.path.join(self.work_dir, self.job_file_name)
588 588
589 589 def write_job_file(self, n):
590 590 raise NotImplementedError("Implement write_job_file in a subclass.")
591 591
592 592 def find_args(self):
593 593 return ['job.exe']
594 594
595 595 def parse_job_id(self, output):
596 596 """Take the output of the submit command and return the job id."""
597 597 m = re.search(self.job_id_regexp, output)
598 598 if m is not None:
599 599 job_id = m.group()
600 600 else:
601 601 raise LauncherError("Job id couldn't be determined: %s" % output)
602 602 self.job_id = job_id
603 603 self.log.info('Job started with job id: %r' % job_id)
604 604 return job_id
605 605
606 606 def start(self, n):
607 607 """Start n copies of the process using the Win HPC job scheduler."""
608 608 self.write_job_file(n)
609 609 args = [
610 610 'submit',
611 611 '/jobfile:%s' % self.job_file,
612 612 '/scheduler:%s' % self.scheduler
613 613 ]
614 614 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
615 615 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
616 616 output = check_output([self.job_cmd]+args,
617 617 env=os.environ,
618 618 cwd=self.work_dir,
619 619 stderr=STDOUT
620 620 )
621 621 job_id = self.parse_job_id(output)
622 622 # self.notify_start(job_id)
623 623 return job_id
624 624
625 625 def stop(self):
626 626 args = [
627 627 'cancel',
628 628 self.job_id,
629 629 '/scheduler:%s' % self.scheduler
630 630 ]
631 631 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
632 632 try:
633 633 output = check_output([self.job_cmd]+args,
634 634 env=os.environ,
635 635 cwd=self.work_dir,
636 636 stderr=STDOUT
637 637 )
638 638 except:
639 639 output = 'The job already appears to be stopped: %r' % self.job_id
640 640 self.notify_stop(output) # Pass the output of the kill cmd
641 641 return output
642 642
643 643
644 644 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
645 645
646 646 job_file_name = Unicode(u'ipcontroller_job.xml', config=True)
647 647 extra_args = List([], config=False)
648 648
649 649 def write_job_file(self, n):
650 650 job = IPControllerJob(config=self.config)
651 651
652 652 t = IPControllerTask(config=self.config)
653 653 # The task's work directory is *not* the actual work directory of
654 654 # the controller. It is used as the base path for the stdout/stderr
655 655 # files that the scheduler redirects to.
656 656 t.work_directory = self.cluster_dir
657 657 # Add the --cluster-dir arguments from self.start().
658 658 t.controller_args.extend(self.extra_args)
659 659 job.add_task(t)
660 660
661 661 self.log.info("Writing job description file: %s" % self.job_file)
662 662 job.write(self.job_file)
663 663
664 664 @property
665 665 def job_file(self):
666 666 return os.path.join(self.cluster_dir, self.job_file_name)
667 667
668 668 def start(self, cluster_dir):
669 669 """Start the controller by cluster_dir."""
670 670 self.extra_args = ['--cluster-dir', cluster_dir]
671 671 self.cluster_dir = unicode(cluster_dir)
672 672 return super(WindowsHPCControllerLauncher, self).start(1)
673 673
674 674
675 675 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
676 676
677 677 job_file_name = Unicode(u'ipengineset_job.xml', config=True)
678 678 extra_args = List([], config=False)
679 679
680 680 def write_job_file(self, n):
681 681 job = IPEngineSetJob(config=self.config)
682 682
683 683 for i in range(n):
684 684 t = IPEngineTask(config=self.config)
685 685 # The task's work directory is *not* the actual work directory of
686 686 # the engine. It is used as the base path for the stdout/stderr
687 687 # files that the scheduler redirects to.
688 688 t.work_directory = self.cluster_dir
689 689 # Add the --cluster-dir arguments from self.start().
690 690 t.engine_args.extend(self.extra_args)
691 691 job.add_task(t)
692 692
693 693 self.log.info("Writing job description file: %s" % self.job_file)
694 694 job.write(self.job_file)
695 695
696 696 @property
697 697 def job_file(self):
698 698 return os.path.join(self.cluster_dir, self.job_file_name)
699 699
700 700 def start(self, n, cluster_dir):
701 701 """Start the controller by cluster_dir."""
702 702 self.extra_args = ['--cluster-dir', cluster_dir]
703 703 self.cluster_dir = unicode(cluster_dir)
704 704 return super(WindowsHPCEngineSetLauncher, self).start(n)
705 705
706 706
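A small sketch of job-id parsing with the default job_id_regexp of r'\d+'; the scheduler output string is made up.

    launcher = WindowsHPCLauncher()
    launcher.scheduler = 'HEADNODE'  # hypothetical scheduler host
    print launcher.parse_job_id('Job has been submitted. ID: 4321')  # prints '4321'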
707 707 #-----------------------------------------------------------------------------
708 708 # Batch (PBS) system launchers
709 709 #-----------------------------------------------------------------------------
710 710
711 711 # TODO: Get PBS launcher working again.
712 712
713 713 class BatchSystemLauncher(BaseLauncher):
714 714 """Launch an external process using a batch system.
715 715
716 716 This class is designed to work with UNIX batch systems like PBS, LSF,
717 717 GridEngine, etc. The overall model is that there are different commands
718 718 like qsub, qdel, etc. that handle the starting and stopping of the process.
719 719
720 720 This class also has the notion of a batch script. The ``batch_template``
721 721 attribute can be set to a string that is a template for the batch script.
722 722 This template is instantiated using Itpl. Thus the template can use
723 723 ${n} for the number of instances. Subclasses can add additional variables
724 724 to the template dict.
725 725 """
726 726
727 727 # Subclasses must fill these in. See PBSEngineSet
728 728 # The name of the command line program used to submit jobs.
729 729 submit_command = Str('', config=True)
730 730 # The name of the command line program used to delete jobs.
731 731 delete_command = Str('', config=True)
732 732 # A regular expression used to get the job id from the output of the
733 733 # submit_command.
734 734 job_id_regexp = Str('', config=True)
735 735 # The string that is the batch script template itself.
736 736 batch_template = Str('', config=True)
737 737 # The filename of the instantiated batch script.
738 738 batch_file_name = Unicode(u'batch_script', config=True)
739 739 # The full path to the instantiated batch script.
740 740 batch_file = Unicode(u'')
741 # the format dict used with batch_template:
742 context = Dict()
741 743
744
745 def find_args(self):
746 return [self.submit_command]
747
742 748 def __init__(self, work_dir=u'.', config=None, **kwargs):
743 749 super(BatchSystemLauncher, self).__init__(
744 750 work_dir=work_dir, config=config, **kwargs
745 751 )
746 752 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
747 self.context = {}
748 753
749 754 def parse_job_id(self, output):
750 755 """Take the output of the submit command and return the job id."""
751 756 m = re.match(self.job_id_regexp, output)
752 757 if m is not None:
753 758 job_id = m.group()
754 759 else:
755 760 raise LauncherError("Job id couldn't be determined: %s" % output)
756 761 self.job_id = job_id
757 762 self.log.info('Job started with job id: %r' % job_id)
758 763 return job_id
759 764
760 765 def write_batch_script(self, n):
761 766 """Instantiate and write the batch script to the work_dir."""
762 767 self.context['n'] = n
763 768 script_as_string = Itpl.itplns(self.batch_template, self.context)
764 769 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
765 770 f = open(self.batch_file, 'w')
766 771 f.write(script_as_string)
767 772 f.close()
768 773
769 def start(self, n):
774 def start(self, n, cluster_dir):
770 775 """Start n copies of the process using a batch system."""
776 # Here we save profile and cluster_dir in the context so they
777 # can be used in the batch script template as ${profile} and
778 # ${cluster_dir}
779 self.context['cluster_dir'] = cluster_dir
780 self.cluster_dir = unicode(cluster_dir)
771 781 self.write_batch_script(n)
772 782 output = check_output([self.submit_command, self.batch_file], env=os.environ, stderr=STDOUT)
773 783 job_id = self.parse_job_id(output)
774 784 # self.notify_start(job_id)
775 785 return job_id
776 786
777 787 def stop(self):
778 788 output = check_output([self.delete_command, self.job_id], env=os.environ, stderr=STDOUT)
779 789 self.notify_stop(output) # Pass the output of the kill cmd
780 790 return output
781 791
782 792
783 793 class PBSLauncher(BatchSystemLauncher):
784 794 """A BatchSystemLauncher subclass for PBS."""
785 795
786 796 submit_command = Str('qsub', config=True)
787 797 delete_command = Str('qdel', config=True)
788 798 job_id_regexp = Str(r'\d+', config=True)
789 799 batch_template = Str('', config=True)
790 800 batch_file_name = Unicode(u'pbs_batch_script', config=True)
791 801 batch_file = Unicode(u'')
792 802
793 803
794 804 class PBSControllerLauncher(PBSLauncher):
795 805 """Launch a controller using PBS."""
796 806
797 807 batch_file_name = Unicode(u'pbs_batch_script_controller', config=True)
798 808
799 809 def start(self, cluster_dir):
800 810 """Start the controller by profile or cluster_dir."""
801 # Here we save profile and cluster_dir in the context so they
802 # can be used in the batch script template as ${profile} and
803 # ${cluster_dir}
804 self.context['cluster_dir'] = cluster_dir
805 self.cluster_dir = unicode(cluster_dir)
806 811 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
807 return super(PBSControllerLauncher, self).start(1)
812 return super(PBSControllerLauncher, self).start(1, cluster_dir)
808 813
809 814
810 815 class PBSEngineSetLauncher(PBSLauncher):
811 816
812 817 batch_file_name = Unicode(u'pbs_batch_script_engines', config=True)
813 818
814 819 def start(self, n, cluster_dir):
815 820 """Start n engines by profile or cluster_dir."""
816 self.program_args.extend(['--cluster-dir', cluster_dir])
817 self.cluster_dir = unicode(cluster_dir)
818 821 self.log.info('Starting PBSEngineSetLauncher: %r' % self.args)
819 return super(PBSEngineSetLauncher, self).start(n)
822 return super(PBSEngineSetLauncher, self).start(n, cluster_dir)
820 823
821 824
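A hedged sketch of putting the new PBS plumbing to use from a cluster config file: the resource line and the ipenginez invocation are placeholders, but ${n} and ${cluster_dir} are exactly the variables that start() puts into the new context dict above.

    c.Global.engine_launcher = 'IPython.zmq.parallel.launcher.PBSEngineSetLauncher'

    c.PBSEngineSetLauncher.batch_template = (
        "#!/bin/sh\n"
        "#PBS -N ipengine\n"
        "#PBS -l nodes=${n}\n"              # hypothetical resource request
        "ipenginez --cluster-dir ${cluster_dir}\n"
    )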
822 825 #-----------------------------------------------------------------------------
823 826 # A launcher for ipcluster itself!
824 827 #-----------------------------------------------------------------------------
825 828
826 829
827 830 class IPClusterLauncher(LocalProcessLauncher):
828 831 """Launch the ipcluster program in an external process."""
829 832
830 833 ipcluster_cmd = List(ipcluster_cmd_argv, config=True)
831 834 # Command line arguments to pass to ipcluster.
832 835 ipcluster_args = List(
833 836 ['--clean-logs', '--log-to-file', '--log-level', str(logging.INFO)], config=True)
834 837 ipcluster_subcommand = Str('start')
835 838 ipcluster_n = Int(2)
836 839
837 840 def find_args(self):
838 841 return self.ipcluster_cmd + [self.ipcluster_subcommand] + \
839 842 ['-n', repr(self.ipcluster_n)] + self.ipcluster_args
840 843
841 844 def start(self):
842 845 self.log.info("Starting ipcluster: %r" % self.args)
843 846 return super(IPClusterLauncher, self).start()
844 847
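Finally, a sketch of the argv IPClusterLauncher builds with its defaults, roughly: ipcluster start -n 2 --clean-logs --log-to-file --log-level 20.

    launcher = IPClusterLauncher()
    print launcher.arg_str  # ipcluster_cmd + ['start', '-n', '2'] + ipcluster_args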