Added better documentation to command line programs.
Brian Granger
@@ -1,459 +1,471 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import logging
19 19 import os
20 20 import signal
21 21 import sys
22 22
23 23 if os.name=='posix':
24 24 from twisted.scripts._twistd_unix import daemonize
25 25
26 26 from IPython.core import release
27 27 from IPython.external import argparse
28 28 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
29 29 from IPython.utils.importstring import import_item
30 30
31 31 from IPython.kernel.clusterdir import (
32 32 ApplicationWithClusterDir, ClusterDirError, PIDFileError
33 33 )
34 34
35 35 from twisted.internet import reactor, defer
36 36 from twisted.python import log, failure
37 37
38 38
39 39 #-----------------------------------------------------------------------------
40 40 # The ipcluster application
41 41 #-----------------------------------------------------------------------------
42 42
43 43
44 44 # Exit codes for ipcluster
45 45
46 46 # This will be the exit code if the ipcluster appears to be running because
47 47 # a .pid file exists
48 48 ALREADY_STARTED = 10
49 49
50 50 # This will be the exit code if ipcluster stop is run, but there is not .pid
51 51 # file to be found.
52 52 ALREADY_STOPPED = 11
53 53
54 54
55 55 class IPClusterCLLoader(ArgParseConfigLoader):
56 56
57 57 def _add_arguments(self):
58 58 # This has all the common options that all subcommands use
59 59 parent_parser1 = argparse.ArgumentParser(add_help=False)
60 60 parent_parser1.add_argument('--ipython-dir',
61 61 dest='Global.ipython_dir',type=unicode,
62 62 help='Set to override default location of Global.ipython_dir.',
63 63 default=NoConfigDefault,
64 64 metavar='Global.ipython_dir')
65 65 parent_parser1.add_argument('--log-level',
66 66 dest="Global.log_level",type=int,
67 67 help='Set the log level (0,10,20,30,40,50). Default is 30.',
68 68 default=NoConfigDefault,
69 69 metavar='Global.log_level')
70 70
71 71 # This has all the common options that other subcommands use
72 72 parent_parser2 = argparse.ArgumentParser(add_help=False)
73 73 parent_parser2.add_argument('-p','--profile',
74 74 dest='Global.profile',type=unicode,
75 75 help='The string name of the profile to be used. This determines '
76 76 'the name of the cluster dir as: cluster_<profile>. The default profile '
77 77 'is named "default". The cluster directory is resolved this way '
78 78 'if the --cluster-dir option is not used.',
79 79 default=NoConfigDefault,
80 80 metavar='Global.profile')
81 81 parent_parser2.add_argument('--cluster-dir',
82 82 dest='Global.cluster_dir',type=unicode,
83 83 help='Set the cluster dir. This overrides the logic used by the '
84 84 '--profile option.',
85 85 default=NoConfigDefault,
86 86 metavar='Global.cluster_dir')
87 87 parent_parser2.add_argument('--work-dir',
88 88 dest='Global.work_dir',type=unicode,
89 89 help='Set the working dir for the process.',
90 90 default=NoConfigDefault,
91 91 metavar='Global.work_dir')
92 92 parent_parser2.add_argument('--log-to-file',
93 93 action='store_true', dest='Global.log_to_file',
94 94 default=NoConfigDefault,
95 95 help='Log to a file in the log directory (default is stdout)'
96 96 )
97 97
98 98 subparsers = self.parser.add_subparsers(
99 99 dest='Global.subcommand',
100 100 title='ipcluster subcommands',
101 101 description='ipcluster has a variety of subcommands. '
102 102 'The general way of running ipcluster is "ipcluster <cmd> '
103 103 '[options]".',
104 104 help='For more help, type "ipcluster <cmd> -h"')
105 105
106 106 parser_list = subparsers.add_parser(
107 107 'list',
108 108 help='List all clusters in cwd and ipython_dir.',
109 109 parents=[parent_parser1]
110 110 )
111 111
112 112 parser_create = subparsers.add_parser(
113 113 'create',
114 114 help='Create a new cluster directory.',
115 115 parents=[parent_parser1, parent_parser2]
116 116 )
117 117 parser_create.add_argument(
118 118 '--reset-config',
119 119 dest='Global.reset_config', action='store_true',
120 120 default=NoConfigDefault,
121 121 help='Recopy the default config files to the cluster directory. '
122 122 'You will lose any modifications you have made to these files.'
123 123 )
124 124
125 125 parser_start = subparsers.add_parser(
126 126 'start',
127 127 help='Start a cluster.',
128 128 parents=[parent_parser1, parent_parser2]
129 129 )
130 130 parser_start.add_argument(
131 131 '-n', '--number',
132 132 type=int, dest='Global.n',
133 133 default=NoConfigDefault,
134 134 help='The number of engines to start.',
135 135 metavar='Global.n'
136 136 )
137 137 parser_start.add_argument('--clean-logs',
138 138 dest='Global.clean_logs', action='store_true',
139 139 help='Delete old log files before starting.',
140 140 default=NoConfigDefault
141 141 )
142 142 parser_start.add_argument('--no-clean-logs',
143 143 dest='Global.clean_logs', action='store_false',
144 144 help="Don't delete old log files before starting.",
145 145 default=NoConfigDefault
146 146 )
147 147 parser_start.add_argument('--daemon',
148 148 dest='Global.daemonize', action='store_true',
149 149 help='Daemonize the ipcluster program. This implies --log-to-file',
150 150 default=NoConfigDefault
151 151 )
152 152 parser_start.add_argument('--no-daemon',
153 153 dest='Global.daemonize', action='store_false',
154 154 help="Don't daemonize the ipcluster program.",
155 155 default=NoConfigDefault
156 156 )
157 157
158 158 parser_start = subparsers.add_parser(
159 159 'stop',
160 160 help='Stop a cluster.',
161 161 parents=[parent_parser1, parent_parser2]
162 162 )
163 163 parser_start.add_argument('--signal',
164 164 dest='Global.signal', type=int,
165 165 help="The signal number to use in stopping the cluster (default=2).",
166 166 metavar="Global.signal",
167 167 default=NoConfigDefault
168 168 )
169 169
170 170
171 171 default_config_file_name = u'ipcluster_config.py'
172 172
173 173
174 _description = """Start an IPython cluster for parallel computing.\n\n
175
176 An IPython cluster consists of 1 controller and 1 or more engines.
177 This command automates the startup of these processes using a wide
178 range of startup methods (SSH, local processes, PBS, mpiexec,
179 Windows HPC Server 2008). To start a cluster with 4 engines on your
180 local host simply do "ipcluster start -n 4". For more complex usage
181 you will typically do "ipcluster create -p mycluster", then edit
182 configuration files, followed by "ipcluster start -p mycluster -n 4".
183 """
184
185
174 186 class IPClusterApp(ApplicationWithClusterDir):
175 187
176 188 name = u'ipcluster'
177 description = 'Start an IPython cluster (controller and engines).'
189 description = _description
178 190 config_file_name = default_config_file_name
179 191 default_log_level = logging.INFO
180 192 auto_create_cluster_dir = False
181 193
182 194 def create_default_config(self):
183 195 super(IPClusterApp, self).create_default_config()
184 196 self.default_config.Global.controller_launcher = \
185 197 'IPython.kernel.launcher.LocalControllerLauncher'
186 198 self.default_config.Global.engine_launcher = \
187 199 'IPython.kernel.launcher.LocalEngineSetLauncher'
188 200 self.default_config.Global.n = 2
189 201 self.default_config.Global.reset_config = False
190 202 self.default_config.Global.clean_logs = True
191 203 self.default_config.Global.signal = 2
192 204 self.default_config.Global.daemonize = False
193 205
194 206 def create_command_line_config(self):
195 207 """Create and return a command line config loader."""
196 208 return IPClusterCLLoader(
197 209 description=self.description,
198 210 version=release.version
199 211 )
200 212
201 213 def find_resources(self):
202 214 subcommand = self.command_line_config.Global.subcommand
203 215 if subcommand=='list':
204 216 self.list_cluster_dirs()
205 217 # Exit immediately because there is nothing left to do.
206 218 self.exit()
207 219 elif subcommand=='create':
208 220 self.auto_create_cluster_dir = True
209 221 super(IPClusterApp, self).find_resources()
210 222 elif subcommand=='start' or subcommand=='stop':
211 223 self.auto_create_cluster_dir = True
212 224 try:
213 225 super(IPClusterApp, self).find_resources()
214 226 except ClusterDirError:
215 227 raise ClusterDirError(
216 228 "Could not find a cluster directory. A cluster dir must "
217 229 "be created before running 'ipcluster start'. Do "
218 230 "'ipcluster create -h' or 'ipcluster list -h' for more "
219 231 "information about creating and listing cluster dirs."
220 232 )
221 233
222 234 def list_cluster_dirs(self):
223 235 # Find the search paths
224 236 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
225 237 if cluster_dir_paths:
226 238 cluster_dir_paths = cluster_dir_paths.split(':')
227 239 else:
228 240 cluster_dir_paths = []
229 241 try:
230 242 ipython_dir = self.command_line_config.Global.ipython_dir
231 243 except AttributeError:
232 244 ipython_dir = self.default_config.Global.ipython_dir
233 245 paths = [os.getcwd(), ipython_dir] + \
234 246 cluster_dir_paths
235 247 paths = list(set(paths))
236 248
237 249 self.log.info('Searching for cluster dirs in paths: %r' % paths)
238 250 for path in paths:
239 251 files = os.listdir(path)
240 252 for f in files:
241 253 full_path = os.path.join(path, f)
242 254 if os.path.isdir(full_path) and f.startswith('cluster_'):
243 255 profile = full_path.split('_')[-1]
244 256 start_cmd = 'ipcluster start -p %s -n 4' % profile
245 257 print start_cmd + " ==> " + full_path
246 258
247 259 def pre_construct(self):
248 260 # IPClusterApp.pre_construct() is where we cd to the working directory.
249 261 super(IPClusterApp, self).pre_construct()
250 262 config = self.master_config
251 263 try:
252 264 daemon = config.Global.daemonize
253 265 if daemon:
254 266 config.Global.log_to_file = True
255 267 except AttributeError:
256 268 pass
257 269
258 270 def construct(self):
259 271 config = self.master_config
260 272 subcmd = config.Global.subcommand
261 273 reset = config.Global.reset_config
262 274 if subcmd == 'list':
263 275 return
264 276 if subcmd == 'create':
265 277 self.log.info('Copying default config files to cluster directory '
266 278 '[overwrite=%r]' % (reset,))
267 279 self.cluster_dir_obj.copy_all_config_files(overwrite=reset)
268 280 if subcmd =='start':
269 281 self.cluster_dir_obj.copy_all_config_files(overwrite=False)
270 282 self.start_logging()
271 283 reactor.callWhenRunning(self.start_launchers)
272 284
273 285 def start_launchers(self):
274 286 config = self.master_config
275 287
276 288 # Create the launchers. In both cases, we set the work_dir of
277 289 # the launcher to the cluster_dir. This is where the launcher's
278 290 # subprocesses will be launched. It is not where the controller
279 291 # and engine will be launched.
280 292 el_class = import_item(config.Global.engine_launcher)
281 293 self.engine_launcher = el_class(
282 294 work_dir=self.cluster_dir, config=config
283 295 )
284 296 cl_class = import_item(config.Global.controller_launcher)
285 297 self.controller_launcher = cl_class(
286 298 work_dir=self.cluster_dir, config=config
287 299 )
288 300
289 301 # Setup signals
290 302 signal.signal(signal.SIGINT, self.sigint_handler)
291 303
292 304 # Set up observers for stopping. If the controller dies, shut
293 305 # everything down as that will be completely fatal for the engines.
294 306 d1 = self.controller_launcher.observe_stop()
295 307 d1.addCallback(self.stop_launchers)
296 308 # But, we don't monitor the stopping of engines. An engine dying
297 309 # is just fine and in principle a user could start a new engine.
298 310 # Also, if we did monitor engine stopping, it is difficult to
299 311 # know what to do when only some engines die. Currently, the
300 312 # observing of engine stopping is inconsistent. Some launchers
301 313 # might trigger on a single engine stopping, others wait until
302 314 # all stop. TODO: think more about how to handle this.
303 315
304 316 # Start the controller and engines
305 317 self._stopping = False # Make sure stop_launchers is not called 2x.
306 318 d = self.start_controller()
307 319 d.addCallback(self.start_engines)
308 320 d.addCallback(self.startup_message)
309 321 # If the controller or engines fail to start, stop everything
310 322 d.addErrback(self.stop_launchers)
311 323 return d
312 324
313 325 def startup_message(self, r=None):
314 326 log.msg("IPython cluster: started")
315 327 return r
316 328
317 329 def start_controller(self, r=None):
318 330 # log.msg("In start_controller")
319 331 config = self.master_config
320 332 d = self.controller_launcher.start(
321 333 cluster_dir=config.Global.cluster_dir
322 334 )
323 335 return d
324 336
325 337 def start_engines(self, r=None):
326 338 # log.msg("In start_engines")
327 339 config = self.master_config
328 340 d = self.engine_launcher.start(
329 341 config.Global.n,
330 342 cluster_dir=config.Global.cluster_dir
331 343 )
332 344 return d
333 345
334 346 def stop_controller(self, r=None):
335 347 # log.msg("In stop_controller")
336 348 if self.controller_launcher.running:
337 349 d = self.controller_launcher.stop()
338 350 d.addErrback(self.log_err)
339 351 return d
340 352 else:
341 353 return defer.succeed(None)
342 354
343 355 def stop_engines(self, r=None):
344 356 # log.msg("In stop_engines")
345 357 if self.engine_launcher.running:
346 358 d = self.engine_launcher.stop()
347 359 d.addErrback(self.log_err)
348 360 return d
349 361 else:
350 362 return defer.succeed(None)
351 363
352 364 def log_err(self, f):
353 365 log.msg(f.getTraceback())
354 366 return None
355 367
356 368 def stop_launchers(self, r=None):
357 369 if not self._stopping:
358 370 self._stopping = True
359 371 if isinstance(r, failure.Failure):
360 372 log.msg('Unexpected error in ipcluster:')
361 373 log.msg(r.getTraceback())
362 374 log.msg("IPython cluster: stopping")
363 375 d = self.stop_engines()
364 376 d2 = self.stop_controller()
365 377 # Wait a few seconds to let things shut down.
366 378 reactor.callLater(4.0, reactor.stop)
367 379
368 380 def sigint_handler(self, signum, frame):
369 381 self.stop_launchers()
370 382
371 383 def start_logging(self):
372 384 # Remove old log files of the controller and engine
373 385 if self.master_config.Global.clean_logs:
374 386 log_dir = self.master_config.Global.log_dir
375 387 for f in os.listdir(log_dir):
376 388 if f.startswith('ipengine' + '-'):
377 389 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
378 390 os.remove(os.path.join(log_dir, f))
379 391 if f.startswith('ipcontroller' + '-'):
380 392 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
381 393 os.remove(os.path.join(log_dir, f))
382 394 # This will remove old log files for ipcluster itself
383 395 super(IPClusterApp, self).start_logging()
384 396
385 397 def start_app(self):
386 398 """Start the application, depending on what subcommand is used."""
387 399 subcmd = self.master_config.Global.subcommand
388 400 if subcmd=='create' or subcmd=='list':
389 401 return
390 402 elif subcmd=='start':
391 403 self.start_app_start()
392 404 elif subcmd=='stop':
393 405 self.start_app_stop()
394 406
395 407 def start_app_start(self):
396 408 """Start the app for the start subcommand."""
397 409 config = self.master_config
398 410 # First see if the cluster is already running
399 411 try:
400 412 pid = self.get_pid_from_file()
401 413 except PIDFileError:
402 414 pass
403 415 else:
404 416 self.log.critical(
405 417 'Cluster is already running with [pid=%s]. '
406 418 'Use "ipcluster stop" to stop the cluster.' % pid
407 419 )
408 420 # Here I exit with an unusual exit status that other processes
409 421 # can watch for to learn how I exited.
410 422 self.exit(ALREADY_STARTED)
411 423
412 424 # Now log and daemonize
413 425 self.log.info(
414 426 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
415 427 )
416 428 # TODO: Get daemonize working on Windows or as a Windows Server.
417 429 if config.Global.daemonize:
418 430 if os.name=='posix':
419 431 daemonize()
420 432
421 433 # Now write the new pid file AFTER our new forked pid is active.
422 434 self.write_pid_file()
423 435 reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
424 436 reactor.run()
425 437
426 438 def start_app_stop(self):
427 439 """Start the app for the stop subcommand."""
428 440 config = self.master_config
429 441 try:
430 442 pid = self.get_pid_from_file()
431 443 except PIDFileError:
432 444 self.log.critical(
433 445 'Problem reading pid file, cluster is probably not running.'
434 446 )
435 447 # Here I exit with an unusual exit status that other processes
436 448 # can watch for to learn how I exited.
437 449 self.exit(ALREADY_STOPPED)
438 450 else:
439 451 if os.name=='posix':
440 452 sig = config.Global.signal
441 453 self.log.info(
442 454 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
443 455 )
444 456 os.kill(pid, sig)
445 457 elif os.name=='nt':
446 458 # As of right now, we don't support daemonize on Windows, so
447 459 # stop will not do anything. Minimally, it should clean up the
448 460 # old .pid files.
449 461 self.remove_pid_file()
450 462
451 463 def launch_new_instance():
452 464 """Create and run the IPython cluster."""
453 465 app = IPClusterApp()
454 466 app.start()
455 467
456 468
457 469 if __name__ == '__main__':
458 470 launch_new_instance()
459 471
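Note on the command line layout above: IPClusterCLLoader builds its options from two shared parent parsers plus per-subcommand arguments. The following is a minimal, self-contained sketch of that argparse pattern, not the IPython implementation; option names, defaults, and the build_parser helper are simplified assumptions for illustration.

import argparse

def build_parser():
    # Options every subcommand accepts (mirrors parent_parser1).
    common = argparse.ArgumentParser(add_help=False)
    common.add_argument('--ipython-dir', dest='ipython_dir',
                        help='Override the default location of the IPython dir.')
    common.add_argument('--log-level', dest='log_level', type=int, default=30,
                        help='Set the log level (0,10,20,30,40,50).')

    # Options for subcommands that need a cluster dir (mirrors parent_parser2).
    cluster = argparse.ArgumentParser(add_help=False)
    cluster.add_argument('-p', '--profile', default='default',
                         help='Profile name; the cluster dir is cluster_<profile>.')
    cluster.add_argument('--cluster-dir', dest='cluster_dir',
                         help='Explicit cluster dir; overrides --profile.')

    parser = argparse.ArgumentParser(prog='ipcluster')
    subparsers = parser.add_subparsers(dest='subcommand')
    subparsers.add_parser('list', parents=[common],
                          help='List all clusters in cwd and ipython_dir.')
    start = subparsers.add_parser('start', parents=[common, cluster],
                                  help='Start a cluster.')
    start.add_argument('-n', '--number', type=int, dest='n', default=2,
                       help='The number of engines to start.')
    subparsers.add_parser('stop', parents=[common, cluster],
                          help='Stop a cluster.')
    return parser

if __name__ == '__main__':
    # Parses the workflow described in _description, e.g. "ipcluster start -p mycluster -n 4".
    print(build_parser().parse_args(['start', '-p', 'mycluster', '-n', '4']))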
@@ -1,265 +1,275 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import copy
21 21 import os
22 22 import sys
23 23
24 24 from twisted.application import service
25 25 from twisted.internet import reactor
26 26 from twisted.python import log
27 27
28 28 from IPython.config.loader import Config, NoConfigDefault
29 29
30 30 from IPython.kernel.clusterdir import (
31 31 ApplicationWithClusterDir,
32 32 AppWithClusterDirArgParseConfigLoader
33 33 )
34 34
35 35 from IPython.core import release
36 36
37 37 from IPython.utils.traitlets import Str, Instance, Unicode
38 38
39 39 from IPython.kernel import controllerservice
40 40
41 41 from IPython.kernel.fcutil import FCServiceFactory
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Default interfaces
45 45 #-----------------------------------------------------------------------------
46 46
47 47
48 48 # The default client interfaces for FCClientServiceFactory.interfaces
49 49 default_client_interfaces = Config()
50 50 default_client_interfaces.Task.interface_chain = [
51 51 'IPython.kernel.task.ITaskController',
52 52 'IPython.kernel.taskfc.IFCTaskController'
53 53 ]
54 54
55 55 default_client_interfaces.Task.furl_file = 'ipcontroller-tc.furl'
56 56
57 57 default_client_interfaces.MultiEngine.interface_chain = [
58 58 'IPython.kernel.multiengine.IMultiEngine',
59 59 'IPython.kernel.multienginefc.IFCSynchronousMultiEngine'
60 60 ]
61 61
62 62 default_client_interfaces.MultiEngine.furl_file = u'ipcontroller-mec.furl'
63 63
64 64 # Make this a dict we can pass to Config.__init__ for the default
65 65 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
66 66
67 67
68 68
69 69 # The default engine interfaces for FCEngineServiceFactory.interfaces
70 70 default_engine_interfaces = Config()
71 71 default_engine_interfaces.Default.interface_chain = [
72 72 'IPython.kernel.enginefc.IFCControllerBase'
73 73 ]
74 74
75 75 default_engine_interfaces.Default.furl_file = u'ipcontroller-engine.furl'
76 76
77 77 # Make this a dict we can pass to Config.__init__ for the default
78 78 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
79 79
80 80
81 81 #-----------------------------------------------------------------------------
82 82 # Service factories
83 83 #-----------------------------------------------------------------------------
84 84
85 85
86 86 class FCClientServiceFactory(FCServiceFactory):
87 87 """A Foolscap implementation of the client services."""
88 88
89 89 cert_file = Unicode(u'ipcontroller-client.pem', config=True)
90 90 interfaces = Instance(klass=Config, kw=default_client_interfaces,
91 91 allow_none=False, config=True)
92 92
93 93
94 94 class FCEngineServiceFactory(FCServiceFactory):
95 95 """A Foolscap implementation of the engine services."""
96 96
97 97 cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
98 98 interfaces = Instance(klass=dict, kw=default_engine_interfaces,
99 99 allow_none=False, config=True)
100 100
101 101
102 102 #-----------------------------------------------------------------------------
103 103 # The main application
104 104 #-----------------------------------------------------------------------------
105 105
106 106
107 107 cl_args = (
108 108 # Client config
109 109 (('--client-ip',), dict(
110 110 type=str, dest='FCClientServiceFactory.ip', default=NoConfigDefault,
111 111 help='The IP address or hostname the controller will listen on for '
112 112 'client connections.',
113 113 metavar='FCClientServiceFactory.ip')
114 114 ),
115 115 (('--client-port',), dict(
116 116 type=int, dest='FCClientServiceFactory.port', default=NoConfigDefault,
117 117 help='The port the controller will listen on for client connections. '
118 118 'The default is to use 0, which will autoselect an open port.',
119 119 metavar='FCClientServiceFactory.port')
120 120 ),
121 121 (('--client-location',), dict(
122 122 type=str, dest='FCClientServiceFactory.location', default=NoConfigDefault,
123 123 help='The hostname or IP that clients should connect to. This does '
124 124 'not control which interface the controller listens on. Instead, this '
125 125 'determines the hostname/IP that is listed in the FURL, which is how '
126 126 'clients know where to connect. Useful if the controller is listening '
127 127 'on multiple interfaces.',
128 128 metavar='FCClientServiceFactory.location')
129 129 ),
130 130 # Engine config
131 131 (('--engine-ip',), dict(
132 132 type=str, dest='FCEngineServiceFactory.ip', default=NoConfigDefault,
133 133 help='The IP address or hostname the controller will listen on for '
134 134 'engine connections.',
135 135 metavar='FCEngineServiceFactory.ip')
136 136 ),
137 137 (('--engine-port',), dict(
138 138 type=int, dest='FCEngineServiceFactory.port', default=NoConfigDefault,
139 139 help='The port the controller will listen on for engine connections. '
140 140 'The default is to use 0, which will autoselect an open port.',
141 141 metavar='FCEngineServiceFactory.port')
142 142 ),
143 143 (('--engine-location',), dict(
144 144 type=str, dest='FCEngineServiceFactory.location', default=NoConfigDefault,
145 145 help='The hostname or IP that engines should connect to. This does '
146 146 'not control which interface the controller listens on. Instead, this '
147 147 'determines the hostname/IP that is listed in the FURL, which is how '
148 148 'engines know where to connect. Useful if the controller is listening '
149 149 'on multiple interfaces.',
150 150 metavar='FCEngineServiceFactory.location')
151 151 ),
152 152 # Global config
153 153 (('--log-to-file',), dict(
154 154 action='store_true', dest='Global.log_to_file', default=NoConfigDefault,
155 155 help='Log to a file in the log directory (default is stdout)')
156 156 ),
157 157 (('-r','--reuse-furls'), dict(
158 158 action='store_true', dest='Global.reuse_furls', default=NoConfigDefault,
159 159 help='Try to reuse all FURL files. If this is not set all FURL files '
160 160 'are deleted before the controller starts. This must be set if '
161 161 'specific ports are specified by --engine-port or --client-port.')
162 162 ),
163 163 (('--no-secure',), dict(
164 164 action='store_false', dest='Global.secure', default=NoConfigDefault,
165 165 help='Turn off SSL encryption for all connections.')
166 166 ),
167 167 (('--secure',), dict(
168 168 action='store_true', dest='Global.secure', default=NoConfigDefault,
168 168 help='Turn on SSL encryption for all connections.')
170 170 )
171 171 )
172 172
173 173
174 174 class IPControllerAppCLConfigLoader(AppWithClusterDirArgParseConfigLoader):
175 175
176 176 arguments = cl_args
177 177
178 178
179 _description = """Start the IPython controller for parallel computing.
180
181 The IPython controller provides a gateway between the IPython engines and
182 clients. The controller needs to be started before the engines and can be
183 configured using command line options or using a cluster directory. Cluster
184 directories contain config, log and security files and are usually located in
185 your .ipython directory and named as "cluster_<profile>". See the --profile
186 and --cluster-dir options for details.
187 """
188
179 189 default_config_file_name = u'ipcontroller_config.py'
180 190
181 191
182 192 class IPControllerApp(ApplicationWithClusterDir):
183 193
184 194 name = u'ipcontroller'
185 description = 'Start the IPython controller for parallel computing.'
195 description = _description
186 196 config_file_name = default_config_file_name
187 197 auto_create_cluster_dir = True
188 198
189 199 def create_default_config(self):
190 200 super(IPControllerApp, self).create_default_config()
191 201 self.default_config.Global.reuse_furls = False
192 202 self.default_config.Global.secure = True
193 203 self.default_config.Global.import_statements = []
194 204 self.default_config.Global.clean_logs = True
195 205
196 206 def create_command_line_config(self):
197 207 """Create and return a command line config loader."""
198 208 return IPControllerAppCLConfigLoader(
199 209 description=self.description,
200 210 version=release.version
201 211 )
202 212
203 213 def post_load_command_line_config(self):
204 214 # Now setup reuse_furls
205 215 c = self.command_line_config
206 216 if hasattr(c.Global, 'reuse_furls'):
207 217 c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
208 218 c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
209 219 del c.Global.reuse_furls
210 220 if hasattr(c.Global, 'secure'):
211 221 c.FCClientServiceFactory.secure = c.Global.secure
212 222 c.FCEngineServiceFactory.secure = c.Global.secure
213 223 del c.Global.secure
214 224
215 225 def construct(self):
216 226 # This is the working dir by now.
217 227 sys.path.insert(0, '')
218 228
219 229 self.start_logging()
220 230 self.import_statements()
221 231
222 232 # Create the service hierarchy
223 233 self.main_service = service.MultiService()
224 234 # The controller service
225 235 controller_service = controllerservice.ControllerService()
226 236 controller_service.setServiceParent(self.main_service)
227 237 # The client tub and all its referenceables
228 238 csfactory = FCClientServiceFactory(self.master_config, controller_service)
229 239 client_service = csfactory.create()
230 240 client_service.setServiceParent(self.main_service)
231 241 # The engine tub
232 242 esfactory = FCEngineServiceFactory(self.master_config, controller_service)
233 243 engine_service = esfactory.create()
234 244 engine_service.setServiceParent(self.main_service)
235 245
236 246 def import_statements(self):
237 247 statements = self.master_config.Global.import_statements
238 248 for s in statements:
239 249 try:
240 250 log.msg("Executing statement: '%s'" % s)
241 251 exec s in globals(), locals()
242 252 except:
243 253 log.msg("Error running statement: %s" % s)
244 254
245 255 def start_app(self):
246 256 # Start the controller service.
247 257 self.main_service.startService()
248 258 # Write the .pid file, overwriting old ones. This allows multiple
249 259 # controllers to clobber each other. But Windows is not cleaning
250 260 # these up properly.
251 261 self.write_pid_file(overwrite=True)
252 262 # Add a trigger to delete the .pid file upon shutting down.
253 263 reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
254 264 reactor.run()
255 265
256 266
257 267 def launch_new_instance():
258 268 """Create and run the IPython controller"""
259 269 app = IPControllerApp()
260 270 app.start()
261 271
262 272
263 273 if __name__ == '__main__':
264 274 launch_new_instance()
265 275
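The cl_args table above is declarative: each entry is a tuple of option strings plus the keyword arguments for argparse. A rough sketch of how such a table can be consumed is shown below; the actual AppWithClusterDirArgParseConfigLoader may do more (for example routing values into the Global and FCClientServiceFactory config sections), and the example table and build_loader name are assumptions.

import argparse

# A trimmed, hypothetical table in the same shape as cl_args above.
example_args = (
    (('--client-port',), dict(
        type=int, dest='client_port', default=0,
        help='Port for client connections (0 autoselects an open port).')),
    (('-r', '--reuse-furls'), dict(
        action='store_true', dest='reuse_furls', default=False,
        help='Try to reuse all FURL files.')),
)

def build_loader(table, description='IPython controller'):
    parser = argparse.ArgumentParser(description=description)
    for option_strings, kwargs in table:
        # Each table entry expands into a single add_argument() call.
        parser.add_argument(*option_strings, **kwargs)
    return parser

if __name__ == '__main__':
    ns = build_loader(example_args).parse_args(['--client-port', '10105', '-r'])
    print(ns.client_port, ns.reuse_furls)   # -> 10105 True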
@@ -1,237 +1,248 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import os
19 19 import sys
20 20
21 21 from twisted.application import service
22 22 from twisted.internet import reactor
23 23 from twisted.python import log
24 24
25 25 from IPython.config.loader import NoConfigDefault
26 26
27 27 from IPython.kernel.clusterdir import (
28 28 ApplicationWithClusterDir,
29 29 AppWithClusterDirArgParseConfigLoader
30 30 )
31 31 from IPython.core import release
32 32
33 33 from IPython.utils.importstring import import_item
34 34
35 35 from IPython.kernel.engineservice import EngineService
36 36 from IPython.kernel.fcutil import Tub
37 37 from IPython.kernel.engineconnector import EngineConnector
38 38
39 39 #-----------------------------------------------------------------------------
40 40 # The main application
41 41 #-----------------------------------------------------------------------------
42 42
43 43
44 44 cl_args = (
45 45 # Controller config
46 46 (('--furl-file',), dict(
47 47 type=unicode, dest='Global.furl_file', default=NoConfigDefault,
48 48 help='The full location of the file containing the FURL of the '
49 49 'controller. If this is not given, the FURL file must be in the '
50 50 'security directory of the cluster directory. This location is '
51 51 'resolved using the --profile and --app-dir options.',
52 52 metavar='Global.furl_file')
53 53 ),
54 54 # MPI
55 55 (('--mpi',), dict(
56 56 type=str, dest='MPI.use', default=NoConfigDefault,
57 57 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).',
58 58 metavar='MPI.use')
59 59 ),
60 60 # Global config
61 61 (('--log-to-file',), dict(
62 62 action='store_true', dest='Global.log_to_file', default=NoConfigDefault,
63 63 help='Log to a file in the log directory (default is stdout)')
64 64 )
65 65 )
66 66
67 67
68 68 class IPEngineAppCLConfigLoader(AppWithClusterDirArgParseConfigLoader):
69 69
70 70 arguments = cl_args
71 71
72 72
73 73 mpi4py_init = """from mpi4py import MPI as mpi
74 74 mpi.size = mpi.COMM_WORLD.Get_size()
75 75 mpi.rank = mpi.COMM_WORLD.Get_rank()
76 76 """
77 77
78 78 pytrilinos_init = """from PyTrilinos import Epetra
79 79 class SimpleStruct:
80 80 pass
81 81 mpi = SimpleStruct()
82 82 mpi.rank = 0
83 83 mpi.size = 0
84 84 """
85 85
86 86
87 87 default_config_file_name = u'ipengine_config.py'
88 88
89 89
90 _description = """Start an IPython engine for parallel computing.\n\n
91
92 IPython engines run in parallel and perform computations on behalf of a client
93 and controller. A controller needs to be started before the engines. The
94 engine can be configured using command line options or using a cluster
95 directory. Cluster directories contain config, log and security files and are
96 usually located in your .ipython directory and named as "cluster_<profile>".
97 See the --profile and --cluster-dir options for details.
98 """
99
100
90 101 class IPEngineApp(ApplicationWithClusterDir):
91 102
92 103 name = u'ipengine'
93 description = 'Start the IPython engine for parallel computing.'
104 description = _description
94 105 config_file_name = default_config_file_name
95 106 auto_create_cluster_dir = True
96 107
97 108 def create_default_config(self):
98 109 super(IPEngineApp, self).create_default_config()
99 110
100 111 # The engine should not clean logs as we don't want to remove the
101 112 # active log files of other running engines.
102 113 self.default_config.Global.clean_logs = False
103 114
104 115 # Global config attributes
105 116 self.default_config.Global.exec_lines = []
106 117 self.default_config.Global.shell_class = 'IPython.kernel.core.interpreter.Interpreter'
107 118
108 119 # Configuration related to the controller
109 120 # This must match the filename (path not included) that the controller
110 121 # used for the FURL file.
111 122 self.default_config.Global.furl_file_name = u'ipcontroller-engine.furl'
112 123 # If given, this is the actual location of the controller's FURL file.
113 124 # If not, this is computed using the profile, app_dir and furl_file_name
114 125 self.default_config.Global.furl_file = u''
115 126
116 127 # The max number of connection attempts and the initial delay between
117 128 # those attempts.
118 129 self.default_config.Global.connect_delay = 0.1
119 130 self.default_config.Global.connect_max_tries = 15
120 131
121 132 # MPI related config attributes
122 133 self.default_config.MPI.use = ''
123 134 self.default_config.MPI.mpi4py = mpi4py_init
124 135 self.default_config.MPI.pytrilinos = pytrilinos_init
125 136
126 137 def create_command_line_config(self):
127 138 """Create and return a command line config loader."""
128 139 return IPEngineAppCLConfigLoader(
129 140 description=self.description,
130 141 version=release.version
131 142 )
132 143
133 144 def post_load_command_line_config(self):
134 145 pass
135 146
136 147 def pre_construct(self):
137 148 super(IPEngineApp, self).pre_construct()
138 149 self.find_cont_furl_file()
139 150
140 151 def find_cont_furl_file(self):
141 152 """Set the furl file.
142 153
143 154 Here we don't try to actually see if it exists or is valid as that
144 155 is handled by the connection logic.
145 156 """
146 157 config = self.master_config
147 158 # Find the actual controller FURL file
148 159 if not config.Global.furl_file:
149 160 try_this = os.path.join(
150 161 config.Global.cluster_dir,
151 162 config.Global.security_dir,
152 163 config.Global.furl_file_name
153 164 )
154 165 config.Global.furl_file = try_this
155 166
156 167 def construct(self):
157 168 # This is the working dir by now.
158 169 sys.path.insert(0, '')
159 170
160 171 self.start_mpi()
161 172 self.start_logging()
162 173
163 174 # Create the underlying shell class and EngineService
164 175 shell_class = import_item(self.master_config.Global.shell_class)
165 176 self.engine_service = EngineService(shell_class, mpi=mpi)
166 177
167 178 self.exec_lines()
168 179
169 180 # Create the service hierarchy
170 181 self.main_service = service.MultiService()
171 182 self.engine_service.setServiceParent(self.main_service)
172 183 self.tub_service = Tub()
173 184 self.tub_service.setServiceParent(self.main_service)
174 185 # This needs to be called before the connection is initiated
175 186 self.main_service.startService()
176 187
177 188 # This initiates the connection to the controller and calls
178 189 # register_engine to tell the controller we are ready to do work
179 190 self.engine_connector = EngineConnector(self.tub_service)
180 191
181 192 log.msg("Using furl file: %s" % self.master_config.Global.furl_file)
182 193
183 194 reactor.callWhenRunning(self.call_connect)
184 195
185 196 def call_connect(self):
186 197 d = self.engine_connector.connect_to_controller(
187 198 self.engine_service,
188 199 self.master_config.Global.furl_file,
189 200 self.master_config.Global.connect_delay,
190 201 self.master_config.Global.connect_max_tries
191 202 )
192 203
193 204 def handle_error(f):
194 205 log.msg('Error connecting to controller. This usually means that '
195 206 'i) the controller was not started, ii) a firewall was blocking '
196 207 'the engine from connecting to the controller or iii) the engine '
197 208 'was not pointed at the right FURL file:')
198 209 log.msg(f.getErrorMessage())
199 210 reactor.callLater(0.1, reactor.stop)
200 211
201 212 d.addErrback(handle_error)
202 213
203 214 def start_mpi(self):
204 215 global mpi
205 216 mpikey = self.master_config.MPI.use
206 217 mpi_import_statement = self.master_config.MPI.get(mpikey, None)
207 218 if mpi_import_statement is not None:
208 219 try:
209 220 self.log.info("Initializing MPI:")
210 221 self.log.info(mpi_import_statement)
211 222 exec mpi_import_statement in globals()
212 223 except:
213 224 mpi = None
214 225 else:
215 226 mpi = None
216 227
217 228 def exec_lines(self):
218 229 for line in self.master_config.Global.exec_lines:
219 230 try:
220 231 log.msg("Executing statement: '%s'" % line)
221 232 self.engine_service.execute(line)
222 233 except:
223 234 log.msg("Error executing statement: %s" % line)
224 235
225 236 def start_app(self):
226 237 reactor.run()
227 238
228 239
229 240 def launch_new_instance():
230 241 """Create and run the IPython engine."""
231 242 app = IPEngineApp()
232 243 app.start()
233 244
234 245
235 246 if __name__ == '__main__':
236 247 launch_new_instance()
237 248
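IPEngineApp.start_mpi above dispatches on MPI.use: the value is a key into the MPI config section, and the matching init snippet (such as mpi4py_init) is executed to create the module-level mpi object, falling back to None on any failure. The sketch below is a standalone illustration of that dispatch with assumed names (MPI_INIT_SNIPPETS, start_mpi as a free function) and only the mpi4py snippet; the real app pulls the snippets from self.master_config.MPI and exec's them into module globals.

MPI_INIT_SNIPPETS = {
    'mpi4py': ("from mpi4py import MPI as mpi\n"
               "mpi.size = mpi.COMM_WORLD.Get_size()\n"
               "mpi.rank = mpi.COMM_WORLD.Get_rank()\n"),
}

def start_mpi(use_key):
    """Return an initialized mpi object for the requested backend, or None."""
    snippet = MPI_INIT_SNIPPETS.get(use_key)
    if not snippet:
        return None
    namespace = {}
    try:
        exec(snippet, namespace)   # an ImportError here simply disables MPI
    except Exception:
        return None
    return namespace.get('mpi')

if __name__ == '__main__':
    # Prints None unless mpi4py is installed (and typically run under mpiexec).
    print(start_mpi('mpi4py'))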