move default log setup to _log_default from init_logging...
MinRK -
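The core pattern in this changeset: instead of configuring the logger eagerly in an init_logging() method called from __init__, Application now declares log as an Instance trait whose value is built lazily by a _log_default method the first time self.log is read. A minimal sketch of that dynamic-default idiom follows, assuming the same-era IPython traitlets API used throughout this diff; ExampleApp and its handler setup are illustrative only, not part of the changeset.

# Illustrative sketch of the _log_default pattern (hypothetical ExampleApp).
import logging
from IPython.config.configurable import SingletonConfigurable
from IPython.utils.traitlets import Instance

class ExampleApp(SingletonConfigurable):
    log = Instance(logging.Logger)

    def _log_default(self):
        # called by traitlets the first time self.log is accessed
        log = logging.getLogger(self.__class__.__name__)
        log.setLevel(logging.WARN)
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("[%(name)s] %(message)s"))
        log.addHandler(handler)
        return log

app = ExampleApp.instance()
app.log.warn("logger is built on first access")  # triggers _log_default

Because the default is lazy, subclasses that previously relied on init_logging() (e.g. NotebookApp and BaseParallelApplication below) keep only their extra tweaks, such as setting log.propagate = False or swapping in a file handler.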
@@ -1,508 +1,508 b''
1 1 # encoding: utf-8
2 2 """
3 3 A base class for a configurable application.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * Min RK
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 import logging
23 23 import os
24 24 import re
25 25 import sys
26 26 from copy import deepcopy
27 27 from collections import defaultdict
28 28
29 29 from IPython.external.decorator import decorator
30 30
31 31 from IPython.config.configurable import SingletonConfigurable
32 32 from IPython.config.loader import (
33 33 KVArgParseConfigLoader, PyFileConfigLoader, Config, ArgumentError, ConfigFileNotFound,
34 34 )
35 35
36 36 from IPython.utils.traitlets import (
37 37 Unicode, List, Enum, Dict, Instance, TraitError
38 38 )
39 39 from IPython.utils.importstring import import_item
40 40 from IPython.utils.text import indent, wrap_paragraphs, dedent
41 41
42 42 #-----------------------------------------------------------------------------
43 43 # function for re-wrapping a helpstring
44 44 #-----------------------------------------------------------------------------
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Descriptions for the various sections
48 48 #-----------------------------------------------------------------------------
49 49
50 50 # merge flags&aliases into options
51 51 option_description = """
52 52 Arguments that take values are actually convenience aliases to full
53 53 Configurables, whose aliases are listed on the help line. For more information
54 54 on full configurables, see '--help-all'.
55 55 """.strip() # trim newlines of front and back
56 56
57 57 keyvalue_description = """
58 58 Parameters are set from command-line arguments of the form:
59 59 `--Class.trait=value`.
60 60 This line is evaluated in Python, so simple expressions are allowed, e.g.::
61 61 `--C.a='range(3)'` for setting C.a=[0,1,2].
62 62 """.strip() # trim newlines of front and back
63 63
64 64 subcommand_description = """
65 65 Subcommands are launched as `{app} cmd [args]`. For information on using
66 66 subcommand 'cmd', do: `{app} cmd -h`.
67 67 """.strip().format(app=os.path.basename(sys.argv[0]))
68 68 # get running program name
69 69
70 70 #-----------------------------------------------------------------------------
71 71 # Application class
72 72 #-----------------------------------------------------------------------------
73 73
74 74 @decorator
75 75 def catch_config_error(method, app, *args, **kwargs):
76 76 """Method decorator for catching invalid config (Trait/ArgumentErrors) during init.
77 77
78 78 On a TraitError (generally caused by bad config), this will print the trait's
79 79 message, and exit the app.
80 80
81 81 For use on init methods, to prevent invoking excepthook on invalid input.
82 82 """
83 83 try:
84 84 return method(app, *args, **kwargs)
85 85 except (TraitError, ArgumentError) as e:
86 86 app.print_description()
87 87 app.print_help()
88 88 app.print_examples()
89 89 app.log.fatal("Bad config encountered during initialization:")
90 90 app.log.fatal(str(e))
91 91 app.log.debug("Config at the time: %s", app.config)
92 92 app.exit(1)
93 93
94 94
95 95 class ApplicationError(Exception):
96 96 pass
97 97
98 98
99 99 class Application(SingletonConfigurable):
100 100 """A singleton application with full configuration support."""
101 101
102 102 # The name of the application, will usually match the name of the command
103 103 # line application
104 104 name = Unicode(u'application')
105 105
106 106 # The description of the application that is printed at the beginning
107 107 # of the help.
108 108 description = Unicode(u'This is an application.')
109 109 # default section descriptions
110 110 option_description = Unicode(option_description)
111 111 keyvalue_description = Unicode(keyvalue_description)
112 112 subcommand_description = Unicode(subcommand_description)
113 113
114 114 # The usage and example string that goes at the end of the help string.
115 115 examples = Unicode()
116 116
117 117 # A sequence of Configurable subclasses whose config=True attributes will
118 118 # be exposed at the command line.
119 119 classes = List([])
120 120
121 121 # The version string of this application.
122 122 version = Unicode(u'0.0')
123 123
124 124 # The log level for the application
125 125 log_level = Enum((0,10,20,30,40,50,'DEBUG','INFO','WARN','ERROR','CRITICAL'),
126 126 default_value=logging.WARN,
127 127 config=True,
128 128 help="Set the log level by value or name.")
129 129 def _log_level_changed(self, name, old, new):
130 130 """Adjust the log level when log_level is set."""
131 131 if isinstance(new, basestring):
132 132 new = getattr(logging, new)
133 133 self.log_level = new
134 134 self.log.setLevel(new)
135
136 log = Instance(logging.Logger)
137 def _log_default(self):
138 """Start logging for this application.
139
140 The default is to log to stdout using a StreamHandler. The log level
141 starts at logging.WARN, but this can be adjusted by setting the
142 ``log_level`` attribute.
143 """
144 log = logging.getLogger(self.__class__.__name__)
145 log.setLevel(self.log_level)
146 if sys.executable.endswith('pythonw.exe'):
147 # this should really go to a file, but file-logging is only
148 # hooked up in parallel applications
149 _log_handler = logging.StreamHandler(open(os.devnull, 'w'))
150 else:
151 _log_handler = logging.StreamHandler()
152 _log_formatter = logging.Formatter("[%(name)s] %(message)s")
153 _log_handler.setFormatter(_log_formatter)
154 log.addHandler(_log_handler)
155 return log
135 156
136 157 # the alias map for configurables
137 158 aliases = Dict({'log-level' : 'Application.log_level'})
138 159
139 160 # flags for loading Configurables or store_const style flags
140 161 # flags are loaded from this dict by '--key' flags
141 162 # this must be a dict of two-tuples, the first element being the Config/dict
142 163 # and the second being the help string for the flag
143 164 flags = Dict()
144 165 def _flags_changed(self, name, old, new):
145 166 """ensure flags dict is valid"""
146 167 for key,value in new.iteritems():
147 168 assert len(value) == 2, "Bad flag: %r:%s"%(key,value)
148 169 assert isinstance(value[0], (dict, Config)), "Bad flag: %r:%s"%(key,value)
149 170 assert isinstance(value[1], basestring), "Bad flag: %r:%s"%(key,value)
150 171
151 172
152 173 # subcommands for launching other applications
153 174 # if this is not empty, this will be a parent Application
154 175 # this must be a dict of two-tuples,
155 176 # the first element being the application class/import string
156 177 # and the second being the help string for the subcommand
157 178 subcommands = Dict()
158 179 # parse_command_line will initialize a subapp, if requested
159 180 subapp = Instance('IPython.config.application.Application', allow_none=True)
160 181
161 182 # extra command-line arguments that don't set config values
162 183 extra_args = List(Unicode)
163 184
164 185
165 186 def __init__(self, **kwargs):
166 187 SingletonConfigurable.__init__(self, **kwargs)
167 188 # Ensure my class is in self.classes, so my attributes appear in command line
168 189 # options and config files.
169 190 if self.__class__ not in self.classes:
170 191 self.classes.insert(0, self.__class__)
171 192
172 self.init_logging()
173
174 193 def _config_changed(self, name, old, new):
175 194 SingletonConfigurable._config_changed(self, name, old, new)
176 195 self.log.debug('Config changed:')
177 196 self.log.debug(repr(new))
178 197
179 def init_logging(self):
180 """Start logging for this application.
181
182 The default is to log to stdout using a StreamHandler. The log level
183 starts at logging.WARN, but this can be adjusted by setting the
184 ``log_level`` attribute.
185 """
186 self.log = logging.getLogger(self.__class__.__name__)
187 self.log.setLevel(self.log_level)
188 if sys.executable.endswith('pythonw.exe'):
189 # this should really go to a file, but file-logging is only
190 # hooked up in parallel applications
191 self._log_handler = logging.StreamHandler(open(os.devnull, 'w'))
192 else:
193 self._log_handler = logging.StreamHandler()
194 self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
195 self._log_handler.setFormatter(self._log_formatter)
196 self.log.addHandler(self._log_handler)
197
198 198 @catch_config_error
199 199 def initialize(self, argv=None):
200 200 """Do the basic steps to configure me.
201 201
202 202 Override in subclasses.
203 203 """
204 204 self.parse_command_line(argv)
205 205
206 206
207 207 def start(self):
208 208 """Start the app mainloop.
209 209
210 210 Override in subclasses.
211 211 """
212 212 if self.subapp is not None:
213 213 return self.subapp.start()
214 214
215 215 def print_alias_help(self):
216 216 """Print the alias part of the help."""
217 217 if not self.aliases:
218 218 return
219 219
220 220 lines = []
221 221 classdict = {}
222 222 for cls in self.classes:
223 223 # include all parents (up to, but excluding Configurable) in available names
224 224 for c in cls.mro()[:-3]:
225 225 classdict[c.__name__] = c
226 226
227 227 for alias, longname in self.aliases.iteritems():
228 228 classname, traitname = longname.split('.',1)
229 229 cls = classdict[classname]
230 230
231 231 trait = cls.class_traits(config=True)[traitname]
232 232 help = cls.class_get_trait_help(trait).splitlines()
233 233 # reformat first line
234 234 help[0] = help[0].replace(longname, alias) + ' (%s)'%longname
235 235 if len(alias) == 1:
236 236 help[0] = help[0].replace('--%s='%alias, '-%s '%alias)
237 237 lines.extend(help)
238 238 # lines.append('')
239 239 print os.linesep.join(lines)
240 240
241 241 def print_flag_help(self):
242 242 """Print the flag part of the help."""
243 243 if not self.flags:
244 244 return
245 245
246 246 lines = []
247 247 for m, (cfg,help) in self.flags.iteritems():
248 248 prefix = '--' if len(m) > 1 else '-'
249 249 lines.append(prefix+m)
250 250 lines.append(indent(dedent(help.strip())))
251 251 # lines.append('')
252 252 print os.linesep.join(lines)
253 253
254 254 def print_options(self):
255 255 if not self.flags and not self.aliases:
256 256 return
257 257 lines = ['Options']
258 258 lines.append('-'*len(lines[0]))
259 259 lines.append('')
260 260 for p in wrap_paragraphs(self.option_description):
261 261 lines.append(p)
262 262 lines.append('')
263 263 print os.linesep.join(lines)
264 264 self.print_flag_help()
265 265 self.print_alias_help()
266 266 print
267 267
268 268 def print_subcommands(self):
269 269 """Print the subcommand part of the help."""
270 270 if not self.subcommands:
271 271 return
272 272
273 273 lines = ["Subcommands"]
274 274 lines.append('-'*len(lines[0]))
275 275 lines.append('')
276 276 for p in wrap_paragraphs(self.subcommand_description):
277 277 lines.append(p)
278 278 lines.append('')
279 279 for subc, (cls, help) in self.subcommands.iteritems():
280 280 lines.append(subc)
281 281 if help:
282 282 lines.append(indent(dedent(help.strip())))
283 283 lines.append('')
284 284 print os.linesep.join(lines)
285 285
286 286 def print_help(self, classes=False):
287 287 """Print the help for each Configurable class in self.classes.
288 288
289 289 If classes=False (the default), only flags and aliases are printed.
290 290 """
291 291 self.print_subcommands()
292 292 self.print_options()
293 293
294 294 if classes:
295 295 if self.classes:
296 296 print "Class parameters"
297 297 print "----------------"
298 298 print
299 299 for p in wrap_paragraphs(self.keyvalue_description):
300 300 print p
301 301 print
302 302
303 303 for cls in self.classes:
304 304 cls.class_print_help()
305 305 print
306 306 else:
307 307 print "To see all available configurables, use `--help-all`"
308 308 print
309 309
310 310 def print_description(self):
311 311 """Print the application description."""
312 312 for p in wrap_paragraphs(self.description):
313 313 print p
314 314 print
315 315
316 316 def print_examples(self):
317 317 """Print usage and examples.
318 318
319 319 This usage string goes at the end of the command line help string
320 320 and should contain examples of the application's usage.
321 321 """
322 322 if self.examples:
323 323 print "Examples"
324 324 print "--------"
325 325 print
326 326 print indent(dedent(self.examples.strip()))
327 327 print
328 328
329 329 def print_version(self):
330 330 """Print the version string."""
331 331 print self.version
332 332
333 333 def update_config(self, config):
334 334 """Fire the traits events when the config is updated."""
335 335 # Save a copy of the current config.
336 336 newconfig = deepcopy(self.config)
337 337 # Merge the new config into the current one.
338 338 newconfig._merge(config)
339 339 # Save the combined config as self.config, which triggers the traits
340 340 # events.
341 341 self.config = newconfig
342 342
343 343 @catch_config_error
344 344 def initialize_subcommand(self, subc, argv=None):
345 345 """Initialize a subcommand with argv."""
346 346 subapp,help = self.subcommands.get(subc)
347 347
348 348 if isinstance(subapp, basestring):
349 349 subapp = import_item(subapp)
350 350
351 351 # clear existing instances
352 352 self.__class__.clear_instance()
353 353 # instantiate
354 354 self.subapp = subapp.instance()
355 355 # and initialize subapp
356 356 self.subapp.initialize(argv)
357 357
358 358 def flatten_flags(self):
359 359 """flatten flags and aliases, so cl-args override as expected.
360 360
361 361 This prevents issues such as an alias pointing to InteractiveShell,
362 362 but a config file setting the same trait in TerminalInteractiveShell
363 363 getting inappropriate priority over the command-line arg.
364 364
365 365 Only aliases with exactly one descendant in the class list
366 366 will be promoted.
367 367
368 368 """
369 369 # build a tree of classes in our list that inherit from a particular parent;
370 370 # it will be a dict, keyed by parent classname, of classes in our list
371 371 # that are descendants
372 372 mro_tree = defaultdict(list)
373 373 for cls in self.classes:
374 374 clsname = cls.__name__
375 375 for parent in cls.mro()[1:-3]:
376 376 # exclude cls itself and Configurable,HasTraits,object
377 377 mro_tree[parent.__name__].append(clsname)
378 378 # flatten aliases, which have the form:
379 379 # { 'alias' : 'Class.trait' }
380 380 aliases = {}
381 381 for alias, cls_trait in self.aliases.iteritems():
382 382 cls,trait = cls_trait.split('.',1)
383 383 children = mro_tree[cls]
384 384 if len(children) == 1:
385 385 # exactly one descendant, promote alias
386 386 cls = children[0]
387 387 aliases[alias] = '.'.join([cls,trait])
388 388
389 389 # flatten flags, which are of the form:
390 390 # { 'key' : ({'Cls' : {'trait' : value}}, 'help')}
391 391 flags = {}
392 392 for key, (flagdict, help) in self.flags.iteritems():
393 393 newflag = {}
394 394 for cls, subdict in flagdict.iteritems():
395 395 children = mro_tree[cls]
396 396 # exactly one descendant, promote flag section
397 397 if len(children) == 1:
398 398 cls = children[0]
399 399 newflag[cls] = subdict
400 400 flags[key] = (newflag, help)
401 401 return flags, aliases
402 402
403 403 @catch_config_error
404 404 def parse_command_line(self, argv=None):
405 405 """Parse the command line arguments."""
406 406 argv = sys.argv[1:] if argv is None else argv
407 407
408 408 if argv and argv[0] == 'help':
409 409 # turn `ipython help notebook` into `ipython notebook -h`
410 410 argv = argv[1:] + ['-h']
411 411
412 412 if self.subcommands and len(argv) > 0:
413 413 # we have subcommands, and one may have been specified
414 414 subc, subargv = argv[0], argv[1:]
415 415 if re.match(r'^\w(\-?\w)*$', subc) and subc in self.subcommands:
416 416 # it's a subcommand, and *not* a flag or class parameter
417 417 return self.initialize_subcommand(subc, subargv)
418 418
419 419 if '-h' in argv or '--help' in argv or '--help-all' in argv:
420 420 self.print_description()
421 421 self.print_help('--help-all' in argv)
422 422 self.print_examples()
423 423 self.exit(0)
424 424
425 425 if '--version' in argv or '-V' in argv:
426 426 self.print_version()
427 427 self.exit(0)
428 428
429 429 # flatten flags&aliases, so cl-args get appropriate priority:
430 430 flags,aliases = self.flatten_flags()
431 431
432 432 loader = KVArgParseConfigLoader(argv=argv, aliases=aliases,
433 433 flags=flags)
434 434 config = loader.load_config()
435 435 self.update_config(config)
436 436 # store unparsed args in extra_args
437 437 self.extra_args = loader.extra_args
438 438
439 439 @catch_config_error
440 440 def load_config_file(self, filename, path=None):
441 441 """Load a .py based config file by filename and path."""
442 442 loader = PyFileConfigLoader(filename, path=path)
443 443 try:
444 444 config = loader.load_config()
445 445 except ConfigFileNotFound:
446 446 # problem finding the file, raise
447 447 raise
448 448 except Exception:
449 449 # try to get the full filename, but it will be empty in the
450 450 # unlikely event that the error raised before filefind finished
451 451 filename = loader.full_filename or filename
452 452 # problem while running the file
453 453 self.log.error("Exception while loading config file %s",
454 454 filename, exc_info=True)
455 455 else:
456 456 self.log.debug("Loaded config file: %s", loader.full_filename)
457 457 self.update_config(config)
458 458
459 459 def generate_config_file(self):
460 460 """generate default config file from Configurables"""
461 461 lines = ["# Configuration file for %s."%self.name]
462 462 lines.append('')
463 463 lines.append('c = get_config()')
464 464 lines.append('')
465 465 for cls in self.classes:
466 466 lines.append(cls.class_config_section())
467 467 return '\n'.join(lines)
468 468
469 469 def exit(self, exit_status=0):
470 470 self.log.debug("Exiting application: %s" % self.name)
471 471 sys.exit(exit_status)
472 472
473 473 #-----------------------------------------------------------------------------
474 474 # utility functions, for convenience
475 475 #-----------------------------------------------------------------------------
476 476
477 477 def boolean_flag(name, configurable, set_help='', unset_help=''):
478 478 """Helper for building basic --trait, --no-trait flags.
479 479
480 480 Parameters
481 481 ----------
482 482
483 483 name : str
484 484 The name of the flag.
485 485 configurable : str
486 486 The 'Class.trait' string of the trait to be set/unset with the flag
487 487 set_help : unicode
488 488 help string for --name flag
489 489 unset_help : unicode
490 490 help string for --no-name flag
491 491
492 492 Returns
493 493 -------
494 494
495 495 cfg : dict
496 496 A dict with two keys: 'name', and 'no-name', for setting and unsetting
497 497 the trait, respectively.
498 498 """
499 499 # default helpstrings
500 500 set_help = set_help or "set %s=True"%configurable
501 501 unset_help = unset_help or "set %s=False"%configurable
502 502
503 503 cls,trait = configurable.split('.')
504 504
505 505 setter = {cls : {trait : True}}
506 506 unsetter = {cls : {trait : False}}
507 507 return {name : (setter, set_help), 'no-'+name : (unsetter, unset_help)}
508 508
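As a usage sketch for the alias/flag machinery defined in application.py above: a subclass exposes traits at the command line by listing them in aliases (value-taking options) and flags (boolean/Config-setting options). DemoApp and its traits below are hypothetical, invented for illustration; only APIs that appear in the diff (Application, Dict, Unicode, Bool, instance, initialize) are assumed.

# Hypothetical DemoApp showing how aliases/flags map onto config values.
from IPython.config.application import Application
from IPython.utils.traitlets import Bool, Dict, Unicode

class DemoApp(Application):
    name = u'demo'
    datafile = Unicode(u'', config=True, help="input file to process")
    debug = Bool(False, config=True, help="enable debug output")

    # value-taking options: --datafile=..., --log-level=...
    aliases = Dict({'datafile': 'DemoApp.datafile',
                    'log-level': 'Application.log_level'})
    # boolean option: --debug sets DemoApp.debug=True via a Config fragment
    flags = Dict({'debug': ({'DemoApp': {'debug': True}},
                            "set DemoApp.debug=True")})

if __name__ == '__main__':
    app = DemoApp.instance()
    # e.g.: python demo.py --debug --datafile=data.csv --log-level=DEBUG
    app.initialize()
    print app.debug, app.datafile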
@@ -1,174 +1,172 b''
1 1 """Manage IPython.parallel clusters in the notebook.
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2008-2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 import os
20 20
21 21 from tornado import web
22 22 from zmq.eventloop import ioloop
23 23
24 24 from IPython.config.configurable import LoggingConfigurable
25 25 from IPython.config.loader import load_pyconfig_files
26 26 from IPython.utils.traitlets import Dict, Instance, CFloat
27 27 from IPython.parallel.apps.ipclusterapp import IPClusterStart
28 28 from IPython.core.profileapp import list_profiles_in
29 29 from IPython.core.profiledir import ProfileDir
30 30 from IPython.utils.path import get_ipython_dir
31 31 from IPython.utils.sysinfo import num_cpus
32 32
33 33
34 34 #-----------------------------------------------------------------------------
35 35 # Classes
36 36 #-----------------------------------------------------------------------------
37 37
38 38
39 39 class DummyIPClusterStart(IPClusterStart):
40 40 """Dummy subclass to skip init steps that conflict with global app.
41 41
42 42 Instantiating and initializing this class should result in fully configured
43 43 launchers, but no other side effects or state.
44 44 """
45 45
46 46 def init_signal(self):
47 47 pass
48 def init_logging(self):
49 pass
50 48 def reinit_logging(self):
51 49 pass
52 50
53 51
54 52 class ClusterManager(LoggingConfigurable):
55 53
56 54 profiles = Dict()
57 55
58 56 delay = CFloat(1., config=True,
59 57 help="delay (in s) between starting the controller and the engines")
60 58
61 59 loop = Instance('zmq.eventloop.ioloop.IOLoop')
62 60 def _loop_default(self):
63 61 from zmq.eventloop.ioloop import IOLoop
64 62 return IOLoop.instance()
65 63
66 64 def build_launchers(self, profile_dir):
67 65 starter = DummyIPClusterStart(log=self.log)
68 66 starter.initialize(['--profile-dir', profile_dir])
69 67 cl = starter.controller_launcher
70 68 esl = starter.engine_launcher
71 69 n = starter.n
72 70 return cl, esl, n
73 71
74 72 def get_profile_dir(self, name, path):
75 73 p = ProfileDir.find_profile_dir_by_name(path,name=name)
76 74 return p.location
77 75
78 76 def update_profiles(self):
79 77 """List all profiles in the ipython_dir and cwd.
80 78 """
81 79 for path in [get_ipython_dir(), os.getcwdu()]:
82 80 for profile in list_profiles_in(path):
83 81 pd = self.get_profile_dir(profile, path)
84 82 if profile not in self.profiles:
85 83 self.log.debug("Adding profile %s" % profile)
86 84 self.profiles[profile] = {
87 85 'profile': profile,
88 86 'profile_dir': pd,
89 87 'status': 'stopped'
90 88 }
91 89
92 90 def list_profiles(self):
93 91 self.update_profiles()
94 92 result = [self.profile_info(p) for p in sorted(self.profiles.keys())]
95 93 return result
96 94
97 95 def check_profile(self, profile):
98 96 if profile not in self.profiles:
99 97 raise web.HTTPError(404, u'profile not found')
100 98
101 99 def profile_info(self, profile):
102 100 self.check_profile(profile)
103 101 result = {}
104 102 data = self.profiles.get(profile)
105 103 result['profile'] = profile
106 104 result['profile_dir'] = data['profile_dir']
107 105 result['status'] = data['status']
108 106 if 'n' in data:
109 107 result['n'] = data['n']
110 108 return result
111 109
112 110 def start_cluster(self, profile, n=None):
113 111 """Start a cluster for a given profile."""
114 112 self.check_profile(profile)
115 113 data = self.profiles[profile]
116 114 if data['status'] == 'running':
117 115 raise web.HTTPError(409, u'cluster already running')
118 116 cl, esl, default_n = self.build_launchers(data['profile_dir'])
119 117 n = n if n is not None else default_n
120 118 def clean_data():
121 119 data.pop('controller_launcher',None)
122 120 data.pop('engine_set_launcher',None)
123 121 data.pop('n',None)
124 122 data['status'] = 'stopped'
125 123 def engines_stopped(r):
126 124 self.log.debug('Engines stopped')
127 125 if cl.running:
128 126 cl.stop()
129 127 clean_data()
130 128 esl.on_stop(engines_stopped)
131 129 def controller_stopped(r):
132 130 self.log.debug('Controller stopped')
133 131 if esl.running:
134 132 esl.stop()
135 133 clean_data()
136 134 cl.on_stop(controller_stopped)
137 135
138 136 dc = ioloop.DelayedCallback(lambda: cl.start(), 0, self.loop)
139 137 dc.start()
140 138 dc = ioloop.DelayedCallback(lambda: esl.start(n), 1000*self.delay, self.loop)
141 139 dc.start()
142 140
143 141 self.log.debug('Cluster started')
144 142 data['controller_launcher'] = cl
145 143 data['engine_set_launcher'] = esl
146 144 data['n'] = n
147 145 data['status'] = 'running'
148 146 return self.profile_info(profile)
149 147
150 148 def stop_cluster(self, profile):
151 149 """Stop a cluster for a given profile."""
152 150 self.check_profile(profile)
153 151 data = self.profiles[profile]
154 152 if data['status'] == 'stopped':
155 153 raise web.HTTPError(409, u'cluster not running')
156 154 data = self.profiles[profile]
157 155 cl = data['controller_launcher']
158 156 esl = data['engine_set_launcher']
159 157 if cl.running:
160 158 cl.stop()
161 159 if esl.running:
162 160 esl.stop()
163 161 # Return a temp info dict, the real one is updated in the on_stop
164 162 # logic above.
165 163 result = {
166 164 'profile': data['profile'],
167 165 'profile_dir': data['profile_dir'],
168 166 'status': 'stopped'
169 167 }
170 168 return result
171 169
172 170 def stop_all_clusters(self):
173 171 for p in self.profiles.keys():
174 172 self.stop_cluster(p)
@@ -1,547 +1,547 b''
1 1 # coding: utf-8
2 2 """A tornado based IPython notebook server.
3 3
4 4 Authors:
5 5
6 6 * Brian Granger
7 7 """
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2008-2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 # stdlib
20 20 import errno
21 21 import logging
22 22 import os
23 23 import re
24 24 import select
25 25 import signal
26 26 import socket
27 27 import sys
28 28 import threading
29 29 import time
30 30 import webbrowser
31 31
32 32 # Third party
33 33 import zmq
34 34
35 35 # Install the pyzmq ioloop. This has to be done before anything else from
36 36 # tornado is imported.
37 37 from zmq.eventloop import ioloop
38 38 ioloop.install()
39 39
40 40 from tornado import httpserver
41 41 from tornado import web
42 42
43 43 # Our own libraries
44 44 from .kernelmanager import MappingKernelManager
45 45 from .handlers import (LoginHandler, LogoutHandler,
46 46 ProjectDashboardHandler, NewHandler, NamedNotebookHandler,
47 47 MainKernelHandler, KernelHandler, KernelActionHandler, IOPubHandler,
48 48 ShellHandler, NotebookRootHandler, NotebookHandler, NotebookCopyHandler,
49 49 RSTHandler, AuthenticatedFileHandler, PrintNotebookHandler,
50 50 MainClusterHandler, ClusterProfileHandler, ClusterActionHandler
51 51 )
52 52 from .notebookmanager import NotebookManager
53 53 from .clustermanager import ClusterManager
54 54
55 55 from IPython.config.application import catch_config_error, boolean_flag
56 56 from IPython.core.application import BaseIPythonApplication
57 57 from IPython.core.profiledir import ProfileDir
58 58 from IPython.lib.kernel import swallow_argv
59 59 from IPython.zmq.session import Session, default_secure
60 60 from IPython.zmq.zmqshell import ZMQInteractiveShell
61 61 from IPython.zmq.ipkernel import (
62 62 flags as ipkernel_flags,
63 63 aliases as ipkernel_aliases,
64 64 IPKernelApp
65 65 )
66 66 from IPython.utils.traitlets import Dict, Unicode, Integer, List, Enum, Bool
67 67 from IPython.utils import py3compat
68 68
69 69 #-----------------------------------------------------------------------------
70 70 # Module globals
71 71 #-----------------------------------------------------------------------------
72 72
73 73 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
74 74 _kernel_action_regex = r"(?P<action>restart|interrupt)"
75 75 _notebook_id_regex = r"(?P<notebook_id>\w+-\w+-\w+-\w+-\w+)"
76 76 _profile_regex = r"(?P<profile>[a-zA-Z0-9]+)"
77 77 _cluster_action_regex = r"(?P<action>start|stop)"
78 78
79 79
80 80 LOCALHOST = '127.0.0.1'
81 81
82 82 _examples = """
83 83 ipython notebook # start the notebook
84 84 ipython notebook --profile=sympy # use the sympy profile
85 85 ipython notebook --pylab=inline # pylab in inline plotting mode
86 86 ipython notebook --certfile=mycert.pem # use SSL/TLS certificate
87 87 ipython notebook --port=5555 --ip=* # Listen on port 5555, all interfaces
88 88 """
89 89
90 90 #-----------------------------------------------------------------------------
91 91 # Helper functions
92 92 #-----------------------------------------------------------------------------
93 93
94 94 def url_path_join(a,b):
95 95 if a.endswith('/') and b.startswith('/'):
96 96 return a[:-1]+b
97 97 else:
98 98 return a+b
99 99
100 100 #-----------------------------------------------------------------------------
101 101 # The Tornado web application
102 102 #-----------------------------------------------------------------------------
103 103
104 104 class NotebookWebApplication(web.Application):
105 105
106 106 def __init__(self, ipython_app, kernel_manager, notebook_manager,
107 107 cluster_manager, log,
108 108 base_project_url, settings_overrides):
109 109 handlers = [
110 110 (r"/", ProjectDashboardHandler),
111 111 (r"/login", LoginHandler),
112 112 (r"/logout", LogoutHandler),
113 113 (r"/new", NewHandler),
114 114 (r"/%s" % _notebook_id_regex, NamedNotebookHandler),
115 115 (r"/%s/copy" % _notebook_id_regex, NotebookCopyHandler),
116 116 (r"/%s/print" % _notebook_id_regex, PrintNotebookHandler),
117 117 (r"/kernels", MainKernelHandler),
118 118 (r"/kernels/%s" % _kernel_id_regex, KernelHandler),
119 119 (r"/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
120 120 (r"/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler),
121 121 (r"/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
122 122 (r"/notebooks", NotebookRootHandler),
123 123 (r"/notebooks/%s" % _notebook_id_regex, NotebookHandler),
124 124 (r"/rstservice/render", RSTHandler),
125 125 (r"/files/(.*)", AuthenticatedFileHandler, {'path' : notebook_manager.notebook_dir}),
126 126 (r"/clusters", MainClusterHandler),
127 127 (r"/clusters/%s/%s" % (_profile_regex, _cluster_action_regex), ClusterActionHandler),
128 128 (r"/clusters/%s" % _profile_regex, ClusterProfileHandler),
129 129 ]
130 130 settings = dict(
131 131 template_path=os.path.join(os.path.dirname(__file__), "templates"),
132 132 static_path=os.path.join(os.path.dirname(__file__), "static"),
133 133 cookie_secret=os.urandom(1024),
134 134 login_url="/login",
135 135 )
136 136
137 137 # allow custom overrides for the tornado web app.
138 138 settings.update(settings_overrides)
139 139
140 140 # Python < 2.6.5 doesn't accept unicode keys in f(**kwargs), and
141 141 # base_project_url will always be unicode, which will in turn
142 142 # make the patterns unicode, and ultimately result in unicode
143 143 # keys in kwargs to handler._execute(**kwargs) in tornado.
144 144 # This enforces that base_project_url be ascii in that situation.
145 145 #
146 146 # Note that the URLs these patterns check against are escaped,
147 147 # and thus guaranteed to be ASCII: 'héllo' is really 'h%C3%A9llo'.
148 148 base_project_url = py3compat.unicode_to_str(base_project_url, 'ascii')
149 149
150 150 # prepend base_project_url onto the patterns that we match
151 151 new_handlers = []
152 152 for handler in handlers:
153 153 pattern = url_path_join(base_project_url, handler[0])
154 154 new_handler = tuple([pattern]+list(handler[1:]))
155 155 new_handlers.append( new_handler )
156 156
157 157 super(NotebookWebApplication, self).__init__(new_handlers, **settings)
158 158
159 159 self.kernel_manager = kernel_manager
160 160 self.notebook_manager = notebook_manager
161 161 self.cluster_manager = cluster_manager
162 162 self.ipython_app = ipython_app
163 163 self.read_only = self.ipython_app.read_only
164 164 self.log = log
165 165
166 166
167 167 #-----------------------------------------------------------------------------
168 168 # Aliases and Flags
169 169 #-----------------------------------------------------------------------------
170 170
171 171 flags = dict(ipkernel_flags)
172 172 flags['no-browser']=(
173 173 {'NotebookApp' : {'open_browser' : False}},
174 174 "Don't open the notebook in a browser after startup."
175 175 )
176 176 flags['no-mathjax']=(
177 177 {'NotebookApp' : {'enable_mathjax' : False}},
178 178 """Disable MathJax
179 179
180 180 MathJax is the javascript library IPython uses to render math/LaTeX. It is
181 181 very large, so you may want to disable it if you have a slow internet
182 182 connection, or for offline use of the notebook.
183 183
184 184 When disabled, equations etc. will appear as their untransformed TeX source.
185 185 """
186 186 )
187 187 flags['read-only'] = (
188 188 {'NotebookApp' : {'read_only' : True}},
189 189 """Allow read-only access to notebooks.
190 190
191 191 When using a password to protect the notebook server, this flag
192 192 allows unauthenticated clients to view the notebook list, and
193 193 individual notebooks, but not edit them, start kernels, or run
194 194 code.
195 195
196 196 If no password is set, the server will be entirely read-only.
197 197 """
198 198 )
199 199
200 200 # Add notebook manager flags
201 201 flags.update(boolean_flag('script', 'NotebookManager.save_script',
202 202 'Auto-save a .py script every time the .ipynb notebook is saved',
203 203 'Do not auto-save .py scripts for every notebook'))
204 204
205 205 # the flags that are specific to the frontend
206 206 # these must be scrubbed before being passed to the kernel,
207 207 # or it will raise an error on unrecognized flags
208 208 notebook_flags = ['no-browser', 'no-mathjax', 'read-only', 'script', 'no-script']
209 209
210 210 aliases = dict(ipkernel_aliases)
211 211
212 212 aliases.update({
213 213 'ip': 'NotebookApp.ip',
214 214 'port': 'NotebookApp.port',
215 215 'keyfile': 'NotebookApp.keyfile',
216 216 'certfile': 'NotebookApp.certfile',
217 217 'notebook-dir': 'NotebookManager.notebook_dir',
218 218 'browser': 'NotebookApp.browser',
219 219 })
220 220
221 221 # remove ipkernel aliases that are singletons, and don't make sense in
222 222 # a multi-kernel environment:
223 223 aliases.pop('f', None)
224 224
225 225 notebook_aliases = [u'port', u'ip', u'keyfile', u'certfile',
226 226 u'notebook-dir']
227 227
228 228 #-----------------------------------------------------------------------------
229 229 # NotebookApp
230 230 #-----------------------------------------------------------------------------
231 231
232 232 class NotebookApp(BaseIPythonApplication):
233 233
234 234 name = 'ipython-notebook'
235 235 default_config_file_name='ipython_notebook_config.py'
236 236
237 237 description = """
238 238 The IPython HTML Notebook.
239 239
240 240 This launches a Tornado based HTML Notebook Server that serves up an
241 241 HTML5/Javascript Notebook client.
242 242 """
243 243 examples = _examples
244 244
245 245 classes = [IPKernelApp, ZMQInteractiveShell, ProfileDir, Session,
246 246 MappingKernelManager, NotebookManager]
247 247 flags = Dict(flags)
248 248 aliases = Dict(aliases)
249 249
250 250 kernel_argv = List(Unicode)
251 251
252 252 log_level = Enum((0,10,20,30,40,50,'DEBUG','INFO','WARN','ERROR','CRITICAL'),
253 253 default_value=logging.INFO,
254 254 config=True,
255 255 help="Set the log level by value or name.")
256 256
257 257 # create requested profiles by default, if they don't exist:
258 258 auto_create = Bool(True)
259 259
260 260 # Network related information.
261 261
262 262 ip = Unicode(LOCALHOST, config=True,
263 263 help="The IP address the notebook server will listen on."
264 264 )
265 265
266 266 def _ip_changed(self, name, old, new):
267 267 if new == u'*': self.ip = u''
268 268
269 269 port = Integer(8888, config=True,
270 270 help="The port the notebook server will listen on."
271 271 )
272 272
273 273 certfile = Unicode(u'', config=True,
274 274 help="""The full path to an SSL/TLS certificate file."""
275 275 )
276 276
277 277 keyfile = Unicode(u'', config=True,
278 278 help="""The full path to a private key file for usage with SSL/TLS."""
279 279 )
280 280
281 281 password = Unicode(u'', config=True,
282 282 help="""Hashed password to use for web authentication.
283 283
284 284 To generate, type in a python/IPython shell:
285 285
286 286 from IPython.lib import passwd; passwd()
287 287
288 288 The string should be of the form type:salt:hashed-password.
289 289 """
290 290 )
291 291
292 292 open_browser = Bool(True, config=True,
293 293 help="""Whether to open in a browser after starting.
294 294 The specific browser used is platform dependent and
295 295 determined by the python standard library `webbrowser`
296 296 module, unless it is overridden using the --browser
297 297 (NotebookApp.browser) configuration option.
298 298 """)
299 299
300 300 browser = Unicode(u'', config=True,
301 301 help="""Specify what command to use to invoke a web
302 302 browser when opening the notebook. If not specified, the
303 303 default browser will be determined by the `webbrowser`
304 304 standard library module, which allows setting of the
305 305 BROWSER environment variable to override it.
306 306 """)
307 307
308 308 read_only = Bool(False, config=True,
309 309 help="Whether to prevent editing/execution of notebooks."
310 310 )
311 311
312 312 webapp_settings = Dict(config=True,
313 313 help="Supply overrides for the tornado.web.Application that the "
314 314 "IPython notebook uses.")
315 315
316 316 enable_mathjax = Bool(True, config=True,
317 317 help="""Whether to enable MathJax for typesetting math/TeX
318 318
319 319 MathJax is the javascript library IPython uses to render math/LaTeX. It is
320 320 very large, so you may want to disable it if you have a slow internet
321 321 connection, or for offline use of the notebook.
322 322
323 323 When disabled, equations etc. will appear as their untransformed TeX source.
324 324 """
325 325 )
326 326 def _enable_mathjax_changed(self, name, old, new):
327 327 """set mathjax url to empty if mathjax is disabled"""
328 328 if not new:
329 329 self.mathjax_url = u''
330 330
331 331 base_project_url = Unicode('/', config=True,
332 332 help='''The base URL for the notebook server''')
333 333 base_kernel_url = Unicode('/', config=True,
334 334 help='''The base URL for the kernel server''')
335 335 websocket_host = Unicode("", config=True,
336 336 help="""The hostname for the websocket server."""
337 337 )
338 338
339 339 mathjax_url = Unicode("", config=True,
340 340 help="""The url for MathJax.js."""
341 341 )
342 342 def _mathjax_url_default(self):
343 343 if not self.enable_mathjax:
344 344 return u''
345 345 static_path = self.webapp_settings.get("static_path", os.path.join(os.path.dirname(__file__), "static"))
346 346 static_url_prefix = self.webapp_settings.get("static_url_prefix",
347 347 "/static/")
348 348 if os.path.exists(os.path.join(static_path, 'mathjax', "MathJax.js")):
349 349 self.log.info("Using local MathJax")
350 350 return static_url_prefix+u"mathjax/MathJax.js"
351 351 else:
352 352 if self.certfile:
353 353 # HTTPS: load from Rackspace CDN, because SSL certificate requires it
354 354 base = u"https://c328740.ssl.cf1.rackcdn.com"
355 355 else:
356 356 base = u"http://cdn.mathjax.org"
357 357
358 358 url = base + u"/mathjax/latest/MathJax.js"
359 359 self.log.info("Using MathJax from CDN: %s", url)
360 360 return url
361 361
362 362 def _mathjax_url_changed(self, name, old, new):
363 363 if new and not self.enable_mathjax:
364 364 # enable_mathjax=False overrides mathjax_url
365 365 self.mathjax_url = u''
366 366 else:
367 367 self.log.info("Using MathJax: %s", new)
368 368
369 369 def parse_command_line(self, argv=None):
370 370 super(NotebookApp, self).parse_command_line(argv)
371 371 if argv is None:
372 372 argv = sys.argv[1:]
373 373
374 374 # Scrub frontend-specific flags
375 375 self.kernel_argv = swallow_argv(argv, notebook_aliases, notebook_flags)
376 376 # Kernel should inherit default config file from frontend
377 377 self.kernel_argv.append("--KernelApp.parent_appname='%s'"%self.name)
378 378
379 379 def init_configurables(self):
380 380 # force Session default to be secure
381 381 default_secure(self.config)
382 382 # Create a KernelManager and start a kernel.
383 383 self.kernel_manager = MappingKernelManager(
384 384 config=self.config, log=self.log, kernel_argv=self.kernel_argv,
385 385 connection_dir = self.profile_dir.security_dir,
386 386 )
387 387 self.notebook_manager = NotebookManager(config=self.config, log=self.log)
388 388 self.notebook_manager.list_notebooks()
389 389 self.cluster_manager = ClusterManager(config=self.config, log=self.log)
390 390 self.cluster_manager.update_profiles()
391 391
392 392 def init_logging(self):
393 super(NotebookApp, self).init_logging()
394 393 # This prevents double log messages because tornado uses a root logger that
395 394 # self.log is a child of. The logging module dispatches log messages to a logger
396 395 # and all of its ancestors until propagate is set to False.
397 396 self.log.propagate = False
398 397
399 398 def init_webapp(self):
400 399 """initialize tornado webapp and httpserver"""
401 400 self.web_app = NotebookWebApplication(
402 401 self, self.kernel_manager, self.notebook_manager,
403 402 self.cluster_manager, self.log,
404 403 self.base_project_url, self.webapp_settings
405 404 )
406 405 if self.certfile:
407 406 ssl_options = dict(certfile=self.certfile)
408 407 if self.keyfile:
409 408 ssl_options['keyfile'] = self.keyfile
410 409 else:
411 410 ssl_options = None
412 411 self.web_app.password = self.password
413 412 self.http_server = httpserver.HTTPServer(self.web_app, ssl_options=ssl_options)
414 413 if ssl_options is None and not self.ip and not (self.read_only and not self.password):
415 414 self.log.critical('WARNING: the notebook server is listening on all IP addresses '
416 415 'but not using any encryption or authentication. This is highly '
417 416 'insecure and not recommended.')
418 417
419 418 # Try random ports centered around the default.
420 419 from random import randint
421 420 n = 50 # Max number of attempts, keep reasonably large.
422 421 for port in range(self.port, self.port+5) + [self.port + randint(-2*n, 2*n) for i in range(n-5)]:
423 422 try:
424 423 self.http_server.listen(port, self.ip)
425 424 except socket.error, e:
426 425 if e.errno != errno.EADDRINUSE:
427 426 raise
428 427 self.log.info('The port %i is already in use, trying another random port.' % port)
429 428 else:
430 429 self.port = port
431 430 break
432 431
433 432 def init_signal(self):
434 433 # FIXME: remove this check when pyzmq dependency is >= 2.1.11
435 434 # safely extract zmq version info:
436 435 try:
437 436 zmq_v = zmq.pyzmq_version_info()
438 437 except AttributeError:
439 438 zmq_v = [ int(n) for n in re.findall(r'\d+', zmq.__version__) ]
440 439 if 'dev' in zmq.__version__:
441 440 zmq_v.append(999)
442 441 zmq_v = tuple(zmq_v)
443 442 if zmq_v >= (2,1,9):
444 443 # This won't work with 2.1.7 and
445 444 # 2.1.9-10 will log ugly 'Interrupted system call' messages,
446 445 # but it will work
447 446 signal.signal(signal.SIGINT, self._handle_sigint)
448 447 signal.signal(signal.SIGTERM, self._signal_stop)
449 448
450 449 def _handle_sigint(self, sig, frame):
451 450 """SIGINT handler spawns confirmation dialog"""
452 451 # register more forceful signal handler for ^C^C case
453 452 signal.signal(signal.SIGINT, self._signal_stop)
454 453 # request confirmation dialog in bg thread, to avoid
455 454 # blocking the App
456 455 thread = threading.Thread(target=self._confirm_exit)
457 456 thread.daemon = True
458 457 thread.start()
459 458
460 459 def _restore_sigint_handler(self):
461 460 """callback for restoring original SIGINT handler"""
462 461 signal.signal(signal.SIGINT, self._handle_sigint)
463 462
464 463 def _confirm_exit(self):
465 464 """confirm shutdown on ^C
466 465
467 466 A second ^C, or answering 'y' within 5s will cause shutdown,
468 467 otherwise original SIGINT handler will be restored.
469 468 """
470 469 # FIXME: remove this delay when pyzmq dependency is >= 2.1.11
471 470 time.sleep(0.1)
472 471 sys.stdout.write("Shutdown Notebook Server (y/[n])? ")
473 472 sys.stdout.flush()
474 473 r,w,x = select.select([sys.stdin], [], [], 5)
475 474 if r:
476 475 line = sys.stdin.readline()
477 476 if line.lower().startswith('y'):
478 477 self.log.critical("Shutdown confirmed")
479 478 ioloop.IOLoop.instance().stop()
480 479 return
481 480 else:
482 481 print "No answer for 5s:",
483 482 print "resuming operation..."
484 483 # no answer, or answer is no:
485 484 # set it back to original SIGINT handler
486 485 # use IOLoop.add_callback because signal.signal must be called
487 486 # from main thread
488 487 ioloop.IOLoop.instance().add_callback(self._restore_sigint_handler)
489 488
490 489 def _signal_stop(self, sig, frame):
491 490 self.log.critical("received signal %s, stopping", sig)
492 491 ioloop.IOLoop.instance().stop()
493 492
494 493 @catch_config_error
495 494 def initialize(self, argv=None):
495 self.init_logging()
496 496 super(NotebookApp, self).initialize(argv)
497 497 self.init_configurables()
498 498 self.init_webapp()
499 499 self.init_signal()
500 500
501 501 def cleanup_kernels(self):
502 502 """shutdown all kernels
503 503
504 504 The kernels will shutdown themselves when this process no longer exists,
505 505 but explicit shutdown allows the KernelManagers to cleanup the connection files.
506 506 """
507 507 self.log.info('Shutting down kernels')
508 508 km = self.kernel_manager
509 509 # copy list, since kill_kernel deletes keys
510 510 for kid in list(km.kernel_ids):
511 511 km.kill_kernel(kid)
512 512
513 513 def start(self):
514 514 ip = self.ip if self.ip else '[all ip addresses on your system]'
515 515 proto = 'https' if self.certfile else 'http'
516 516 info = self.log.info
517 517 info("The IPython Notebook is running at: %s://%s:%i%s" %
518 518 (proto, ip, self.port,self.base_project_url) )
519 519 info("Use Control-C to stop this server and shut down all kernels.")
520 520
521 521 if self.open_browser:
522 522 ip = self.ip or '127.0.0.1'
523 523 if self.browser:
524 524 browser = webbrowser.get(self.browser)
525 525 else:
526 526 browser = webbrowser.get()
527 527 b = lambda : browser.open("%s://%s:%i%s" % (proto, ip, self.port,
528 528 self.base_project_url),
529 529 new=2)
530 530 threading.Thread(target=b).start()
531 531 try:
532 532 ioloop.IOLoop.instance().start()
533 533 except KeyboardInterrupt:
534 534 info("Interrupted...")
535 535 finally:
536 536 self.cleanup_kernels()
537 537
538 538
539 539 #-----------------------------------------------------------------------------
540 540 # Main entry point
541 541 #-----------------------------------------------------------------------------
542 542
543 543 def launch_new_instance():
544 544 app = NotebookApp.instance()
545 545 app.initialize()
546 546 app.start()
547 547
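The NotebookApp traits above (ip, port, certfile, keyfile, password, open_browser, ...) can also be set from a profile's ipython_notebook_config.py instead of the command line. A hypothetical fragment is shown below; the paths and the hashed password are placeholders, and get_config() is the helper made available when the config file is executed by the Python config loader.

# Hypothetical ipython_notebook_config.py fragment (placeholder values only).
c = get_config()

c.NotebookApp.ip = '*'                          # listen on all interfaces
c.NotebookApp.port = 9999
c.NotebookApp.open_browser = False
c.NotebookApp.certfile = u'/path/to/mycert.pem'
c.NotebookApp.keyfile = u'/path/to/mycert.key'
# hashed password of the form type:salt:hashed-password, from IPython.lib.passwd()
c.NotebookApp.password = u'sha1:<salt>:<hash>'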
@@ -1,265 +1,268 b''
1 1 # encoding: utf-8
2 2 """
3 3 The Base Application class for IPython.parallel apps
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * Min RK
9 9
10 10 """
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Copyright (C) 2008-2011 The IPython Development Team
14 14 #
15 15 # Distributed under the terms of the BSD License. The full license is in
16 16 # the file COPYING, distributed as part of this software.
17 17 #-----------------------------------------------------------------------------
18 18
19 19 #-----------------------------------------------------------------------------
20 20 # Imports
21 21 #-----------------------------------------------------------------------------
22 22
23 23 from __future__ import with_statement
24 24
25 25 import os
26 26 import logging
27 27 import re
28 28 import sys
29 29
30 30 from subprocess import Popen, PIPE
31 31
32 32 from IPython.config.application import catch_config_error
33 33 from IPython.core import release
34 34 from IPython.core.crashhandler import CrashHandler
35 35 from IPython.core.application import (
36 36 BaseIPythonApplication,
37 37 base_aliases as base_ip_aliases,
38 38 base_flags as base_ip_flags
39 39 )
40 40 from IPython.utils.path import expand_path
41 41
42 42 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
43 43
44 44 #-----------------------------------------------------------------------------
45 45 # Module errors
46 46 #-----------------------------------------------------------------------------
47 47
48 48 class PIDFileError(Exception):
49 49 pass
50 50
51 51
52 52 #-----------------------------------------------------------------------------
53 53 # Crash handler for this application
54 54 #-----------------------------------------------------------------------------
55 55
56 56 class ParallelCrashHandler(CrashHandler):
57 57 """sys.excepthook for IPython itself, leaves a detailed report on disk."""
58 58
59 59 def __init__(self, app):
60 60 contact_name = release.authors['Min'][0]
61 61 contact_email = release.author_email
62 62 bug_tracker = 'https://github.com/ipython/ipython/issues'
63 63 super(ParallelCrashHandler,self).__init__(
64 64 app, contact_name, contact_email, bug_tracker
65 65 )
66 66
67 67
68 68 #-----------------------------------------------------------------------------
69 69 # Main application
70 70 #-----------------------------------------------------------------------------
71 71 base_aliases = {}
72 72 base_aliases.update(base_ip_aliases)
73 73 base_aliases.update({
74 74 'profile-dir' : 'ProfileDir.location',
75 75 'work-dir' : 'BaseParallelApplication.work_dir',
76 76 'log-to-file' : 'BaseParallelApplication.log_to_file',
77 77 'clean-logs' : 'BaseParallelApplication.clean_logs',
78 78 'log-url' : 'BaseParallelApplication.log_url',
79 79 'cluster-id' : 'BaseParallelApplication.cluster_id',
80 80 })
81 81
82 82 base_flags = {
83 83 'log-to-file' : (
84 84 {'BaseParallelApplication' : {'log_to_file' : True}},
85 85 "send log output to a file"
86 86 )
87 87 }
88 88 base_flags.update(base_ip_flags)
89 89
90 90 class BaseParallelApplication(BaseIPythonApplication):
91 91 """The base Application for IPython.parallel apps
92 92
93 93 Principal extensions to BaseIPythonApplication:
94 94
95 95 * work_dir
96 96 * remote logging via pyzmq
97 97 * IOLoop instance
98 98 """
99 99
100 100 crash_handler_class = ParallelCrashHandler
101 101
102 102 def _log_level_default(self):
103 103 # temporarily override default_log_level to INFO
104 104 return logging.INFO
105 105
106 106 work_dir = Unicode(os.getcwdu(), config=True,
107 107 help='Set the working dir for the process.'
108 108 )
109 109 def _work_dir_changed(self, name, old, new):
110 110 self.work_dir = unicode(expand_path(new))
111 111
112 112 log_to_file = Bool(config=True,
113 113 help="whether to log to a file")
114 114
115 115 clean_logs = Bool(False, config=True,
116 116 help="whether to cleanup old logfiles before starting")
117 117
118 118 log_url = Unicode('', config=True,
119 119 help="The ZMQ URL of the iplogger to aggregate logging.")
120 120
121 121 cluster_id = Unicode('', config=True,
122 122 help="""String id to add to runtime files, to prevent name collisions when
123 123 using multiple clusters with a single profile simultaneously.
124 124
125 125 When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'
126 126
127 127 Since this is text inserted into filenames, typical recommendations apply:
128 128 Simple character strings are ideal, and spaces are not recommended (but should
129 129 generally work).
130 130 """
131 131 )
132 132 def _cluster_id_changed(self, name, old, new):
133 133 self.name = self.__class__.name
134 134 if new:
135 135 self.name += '-%s'%new
136 136
137 137 def _config_files_default(self):
138 138 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
139 139
140 140 loop = Instance('zmq.eventloop.ioloop.IOLoop')
141 141 def _loop_default(self):
142 142 from zmq.eventloop.ioloop import IOLoop
143 143 return IOLoop.instance()
144 144
145 145 aliases = Dict(base_aliases)
146 146 flags = Dict(base_flags)
147 147
148 148 @catch_config_error
149 149 def initialize(self, argv=None):
150 150 """initialize the app"""
151 151 super(BaseParallelApplication, self).initialize(argv)
152 152 self.to_work_dir()
153 153 self.reinit_logging()
154 154
155 155 def to_work_dir(self):
156 156 wd = self.work_dir
157 157 if unicode(wd) != os.getcwdu():
158 158 os.chdir(wd)
159 159 self.log.info("Changing to working dir: %s" % wd)
160 160 # This is the working dir by now.
161 161 sys.path.insert(0, '')
162 162
163 163 def reinit_logging(self):
164 164 # Remove old log files
165 165 log_dir = self.profile_dir.log_dir
166 166 if self.clean_logs:
167 167 for f in os.listdir(log_dir):
168 168 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
169 169 os.remove(os.path.join(log_dir, f))
170 170 if self.log_to_file:
171 171 # Start logging to the new log file
172 172 log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
173 173 logfile = os.path.join(log_dir, log_filename)
174 174 open_log_file = open(logfile, 'w')
175 175 else:
176 176 open_log_file = None
177 177 if open_log_file is not None:
178 self.log.removeHandler(self._log_handler)
178 while self.log.handlers:
179 self.log.removeHandler(self.log.handlers[0])
179 180 self._log_handler = logging.StreamHandler(open_log_file)
180 181 self.log.addHandler(self._log_handler)
182 else:
183 self._log_handler = self.log.handlers[0]
181 184 # Add timestamps to log format:
182 185 self._log_formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
183 186 datefmt="%Y-%m-%d %H:%M:%S")
184 187 self._log_handler.setFormatter(self._log_formatter)
185 188 # do not propagate log messages to root logger
186 189 # ipcluster app will sometimes print duplicate messages during shutdown
187 190 # if this is 1 (default):
188 191 self.log.propagate = False
189 192
190 193 def write_pid_file(self, overwrite=False):
191 194 """Create a .pid file in the pid_dir with my pid.
192 195
193 196 This must be called after pre_construct, which sets `self.pid_dir`.
194 197 This raises :exc:`PIDFileError` if the pid file exists already.
195 198 """
196 199 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
197 200 if os.path.isfile(pid_file):
198 201 pid = self.get_pid_from_file()
199 202 if not overwrite:
200 203 raise PIDFileError(
201 204 'The pid file [%s] already exists. \nThis could mean that this '
202 205 'server is already running with [pid=%s].' % (pid_file, pid)
203 206 )
204 207 with open(pid_file, 'w') as f:
205 208 self.log.info("Creating pid file: %s" % pid_file)
206 209 f.write(repr(os.getpid())+'\n')
207 210
208 211 def remove_pid_file(self):
209 212 """Remove the pid file.
210 213
211 214 This should be called at shutdown by registering a callback with
212 215 :func:`reactor.addSystemEventTrigger`. This needs to return
213 216 ``None``.
214 217 """
215 218 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
216 219 if os.path.isfile(pid_file):
217 220 try:
218 221 self.log.info("Removing pid file: %s" % pid_file)
219 222 os.remove(pid_file)
220 223 except:
221 224 self.log.warn("Error removing the pid file: %s" % pid_file)
222 225
223 226 def get_pid_from_file(self):
224 227 """Get the pid from the pid file.
225 228
226 229 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
227 230 """
228 231 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
229 232 if os.path.isfile(pid_file):
230 233 with open(pid_file, 'r') as f:
231 234 s = f.read().strip()
232 235 try:
233 236 pid = int(s)
234 237 except:
235 238 raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
236 239 return pid
237 240 else:
238 241 raise PIDFileError('pid file not found: %s' % pid_file)
239 242
240 243 def check_pid(self, pid):
241 244 if os.name == 'nt':
242 245 try:
243 246 import ctypes
244 247 # returns 0 if no such process (of ours) exists
245 248 # positive int otherwise
246 249 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
247 250 except Exception:
248 251 self.log.warn(
249 252 "Could not determine whether pid %i is running via `OpenProcess`. "
250 253 " Making the likely assumption that it is."%pid
251 254 )
252 255 return True
253 256 return bool(p)
254 257 else:
255 258 try:
256 259 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
257 260 output,_ = p.communicate()
258 261 except OSError:
259 262 self.log.warn(
260 263 "Could not determine whether pid %i is running via `ps x`. "
261 264 " Making the likely assumption that it is."%pid
262 265 )
263 266 return True
264 267 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
265 268 return pid in pids
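As a rough, self-contained illustration of the POSIX branch of check_pid above (a sketch under the assumption that `ps x` is available; the function name is invented), the pid list can be scraped from the first column of the `ps x` output:

import re
from subprocess import Popen, PIPE

def pid_is_running(pid):
    # the first field of each `ps x` line is the pid
    output, _ = Popen(['ps', 'x'], stdout=PIPE, stderr=PIPE).communicate()
    pids = [int(p) for p in re.findall(r'^\s*\d+', output.decode(), re.MULTILINE)]
    return pid in pids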
@@ -1,493 +1,491 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython controller application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 from __future__ import with_statement
25 25
26 26 import json
27 27 import os
28 28 import socket
29 29 import stat
30 30 import sys
31 31
32 32 from multiprocessing import Process
33 33 from signal import signal, SIGINT, SIGABRT, SIGTERM
34 34
35 35 import zmq
36 36 from zmq.devices import ProcessMonitoredQueue
37 37 from zmq.log.handlers import PUBHandler
38 38
39 39 from IPython.core.profiledir import ProfileDir
40 40
41 41 from IPython.parallel.apps.baseapp import (
42 42 BaseParallelApplication,
43 43 base_aliases,
44 44 base_flags,
45 45 catch_config_error,
46 46 )
47 47 from IPython.utils.importstring import import_item
48 48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict, TraitError
49 49
50 50 from IPython.zmq.session import (
51 51 Session, session_aliases, session_flags, default_secure
52 52 )
53 53
54 54 from IPython.parallel.controller.heartmonitor import HeartMonitor
55 55 from IPython.parallel.controller.hub import HubFactory
56 56 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
57 57 from IPython.parallel.controller.sqlitedb import SQLiteDB
58 58
59 59 from IPython.parallel.util import split_url, disambiguate_url
60 60
61 61 # conditional import of MongoDB backend class
62 62
63 63 try:
64 64 from IPython.parallel.controller.mongodb import MongoDB
65 65 except ImportError:
66 66 maybe_mongo = []
67 67 else:
68 68 maybe_mongo = [MongoDB]
69 69
70 70
71 71 #-----------------------------------------------------------------------------
72 72 # Module level variables
73 73 #-----------------------------------------------------------------------------
74 74
75 75
76 76 #: The default config file name for this application
77 77 default_config_file_name = u'ipcontroller_config.py'
78 78
79 79
80 80 _description = """Start the IPython controller for parallel computing.
81 81
82 82 The IPython controller provides a gateway between the IPython engines and
83 83 clients. The controller needs to be started before the engines and can be
84 84 configured using command line options or using a cluster directory. Cluster
85 85 directories contain config, log and security files and are usually located in
86 86 your ipython directory and named as "profile_name". See the `profile`
87 87 your ipython directory and named "profile_name". See the `profile`
88 88 """
89 89
90 90 _examples = """
91 91 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
92 92 ipcontroller --scheme=pure # use the pure zeromq scheduler
93 93 """
94 94
95 95
96 96 #-----------------------------------------------------------------------------
97 97 # The main application
98 98 #-----------------------------------------------------------------------------
99 99 flags = {}
100 100 flags.update(base_flags)
101 101 flags.update({
102 102 'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
103 103 'Use threads instead of processes for the schedulers'),
104 104 'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
105 105 'use the SQLiteDB backend'),
106 106 'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
107 107 'use the MongoDB backend'),
108 108 'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
109 109 'use the in-memory DictDB backend'),
110 110 'nodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.NoDB'}},
111 111 """use dummy DB backend, which doesn't store any information.
112 112
113 113 This can be used to prevent growth of the memory footprint of the Hub
114 114 in cases where its record-keeping is not required. Requesting results
115 115 of tasks submitted by other clients, db_queries, and task resubmission
116 116 will not be available."""),
117 117 'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
118 118 'reuse existing json connection files')
119 119 })
120 120
121 121 flags.update(session_flags)
122 122
123 123 aliases = dict(
124 124 ssh = 'IPControllerApp.ssh_server',
125 125 enginessh = 'IPControllerApp.engine_ssh_server',
126 126 location = 'IPControllerApp.location',
127 127
128 128 url = 'HubFactory.url',
129 129 ip = 'HubFactory.ip',
130 130 transport = 'HubFactory.transport',
131 131 port = 'HubFactory.regport',
132 132
133 133 ping = 'HeartMonitor.period',
134 134
135 135 scheme = 'TaskScheduler.scheme_name',
136 136 hwm = 'TaskScheduler.hwm',
137 137 )
138 138 aliases.update(base_aliases)
139 139 aliases.update(session_aliases)
140 140
141 141
142 142 class IPControllerApp(BaseParallelApplication):
143 143
144 144 name = u'ipcontroller'
145 145 description = _description
146 146 examples = _examples
147 147 config_file_name = Unicode(default_config_file_name)
148 148 classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo
149 149
150 150 # change default to True
151 151 auto_create = Bool(True, config=True,
152 152 help="""Whether to create profile dir if it doesn't exist.""")
153 153
154 154 reuse_files = Bool(False, config=True,
155 155 help="""Whether to reuse existing json connection files.
156 156 If False, connection files will be removed on a clean exit.
157 157 """
158 158 )
159 159 ssh_server = Unicode(u'', config=True,
160 160 help="""ssh url for clients to use when connecting to the Controller
161 161 processes. It should be of the form: [user@]server[:port]. The
162 162 Controller's listening addresses must be accessible from the ssh server""",
163 163 )
164 164 engine_ssh_server = Unicode(u'', config=True,
165 165 help="""ssh url for engines to use when connecting to the Controller
166 166 processes. It should be of the form: [user@]server[:port]. The
167 167 Controller's listening addresses must be accessible from the ssh server""",
168 168 )
169 169 location = Unicode(u'', config=True,
170 170 help="""The external IP or domain name of the Controller, used for disambiguating
171 171 engine and client connections.""",
172 172 )
173 173 import_statements = List([], config=True,
174 174 help="import statements to be run at startup. Necessary in some environments"
175 175 )
176 176
177 177 use_threads = Bool(False, config=True,
178 178 help='Use threads instead of processes for the schedulers',
179 179 )
180 180
181 181 engine_json_file = Unicode('ipcontroller-engine.json', config=True,
182 182 help="JSON filename where engine connection info will be stored.")
183 183 client_json_file = Unicode('ipcontroller-client.json', config=True,
184 184 help="JSON filename where client connection info will be stored.")
185 185
186 186 def _cluster_id_changed(self, name, old, new):
187 187 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
188 188 self.engine_json_file = "%s-engine.json" % self.name
189 189 self.client_json_file = "%s-client.json" % self.name
190 190
191 191
192 192 # internal
193 193 children = List()
194 194 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
195 195
196 196 def _use_threads_changed(self, name, old, new):
197 197 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
198 198
199 199 write_connection_files = Bool(True,
200 200 help="""Whether to write connection files to disk.
201 201 True in all cases other than runs with `reuse_files=True` *after the first*
202 202 """
203 203 )
204 204
205 205 aliases = Dict(aliases)
206 206 flags = Dict(flags)
207 207
208 208
209 209 def save_connection_dict(self, fname, cdict):
210 210 """save a connection dict to json file."""
211 211 c = self.config
212 212 url = cdict['url']
213 213 location = cdict['location']
214 214 if not location:
215 215 try:
216 216 proto,ip,port = split_url(url)
217 217 except AssertionError:
218 218 pass
219 219 else:
220 220 try:
221 221 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
222 222 except (socket.gaierror, IndexError):
223 223 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
224 224 " You may need to specify '--location=<external_ip_address>' to help"
225 225 " IPython decide when to connect via loopback.")
226 226 location = '127.0.0.1'
227 227 cdict['location'] = location
228 228 fname = os.path.join(self.profile_dir.security_dir, fname)
229 229 self.log.info("writing connection info to %s", fname)
230 230 with open(fname, 'w') as f:
231 231 f.write(json.dumps(cdict, indent=2))
232 232 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
233 233
234 234 def load_config_from_json(self):
235 235 """load config from existing json connector files."""
236 236 c = self.config
237 237 self.log.debug("loading config from JSON")
238 238 # load from engine config
239 239 fname = os.path.join(self.profile_dir.security_dir, self.engine_json_file)
240 240 self.log.info("loading connection info from %s", fname)
241 241 with open(fname) as f:
242 242 cfg = json.loads(f.read())
243 243 key = cfg['exec_key']
244 244 # json gives unicode, Session.key wants bytes
245 245 c.Session.key = key.encode('ascii')
246 246 xport,addr = cfg['url'].split('://')
247 247 c.HubFactory.engine_transport = xport
248 248 ip,ports = addr.split(':')
249 249 c.HubFactory.engine_ip = ip
250 250 c.HubFactory.regport = int(ports)
251 251 self.location = cfg['location']
252 252 if not self.engine_ssh_server:
253 253 self.engine_ssh_server = cfg['ssh']
254 254 # load client config
255 255 fname = os.path.join(self.profile_dir.security_dir, self.client_json_file)
256 256 self.log.info("loading connection info from %s", fname)
257 257 with open(fname) as f:
258 258 cfg = json.loads(f.read())
259 259 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
260 260 xport,addr = cfg['url'].split('://')
261 261 c.HubFactory.client_transport = xport
262 262 ip,ports = addr.split(':')
263 263 c.HubFactory.client_ip = ip
264 264 if not self.ssh_server:
265 265 self.ssh_server = cfg['ssh']
266 266 assert int(ports) == c.HubFactory.regport, "regport mismatch"
267 267
268 268 def cleanup_connection_files(self):
269 269 if self.reuse_files:
270 270 self.log.debug("leaving JSON connection files for reuse")
271 271 return
272 272 self.log.debug("cleaning up JSON connection files")
273 273 for f in (self.client_json_file, self.engine_json_file):
274 274 f = os.path.join(self.profile_dir.security_dir, f)
275 275 try:
276 276 os.remove(f)
277 277 except Exception as e:
278 278 self.log.error("Failed to cleanup connection file: %s", e)
279 279 else:
280 280 self.log.debug(u"removed %s", f)
281 281
282 282 def load_secondary_config(self):
283 283 """secondary config, loading from JSON and setting defaults"""
284 284 if self.reuse_files:
285 285 try:
286 286 self.load_config_from_json()
287 287 except (AssertionError,IOError) as e:
288 288 self.log.error("Could not load config from JSON: %s" % e)
289 289 else:
290 290 # successfully loaded config from JSON, and reuse=True
291 291 # no need to write back the same file
292 292 self.write_connection_files = False
293 293
294 294 # switch Session.key default to secure
295 295 default_secure(self.config)
296 296 self.log.debug("Config changed")
297 297 self.log.debug(repr(self.config))
298 298
299 299 def init_hub(self):
300 300 c = self.config
301 301
302 302 self.do_import_statements()
303 303
304 304 try:
305 305 self.factory = HubFactory(config=c, log=self.log)
306 306 # self.start_logging()
307 307 self.factory.init_hub()
308 308 except TraitError:
309 309 raise
310 310 except Exception:
311 311 self.log.error("Couldn't construct the Controller", exc_info=True)
312 312 self.exit(1)
313 313
314 314 if self.write_connection_files:
315 315 # save to new json config files
316 316 f = self.factory
317 317 cdict = {'exec_key' : f.session.key.decode('ascii'),
318 318 'ssh' : self.ssh_server,
319 319 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
320 320 'location' : self.location
321 321 }
322 322 self.save_connection_dict(self.client_json_file, cdict)
323 323 edict = cdict
324 324 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
325 325 edict['ssh'] = self.engine_ssh_server
326 326 self.save_connection_dict(self.engine_json_file, edict)
327 327
328 328 def init_schedulers(self):
329 329 children = self.children
330 330 mq = import_item(str(self.mq_class))
331 331
332 332 hub = self.factory
333 333 # disambiguate url, in case of *
334 334 monitor_url = disambiguate_url(hub.monitor_url)
335 335 # maybe_inproc = 'inproc://monitor' if self.use_threads else monitor_url
336 336 # IOPub relay (in a Process)
337 337 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
338 338 q.bind_in(hub.client_info['iopub'])
339 339 q.bind_out(hub.engine_info['iopub'])
340 340 q.setsockopt_out(zmq.SUBSCRIBE, b'')
341 341 q.connect_mon(monitor_url)
342 342 q.daemon=True
343 343 children.append(q)
344 344
345 345 # Multiplexer Queue (in a Process)
346 346 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
347 347 q.bind_in(hub.client_info['mux'])
348 348 q.setsockopt_in(zmq.IDENTITY, b'mux')
349 349 q.bind_out(hub.engine_info['mux'])
350 350 q.connect_mon(monitor_url)
351 351 q.daemon=True
352 352 children.append(q)
353 353
354 354 # Control Queue (in a Process)
355 355 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
356 356 q.bind_in(hub.client_info['control'])
357 357 q.setsockopt_in(zmq.IDENTITY, b'control')
358 358 q.bind_out(hub.engine_info['control'])
359 359 q.connect_mon(monitor_url)
360 360 q.daemon=True
361 361 children.append(q)
362 362 try:
363 363 scheme = self.config.TaskScheduler.scheme_name
364 364 except AttributeError:
365 365 scheme = TaskScheduler.scheme_name.get_default_value()
366 366 # Task Queue (in a Process)
367 367 if scheme == 'pure':
368 368 self.log.warn("task::using pure XREQ Task scheduler")
369 369 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
370 370 # q.setsockopt_out(zmq.HWM, hub.hwm)
371 371 q.bind_in(hub.client_info['task'][1])
372 372 q.setsockopt_in(zmq.IDENTITY, b'task')
373 373 q.bind_out(hub.engine_info['task'])
374 374 q.connect_mon(monitor_url)
375 375 q.daemon=True
376 376 children.append(q)
377 377 elif scheme == 'none':
378 378 self.log.warn("task::using no Task scheduler")
379 379
380 380 else:
381 381 self.log.info("task::using Python %s Task scheduler"%scheme)
382 382 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
383 383 monitor_url, disambiguate_url(hub.client_info['notification']))
384 384 kwargs = dict(logname='scheduler', loglevel=self.log_level,
385 385 log_url = self.log_url, config=dict(self.config))
386 386 if 'Process' in self.mq_class:
387 387 # run the Python scheduler in a Process
388 388 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
389 389 q.daemon=True
390 390 children.append(q)
391 391 else:
392 392 # single-threaded Controller
393 393 kwargs['in_thread'] = True
394 394 launch_scheduler(*sargs, **kwargs)
395 395
396 396 def terminate_children(self):
397 397 child_procs = []
398 398 for child in self.children:
399 399 if isinstance(child, ProcessMonitoredQueue):
400 400 child_procs.append(child.launcher)
401 401 elif isinstance(child, Process):
402 402 child_procs.append(child)
403 403 if child_procs:
404 404 self.log.critical("terminating children...")
405 405 for child in child_procs:
406 406 try:
407 407 child.terminate()
408 408 except OSError:
409 409 # already dead
410 410 pass
411 411
412 412 def handle_signal(self, sig, frame):
413 413 self.log.critical("Received signal %i, shutting down", sig)
414 414 self.terminate_children()
415 415 self.loop.stop()
416 416
417 417 def init_signal(self):
418 418 for sig in (SIGINT, SIGABRT, SIGTERM):
419 419 signal(sig, self.handle_signal)
420 420
421 421 def do_import_statements(self):
422 422 statements = self.import_statements
423 423 for s in statements:
424 424 try:
425 425 self.log.info("Executing statement: '%s'" % s)
426 426 exec s in globals(), locals()
427 427 except:
428 428 self.log.error("Error running statement: %s" % s)
429 429
430 430 def forward_logging(self):
431 431 if self.log_url:
432 432 self.log.info("Forwarding logging to %s"%self.log_url)
433 433 context = zmq.Context.instance()
434 434 lsock = context.socket(zmq.PUB)
435 435 lsock.connect(self.log_url)
436 436 handler = PUBHandler(lsock)
437 self.log.removeHandler(self._log_handler)
438 437 handler.root_topic = 'controller'
439 438 handler.setLevel(self.log_level)
440 439 self.log.addHandler(handler)
441 self._log_handler = handler
442 440
443 441 @catch_config_error
444 442 def initialize(self, argv=None):
445 443 super(IPControllerApp, self).initialize(argv)
446 444 self.forward_logging()
447 445 self.load_secondary_config()
448 446 self.init_hub()
449 447 self.init_schedulers()
450 448
451 449 def start(self):
452 450 # Start the subprocesses:
453 451 self.factory.start()
454 452 # children must be started before signals are setup,
455 453 # otherwise signal-handling will fire multiple times
456 454 for child in self.children:
457 455 child.start()
458 456 self.init_signal()
459 457
460 458 self.write_pid_file(overwrite=True)
461 459
462 460 try:
463 461 self.factory.loop.start()
464 462 except KeyboardInterrupt:
465 463 self.log.critical("Interrupted, Exiting...\n")
466 464 finally:
467 465 self.cleanup_connection_files()
468 466
469 467
470 468
471 469 def launch_new_instance():
472 470 """Create and run the IPython controller"""
473 471 if sys.platform == 'win32':
474 472 # make sure we don't get called from a multiprocessing subprocess
475 473 # this can result in infinite Controllers being started on Windows
476 474 # which doesn't have a proper fork, so multiprocessing is wonky
477 475
478 476 # this only comes up when IPython has been installed using vanilla
479 477 # setuptools, and *not* distribute.
480 478 import multiprocessing
481 479 p = multiprocessing.current_process()
482 480 # the main process has name 'MainProcess'
483 481 # subprocesses will have names like 'Process-1'
484 482 if p.name != 'MainProcess':
485 483 # we are a subprocess, don't start another Controller!
486 484 return
487 485 app = IPControllerApp.instance()
488 486 app.initialize()
489 487 app.start()
490 488
491 489
492 490 if __name__ == '__main__':
493 491 launch_new_instance()
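For reference, a hedged sketch of the connection dict that save_connection_dict serializes to the client and engine JSON files above; the values below are invented for illustration only.

import json

example_cdict = {
    'exec_key': 'abc123',            # Session key (a random key in practice)
    'ssh': '',                       # optional [user@]server[:port] for SSH tunnelling
    'url': 'tcp://127.0.0.1:55555',  # transport://ip:port of the registration socket
    'location': '10.0.0.5',          # external IP/hostname used to disambiguate '*'
}
print(json.dumps(example_cdict, indent=2))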
@@ -1,336 +1,334 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The IPython engine application
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import json
25 25 import os
26 26 import sys
27 27 import time
28 28
29 29 import zmq
30 30 from zmq.eventloop import ioloop
31 31
32 32 from IPython.core.profiledir import ProfileDir
33 33 from IPython.parallel.apps.baseapp import (
34 34 BaseParallelApplication,
35 35 base_aliases,
36 36 base_flags,
37 37 catch_config_error,
38 38 )
39 39 from IPython.zmq.log import EnginePUBHandler
40 40 from IPython.zmq.ipkernel import Kernel
41 41 from IPython.zmq.session import (
42 42 Session, session_aliases, session_flags
43 43 )
44 44
45 45 from IPython.config.configurable import Configurable
46 46
47 47 from IPython.parallel.engine.engine import EngineFactory
48 48 from IPython.parallel.util import disambiguate_url
49 49
50 50 from IPython.utils.importstring import import_item
51 51 from IPython.utils.py3compat import cast_bytes
52 52 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
53 53
54 54
55 55 #-----------------------------------------------------------------------------
56 56 # Module level variables
57 57 #-----------------------------------------------------------------------------
58 58
59 59 #: The default config file name for this application
60 60 default_config_file_name = u'ipengine_config.py'
61 61
62 62 _description = """Start an IPython engine for parallel computing.
63 63
64 64 IPython engines run in parallel and perform computations on behalf of a client
65 65 and controller. A controller needs to be started before the engines. The
66 66 engine can be configured using command line options or using a cluster
67 67 directory. Cluster directories contain config, log and security files and are
68 68 usually located in your ipython directory and named "profile_name".
69 69 See the `profile` and `profile-dir` options for details.
70 70 """
71 71
72 72 _examples = """
73 73 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
74 74 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
75 75 """
76 76
77 77 #-----------------------------------------------------------------------------
78 78 # MPI configuration
79 79 #-----------------------------------------------------------------------------
80 80
81 81 mpi4py_init = """from mpi4py import MPI as mpi
82 82 mpi.size = mpi.COMM_WORLD.Get_size()
83 83 mpi.rank = mpi.COMM_WORLD.Get_rank()
84 84 """
85 85
86 86
87 87 pytrilinos_init = """from PyTrilinos import Epetra
88 88 class SimpleStruct:
89 89 pass
90 90 mpi = SimpleStruct()
91 91 mpi.rank = 0
92 92 mpi.size = 0
93 93 """
94 94
95 95 class MPI(Configurable):
96 96 """Configurable for MPI initialization"""
97 97 use = Unicode('', config=True,
98 98 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
99 99 )
100 100
101 101 def _use_changed(self, name, old, new):
102 102 # load default init script if it's not set
103 103 if not self.init_script:
104 104 self.init_script = self.default_inits.get(new, '')
105 105
106 106 init_script = Unicode('', config=True,
107 107 help="Initialization code for MPI")
108 108
109 109 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
110 110 config=True)
111 111
112 112
113 113 #-----------------------------------------------------------------------------
114 114 # Main application
115 115 #-----------------------------------------------------------------------------
116 116 aliases = dict(
117 117 file = 'IPEngineApp.url_file',
118 118 c = 'IPEngineApp.startup_command',
119 119 s = 'IPEngineApp.startup_script',
120 120
121 121 url = 'EngineFactory.url',
122 122 ssh = 'EngineFactory.sshserver',
123 123 sshkey = 'EngineFactory.sshkey',
124 124 ip = 'EngineFactory.ip',
125 125 transport = 'EngineFactory.transport',
126 126 port = 'EngineFactory.regport',
127 127 location = 'EngineFactory.location',
128 128
129 129 timeout = 'EngineFactory.timeout',
130 130
131 131 mpi = 'MPI.use',
132 132
133 133 )
134 134 aliases.update(base_aliases)
135 135 aliases.update(session_aliases)
136 136 flags = {}
137 137 flags.update(base_flags)
138 138 flags.update(session_flags)
139 139
140 140 class IPEngineApp(BaseParallelApplication):
141 141
142 142 name = 'ipengine'
143 143 description = _description
144 144 examples = _examples
145 145 config_file_name = Unicode(default_config_file_name)
146 146 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
147 147
148 148 startup_script = Unicode(u'', config=True,
149 149 help='specify a script to be run at startup')
150 150 startup_command = Unicode('', config=True,
151 151 help='specify a command to be run at startup')
152 152
153 153 url_file = Unicode(u'', config=True,
154 154 help="""The full location of the file containing the connection information for
155 155 the controller. If this is not given, the file must be in the
156 156 security directory of the cluster directory. This location is
157 157 resolved using the `profile` or `profile_dir` options.""",
158 158 )
159 159 wait_for_url_file = Float(5, config=True,
160 160 help="""The maximum number of seconds to wait for url_file to exist.
161 161 This is useful for batch-systems and shared-filesystems where the
162 162 controller and engine are started at the same time and it
163 163 may take a moment for the controller to write the connector files.""")
164 164
165 165 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
166 166
167 167 def _cluster_id_changed(self, name, old, new):
168 168 if new:
169 169 base = 'ipcontroller-%s' % new
170 170 else:
171 171 base = 'ipcontroller'
172 172 self.url_file_name = "%s-engine.json" % base
173 173
174 174 log_url = Unicode('', config=True,
175 175 help="""The URL for the iploggerapp instance, for forwarding
176 176 logging to a central location.""")
177 177
178 178 aliases = Dict(aliases)
179 179 flags = Dict(flags)
180 180
181 181 @property
182 182 def kernel(self):
183 183 """allow access to the Kernel object, so I look like IPKernelApp"""
184 184 return self.engine.kernel
185 185
186 186 def find_url_file(self):
187 187 """Set the url file.
188 188
189 189 Here we don't try to check whether it exists or is valid, as that
190 190 is handled by the connection logic.
191 191 """
192 192 config = self.config
193 193 # Find the actual controller key file
194 194 if not self.url_file:
195 195 self.url_file = os.path.join(
196 196 self.profile_dir.security_dir,
197 197 self.url_file_name
198 198 )
199 199
200 200 def load_connector_file(self):
201 201 """load config from a JSON connector file,
202 202 at a *lower* priority than command-line/config files.
203 203 """
204 204
205 205 self.log.info("Loading url_file %r", self.url_file)
206 206 config = self.config
207 207
208 208 with open(self.url_file) as f:
209 209 d = json.loads(f.read())
210 210
211 211 if 'exec_key' in d:
212 212 config.Session.key = cast_bytes(d['exec_key'])
213 213
214 214 try:
215 215 config.EngineFactory.location
216 216 except AttributeError:
217 217 config.EngineFactory.location = d['location']
218 218
219 219 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
220 220 try:
221 221 config.EngineFactory.url
222 222 except AttributeError:
223 223 config.EngineFactory.url = d['url']
224 224
225 225 try:
226 226 config.EngineFactory.sshserver
227 227 except AttributeError:
228 228 config.EngineFactory.sshserver = d['ssh']
229 229
230 230 def init_engine(self):
231 231 # This is the working dir by now.
232 232 sys.path.insert(0, '')
233 233 config = self.config
234 234 # print config
235 235 self.find_url_file()
236 236
237 237 # was the url manually specified?
238 238 keys = set(self.config.EngineFactory.keys())
239 239 keys = keys.union(set(self.config.RegistrationFactory.keys()))
240 240
241 241 if keys.intersection(set(['ip', 'url', 'port'])):
242 242 # Connection info was specified, don't wait for the file
243 243 url_specified = True
244 244 self.wait_for_url_file = 0
245 245 else:
246 246 url_specified = False
247 247
248 248 if self.wait_for_url_file and not os.path.exists(self.url_file):
249 249 self.log.warn("url_file %r not found", self.url_file)
250 250 self.log.warn("Waiting up to %.1f seconds for it to arrive.", self.wait_for_url_file)
251 251 tic = time.time()
252 252 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
253 253 # wait for url_file to exist, or until time limit
254 254 time.sleep(0.1)
255 255
256 256 if os.path.exists(self.url_file):
257 257 self.load_connector_file()
258 258 elif not url_specified:
259 259 self.log.fatal("Fatal: url file never arrived: %s", self.url_file)
260 260 self.exit(1)
261 261
262 262
263 263 try:
264 264 exec_lines = config.Kernel.exec_lines
265 265 except AttributeError:
266 266 config.Kernel.exec_lines = []
267 267 exec_lines = config.Kernel.exec_lines
268 268
269 269 if self.startup_script:
270 270 enc = sys.getfilesystemencoding() or 'utf8'
271 271 cmd="execfile(%r)" % self.startup_script.encode(enc)
272 272 exec_lines.append(cmd)
273 273 if self.startup_command:
274 274 exec_lines.append(self.startup_command)
275 275
276 276 # Create the underlying shell class and Engine
277 277 # shell_class = import_item(self.master_config.Global.shell_class)
278 278 # print self.config
279 279 try:
280 280 self.engine = EngineFactory(config=config, log=self.log)
281 281 except:
282 282 self.log.error("Couldn't start the Engine", exc_info=True)
283 283 self.exit(1)
284 284
285 285 def forward_logging(self):
286 286 if self.log_url:
287 287 self.log.info("Forwarding logging to %s", self.log_url)
288 288 context = self.engine.context
289 289 lsock = context.socket(zmq.PUB)
290 290 lsock.connect(self.log_url)
291 self.log.removeHandler(self._log_handler)
292 291 handler = EnginePUBHandler(self.engine, lsock)
293 292 handler.setLevel(self.log_level)
294 293 self.log.addHandler(handler)
295 self._log_handler = handler
296 294
297 295 def init_mpi(self):
298 296 global mpi
299 297 self.mpi = MPI(config=self.config)
300 298
301 299 mpi_import_statement = self.mpi.init_script
302 300 if mpi_import_statement:
303 301 try:
304 302 self.log.info("Initializing MPI:")
305 303 self.log.info(mpi_import_statement)
306 304 exec mpi_import_statement in globals()
307 305 except:
308 306 mpi = None
309 307 else:
310 308 mpi = None
311 309
312 310 @catch_config_error
313 311 def initialize(self, argv=None):
314 312 super(IPEngineApp, self).initialize(argv)
315 313 self.init_mpi()
316 314 self.init_engine()
317 315 self.forward_logging()
318 316
319 317 def start(self):
320 318 self.engine.start()
321 319 try:
322 320 self.engine.loop.start()
323 321 except KeyboardInterrupt:
324 322 self.log.critical("Engine Interrupted, shutting down...\n")
325 323
326 324
327 325 def launch_new_instance():
328 326 """Create and run the IPython engine"""
329 327 app = IPEngineApp.instance()
330 328 app.initialize()
331 329 app.start()
332 330
333 331
334 332 if __name__ == '__main__':
335 333 launch_new_instance()
336 334
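A minimal sketch of the engine-side behaviour in init_engine above (the helper name and timeout are illustrative): poll until the controller has written the connector file, then load it as JSON.

import json
import os
import time

def wait_and_load(url_file, timeout=5.0):
    tic = time.time()
    while not os.path.exists(url_file) and time.time() - tic < timeout:
        time.sleep(0.1)  # give the controller a moment to write the file
    # raises IOError if the file never arrived within the timeout
    with open(url_file) as f:
        return json.loads(f.read())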