##// END OF EJS Templates
Notebook cluster manager now uses proper launchers.
Brian Granger -
Show More
@@ -1,678 +1,702 b''
1 1 """A simple configuration system.
2 2
3 3 Authors
4 4 -------
5 5 * Brian Granger
6 6 * Fernando Perez
7 7 * Min RK
8 8 """
9 9
10 10 #-----------------------------------------------------------------------------
11 11 # Copyright (C) 2008-2011 The IPython Development Team
12 12 #
13 13 # Distributed under the terms of the BSD License. The full license is in
14 14 # the file COPYING, distributed as part of this software.
15 15 #-----------------------------------------------------------------------------
16 16
17 17 #-----------------------------------------------------------------------------
18 18 # Imports
19 19 #-----------------------------------------------------------------------------
20 20
21 21 import __builtin__ as builtin_mod
22 22 import os
23 23 import re
24 24 import sys
25 25
26 26 from IPython.external import argparse
27 27 from IPython.utils.path import filefind, get_ipython_dir
28 28 from IPython.utils import py3compat, text, warn
29 29
30 30 #-----------------------------------------------------------------------------
31 31 # Exceptions
32 32 #-----------------------------------------------------------------------------
33 33
34 34
35 35 class ConfigError(Exception):
36 36 pass
37 37
38 38 class ConfigLoaderError(ConfigError):
39 39 pass
40 40
41 41 class ConfigFileNotFound(ConfigError):
42 42 pass
43 43
44 44 class ArgumentError(ConfigLoaderError):
45 45 pass
46 46
47 47 #-----------------------------------------------------------------------------
48 48 # Argparse fix
49 49 #-----------------------------------------------------------------------------
50 50
51 51 # Unfortunately argparse by default prints help messages to stderr instead of
52 52 # stdout. This makes it annoying to capture long help screens at the command
53 53 # line, since one must know how to pipe stderr, which many users don't know how
54 54 # to do. So we override the print_help method with one that defaults to
55 55 # stdout and use our class instead.
56 56
57 57 class ArgumentParser(argparse.ArgumentParser):
58 58 """Simple argparse subclass that prints help to stdout by default."""
59 59
60 60 def print_help(self, file=None):
61 61 if file is None:
62 62 file = sys.stdout
63 63 return super(ArgumentParser, self).print_help(file)
64 64
65 65 print_help.__doc__ = argparse.ArgumentParser.print_help.__doc__
66 66
67 67 #-----------------------------------------------------------------------------
68 68 # Config class for holding config information
69 69 #-----------------------------------------------------------------------------
70 70
71 71
72 72 class Config(dict):
73 73 """An attribute based dict that can do smart merges."""
74 74
75 75 def __init__(self, *args, **kwds):
76 76 dict.__init__(self, *args, **kwds)
77 77 # This sets self.__dict__ = self, but it has to be done this way
78 78 # because we are also overriding __setattr__.
79 79 dict.__setattr__(self, '__dict__', self)
80 80
81 81 def _merge(self, other):
82 82 to_update = {}
83 83 for k, v in other.iteritems():
84 84 if not self.has_key(k):
85 85 to_update[k] = v
86 86 else: # I have this key
87 87 if isinstance(v, Config):
88 88 # Recursively merge common sub Configs
89 89 self[k]._merge(v)
90 90 else:
91 91 # Plain updates for non-Configs
92 92 to_update[k] = v
93 93
94 94 self.update(to_update)
95 95
96 96 def _is_section_key(self, key):
97 97 if key[0].upper()==key[0] and not key.startswith('_'):
98 98 return True
99 99 else:
100 100 return False
101 101
102 102 def __contains__(self, key):
103 103 if self._is_section_key(key):
104 104 return True
105 105 else:
106 106 return super(Config, self).__contains__(key)
107 107 # .has_key is deprecated for dictionaries.
108 108 has_key = __contains__
109 109
110 110 def _has_section(self, key):
111 111 if self._is_section_key(key):
112 112 if super(Config, self).__contains__(key):
113 113 return True
114 114 return False
115 115
116 116 def copy(self):
117 117 return type(self)(dict.copy(self))
118 118
119 119 def __copy__(self):
120 120 return self.copy()
121 121
122 122 def __deepcopy__(self, memo):
123 123 import copy
124 124 return type(self)(copy.deepcopy(self.items()))
125 125
126 126 def __getitem__(self, key):
127 127 # We cannot use directly self._is_section_key, because it triggers
128 128 # infinite recursion on top of PyPy. Instead, we manually fish the
129 129 # bound method.
130 130 is_section_key = self.__class__._is_section_key.__get__(self)
131 131
132 132 # Because we use this for an exec namespace, we need to delegate
133 133 # the lookup of names in __builtin__ to itself. This means
134 134 # that you can't have section or attribute names that are
135 135 # builtins.
136 136 try:
137 137 return getattr(builtin_mod, key)
138 138 except AttributeError:
139 139 pass
140 140 if is_section_key(key):
141 141 try:
142 142 return dict.__getitem__(self, key)
143 143 except KeyError:
144 144 c = Config()
145 145 dict.__setitem__(self, key, c)
146 146 return c
147 147 else:
148 148 return dict.__getitem__(self, key)
149 149
150 150 def __setitem__(self, key, value):
151 151 # Don't allow names in __builtin__ to be modified.
152 152 if hasattr(builtin_mod, key):
153 153 raise ConfigError('Config variable names cannot have the same name '
154 154 'as a Python builtin: %s' % key)
155 155 if self._is_section_key(key):
156 156 if not isinstance(value, Config):
157 157 raise ValueError('values whose keys begin with an uppercase '
158 158 'char must be Config instances: %r, %r' % (key, value))
159 159 else:
160 160 dict.__setitem__(self, key, value)
161 161
162 162 def __getattr__(self, key):
163 163 try:
164 164 return self.__getitem__(key)
165 165 except KeyError, e:
166 166 raise AttributeError(e)
167 167
168 168 def __setattr__(self, key, value):
169 169 try:
170 170 self.__setitem__(key, value)
171 171 except KeyError, e:
172 172 raise AttributeError(e)
173 173
174 174 def __delattr__(self, key):
175 175 try:
176 176 dict.__delitem__(self, key)
177 177 except KeyError, e:
178 178 raise AttributeError(e)
179 179
180 180
181 181 #-----------------------------------------------------------------------------
182 182 # Config loading classes
183 183 #-----------------------------------------------------------------------------
184 184
185 185
186 186 class ConfigLoader(object):
187 187 """A object for loading configurations from just about anywhere.
188 188
189 189 The resulting configuration is packaged as a :class:`Struct`.
190 190
191 191 Notes
192 192 -----
193 193 A :class:`ConfigLoader` does one thing: load a config from a source
194 194 (file, command line arguments) and returns the data as a :class:`Struct`.
195 195 There are lots of things that :class:`ConfigLoader` does not do. It does
196 196 not implement complex logic for finding config files. It does not handle
197 197 default values or merge multiple configs. These things need to be
198 198 handled elsewhere.
199 199 """
200 200
201 201 def __init__(self):
202 202 """A base class for config loaders.
203 203
204 204 Examples
205 205 --------
206 206
207 207 >>> cl = ConfigLoader()
208 208 >>> config = cl.load_config()
209 209 >>> config
210 210 {}
211 211 """
212 212 self.clear()
213 213
214 214 def clear(self):
215 215 self.config = Config()
216 216
217 217 def load_config(self):
218 218 """Load a config from somewhere, return a :class:`Config` instance.
219 219
220 220 Usually, this will cause self.config to be set and then returned.
221 221 However, in most cases, :meth:`ConfigLoader.clear` should be called
222 222 to erase any previous state.
223 223 """
224 224 self.clear()
225 225 return self.config
226 226
227 227
228 228 class FileConfigLoader(ConfigLoader):
229 229 """A base class for file based configurations.
230 230
231 231 As we add more file based config loaders, the common logic should go
232 232 here.
233 233 """
234 234 pass
235 235
236 236
237 237 class PyFileConfigLoader(FileConfigLoader):
238 238 """A config loader for pure python files.
239 239
240 240 This calls execfile on a plain python file and looks for attributes
241 241 that are all caps. These attributes are added to the config Struct.
242 242 """
243 243
244 244 def __init__(self, filename, path=None):
245 245 """Build a config loader for a filename and path.
246 246
247 247 Parameters
248 248 ----------
249 249 filename : str
250 250 The file name of the config file.
251 251 path : str, list, tuple
252 252 The path to search for the config file on, or a sequence of
253 253 paths to try in order.
254 254 """
255 255 super(PyFileConfigLoader, self).__init__()
256 256 self.filename = filename
257 257 self.path = path
258 258 self.full_filename = ''
259 259 self.data = None
260 260
261 261 def load_config(self):
262 262 """Load the config from a file and return it as a Struct."""
263 263 self.clear()
264 264 try:
265 265 self._find_file()
266 266 except IOError as e:
267 267 raise ConfigFileNotFound(str(e))
268 268 self._read_file_as_dict()
269 269 self._convert_to_config()
270 270 return self.config
271 271
272 272 def _find_file(self):
273 273 """Try to find the file by searching the paths."""
274 274 self.full_filename = filefind(self.filename, self.path)
275 275
276 276 def _read_file_as_dict(self):
277 277 """Load the config file into self.config, with recursive loading."""
278 278 # This closure is made available in the namespace that is used
279 279 # to exec the config file. It allows users to call
280 280 # load_subconfig('myconfig.py') to load config files recursively.
281 281 # It needs to be a closure because it has references to self.path
282 282 # and self.config. The sub-config is loaded with the same path
283 283 # as the parent, but it uses an empty config which is then merged
284 284 # with the parents.
285 285
286 286 # If a profile is specified, the config file will be loaded
287 287 # from that profile
288 288
289 289 def load_subconfig(fname, profile=None):
290 290 # import here to prevent circular imports
291 291 from IPython.core.profiledir import ProfileDir, ProfileDirError
292 292 if profile is not None:
293 293 try:
294 294 profile_dir = ProfileDir.find_profile_dir_by_name(
295 295 get_ipython_dir(),
296 296 profile,
297 297 )
298 298 except ProfileDirError:
299 299 return
300 300 path = profile_dir.location
301 301 else:
302 302 path = self.path
303 303 loader = PyFileConfigLoader(fname, path)
304 304 try:
305 305 sub_config = loader.load_config()
306 306 except ConfigFileNotFound:
307 307 # Pass silently if the sub config is not there. This happens
308 308 # when a user is using a profile, but not the default config.
309 309 pass
310 310 else:
311 311 self.config._merge(sub_config)
312 312
313 313 # Again, this needs to be a closure and should be used in config
314 314 # files to get the config being loaded.
315 315 def get_config():
316 316 return self.config
317 317
318 318 namespace = dict(load_subconfig=load_subconfig, get_config=get_config)
319 319 fs_encoding = sys.getfilesystemencoding() or 'ascii'
320 320 conf_filename = self.full_filename.encode(fs_encoding)
321 321 py3compat.execfile(conf_filename, namespace)
322 322
323 323 def _convert_to_config(self):
324 324 if self.data is None:
325 325 ConfigLoaderError('self.data does not exist')
326 326
327 327
328 328 class CommandLineConfigLoader(ConfigLoader):
329 329 """A config loader for command line arguments.
330 330
331 331 As we add more command line based loaders, the common logic should go
332 332 here.
333 333 """
334 334
335 335 def _exec_config_str(self, lhs, rhs):
336 336 """execute self.config.<lhs>=<rhs>
337 337
338 338 * expands ~ with expanduser
339 339 * tries to assign with raw exec, otherwise assigns with just the string,
340 340 allowing `--C.a=foobar` and `--C.a="foobar"` to be equivalent. *Not*
341 341 equivalent are `--C.a=4` and `--C.a='4'`.
342 342 """
343 343 rhs = os.path.expanduser(rhs)
344 344 exec_str = 'self.config.' + lhs + '=' + rhs
345 345 try:
346 346 # Try to see if regular Python syntax will work. This
347 347 # won't handle strings as the quote marks are removed
348 348 # by the system shell.
349 349 exec exec_str in locals(), globals()
350 350 except (NameError, SyntaxError):
351 351 # This case happens if the rhs is a string but without
352 352 # the quote marks. Use repr, to get quote marks, and
353 353 # 'u' prefix and see if
354 354 # it succeeds. If it still fails, we let it raise.
355 355 exec_str = u'self.config.' + lhs + '= rhs'
356 356 exec exec_str in locals(), globals()
357 357
358 358 def _load_flag(self, cfg):
359 359 """update self.config from a flag, which can be a dict or Config"""
360 360 if isinstance(cfg, (dict, Config)):
361 361 # don't clobber whole config sections, update
362 362 # each section from config:
363 363 for sec,c in cfg.iteritems():
364 364 self.config[sec].update(c)
365 365 else:
366 366 raise ValueError("Invalid flag: '%s'"%raw)
367 367
368 368 # raw --identifier=value pattern
369 369 # but *also* accept '-' as wordsep, for aliases
370 370 # accepts: --foo=a
371 371 # --Class.trait=value
372 372 # --alias-name=value
373 373 # rejects: -foo=value
374 374 # --foo
375 375 # --Class.trait
376 376 kv_pattern = re.compile(r'\-\-[A-Za-z][\w\-]*(\.[\w\-]+)*\=.*')
377 377
378 378 # just flags, no assignments, with two *or one* leading '-'
379 379 # accepts: --foo
380 380 # -foo-bar-again
381 381 # rejects: --anything=anything
382 382 # --two.word
383 383
384 384 flag_pattern = re.compile(r'\-\-?\w+[\-\w]*$')
385 385
386 386 class KeyValueConfigLoader(CommandLineConfigLoader):
387 387 """A config loader that loads key value pairs from the command line.
388 388
389 389 This allows command line options to be given in the following form::
390 390
391 391 ipython --profile="foo" --InteractiveShell.autocall=False
392 392 """
393 393
394 394 def __init__(self, argv=None, aliases=None, flags=None):
395 395 """Create a key value pair config loader.
396 396
397 397 Parameters
398 398 ----------
399 399 argv : list
400 400 A list that has the form of sys.argv[1:] which has unicode
401 401 elements of the form u"key=value". If this is None (default),
402 402 then sys.argv[1:] will be used.
403 403 aliases : dict
404 404 A dict of aliases for configurable traits.
405 405 Keys are the short aliases, Values are the resolved trait.
406 406 Of the form: `{'alias' : 'Configurable.trait'}`
407 407 flags : dict
408 408 A dict of flags, keyed by str name. Values can be Config objects,
409 409 dicts, or "key=value" strings. If Config or dict, when the flag
410 410 is triggered, The flag is loaded as `self.config.update(m)`.
411 411
412 412 Returns
413 413 -------
414 414 config : Config
415 415 The resulting Config object.
416 416
417 417 Examples
418 418 --------
419 419
420 420 >>> from IPython.config.loader import KeyValueConfigLoader
421 421 >>> cl = KeyValueConfigLoader()
422 422 >>> cl.load_config(["--A.name='brian'","--B.number=0"])
423 423 {'A': {'name': 'brian'}, 'B': {'number': 0}}
424 424 """
425 425 self.clear()
426 426 if argv is None:
427 427 argv = sys.argv[1:]
428 428 self.argv = argv
429 429 self.aliases = aliases or {}
430 430 self.flags = flags or {}
431 431
432 432
433 433 def clear(self):
434 434 super(KeyValueConfigLoader, self).clear()
435 435 self.extra_args = []
436 436
437 437
438 438 def _decode_argv(self, argv, enc=None):
439 439 """decode argv if bytes, using stdin.encoding, falling back on default enc"""
440 440 uargv = []
441 441 if enc is None:
442 442 enc = text.getdefaultencoding()
443 443 for arg in argv:
444 444 if not isinstance(arg, unicode):
445 445 # only decode if not already decoded
446 446 arg = arg.decode(enc)
447 447 uargv.append(arg)
448 448 return uargv
449 449
450 450
451 451 def load_config(self, argv=None, aliases=None, flags=None):
452 452 """Parse the configuration and generate the Config object.
453 453
454 454 After loading, any arguments that are not key-value or
455 455 flags will be stored in self.extra_args - a list of
456 456 unparsed command-line arguments. This is used for
457 457 arguments such as input files or subcommands.
458 458
459 459 Parameters
460 460 ----------
461 461 argv : list, optional
462 462 A list that has the form of sys.argv[1:] which has unicode
463 463 elements of the form u"key=value". If this is None (default),
464 464 then self.argv will be used.
465 465 aliases : dict
466 466 A dict of aliases for configurable traits.
467 467 Keys are the short aliases, Values are the resolved trait.
468 468 Of the form: `{'alias' : 'Configurable.trait'}`
469 469 flags : dict
470 470 A dict of flags, keyed by str name. Values can be Config objects
471 471 or dicts. When the flag is triggered, The config is loaded as
472 472 `self.config.update(cfg)`.
473 473 """
474 474 from IPython.config.configurable import Configurable
475 475
476 476 self.clear()
477 477 if argv is None:
478 478 argv = self.argv
479 479 if aliases is None:
480 480 aliases = self.aliases
481 481 if flags is None:
482 482 flags = self.flags
483 483
484 484 # ensure argv is a list of unicode strings:
485 485 uargv = self._decode_argv(argv)
486 486 for idx,raw in enumerate(uargv):
487 487 # strip leading '-'
488 488 item = raw.lstrip('-')
489 489
490 490 if raw == '--':
491 491 # don't parse arguments after '--'
492 492 # this is useful for relaying arguments to scripts, e.g.
493 493 # ipython -i foo.py --pylab=qt -- args after '--' go-to-foo.py
494 494 self.extra_args.extend(uargv[idx+1:])
495 495 break
496 496
497 497 if kv_pattern.match(raw):
498 498 lhs,rhs = item.split('=',1)
499 499 # Substitute longnames for aliases.
500 500 if lhs in aliases:
501 501 lhs = aliases[lhs]
502 502 if '.' not in lhs:
503 503 # probably a mistyped alias, but not technically illegal
504 504 warn.warn("Unrecognized alias: '%s', it will probably have no effect."%lhs)
505 505 try:
506 506 self._exec_config_str(lhs, rhs)
507 507 except Exception:
508 508 raise ArgumentError("Invalid argument: '%s'" % raw)
509 509
510 510 elif flag_pattern.match(raw):
511 511 if item in flags:
512 512 cfg,help = flags[item]
513 513 self._load_flag(cfg)
514 514 else:
515 515 raise ArgumentError("Unrecognized flag: '%s'"%raw)
516 516 elif raw.startswith('-'):
517 517 kv = '--'+item
518 518 if kv_pattern.match(kv):
519 519 raise ArgumentError("Invalid argument: '%s', did you mean '%s'?"%(raw, kv))
520 520 else:
521 521 raise ArgumentError("Invalid argument: '%s'"%raw)
522 522 else:
523 523 # keep all args that aren't valid in a list,
524 524 # in case our parent knows what to do with them.
525 525 self.extra_args.append(item)
526 526 return self.config
527 527
528 528 class ArgParseConfigLoader(CommandLineConfigLoader):
529 529 """A loader that uses the argparse module to load from the command line."""
530 530
531 531 def __init__(self, argv=None, aliases=None, flags=None, *parser_args, **parser_kw):
532 532 """Create a config loader for use with argparse.
533 533
534 534 Parameters
535 535 ----------
536 536
537 537 argv : optional, list
538 538 If given, used to read command-line arguments from, otherwise
539 539 sys.argv[1:] is used.
540 540
541 541 parser_args : tuple
542 542 A tuple of positional arguments that will be passed to the
543 543 constructor of :class:`argparse.ArgumentParser`.
544 544
545 545 parser_kw : dict
546 546 A tuple of keyword arguments that will be passed to the
547 547 constructor of :class:`argparse.ArgumentParser`.
548 548
549 549 Returns
550 550 -------
551 551 config : Config
552 552 The resulting Config object.
553 553 """
554 554 super(CommandLineConfigLoader, self).__init__()
555 555 self.clear()
556 556 if argv is None:
557 557 argv = sys.argv[1:]
558 558 self.argv = argv
559 559 self.aliases = aliases or {}
560 560 self.flags = flags or {}
561 561
562 562 self.parser_args = parser_args
563 563 self.version = parser_kw.pop("version", None)
564 564 kwargs = dict(argument_default=argparse.SUPPRESS)
565 565 kwargs.update(parser_kw)
566 566 self.parser_kw = kwargs
567 567
568 568 def load_config(self, argv=None, aliases=None, flags=None):
569 569 """Parse command line arguments and return as a Config object.
570 570
571 571 Parameters
572 572 ----------
573 573
574 574 args : optional, list
575 575 If given, a list with the structure of sys.argv[1:] to parse
576 576 arguments from. If not given, the instance's self.argv attribute
577 577 (given at construction time) is used."""
578 578 self.clear()
579 579 if argv is None:
580 580 argv = self.argv
581 581 if aliases is None:
582 582 aliases = self.aliases
583 583 if flags is None:
584 584 flags = self.flags
585 585 self._create_parser(aliases, flags)
586 586 self._parse_args(argv)
587 587 self._convert_to_config()
588 588 return self.config
589 589
590 590 def get_extra_args(self):
591 591 if hasattr(self, 'extra_args'):
592 592 return self.extra_args
593 593 else:
594 594 return []
595 595
596 596 def _create_parser(self, aliases=None, flags=None):
597 597 self.parser = ArgumentParser(*self.parser_args, **self.parser_kw)
598 598 self._add_arguments(aliases, flags)
599 599
600 600 def _add_arguments(self, aliases=None, flags=None):
601 601 raise NotImplementedError("subclasses must implement _add_arguments")
602 602
603 603 def _parse_args(self, args):
604 604 """self.parser->self.parsed_data"""
605 605 # decode sys.argv to support unicode command-line options
606 606 enc = text.getdefaultencoding()
607 607 uargs = [py3compat.cast_unicode(a, enc) for a in args]
608 608 self.parsed_data, self.extra_args = self.parser.parse_known_args(uargs)
609 609
610 610 def _convert_to_config(self):
611 611 """self.parsed_data->self.config"""
612 612 for k, v in vars(self.parsed_data).iteritems():
613 613 exec "self.config.%s = v"%k in locals(), globals()
614 614
615 615 class KVArgParseConfigLoader(ArgParseConfigLoader):
616 616 """A config loader that loads aliases and flags with argparse,
617 617 but will use KVLoader for the rest. This allows better parsing
618 618 of common args, such as `ipython -c 'print 5'`, but still gets
619 619 arbitrary config with `ipython --InteractiveShell.use_readline=False`"""
620 620
621 621 def _convert_to_config(self):
622 622 """self.parsed_data->self.config"""
623 623 for k, v in vars(self.parsed_data).iteritems():
624 624 self._exec_config_str(k, v)
625 625
626 626 def _add_arguments(self, aliases=None, flags=None):
627 627 self.alias_flags = {}
628 628 # print aliases, flags
629 629 if aliases is None:
630 630 aliases = self.aliases
631 631 if flags is None:
632 632 flags = self.flags
633 633 paa = self.parser.add_argument
634 634 for key,value in aliases.iteritems():
635 635 if key in flags:
636 636 # flags
637 637 nargs = '?'
638 638 else:
639 639 nargs = None
640 640 if len(key) is 1:
641 641 paa('-'+key, '--'+key, type=unicode, dest=value, nargs=nargs)
642 642 else:
643 643 paa('--'+key, type=unicode, dest=value, nargs=nargs)
644 644 for key, (value, help) in flags.iteritems():
645 645 if key in self.aliases:
646 646 #
647 647 self.alias_flags[self.aliases[key]] = value
648 648 continue
649 649 if len(key) is 1:
650 650 paa('-'+key, '--'+key, action='append_const', dest='_flags', const=value)
651 651 else:
652 652 paa('--'+key, action='append_const', dest='_flags', const=value)
653 653
654 654 def _convert_to_config(self):
655 655 """self.parsed_data->self.config, parse unrecognized extra args via KVLoader."""
656 656 # remove subconfigs list from namespace before transforming the Namespace
657 657 if '_flags' in self.parsed_data:
658 658 subcs = self.parsed_data._flags
659 659 del self.parsed_data._flags
660 660 else:
661 661 subcs = []
662 662
663 663 for k, v in vars(self.parsed_data).iteritems():
664 664 if v is None:
665 665 # it was a flag that shares the name of an alias
666 666 subcs.append(self.alias_flags[k])
667 667 else:
668 668 # eval the KV assignment
669 669 self._exec_config_str(k, v)
670 670
671 671 for subc in subcs:
672 672 self._load_flag(subc)
673 673
674 674 if self.extra_args:
675 675 sub_parser = KeyValueConfigLoader()
676 676 sub_parser.load_config(self.extra_args)
677 677 self.config._merge(sub_parser.config)
678 678 self.extra_args = sub_parser.extra_args
679
680
681 def load_pyconfig_files(config_files, path):
682 """Load multiple Python config files, merging each of them in turn.
683
684 Parameters
685 ----------
686 config_files : list of str
687 List of config files names to load and merge into the config.
688 path : unicode
689 The full path to the location of the config files.
690 """
691 config = Config()
692 for cf in config_files:
693 loader = PyFileConfigLoader(cf, path=path)
694 try:
695 next_config = loader.load_config()
696 except ConfigFileNotFound:
697 pass
698 except:
699 raise
700 else:
701 config._merge(next_config)
702 return config
@@ -1,92 +1,168 b''
1 1 """Manage IPython.parallel clusters in the notebook.
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2008-2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 import datetime
20 19 import os
21 import uuid
22 import glob
23 20
24 21 from tornado import web
25 22 from zmq.eventloop import ioloop
26 23
27 24 from IPython.config.configurable import LoggingConfigurable
28 from IPython.utils.traitlets import Unicode, List, Dict, Bool
29 from IPython.parallel.apps.launcher import IPClusterLauncher
30 from IPython.core.profileapp import list_profiles_in, list_bundled_profiles
31 from IPython.utils.path import get_ipython_dir, get_ipython_package_dir
25 from IPython.config.loader import load_pyconfig_files
26 from IPython.utils.traitlets import Dict, Instance, CFloat
27 from IPython.parallel.apps.ipclusterapp import find_launcher_class
28 from IPython.core.profileapp import list_profiles_in
29 from IPython.core.profiledir import ProfileDir
30 from IPython.utils.path import get_ipython_dir
31 from IPython.utils.sysinfo import num_cpus
32
32 33
33 34 #-----------------------------------------------------------------------------
34 35 # Classes
35 36 #-----------------------------------------------------------------------------
36 37
37 38 class ClusterManager(LoggingConfigurable):
38 39
39 40 profiles = Dict()
40 41
41
42 def list_profile_names(self):
42 delay = CFloat(1., config=True,
43 help="delay (in s) between starting the controller and the engines")
44
45 loop = Instance('zmq.eventloop.ioloop.IOLoop')
46 def _loop_default(self):
47 from zmq.eventloop.ioloop import IOLoop
48 return IOLoop.instance()
49
50 def load_cluster_config(self, profile_dir):
51 config_files = ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
52 config = load_pyconfig_files(config_files, profile_dir)
53 return config
54
55 def build_launchers(self, profile_dir):
56 config = self.load_cluster_config(profile_dir)
57 cont_clsname = config.IPClusterStart.get('controller_launcher_class','Local')
58 cont_class = find_launcher_class(cont_clsname,'Controller')
59 cl = cont_class(work_dir=u'.',config=config, profile_dir=profile_dir)
60
61 engine_clsname = config.IPClusterEngines.get('engine_launcher_class','Local')
62 engine_class = find_launcher_class(engine_clsname,'EngineSet')
63 esl = engine_class(work_dir=u'.',config=config, profile_dir=profile_dir)
64 n = config.IPClusterEngines.get('n', num_cpus())
65 n = getattr(esl, 'engine_count', n)
66 return cl, esl, n
67
68 def get_profile_dir(self, name, path):
69 p = ProfileDir.find_profile_dir_by_name(path,name=name)
70 return p.location
71
72 def update_profiles(self):
43 73 """List all profiles in the ipython_dir and cwd.
44 74 """
45 profiles = list_profiles_in(get_ipython_dir())
46 profiles += list_profiles_in(os.getcwdu())
47 return profiles
48
75 for path in [get_ipython_dir(), os.getcwdu()]:
76 for profile in list_profiles_in(path):
77 pd = self.get_profile_dir(profile, path)
78 if profile not in self.profiles:
79 self.log.debug("Overwriting profile %s" % profile)
80 self.profiles[profile] = {
81 'profile': profile,
82 'profile_dir': pd,
83 'status': 'stopped'
84 }
49 85
50 86 def list_profiles(self):
51 profiles = self.list_profile_names()
52 result = [self.profile_info(p) for p in profiles]
87 self.update_profiles()
88 result = [self.profile_info(p) for p in self.profiles.keys()]
53 89 return result
54 90
91 def check_profile(self, profile):
92 if profile not in self.profiles:
93 raise web.HTTPError(404, u'profile not found')
55 94
56 95 def profile_info(self, profile):
57 if profile not in self.list_profile_names():
58 raise web.HTTPError(404, u'profile not found')
59 result = dict(profile=profile)
96 self.check_profile(profile)
97 result = {}
60 98 data = self.profiles.get(profile)
61 if data is None:
62 result['status'] = 'stopped'
63 else:
64 result['status'] = 'running'
99 result['profile'] = profile
100 result['profile_dir'] = data['profile_dir']
101 result['status'] = data['status']
102 if 'n' in data:
65 103 result['n'] = data['n']
66 104 return result
67 105
68 def start_cluster(self, profile, n=4):
106 def start_cluster(self, profile, n=None):
69 107 """Start a cluster for a given profile."""
70 if profile not in self.list_profile_names():
71 raise web.HTTPError(404, u'profile not found')
72 if profile in self.profiles:
108 self.check_profile(profile)
109 data = self.profiles[profile]
110 if data['status'] == 'running':
73 111 raise web.HTTPError(409, u'cluster already running')
74 launcher = IPClusterLauncher(ipcluster_profile=profile, ipcluster_n=n)
75 launcher.start()
76 self.profiles[profile] = {
77 'launcher': launcher,
78 'n': n
79 }
112 cl, esl, default_n = self.build_launchers(data['profile_dir'])
113 n = n if n is not None else default_n
114 def clean_data():
115 data.pop('controller_launcher',None)
116 data.pop('engine_set_launcher',None)
117 data.pop('n',None)
118 data['status'] = 'stopped'
119 def engines_stopped(r):
120 self.log.debug('Engines stopped')
121 if cl.running:
122 cl.stop()
123 clean_data()
124 esl.on_stop(engines_stopped)
125 def controller_stopped(r):
126 self.log.debug('Controller stopped')
127 if esl.running:
128 esl.stop()
129 clean_data()
130 cl.on_stop(controller_stopped)
131
132 dc = ioloop.DelayedCallback(lambda: cl.start(), 0, self.loop)
133 dc.start()
134 dc = ioloop.DelayedCallback(lambda: esl.start(n), 1000*self.delay, self.loop)
135 dc.start()
136
137 self.log.debug('Cluster started')
138 data['controller_launcher'] = cl
139 data['engine_set_launcher'] = esl
140 data['n'] = n
141 data['status'] = 'running'
80 142 return self.profile_info(profile)
81 143
82 144 def stop_cluster(self, profile):
83 145 """Stop a cluster for a given profile."""
84 if profile not in self.profiles:
146 self.check_profile(profile)
147 data = self.profiles[profile]
148 if data['status'] == 'stopped':
85 149 raise web.HTTPError(409, u'cluster not running')
86 launcher = self.profiles.pop(profile)['launcher']
87 launcher.stop()
88 return self.profile_info(profile)
150 data = self.profiles[profile]
151 cl = data['controller_launcher']
152 esl = data['engine_set_launcher']
153 if cl.running:
154 cl.stop()
155 if esl.running:
156 esl.stop()
157 # Return a temp info dict, the real one is updated in the on_stop
158 # logic above.
159 result = {
160 'profile': data['profile'],
161 'profile_dir': data['profile_dir'],
162 'status': 'stopped'
163 }
164 return result
89 165
90 166 def stop_all_clusters(self):
91 for p in self.profiles.values():
92 p['launcher'].stop()
167 for p in self.profiles.keys():
168 self.stop_cluster(profile)
@@ -1,730 +1,733 b''
1 1 """Tornado handlers for the notebook.
2 2
3 3 Authors:
4 4
5 5 * Brian Granger
6 6 """
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2008-2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 import logging
20 20 import Cookie
21 21 import time
22 22 import uuid
23 23
24 24 from tornado import web
25 25 from tornado import websocket
26 26
27 27 from zmq.eventloop import ioloop
28 28 from zmq.utils import jsonapi
29 29
30 30 from IPython.external.decorator import decorator
31 31 from IPython.zmq.session import Session
32 32 from IPython.lib.security import passwd_check
33 33
34 34 try:
35 35 from docutils.core import publish_string
36 36 except ImportError:
37 37 publish_string = None
38 38
39 39 #-----------------------------------------------------------------------------
40 40 # Monkeypatch for Tornado <= 2.1.1 - Remove when no longer necessary!
41 41 #-----------------------------------------------------------------------------
42 42
43 43 # Google Chrome, as of release 16, changed its websocket protocol number. The
44 44 # parts tornado cares about haven't really changed, so it's OK to continue
45 45 # accepting Chrome connections, but as of Tornado 2.1.1 (the currently released
46 46 # version as of Oct 30/2011) the version check fails, see the issue report:
47 47
48 48 # https://github.com/facebook/tornado/issues/385
49 49
50 50 # This issue has been fixed in Tornado post 2.1.1:
51 51
52 52 # https://github.com/facebook/tornado/commit/84d7b458f956727c3b0d6710
53 53
54 54 # Here we manually apply the same patch as above so that users of IPython can
55 55 # continue to work with an officially released Tornado. We make the
56 56 # monkeypatch version check as narrow as possible to limit its effects; once
57 57 # Tornado 2.1.1 is no longer found in the wild we'll delete this code.
58 58
59 59 import tornado
60 60
# Patch only the affected releases; newer tornados already contain the fix.
if tornado.version_info <= (2,1,1):

    def _execute(self, transforms, *args, **kwargs):
        # Imported lazily so we patch whatever tornado is actually running.
        from tornado.websocket import WebSocketProtocol8, WebSocketProtocol76

        self.open_args = args
        self.open_kwargs = kwargs

        # The difference between version 8 and 13 is that in 8 the
        # client sends a "Sec-Websocket-Origin" header and in 13 it's
        # simply "Origin".
        if self.request.headers.get("Sec-WebSocket-Version") in ("7", "8", "13"):
            self.ws_connection = WebSocketProtocol8(self)
            self.ws_connection.accept_connection()

        elif self.request.headers.get("Sec-WebSocket-Version"):
            # Some other draft we don't speak: ask the client to retry with 8.
            self.stream.write(tornado.escape.utf8(
                "HTTP/1.1 426 Upgrade Required\r\n"
                "Sec-WebSocket-Version: 8\r\n\r\n"))
            self.stream.close()

        else:
            # No version header at all: legacy hixie draft-76 client.
            self.ws_connection = WebSocketProtocol76(self)
            self.ws_connection.accept_connection()

    websocket.WebSocketHandler._execute = _execute
    del _execute
88 88
89 89 #-----------------------------------------------------------------------------
90 90 # Decorator for disabling read-only handlers
91 91 #-----------------------------------------------------------------------------
92 92
@decorator
def not_if_readonly(f, self, *args, **kwargs):
    """Reject the request with a 403 when the server is in read-only mode."""
    if not self.application.read_only:
        return f(self, *args, **kwargs)
    raise web.HTTPError(403, "Notebook server is read-only")
99 99
@decorator
def authenticate_unless_readonly(f, self, *args, **kwargs):
    """authenticate this page *unless* readonly view is active.

    In read-only mode, the notebook list and print view should
    be accessible without authentication.
    """
    # Read-only servers expose these pages to everyone.
    if self.application.read_only:
        return f(self, *args, **kwargs)

    # Otherwise route through tornado's authentication machinery.
    @web.authenticated
    def auth_f(self, *args, **kwargs):
        return f(self, *args, **kwargs)

    return auth_f(self, *args, **kwargs)
116 116
117 117 #-----------------------------------------------------------------------------
118 118 # Top-level handlers
119 119 #-----------------------------------------------------------------------------
120 120
class RequestHandler(web.RequestHandler):
    """RequestHandler that guarantees a default 'message' template variable."""

    def render(self, *args, **kwargs):
        # Explicit `self` instead of the original's implicit first positional
        # argument; behavior is unchanged.
        # Templates reference 'message', so make sure it always exists.
        kwargs.setdefault('message', '')
        return web.RequestHandler.render(self, *args, **kwargs)
127 127
class AuthenticatedHandler(RequestHandler):
    """A RequestHandler with an authenticated user."""

    def get_current_user(self):
        # Returns the user id from the signed cookie, 'anonymous' when auth
        # is effectively disabled, or None when authentication failed.
        user_id = self.get_secure_cookie("username")
        # For now the user_id should not return empty, but it could eventually
        if user_id == '':
            user_id = 'anonymous'
        if user_id is None:
            # prevent extra Invalid cookie sig warnings:
            self.clear_cookie('username')
            # no password and not read-only means auth is turned off entirely
            if not self.application.password and not self.application.read_only:
                user_id = 'anonymous'
        return user_id

    @property
    def logged_in(self):
        """Is a user currently logged in?

        """
        user = self.get_current_user()
        # 'anonymous' counts as not logged in
        return (user and not user == 'anonymous')

    @property
    def login_available(self):
        """May a user proceed to log in?

        This returns True if login capability is available, irrespective of
        whether the user is already logged in or not.

        """
        return bool(self.application.password)

    @property
    def read_only(self):
        """Is the notebook read-only?

        """
        return self.application.read_only

    @property
    def ws_url(self):
        """websocket url matching the current request

        turns http[s]://host[:port] into
        ws[s]://host[:port]
        """
        proto = self.request.protocol.replace('http', 'ws')
        host = self.application.ipython_app.websocket_host # default to config value
        if host == '':
            host = self.request.host # get from request
        return "%s://%s" % (proto, host)
180 180
181 181
class AuthenticatedFileHandler(AuthenticatedHandler, web.StaticFileHandler):
    """static files should only be accessible when logged in"""

    @authenticate_unless_readonly
    def get(self, path):
        # Delegate to tornado's static file handler once auth has passed.
        return web.StaticFileHandler.get(self, path)
188 188
189 189
class ProjectDashboardHandler(AuthenticatedHandler):
    """Render the top-level notebook dashboard."""

    @authenticate_unless_readonly
    def get(self):
        app = self.application.ipython_app
        self.render('projectdashboard.html',
            project=self.application.notebook_manager.notebook_dir,
            base_project_url=app.base_project_url,
            base_kernel_url=app.base_kernel_url,
            read_only=self.read_only,
            logged_in=self.logged_in,
            login_available=self.login_available,
        )
204 204
205 205
class LoginHandler(AuthenticatedHandler):
    """Render the login form and validate submitted passwords."""

    def _render(self, message=None):
        # `message` is an optional dict (e.g. {'error': ...}) for the template.
        self.render('login.html',
                next=self.get_argument('next', default='/'),
                read_only=self.read_only,
                logged_in=self.logged_in,
                login_available=self.login_available,
                base_project_url=self.application.ipython_app.base_project_url,
                message=message
        )

    def get(self):
        # Already authenticated: skip the form and honor ?next=.
        if self.current_user:
            self.redirect(self.get_argument('next', default='/'))
        else:
            self._render()

    def post(self):
        pwd = self.get_argument('password', default=u'')
        if self.application.password:
            if passwd_check(self.application.password, pwd):
                # A random uuid is enough; the signed cookie does the auth.
                self.set_secure_cookie('username', str(uuid.uuid4()))
            else:
                self._render(message={'error': 'Invalid password'})
                return

        self.redirect(self.get_argument('next', default='/'))
234 234
235 235
class LogoutHandler(AuthenticatedHandler):
    """Clear the auth cookie and render the logout page."""

    def get(self):
        self.clear_cookie('username')
        # Tell the user whether logging out was meaningful at all.
        if self.login_available:
            message = {'info': 'Successfully logged out.'}
        else:
            message = {'warning': 'Cannot log out. Notebook authentication '
                                  'is disabled.'}
        self.render('logout.html',
                    read_only=self.read_only,
                    logged_in=self.logged_in,
                    login_available=self.login_available,
                    base_project_url=self.application.ipython_app.base_project_url,
                    message=message)
252 252
253 253
class NewHandler(AuthenticatedHandler):
    """Create a brand new notebook and open it in the editor."""

    @web.authenticated
    def get(self):
        nbm = self.application.notebook_manager
        app = self.application.ipython_app
        self.render('notebook.html',
            project=nbm.notebook_dir,
            notebook_id=nbm.new_notebook(),
            base_project_url=app.base_project_url,
            base_kernel_url=app.base_kernel_url,
            kill_kernel=False,
            read_only=False,
            logged_in=self.logged_in,
            login_available=self.login_available,
            mathjax_url=app.mathjax_url,
        )
272 272
273 273
class NamedNotebookHandler(AuthenticatedHandler):
    """Open the editor for an existing notebook id."""

    @authenticate_unless_readonly
    def get(self, notebook_id):
        nbm = self.application.notebook_manager
        if not nbm.notebook_exists(notebook_id):
            raise web.HTTPError(404, u'Notebook does not exist: %s' % notebook_id)

        app = self.application.ipython_app
        self.render('notebook.html',
            project=nbm.notebook_dir,
            notebook_id=notebook_id,
            base_project_url=app.base_project_url,
            base_kernel_url=app.base_kernel_url,
            kill_kernel=False,
            read_only=self.read_only,
            logged_in=self.logged_in,
            login_available=self.login_available,
            mathjax_url=app.mathjax_url,
        )
294 294
295 295
class PrintNotebookHandler(AuthenticatedHandler):
    """Render the print-friendly view of an existing notebook."""

    @authenticate_unless_readonly
    def get(self, notebook_id):
        nbm = self.application.notebook_manager
        if not nbm.notebook_exists(notebook_id):
            raise web.HTTPError(404, u'Notebook does not exist: %s' % notebook_id)

        app = self.application.ipython_app
        self.render('printnotebook.html',
            project=nbm.notebook_dir,
            notebook_id=notebook_id,
            base_project_url=app.base_project_url,
            base_kernel_url=app.base_kernel_url,
            kill_kernel=False,
            read_only=self.read_only,
            logged_in=self.logged_in,
            login_available=self.login_available,
            mathjax_url=app.mathjax_url,
        )
316 316
317 317 #-----------------------------------------------------------------------------
318 318 # Kernel handlers
319 319 #-----------------------------------------------------------------------------
320 320
321 321
class MainKernelHandler(AuthenticatedHandler):
    """Kernel collection: list running kernels and start new ones."""

    @web.authenticated
    def get(self):
        self.finish(jsonapi.dumps(self.application.kernel_manager.kernel_ids))

    @web.authenticated
    def post(self):
        km = self.application.kernel_manager
        kernel_id = km.start_kernel(self.get_argument('notebook', default=None))
        self.set_header('Location', '/'+kernel_id)
        self.finish(jsonapi.dumps({'ws_url': self.ws_url, 'kernel_id': kernel_id}))
337 337
338 338
class KernelHandler(AuthenticatedHandler):
    """Operations on a single kernel; only DELETE (kill) is supported."""

    # Must be a tuple: ('DELETE') is just the string 'DELETE', so membership
    # tests against it match any of the characters 'D','E','L','T' too.
    SUPPORTED_METHODS = ('DELETE',)

    @web.authenticated
    def delete(self, kernel_id):
        """Kill the kernel and reply 204 No Content."""
        km = self.application.kernel_manager
        km.kill_kernel(kernel_id)
        self.set_status(204)
        self.finish()
349 349
350 350
class KernelActionHandler(AuthenticatedHandler):
    """Dispatch kernel actions ('interrupt', 'restart') posted by the client."""

    @web.authenticated
    def post(self, kernel_id, action):
        km = self.application.kernel_manager
        if action == 'interrupt':
            km.interrupt_kernel(kernel_id)
            self.set_status(204)
        elif action == 'restart':
            new_id = km.restart_kernel(kernel_id)
            self.set_header('Location', '/'+new_id)
            self.write(jsonapi.dumps({'ws_url': self.ws_url, 'kernel_id': new_id}))
        self.finish()
365 365
366 366
class ZMQStreamHandler(websocket.WebSocketHandler):
    """WebSocket handler that relays ZMQ stream messages to the browser as JSON."""

    def _reserialize_reply(self, msg_list):
        """Reserialize a reply message using JSON.

        This takes the msg list from the ZMQ socket, unserializes it using
        self.session and then serializes the result using JSON. This method
        should be used by self._on_zmq_reply to build messages that can
        be sent back to the browser.
        """
        idents, msg_list = self.session.feed_identities(msg_list)
        msg = self.session.unserialize(msg_list)
        # datetime headers are not JSON-serializable; drop them.
        try:
            msg['header'].pop('date')
        except KeyError:
            pass
        try:
            msg['parent_header'].pop('date')
        except KeyError:
            pass
        # binary buffers can't travel over a text JSON channel
        msg.pop('buffers')
        return jsonapi.dumps(msg)

    def _on_zmq_reply(self, msg_list):
        try:
            msg = self._reserialize_reply(msg_list)
        except Exception:
            # Was a bare 'except:', which also swallowed SystemExit and
            # KeyboardInterrupt; Exception still covers all serialize errors.
            self.application.log.critical("Malformed message: %r" % msg_list)
        else:
            self.write_message(msg)

    def allow_draft76(self):
        """Allow draft 76, until browsers such as Safari update to RFC 6455.

        This has been disabled by default in tornado in release 2.2.0, and
        support will be removed in later versions.
        """
        return True
405 405
406 406
class AuthenticatedZMQStreamHandler(ZMQStreamHandler):
    """ZMQStreamHandler authenticated via a cookie sent as the first
    WebSocket message (browsers can't set headers on WebSocket requests).
    """

    def open(self, kernel_id):
        self.kernel_id = kernel_id.decode('ascii')
        try:
            cfg = self.application.ipython_app.config
        except AttributeError:
            # protect from the case where this is run from something other than
            # the notebook app:
            cfg = None
        self.session = Session(config=cfg)
        # Swap handlers so the first frame is treated as the auth cookie.
        self.save_on_message = self.on_message
        self.on_message = self.on_first_message

    def get_current_user(self):
        user_id = self.get_secure_cookie("username")
        if user_id == '' or (user_id is None and not self.application.password):
            user_id = 'anonymous'
        return user_id

    def _inject_cookie_message(self, msg):
        """Inject the first message, which is the document cookie,
        for authentication."""
        if isinstance(msg, unicode):
            # Cookie constructor doesn't accept unicode strings for some reason
            msg = msg.encode('utf8', 'replace')
        try:
            self.request._cookies = Cookie.SimpleCookie(msg)
        except Exception:
            # Was a bare 'except:'; Exception still covers any parse failure
            # without also trapping SystemExit/KeyboardInterrupt.
            logging.warn("couldn't parse cookie string: %s", msg, exc_info=True)

    def on_first_message(self, msg):
        self._inject_cookie_message(msg)
        if self.get_current_user() is None:
            logging.warn("Couldn't authenticate WebSocket connection")
            raise web.HTTPError(403)
        # Auth passed: restore the real message handler.
        self.on_message = self.save_on_message
444 444
445 445
class IOPubHandler(AuthenticatedZMQStreamHandler):
    """WebSocket bridge for a kernel's iopub channel, plus heartbeat.

    Relays iopub (output/status) messages to the browser and runs a
    ping/pong heartbeat over the hb stream to detect a dead kernel.
    """

    def initialize(self, *args, **kwargs):
        # _kernel_alive is flipped by heartbeat replies; _beating guards
        # against starting or stopping the heartbeat twice.
        self._kernel_alive = True
        self._beating = False
        self.iopub_stream = None
        self.hb_stream = None

    def on_first_message(self, msg):
        # First frame is the auth cookie; see AuthenticatedZMQStreamHandler.
        try:
            super(IOPubHandler, self).on_first_message(msg)
        except web.HTTPError:
            self.close()
            return
        km = self.application.kernel_manager
        self.time_to_dead = km.time_to_dead
        self.first_beat = km.first_beat
        kernel_id = self.kernel_id
        try:
            self.iopub_stream = km.create_iopub_stream(kernel_id)
            self.hb_stream = km.create_hb_stream(kernel_id)
        except web.HTTPError:
            # WebSockets don't response to traditional error codes so we
            # close the connection.
            if not self.stream.closed():
                self.stream.close()
            self.close()
        else:
            self.iopub_stream.on_recv(self._on_zmq_reply)
            self.start_hb(self.kernel_died)

    def on_message(self, msg):
        # iopub is one-way (kernel -> browser); client messages are ignored.
        pass

    def on_close(self):
        # This method can be called twice, once by self.kernel_died and once
        # from the WebSocket close event. If the WebSocket connection is
        # closed before the ZMQ streams are setup, they could be None.
        self.stop_hb()
        if self.iopub_stream is not None and not self.iopub_stream.closed():
            self.iopub_stream.on_recv(None)
            self.iopub_stream.close()
        if self.hb_stream is not None and not self.hb_stream.closed():
            self.hb_stream.close()

    def start_hb(self, callback):
        """Start the heartbeating and call the callback if the kernel dies."""
        if not self._beating:
            self._kernel_alive = True

            def ping_or_dead():
                # flush any pending pong before checking liveness
                self.hb_stream.flush()
                if self._kernel_alive:
                    # assume dead until the next pong proves otherwise
                    self._kernel_alive = False
                    self.hb_stream.send(b'ping')
                    # flush stream to force immediate socket send
                    self.hb_stream.flush()
                else:
                    # no pong since the last ping: declare the kernel dead
                    try:
                        callback()
                    except:
                        pass
                    finally:
                        self.stop_hb()

            def beat_received(msg):
                self._kernel_alive = True

            self.hb_stream.on_recv(beat_received)
            loop = ioloop.IOLoop.instance()
            self._hb_periodic_callback = ioloop.PeriodicCallback(ping_or_dead, self.time_to_dead*1000, loop)
            # delay the first ping so a slow-starting kernel isn't declared dead
            loop.add_timeout(time.time()+self.first_beat, self._really_start_hb)
            self._beating= True

    def _really_start_hb(self):
        """callback for delayed heartbeat start

        Only start the hb loop if we haven't been closed during the wait.
        """
        if self._beating and not self.hb_stream.closed():
            self._hb_periodic_callback.start()

    def stop_hb(self):
        """Stop the heartbeating and cancel all related callbacks."""
        if self._beating:
            self._beating = False
            self._hb_periodic_callback.stop()
            if not self.hb_stream.closed():
                self.hb_stream.on_recv(None)

    def kernel_died(self):
        # Drop the kernel mapping and push a synthetic 'dead' status message
        # to the browser before tearing the connection down.
        self.application.kernel_manager.delete_mapping_for_kernel(self.kernel_id)
        self.application.log.error("Kernel %s failed to respond to heartbeat", self.kernel_id)
        self.write_message(
            {'header': {'msg_type': 'status'},
             'parent_header': {},
             'content': {'execution_state':'dead'}
            }
        )
        self.on_close()
546 546
547 547
class ShellHandler(AuthenticatedZMQStreamHandler):
    """WebSocket bridge for a kernel's shell (request/reply) channel."""

    def initialize(self, *args, **kwargs):
        self.shell_stream = None

    def on_first_message(self, msg):
        # First frame is the auth cookie; see the parent class.
        try:
            super(ShellHandler, self).on_first_message(msg)
        except web.HTTPError:
            self.close()
            return
        km = self.application.kernel_manager
        self.max_msg_size = km.max_msg_size
        try:
            self.shell_stream = km.create_shell_stream(self.kernel_id)
        except web.HTTPError:
            # WebSockets don't response to traditional error codes so we
            # close the connection.
            if not self.stream.closed():
                self.stream.close()
            self.close()
        else:
            self.shell_stream.on_recv(self._on_zmq_reply)

    def on_message(self, msg):
        # Oversized messages are silently dropped.
        if len(msg) < self.max_msg_size:
            self.session.send(self.shell_stream, jsonapi.loads(msg))

    def on_close(self):
        # Make sure the stream exists and is not already closed.
        stream = self.shell_stream
        if stream is not None and not stream.closed():
            stream.close()
582 582
583 583
584 584 #-----------------------------------------------------------------------------
585 585 # Notebook web service handlers
586 586 #-----------------------------------------------------------------------------
587 587
class NotebookRootHandler(AuthenticatedHandler):
    """Notebook collection: list notebooks and create/upload new ones."""

    @authenticate_unless_readonly
    def get(self):
        """List all notebooks as JSON."""
        nbm = self.application.notebook_manager
        files = nbm.list_notebooks()
        self.finish(jsonapi.dumps(files))

    @web.authenticated
    def post(self):
        """Create a new notebook, from the request body if one was uploaded."""
        nbm = self.application.notebook_manager
        body = self.request.body.strip()
        # 'fmt' instead of the original 'format' to avoid shadowing the builtin
        fmt = self.get_argument('format', default='json')
        name = self.get_argument('name', default=None)
        if body:
            notebook_id = nbm.save_new_notebook(body, name=name, format=fmt)
        else:
            notebook_id = nbm.new_notebook()
        self.set_header('Location', '/'+notebook_id)
        self.finish(jsonapi.dumps(notebook_id))
608 608
609 609
class NotebookHandler(AuthenticatedHandler):
    """Read, save and delete a single notebook."""

    SUPPORTED_METHODS = ('GET', 'PUT', 'DELETE')

    @authenticate_unless_readonly
    def get(self, notebook_id):
        """Download the notebook in 'json' or 'py' format."""
        nbm = self.application.notebook_manager
        # 'fmt' instead of the original 'format' to avoid shadowing the builtin
        fmt = self.get_argument('format', default='json')
        last_mod, name, data = nbm.get_notebook(notebook_id, fmt)

        if fmt == u'json':
            self.set_header('Content-Type', 'application/json')
            self.set_header('Content-Disposition','attachment; filename="%s.ipynb"' % name)
        elif fmt == u'py':
            self.set_header('Content-Type', 'application/x-python')
            self.set_header('Content-Disposition','attachment; filename="%s.py"' % name)
        self.set_header('Last-Modified', last_mod)
        self.finish(data)

    @web.authenticated
    def put(self, notebook_id):
        """Save the request body as the notebook's new content."""
        nbm = self.application.notebook_manager
        fmt = self.get_argument('format', default='json')
        name = self.get_argument('name', default=None)
        nbm.save_notebook(notebook_id, self.request.body, name=name, format=fmt)
        self.set_status(204)
        self.finish()

    @web.authenticated
    def delete(self, notebook_id):
        """Delete the notebook and reply 204 No Content."""
        nbm = self.application.notebook_manager
        nbm.delete_notebook(notebook_id)
        self.set_status(204)
        self.finish()
644 644
645 645
class NotebookCopyHandler(AuthenticatedHandler):
    """Copy an existing notebook and open the copy in the editor."""

    @web.authenticated
    def get(self, notebook_id):
        nbm = self.application.notebook_manager
        app = self.application.ipython_app
        new_id = nbm.copy_notebook(notebook_id)
        self.render('notebook.html',
            project=nbm.notebook_dir,
            notebook_id=new_id,
            base_project_url=app.base_project_url,
            base_kernel_url=app.base_kernel_url,
            kill_kernel=False,
            read_only=False,
            logged_in=self.logged_in,
            login_available=self.login_available,
            mathjax_url=app.mathjax_url,
        )
664 664
665 665
666 666 #-----------------------------------------------------------------------------
667 667 # Cluster handlers
668 668 #-----------------------------------------------------------------------------
669 669
670 670
class MainClusterHandler(AuthenticatedHandler):
    """Return info on all cluster profiles as JSON."""

    @web.authenticated
    def get(self):
        profiles = self.application.cluster_manager.list_profiles()
        self.finish(jsonapi.dumps(profiles))
677 677
678 678
class ClusterProfileHandler(AuthenticatedHandler):
    """Return info on a single cluster profile as JSON."""

    @web.authenticated
    def get(self, profile):
        info = self.application.cluster_manager.profile_info(profile)
        self.finish(jsonapi.dumps(info))
685 685
686 686
class ClusterActionHandler(AuthenticatedHandler):
    """Start or stop the cluster for a profile."""

    @web.authenticated
    def post(self, profile, action):
        cm = self.application.cluster_manager
        if action == 'start':
            # n (engine count) is optional; let the manager pick its default.
            n = self.get_argument('n', default=None)
            if n is None:
                data = cm.start_cluster(profile)
            else:
                data = cm.start_cluster(profile, int(n))
        elif action == 'stop':
            data = cm.stop_cluster(profile)
        else:
            # Original left `data` unbound here, producing a NameError
            # (HTTP 500) instead of a meaningful client error.
            raise web.HTTPError(400, u'unknown cluster action: %s' % action)
        self.finish(jsonapi.dumps(data))
698 701
699 702
700 703 #-----------------------------------------------------------------------------
701 704 # RST web service handlers
702 705 #-----------------------------------------------------------------------------
703 706
704 707
class RSTHandler(AuthenticatedHandler):
    """Convert posted reStructuredText to HTML via docutils."""

    @web.authenticated
    def post(self):
        if publish_string is None:
            raise web.HTTPError(503, u'docutils not available')
        source = self.request.body.strip()
        # template_path=os.path.join(os.path.dirname(__file__), u'templates', u'rst_template.html')
        defaults = {'file_insertion_enabled': 0,
                    'raw_enabled': 0,
                    '_disable_config': 1,
                    'stylesheet_path': 0
                    # 'template': template_path
                    }
        try:
            html = publish_string(source, writer_name='html',
                                  settings_overrides=defaults
                                  )
        except Exception:
            # docutils raises a variety of exception types on bad input;
            # was a bare 'except:' which also trapped KeyboardInterrupt.
            raise web.HTTPError(400, u'Invalid RST')
        # NOTE: removed stray debug statement 'print html' that dumped every
        # rendered document to the server's stdout.
        self.set_header('Content-Type', 'text/html')
        self.finish(html)
729 732
730 733
@@ -1,503 +1,504 b''
1 1 # coding: utf-8
2 2 """A tornado based IPython notebook server.
3 3
4 4 Authors:
5 5
6 6 * Brian Granger
7 7 """
8 8 #-----------------------------------------------------------------------------
9 9 # Copyright (C) 2008-2011 The IPython Development Team
10 10 #
11 11 # Distributed under the terms of the BSD License. The full license is in
12 12 # the file COPYING, distributed as part of this software.
13 13 #-----------------------------------------------------------------------------
14 14
15 15 #-----------------------------------------------------------------------------
16 16 # Imports
17 17 #-----------------------------------------------------------------------------
18 18
19 19 # stdlib
20 20 import errno
21 21 import logging
22 22 import os
23 23 import signal
24 24 import socket
25 25 import sys
26 26 import threading
27 27 import webbrowser
28 28
29 29 # Third party
30 30 import zmq
31 31
32 32 # Install the pyzmq ioloop. This has to be done before anything else from
33 33 # tornado is imported.
34 34 from zmq.eventloop import ioloop
35 35 # FIXME: ioloop.install is new in pyzmq-2.1.7, so remove this conditional
36 36 # when pyzmq dependency is updated beyond that.
37 37 if hasattr(ioloop, 'install'):
38 38 ioloop.install()
39 39 else:
40 40 import tornado.ioloop
41 41 tornado.ioloop.IOLoop = ioloop.IOLoop
42 42
43 43 from tornado import httpserver
44 44 from tornado import web
45 45
46 46 # Our own libraries
47 47 from .kernelmanager import MappingKernelManager
48 48 from .handlers import (LoginHandler, LogoutHandler,
49 49 ProjectDashboardHandler, NewHandler, NamedNotebookHandler,
50 50 MainKernelHandler, KernelHandler, KernelActionHandler, IOPubHandler,
51 51 ShellHandler, NotebookRootHandler, NotebookHandler, NotebookCopyHandler,
52 52 RSTHandler, AuthenticatedFileHandler, PrintNotebookHandler,
53 53 MainClusterHandler, ClusterProfileHandler, ClusterActionHandler
54 54 )
55 55 from .notebookmanager import NotebookManager
56 56 from .clustermanager import ClusterManager
57 57
58 58 from IPython.config.application import catch_config_error, boolean_flag
59 59 from IPython.core.application import BaseIPythonApplication
60 60 from IPython.core.profiledir import ProfileDir
61 61 from IPython.lib.kernel import swallow_argv
62 62 from IPython.zmq.session import Session, default_secure
63 63 from IPython.zmq.zmqshell import ZMQInteractiveShell
64 64 from IPython.zmq.ipkernel import (
65 65 flags as ipkernel_flags,
66 66 aliases as ipkernel_aliases,
67 67 IPKernelApp
68 68 )
69 69 from IPython.utils.traitlets import Dict, Unicode, Integer, List, Enum, Bool
70 70 from IPython.utils import py3compat
71 71
72 72 #-----------------------------------------------------------------------------
73 73 # Module globals
74 74 #-----------------------------------------------------------------------------
75 75
76 76 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
77 77 _kernel_action_regex = r"(?P<action>restart|interrupt)"
78 78 _notebook_id_regex = r"(?P<notebook_id>\w+-\w+-\w+-\w+-\w+)"
79 79 _profile_regex = r"(?P<profile>[a-zA-Z0-9]+)"
80 80 _cluster_action_regex = r"(?P<action>start|stop)"
81 81
82 82
83 83 LOCALHOST = '127.0.0.1'
84 84
85 85 _examples = """
86 86 ipython notebook # start the notebook
87 87 ipython notebook --profile=sympy # use the sympy profile
88 88 ipython notebook --pylab=inline # pylab in inline plotting mode
89 89 ipython notebook --certfile=mycert.pem # use SSL/TLS certificate
90 90 ipython notebook --port=5555 --ip=* # Listen on port 5555, all interfaces
91 91 """
92 92
93 93 #-----------------------------------------------------------------------------
94 94 # Helper functions
95 95 #-----------------------------------------------------------------------------
96 96
97 97 def url_path_join(a,b):
98 98 if a.endswith('/') and b.startswith('/'):
99 99 return a[:-1]+b
100 100 else:
101 101 return a+b
102 102
103 103 #-----------------------------------------------------------------------------
104 104 # The Tornado web application
105 105 #-----------------------------------------------------------------------------
106 106
107 107 class NotebookWebApplication(web.Application):
108 108
109 109 def __init__(self, ipython_app, kernel_manager, notebook_manager,
110 110 cluster_manager, log,
111 111 base_project_url, settings_overrides):
112 112 handlers = [
113 113 (r"/", ProjectDashboardHandler),
114 114 (r"/login", LoginHandler),
115 115 (r"/logout", LogoutHandler),
116 116 (r"/new", NewHandler),
117 117 (r"/%s" % _notebook_id_regex, NamedNotebookHandler),
118 118 (r"/%s/copy" % _notebook_id_regex, NotebookCopyHandler),
119 119 (r"/%s/print" % _notebook_id_regex, PrintNotebookHandler),
120 120 (r"/kernels", MainKernelHandler),
121 121 (r"/kernels/%s" % _kernel_id_regex, KernelHandler),
122 122 (r"/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
123 123 (r"/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler),
124 124 (r"/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
125 125 (r"/notebooks", NotebookRootHandler),
126 126 (r"/notebooks/%s" % _notebook_id_regex, NotebookHandler),
127 127 (r"/rstservice/render", RSTHandler),
128 128 (r"/files/(.*)", AuthenticatedFileHandler, {'path' : notebook_manager.notebook_dir}),
129 129 (r"/clusters", MainClusterHandler),
130 130 (r"/clusters/%s/%s" % (_profile_regex, _cluster_action_regex), ClusterActionHandler),
131 131 (r"/clusters/%s" % _profile_regex, ClusterProfileHandler),
132 132 ]
133 133 settings = dict(
134 134 template_path=os.path.join(os.path.dirname(__file__), "templates"),
135 135 static_path=os.path.join(os.path.dirname(__file__), "static"),
136 136 cookie_secret=os.urandom(1024),
137 137 login_url="/login",
138 138 )
139 139
140 140 # allow custom overrides for the tornado web app.
141 141 settings.update(settings_overrides)
142 142
143 143 # Python < 2.6.5 doesn't accept unicode keys in f(**kwargs), and
144 144 # base_project_url will always be unicode, which will in turn
145 145 # make the patterns unicode, and ultimately result in unicode
146 146 # keys in kwargs to handler._execute(**kwargs) in tornado.
147 147 # This enforces that base_project_url be ascii in that situation.
148 148 #
149 149 # Note that the URLs these patterns check against are escaped,
150 150 # and thus guaranteed to be ASCII: 'hΓ©llo' is really 'h%C3%A9llo'.
151 151 base_project_url = py3compat.unicode_to_str(base_project_url, 'ascii')
152 152
153 153 # prepend base_project_url onto the patterns that we match
154 154 new_handlers = []
155 155 for handler in handlers:
156 156 pattern = url_path_join(base_project_url, handler[0])
157 157 new_handler = tuple([pattern]+list(handler[1:]))
158 158 new_handlers.append( new_handler )
159 159
160 160 super(NotebookWebApplication, self).__init__(new_handlers, **settings)
161 161
162 162 self.kernel_manager = kernel_manager
163 163 self.notebook_manager = notebook_manager
164 164 self.cluster_manager = cluster_manager
165 165 self.ipython_app = ipython_app
166 166 self.read_only = self.ipython_app.read_only
167 167 self.log = log
168 168
169 169
170 170 #-----------------------------------------------------------------------------
171 171 # Aliases and Flags
172 172 #-----------------------------------------------------------------------------
173 173
174 174 flags = dict(ipkernel_flags)
175 175 flags['no-browser']=(
176 176 {'NotebookApp' : {'open_browser' : False}},
177 177 "Don't open the notebook in a browser after startup."
178 178 )
179 179 flags['no-mathjax']=(
180 180 {'NotebookApp' : {'enable_mathjax' : False}},
181 181 """Disable MathJax
182 182
183 183 MathJax is the javascript library IPython uses to render math/LaTeX. It is
184 184 very large, so you may want to disable it if you have a slow internet
185 185 connection, or for offline use of the notebook.
186 186
187 187 When disabled, equations etc. will appear as their untransformed TeX source.
188 188 """
189 189 )
190 190 flags['read-only'] = (
191 191 {'NotebookApp' : {'read_only' : True}},
192 192 """Allow read-only access to notebooks.
193 193
194 194 When using a password to protect the notebook server, this flag
195 195 allows unauthenticated clients to view the notebook list, and
196 196 individual notebooks, but not edit them, start kernels, or run
197 197 code.
198 198
199 199 If no password is set, the server will be entirely read-only.
200 200 """
201 201 )
202 202
203 203 # Add notebook manager flags
204 204 flags.update(boolean_flag('script', 'NotebookManager.save_script',
205 205 'Auto-save a .py script everytime the .ipynb notebook is saved',
206 206 'Do not auto-save .py scripts for every notebook'))
207 207
208 208 # the flags that are specific to the frontend
209 209 # these must be scrubbed before being passed to the kernel,
210 210 # or it will raise an error on unrecognized flags
211 211 notebook_flags = ['no-browser', 'no-mathjax', 'read-only', 'script', 'no-script']
212 212
213 213 aliases = dict(ipkernel_aliases)
214 214
215 215 aliases.update({
216 216 'ip': 'NotebookApp.ip',
217 217 'port': 'NotebookApp.port',
218 218 'keyfile': 'NotebookApp.keyfile',
219 219 'certfile': 'NotebookApp.certfile',
220 220 'notebook-dir': 'NotebookManager.notebook_dir',
221 221 'browser': 'NotebookApp.browser',
222 222 })
223 223
224 224 # remove ipkernel flags that are singletons, and don't make sense in
225 225 # multi-kernel evironment:
226 226 aliases.pop('f', None)
227 227
228 228 notebook_aliases = [u'port', u'ip', u'keyfile', u'certfile',
229 229 u'notebook-dir']
230 230
231 231 #-----------------------------------------------------------------------------
232 232 # NotebookApp
233 233 #-----------------------------------------------------------------------------
234 234
235 235 class NotebookApp(BaseIPythonApplication):
236 236
237 237 name = 'ipython-notebook'
238 238 default_config_file_name='ipython_notebook_config.py'
239 239
240 240 description = """
241 241 The IPython HTML Notebook.
242 242
243 243 This launches a Tornado based HTML Notebook Server that serves up an
244 244 HTML5/Javascript Notebook client.
245 245 """
246 246 examples = _examples
247 247
248 248 classes = [IPKernelApp, ZMQInteractiveShell, ProfileDir, Session,
249 249 MappingKernelManager, NotebookManager]
250 250 flags = Dict(flags)
251 251 aliases = Dict(aliases)
252 252
253 253 kernel_argv = List(Unicode)
254 254
255 255 log_level = Enum((0,10,20,30,40,50,'DEBUG','INFO','WARN','ERROR','CRITICAL'),
256 256 default_value=logging.INFO,
257 257 config=True,
258 258 help="Set the log level by value or name.")
259 259
260 260 # create requested profiles by default, if they don't exist:
261 261 auto_create = Bool(True)
262 262
263 263 # Network related information.
264 264
265 265 ip = Unicode(LOCALHOST, config=True,
266 266 help="The IP address the notebook server will listen on."
267 267 )
268 268
269 269 def _ip_changed(self, name, old, new):
270 270 if new == u'*': self.ip = u''
271 271
272 272 port = Integer(8888, config=True,
273 273 help="The port the notebook server will listen on."
274 274 )
275 275
276 276 certfile = Unicode(u'', config=True,
277 277 help="""The full path to an SSL/TLS certificate file."""
278 278 )
279 279
280 280 keyfile = Unicode(u'', config=True,
281 281 help="""The full path to a private key file for usage with SSL/TLS."""
282 282 )
283 283
284 284 password = Unicode(u'', config=True,
285 285 help="""Hashed password to use for web authentication.
286 286
287 287 To generate, type in a python/IPython shell:
288 288
289 289 from IPython.lib import passwd; passwd()
290 290
291 291 The string should be of the form type:salt:hashed-password.
292 292 """
293 293 )
294 294
295 295 open_browser = Bool(True, config=True,
296 296 help="""Whether to open in a browser after starting.
297 297 The specific browser used is platform dependent and
298 298 determined by the python standard library `webbrowser`
299 299 module, unless it is overridden using the --browser
300 300 (NotebookApp.browser) configuration option.
301 301 """)
302 302
303 303 browser = Unicode(u'', config=True,
304 304 help="""Specify what command to use to invoke a web
305 305 browser when opening the notebook. If not specified, the
306 306 default browser will be determined by the `webbrowser`
307 307 standard library module, which allows setting of the
308 308 BROWSER environment variable to override it.
309 309 """)
310 310
311 311 read_only = Bool(False, config=True,
312 312 help="Whether to prevent editing/execution of notebooks."
313 313 )
314 314
315 315 webapp_settings = Dict(config=True,
316 316 help="Supply overrides for the tornado.web.Application that the "
317 317 "IPython notebook uses.")
318 318
319 319 enable_mathjax = Bool(True, config=True,
320 320 help="""Whether to enable MathJax for typesetting math/TeX
321 321
322 322 MathJax is the javascript library IPython uses to render math/LaTeX. It is
323 323 very large, so you may want to disable it if you have a slow internet
324 324 connection, or for offline use of the notebook.
325 325
326 326 When disabled, equations etc. will appear as their untransformed TeX source.
327 327 """
328 328 )
329 329 def _enable_mathjax_changed(self, name, old, new):
330 330 """set mathjax url to empty if mathjax is disabled"""
331 331 if not new:
332 332 self.mathjax_url = u''
333 333
334 334 base_project_url = Unicode('/', config=True,
335 335 help='''The base URL for the notebook server''')
336 336 base_kernel_url = Unicode('/', config=True,
337 337 help='''The base URL for the kernel server''')
338 338 websocket_host = Unicode("", config=True,
339 339 help="""The hostname for the websocket server."""
340 340 )
341 341
342 342 mathjax_url = Unicode("", config=True,
343 343 help="""The url for MathJax.js."""
344 344 )
345 345 def _mathjax_url_default(self):
346 346 if not self.enable_mathjax:
347 347 return u''
348 348 static_path = self.webapp_settings.get("static_path", os.path.join(os.path.dirname(__file__), "static"))
349 349 static_url_prefix = self.webapp_settings.get("static_url_prefix",
350 350 "/static/")
351 351 if os.path.exists(os.path.join(static_path, 'mathjax', "MathJax.js")):
352 352 self.log.info("Using local MathJax")
353 353 return static_url_prefix+u"mathjax/MathJax.js"
354 354 else:
355 355 self.log.info("Using MathJax from CDN")
356 356 hostname = "cdn.mathjax.org"
357 357 try:
358 358 # resolve mathjax cdn alias to cloudfront, because Amazon's SSL certificate
359 359 # only works on *.cloudfront.net
360 360 true_host, aliases, IPs = socket.gethostbyname_ex(hostname)
361 361 # I've run this on a few machines, and some put the right answer in true_host,
362 362 # while others gave it in the aliases list, so we check both.
363 363 aliases.insert(0, true_host)
364 364 except Exception:
365 365 self.log.warn("Couldn't determine MathJax CDN info")
366 366 else:
367 367 for alias in aliases:
368 368 parts = alias.split('.')
369 369 # want static foo.cloudfront.net, not dynamic foo.lax3.cloudfront.net
370 370 if len(parts) == 3 and alias.endswith(".cloudfront.net"):
371 371 hostname = alias
372 372 break
373 373
374 374 if not hostname.endswith(".cloudfront.net"):
375 375 self.log.error("Couldn't resolve CloudFront host, required for HTTPS MathJax.")
376 376 self.log.error("Loading from https://cdn.mathjax.org will probably fail due to invalid certificate.")
377 377 self.log.error("For unsecured HTTP access to MathJax use config:")
378 378 self.log.error("NotebookApp.mathjax_url='http://cdn.mathjax.org/mathjax/latest/MathJax.js'")
379 379 return u"https://%s/mathjax/latest/MathJax.js" % hostname
380 380
381 381 def _mathjax_url_changed(self, name, old, new):
382 382 if new and not self.enable_mathjax:
383 383 # enable_mathjax=False overrides mathjax_url
384 384 self.mathjax_url = u''
385 385 else:
386 386 self.log.info("Using MathJax: %s", new)
387 387
388 388 def parse_command_line(self, argv=None):
389 389 super(NotebookApp, self).parse_command_line(argv)
390 390 if argv is None:
391 391 argv = sys.argv[1:]
392 392
393 393 # Scrub frontend-specific flags
394 394 self.kernel_argv = swallow_argv(argv, notebook_aliases, notebook_flags)
395 395 # Kernel should inherit default config file from frontend
396 396 self.kernel_argv.append("--KernelApp.parent_appname='%s'"%self.name)
397 397
398 398 def init_configurables(self):
399 399 # force Session default to be secure
400 400 default_secure(self.config)
401 401 # Create a KernelManager and start a kernel.
402 402 self.kernel_manager = MappingKernelManager(
403 403 config=self.config, log=self.log, kernel_argv=self.kernel_argv,
404 404 connection_dir = self.profile_dir.security_dir,
405 405 )
406 406 self.notebook_manager = NotebookManager(config=self.config, log=self.log)
407 407 self.notebook_manager.list_notebooks()
408 408 self.cluster_manager = ClusterManager(config=self.config, log=self.log)
409 self.cluster_manager.update_profiles()
409 410
410 411 def init_logging(self):
411 412 super(NotebookApp, self).init_logging()
412 413 # This prevents double log messages because tornado use a root logger that
413 414 # self.log is a child of. The logging module dipatches log messages to a log
414 415 # and all of its ancenstors until propagate is set to False.
415 416 self.log.propagate = False
416 417
417 418 def init_webapp(self):
418 419 """initialize tornado webapp and httpserver"""
419 420 self.web_app = NotebookWebApplication(
420 421 self, self.kernel_manager, self.notebook_manager,
421 422 self.cluster_manager, self.log,
422 423 self.base_project_url, self.webapp_settings
423 424 )
424 425 if self.certfile:
425 426 ssl_options = dict(certfile=self.certfile)
426 427 if self.keyfile:
427 428 ssl_options['keyfile'] = self.keyfile
428 429 else:
429 430 ssl_options = None
430 431 self.web_app.password = self.password
431 432 self.http_server = httpserver.HTTPServer(self.web_app, ssl_options=ssl_options)
432 433 if ssl_options is None and not self.ip and not (self.read_only and not self.password):
433 434 self.log.critical('WARNING: the notebook server is listening on all IP addresses '
434 435 'but not using any encryption or authentication. This is highly '
435 436 'insecure and not recommended.')
436 437
437 438 # Try random ports centered around the default.
438 439 from random import randint
439 440 n = 50 # Max number of attempts, keep reasonably large.
440 441 for port in range(self.port, self.port+5) + [self.port + randint(-2*n, 2*n) for i in range(n-5)]:
441 442 try:
442 443 self.http_server.listen(port, self.ip)
443 444 except socket.error, e:
444 445 if e.errno != errno.EADDRINUSE:
445 446 raise
446 447 self.log.info('The port %i is already in use, trying another random port.' % port)
447 448 else:
448 449 self.port = port
449 450 break
450 451
451 452 @catch_config_error
452 453 def initialize(self, argv=None):
453 454 super(NotebookApp, self).initialize(argv)
454 455 self.init_configurables()
455 456 self.init_webapp()
456 457
457 458 def cleanup_kernels(self):
458 459 """shutdown all kernels
459 460
460 461 The kernels will shutdown themselves when this process no longer exists,
461 462 but explicit shutdown allows the KernelManagers to cleanup the connection files.
462 463 """
463 464 self.log.info('Shutting down kernels')
464 465 km = self.kernel_manager
465 466 # copy list, since kill_kernel deletes keys
466 467 for kid in list(km.kernel_ids):
467 468 km.kill_kernel(kid)
468 469
469 470 def start(self):
470 471 ip = self.ip if self.ip else '[all ip addresses on your system]'
471 472 proto = 'https' if self.certfile else 'http'
472 473 info = self.log.info
473 474 info("The IPython Notebook is running at: %s://%s:%i%s" %
474 475 (proto, ip, self.port,self.base_project_url) )
475 476 info("Use Control-C to stop this server and shut down all kernels.")
476 477
477 478 if self.open_browser:
478 479 ip = self.ip or '127.0.0.1'
479 480 if self.browser:
480 481 browser = webbrowser.get(self.browser)
481 482 else:
482 483 browser = webbrowser.get()
483 484 b = lambda : browser.open("%s://%s:%i%s" % (proto, ip, self.port,
484 485 self.base_project_url),
485 486 new=2)
486 487 threading.Thread(target=b).start()
487 488 try:
488 489 ioloop.IOLoop.instance().start()
489 490 except KeyboardInterrupt:
490 491 info("Interrupted...")
491 492 finally:
492 493 self.cleanup_kernels()
493 494
494 495
495 496 #-----------------------------------------------------------------------------
496 497 # Main entry point
497 498 #-----------------------------------------------------------------------------
498 499
499 500 def launch_new_instance():
500 501 app = NotebookApp.instance()
501 502 app.initialize()
502 503 app.start()
503 504
@@ -1,183 +1,181 b''
1 1 //----------------------------------------------------------------------------
2 2 // Copyright (C) 2008-2011 The IPython Development Team
3 3 //
4 4 // Distributed under the terms of the BSD License. The full license is in
5 5 // the file COPYING, distributed as part of this software.
6 6 //----------------------------------------------------------------------------
7 7
8 8 //============================================================================
9 9 // NotebookList
10 10 //============================================================================
11 11
12 12 var IPython = (function (IPython) {
13 13
14 14 var ClusterList = function (selector) {
15 15 this.selector = selector;
16 16 if (this.selector !== undefined) {
17 17 this.element = $(selector);
18 18 this.style();
19 19 this.bind_events();
20 20 }
21 21 };
22 22
23 23 ClusterList.prototype.style = function () {
24 24 $('#cluster_toolbar').addClass('list_toolbar');
25 25 $('#cluster_list_info').addClass('toolbar_info');
26 26 $('#cluster_buttons').addClass('toolbar_buttons');
27 27 $('div#cluster_header').addClass('list_header ui-widget ui-widget-header ui-helper-clearfix');
28 28 $('div#cluster_header').children().eq(0).addClass('profile_col');
29 29 $('div#cluster_header').children().eq(1).addClass('action_col');
30 30 $('div#cluster_header').children().eq(2).addClass('engines_col');
31 31 $('div#cluster_header').children().eq(3).addClass('status_col');
32 32 $('#refresh_cluster_list').button({
33 33 icons : {primary: 'ui-icon-arrowrefresh-1-s'},
34 34 text : false
35 35 });
36 36 };
37 37
38 38
39 39 ClusterList.prototype.bind_events = function () {
40 40 var that = this;
41 41 $('#refresh_cluster_list').click(function () {
42 42 that.load_list();
43 43 });
44 44 };
45 45
46 46
47 47 ClusterList.prototype.load_list = function () {
48 48 var settings = {
49 49 processData : false,
50 50 cache : false,
51 51 type : "GET",
52 52 dataType : "json",
53 53 success : $.proxy(this.load_list_success, this)
54 54 };
55 55 var url = $('body').data('baseProjectUrl') + 'clusters';
56 56 $.ajax(url, settings);
57 57 };
58 58
59 59
60 60 ClusterList.prototype.clear_list = function () {
61 61 this.element.children('.list_item').remove();
62 62 }
63 63
64 64 ClusterList.prototype.load_list_success = function (data, status, xhr) {
65 65 this.clear_list();
66 66 var len = data.length;
67 67 for (var i=0; i<len; i++) {
68 68 var item_div = $('<div/>');
69 69 var item = new ClusterItem(item_div);
70 70 item.update_state(data[i]);
71 71 item_div.data('item', item);
72 72 this.element.append(item_div);
73 73 };
74 74 };
75 75
76 76
77 77 var ClusterItem = function (element) {
78 78 this.element = $(element);
79 79 this.data = null;
80 80 this.style();
81 81 };
82 82
83 83
84 84 ClusterItem.prototype.style = function () {
85 85 this.element.addClass('list_item ui-widget ui-widget-content ui-helper-clearfix');
86 86 this.element.css('border-top-style','none');
87 87 }
88 88
89 89 ClusterItem.prototype.update_state = function (data) {
90 90 this.data = data;
91 91 if (data.status === 'running') {
92 92 this.state_running();
93 93 } else if (data.status === 'stopped') {
94 94 this.state_stopped();
95 95 };
96 96
97 97 }
98 98
99 99
100 100 ClusterItem.prototype.state_stopped = function () {
101 101 var that = this;
102 102 this.element.empty();
103 103 var profile_col = $('<span/>').addClass('profile_col').text(this.data.profile);
104 104 var status_col = $('<span/>').addClass('status_col').html('stopped');
105 105 var engines_col = $('<span/>').addClass('engines_col');
106 106 var label = $('<label/>').addClass('engine_num_label').html('# of engines:');
107 107 var input = $('<input/>').attr('type','text').
108 108 attr('size',3).addClass('engine_num_input');
109 109 engines_col.append(label).append(input);
110 110 var action_col = $('<span/>').addClass('action_col');
111 111 var start_button = $('<button>Start</button>').button();
112 112 action_col.append(start_button);
113 113 this.element.append(profile_col).
114 114 append(action_col).
115 115 append(engines_col).
116 116 append(status_col);
117 117 start_button.click(function (e) {
118 118 var n = that.element.find('.engine_num_input').val();
119 console.log(n);
120 if (!/^\d+$/.test(n)) {
119 if (!/^\d+$/.test(n) && n.length>0) {
121 120 status_col.html('invalid engine #');
122 121 } else {
123 console.log('ajax...');
124 122 var settings = {
125 123 cache : false,
126 124 data : {n:n},
127 125 type : "POST",
128 126 dataType : "json",
129 127 success : function (data, status, xhr) {
130 128 that.update_state(data);
131 129 },
132 130 error : function (data, status, xhr) {
133 131 status_col.html("error starting cluster")
134 132 }
135 133 };
136 134 status_col.html('starting');
137 135 var url = $('body').data('baseProjectUrl') + 'clusters/' + that.data.profile + '/start';
138 136 $.ajax(url, settings);
139 137 };
140 138 });
141 139 };
142 140
143 141
144 142 ClusterItem.prototype.state_running = function () {
145 143 this.element.empty();
146 144 var that = this;
147 145 var profile_col = $('<span/>').addClass('profile_col').text(this.data.profile);
148 146 var status_col = $('<span/>').addClass('status_col').html('running');
149 147 var engines_col = $('<span/>').addClass('engines_col').html(this.data.n);
150 148 var action_col = $('<span/>').addClass('action_col');
151 149 var stop_button = $('<button>Stop</button>').button();
152 150 action_col.append(stop_button);
153 151 this.element.append(profile_col).
154 152 append(action_col).
155 153 append(engines_col).
156 154 append(status_col);
157 155 stop_button.click(function (e) {
158 156 var settings = {
159 157 cache : false,
160 158 type : "POST",
161 159 dataType : "json",
162 160 success : function (data, status, xhr) {
163 161 that.update_state(data);
164 162 },
165 163 error : function (data, status, xhr) {
166 164 console.log('error',data);
167 165 status_col.html("error stopping cluster")
168 166 }
169 167 };
170 168 status_col.html('stopping')
171 169 var url = $('body').data('baseProjectUrl') + 'clusters/' + that.data.profile + '/stop';
172 170 $.ajax(url, settings);
173 171 });
174 172 };
175 173
176 174
177 175 IPython.ClusterList = ClusterList;
178 176 IPython.ClusterItem = ClusterItem;
179 177
180 178 return IPython;
181 179
182 180 }(IPython));
183 181
@@ -1,598 +1,618 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 The ipcluster application.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * MinRK
10 10
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Copyright (C) 2008-2011 The IPython Development Team
15 15 #
16 16 # Distributed under the terms of the BSD License. The full license is in
17 17 # the file COPYING, distributed as part of this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Imports
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import errno
25 25 import logging
26 26 import os
27 27 import re
28 28 import signal
29 29
30 30 from subprocess import check_call, CalledProcessError, PIPE
31 31 import zmq
32 32 from zmq.eventloop import ioloop
33 33
34 34 from IPython.config.application import Application, boolean_flag, catch_config_error
35 35 from IPython.config.loader import Config
36 36 from IPython.core.application import BaseIPythonApplication
37 37 from IPython.core.profiledir import ProfileDir
38 38 from IPython.utils.daemonize import daemonize
39 39 from IPython.utils.importstring import import_item
40 40 from IPython.utils.sysinfo import num_cpus
41 41 from IPython.utils.traitlets import (Integer, Unicode, Bool, CFloat, Dict, List, Any,
42 42 DottedObjectName)
43 43
44 44 from IPython.parallel.apps.baseapp import (
45 45 BaseParallelApplication,
46 46 PIDFileError,
47 47 base_flags, base_aliases
48 48 )
49 49
50 50
51 51 #-----------------------------------------------------------------------------
52 52 # Module level variables
53 53 #-----------------------------------------------------------------------------
54 54
55 55
56 56 default_config_file_name = u'ipcluster_config.py'
57 57
58 58
59 59 _description = """Start an IPython cluster for parallel computing.
60 60
61 61 An IPython cluster consists of 1 controller and 1 or more engines.
62 62 This command automates the startup of these processes using a wide
63 63 range of startup methods (SSH, local processes, PBS, mpiexec,
64 64 Windows HPC Server 2008). To start a cluster with 4 engines on your
65 65 local host simply do 'ipcluster start --n=4'. For more complex usage
66 66 you will typically do 'ipython profile create mycluster --parallel', then edit
67 67 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
68 68 """
69 69
70 70 _main_examples = """
71 71 ipcluster start --n=4 # start a 4 node cluster on localhost
72 72 ipcluster start -h # show the help string for the start subcmd
73 73
74 74 ipcluster stop -h # show the help string for the stop subcmd
75 75 ipcluster engines -h # show the help string for the engines subcmd
76 76 """
77 77
78 78 _start_examples = """
79 79 ipython profile create mycluster --parallel # create mycluster profile
80 80 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
81 81 """
82 82
83 83 _stop_examples = """
84 84 ipcluster stop --profile=mycluster # stop a running cluster by profile name
85 85 """
86 86
87 87 _engines_examples = """
88 88 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
89 89 """
90 90
91 91
92 92 # Exit codes for ipcluster
93 93
94 94 # This will be the exit code if the ipcluster appears to be running because
95 95 # a .pid file exists
96 96 ALREADY_STARTED = 10
97 97
98 98
99 99 # This will be the exit code if ipcluster stop is run, but there is not .pid
100 100 # file to be found.
101 101 ALREADY_STOPPED = 11
102 102
103 103 # This will be the exit code if ipcluster engines is run, but there is not .pid
104 104 # file to be found.
105 105 NO_CLUSTER = 12
106 106
107 107
108 108 #-----------------------------------------------------------------------------
109 # Utilities
110 #-----------------------------------------------------------------------------
111
112 def find_launcher_class(clsname, kind):
113 """Return a launcher for a given clsname and kind.
114
115 Parameters
116 ==========
117 clsname : str
118 The full name of the launcher class, either with or without the
119 module path, or an abbreviation (MPI, SSH, SGE, PBS, LSF,
120 WindowsHPC).
121 kind : str
122 Either 'EngineSet' or 'Controller'.
123 """
124 if '.' not in clsname:
125 # not a module, presume it's the raw name in apps.launcher
126 if kind and kind not in clsname:
127 # doesn't match necessary full class name, assume it's
128 # just 'PBS' or 'MPI' prefix:
129 clsname = clsname + kind + 'Launcher'
130 clsname = 'IPython.parallel.apps.launcher.'+clsname
131 klass = import_item(clsname)
132 return klass
133
134 #-----------------------------------------------------------------------------
109 135 # Main application
110 136 #-----------------------------------------------------------------------------
137
111 138 start_help = """Start an IPython cluster for parallel computing
112 139
113 140 Start an ipython cluster by its profile name or cluster
114 141 directory. Cluster directories contain configuration, log and
115 142 security related files and are named using the convention
116 143 'profile_<name>' and should be creating using the 'start'
117 144 subcommand of 'ipcluster'. If your cluster directory is in
118 145 the cwd or the ipython directory, you can simply refer to it
119 146 using its profile name, 'ipcluster start --n=4 --profile=<profile>`,
120 147 otherwise use the 'profile-dir' option.
121 148 """
122 149 stop_help = """Stop a running IPython cluster
123 150
124 151 Stop a running ipython cluster by its profile name or cluster
125 152 directory. Cluster directories are named using the convention
126 153 'profile_<name>'. If your cluster directory is in
127 154 the cwd or the ipython directory, you can simply refer to it
128 155 using its profile name, 'ipcluster stop --profile=<profile>`, otherwise
129 156 use the '--profile-dir' option.
130 157 """
131 158 engines_help = """Start engines connected to an existing IPython cluster
132 159
133 160 Start one or more engines to connect to an existing Cluster
134 161 by profile name or cluster directory.
135 162 Cluster directories contain configuration, log and
136 163 security related files and are named using the convention
137 164 'profile_<name>' and should be creating using the 'start'
138 165 subcommand of 'ipcluster'. If your cluster directory is in
139 166 the cwd or the ipython directory, you can simply refer to it
140 167 using its profile name, 'ipcluster engines --n=4 --profile=<profile>`,
141 168 otherwise use the 'profile-dir' option.
142 169 """
143 170 stop_aliases = dict(
144 171 signal='IPClusterStop.signal',
145 172 )
146 173 stop_aliases.update(base_aliases)
147 174
148 175 class IPClusterStop(BaseParallelApplication):
149 176 name = u'ipcluster'
150 177 description = stop_help
151 178 examples = _stop_examples
152 179 config_file_name = Unicode(default_config_file_name)
153 180
154 181 signal = Integer(signal.SIGINT, config=True,
155 182 help="signal to use for stopping processes.")
156 183
157 184 aliases = Dict(stop_aliases)
158 185
159 186 def start(self):
160 187 """Start the app for the stop subcommand."""
161 188 try:
162 189 pid = self.get_pid_from_file()
163 190 except PIDFileError:
164 191 self.log.critical(
165 192 'Could not read pid file, cluster is probably not running.'
166 193 )
167 194 # Here I exit with a unusual exit status that other processes
168 195 # can watch for to learn how I existed.
169 196 self.remove_pid_file()
170 197 self.exit(ALREADY_STOPPED)
171 198
172 199 if not self.check_pid(pid):
173 200 self.log.critical(
174 201 'Cluster [pid=%r] is not running.' % pid
175 202 )
176 203 self.remove_pid_file()
177 204 # Here I exit with a unusual exit status that other processes
178 205 # can watch for to learn how I existed.
179 206 self.exit(ALREADY_STOPPED)
180 207
181 208 elif os.name=='posix':
182 209 sig = self.signal
183 210 self.log.info(
184 211 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
185 212 )
186 213 try:
187 214 os.kill(pid, sig)
188 215 except OSError:
189 216 self.log.error("Stopping cluster failed, assuming already dead.",
190 217 exc_info=True)
191 218 self.remove_pid_file()
192 219 elif os.name=='nt':
193 220 try:
194 221 # kill the whole tree
195 222 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
196 223 except (CalledProcessError, OSError):
197 224 self.log.error("Stopping cluster failed, assuming already dead.",
198 225 exc_info=True)
199 226 self.remove_pid_file()
200 227
201 228 engine_aliases = {}
202 229 engine_aliases.update(base_aliases)
203 230 engine_aliases.update(dict(
204 231 n='IPClusterEngines.n',
205 232 engines = 'IPClusterEngines.engine_launcher_class',
206 233 daemonize = 'IPClusterEngines.daemonize',
207 234 ))
208 235 engine_flags = {}
209 236 engine_flags.update(base_flags)
210 237
211 238 engine_flags.update(dict(
212 239 daemonize=(
213 240 {'IPClusterEngines' : {'daemonize' : True}},
214 241 """run the cluster into the background (not available on Windows)""",
215 242 )
216 243 ))
217 244 class IPClusterEngines(BaseParallelApplication):
218 245
219 246 name = u'ipcluster'
220 247 description = engines_help
221 248 examples = _engines_examples
222 249 usage = None
223 250 config_file_name = Unicode(default_config_file_name)
224 251 default_log_level = logging.INFO
225 252 classes = List()
226 253 def _classes_default(self):
227 254 from IPython.parallel.apps import launcher
228 255 launchers = launcher.all_launchers
229 256 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
230 257 return [ProfileDir]+eslaunchers
231 258
232 259 n = Integer(num_cpus(), config=True,
233 260 help="""The number of engines to start. The default is to use one for each
234 261 CPU on your machine""")
235 262
236 263 engine_launcher = Any(config=True, help="Deprecated, use engine_launcher_class")
237 264 def _engine_launcher_changed(self, name, old, new):
238 265 if isinstance(new, basestring):
239 266 self.log.warn("WARNING: %s.engine_launcher is deprecated as of 0.12,"
240 267 " use engine_launcher_class" % self.__class__.__name__)
241 268 self.engine_launcher_class = new
242 269 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
243 270 config=True,
244 271 help="""The class for launching a set of Engines. Change this value
245 272 to use various batch systems to launch your engines, such as PBS,SGE,MPI,etc.
246 273 Each launcher class has its own set of configuration options, for making sure
247 274 it will work in your environment.
248 275
249 276 You can also write your own launcher, and specify it's absolute import path,
250 277 as in 'mymodule.launcher.FTLEnginesLauncher`.
251 278
252 279 IPython's bundled examples include:
253 280
254 281 Local : start engines locally as subprocesses [default]
255 282 MPI : use mpiexec to launch engines in an MPI environment
256 283 PBS : use PBS (qsub) to submit engines to a batch queue
257 284 SGE : use SGE (qsub) to submit engines to a batch queue
258 285 LSF : use LSF (bsub) to submit engines to a batch queue
259 286 SSH : use SSH to start the controller
260 287 Note that SSH does *not* move the connection files
261 288 around, so you will likely have to do this manually
262 289 unless the machines are on a shared file system.
263 290 WindowsHPC : use Windows HPC
264 291
265 292 If you are using one of IPython's builtin launchers, you can specify just the
266 293 prefix, e.g:
267 294
268 295 c.IPClusterEngines.engine_launcher_class = 'SSH'
269 296
270 297 or:
271 298
272 299 ipcluster start --engines=MPI
273 300
274 301 """
275 302 )
276 303 daemonize = Bool(False, config=True,
277 304 help="""Daemonize the ipcluster program. This implies --log-to-file.
278 305 Not available on Windows.
279 306 """)
280 307
281 308 def _daemonize_changed(self, name, old, new):
282 309 if new:
283 310 self.log_to_file = True
284 311
285 312 early_shutdown = Integer(30, config=True, help="The timeout (in seconds)")
286 313 _stopping = False
287 314
288 315 aliases = Dict(engine_aliases)
289 316 flags = Dict(engine_flags)
290 317
291 318 @catch_config_error
292 319 def initialize(self, argv=None):
293 320 super(IPClusterEngines, self).initialize(argv)
294 321 self.init_signal()
295 322 self.init_launchers()
296 323
297 324 def init_launchers(self):
298 325 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
299 326
300 327 def init_signal(self):
301 328 # Setup signals
302 329 signal.signal(signal.SIGINT, self.sigint_handler)
303 330
304 331 def build_launcher(self, clsname, kind=None):
305 332 """import and instantiate a Launcher based on importstring"""
306 if '.' not in clsname:
307 # not a module, presume it's the raw name in apps.launcher
308 if kind and kind not in clsname:
309 # doesn't match necessary full class name, assume it's
310 # just 'PBS' or 'MPI' prefix:
311 clsname = clsname + kind + 'Launcher'
312 clsname = 'IPython.parallel.apps.launcher.'+clsname
313 333 try:
314 klass = import_item(clsname)
334 klass = find_launcher_class(clsname, kind)
315 335 except (ImportError, KeyError):
316 336 self.log.fatal("Could not import launcher class: %r"%clsname)
317 337 self.exit(1)
318 338
319 339 launcher = klass(
320 340 work_dir=u'.', config=self.config, log=self.log,
321 341 profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
322 342 )
323 343 return launcher
324 344
325 345 def engines_started_ok(self):
326 346 self.log.info("Engines appear to have started successfully")
327 347 self.early_shutdown = 0
328 348
329 349 def start_engines(self):
330 350 # Some EngineSetLaunchers ignore `n` and use their own engine count, such as SSH:
331 351 n = getattr(self.engine_launcher, 'engine_count', self.n)
332 352 self.log.info("Starting %s Engines with %s", n, self.engine_launcher_class)
333 353 self.engine_launcher.start(self.n)
334 354 self.engine_launcher.on_stop(self.engines_stopped_early)
335 355 if self.early_shutdown:
336 356 ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start()
337 357
338 358 def engines_stopped_early(self, r):
339 359 if self.early_shutdown and not self._stopping:
340 360 self.log.error("""
341 361 Engines shutdown early, they probably failed to connect.
342 362
343 363 Check the engine log files for output.
344 364
345 365 If your controller and engines are not on the same machine, you probably
346 366 have to instruct the controller to listen on an interface other than localhost.
347 367
348 368 You can set this by adding "--ip='*'" to your ControllerLauncher.controller_args.
349 369
350 370 Be sure to read our security docs before instructing your controller to listen on
351 371 a public interface.
352 372 """)
353 373 self.stop_launchers()
354 374
355 375 return self.engines_stopped(r)
356 376
357 377 def engines_stopped(self, r):
358 378 return self.loop.stop()
359 379
360 380 def stop_engines(self):
361 381 if self.engine_launcher.running:
362 382 self.log.info("Stopping Engines...")
363 383 d = self.engine_launcher.stop()
364 384 return d
365 385 else:
366 386 return None
367 387
368 388 def stop_launchers(self, r=None):
369 389 if not self._stopping:
370 390 self._stopping = True
371 391 self.log.error("IPython cluster: stopping")
372 392 self.stop_engines()
373 393 # Wait a few seconds to let things shut down.
374 394 dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
375 395 dc.start()
376 396
377 397 def sigint_handler(self, signum, frame):
378 398 self.log.debug("SIGINT received, stopping launchers...")
379 399 self.stop_launchers()
380 400
381 401 def start_logging(self):
382 402 # Remove old log files of the controller and engine
383 403 if self.clean_logs:
384 404 log_dir = self.profile_dir.log_dir
385 405 for f in os.listdir(log_dir):
386 406 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
387 407 os.remove(os.path.join(log_dir, f))
388 408 # This will remove old log files for ipcluster itself
389 409 # super(IPBaseParallelApplication, self).start_logging()
390 410
391 411 def start(self):
392 412 """Start the app for the engines subcommand."""
393 413 self.log.info("IPython cluster: started")
394 414 # First see if the cluster is already running
395 415
396 416 # Now log and daemonize
397 417 self.log.info(
398 418 'Starting engines with [daemon=%r]' % self.daemonize
399 419 )
400 420 # TODO: Get daemonize working on Windows or as a Windows Server.
401 421 if self.daemonize:
402 422 if os.name=='posix':
403 423 daemonize()
404 424
405 425 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
406 426 dc.start()
407 427 # Now write the new pid file AFTER our new forked pid is active.
408 428 # self.write_pid_file()
409 429 try:
410 430 self.loop.start()
411 431 except KeyboardInterrupt:
412 432 pass
413 433 except zmq.ZMQError as e:
414 434 if e.errno == errno.EINTR:
415 435 pass
416 436 else:
417 437 raise
418 438
419 439 start_aliases = {}
420 440 start_aliases.update(engine_aliases)
421 441 start_aliases.update(dict(
422 442 delay='IPClusterStart.delay',
423 443 controller = 'IPClusterStart.controller_launcher_class',
424 444 ))
425 445 start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'
426 446
427 447 class IPClusterStart(IPClusterEngines):
428 448
429 449 name = u'ipcluster'
430 450 description = start_help
431 451 examples = _start_examples
432 452 default_log_level = logging.INFO
433 453 auto_create = Bool(True, config=True,
434 454 help="whether to create the profile_dir if it doesn't exist")
435 455 classes = List()
436 456 def _classes_default(self,):
437 457 from IPython.parallel.apps import launcher
438 458 return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers
439 459
440 460 clean_logs = Bool(True, config=True,
441 461 help="whether to cleanup old logs before starting")
442 462
443 463 delay = CFloat(1., config=True,
444 464 help="delay (in s) between starting the controller and the engines")
445 465
446 466 controller_launcher = Any(config=True, help="Deprecated, use controller_launcher_class")
447 467 def _controller_launcher_changed(self, name, old, new):
448 468 if isinstance(new, basestring):
449 469 # old 0.11-style config
450 470 self.log.warn("WARNING: %s.controller_launcher is deprecated as of 0.12,"
451 471 " use controller_launcher_class" % self.__class__.__name__)
452 472 self.controller_launcher_class = new
453 473 controller_launcher_class = DottedObjectName('LocalControllerLauncher',
454 474 config=True,
455 475 help="""The class for launching a Controller. Change this value if you want
456 476 your controller to also be launched by a batch system, such as PBS,SGE,MPI,etc.
457 477
458 478 Each launcher class has its own set of configuration options, for making sure
459 479 it will work in your environment.
460 480
461 481 Note that using a batch launcher for the controller *does not* put it
462 482 in the same batch job as the engines, so they will still start separately.
463 483
464 484 IPython's bundled examples include:
465 485
466 486 Local : start engines locally as subprocesses
467 487 MPI : use mpiexec to launch the controller in an MPI universe
468 488 PBS : use PBS (qsub) to submit the controller to a batch queue
469 489 SGE : use SGE (qsub) to submit the controller to a batch queue
470 490 LSF : use LSF (bsub) to submit the controller to a batch queue
471 491 SSH : use SSH to start the controller
472 492 WindowsHPC : use Windows HPC
473 493
474 494 If you are using one of IPython's builtin launchers, you can specify just the
475 495 prefix, e.g:
476 496
477 497 c.IPClusterStart.controller_launcher_class = 'SSH'
478 498
479 499 or:
480 500
481 501 ipcluster start --controller=MPI
482 502
483 503 """
484 504 )
485 505 reset = Bool(False, config=True,
486 506 help="Whether to reset config files as part of '--create'."
487 507 )
488 508
489 509 # flags = Dict(flags)
490 510 aliases = Dict(start_aliases)
491 511
492 512 def init_launchers(self):
493 513 self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
494 514 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
495 515 self.controller_launcher.on_stop(self.stop_launchers)
496 516
497 517 def engines_stopped(self, r):
498 518 """prevent parent.engines_stopped from stopping everything on engine shutdown"""
499 519 pass
500 520
501 521 def start_controller(self):
502 522 self.log.info("Starting Controller with %s", self.controller_launcher_class)
503 523 self.controller_launcher.start()
504 524
505 525 def stop_controller(self):
506 526 # self.log.info("In stop_controller")
507 527 if self.controller_launcher and self.controller_launcher.running:
508 528 return self.controller_launcher.stop()
509 529
510 530 def stop_launchers(self, r=None):
511 531 if not self._stopping:
512 532 self.stop_controller()
513 533 super(IPClusterStart, self).stop_launchers()
514 534
515 535 def start(self):
516 536 """Start the app for the start subcommand."""
517 537 # First see if the cluster is already running
518 538 try:
519 539 pid = self.get_pid_from_file()
520 540 except PIDFileError:
521 541 pass
522 542 else:
523 543 if self.check_pid(pid):
524 544 self.log.critical(
525 545 'Cluster is already running with [pid=%s]. '
526 546 'use "ipcluster stop" to stop the cluster.' % pid
527 547 )
528 548 # Here I exit with a unusual exit status that other processes
529 549 # can watch for to learn how I existed.
530 550 self.exit(ALREADY_STARTED)
531 551 else:
532 552 self.remove_pid_file()
533 553
534 554
535 555 # Now log and daemonize
536 556 self.log.info(
537 557 'Starting ipcluster with [daemon=%r]' % self.daemonize
538 558 )
539 559 # TODO: Get daemonize working on Windows or as a Windows Server.
540 560 if self.daemonize:
541 561 if os.name=='posix':
542 562 daemonize()
543 563
544 564 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
545 565 dc.start()
546 566 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
547 567 dc.start()
548 568 # Now write the new pid file AFTER our new forked pid is active.
549 569 self.write_pid_file()
550 570 try:
551 571 self.loop.start()
552 572 except KeyboardInterrupt:
553 573 pass
554 574 except zmq.ZMQError as e:
555 575 if e.errno == errno.EINTR:
556 576 pass
557 577 else:
558 578 raise
559 579 finally:
560 580 self.remove_pid_file()
561 581
562 582 base='IPython.parallel.apps.ipclusterapp.IPCluster'
563 583
564 584 class IPClusterApp(Application):
565 585 name = u'ipcluster'
566 586 description = _description
567 587 examples = _main_examples
568 588
569 589 subcommands = {
570 590 'start' : (base+'Start', start_help),
571 591 'stop' : (base+'Stop', stop_help),
572 592 'engines' : (base+'Engines', engines_help),
573 593 }
574 594
575 595 # no aliases or flags for parent App
576 596 aliases = Dict()
577 597 flags = Dict()
578 598
579 599 def start(self):
580 600 if self.subapp is None:
581 601 print "No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())
582 602 print
583 603 self.print_description()
584 604 self.print_subcommands()
585 605 self.exit(1)
586 606 else:
587 607 return self.subapp.start()
588 608
589 609 def launch_new_instance():
590 610 """Create and run the IPython cluster."""
591 611 app = IPClusterApp.instance()
592 612 app.initialize()
593 613 app.start()
594 614
595 615
596 616 if __name__ == '__main__':
597 617 launch_new_instance()
598 618
@@ -1,1223 +1,1223 b''
1 1 # encoding: utf-8
2 2 """
3 3 Facilities for launching IPython processes asynchronously.
4 4
5 5 Authors:
6 6
7 7 * Brian Granger
8 8 * MinRK
9 9 """
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2008-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #-----------------------------------------------------------------------------
19 19 # Imports
20 20 #-----------------------------------------------------------------------------
21 21
22 22 import copy
23 23 import logging
24 24 import os
25 25 import re
26 26 import stat
27 27 import time
28 28
29 29 # signal imports, handling various platforms, versions
30 30
31 31 from signal import SIGINT, SIGTERM
32 32 try:
33 33 from signal import SIGKILL
34 34 except ImportError:
35 35 # Windows
36 36 SIGKILL=SIGTERM
37 37
38 38 try:
39 39 # Windows >= 2.7, 3.2
40 40 from signal import CTRL_C_EVENT as SIGINT
41 41 except ImportError:
42 42 pass
43 43
44 44 from subprocess import Popen, PIPE, STDOUT
45 45 try:
46 46 from subprocess import check_output
47 47 except ImportError:
48 48 # pre-2.7, define check_output with Popen
49 49 def check_output(*args, **kwargs):
50 50 kwargs.update(dict(stdout=PIPE))
51 51 p = Popen(*args, **kwargs)
52 52 out,err = p.communicate()
53 53 return out
54 54
55 55 from zmq.eventloop import ioloop
56 56
57 57 from IPython.config.application import Application
58 58 from IPython.config.configurable import LoggingConfigurable
59 59 from IPython.utils.text import EvalFormatter
60 60 from IPython.utils.traitlets import (
61 61 Any, Integer, CFloat, List, Unicode, Dict, Instance, HasTraits,
62 62 )
63 63 from IPython.utils.path import get_ipython_module_path
64 64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
65 65
66 66 from .win32support import forward_read_events
67 67
68 68 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
69 69
70 70 WINDOWS = os.name == 'nt'
71 71
72 72 #-----------------------------------------------------------------------------
73 73 # Paths to the kernel apps
74 74 #-----------------------------------------------------------------------------
75 75
76 76
77 77 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
78 78 'IPython.parallel.apps.ipclusterapp'
79 79 ))
80 80
81 81 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
82 82 'IPython.parallel.apps.ipengineapp'
83 83 ))
84 84
85 85 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
86 86 'IPython.parallel.apps.ipcontrollerapp'
87 87 ))
88 88
89 89 #-----------------------------------------------------------------------------
90 90 # Base launchers and errors
91 91 #-----------------------------------------------------------------------------
92 92
93 93
94 94 class LauncherError(Exception):
95 95 pass
96 96
97 97
98 98 class ProcessStateError(LauncherError):
99 99 pass
100 100
101 101
102 102 class UnknownStatus(LauncherError):
103 103 pass
104 104
105 105
106 106 class BaseLauncher(LoggingConfigurable):
107 107 """An asbtraction for starting, stopping and signaling a process."""
108 108
109 109 # In all of the launchers, the work_dir is where child processes will be
110 110 # run. This will usually be the profile_dir, but may not be. any work_dir
111 111 # passed into the __init__ method will override the config value.
112 112 # This should not be used to set the work_dir for the actual engine
113 113 # and controller. Instead, use their own config files or the
114 114 # controller_args, engine_args attributes of the launchers to add
115 115 # the work_dir option.
116 116 work_dir = Unicode(u'.')
117 117 loop = Instance('zmq.eventloop.ioloop.IOLoop')
118 118
119 119 start_data = Any()
120 120 stop_data = Any()
121 121
122 122 def _loop_default(self):
123 123 return ioloop.IOLoop.instance()
124 124
125 125 def __init__(self, work_dir=u'.', config=None, **kwargs):
126 126 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
127 127 self.state = 'before' # can be before, running, after
128 128 self.stop_callbacks = []
129 129 self.start_data = None
130 130 self.stop_data = None
131 131
132 132 @property
133 133 def args(self):
134 134 """A list of cmd and args that will be used to start the process.
135 135
136 136 This is what is passed to :func:`spawnProcess` and the first element
137 137 will be the process name.
138 138 """
139 139 return self.find_args()
140 140
141 141 def find_args(self):
142 142 """The ``.args`` property calls this to find the args list.
143 143
144 144 Subcommand should implement this to construct the cmd and args.
145 145 """
146 146 raise NotImplementedError('find_args must be implemented in a subclass')
147 147
148 148 @property
149 149 def arg_str(self):
150 150 """The string form of the program arguments."""
151 151 return ' '.join(self.args)
152 152
153 153 @property
154 154 def running(self):
155 155 """Am I running."""
156 156 if self.state == 'running':
157 157 return True
158 158 else:
159 159 return False
160 160
161 161 def start(self):
162 162 """Start the process."""
163 163 raise NotImplementedError('start must be implemented in a subclass')
164 164
165 165 def stop(self):
166 166 """Stop the process and notify observers of stopping.
167 167
168 168 This method will return None immediately.
169 169 To observe the actual process stopping, see :meth:`on_stop`.
170 170 """
171 171 raise NotImplementedError('stop must be implemented in a subclass')
172 172
173 173 def on_stop(self, f):
174 174 """Register a callback to be called with this Launcher's stop_data
175 175 when the process actually finishes.
176 176 """
177 177 if self.state=='after':
178 178 return f(self.stop_data)
179 179 else:
180 180 self.stop_callbacks.append(f)
181 181
182 182 def notify_start(self, data):
183 183 """Call this to trigger startup actions.
184 184
185 185 This logs the process startup and sets the state to 'running'. It is
186 186 a pass-through so it can be used as a callback.
187 187 """
188 188
189 189 self.log.debug('Process %r started: %r', self.args[0], data)
190 190 self.start_data = data
191 191 self.state = 'running'
192 192 return data
193 193
194 194 def notify_stop(self, data):
195 195 """Call this to trigger process stop actions.
196 196
197 197 This logs the process stopping and sets the state to 'after'. Call
198 198 this to trigger callbacks registered via :meth:`on_stop`."""
199 199
200 200 self.log.debug('Process %r stopped: %r', self.args[0], data)
201 201 self.stop_data = data
202 202 self.state = 'after'
203 203 for i in range(len(self.stop_callbacks)):
204 204 d = self.stop_callbacks.pop()
205 205 d(data)
206 206 return data
207 207
208 208 def signal(self, sig):
209 209 """Signal the process.
210 210
211 211 Parameters
212 212 ----------
213 213 sig : str or int
214 214 'KILL', 'INT', etc., or any signal number
215 215 """
216 216 raise NotImplementedError('signal must be implemented in a subclass')
217 217
218 218 class ClusterAppMixin(HasTraits):
219 219 """MixIn for cluster args as traits"""
220 220 cluster_args = List([])
221 221 profile_dir=Unicode('')
222 222 cluster_id=Unicode('')
223 223 def _profile_dir_changed(self, name, old, new):
224 224 self.cluster_args = []
225 225 if self.profile_dir:
226 226 self.cluster_args.extend(['--profile-dir', self.profile_dir])
227 227 if self.cluster_id:
228 228 self.cluster_args.extend(['--cluster-id', self.cluster_id])
229 229 _cluster_id_changed = _profile_dir_changed
230 230
231 231 class ControllerMixin(ClusterAppMixin):
232 232 controller_cmd = List(ipcontroller_cmd_argv, config=True,
233 233 help="""Popen command to launch ipcontroller.""")
234 234 # Command line arguments to ipcontroller.
235 235 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
236 236 help="""command-line args to pass to ipcontroller""")
237 237
class EngineMixin(ClusterAppMixin):
    """Mixin holding the command and extra arguments used to launch ipengine."""
    engine_cmd = List(ipengine_cmd_argv, config=True,
        help="""command to launch the Engine.""")
    # Command line arguments for ipengine.
    engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
        help="command-line arguments to pass to ipengine"
    )
245 245
246 246 #-----------------------------------------------------------------------------
247 247 # Local process launchers
248 248 #-----------------------------------------------------------------------------
249 249
250 250
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    This will launch the external process with a working directory of
    ``self.work_dir``.
    """

    # This is used to construct self.args, which is passed to
    # spawnProcess.
    cmd_and_args = List([])
    # how often to poll the child for exit, in milliseconds
    poll_frequency = Integer(100) # in ms

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.process = None
        self.poller = None

    def find_args(self):
        # argv comes directly from the configured cmd_and_args list
        return self.cmd_and_args

    def start(self):
        """Spawn the subprocess and wire its output into the event loop.

        Raises ProcessStateError if the launcher was already started.
        """
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        if self.state == 'before':
            self.process = Popen(self.args,
                stdout=PIPE,stderr=PIPE,stdin=PIPE,
                env=os.environ,
                cwd=self.work_dir
            )
            if WINDOWS:
                # on Windows, pipe reads are forwarded through a helper so
                # they can participate in the event loop
                self.stdout = forward_read_events(self.process.stdout)
                self.stderr = forward_read_events(self.process.stderr)
            else:
                self.stdout = self.process.stdout.fileno()
                self.stderr = self.process.stderr.fileno()
            self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
            self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
            # periodically check whether the child has exited
            self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
            self.poller.start()
            self.notify_start(self.process.pid)
        else:
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)

    def stop(self):
        """Stop the process: interrupt, then kill after a delay."""
        return self.interrupt_then_kill()

    def signal(self, sig):
        """Send ``sig`` to the child (Windows uses tree-kill unless SIGINT)."""
        if self.state == 'running':
            if WINDOWS and sig != SIGINT:
                # use Windows tree-kill for better child cleanup
                check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
            else:
                self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL."""
        try:
            self.signal(SIGINT)
        except Exception:
            # best-effort: a dead process makes the interrupt fail, which is fine
            self.log.debug("interrupt failed")
            pass
        # schedule the follow-up KILL on the event loop
        self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
        self.killer.start()

    # callbacks, etc:

    def handle_stdout(self, fd, events):
        # forward child stdout lines to our logger
        if WINDOWS:
            line = self.stdout.recv()
        else:
            line = self.process.stdout.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.debug(line[:-1])
        else:
            self.poll()

    def handle_stderr(self, fd, events):
        # forward child stderr lines to our logger
        if WINDOWS:
            line = self.stderr.recv()
        else:
            line = self.process.stderr.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.debug(line[:-1])
        else:
            self.poll()

    def poll(self):
        """Check for child exit; tear down handlers and notify when it dies."""
        status = self.process.poll()
        if status is not None:
            self.poller.stop()
            self.loop.remove_handler(self.stdout)
            self.loop.remove_handler(self.stderr)
            self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
349 349
class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
    """Launch a controller as a regular external process."""

    def find_args(self):
        """Assemble the full argv for the local ipcontroller process."""
        argv = list(self.controller_cmd)
        argv.extend(self.cluster_args)
        argv.extend(self.controller_args)
        return argv

    def start(self):
        """Start the controller by profile_dir."""
        return super(LocalControllerLauncher, self).start()
359 359
360 360
class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
    """Launch a single engine as a regular external process."""

    def find_args(self):
        # argv = engine command + --profile-dir/--cluster-id flags + extra args
        return self.engine_cmd + self.cluster_args + self.engine_args
366 366
367 367
class LocalEngineSetLauncher(LocalEngineLauncher):
    """Launch a set of engines as regular external processes."""

    delay = CFloat(0.1, config=True,
        help="""delay (in seconds) between starting each engine after the first.
        This can help force the engines to get their ids in order, or limit
        process flood when starting many engines."""
    )

    # launcher class used for each individual engine
    launcher_class = LocalEngineLauncher

    # engine index -> launcher, and engine index -> stop data
    launchers = Dict()
    stop_data = Dict()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.stop_data = {}

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        dlist = []
        for i in range(n):
            if i > 0:
                # stagger startup to keep engine ids ordered / avoid flood
                time.sleep(self.delay)
            el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
                                    profile_dir=self.profile_dir, cluster_id=self.cluster_id,
            )

            # Copy the engine args over to each engine launcher.
            el.engine_cmd = copy.deepcopy(self.engine_cmd)
            el.engine_args = copy.deepcopy(self.engine_args)
            el.on_stop(self._notice_engine_stopped)
            d = el.start()
            self.launchers[i] = el
            dlist.append(d)
        self.notify_start(dlist)
        return dlist

    def find_args(self):
        # placeholder argv; engines are started via the individual launchers
        return ['engine set']

    def signal(self, sig):
        """Send ``sig`` to every engine in the set; return the list of results."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.signal(sig)
            dlist.append(d)
        return dlist

    def interrupt_then_kill(self, delay=1.0):
        """INT, then KILL after ``delay`` seconds, for each engine."""
        dlist = []
        for el in self.launchers.itervalues():
            d = el.interrupt_then_kill(delay)
            dlist.append(d)
        return dlist

    def stop(self):
        """Stop the whole engine set."""
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        """Remove the engine whose pid matches ``data['pid']``.

        Fires :meth:`notify_stop` once the last engine is gone.  If no
        launcher matches the pid the notice is logged and ignored —
        previously the loop variable leaked past the ``for`` and an
        arbitrary launcher was popped.
        """
        pid = data['pid']
        for idx, el in self.launchers.iteritems():
            if el.process.pid == pid:
                break
        else:
            # unmatched pid: don't pop an unrelated launcher
            self.log.warn("Stop notification for unknown engine pid: %r", pid)
            return
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
438 438
439 439
440 440 #-----------------------------------------------------------------------------
441 441 # MPI launchers
442 442 #-----------------------------------------------------------------------------
443 443
444 444
class MPILauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    mpi_cmd = List(['mpiexec'], config=True,
        help="The mpiexec command to use in starting the process."
    )
    mpi_args = List([], config=True,
        help="The command line arguments to pass to mpiexec."
    )
    program = List(['date'],
        help="The program to start via mpiexec.")
    program_args = List([],
        help="The command line argument to the program."
    )
    # number of MPI processes to request; set by start(n)
    n = Integer(1)

    def __init__(self, *args, **kwargs):
        # deprecation for old MPIExec names:
        # copy any config left under the old MPIExec* section into the new
        # MPI* section before the superclass consumes the config.
        config = kwargs.get('config', {})
        for oldname in ('MPIExecLauncher', 'MPIExecControllerLauncher', 'MPIExecEngineSetLauncher'):
            deprecated = config.get(oldname)
            if deprecated:
                newname = oldname.replace('MPIExec', 'MPI')
                config[newname].update(deprecated)
                # NOTE(review): self.log is read before super().__init__ runs;
                # presumably the trait default makes this safe — confirm.
                self.log.warn("WARNING: %s name has been deprecated, use %s", oldname, newname)

        super(MPILauncher, self).__init__(*args, **kwargs)

    def find_args(self):
        """Build self.args using all the fields."""
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.program + self.program_args

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPILauncher, self).start()
482 482
483 483
class MPIControllerLauncher(MPILauncher, ControllerMixin):
    """Launch a controller using mpiexec."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        # the ipcontroller argv from ControllerMixin
        return self.controller_cmd

    @property
    def program_args(self):
        # cluster identity flags first, then the configured extra args
        return self.cluster_args + self.controller_args

    def start(self):
        """Start the controller by profile_dir."""
        return super(MPIControllerLauncher, self).start(1)
501 501
502 502
class MPIEngineSetLauncher(MPILauncher, EngineMixin):
    """Launch engines using mpiexec"""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        # the ipengine argv from EngineMixin
        return self.engine_cmd

    @property
    def program_args(self):
        # cluster identity flags first, then the configured extra args
        return self.cluster_args + self.engine_args

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        self.n = n
        return super(MPIEngineSetLauncher, self).start(n)
521 521
522 522 # deprecated MPIExec names
class DeprecatedMPILauncher(object):
    """Mixin that logs a rename warning for the old MPIExec* launcher names."""
    def warn(self):
        """Point users at the MPI* replacement for this MPIExec* class."""
        old = self.__class__.__name__
        new = old.replace('MPIExec', 'MPI')
        self.log.warn("WARNING: %s name is deprecated, use %s", old, new)
528 528
class MPIExecLauncher(MPILauncher, DeprecatedMPILauncher):
    """Deprecated, use MPILauncher"""
    def __init__(self, *args, **kwargs):
        super(MPIExecLauncher, self).__init__(*args, **kwargs)
        # emit the rename warning from DeprecatedMPILauncher
        self.warn()
534 534
class MPIExecControllerLauncher(MPIControllerLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIControllerLauncher"""
    def __init__(self, *args, **kwargs):
        super(MPIExecControllerLauncher, self).__init__(*args, **kwargs)
        # emit the rename warning from DeprecatedMPILauncher
        self.warn()
540 540
class MPIExecEngineSetLauncher(MPIEngineSetLauncher, DeprecatedMPILauncher):
    """Deprecated, use MPIEngineSetLauncher"""
    def __init__(self, *args, **kwargs):
        super(MPIExecEngineSetLauncher, self).__init__(*args, **kwargs)
        # emit the rename warning from DeprecatedMPILauncher
        self.warn()
546 546
547 547
548 548 #-----------------------------------------------------------------------------
549 549 # SSH launchers
550 550 #-----------------------------------------------------------------------------
551 551
552 552 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
553 553
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables. There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True,
        help="command for starting ssh")
    ssh_args = List(['-tt'], config=True,
        help="args to pass to ssh")
    program = List(['date'],
        help="Program to launch via ssh")
    program_args = List([],
        help="args to pass to remote program")
    hostname = Unicode('', config=True,
        help="hostname on which to launch the program")
    user = Unicode('', config=True,
        help="username for ssh")
    location = Unicode('', config=True,
        help="user@hostname location for ssh in one setting")

    def _hostname_changed(self, name, old, new):
        # keep `location` in sync as user@host (or bare host when no user)
        if self.user:
            self.location = u'%s@%s' % (self.user, new)
        else:
            self.location = new

    def _user_changed(self, name, old, new):
        # keep `location` in sync when the user changes
        self.location = u'%s@%s' % (new, self.hostname)

    def find_args(self):
        # full argv: ssh [ssh_args] user@host program [program_args]
        return self.ssh_cmd + self.ssh_args + [self.location] + \
               self.program + self.program_args

    def start(self, hostname=None, user=None):
        """Start the remote program, optionally overriding hostname/user."""
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user

        return super(SSHLauncher, self).start()

    def signal(self, sig):
        # NOTE(review): `sig` is ignored — any signal closes the connection
        # by writing the ssh escape sequence; confirm this is intended.
        if self.state == 'running':
            # send escaped ssh connection-closer
            self.process.stdin.write('~.')
            self.process.stdin.flush()
603 603
604 604
605 605
class SSHControllerLauncher(SSHLauncher, ControllerMixin):
    """Launch an ipcontroller on a remote host via ssh."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        # the ipcontroller argv from ControllerMixin
        return self.controller_cmd

    @property
    def program_args(self):
        # cluster identity flags first, then the configured extra args
        return self.cluster_args + self.controller_args
618 618
619 619
class SSHEngineLauncher(SSHLauncher, EngineMixin):
    """Launch a single ipengine on a remote host via ssh."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        # the ipengine argv from EngineMixin
        return self.engine_cmd

    @property
    def program_args(self):
        # cluster identity flags first, then the configured extra args
        return self.cluster_args + self.engine_args
632 632
633 633
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Launch a set of engines across remote hosts via ssh."""
    launcher_class = SSHEngineLauncher
    engines = Dict(config=True,
        help="""dict of engines to launch. This is a dict by hostname of ints,
        corresponding to the number of engines to start on that host.""")

    @property
    def engine_count(self):
        """determine engine count from `engines` dict"""
        count = 0
        for n in self.engines.itervalues():
            # entries may be a plain int or an (n, args) tuple/list
            if isinstance(n, (tuple,list)):
                n,args = n
            count += n
        return count

    def start(self, n):
        """Start engines by profile or profile_dir.
        `n` is ignored, and the `engines` config property is used instead.
        """

        dlist = []
        for host, n in self.engines.iteritems():
            if isinstance(n, (tuple, list)):
                # per-host override of the engine args
                n, args = n
            else:
                args = copy.deepcopy(self.engine_args)

            # split an explicit user@host spec
            if '@' in host:
                user,host = host.split('@',1)
            else:
                user=None
            for i in range(n):
                if i > 0:
                    # stagger startup, same as LocalEngineSetLauncher
                    time.sleep(self.delay)
                el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
                                        profile_dir=self.profile_dir, cluster_id=self.cluster_id,
                )

                # Copy the engine args over to each engine launcher.
                # deepcopy so per-launcher mutation cannot leak back into our
                # config (consistent with LocalEngineSetLauncher.start, which
                # previously this method did not match for engine_cmd).
                el.engine_cmd = copy.deepcopy(self.engine_cmd)
                el.engine_args = copy.deepcopy(args)
                el.on_stop(self._notice_engine_stopped)
                d = el.start(user=user, hostname=host)
                self.launchers[ "%s/%i" % (host,i) ] = el
                dlist.append(d)
        self.notify_start(dlist)
        return dlist
682 682
683 683
684 684
685 685 #-----------------------------------------------------------------------------
686 686 # Windows HPC Server 2008 scheduler launchers
687 687 #-----------------------------------------------------------------------------
688 688
689 689
690 690 # This is only used on Windows.
def find_job_cmd():
    """Locate the Windows HPC ``job`` command; fall back to the bare name."""
    if not WINDOWS:
        # only meaningful on Windows; elsewhere return the plain name
        return 'job'
    try:
        return find_cmd('job')
    except (FindCmdError, ImportError):
        # ImportError will be raised if win32api is not installed
        return 'job'
700 700
701 701
class WindowsHPCLauncher(BaseLauncher):
    """Base class for launchers that submit jobs to Windows HPC Server 2008.

    Subclasses implement :meth:`write_job_file` to produce the XML job
    description that is submitted with the ``job`` command.
    """

    job_id_regexp = Unicode(r'\d+', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command. """
    )
    job_file_name = Unicode(u'ipython_job.xml', config=True,
        help="The filename of the instantiated job script.")
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    # NOTE(review): this trait is shadowed by the `job_file` property below
    # and is effectively unused — confirm before removing.
    job_file = Unicode(u'')
    scheduler = Unicode('', config=True,
        help="The hostname of the scheduler to submit the job to.")
    job_cmd = Unicode(find_job_cmd(), config=True,
        help="The command for submitting jobs.")

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        """Full path of the job description file (work_dir/job_file_name)."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the XML job description for ``n`` tasks (subclass hook)."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        # not used for submission; start() invokes job_cmd directly
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job started with id: %r', job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))

        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the submitted job via the scheduler (best-effort)."""
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
        except Exception:
            # narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # are no longer swallowed; cancelling a finished job fails here.
            output = 'The job already appears to be stopped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
780 780
781 781
class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch an ipcontroller via the Windows HPC job scheduler."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
        help="WinHPC xml job file.")
    controller_args = List([], config=False,
        help="extra args to pass to ipcontroller")

    def write_job_file(self, n):
        """Write a job description containing a single controller task."""
        job = IPControllerJob(config=self.config)

        t = IPControllerTask(config=self.config)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        t.work_directory = self.profile_dir
        # Add the --profile-dir/--cluster-id flags plus any extra args.
        t.controller_args.extend(self.cluster_args)
        t.controller_args.extend(self.controller_args)
        job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # the job file lives in the profile dir, not the work_dir
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self):
        """Start the controller by profile_dir."""
        return super(WindowsHPCControllerLauncher, self).start(1)
812 812
813 813
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch a set of engines via the Windows HPC job scheduler."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True,
        help="jobfile for ipengines job")
    engine_args = List([], config=False,
        help="extra args to pass to ipengine")

    def write_job_file(self, n):
        """Write a job description containing one IPEngineTask per engine."""
        job = IPEngineSetJob(config=self.config)

        for i in range(n):
            t = IPEngineTask(config=self.config)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.profile_dir
            # Add the --profile-dir/--cluster-id flags plus any extra args.
            t.engine_args.extend(self.cluster_args)
            t.engine_args.extend(self.engine_args)
            job.add_task(t)

        self.log.debug("Writing job description file: %s", self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # the job file lives in the profile dir, not the work_dir
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, n):
        """Start n engines by profile_dir."""
        return super(WindowsHPCEngineSetLauncher, self).start(n)
845 845
846 846
847 847 #-----------------------------------------------------------------------------
848 848 # Batch (PBS) system launchers
849 849 #-----------------------------------------------------------------------------
850 850
class BatchClusterAppMixin(ClusterAppMixin):
    """ClusterApp mixin that updates the self.context dict, rather than cl-args."""
    def _profile_dir_changed(self, name, old, new):
        # record the new value under its trait name for template formatting
        self.context[name] = new
    # a cluster_id change updates the context the same way
    _cluster_id_changed = _profile_dir_changed

    def _profile_dir_default(self):
        # seed the template context even when the trait keeps its default
        self.context['profile_dir'] = ''
        return ''
    def _cluster_id_default(self):
        # seed the template context even when the trait keeps its default
        self.context['cluster_id'] = ''
        return ''
863 863
864 864
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc. The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using string formatting. Thus the template can
    use {n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in. See PBSEngineSet
    submit_command = List([''], config=True,
        help="The name of the command line program used to submit jobs.")
    delete_command = List([''], config=True,
        help="The name of the command line program used to delete jobs.")
    job_id_regexp = Unicode('', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command.""")
    batch_template = Unicode('', config=True,
        help="The string that is the batch script template itself.")
    batch_template_file = Unicode(u'', config=True,
        help="The file that contains the batch template.")
    batch_file_name = Unicode(u'batch_script', config=True,
        help="The filename of the instantiated batch script.")
    queue = Unicode(u'', config=True,
        help="The PBS Queue.")

    def _queue_changed(self, name, old, new):
        # mirror the queue into the template-format context
        self.context[name] = new

    # number of instances requested; mirrored into context via _n_changed
    n = Integer(1)
    _n_changed = _queue_changed

    # not configurable, override in subclasses
    # PBS Job Array regex
    job_array_regexp = Unicode('')
    job_array_template = Unicode('')
    # PBS Queue regex
    queue_regexp = Unicode('')
    queue_template = Unicode('')
    # The default batch template, override in subclasses
    default_template = Unicode('')
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')
    # the format dict used with batch_template:
    context = Dict()
    def _context_default(self):
        """load the default context with the default values for the basic keys

        because the _trait_changed methods only load the context if they
        are set to something other than the default value.
        """
        return dict(n=1, queue=u'', profile_dir=u'', cluster_id=u'')

    # the Formatter instance for rendering the templates:
    formatter = Instance(EvalFormatter, (), {})


    def find_args(self):
        # submission argv, e.g. ['qsub', '/path/to/batch_script']
        return self.submit_command + [self.batch_file]

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BatchSystemLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job submitted with job id: %r', job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir."""
        self.n = n
        # first priority is batch_template if set
        if self.batch_template_file and not self.batch_template:
            # second priority is batch_template_file
            with open(self.batch_template_file) as f:
                self.batch_template = f.read()
        if not self.batch_template:
            # third (last) priority is default_template
            self.batch_template = self.default_template

        # add jobarray or queue lines when the template lacks them.
        # NOTE(review): this rewrites *any* template missing the directive,
        # including user-supplied ones, despite the original comment claiming
        # it applied only to the default template.
        regex = re.compile(self.job_array_regexp)
        # print regex.search(self.batch_template)
        if not regex.search(self.batch_template):
            self.log.debug("adding job array settings to batch script")
            firstline, rest = self.batch_template.split('\n',1)
            self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])

        regex = re.compile(self.queue_regexp)
        # print regex.search(self.batch_template)
        if self.queue and not regex.search(self.batch_template):
            self.log.debug("adding PBS queue settings to batch script")
            firstline, rest = self.batch_template.split('\n',1)
            self.batch_template = u'\n'.join([firstline, self.queue_template, rest])

        script_as_string = self.formatter.format(self.batch_template, **self.context)
        self.log.debug('Writing batch script: %s', self.batch_file)

        with open(self.batch_file, 'w') as f:
            f.write(script_as_string)
        # owner read/write/execute only
        os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

    def start(self, n):
        """Start n copies of the process using a batch system."""
        self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
        # Here we save profile_dir in the context so they
        # can be used in the batch script template as {profile_dir}
        self.write_batch_script(n)
        output = check_output(self.args, env=os.environ)

        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Delete the submitted job and report the deletion output."""
        output = check_output(self.delete_command+[self.job_id], env=os.environ)
        self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
        return output
998 998
999 999
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = List(['qsub'], config=True,
        help="The PBS submit command ['qsub']")
    delete_command = List(['qdel'], config=True,
        help="The PBS delete command ['qdel']")
    job_id_regexp = Unicode(r'\d+', config=True,
        help=r"Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # raw strings: these values are regexes full of backslash classes
    job_array_regexp = Unicode(r'#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = Unicode('#PBS -t 1-{n}')
    queue_regexp = Unicode(r'#PBS\W+-q\W+\$?\w+')
    queue_template = Unicode('#PBS -q {queue}')
1015 1015
1016 1016
class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_controller', config=True,
        help="batch file name for the controller job.")
    # default script: a single ipcontroller invocation with cluster identity
    default_template= Unicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(ipcontroller_cmd_argv)))


    def start(self):
        """Start the controller by profile or profile_dir."""
        return super(PBSControllerLauncher, self).start(1)
1032 1032
1033 1033
class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch Engines using PBS"""
    batch_file_name = Unicode(u'pbs_engines', config=True,
        help="batch file name for the engine(s) job.")
    # default script: job array of {n} ipengine tasks
    default_template= Unicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(ipengine_cmd_argv)))

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        return super(PBSEngineSetLauncher, self).start(n)
1047 1047
1048 1048 #SGE is very similar to PBS
1049 1049
class SGELauncher(PBSLauncher):
    """Sun GridEngine is a PBS clone with slightly different syntax"""
    # SGE uses the '#$' directive prefix instead of '#PBS'
    job_array_regexp = Unicode('#\$\W+\-t')
    job_array_template = Unicode('#$ -t 1-{n}')
    queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
    queue_template = Unicode('#$ -q {queue}')
1056 1056
class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch a controller using SGE."""

    batch_file_name = Unicode(u'sge_controller', config=True,
        help="batch file name for the ipcontroller job.")
    # default script: a single ipcontroller invocation with cluster identity
    default_template= Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(ipcontroller_cmd_argv)))

    def start(self):
        """Start the controller by profile or profile_dir."""
        return super(SGEControllerLauncher, self).start(1)
1071 1071
class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch Engines with SGE"""
    batch_file_name = Unicode(u'sge_engines', config=True,
        help="batch file name for the engine(s) job.")
    # default script: job array of {n} ipengine tasks
    default_template = Unicode("""#$ -V
#$ -S /bin/sh
#$ -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(ipengine_cmd_argv)))

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        return super(SGEEngineSetLauncher, self).start(n)
1085 1085
1086 1086
1087 1087 # LSF launchers
1088 1088
class LSFLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for LSF."""

    # FIX: help strings said "The PBS ... command" — these are the LSF
    # commands (bsub/bkill), so the help text was a PBS copy-paste error.
    submit_command = List(['bsub'], config=True,
        help="The LSF submit command ['bsub']")
    delete_command = List(['bkill'], config=True,
        help="The LSF delete command ['bkill']")
    # FIX: "expresion" typo in the help text.
    job_id_regexp = Unicode(r'\d+', config=True,
        help="Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # Pattern that detects an existing '#BSUB -J name[a-b]' array directive.
    job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
    job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
    # Pattern that detects an existing '#BSUB -q' queue directive.
    queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
    queue_template = Unicode('#BSUB -q {queue}')

    def start(self, n):
        """Start n copies of the process using the LSF batch system.

        This can't use the base class implementation because bsub expects
        the script to be piped on stdin in order to honor the #BSUB
        directives::

            bsub < script
        """
        # Write the batch script; profile_dir and cluster_id are saved in
        # the context so the template can interpolate {profile_dir} etc.
        self.write_batch_script(n)
        #output = check_output(self.args, env=os.environ)
        piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
        self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
        # shell=True so the '<' redirection feeds the script to bsub's stdin.
        p = Popen(piped_cmd, shell=True, env=os.environ, stdout=PIPE)
        output, err = p.communicate()
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id
1122 1122
1123 1123
class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch a controller through the LSF batch system."""

    # Name of the generated bsub script for the controller job.
    batch_file_name = Unicode(u'lsf_controller', config=True,
        help="batch file name for the controller job.")

    # Default bsub script: name the job (-J), redirect stdout/stderr to
    # per-job files (-oo/-eo; %J is the LSF job id), then run ipcontroller
    # against the cluster's profile directory.
    default_template = Unicode(
        "#!/bin/sh\n"
        "#BSUB -J ipcontroller\n"
        "#BSUB -oo ipcontroller.o.%%J\n"
        "#BSUB -eo ipcontroller.e.%%J\n"
        "%s --log-to-file --profile-dir=\"{profile_dir}\" --cluster-id=\"{cluster_id}\"\n"
        % ' '.join(ipcontroller_cmd_argv)
    )

    def start(self):
        """Start the controller by profile or profile_dir."""
        return super(LSFControllerLauncher, self).start(1)
1139 1139
1140 1140
class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch a set of engines through the LSF batch system."""

    # Name of the generated bsub script for the engine job.
    batch_file_name = Unicode(u'lsf_engines', config=True,
        help="batch file name for the engine(s) job.")

    # Default bsub script: redirect stdout/stderr to per-job files
    # (-oo/-eo; %J is the LSF job id), then run ipengine against the
    # cluster's profile directory.
    default_template = Unicode(
        u"#!/bin/sh\n"
        u"#BSUB -oo ipengine.o.%%J\n"
        u"#BSUB -eo ipengine.e.%%J\n"
        u"%s --profile-dir=\"{profile_dir}\" --cluster-id=\"{cluster_id}\"\n"
        % ' '.join(ipengine_cmd_argv)
    )

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        return super(LSFEngineSetLauncher, self).start(n)
1154 1154
1155 1155
1156 1156 #-----------------------------------------------------------------------------
1157 1157 # A launcher for ipcluster itself!
1158 1158 #-----------------------------------------------------------------------------
1159 1159
1160 1160
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    # Command used to invoke ipcluster itself.
    ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
        help="Popen command for ipcluster")
    # Extra command-line flags appended to every invocation.
    ipcluster_args = List(
        ['--clean-logs=True', '--log-to-file', '--log-level=%i' % logging.INFO],
        config=True,
        help="Command line arguments to pass to ipcluster.")
    # Subcommand to run ('start' launches a cluster).
    ipcluster_subcommand = Unicode('start')
    # Profile name and engine count forwarded to the subprocess.
    profile = Unicode('default')
    n = Integer(2)

    def find_args(self):
        """Assemble the full argv for the ipcluster subprocess."""
        argv = list(self.ipcluster_cmd)
        argv.append(self.ipcluster_subcommand)
        argv.append('--n=%i' % self.n)
        argv.append('--profile=%s' % self.profile)
        argv.extend(self.ipcluster_args)
        return argv

    def start(self):
        """Launch ipcluster and begin monitoring the subprocess."""
        return super(IPClusterLauncher, self).start()
1180 1180
1181 1181 #-----------------------------------------------------------------------------
1182 1182 # Collections of launchers
1183 1183 #-----------------------------------------------------------------------------
1184 1184
# Families of launchers, grouped by the mechanism they use to start
# processes.  ``all_launchers`` collects every launcher in this module.
local_launchers = [LocalControllerLauncher, LocalEngineLauncher,
                   LocalEngineSetLauncher]
mpi_launchers = [MPILauncher, MPIControllerLauncher, MPIEngineSetLauncher]
ssh_launchers = [SSHLauncher, SSHControllerLauncher, SSHEngineLauncher,
                 SSHEngineSetLauncher]
winhpc_launchers = [WindowsHPCLauncher, WindowsHPCControllerLauncher,
                    WindowsHPCEngineSetLauncher]
pbs_launchers = [PBSLauncher, PBSControllerLauncher, PBSEngineSetLauncher]
sge_launchers = [SGELauncher, SGEControllerLauncher, SGEEngineSetLauncher]
lsf_launchers = [LSFLauncher, LSFControllerLauncher, LSFEngineSetLauncher]
all_launchers = (local_launchers + mpi_launchers + ssh_launchers
                 + winhpc_launchers + pbs_launchers + sge_launchers
                 + lsf_launchers)
1223 1223
General Comments 0
You need to be logged in to leave comments. Login now