##// END OF EJS Templates
Merge PR #795 (cluster-id and launcher cleanup)...
MinRK -
r4852:3994edb7 merge
parent child Browse files
Show More
@@ -1,327 +1,328 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 A base class for objects that are configurable.
3 A base class for objects that are configurable.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * Fernando Perez
8 * Fernando Perez
9 * Min RK
9 * Min RK
10 """
10 """
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Copyright (C) 2008-2011 The IPython Development Team
13 # Copyright (C) 2008-2011 The IPython Development Team
14 #
14 #
15 # Distributed under the terms of the BSD License. The full license is in
15 # Distributed under the terms of the BSD License. The full license is in
16 # the file COPYING, distributed as part of this software.
16 # the file COPYING, distributed as part of this software.
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 # Imports
20 # Imports
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22
22
23 import datetime
23 import datetime
24 from copy import deepcopy
24 from copy import deepcopy
25
25
26 from loader import Config
26 from loader import Config
27 from IPython.utils.traitlets import HasTraits, Instance
27 from IPython.utils.traitlets import HasTraits, Instance
28 from IPython.utils.text import indent, wrap_paragraphs
28 from IPython.utils.text import indent, wrap_paragraphs
29
29
30
30
31 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
32 # Helper classes for Configurables
32 # Helper classes for Configurables
33 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
34
34
35
35
class ConfigurableError(Exception):
    """Base error raised for problems with Configurable objects."""
38
38
39
39
class MultipleInstanceError(ConfigurableError):
    """Raised by SingletonConfigurable.instance() when incompatible
    subclass instances already exist."""
42
42
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44 # Configurable implementation
44 # Configurable implementation
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46
46
class Configurable(HasTraits):
    """Base class for objects whose traits can be set from a Config object.

    Traits declared with ``config=True`` metadata are loaded from the
    ``config`` attribute whenever it is assigned (see ``_config_changed``).
    """

    # The active configuration object; assigning to it fires
    # _config_changed, which copies matching config values onto traits.
    config = Instance(Config,(),{})
    # Timestamp recorded at construction time (datetime.datetime).
    created = None

    def __init__(self, **kwargs):
        """Create a configurable given a config object.

        Parameters
        ----------
        config : Config
            If this is empty, default values are used. If config is a
            :class:`Config` instance, it will be used to configure the
            instance.

        Notes
        -----
        Subclasses of Configurable must call the :meth:`__init__` method of
        :class:`Configurable` *before* doing anything else and using
        :func:`super`::

            class MyConfigurable(Configurable):
                def __init__(self, config=None):
                    super(MyConfigurable, self).__init__(config=config)
                    # Then any other code you need to finish initialization.

        This ensures that instances will be configured properly.
        """
        config = kwargs.pop('config', None)
        if config is not None:
            # We used to deepcopy, but for now we are trying to just save
            # by reference.  This *could* have side effects as all components
            # will share config. In fact, I did find such a side effect in
            # _config_changed below. If a config attribute value was a mutable type
            # all instances of a component were getting the same copy, effectively
            # making that a class attribute.
            self.config = config
        # This should go second so individual keyword arguments override
        # the values in config.
        super(Configurable, self).__init__(**kwargs)
        self.created = datetime.datetime.now()

    #-------------------------------------------------------------------------
    # Static trait notifications
    #-------------------------------------------------------------------------

    def _config_changed(self, name, old, new):
        """Update all the class traits having ``config=True`` as metadata.

        For any class trait with a ``config`` metadata attribute that is
        ``True``, we update the trait with the value of the corresponding
        config entry.
        """
        # Get all traits with a config metadata entry that is True
        traits = self.traits(config=True)

        # We auto-load config section for this class as well as any parent
        # classes that are Configurable subclasses.  This starts with Configurable
        # and works down the mro loading the config for each section.
        section_names = [cls.__name__ for cls in \
            reversed(self.__class__.__mro__) if
            issubclass(cls, Configurable) and issubclass(self.__class__, cls)]

        for sname in section_names:
            # Don't do a blind getattr as that would cause the config to
            # dynamically create the section with name self.__class__.__name__.
            if new._has_section(sname):
                my_config = new[sname]
                for k, v in traits.iteritems():
                    # Don't allow traitlets with config=True to start with
                    # uppercase.  Otherwise, they are confused with Config
                    # subsections.  But, developers shouldn't have uppercase
                    # attributes anyways! (PEP 6)
                    if k[0].upper()==k[0] and not k.startswith('_'):
                        raise ConfigurableError('Configurable traitlets with '
                        'config=True must start with a lowercase so they are '
                        'not confused with Config subsections: %s.%s' % \
                        (self.__class__.__name__, k))
                    try:
                        # Here we grab the value from the config
                        # If k has the naming convention of a config
                        # section, it will be auto created.
                        config_value = my_config[k]
                    except KeyError:
                        pass
                    else:
                        # We have to do a deepcopy here if we don't deepcopy the entire
                        # config object. If we don't, a mutable config_value will be
                        # shared by all instances, effectively making it a class attribute.
                        setattr(self, k, deepcopy(config_value))

    @classmethod
    def class_get_help(cls):
        """Get the help string for this class in ReST format."""
        # NOTE(review): cls_traits is unused; the loop below recomputes
        # class_traits(config=True) instead of reusing it.
        cls_traits = cls.class_traits(config=True)
        final_help = []
        final_help.append(u'%s options' % cls.__name__)
        # Underline the title to the same width (ReST section header).
        final_help.append(len(final_help[0])*u'-')
        for k,v in cls.class_traits(config=True).iteritems():
            help = cls.class_get_trait_help(v)
            final_help.append(help)
        return '\n'.join(final_help)

    @classmethod
    def class_get_trait_help(cls, trait):
        """Get the help string for a single trait.

        Includes the ``--Class.trait=<Type>`` header, the repr of the
        default value (truncated to 64 chars), Enum choices when the
        trait is an Enum, and the wrapped ``help`` metadata if present.
        """
        lines = []
        header = "--%s.%s=<%s>" % (cls.__name__, trait.name, trait.__class__.__name__)
        lines.append(header)
        try:
            dvr = repr(trait.get_default_value())
        except Exception:
            dvr = None # ignore defaults we can't construct
        if dvr is not None:
            if len(dvr) > 64:
                dvr = dvr[:61]+'...'
            lines.append(indent('Default: %s'%dvr, 4))
        if 'Enum' in trait.__class__.__name__:
            # include Enum choices
            lines.append(indent('Choices: %r'%(trait.values,)))

        help = trait.get_metadata('help')
        if help is not None:
            help = '\n'.join(wrap_paragraphs(help, 76))
            lines.append(indent(help, 4))
        return '\n'.join(lines)

    @classmethod
    def class_print_help(cls):
        """Print the help string for this class (see class_get_help)."""
        print cls.class_get_help()

    @classmethod
    def class_config_section(cls):
        """Build a sample config-file section for this class.

        Returns commented-out ``c.Class.trait = default`` lines for every
        ``config=True`` trait, preceded by the class description and a
        note listing configurable parent classes.
        """
        def c(s):
            """return a commented, wrapped block."""
            s = '\n\n'.join(wrap_paragraphs(s, 78))

            return '# ' + s.replace('\n', '\n# ')

        # section header
        breaker = '#' + '-'*78
        s = "# %s configuration"%cls.__name__
        lines = [breaker, s, breaker, '']
        # get the description trait
        desc = cls.class_traits().get('description')
        if desc:
            desc = desc.default_value
        else:
            # no description trait, use __doc__
            desc = getattr(cls, '__doc__', '')
        if desc:
            lines.append(c(desc))
            lines.append('')

        parents = []
        for parent in cls.mro():
            # only include parents that are not base classes
            # and are not the class itself
            # and have some configurable traits to inherit
            if parent is not cls and issubclass(parent, Configurable) and \
                    parent.class_traits(config=True):
                parents.append(parent)

        if parents:
            pstr = ', '.join([ p.__name__ for p in parents ])
            lines.append(c('%s will inherit config from: %s'%(cls.__name__, pstr)))
            lines.append('')

        for name,trait in cls.class_traits(config=True).iteritems():
            help = trait.get_metadata('help') or ''
            lines.append(c(help))
            lines.append('# c.%s.%s = %r'%(cls.__name__, name, trait.get_default_value()))
            lines.append('')
        return '\n'.join(lines)
225
226
226
227
227
228
class SingletonConfigurable(Configurable):
    """A configurable that only allows one instance.

    This class is for classes that should only have one instance of itself
    or *any* subclass. To create and retrieve such a class use the
    :meth:`SingletonConfigurable.instance` method.
    """

    # The shared instance; instance() stores it on every singleton class
    # in the mro so parent classes return the same object.
    _instance = None

    @classmethod
    def _walk_mro(cls):
        """Walk the cls.mro() for parent classes that are also singletons.

        For use in instance().  Yields every class in the mro that is a
        proper SingletonConfigurable subclass (excluding
        SingletonConfigurable itself).
        """

        for subclass in cls.mro():
            if issubclass(cls, subclass) and \
                    issubclass(subclass, SingletonConfigurable) and \
                    subclass != SingletonConfigurable:
                yield subclass

    @classmethod
    def clear_instance(cls):
        """Unset _instance for this class and singleton parents."""
        if not cls.initialized():
            return
        for subclass in cls._walk_mro():
            if isinstance(subclass._instance, cls):
                # only clear instances that are instances
                # of the calling class
                subclass._instance = None

    @classmethod
    def instance(cls, *args, **kwargs):
        """Returns a global instance of this class.

        This method creates a new instance if none has previously been
        created and returns a previously created instance if one already
        exists.

        The arguments and keyword arguments passed to this method are passed
        on to the :meth:`__init__` method of the class upon instantiation.

        Raises
        ------
        MultipleInstanceError
            If the stored instance is not an instance of the calling class
            (an incompatible sibling subclass created it first).

        Examples
        --------

        Create a singleton class using instance, and retrieve it::

            >>> from IPython.config.configurable import SingletonConfigurable
            >>> class Foo(SingletonConfigurable): pass
            >>> foo = Foo.instance()
            >>> foo == Foo.instance()
            True

        Create a subclass that is retrieved using the base class instance::

            >>> class Bar(SingletonConfigurable): pass
            >>> class Bam(Bar): pass
            >>> bam = Bam.instance()
            >>> bam == Bar.instance()
            True
        """
        # Create and save the instance
        if cls._instance is None:
            inst = cls(*args, **kwargs)
            # Now make sure that the instance will also be returned by
            # parent classes' _instance attribute.
            for subclass in cls._walk_mro():
                subclass._instance = inst

        if isinstance(cls._instance, cls):
            return cls._instance
        else:
            raise MultipleInstanceError(
                'Multiple incompatible subclass instances of '
                '%s are being created.' % cls.__name__
            )

    @classmethod
    def initialized(cls):
        """Has an instance been created?"""
        return hasattr(cls, "_instance") and cls._instance is not None
312
313
313
314
class LoggingConfigurable(Configurable):
    """A parent class for Configurables that log.

    Subclasses have a log trait, and the default behavior
    is to get the logger from the currently running Application
    via Application.instance().log.
    """

    # Logger used by this configurable; defaults to the running
    # Application's log (see _log_default).
    log = Instance('logging.Logger')
    def _log_default(self):
        # Local import -- presumably to avoid a circular import between
        # configurable and application at module load time (confirm).
        from IPython.config.application import Application
        return Application.instance().log
326
327
327
328
@@ -1,244 +1,261 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 The Base Application class for IPython.parallel apps
3 The Base Application class for IPython.parallel apps
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * Min RK
8 * Min RK
9
9
10 """
10 """
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Copyright (C) 2008-2011 The IPython Development Team
13 # Copyright (C) 2008-2011 The IPython Development Team
14 #
14 #
15 # Distributed under the terms of the BSD License. The full license is in
15 # Distributed under the terms of the BSD License. The full license is in
16 # the file COPYING, distributed as part of this software.
16 # the file COPYING, distributed as part of this software.
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 # Imports
20 # Imports
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22
22
23 from __future__ import with_statement
23 from __future__ import with_statement
24
24
25 import os
25 import os
26 import logging
26 import logging
27 import re
27 import re
28 import sys
28 import sys
29
29
30 from subprocess import Popen, PIPE
30 from subprocess import Popen, PIPE
31
31
32 from IPython.core import release
32 from IPython.core import release
33 from IPython.core.crashhandler import CrashHandler
33 from IPython.core.crashhandler import CrashHandler
34 from IPython.core.application import (
34 from IPython.core.application import (
35 BaseIPythonApplication,
35 BaseIPythonApplication,
36 base_aliases as base_ip_aliases,
36 base_aliases as base_ip_aliases,
37 base_flags as base_ip_flags
37 base_flags as base_ip_flags
38 )
38 )
39 from IPython.utils.path import expand_path
39 from IPython.utils.path import expand_path
40
40
41 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
41 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List
42
42
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44 # Module errors
44 # Module errors
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46
46
class PIDFileError(Exception):
    """Raised for problems with the application's pid file
    (e.g. it already exists when writing)."""
49
49
50
50
51 #-----------------------------------------------------------------------------
51 #-----------------------------------------------------------------------------
52 # Crash handler for this application
52 # Crash handler for this application
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54
54
class ParallelCrashHandler(CrashHandler):
    """sys.excepthook for IPython itself, leaves a detailed report on disk."""

    def __init__(self, app):
        # Crash reports for the parallel apps point at Min RK's contact
        # info from the release metadata and the main issue tracker.
        author = release.authors['Min']
        super(ParallelCrashHandler, self).__init__(
            app, author[0], author[1],
            'http://github.com/ipython/ipython/issues'
        )
65
65
66
66
67 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
68 # Main application
68 # Main application
69 #-----------------------------------------------------------------------------
69 #-----------------------------------------------------------------------------
# Command-line aliases shared by all IPython.parallel applications,
# extending the base IPython application aliases with parallel-specific
# settings ('alias' -> 'Class.trait').
base_aliases = {}
base_aliases.update(base_ip_aliases)
base_aliases.update({
    'profile-dir' : 'ProfileDir.location',
    'work-dir' : 'BaseParallelApplication.work_dir',
    'log-to-file' : 'BaseParallelApplication.log_to_file',
    'clean-logs' : 'BaseParallelApplication.clean_logs',
    'log-url' : 'BaseParallelApplication.log_url',
    'cluster-id' : 'BaseParallelApplication.cluster_id',
})

# Command-line flags: name -> (config dict applied when set, help string).
# Base IPython flags are merged in below.
base_flags = {
    'log-to-file' : (
        {'BaseParallelApplication' : {'log_to_file' : True}},
        "send log output to a file"
    )
}
base_flags.update(base_ip_flags)
87
88
88 class BaseParallelApplication(BaseIPythonApplication):
89 class BaseParallelApplication(BaseIPythonApplication):
89 """The base Application for IPython.parallel apps
90 """The base Application for IPython.parallel apps
90
91
91 Principle extensions to BaseIPyythonApplication:
92 Principle extensions to BaseIPyythonApplication:
92
93
93 * work_dir
94 * work_dir
94 * remote logging via pyzmq
95 * remote logging via pyzmq
95 * IOLoop instance
96 * IOLoop instance
96 """
97 """
97
98
98 crash_handler_class = ParallelCrashHandler
99 crash_handler_class = ParallelCrashHandler
99
100
    def _log_level_default(self):
        """Default log level for parallel apps is INFO."""
        # temporarily override default_log_level to INFO
        return logging.INFO
103
104
    # Directory this process should run in (defaults to the cwd at import
    # time); changing it triggers _work_dir_changed below.
    work_dir = Unicode(os.getcwdu(), config=True,
        help='Set the working dir for the process.'
    )
    def _work_dir_changed(self, name, old, new):
        # Normalize the assigned path in place via expand_path
        # (presumably '~' / '$VAR' expansion -- see IPython.utils.path).
        self.work_dir = unicode(expand_path(new))
109
110
    # When True, reinit_logging redirects self.log to a per-pid file
    # in the profile's log dir.
    log_to_file = Bool(config=True,
        help="whether to log to a file")

    # When True, reinit_logging deletes old '<name>-<pid>.(log|err|out)'
    # files before starting.
    clean_logs = Bool(False, config=True,
        help="whether to cleanup old logfiles before starting")

    # ZMQ URL for aggregated remote logging; empty disables it.
    log_url = Unicode('', config=True,
        help="The ZMQ URL of the iplogger to aggregate logging.")
118
119
    # Optional id distinguishing multiple clusters run from one profile;
    # when set, it is folded into self.name (and hence runtime filenames).
    cluster_id = Unicode('', config=True,
        help="""String id to add to runtime files, to prevent name collisions when
        using multiple clusters with a single profile simultaneously.

        When set, files will be named like: 'ipcontroller-<cluster_id>-engine.json'

        Since this is text inserted into filenames, typical recommendations apply:
        Simple character strings are ideal, and spaces are not recommended (but should
        generally work).
        """
    )
    def _cluster_id_changed(self, name, old, new):
        # Rebuild from the class-level name so repeated changes don't
        # accumulate suffixes; append '-<cluster_id>' only when non-empty.
        self.name = self.__class__.name
        if new:
            self.name += '-%s'%new
135
119 def _config_files_default(self):
136 def _config_files_default(self):
120 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
137 return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']
121
138
    # The zmq IOLoop driving this application; created lazily by
    # _loop_default.
    loop = Instance('zmq.eventloop.ioloop.IOLoop')
    def _loop_default(self):
        # Local import: zmq is not imported at module top level, so it is
        # only required once the loop is actually used.
        from zmq.eventloop.ioloop import IOLoop
        return IOLoop.instance()
126
143
    # Command-line aliases/flags for this application, seeded from the
    # module-level base_aliases/base_flags tables above.
    aliases = Dict(base_aliases)
    flags = Dict(base_flags)
129
146
    def initialize(self, argv=None):
        """Initialize the app: run the base initialization, then change
        to self.work_dir and reconfigure logging."""
        super(BaseParallelApplication, self).initialize(argv)
        self.to_work_dir()
        self.reinit_logging()
135
152
    def to_work_dir(self):
        """Change the process working directory to self.work_dir.

        Only chdirs (and logs the change) when the target differs from
        the current directory; always prepends '' to sys.path so the
        working dir is importable.
        """
        wd = self.work_dir
        if unicode(wd) != os.getcwdu():
            os.chdir(wd)
            self.log.info("Changing to working dir: %s" % wd)
        # This is the working dir by now.
        sys.path.insert(0, '')
143
160
    def reinit_logging(self):
        """Reconfigure logging after initialization.

        Optionally removes old per-pid logfiles (clean_logs), and when
        log_to_file is set redirects self.log to '<name>-<pid>.log' in
        the profile's log dir.  Always disables propagation to the root
        logger to avoid duplicate messages.
        """
        # Remove old log files
        log_dir = self.profile_dir.log_dir
        if self.clean_logs:
            for f in os.listdir(log_dir):
                # Only files matching '<name>-<digits>.(log|err|out)' are removed.
                if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
                    os.remove(os.path.join(log_dir, f))
        if self.log_to_file:
            # Start logging to the new log file
            log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
            logfile = os.path.join(log_dir, log_filename)
            # NOTE(review): mode 'w' truncates any existing file for this pid.
            open_log_file = open(logfile, 'w')
        else:
            open_log_file = None
        if open_log_file is not None:
            # Swap the previous handler for one writing to the new file.
            # NOTE(review): assumes self._log_handler was already set
            # (presumably by the base application) -- confirm before
            # calling this standalone.
            self.log.removeHandler(self._log_handler)
            self._log_handler = logging.StreamHandler(open_log_file)
            self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
            self._log_handler.setFormatter(self._log_formatter)
            self.log.addHandler(self._log_handler)
        # do not propagate log messages to root logger
        # ipcluster app will sometimes print duplicate messages during shutdown
        # if this is 1 (default):
        self.log.propagate = False
168
185
169 def write_pid_file(self, overwrite=False):
186 def write_pid_file(self, overwrite=False):
170 """Create a .pid file in the pid_dir with my pid.
187 """Create a .pid file in the pid_dir with my pid.
171
188
172 This must be called after pre_construct, which sets `self.pid_dir`.
189 This must be called after pre_construct, which sets `self.pid_dir`.
173 This raises :exc:`PIDFileError` if the pid file exists already.
190 This raises :exc:`PIDFileError` if the pid file exists already.
174 """
191 """
175 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
192 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
176 if os.path.isfile(pid_file):
193 if os.path.isfile(pid_file):
177 pid = self.get_pid_from_file()
194 pid = self.get_pid_from_file()
178 if not overwrite:
195 if not overwrite:
179 raise PIDFileError(
196 raise PIDFileError(
180 'The pid file [%s] already exists. \nThis could mean that this '
197 'The pid file [%s] already exists. \nThis could mean that this '
181 'server is already running with [pid=%s].' % (pid_file, pid)
198 'server is already running with [pid=%s].' % (pid_file, pid)
182 )
199 )
183 with open(pid_file, 'w') as f:
200 with open(pid_file, 'w') as f:
184 self.log.info("Creating pid file: %s" % pid_file)
201 self.log.info("Creating pid file: %s" % pid_file)
185 f.write(repr(os.getpid())+'\n')
202 f.write(repr(os.getpid())+'\n')
186
203
187 def remove_pid_file(self):
204 def remove_pid_file(self):
188 """Remove the pid file.
205 """Remove the pid file.
189
206
190 This should be called at shutdown by registering a callback with
207 This should be called at shutdown by registering a callback with
191 :func:`reactor.addSystemEventTrigger`. This needs to return
208 :func:`reactor.addSystemEventTrigger`. This needs to return
192 ``None``.
209 ``None``.
193 """
210 """
194 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
211 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
195 if os.path.isfile(pid_file):
212 if os.path.isfile(pid_file):
196 try:
213 try:
197 self.log.info("Removing pid file: %s" % pid_file)
214 self.log.info("Removing pid file: %s" % pid_file)
198 os.remove(pid_file)
215 os.remove(pid_file)
199 except:
216 except:
200 self.log.warn("Error removing the pid file: %s" % pid_file)
217 self.log.warn("Error removing the pid file: %s" % pid_file)
201
218
202 def get_pid_from_file(self):
219 def get_pid_from_file(self):
203 """Get the pid from the pid file.
220 """Get the pid from the pid file.
204
221
205 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
222 If the pid file doesn't exist a :exc:`PIDFileError` is raised.
206 """
223 """
207 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
224 pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
208 if os.path.isfile(pid_file):
225 if os.path.isfile(pid_file):
209 with open(pid_file, 'r') as f:
226 with open(pid_file, 'r') as f:
210 s = f.read().strip()
227 s = f.read().strip()
211 try:
228 try:
212 pid = int(s)
229 pid = int(s)
213 except:
230 except:
214 raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
231 raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
215 return pid
232 return pid
216 else:
233 else:
217 raise PIDFileError('pid file not found: %s' % pid_file)
234 raise PIDFileError('pid file not found: %s' % pid_file)
218
235
219 def check_pid(self, pid):
236 def check_pid(self, pid):
220 if os.name == 'nt':
237 if os.name == 'nt':
221 try:
238 try:
222 import ctypes
239 import ctypes
223 # returns 0 if no such process (of ours) exists
240 # returns 0 if no such process (of ours) exists
224 # positive int otherwise
241 # positive int otherwise
225 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
242 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
226 except Exception:
243 except Exception:
227 self.log.warn(
244 self.log.warn(
228 "Could not determine whether pid %i is running via `OpenProcess`. "
245 "Could not determine whether pid %i is running via `OpenProcess`. "
229 " Making the likely assumption that it is."%pid
246 " Making the likely assumption that it is."%pid
230 )
247 )
231 return True
248 return True
232 return bool(p)
249 return bool(p)
233 else:
250 else:
234 try:
251 try:
235 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
252 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
236 output,_ = p.communicate()
253 output,_ = p.communicate()
237 except OSError:
254 except OSError:
238 self.log.warn(
255 self.log.warn(
239 "Could not determine whether pid %i is running via `ps x`. "
256 "Could not determine whether pid %i is running via `ps x`. "
240 " Making the likely assumption that it is."%pid
257 " Making the likely assumption that it is."%pid
241 )
258 )
242 return True
259 return True
243 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
260 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
244 return pid in pids
261 return pid in pids
@@ -1,529 +1,528 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import errno
24 import errno
25 import logging
25 import logging
26 import os
26 import os
27 import re
27 import re
28 import signal
28 import signal
29
29
30 from subprocess import check_call, CalledProcessError, PIPE
30 from subprocess import check_call, CalledProcessError, PIPE
31 import zmq
31 import zmq
32 from zmq.eventloop import ioloop
32 from zmq.eventloop import ioloop
33
33
34 from IPython.config.application import Application, boolean_flag
34 from IPython.config.application import Application, boolean_flag
35 from IPython.config.loader import Config
35 from IPython.config.loader import Config
36 from IPython.core.application import BaseIPythonApplication
36 from IPython.core.application import BaseIPythonApplication
37 from IPython.core.profiledir import ProfileDir
37 from IPython.core.profiledir import ProfileDir
38 from IPython.utils.daemonize import daemonize
38 from IPython.utils.daemonize import daemonize
39 from IPython.utils.importstring import import_item
39 from IPython.utils.importstring import import_item
40 from IPython.utils.sysinfo import num_cpus
40 from IPython.utils.sysinfo import num_cpus
41 from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List,
41 from IPython.utils.traitlets import (Int, Unicode, Bool, CFloat, Dict, List,
42 DottedObjectName)
42 DottedObjectName)
43
43
44 from IPython.parallel.apps.baseapp import (
44 from IPython.parallel.apps.baseapp import (
45 BaseParallelApplication,
45 BaseParallelApplication,
46 PIDFileError,
46 PIDFileError,
47 base_flags, base_aliases
47 base_flags, base_aliases
48 )
48 )
49
49
50
50
51 #-----------------------------------------------------------------------------
51 #-----------------------------------------------------------------------------
52 # Module level variables
52 # Module level variables
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54
54
55
55
56 default_config_file_name = u'ipcluster_config.py'
56 default_config_file_name = u'ipcluster_config.py'
57
57
58
58
59 _description = """Start an IPython cluster for parallel computing.
59 _description = """Start an IPython cluster for parallel computing.
60
60
61 An IPython cluster consists of 1 controller and 1 or more engines.
61 An IPython cluster consists of 1 controller and 1 or more engines.
62 This command automates the startup of these processes using a wide
62 This command automates the startup of these processes using a wide
63 range of startup methods (SSH, local processes, PBS, mpiexec,
63 range of startup methods (SSH, local processes, PBS, mpiexec,
64 Windows HPC Server 2008). To start a cluster with 4 engines on your
64 Windows HPC Server 2008). To start a cluster with 4 engines on your
65 local host simply do 'ipcluster start --n=4'. For more complex usage
65 local host simply do 'ipcluster start --n=4'. For more complex usage
66 you will typically do 'ipython profile create mycluster --parallel', then edit
66 you will typically do 'ipython profile create mycluster --parallel', then edit
67 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
67 configuration files, followed by 'ipcluster start --profile=mycluster --n=4'.
68 """
68 """
69
69
70 _main_examples = """
70 _main_examples = """
71 ipcluster start --n=4 # start a 4 node cluster on localhost
71 ipcluster start --n=4 # start a 4 node cluster on localhost
72 ipcluster start -h # show the help string for the start subcmd
72 ipcluster start -h # show the help string for the start subcmd
73
73
74 ipcluster stop -h # show the help string for the stop subcmd
74 ipcluster stop -h # show the help string for the stop subcmd
75 ipcluster engines -h # show the help string for the engines subcmd
75 ipcluster engines -h # show the help string for the engines subcmd
76 """
76 """
77
77
78 _start_examples = """
78 _start_examples = """
79 ipython profile create mycluster --parallel # create mycluster profile
79 ipython profile create mycluster --parallel # create mycluster profile
80 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
80 ipcluster start --profile=mycluster --n=4 # start mycluster with 4 nodes
81 """
81 """
82
82
83 _stop_examples = """
83 _stop_examples = """
84 ipcluster stop --profile=mycluster # stop a running cluster by profile name
84 ipcluster stop --profile=mycluster # stop a running cluster by profile name
85 """
85 """
86
86
87 _engines_examples = """
87 _engines_examples = """
88 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
88 ipcluster engines --profile=mycluster --n=4 # start 4 engines only
89 """
89 """
90
90
91
91
92 # Exit codes for ipcluster
92 # Exit codes for ipcluster
93
93
94 # This will be the exit code if the ipcluster appears to be running because
94 # This will be the exit code if the ipcluster appears to be running because
95 # a .pid file exists
95 # a .pid file exists
96 ALREADY_STARTED = 10
96 ALREADY_STARTED = 10
97
97
98
98
99 # This will be the exit code if ipcluster stop is run, but there is not .pid
99 # This will be the exit code if ipcluster stop is run, but there is not .pid
100 # file to be found.
100 # file to be found.
101 ALREADY_STOPPED = 11
101 ALREADY_STOPPED = 11
102
102
103 # This will be the exit code if ipcluster engines is run, but there is not .pid
103 # This will be the exit code if ipcluster engines is run, but there is not .pid
104 # file to be found.
104 # file to be found.
105 NO_CLUSTER = 12
105 NO_CLUSTER = 12
106
106
107
107
108 #-----------------------------------------------------------------------------
108 #-----------------------------------------------------------------------------
109 # Main application
109 # Main application
110 #-----------------------------------------------------------------------------
110 #-----------------------------------------------------------------------------
111 start_help = """Start an IPython cluster for parallel computing
111 start_help = """Start an IPython cluster for parallel computing
112
112
113 Start an ipython cluster by its profile name or cluster
113 Start an ipython cluster by its profile name or cluster
114 directory. Cluster directories contain configuration, log and
114 directory. Cluster directories contain configuration, log and
115 security related files and are named using the convention
115 security related files and are named using the convention
116 'profile_<name>' and should be creating using the 'start'
116 'profile_<name>' and should be creating using the 'start'
117 subcommand of 'ipcluster'. If your cluster directory is in
117 subcommand of 'ipcluster'. If your cluster directory is in
118 the cwd or the ipython directory, you can simply refer to it
118 the cwd or the ipython directory, you can simply refer to it
119 using its profile name, 'ipcluster start --n=4 --profile=<profile>`,
119 using its profile name, 'ipcluster start --n=4 --profile=<profile>`,
120 otherwise use the 'profile-dir' option.
120 otherwise use the 'profile-dir' option.
121 """
121 """
122 stop_help = """Stop a running IPython cluster
122 stop_help = """Stop a running IPython cluster
123
123
124 Stop a running ipython cluster by its profile name or cluster
124 Stop a running ipython cluster by its profile name or cluster
125 directory. Cluster directories are named using the convention
125 directory. Cluster directories are named using the convention
126 'profile_<name>'. If your cluster directory is in
126 'profile_<name>'. If your cluster directory is in
127 the cwd or the ipython directory, you can simply refer to it
127 the cwd or the ipython directory, you can simply refer to it
128 using its profile name, 'ipcluster stop --profile=<profile>`, otherwise
128 using its profile name, 'ipcluster stop --profile=<profile>`, otherwise
129 use the '--profile-dir' option.
129 use the '--profile-dir' option.
130 """
130 """
131 engines_help = """Start engines connected to an existing IPython cluster
131 engines_help = """Start engines connected to an existing IPython cluster
132
132
133 Start one or more engines to connect to an existing Cluster
133 Start one or more engines to connect to an existing Cluster
134 by profile name or cluster directory.
134 by profile name or cluster directory.
135 Cluster directories contain configuration, log and
135 Cluster directories contain configuration, log and
136 security related files and are named using the convention
136 security related files and are named using the convention
137 'profile_<name>' and should be creating using the 'start'
137 'profile_<name>' and should be creating using the 'start'
138 subcommand of 'ipcluster'. If your cluster directory is in
138 subcommand of 'ipcluster'. If your cluster directory is in
139 the cwd or the ipython directory, you can simply refer to it
139 the cwd or the ipython directory, you can simply refer to it
140 using its profile name, 'ipcluster engines --n=4 --profile=<profile>`,
140 using its profile name, 'ipcluster engines --n=4 --profile=<profile>`,
141 otherwise use the 'profile-dir' option.
141 otherwise use the 'profile-dir' option.
142 """
142 """
143 stop_aliases = dict(
143 stop_aliases = dict(
144 signal='IPClusterStop.signal',
144 signal='IPClusterStop.signal',
145 )
145 )
146 stop_aliases.update(base_aliases)
146 stop_aliases.update(base_aliases)
147
147
148 class IPClusterStop(BaseParallelApplication):
148 class IPClusterStop(BaseParallelApplication):
149 name = u'ipcluster'
149 name = u'ipcluster'
150 description = stop_help
150 description = stop_help
151 examples = _stop_examples
151 examples = _stop_examples
152 config_file_name = Unicode(default_config_file_name)
152 config_file_name = Unicode(default_config_file_name)
153
153
154 signal = Int(signal.SIGINT, config=True,
154 signal = Int(signal.SIGINT, config=True,
155 help="signal to use for stopping processes.")
155 help="signal to use for stopping processes.")
156
156
157 aliases = Dict(stop_aliases)
157 aliases = Dict(stop_aliases)
158
158
159 def start(self):
159 def start(self):
160 """Start the app for the stop subcommand."""
160 """Start the app for the stop subcommand."""
161 try:
161 try:
162 pid = self.get_pid_from_file()
162 pid = self.get_pid_from_file()
163 except PIDFileError:
163 except PIDFileError:
164 self.log.critical(
164 self.log.critical(
165 'Could not read pid file, cluster is probably not running.'
165 'Could not read pid file, cluster is probably not running.'
166 )
166 )
167 # Here I exit with a unusual exit status that other processes
167 # Here I exit with a unusual exit status that other processes
168 # can watch for to learn how I existed.
168 # can watch for to learn how I existed.
169 self.remove_pid_file()
169 self.remove_pid_file()
170 self.exit(ALREADY_STOPPED)
170 self.exit(ALREADY_STOPPED)
171
171
172 if not self.check_pid(pid):
172 if not self.check_pid(pid):
173 self.log.critical(
173 self.log.critical(
174 'Cluster [pid=%r] is not running.' % pid
174 'Cluster [pid=%r] is not running.' % pid
175 )
175 )
176 self.remove_pid_file()
176 self.remove_pid_file()
177 # Here I exit with a unusual exit status that other processes
177 # Here I exit with a unusual exit status that other processes
178 # can watch for to learn how I existed.
178 # can watch for to learn how I existed.
179 self.exit(ALREADY_STOPPED)
179 self.exit(ALREADY_STOPPED)
180
180
181 elif os.name=='posix':
181 elif os.name=='posix':
182 sig = self.signal
182 sig = self.signal
183 self.log.info(
183 self.log.info(
184 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
184 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
185 )
185 )
186 try:
186 try:
187 os.kill(pid, sig)
187 os.kill(pid, sig)
188 except OSError:
188 except OSError:
189 self.log.error("Stopping cluster failed, assuming already dead.",
189 self.log.error("Stopping cluster failed, assuming already dead.",
190 exc_info=True)
190 exc_info=True)
191 self.remove_pid_file()
191 self.remove_pid_file()
192 elif os.name=='nt':
192 elif os.name=='nt':
193 try:
193 try:
194 # kill the whole tree
194 # kill the whole tree
195 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
195 p = check_call(['taskkill', '-pid', str(pid), '-t', '-f'], stdout=PIPE,stderr=PIPE)
196 except (CalledProcessError, OSError):
196 except (CalledProcessError, OSError):
197 self.log.error("Stopping cluster failed, assuming already dead.",
197 self.log.error("Stopping cluster failed, assuming already dead.",
198 exc_info=True)
198 exc_info=True)
199 self.remove_pid_file()
199 self.remove_pid_file()
200
200
201 engine_aliases = {}
201 engine_aliases = {}
202 engine_aliases.update(base_aliases)
202 engine_aliases.update(base_aliases)
203 engine_aliases.update(dict(
203 engine_aliases.update(dict(
204 n='IPClusterEngines.n',
204 n='IPClusterEngines.n',
205 engines = 'IPClusterEngines.engine_launcher_class',
205 engines = 'IPClusterEngines.engine_launcher_class',
206 daemonize = 'IPClusterEngines.daemonize',
206 daemonize = 'IPClusterEngines.daemonize',
207 ))
207 ))
208 engine_flags = {}
208 engine_flags = {}
209 engine_flags.update(base_flags)
209 engine_flags.update(base_flags)
210
210
211 engine_flags.update(dict(
211 engine_flags.update(dict(
212 daemonize=(
212 daemonize=(
213 {'IPClusterEngines' : {'daemonize' : True}},
213 {'IPClusterEngines' : {'daemonize' : True}},
214 """run the cluster into the background (not available on Windows)""",
214 """run the cluster into the background (not available on Windows)""",
215 )
215 )
216 ))
216 ))
217 class IPClusterEngines(BaseParallelApplication):
217 class IPClusterEngines(BaseParallelApplication):
218
218
219 name = u'ipcluster'
219 name = u'ipcluster'
220 description = engines_help
220 description = engines_help
221 examples = _engines_examples
221 examples = _engines_examples
222 usage = None
222 usage = None
223 config_file_name = Unicode(default_config_file_name)
223 config_file_name = Unicode(default_config_file_name)
224 default_log_level = logging.INFO
224 default_log_level = logging.INFO
225 classes = List()
225 classes = List()
226 def _classes_default(self):
226 def _classes_default(self):
227 from IPython.parallel.apps import launcher
227 from IPython.parallel.apps import launcher
228 launchers = launcher.all_launchers
228 launchers = launcher.all_launchers
229 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
229 eslaunchers = [ l for l in launchers if 'EngineSet' in l.__name__]
230 return [ProfileDir]+eslaunchers
230 return [ProfileDir]+eslaunchers
231
231
232 n = Int(num_cpus(), config=True,
232 n = Int(num_cpus(), config=True,
233 help="""The number of engines to start. The default is to use one for each
233 help="""The number of engines to start. The default is to use one for each
234 CPU on your machine""")
234 CPU on your machine""")
235
235
236 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
236 engine_launcher_class = DottedObjectName('LocalEngineSetLauncher',
237 config=True,
237 config=True,
238 help="""The class for launching a set of Engines. Change this value
238 help="""The class for launching a set of Engines. Change this value
239 to use various batch systems to launch your engines, such as PBS,SGE,MPIExec,etc.
239 to use various batch systems to launch your engines, such as PBS,SGE,MPIExec,etc.
240 Each launcher class has its own set of configuration options, for making sure
240 Each launcher class has its own set of configuration options, for making sure
241 it will work in your environment.
241 it will work in your environment.
242
242
243 You can also write your own launcher, and specify it's absolute import path,
243 You can also write your own launcher, and specify it's absolute import path,
244 as in 'mymodule.launcher.FTLEnginesLauncher`.
244 as in 'mymodule.launcher.FTLEnginesLauncher`.
245
245
246 Examples include:
246 Examples include:
247
247
248 LocalEngineSetLauncher : start engines locally as subprocesses [default]
248 LocalEngineSetLauncher : start engines locally as subprocesses [default]
249 MPIExecEngineSetLauncher : use mpiexec to launch in an MPI environment
249 MPIExecEngineSetLauncher : use mpiexec to launch in an MPI environment
250 PBSEngineSetLauncher : use PBS (qsub) to submit engines to a batch queue
250 PBSEngineSetLauncher : use PBS (qsub) to submit engines to a batch queue
251 SGEEngineSetLauncher : use SGE (qsub) to submit engines to a batch queue
251 SGEEngineSetLauncher : use SGE (qsub) to submit engines to a batch queue
252 SSHEngineSetLauncher : use SSH to start the controller
252 SSHEngineSetLauncher : use SSH to start the controller
253 Note that SSH does *not* move the connection files
253 Note that SSH does *not* move the connection files
254 around, so you will likely have to do this manually
254 around, so you will likely have to do this manually
255 unless the machines are on a shared file system.
255 unless the machines are on a shared file system.
256 WindowsHPCEngineSetLauncher : use Windows HPC
256 WindowsHPCEngineSetLauncher : use Windows HPC
257 """
257 """
258 )
258 )
259 daemonize = Bool(False, config=True,
259 daemonize = Bool(False, config=True,
260 help="""Daemonize the ipcluster program. This implies --log-to-file.
260 help="""Daemonize the ipcluster program. This implies --log-to-file.
261 Not available on Windows.
261 Not available on Windows.
262 """)
262 """)
263
263
264 def _daemonize_changed(self, name, old, new):
264 def _daemonize_changed(self, name, old, new):
265 if new:
265 if new:
266 self.log_to_file = True
266 self.log_to_file = True
267
267
268 aliases = Dict(engine_aliases)
268 aliases = Dict(engine_aliases)
269 flags = Dict(engine_flags)
269 flags = Dict(engine_flags)
270 _stopping = False
270 _stopping = False
271
271
272 def initialize(self, argv=None):
272 def initialize(self, argv=None):
273 super(IPClusterEngines, self).initialize(argv)
273 super(IPClusterEngines, self).initialize(argv)
274 self.init_signal()
274 self.init_signal()
275 self.init_launchers()
275 self.init_launchers()
276
276
277 def init_launchers(self):
277 def init_launchers(self):
278 self.engine_launcher = self.build_launcher(self.engine_launcher_class)
278 self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
279 self.engine_launcher.on_stop(lambda r: self.loop.stop())
279 self.engine_launcher.on_stop(lambda r: self.loop.stop())
280
280
281 def init_signal(self):
281 def init_signal(self):
282 # Setup signals
282 # Setup signals
283 signal.signal(signal.SIGINT, self.sigint_handler)
283 signal.signal(signal.SIGINT, self.sigint_handler)
284
284
285 def build_launcher(self, clsname):
285 def build_launcher(self, clsname, kind=None):
286 """import and instantiate a Launcher based on importstring"""
286 """import and instantiate a Launcher based on importstring"""
287 if '.' not in clsname:
287 if '.' not in clsname:
288 # not a module, presume it's the raw name in apps.launcher
288 # not a module, presume it's the raw name in apps.launcher
289 if kind and kind not in clsname:
290 # doesn't match necessary full class name, assume it's
291 # just 'PBS' or 'MPIExec' prefix:
292 clsname = clsname + kind + 'Launcher'
289 clsname = 'IPython.parallel.apps.launcher.'+clsname
293 clsname = 'IPython.parallel.apps.launcher.'+clsname
290 # print repr(clsname)
291 try:
294 try:
292 klass = import_item(clsname)
295 klass = import_item(clsname)
293 except (ImportError, KeyError):
296 except (ImportError, KeyError):
294 self.log.fatal("Could not import launcher class: %r"%clsname)
297 self.log.fatal("Could not import launcher class: %r"%clsname)
295 self.exit(1)
298 self.exit(1)
296
299
297 launcher = klass(
300 launcher = klass(
298 work_dir=u'.', config=self.config, log=self.log
301 work_dir=u'.', config=self.config, log=self.log,
302 profile_dir=self.profile_dir.location, cluster_id=self.cluster_id,
299 )
303 )
300 return launcher
304 return launcher
301
305
302 def start_engines(self):
306 def start_engines(self):
303 self.log.info("Starting %i engines"%self.n)
307 self.log.info("Starting %i engines"%self.n)
304 self.engine_launcher.start(
308 self.engine_launcher.start(self.n)
305 self.n,
306 self.profile_dir.location
307 )
308
309
309 def stop_engines(self):
310 def stop_engines(self):
310 self.log.info("Stopping Engines...")
311 self.log.info("Stopping Engines...")
311 if self.engine_launcher.running:
312 if self.engine_launcher.running:
312 d = self.engine_launcher.stop()
313 d = self.engine_launcher.stop()
313 return d
314 return d
314 else:
315 else:
315 return None
316 return None
316
317
317 def stop_launchers(self, r=None):
318 def stop_launchers(self, r=None):
318 if not self._stopping:
319 if not self._stopping:
319 self._stopping = True
320 self._stopping = True
320 self.log.error("IPython cluster: stopping")
321 self.log.error("IPython cluster: stopping")
321 self.stop_engines()
322 self.stop_engines()
322 # Wait a few seconds to let things shut down.
323 # Wait a few seconds to let things shut down.
323 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
324 dc = ioloop.DelayedCallback(self.loop.stop, 4000, self.loop)
324 dc.start()
325 dc.start()
325
326
326 def sigint_handler(self, signum, frame):
327 def sigint_handler(self, signum, frame):
327 self.log.debug("SIGINT received, stopping launchers...")
328 self.log.debug("SIGINT received, stopping launchers...")
328 self.stop_launchers()
329 self.stop_launchers()
329
330
330 def start_logging(self):
331 def start_logging(self):
331 # Remove old log files of the controller and engine
332 # Remove old log files of the controller and engine
332 if self.clean_logs:
333 if self.clean_logs:
333 log_dir = self.profile_dir.log_dir
334 log_dir = self.profile_dir.log_dir
334 for f in os.listdir(log_dir):
335 for f in os.listdir(log_dir):
335 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
336 if re.match(r'ip(engine|controller)z-\d+\.(log|err|out)',f):
336 os.remove(os.path.join(log_dir, f))
337 os.remove(os.path.join(log_dir, f))
337 # This will remove old log files for ipcluster itself
338 # This will remove old log files for ipcluster itself
338 # super(IPBaseParallelApplication, self).start_logging()
339 # super(IPBaseParallelApplication, self).start_logging()
339
340
340 def start(self):
341 def start(self):
341 """Start the app for the engines subcommand."""
342 """Start the app for the engines subcommand."""
342 self.log.info("IPython cluster: started")
343 self.log.info("IPython cluster: started")
343 # First see if the cluster is already running
344 # First see if the cluster is already running
344
345
345 # Now log and daemonize
346 # Now log and daemonize
346 self.log.info(
347 self.log.info(
347 'Starting engines with [daemon=%r]' % self.daemonize
348 'Starting engines with [daemon=%r]' % self.daemonize
348 )
349 )
349 # TODO: Get daemonize working on Windows or as a Windows Server.
350 # TODO: Get daemonize working on Windows or as a Windows Server.
350 if self.daemonize:
351 if self.daemonize:
351 if os.name=='posix':
352 if os.name=='posix':
352 daemonize()
353 daemonize()
353
354
354 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
355 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
355 dc.start()
356 dc.start()
356 # Now write the new pid file AFTER our new forked pid is active.
357 # Now write the new pid file AFTER our new forked pid is active.
357 # self.write_pid_file()
358 # self.write_pid_file()
358 try:
359 try:
359 self.loop.start()
360 self.loop.start()
360 except KeyboardInterrupt:
361 except KeyboardInterrupt:
361 pass
362 pass
362 except zmq.ZMQError as e:
363 except zmq.ZMQError as e:
363 if e.errno == errno.EINTR:
364 if e.errno == errno.EINTR:
364 pass
365 pass
365 else:
366 else:
366 raise
367 raise
367
368
# command-line aliases for the `ipcluster start` subcommand:
# everything the engines subcommand accepts, plus controller options.
start_aliases = {}
start_aliases.update(engine_aliases)
start_aliases.update(dict(
    delay='IPClusterStart.delay',
    controller = 'IPClusterStart.controller_launcher_class',
))
start_aliases['clean-logs'] = 'IPClusterStart.clean_logs'

# set inherited Start keys directly, to ensure command-line args get higher priority
# than config file options.
for key,value in start_aliases.items():
    if value.startswith('IPClusterEngines'):
        start_aliases[key] = value.replace('IPClusterEngines', 'IPClusterStart')
class IPClusterStart(IPClusterEngines):
    """App for the `ipcluster start` subcommand.

    Launches a controller, then (after `delay` seconds) the engines,
    extending IPClusterEngines with controller management.
    """

    name = u'ipcluster'
    description = start_help
    examples = _start_examples
    default_log_level = logging.INFO
    auto_create = Bool(True, config=True,
        help="whether to create the profile_dir if it doesn't exist")
    classes = List()
    def _classes_default(self,):
        # deferred import to avoid a circular import at module load time
        from IPython.parallel.apps import launcher
        return [ProfileDir] + [IPClusterEngines] + launcher.all_launchers

    clean_logs = Bool(True, config=True,
        help="whether to cleanup old logs before starting")

    delay = CFloat(1., config=True,
        help="delay (in s) between starting the controller and the engines")

    controller_launcher_class = DottedObjectName('LocalControllerLauncher',
        config=True,
        # FIX: was `helep=`, a typo that buried the help text in trait metadata
        help="""The class for launching a Controller. Change this value if you want
        your controller to also be launched by a batch system, such as PBS,SGE,MPIExec,etc.

        Each launcher class has its own set of configuration options, for making sure
        it will work in your environment.

        Examples include:

            LocalControllerLauncher : start engines locally as subprocesses
            MPIExecControllerLauncher : use mpiexec to launch engines in an MPI universe
            PBSControllerLauncher : use PBS (qsub) to submit engines to a batch queue
            SGEControllerLauncher : use SGE (qsub) to submit engines to a batch queue
            SSHControllerLauncher : use SSH to start the controller
            WindowsHPCControllerLauncher : use Windows HPC
        """
        )
    reset = Bool(False, config=True,
        help="Whether to reset config files as part of '--create'."
        )

    # flags = Dict(flags)
    aliases = Dict(start_aliases)

    def init_launchers(self):
        """Create the controller and engine-set launchers and wire shutdown."""
        self.controller_launcher = self.build_launcher(self.controller_launcher_class, 'Controller')
        self.engine_launcher = self.build_launcher(self.engine_launcher_class, 'EngineSet')
        # stop everything if the controller goes down
        self.controller_launcher.on_stop(self.stop_launchers)

    def start_controller(self):
        """Start the controller via its launcher."""
        self.controller_launcher.start()

    def stop_controller(self):
        """Stop the controller launcher if it is running."""
        # self.log.info("In stop_controller")
        if self.controller_launcher and self.controller_launcher.running:
            return self.controller_launcher.stop()

    def stop_launchers(self, r=None):
        """Stop controller then engines (engines via the superclass)."""
        if not self._stopping:
            self.stop_controller()
            super(IPClusterStart, self).stop_launchers()

    def start(self):
        """Start the app for the start subcommand.

        Refuses to start if a cluster with this profile already appears
        to be running (based on the pid file); otherwise daemonizes if
        requested, schedules controller+engine startup, and runs the loop.
        """
        # First see if the cluster is already running
        try:
            pid = self.get_pid_from_file()
        except PIDFileError:
            pass
        else:
            if self.check_pid(pid):
                self.log.critical(
                    'Cluster is already running with [pid=%s]. '
                    'use "ipcluster stop" to stop the cluster.' % pid
                )
                # Here I exit with a unusual exit status that other processes
                # can watch for to learn how I existed.
                self.exit(ALREADY_STARTED)
            else:
                # stale pid file from a dead process; clean it up
                self.remove_pid_file()

        # Now log and daemonize
        self.log.info(
            'Starting ipcluster with [daemon=%r]' % self.daemonize
        )
        # TODO: Get daemonize working on Windows or as a Windows Server.
        if self.daemonize:
            if os.name=='posix':
                daemonize()

        # controller first, engines after `delay` seconds
        dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
        dc.start()
        dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
        dc.start()
        # Now write the new pid file AFTER our new forked pid is active.
        self.write_pid_file()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass
        except zmq.ZMQError as e:
            # EINTR just means the loop was interrupted by a signal
            if e.errno == errno.EINTR:
                pass
            else:
                raise
        finally:
            self.remove_pid_file()
# common prefix for the subcommand app class paths below
base='IPython.parallel.apps.ipclusterapp.IPCluster'
495 class IPClusterApp(Application):
494 class IPClusterApp(Application):
496 name = u'ipcluster'
495 name = u'ipcluster'
497 description = _description
496 description = _description
498 examples = _main_examples
497 examples = _main_examples
499
498
500 subcommands = {
499 subcommands = {
501 'start' : (base+'Start', start_help),
500 'start' : (base+'Start', start_help),
502 'stop' : (base+'Stop', stop_help),
501 'stop' : (base+'Stop', stop_help),
503 'engines' : (base+'Engines', engines_help),
502 'engines' : (base+'Engines', engines_help),
504 }
503 }
505
504
506 # no aliases or flags for parent App
505 # no aliases or flags for parent App
507 aliases = Dict()
506 aliases = Dict()
508 flags = Dict()
507 flags = Dict()
509
508
510 def start(self):
509 def start(self):
511 if self.subapp is None:
510 if self.subapp is None:
512 print "No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())
511 print "No subcommand specified. Must specify one of: %s"%(self.subcommands.keys())
513 print
512 print
514 self.print_description()
513 self.print_description()
515 self.print_subcommands()
514 self.print_subcommands()
516 self.exit(1)
515 self.exit(1)
517 else:
516 else:
518 return self.subapp.start()
517 return self.subapp.start()
519
518
def launch_new_instance():
    """Create and run the IPython cluster."""
    app = IPClusterApp.instance()
    app.initialize()
    app.start()
525
524
526
525
if __name__ == '__main__':
    launch_new_instance()
529
528
@@ -1,441 +1,452 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 from __future__ import with_statement
24 from __future__ import with_statement
25
25
26 import os
26 import os
27 import socket
27 import socket
28 import stat
28 import stat
29 import sys
29 import sys
30 import uuid
30 import uuid
31
31
32 from multiprocessing import Process
32 from multiprocessing import Process
33
33
34 import zmq
34 import zmq
35 from zmq.devices import ProcessMonitoredQueue
35 from zmq.devices import ProcessMonitoredQueue
36 from zmq.log.handlers import PUBHandler
36 from zmq.log.handlers import PUBHandler
37 from zmq.utils import jsonapi as json
37 from zmq.utils import jsonapi as json
38
38
39 from IPython.config.application import boolean_flag
39 from IPython.config.application import boolean_flag
40 from IPython.core.profiledir import ProfileDir
40 from IPython.core.profiledir import ProfileDir
41
41
42 from IPython.parallel.apps.baseapp import (
42 from IPython.parallel.apps.baseapp import (
43 BaseParallelApplication,
43 BaseParallelApplication,
44 base_aliases,
44 base_aliases,
45 base_flags,
45 base_flags,
46 )
46 )
47 from IPython.utils.importstring import import_item
47 from IPython.utils.importstring import import_item
48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
48 from IPython.utils.traitlets import Instance, Unicode, Bool, List, Dict
49
49
50 # from IPython.parallel.controller.controller import ControllerFactory
50 # from IPython.parallel.controller.controller import ControllerFactory
51 from IPython.zmq.session import Session
51 from IPython.zmq.session import Session
52 from IPython.parallel.controller.heartmonitor import HeartMonitor
52 from IPython.parallel.controller.heartmonitor import HeartMonitor
53 from IPython.parallel.controller.hub import HubFactory
53 from IPython.parallel.controller.hub import HubFactory
54 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
54 from IPython.parallel.controller.scheduler import TaskScheduler,launch_scheduler
55 from IPython.parallel.controller.sqlitedb import SQLiteDB
55 from IPython.parallel.controller.sqlitedb import SQLiteDB
56
56
57 from IPython.parallel.util import signal_children, split_url, asbytes
57 from IPython.parallel.util import signal_children, split_url, asbytes
58
58
# conditional import of MongoDB backend class:
# maybe_mongo holds [MongoDB] when pymongo is available, else []
try:
    from IPython.parallel.controller.mongodb import MongoDB
except ImportError:
    maybe_mongo = []
else:
    maybe_mongo = [MongoDB]
67
67
68
68
69 #-----------------------------------------------------------------------------
69 #-----------------------------------------------------------------------------
70 # Module level variables
70 # Module level variables
71 #-----------------------------------------------------------------------------
71 #-----------------------------------------------------------------------------
72
72
73
73
74 #: The default config file name for this application
74 #: The default config file name for this application
75 default_config_file_name = u'ipcontroller_config.py'
75 default_config_file_name = u'ipcontroller_config.py'
76
76
77
77
78 _description = """Start the IPython controller for parallel computing.
78 _description = """Start the IPython controller for parallel computing.
79
79
80 The IPython controller provides a gateway between the IPython engines and
80 The IPython controller provides a gateway between the IPython engines and
81 clients. The controller needs to be started before the engines and can be
81 clients. The controller needs to be started before the engines and can be
82 configured using command line options or using a cluster directory. Cluster
82 configured using command line options or using a cluster directory. Cluster
83 directories contain config, log and security files and are usually located in
83 directories contain config, log and security files and are usually located in
84 your ipython directory and named as "profile_name". See the `profile`
84 your ipython directory and named as "profile_name". See the `profile`
85 and `profile-dir` options for details.
85 and `profile-dir` options for details.
86 """
86 """
87
87
88 _examples = """
88 _examples = """
89 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
89 ipcontroller --ip=192.168.0.1 --port=1000 # listen on ip, port for engines
90 ipcontroller --scheme=pure # use the pure zeromq scheduler
90 ipcontroller --scheme=pure # use the pure zeromq scheduler
91 """
91 """
92
92
93
93
94 #-----------------------------------------------------------------------------
94 #-----------------------------------------------------------------------------
95 # The main application
95 # The main application
96 #-----------------------------------------------------------------------------
96 #-----------------------------------------------------------------------------
# command-line flags: base flags plus controller-specific toggles
flags = {}
flags.update(base_flags)
flags.update({
    'usethreads' : ( {'IPControllerApp' : {'use_threads' : True}},
                    'Use threads instead of processes for the schedulers'),
    'sqlitedb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.sqlitedb.SQLiteDB'}},
                    'use the SQLiteDB backend'),
    'mongodb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.mongodb.MongoDB'}},
                    'use the MongoDB backend'),
    'dictdb' : ({'HubFactory' : {'db_class' : 'IPython.parallel.controller.dictdb.DictDB'}},
                    'use the in-memory DictDB backend'),
    'reuse' : ({'IPControllerApp' : {'reuse_files' : True}},
                    'reuse existing json connection files')
})

flags.update(boolean_flag('secure', 'IPControllerApp.secure',
    "Use HMAC digests for authentication of messages.",
    "Don't authenticate messages."
))

# command-line aliases mapping option names to configurable traits
aliases = dict(
    secure = 'IPControllerApp.secure',
    ssh = 'IPControllerApp.ssh_server',
    enginessh = 'IPControllerApp.engine_ssh_server',
    location = 'IPControllerApp.location',

    ident = 'Session.session',
    user = 'Session.username',
    keyfile = 'Session.keyfile',

    url = 'HubFactory.url',
    ip = 'HubFactory.ip',
    transport = 'HubFactory.transport',
    port = 'HubFactory.regport',

    ping = 'HeartMonitor.period',

    scheme = 'TaskScheduler.scheme_name',
    hwm = 'TaskScheduler.hwm',
)
aliases.update(base_aliases)
137
137
138
138
class IPControllerApp(BaseParallelApplication):
    """The `ipcontroller` application: configures and runs the Hub and schedulers."""

    name = u'ipcontroller'
    description = _description
    examples = _examples
    config_file_name = Unicode(default_config_file_name)
    classes = [ProfileDir, Session, HubFactory, TaskScheduler, HeartMonitor, SQLiteDB] + maybe_mongo

    # change default to True
    auto_create = Bool(True, config=True,
        help="""Whether to create profile dir if it doesn't exist.""")

    reuse_files = Bool(False, config=True,
        help='Whether to reuse existing json connection files.'
    )
    secure = Bool(True, config=True,
        help='Whether to use HMAC digests for extra message authentication.'
    )
    ssh_server = Unicode(u'', config=True,
        help="""ssh url for clients to use when connecting to the Controller
        processes. It should be of the form: [user@]server[:port]. The
        Controller's listening addresses must be accessible from the ssh server""",
    )
    engine_ssh_server = Unicode(u'', config=True,
        help="""ssh url for engines to use when connecting to the Controller
        processes. It should be of the form: [user@]server[:port]. The
        Controller's listening addresses must be accessible from the ssh server""",
    )
    location = Unicode(u'', config=True,
        help="""The external IP or domain name of the Controller, used for disambiguating
        engine and client connections.""",
    )
    import_statements = List([], config=True,
        help="import statements to be run at startup. Necessary in some environments"
    )

    use_threads = Bool(False, config=True,
        help='Use threads instead of processes for the schedulers',
    )

    # connection-file names; re-derived from `name` when cluster_id changes
    engine_json_file = Unicode('ipcontroller-engine.json', config=True,
        help="JSON filename where engine connection info will be stored.")
    client_json_file = Unicode('ipcontroller-client.json', config=True,
        help="JSON filename where client connection info will be stored.")
184 def _cluster_id_changed(self, name, old, new):
185 super(IPControllerApp, self)._cluster_id_changed(name, old, new)
186 self.engine_json_file = "%s-engine.json" % self.name
187 self.client_json_file = "%s-client.json" % self.name
188
178
189
179 # internal
190 # internal
180 children = List()
191 children = List()
181 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
192 mq_class = Unicode('zmq.devices.ProcessMonitoredQueue')
182
193
183 def _use_threads_changed(self, name, old, new):
194 def _use_threads_changed(self, name, old, new):
184 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
195 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
185
196
186 aliases = Dict(aliases)
197 aliases = Dict(aliases)
187 flags = Dict(flags)
198 flags = Dict(flags)
188
199
189
200
190 def save_connection_dict(self, fname, cdict):
201 def save_connection_dict(self, fname, cdict):
191 """save a connection dict to json file."""
202 """save a connection dict to json file."""
192 c = self.config
203 c = self.config
193 url = cdict['url']
204 url = cdict['url']
194 location = cdict['location']
205 location = cdict['location']
195 if not location:
206 if not location:
196 try:
207 try:
197 proto,ip,port = split_url(url)
208 proto,ip,port = split_url(url)
198 except AssertionError:
209 except AssertionError:
199 pass
210 pass
200 else:
211 else:
201 try:
212 try:
202 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
213 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
203 except (socket.gaierror, IndexError):
214 except (socket.gaierror, IndexError):
204 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
215 self.log.warn("Could not identify this machine's IP, assuming 127.0.0.1."
205 " You may need to specify '--location=<external_ip_address>' to help"
216 " You may need to specify '--location=<external_ip_address>' to help"
206 " IPython decide when to connect via loopback.")
217 " IPython decide when to connect via loopback.")
207 location = '127.0.0.1'
218 location = '127.0.0.1'
208 cdict['location'] = location
219 cdict['location'] = location
209 fname = os.path.join(self.profile_dir.security_dir, fname)
220 fname = os.path.join(self.profile_dir.security_dir, fname)
210 with open(fname, 'wb') as f:
221 with open(fname, 'wb') as f:
211 f.write(json.dumps(cdict, indent=2))
222 f.write(json.dumps(cdict, indent=2))
212 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
223 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
213
224
214 def load_config_from_json(self):
225 def load_config_from_json(self):
215 """load config from existing json connector files."""
226 """load config from existing json connector files."""
216 c = self.config
227 c = self.config
217 # load from engine config
228 # load from engine config
218 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-engine.json')) as f:
229 with open(os.path.join(self.profile_dir.security_dir, self.engine_json_file)) as f:
219 cfg = json.loads(f.read())
230 cfg = json.loads(f.read())
220 key = c.Session.key = asbytes(cfg['exec_key'])
231 key = c.Session.key = asbytes(cfg['exec_key'])
221 xport,addr = cfg['url'].split('://')
232 xport,addr = cfg['url'].split('://')
222 c.HubFactory.engine_transport = xport
233 c.HubFactory.engine_transport = xport
223 ip,ports = addr.split(':')
234 ip,ports = addr.split(':')
224 c.HubFactory.engine_ip = ip
235 c.HubFactory.engine_ip = ip
225 c.HubFactory.regport = int(ports)
236 c.HubFactory.regport = int(ports)
226 self.location = cfg['location']
237 self.location = cfg['location']
227 if not self.engine_ssh_server:
238 if not self.engine_ssh_server:
228 self.engine_ssh_server = cfg['ssh']
239 self.engine_ssh_server = cfg['ssh']
229 # load client config
240 # load client config
230 with open(os.path.join(self.profile_dir.security_dir, 'ipcontroller-client.json')) as f:
241 with open(os.path.join(self.profile_dir.security_dir, self.client_json_file)) as f:
231 cfg = json.loads(f.read())
242 cfg = json.loads(f.read())
232 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
243 assert key == cfg['exec_key'], "exec_key mismatch between engine and client keys"
233 xport,addr = cfg['url'].split('://')
244 xport,addr = cfg['url'].split('://')
234 c.HubFactory.client_transport = xport
245 c.HubFactory.client_transport = xport
235 ip,ports = addr.split(':')
246 ip,ports = addr.split(':')
236 c.HubFactory.client_ip = ip
247 c.HubFactory.client_ip = ip
237 if not self.ssh_server:
248 if not self.ssh_server:
238 self.ssh_server = cfg['ssh']
249 self.ssh_server = cfg['ssh']
239 assert int(ports) == c.HubFactory.regport, "regport mismatch"
250 assert int(ports) == c.HubFactory.regport, "regport mismatch"
240
251
241 def init_hub(self):
252 def init_hub(self):
242 c = self.config
253 c = self.config
243
254
244 self.do_import_statements()
255 self.do_import_statements()
245 reusing = self.reuse_files
256 reusing = self.reuse_files
246 if reusing:
257 if reusing:
247 try:
258 try:
248 self.load_config_from_json()
259 self.load_config_from_json()
249 except (AssertionError,IOError):
260 except (AssertionError,IOError):
250 reusing=False
261 reusing=False
251 # check again, because reusing may have failed:
262 # check again, because reusing may have failed:
252 if reusing:
263 if reusing:
253 pass
264 pass
254 elif self.secure:
265 elif self.secure:
255 key = str(uuid.uuid4())
266 key = str(uuid.uuid4())
256 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
267 # keyfile = os.path.join(self.profile_dir.security_dir, self.exec_key)
257 # with open(keyfile, 'w') as f:
268 # with open(keyfile, 'w') as f:
258 # f.write(key)
269 # f.write(key)
259 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
270 # os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
260 c.Session.key = asbytes(key)
271 c.Session.key = asbytes(key)
261 else:
272 else:
262 key = c.Session.key = b''
273 key = c.Session.key = b''
263
274
264 try:
275 try:
265 self.factory = HubFactory(config=c, log=self.log)
276 self.factory = HubFactory(config=c, log=self.log)
266 # self.start_logging()
277 # self.start_logging()
267 self.factory.init_hub()
278 self.factory.init_hub()
268 except:
279 except:
269 self.log.error("Couldn't construct the Controller", exc_info=True)
280 self.log.error("Couldn't construct the Controller", exc_info=True)
270 self.exit(1)
281 self.exit(1)
271
282
272 if not reusing:
283 if not reusing:
273 # save to new json config files
284 # save to new json config files
274 f = self.factory
285 f = self.factory
275 cdict = {'exec_key' : key,
286 cdict = {'exec_key' : key,
276 'ssh' : self.ssh_server,
287 'ssh' : self.ssh_server,
277 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
288 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
278 'location' : self.location
289 'location' : self.location
279 }
290 }
280 self.save_connection_dict('ipcontroller-client.json', cdict)
291 self.save_connection_dict(self.client_json_file, cdict)
281 edict = cdict
292 edict = cdict
282 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
293 edict['url']="%s://%s:%s"%((f.client_transport, f.client_ip, f.regport))
283 edict['ssh'] = self.engine_ssh_server
294 edict['ssh'] = self.engine_ssh_server
284 self.save_connection_dict('ipcontroller-engine.json', edict)
295 self.save_connection_dict(self.engine_json_file, edict)
285
296
286 #
297 #
287 def init_schedulers(self):
298 def init_schedulers(self):
288 children = self.children
299 children = self.children
289 mq = import_item(str(self.mq_class))
300 mq = import_item(str(self.mq_class))
290
301
291 hub = self.factory
302 hub = self.factory
292 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
303 # maybe_inproc = 'inproc://monitor' if self.use_threads else self.monitor_url
293 # IOPub relay (in a Process)
304 # IOPub relay (in a Process)
294 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
305 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, b'N/A',b'iopub')
295 q.bind_in(hub.client_info['iopub'])
306 q.bind_in(hub.client_info['iopub'])
296 q.bind_out(hub.engine_info['iopub'])
307 q.bind_out(hub.engine_info['iopub'])
297 q.setsockopt_out(zmq.SUBSCRIBE, b'')
308 q.setsockopt_out(zmq.SUBSCRIBE, b'')
298 q.connect_mon(hub.monitor_url)
309 q.connect_mon(hub.monitor_url)
299 q.daemon=True
310 q.daemon=True
300 children.append(q)
311 children.append(q)
301
312
302 # Multiplexer Queue (in a Process)
313 # Multiplexer Queue (in a Process)
303 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
314 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
304 q.bind_in(hub.client_info['mux'])
315 q.bind_in(hub.client_info['mux'])
305 q.setsockopt_in(zmq.IDENTITY, b'mux')
316 q.setsockopt_in(zmq.IDENTITY, b'mux')
306 q.bind_out(hub.engine_info['mux'])
317 q.bind_out(hub.engine_info['mux'])
307 q.connect_mon(hub.monitor_url)
318 q.connect_mon(hub.monitor_url)
308 q.daemon=True
319 q.daemon=True
309 children.append(q)
320 children.append(q)
310
321
311 # Control Queue (in a Process)
322 # Control Queue (in a Process)
312 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
323 q = mq(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'incontrol', b'outcontrol')
313 q.bind_in(hub.client_info['control'])
324 q.bind_in(hub.client_info['control'])
314 q.setsockopt_in(zmq.IDENTITY, b'control')
325 q.setsockopt_in(zmq.IDENTITY, b'control')
315 q.bind_out(hub.engine_info['control'])
326 q.bind_out(hub.engine_info['control'])
316 q.connect_mon(hub.monitor_url)
327 q.connect_mon(hub.monitor_url)
317 q.daemon=True
328 q.daemon=True
318 children.append(q)
329 children.append(q)
319 try:
330 try:
320 scheme = self.config.TaskScheduler.scheme_name
331 scheme = self.config.TaskScheduler.scheme_name
321 except AttributeError:
332 except AttributeError:
322 scheme = TaskScheduler.scheme_name.get_default_value()
333 scheme = TaskScheduler.scheme_name.get_default_value()
323 # Task Queue (in a Process)
334 # Task Queue (in a Process)
324 if scheme == 'pure':
335 if scheme == 'pure':
325 self.log.warn("task::using pure XREQ Task scheduler")
336 self.log.warn("task::using pure XREQ Task scheduler")
326 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
337 q = mq(zmq.ROUTER, zmq.DEALER, zmq.PUB, b'intask', b'outtask')
327 # q.setsockopt_out(zmq.HWM, hub.hwm)
338 # q.setsockopt_out(zmq.HWM, hub.hwm)
328 q.bind_in(hub.client_info['task'][1])
339 q.bind_in(hub.client_info['task'][1])
329 q.setsockopt_in(zmq.IDENTITY, b'task')
340 q.setsockopt_in(zmq.IDENTITY, b'task')
330 q.bind_out(hub.engine_info['task'])
341 q.bind_out(hub.engine_info['task'])
331 q.connect_mon(hub.monitor_url)
342 q.connect_mon(hub.monitor_url)
332 q.daemon=True
343 q.daemon=True
333 children.append(q)
344 children.append(q)
334 elif scheme == 'none':
345 elif scheme == 'none':
335 self.log.warn("task::using no Task scheduler")
346 self.log.warn("task::using no Task scheduler")
336
347
337 else:
348 else:
338 self.log.info("task::using Python %s Task scheduler"%scheme)
349 self.log.info("task::using Python %s Task scheduler"%scheme)
339 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
350 sargs = (hub.client_info['task'][1], hub.engine_info['task'],
340 hub.monitor_url, hub.client_info['notification'])
351 hub.monitor_url, hub.client_info['notification'])
341 kwargs = dict(logname='scheduler', loglevel=self.log_level,
352 kwargs = dict(logname='scheduler', loglevel=self.log_level,
342 log_url = self.log_url, config=dict(self.config))
353 log_url = self.log_url, config=dict(self.config))
343 if 'Process' in self.mq_class:
354 if 'Process' in self.mq_class:
344 # run the Python scheduler in a Process
355 # run the Python scheduler in a Process
345 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
356 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
346 q.daemon=True
357 q.daemon=True
347 children.append(q)
358 children.append(q)
348 else:
359 else:
349 # single-threaded Controller
360 # single-threaded Controller
350 kwargs['in_thread'] = True
361 kwargs['in_thread'] = True
351 launch_scheduler(*sargs, **kwargs)
362 launch_scheduler(*sargs, **kwargs)
352
363
353
364
354 def save_urls(self):
365 def save_urls(self):
355 """save the registration urls to files."""
366 """save the registration urls to files."""
356 c = self.config
367 c = self.config
357
368
358 sec_dir = self.profile_dir.security_dir
369 sec_dir = self.profile_dir.security_dir
359 cf = self.factory
370 cf = self.factory
360
371
361 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
372 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
362 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
373 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
363
374
364 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
375 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
365 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
376 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
366
377
367
378
368 def do_import_statements(self):
379 def do_import_statements(self):
369 statements = self.import_statements
380 statements = self.import_statements
370 for s in statements:
381 for s in statements:
371 try:
382 try:
372 self.log.msg("Executing statement: '%s'" % s)
383 self.log.msg("Executing statement: '%s'" % s)
373 exec s in globals(), locals()
384 exec s in globals(), locals()
374 except:
385 except:
375 self.log.msg("Error running statement: %s" % s)
386 self.log.msg("Error running statement: %s" % s)
376
387
377 def forward_logging(self):
388 def forward_logging(self):
378 if self.log_url:
389 if self.log_url:
379 self.log.info("Forwarding logging to %s"%self.log_url)
390 self.log.info("Forwarding logging to %s"%self.log_url)
380 context = zmq.Context.instance()
391 context = zmq.Context.instance()
381 lsock = context.socket(zmq.PUB)
392 lsock = context.socket(zmq.PUB)
382 lsock.connect(self.log_url)
393 lsock.connect(self.log_url)
383 handler = PUBHandler(lsock)
394 handler = PUBHandler(lsock)
384 self.log.removeHandler(self._log_handler)
395 self.log.removeHandler(self._log_handler)
385 handler.root_topic = 'controller'
396 handler.root_topic = 'controller'
386 handler.setLevel(self.log_level)
397 handler.setLevel(self.log_level)
387 self.log.addHandler(handler)
398 self.log.addHandler(handler)
388 self._log_handler = handler
399 self._log_handler = handler
389 # #
400 # #
390
401
391 def initialize(self, argv=None):
402 def initialize(self, argv=None):
392 super(IPControllerApp, self).initialize(argv)
403 super(IPControllerApp, self).initialize(argv)
393 self.forward_logging()
404 self.forward_logging()
394 self.init_hub()
405 self.init_hub()
395 self.init_schedulers()
406 self.init_schedulers()
396
407
397 def start(self):
408 def start(self):
398 # Start the subprocesses:
409 # Start the subprocesses:
399 self.factory.start()
410 self.factory.start()
400 child_procs = []
411 child_procs = []
401 for child in self.children:
412 for child in self.children:
402 child.start()
413 child.start()
403 if isinstance(child, ProcessMonitoredQueue):
414 if isinstance(child, ProcessMonitoredQueue):
404 child_procs.append(child.launcher)
415 child_procs.append(child.launcher)
405 elif isinstance(child, Process):
416 elif isinstance(child, Process):
406 child_procs.append(child)
417 child_procs.append(child)
407 if child_procs:
418 if child_procs:
408 signal_children(child_procs)
419 signal_children(child_procs)
409
420
410 self.write_pid_file(overwrite=True)
421 self.write_pid_file(overwrite=True)
411
422
412 try:
423 try:
413 self.factory.loop.start()
424 self.factory.loop.start()
414 except KeyboardInterrupt:
425 except KeyboardInterrupt:
415 self.log.critical("Interrupted, Exiting...\n")
426 self.log.critical("Interrupted, Exiting...\n")
416
427
417
428
418
429
419 def launch_new_instance():
430 def launch_new_instance():
420 """Create and run the IPython controller"""
431 """Create and run the IPython controller"""
421 if sys.platform == 'win32':
432 if sys.platform == 'win32':
422 # make sure we don't get called from a multiprocessing subprocess
433 # make sure we don't get called from a multiprocessing subprocess
423 # this can result in infinite Controllers being started on Windows
434 # this can result in infinite Controllers being started on Windows
424 # which doesn't have a proper fork, so multiprocessing is wonky
435 # which doesn't have a proper fork, so multiprocessing is wonky
425
436
426 # this only comes up when IPython has been installed using vanilla
437 # this only comes up when IPython has been installed using vanilla
427 # setuptools, and *not* distribute.
438 # setuptools, and *not* distribute.
428 import multiprocessing
439 import multiprocessing
429 p = multiprocessing.current_process()
440 p = multiprocessing.current_process()
430 # the main process has name 'MainProcess'
441 # the main process has name 'MainProcess'
431 # subprocesses will have names like 'Process-1'
442 # subprocesses will have names like 'Process-1'
432 if p.name != 'MainProcess':
443 if p.name != 'MainProcess':
433 # we are a subprocess, don't start another Controller!
444 # we are a subprocess, don't start another Controller!
434 return
445 return
435 app = IPControllerApp.instance()
446 app = IPControllerApp.instance()
436 app.initialize()
447 app.initialize()
437 app.start()
448 app.start()
438
449
439
450
440 if __name__ == '__main__':
451 if __name__ == '__main__':
441 launch_new_instance()
452 launch_new_instance()
@@ -1,336 +1,344 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython engine application
4 The IPython engine application
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * MinRK
9 * MinRK
10
10
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Copyright (C) 2008-2011 The IPython Development Team
14 # Copyright (C) 2008-2011 The IPython Development Team
15 #
15 #
16 # Distributed under the terms of the BSD License. The full license is in
16 # Distributed under the terms of the BSD License. The full license is in
17 # the file COPYING, distributed as part of this software.
17 # the file COPYING, distributed as part of this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Imports
21 # Imports
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import json
24 import json
25 import os
25 import os
26 import sys
26 import sys
27 import time
27 import time
28
28
29 import zmq
29 import zmq
30 from zmq.eventloop import ioloop
30 from zmq.eventloop import ioloop
31
31
32 from IPython.core.profiledir import ProfileDir
32 from IPython.core.profiledir import ProfileDir
33 from IPython.parallel.apps.baseapp import (
33 from IPython.parallel.apps.baseapp import (
34 BaseParallelApplication,
34 BaseParallelApplication,
35 base_aliases,
35 base_aliases,
36 base_flags,
36 base_flags,
37 )
37 )
38 from IPython.zmq.log import EnginePUBHandler
38 from IPython.zmq.log import EnginePUBHandler
39
39
40 from IPython.config.configurable import Configurable
40 from IPython.config.configurable import Configurable
41 from IPython.zmq.session import Session
41 from IPython.zmq.session import Session
42 from IPython.parallel.engine.engine import EngineFactory
42 from IPython.parallel.engine.engine import EngineFactory
43 from IPython.parallel.engine.streamkernel import Kernel
43 from IPython.parallel.engine.streamkernel import Kernel
44 from IPython.parallel.util import disambiguate_url, asbytes
44 from IPython.parallel.util import disambiguate_url, asbytes
45
45
46 from IPython.utils.importstring import import_item
46 from IPython.utils.importstring import import_item
47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
47 from IPython.utils.traitlets import Bool, Unicode, Dict, List, Float
48
48
49
49
50 #-----------------------------------------------------------------------------
50 #-----------------------------------------------------------------------------
51 # Module level variables
51 # Module level variables
52 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
53
53
54 #: The default config file name for this application
54 #: The default config file name for this application
55 default_config_file_name = u'ipengine_config.py'
55 default_config_file_name = u'ipengine_config.py'
56
56
57 _description = """Start an IPython engine for parallel computing.
57 _description = """Start an IPython engine for parallel computing.
58
58
59 IPython engines run in parallel and perform computations on behalf of a client
59 IPython engines run in parallel and perform computations on behalf of a client
60 and controller. A controller needs to be started before the engines. The
60 and controller. A controller needs to be started before the engines. The
61 engine can be configured using command line options or using a cluster
61 engine can be configured using command line options or using a cluster
62 directory. Cluster directories contain config, log and security files and are
62 directory. Cluster directories contain config, log and security files and are
63 usually located in your ipython directory and named as "profile_name".
63 usually located in your ipython directory and named as "profile_name".
64 See the `profile` and `profile-dir` options for details.
64 See the `profile` and `profile-dir` options for details.
65 """
65 """
66
66
67 _examples = """
67 _examples = """
68 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
68 ipengine --ip=192.168.0.1 --port=1000 # connect to hub at ip and port
69 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
69 ipengine --log-to-file --log-level=DEBUG # log to a file with DEBUG verbosity
70 """
70 """
71
71
72 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
73 # MPI configuration
73 # MPI configuration
74 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
75
75
76 mpi4py_init = """from mpi4py import MPI as mpi
76 mpi4py_init = """from mpi4py import MPI as mpi
77 mpi.size = mpi.COMM_WORLD.Get_size()
77 mpi.size = mpi.COMM_WORLD.Get_size()
78 mpi.rank = mpi.COMM_WORLD.Get_rank()
78 mpi.rank = mpi.COMM_WORLD.Get_rank()
79 """
79 """
80
80
81
81
82 pytrilinos_init = """from PyTrilinos import Epetra
82 pytrilinos_init = """from PyTrilinos import Epetra
83 class SimpleStruct:
83 class SimpleStruct:
84 pass
84 pass
85 mpi = SimpleStruct()
85 mpi = SimpleStruct()
86 mpi.rank = 0
86 mpi.rank = 0
87 mpi.size = 0
87 mpi.size = 0
88 """
88 """
89
89
90 class MPI(Configurable):
90 class MPI(Configurable):
91 """Configurable for MPI initialization"""
91 """Configurable for MPI initialization"""
92 use = Unicode('', config=True,
92 use = Unicode('', config=True,
93 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
93 help='How to enable MPI (mpi4py, pytrilinos, or empty string to disable).'
94 )
94 )
95
95
96 def _on_use_changed(self, old, new):
96 def _use_changed(self, name, old, new):
97 # load default init script if it's not set
97 # load default init script if it's not set
98 if not self.init_script:
98 if not self.init_script:
99 self.init_script = self.default_inits.get(new, '')
99 self.init_script = self.default_inits.get(new, '')
100
100
101 init_script = Unicode('', config=True,
101 init_script = Unicode('', config=True,
102 help="Initialization code for MPI")
102 help="Initialization code for MPI")
103
103
104 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
104 default_inits = Dict({'mpi4py' : mpi4py_init, 'pytrilinos':pytrilinos_init},
105 config=True)
105 config=True)
106
106
107
107
108 #-----------------------------------------------------------------------------
108 #-----------------------------------------------------------------------------
109 # Main application
109 # Main application
110 #-----------------------------------------------------------------------------
110 #-----------------------------------------------------------------------------
111 aliases = dict(
111 aliases = dict(
112 file = 'IPEngineApp.url_file',
112 file = 'IPEngineApp.url_file',
113 c = 'IPEngineApp.startup_command',
113 c = 'IPEngineApp.startup_command',
114 s = 'IPEngineApp.startup_script',
114 s = 'IPEngineApp.startup_script',
115
115
116 ident = 'Session.session',
116 ident = 'Session.session',
117 user = 'Session.username',
117 user = 'Session.username',
118 keyfile = 'Session.keyfile',
118 keyfile = 'Session.keyfile',
119
119
120 url = 'EngineFactory.url',
120 url = 'EngineFactory.url',
121 ssh = 'EngineFactory.sshserver',
121 ssh = 'EngineFactory.sshserver',
122 sshkey = 'EngineFactory.sshkey',
122 sshkey = 'EngineFactory.sshkey',
123 ip = 'EngineFactory.ip',
123 ip = 'EngineFactory.ip',
124 transport = 'EngineFactory.transport',
124 transport = 'EngineFactory.transport',
125 port = 'EngineFactory.regport',
125 port = 'EngineFactory.regport',
126 location = 'EngineFactory.location',
126 location = 'EngineFactory.location',
127
127
128 timeout = 'EngineFactory.timeout',
128 timeout = 'EngineFactory.timeout',
129
129
130 mpi = 'MPI.use',
130 mpi = 'MPI.use',
131
131
132 )
132 )
133 aliases.update(base_aliases)
133 aliases.update(base_aliases)
134
134
135
135
136 class IPEngineApp(BaseParallelApplication):
136 class IPEngineApp(BaseParallelApplication):
137
137
138 name = Unicode(u'ipengine')
138 name = 'ipengine'
139 description = Unicode(_description)
139 description = _description
140 examples = _examples
140 examples = _examples
141 config_file_name = Unicode(default_config_file_name)
141 config_file_name = Unicode(default_config_file_name)
142 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
142 classes = List([ProfileDir, Session, EngineFactory, Kernel, MPI])
143
143
144 startup_script = Unicode(u'', config=True,
144 startup_script = Unicode(u'', config=True,
145 help='specify a script to be run at startup')
145 help='specify a script to be run at startup')
146 startup_command = Unicode('', config=True,
146 startup_command = Unicode('', config=True,
147 help='specify a command to be run at startup')
147 help='specify a command to be run at startup')
148
148
149 url_file = Unicode(u'', config=True,
149 url_file = Unicode(u'', config=True,
150 help="""The full location of the file containing the connection information for
150 help="""The full location of the file containing the connection information for
151 the controller. If this is not given, the file must be in the
151 the controller. If this is not given, the file must be in the
152 security directory of the cluster directory. This location is
152 security directory of the cluster directory. This location is
153 resolved using the `profile` or `profile_dir` options.""",
153 resolved using the `profile` or `profile_dir` options.""",
154 )
154 )
155 wait_for_url_file = Float(5, config=True,
155 wait_for_url_file = Float(5, config=True,
156 help="""The maximum number of seconds to wait for url_file to exist.
156 help="""The maximum number of seconds to wait for url_file to exist.
157 This is useful for batch-systems and shared-filesystems where the
157 This is useful for batch-systems and shared-filesystems where the
158 controller and engine are started at the same time and it
158 controller and engine are started at the same time and it
159 may take a moment for the controller to write the connector files.""")
159 may take a moment for the controller to write the connector files.""")
160
160
161 url_file_name = Unicode(u'ipcontroller-engine.json')
161 url_file_name = Unicode(u'ipcontroller-engine.json', config=True)
162
163 def _cluster_id_changed(self, name, old, new):
164 if new:
165 base = 'ipcontroller-%s' % new
166 else:
167 base = 'ipcontroller'
168 self.url_file_name = "%s-engine.json" % base
169
162 log_url = Unicode('', config=True,
170 log_url = Unicode('', config=True,
163 help="""The URL for the iploggerapp instance, for forwarding
171 help="""The URL for the iploggerapp instance, for forwarding
164 logging to a central location.""")
172 logging to a central location.""")
165
173
166 aliases = Dict(aliases)
174 aliases = Dict(aliases)
167
175
168 # def find_key_file(self):
176 # def find_key_file(self):
169 # """Set the key file.
177 # """Set the key file.
170 #
178 #
171 # Here we don't try to actually see if it exists for is valid as that
179 # Here we don't try to actually see if it exists for is valid as that
172 # is hadled by the connection logic.
180 # is hadled by the connection logic.
173 # """
181 # """
174 # config = self.master_config
182 # config = self.master_config
175 # # Find the actual controller key file
183 # # Find the actual controller key file
176 # if not config.Global.key_file:
184 # if not config.Global.key_file:
177 # try_this = os.path.join(
185 # try_this = os.path.join(
178 # config.Global.profile_dir,
186 # config.Global.profile_dir,
179 # config.Global.security_dir,
187 # config.Global.security_dir,
180 # config.Global.key_file_name
188 # config.Global.key_file_name
181 # )
189 # )
182 # config.Global.key_file = try_this
190 # config.Global.key_file = try_this
183
191
184 def find_url_file(self):
192 def find_url_file(self):
185 """Set the url file.
193 """Set the url file.
186
194
187 Here we don't try to actually see if it exists for is valid as that
195 Here we don't try to actually see if it exists for is valid as that
188 is hadled by the connection logic.
196 is hadled by the connection logic.
189 """
197 """
190 config = self.config
198 config = self.config
191 # Find the actual controller key file
199 # Find the actual controller key file
192 if not self.url_file:
200 if not self.url_file:
193 self.url_file = os.path.join(
201 self.url_file = os.path.join(
194 self.profile_dir.security_dir,
202 self.profile_dir.security_dir,
195 self.url_file_name
203 self.url_file_name
196 )
204 )
197
205
198 def load_connector_file(self):
206 def load_connector_file(self):
199 """load config from a JSON connector file,
207 """load config from a JSON connector file,
200 at a *lower* priority than command-line/config files.
208 at a *lower* priority than command-line/config files.
201 """
209 """
202
210
203 self.log.info("Loading url_file %r"%self.url_file)
211 self.log.info("Loading url_file %r"%self.url_file)
204 config = self.config
212 config = self.config
205
213
206 with open(self.url_file) as f:
214 with open(self.url_file) as f:
207 d = json.loads(f.read())
215 d = json.loads(f.read())
208
216
209 try:
217 try:
210 config.Session.key
218 config.Session.key
211 except AttributeError:
219 except AttributeError:
212 if d['exec_key']:
220 if d['exec_key']:
213 config.Session.key = asbytes(d['exec_key'])
221 config.Session.key = asbytes(d['exec_key'])
214
222
215 try:
223 try:
216 config.EngineFactory.location
224 config.EngineFactory.location
217 except AttributeError:
225 except AttributeError:
218 config.EngineFactory.location = d['location']
226 config.EngineFactory.location = d['location']
219
227
220 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
228 d['url'] = disambiguate_url(d['url'], config.EngineFactory.location)
221 try:
229 try:
222 config.EngineFactory.url
230 config.EngineFactory.url
223 except AttributeError:
231 except AttributeError:
224 config.EngineFactory.url = d['url']
232 config.EngineFactory.url = d['url']
225
233
226 try:
234 try:
227 config.EngineFactory.sshserver
235 config.EngineFactory.sshserver
228 except AttributeError:
236 except AttributeError:
229 config.EngineFactory.sshserver = d['ssh']
237 config.EngineFactory.sshserver = d['ssh']
230
238
231 def init_engine(self):
239 def init_engine(self):
232 # This is the working dir by now.
240 # This is the working dir by now.
233 sys.path.insert(0, '')
241 sys.path.insert(0, '')
234 config = self.config
242 config = self.config
235 # print config
243 # print config
236 self.find_url_file()
244 self.find_url_file()
237
245
238 # was the url manually specified?
246 # was the url manually specified?
239 keys = set(self.config.EngineFactory.keys())
247 keys = set(self.config.EngineFactory.keys())
240 keys = keys.union(set(self.config.RegistrationFactory.keys()))
248 keys = keys.union(set(self.config.RegistrationFactory.keys()))
241
249
242 if keys.intersection(set(['ip', 'url', 'port'])):
250 if keys.intersection(set(['ip', 'url', 'port'])):
243 # Connection info was specified, don't wait for the file
251 # Connection info was specified, don't wait for the file
244 url_specified = True
252 url_specified = True
245 self.wait_for_url_file = 0
253 self.wait_for_url_file = 0
246 else:
254 else:
247 url_specified = False
255 url_specified = False
248
256
249 if self.wait_for_url_file and not os.path.exists(self.url_file):
257 if self.wait_for_url_file and not os.path.exists(self.url_file):
250 self.log.warn("url_file %r not found"%self.url_file)
258 self.log.warn("url_file %r not found"%self.url_file)
251 self.log.warn("Waiting up to %.1f seconds for it to arrive."%self.wait_for_url_file)
259 self.log.warn("Waiting up to %.1f seconds for it to arrive."%self.wait_for_url_file)
252 tic = time.time()
260 tic = time.time()
253 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
261 while not os.path.exists(self.url_file) and (time.time()-tic < self.wait_for_url_file):
254 # wait for url_file to exist, for up to 10 seconds
262 # wait for url_file to exist, for up to 10 seconds
255 time.sleep(0.1)
263 time.sleep(0.1)
256
264
257 if os.path.exists(self.url_file):
265 if os.path.exists(self.url_file):
258 self.load_connector_file()
266 self.load_connector_file()
259 elif not url_specified:
267 elif not url_specified:
260 self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
268 self.log.critical("Fatal: url file never arrived: %s"%self.url_file)
261 self.exit(1)
269 self.exit(1)
262
270
263
271
264 try:
272 try:
265 exec_lines = config.Kernel.exec_lines
273 exec_lines = config.Kernel.exec_lines
266 except AttributeError:
274 except AttributeError:
267 config.Kernel.exec_lines = []
275 config.Kernel.exec_lines = []
268 exec_lines = config.Kernel.exec_lines
276 exec_lines = config.Kernel.exec_lines
269
277
270 if self.startup_script:
278 if self.startup_script:
271 enc = sys.getfilesystemencoding() or 'utf8'
279 enc = sys.getfilesystemencoding() or 'utf8'
272 cmd="execfile(%r)"%self.startup_script.encode(enc)
280 cmd="execfile(%r)"%self.startup_script.encode(enc)
273 exec_lines.append(cmd)
281 exec_lines.append(cmd)
274 if self.startup_command:
282 if self.startup_command:
275 exec_lines.append(self.startup_command)
283 exec_lines.append(self.startup_command)
276
284
277 # Create the underlying shell class and Engine
285 # Create the underlying shell class and Engine
278 # shell_class = import_item(self.master_config.Global.shell_class)
286 # shell_class = import_item(self.master_config.Global.shell_class)
279 # print self.config
287 # print self.config
280 try:
288 try:
281 self.engine = EngineFactory(config=config, log=self.log)
289 self.engine = EngineFactory(config=config, log=self.log)
282 except:
290 except:
283 self.log.error("Couldn't start the Engine", exc_info=True)
291 self.log.error("Couldn't start the Engine", exc_info=True)
284 self.exit(1)
292 self.exit(1)
285
293
286 def forward_logging(self):
294 def forward_logging(self):
287 if self.log_url:
295 if self.log_url:
288 self.log.info("Forwarding logging to %s"%self.log_url)
296 self.log.info("Forwarding logging to %s"%self.log_url)
289 context = self.engine.context
297 context = self.engine.context
290 lsock = context.socket(zmq.PUB)
298 lsock = context.socket(zmq.PUB)
291 lsock.connect(self.log_url)
299 lsock.connect(self.log_url)
292 self.log.removeHandler(self._log_handler)
300 self.log.removeHandler(self._log_handler)
293 handler = EnginePUBHandler(self.engine, lsock)
301 handler = EnginePUBHandler(self.engine, lsock)
294 handler.setLevel(self.log_level)
302 handler.setLevel(self.log_level)
295 self.log.addHandler(handler)
303 self.log.addHandler(handler)
296 self._log_handler = handler
304 self._log_handler = handler
297
305
298 def init_mpi(self):
306 def init_mpi(self):
299 global mpi
307 global mpi
300 self.mpi = MPI(config=self.config)
308 self.mpi = MPI(config=self.config)
301
309
302 mpi_import_statement = self.mpi.init_script
310 mpi_import_statement = self.mpi.init_script
303 if mpi_import_statement:
311 if mpi_import_statement:
304 try:
312 try:
305 self.log.info("Initializing MPI:")
313 self.log.info("Initializing MPI:")
306 self.log.info(mpi_import_statement)
314 self.log.info(mpi_import_statement)
307 exec mpi_import_statement in globals()
315 exec mpi_import_statement in globals()
308 except:
316 except:
309 mpi = None
317 mpi = None
310 else:
318 else:
311 mpi = None
319 mpi = None
312
320
313 def initialize(self, argv=None):
321 def initialize(self, argv=None):
314 super(IPEngineApp, self).initialize(argv)
322 super(IPEngineApp, self).initialize(argv)
315 self.init_mpi()
323 self.init_mpi()
316 self.init_engine()
324 self.init_engine()
317 self.forward_logging()
325 self.forward_logging()
318
326
319 def start(self):
327 def start(self):
320 self.engine.start()
328 self.engine.start()
321 try:
329 try:
322 self.engine.loop.start()
330 self.engine.loop.start()
323 except KeyboardInterrupt:
331 except KeyboardInterrupt:
324 self.log.critical("Engine Interrupted, shutting down...\n")
332 self.log.critical("Engine Interrupted, shutting down...\n")
325
333
326
334
327 def launch_new_instance():
335 def launch_new_instance():
328 """Create and run the IPython engine"""
336 """Create and run the IPython engine"""
329 app = IPEngineApp.instance()
337 app = IPEngineApp.instance()
330 app.initialize()
338 app.initialize()
331 app.start()
339 app.start()
332
340
333
341
334 if __name__ == '__main__':
342 if __name__ == '__main__':
335 launch_new_instance()
343 launch_new_instance()
336
344
@@ -1,1152 +1,1176 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 Facilities for launching IPython processes asynchronously.
3 Facilities for launching IPython processes asynchronously.
4
4
5 Authors:
5 Authors:
6
6
7 * Brian Granger
7 * Brian Granger
8 * MinRK
8 * MinRK
9 """
9 """
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2008-2011 The IPython Development Team
12 # Copyright (C) 2008-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Imports
19 # Imports
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 import copy
22 import copy
23 import logging
23 import logging
24 import os
24 import os
25 import re
25 import re
26 import stat
26 import stat
27 import time
27 import time
28
28
29 # signal imports, handling various platforms, versions
29 # signal imports, handling various platforms, versions
30
30
31 from signal import SIGINT, SIGTERM
31 from signal import SIGINT, SIGTERM
32 try:
32 try:
33 from signal import SIGKILL
33 from signal import SIGKILL
34 except ImportError:
34 except ImportError:
35 # Windows
35 # Windows
36 SIGKILL=SIGTERM
36 SIGKILL=SIGTERM
37
37
38 try:
38 try:
39 # Windows >= 2.7, 3.2
39 # Windows >= 2.7, 3.2
40 from signal import CTRL_C_EVENT as SIGINT
40 from signal import CTRL_C_EVENT as SIGINT
41 except ImportError:
41 except ImportError:
42 pass
42 pass
43
43
44 from subprocess import Popen, PIPE, STDOUT
44 from subprocess import Popen, PIPE, STDOUT
45 try:
45 try:
46 from subprocess import check_output
46 from subprocess import check_output
47 except ImportError:
47 except ImportError:
48 # pre-2.7, define check_output with Popen
48 # pre-2.7, define check_output with Popen
49 def check_output(*args, **kwargs):
49 def check_output(*args, **kwargs):
50 kwargs.update(dict(stdout=PIPE))
50 kwargs.update(dict(stdout=PIPE))
51 p = Popen(*args, **kwargs)
51 p = Popen(*args, **kwargs)
52 out,err = p.communicate()
52 out,err = p.communicate()
53 return out
53 return out
54
54
55 from zmq.eventloop import ioloop
55 from zmq.eventloop import ioloop
56
56
57 from IPython.config.application import Application
57 from IPython.config.application import Application
58 from IPython.config.configurable import LoggingConfigurable
58 from IPython.config.configurable import LoggingConfigurable
59 from IPython.utils.text import EvalFormatter
59 from IPython.utils.text import EvalFormatter
60 from IPython.utils.traitlets import Any, Int, CFloat, List, Unicode, Dict, Instance
60 from IPython.utils.traitlets import (
61 Any, Int, CFloat, List, Unicode, Dict, Instance, HasTraits,
62 )
61 from IPython.utils.path import get_ipython_module_path
63 from IPython.utils.path import get_ipython_module_path
62 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
64 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
63
65
64 from .win32support import forward_read_events
66 from .win32support import forward_read_events
65
67
66 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
68 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
67
69
68 WINDOWS = os.name == 'nt'
70 WINDOWS = os.name == 'nt'
69
71
70 #-----------------------------------------------------------------------------
72 #-----------------------------------------------------------------------------
71 # Paths to the kernel apps
73 # Paths to the kernel apps
72 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
73
75
74
76
75 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
77 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
76 'IPython.parallel.apps.ipclusterapp'
78 'IPython.parallel.apps.ipclusterapp'
77 ))
79 ))
78
80
79 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
81 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
80 'IPython.parallel.apps.ipengineapp'
82 'IPython.parallel.apps.ipengineapp'
81 ))
83 ))
82
84
83 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
85 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
84 'IPython.parallel.apps.ipcontrollerapp'
86 'IPython.parallel.apps.ipcontrollerapp'
85 ))
87 ))
86
88
87 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
88 # Base launchers and errors
90 # Base launchers and errors
89 #-----------------------------------------------------------------------------
91 #-----------------------------------------------------------------------------
90
92
91
93
92 class LauncherError(Exception):
94 class LauncherError(Exception):
93 pass
95 pass
94
96
95
97
96 class ProcessStateError(LauncherError):
98 class ProcessStateError(LauncherError):
97 pass
99 pass
98
100
99
101
100 class UnknownStatus(LauncherError):
102 class UnknownStatus(LauncherError):
101 pass
103 pass
102
104
103
105
104 class BaseLauncher(LoggingConfigurable):
106 class BaseLauncher(LoggingConfigurable):
105 """An asbtraction for starting, stopping and signaling a process."""
107 """An asbtraction for starting, stopping and signaling a process."""
106
108
107 # In all of the launchers, the work_dir is where child processes will be
109 # In all of the launchers, the work_dir is where child processes will be
108 # run. This will usually be the profile_dir, but may not be. any work_dir
110 # run. This will usually be the profile_dir, but may not be. any work_dir
109 # passed into the __init__ method will override the config value.
111 # passed into the __init__ method will override the config value.
110 # This should not be used to set the work_dir for the actual engine
112 # This should not be used to set the work_dir for the actual engine
111 # and controller. Instead, use their own config files or the
113 # and controller. Instead, use their own config files or the
112 # controller_args, engine_args attributes of the launchers to add
114 # controller_args, engine_args attributes of the launchers to add
113 # the work_dir option.
115 # the work_dir option.
114 work_dir = Unicode(u'.')
116 work_dir = Unicode(u'.')
115 loop = Instance('zmq.eventloop.ioloop.IOLoop')
117 loop = Instance('zmq.eventloop.ioloop.IOLoop')
116
118
117 start_data = Any()
119 start_data = Any()
118 stop_data = Any()
120 stop_data = Any()
119
121
120 def _loop_default(self):
122 def _loop_default(self):
121 return ioloop.IOLoop.instance()
123 return ioloop.IOLoop.instance()
122
124
123 def __init__(self, work_dir=u'.', config=None, **kwargs):
125 def __init__(self, work_dir=u'.', config=None, **kwargs):
124 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
126 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
125 self.state = 'before' # can be before, running, after
127 self.state = 'before' # can be before, running, after
126 self.stop_callbacks = []
128 self.stop_callbacks = []
127 self.start_data = None
129 self.start_data = None
128 self.stop_data = None
130 self.stop_data = None
129
131
130 @property
132 @property
131 def args(self):
133 def args(self):
132 """A list of cmd and args that will be used to start the process.
134 """A list of cmd and args that will be used to start the process.
133
135
134 This is what is passed to :func:`spawnProcess` and the first element
136 This is what is passed to :func:`spawnProcess` and the first element
135 will be the process name.
137 will be the process name.
136 """
138 """
137 return self.find_args()
139 return self.find_args()
138
140
139 def find_args(self):
141 def find_args(self):
140 """The ``.args`` property calls this to find the args list.
142 """The ``.args`` property calls this to find the args list.
141
143
142 Subcommand should implement this to construct the cmd and args.
144 Subcommand should implement this to construct the cmd and args.
143 """
145 """
144 raise NotImplementedError('find_args must be implemented in a subclass')
146 raise NotImplementedError('find_args must be implemented in a subclass')
145
147
146 @property
148 @property
147 def arg_str(self):
149 def arg_str(self):
148 """The string form of the program arguments."""
150 """The string form of the program arguments."""
149 return ' '.join(self.args)
151 return ' '.join(self.args)
150
152
151 @property
153 @property
152 def running(self):
154 def running(self):
153 """Am I running."""
155 """Am I running."""
154 if self.state == 'running':
156 if self.state == 'running':
155 return True
157 return True
156 else:
158 else:
157 return False
159 return False
158
160
159 def start(self):
161 def start(self):
160 """Start the process."""
162 """Start the process."""
161 raise NotImplementedError('start must be implemented in a subclass')
163 raise NotImplementedError('start must be implemented in a subclass')
162
164
163 def stop(self):
165 def stop(self):
164 """Stop the process and notify observers of stopping.
166 """Stop the process and notify observers of stopping.
165
167
166 This method will return None immediately.
168 This method will return None immediately.
167 To observe the actual process stopping, see :meth:`on_stop`.
169 To observe the actual process stopping, see :meth:`on_stop`.
168 """
170 """
169 raise NotImplementedError('stop must be implemented in a subclass')
171 raise NotImplementedError('stop must be implemented in a subclass')
170
172
171 def on_stop(self, f):
173 def on_stop(self, f):
172 """Register a callback to be called with this Launcher's stop_data
174 """Register a callback to be called with this Launcher's stop_data
173 when the process actually finishes.
175 when the process actually finishes.
174 """
176 """
175 if self.state=='after':
177 if self.state=='after':
176 return f(self.stop_data)
178 return f(self.stop_data)
177 else:
179 else:
178 self.stop_callbacks.append(f)
180 self.stop_callbacks.append(f)
179
181
180 def notify_start(self, data):
182 def notify_start(self, data):
181 """Call this to trigger startup actions.
183 """Call this to trigger startup actions.
182
184
183 This logs the process startup and sets the state to 'running'. It is
185 This logs the process startup and sets the state to 'running'. It is
184 a pass-through so it can be used as a callback.
186 a pass-through so it can be used as a callback.
185 """
187 """
186
188
187 self.log.info('Process %r started: %r' % (self.args[0], data))
189 self.log.info('Process %r started: %r' % (self.args[0], data))
188 self.start_data = data
190 self.start_data = data
189 self.state = 'running'
191 self.state = 'running'
190 return data
192 return data
191
193
192 def notify_stop(self, data):
194 def notify_stop(self, data):
193 """Call this to trigger process stop actions.
195 """Call this to trigger process stop actions.
194
196
195 This logs the process stopping and sets the state to 'after'. Call
197 This logs the process stopping and sets the state to 'after'. Call
196 this to trigger callbacks registered via :meth:`on_stop`."""
198 this to trigger callbacks registered via :meth:`on_stop`."""
197
199
198 self.log.info('Process %r stopped: %r' % (self.args[0], data))
200 self.log.info('Process %r stopped: %r' % (self.args[0], data))
199 self.stop_data = data
201 self.stop_data = data
200 self.state = 'after'
202 self.state = 'after'
201 for i in range(len(self.stop_callbacks)):
203 for i in range(len(self.stop_callbacks)):
202 d = self.stop_callbacks.pop()
204 d = self.stop_callbacks.pop()
203 d(data)
205 d(data)
204 return data
206 return data
205
207
206 def signal(self, sig):
208 def signal(self, sig):
207 """Signal the process.
209 """Signal the process.
208
210
209 Parameters
211 Parameters
210 ----------
212 ----------
211 sig : str or int
213 sig : str or int
212 'KILL', 'INT', etc., or any signal number
214 'KILL', 'INT', etc., or any signal number
213 """
215 """
214 raise NotImplementedError('signal must be implemented in a subclass')
216 raise NotImplementedError('signal must be implemented in a subclass')
215
217
218 class ClusterAppMixin(HasTraits):
219 """MixIn for cluster args as traits"""
220 cluster_args = List([])
221 profile_dir=Unicode('')
222 cluster_id=Unicode('')
223 def _profile_dir_changed(self, name, old, new):
224 self.cluster_args = []
225 if self.profile_dir:
226 self.cluster_args.extend(['--profile-dir', self.profile_dir])
227 if self.cluster_id:
228 self.cluster_args.extend(['--cluster-id', self.cluster_id])
229 _cluster_id_changed = _profile_dir_changed
230
231 class ControllerMixin(ClusterAppMixin):
232 controller_cmd = List(ipcontroller_cmd_argv, config=True,
233 help="""Popen command to launch ipcontroller.""")
234 # Command line arguments to ipcontroller.
235 controller_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
236 help="""command-line args to pass to ipcontroller""")
237
238 class EngineMixin(ClusterAppMixin):
239 engine_cmd = List(ipengine_cmd_argv, config=True,
240 help="""command to launch the Engine.""")
241 # Command line arguments for ipengine.
242 engine_args = List(['--log-to-file','--log-level=%i' % logging.INFO], config=True,
243 help="command-line arguments to pass to ipengine"
244 )
216
245
217 #-----------------------------------------------------------------------------
246 #-----------------------------------------------------------------------------
218 # Local process launchers
247 # Local process launchers
219 #-----------------------------------------------------------------------------
248 #-----------------------------------------------------------------------------
220
249
221
250
222 class LocalProcessLauncher(BaseLauncher):
251 class LocalProcessLauncher(BaseLauncher):
223 """Start and stop an external process in an asynchronous manner.
252 """Start and stop an external process in an asynchronous manner.
224
253
225 This will launch the external process with a working directory of
254 This will launch the external process with a working directory of
226 ``self.work_dir``.
255 ``self.work_dir``.
227 """
256 """
228
257
229 # This is used to to construct self.args, which is passed to
258 # This is used to to construct self.args, which is passed to
230 # spawnProcess.
259 # spawnProcess.
231 cmd_and_args = List([])
260 cmd_and_args = List([])
232 poll_frequency = Int(100) # in ms
261 poll_frequency = Int(100) # in ms
233
262
234 def __init__(self, work_dir=u'.', config=None, **kwargs):
263 def __init__(self, work_dir=u'.', config=None, **kwargs):
235 super(LocalProcessLauncher, self).__init__(
264 super(LocalProcessLauncher, self).__init__(
236 work_dir=work_dir, config=config, **kwargs
265 work_dir=work_dir, config=config, **kwargs
237 )
266 )
238 self.process = None
267 self.process = None
239 self.poller = None
268 self.poller = None
240
269
241 def find_args(self):
270 def find_args(self):
242 return self.cmd_and_args
271 return self.cmd_and_args
243
272
244 def start(self):
273 def start(self):
245 if self.state == 'before':
274 if self.state == 'before':
246 self.process = Popen(self.args,
275 self.process = Popen(self.args,
247 stdout=PIPE,stderr=PIPE,stdin=PIPE,
276 stdout=PIPE,stderr=PIPE,stdin=PIPE,
248 env=os.environ,
277 env=os.environ,
249 cwd=self.work_dir
278 cwd=self.work_dir
250 )
279 )
251 if WINDOWS:
280 if WINDOWS:
252 self.stdout = forward_read_events(self.process.stdout)
281 self.stdout = forward_read_events(self.process.stdout)
253 self.stderr = forward_read_events(self.process.stderr)
282 self.stderr = forward_read_events(self.process.stderr)
254 else:
283 else:
255 self.stdout = self.process.stdout.fileno()
284 self.stdout = self.process.stdout.fileno()
256 self.stderr = self.process.stderr.fileno()
285 self.stderr = self.process.stderr.fileno()
257 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
286 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
258 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
287 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
259 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
288 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
260 self.poller.start()
289 self.poller.start()
261 self.notify_start(self.process.pid)
290 self.notify_start(self.process.pid)
262 else:
291 else:
263 s = 'The process was already started and has state: %r' % self.state
292 s = 'The process was already started and has state: %r' % self.state
264 raise ProcessStateError(s)
293 raise ProcessStateError(s)
265
294
266 def stop(self):
295 def stop(self):
267 return self.interrupt_then_kill()
296 return self.interrupt_then_kill()
268
297
269 def signal(self, sig):
298 def signal(self, sig):
270 if self.state == 'running':
299 if self.state == 'running':
271 if WINDOWS and sig != SIGINT:
300 if WINDOWS and sig != SIGINT:
272 # use Windows tree-kill for better child cleanup
301 # use Windows tree-kill for better child cleanup
273 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
302 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
274 else:
303 else:
275 self.process.send_signal(sig)
304 self.process.send_signal(sig)
276
305
277 def interrupt_then_kill(self, delay=2.0):
306 def interrupt_then_kill(self, delay=2.0):
278 """Send INT, wait a delay and then send KILL."""
307 """Send INT, wait a delay and then send KILL."""
279 try:
308 try:
280 self.signal(SIGINT)
309 self.signal(SIGINT)
281 except Exception:
310 except Exception:
282 self.log.debug("interrupt failed")
311 self.log.debug("interrupt failed")
283 pass
312 pass
284 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
313 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
285 self.killer.start()
314 self.killer.start()
286
315
287 # callbacks, etc:
316 # callbacks, etc:
288
317
289 def handle_stdout(self, fd, events):
318 def handle_stdout(self, fd, events):
290 if WINDOWS:
319 if WINDOWS:
291 line = self.stdout.recv()
320 line = self.stdout.recv()
292 else:
321 else:
293 line = self.process.stdout.readline()
322 line = self.process.stdout.readline()
294 # a stopped process will be readable but return empty strings
323 # a stopped process will be readable but return empty strings
295 if line:
324 if line:
296 self.log.info(line[:-1])
325 self.log.info(line[:-1])
297 else:
326 else:
298 self.poll()
327 self.poll()
299
328
300 def handle_stderr(self, fd, events):
329 def handle_stderr(self, fd, events):
301 if WINDOWS:
330 if WINDOWS:
302 line = self.stderr.recv()
331 line = self.stderr.recv()
303 else:
332 else:
304 line = self.process.stderr.readline()
333 line = self.process.stderr.readline()
305 # a stopped process will be readable but return empty strings
334 # a stopped process will be readable but return empty strings
306 if line:
335 if line:
307 self.log.error(line[:-1])
336 self.log.error(line[:-1])
308 else:
337 else:
309 self.poll()
338 self.poll()
310
339
311 def poll(self):
340 def poll(self):
312 status = self.process.poll()
341 status = self.process.poll()
313 if status is not None:
342 if status is not None:
314 self.poller.stop()
343 self.poller.stop()
315 self.loop.remove_handler(self.stdout)
344 self.loop.remove_handler(self.stdout)
316 self.loop.remove_handler(self.stderr)
345 self.loop.remove_handler(self.stderr)
317 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
346 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
318 return status
347 return status
319
348
320 class LocalControllerLauncher(LocalProcessLauncher):
349 class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):
321 """Launch a controller as a regular external process."""
350 """Launch a controller as a regular external process."""
322
351
323 controller_cmd = List(ipcontroller_cmd_argv, config=True,
324 help="""Popen command to launch ipcontroller.""")
325 # Command line arguments to ipcontroller.
326 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
327 help="""command-line args to pass to ipcontroller""")
328
329 def find_args(self):
352 def find_args(self):
330 return self.controller_cmd + self.controller_args
353 return self.controller_cmd + self.cluster_args + self.controller_args
331
354
332 def start(self, profile_dir):
355 def start(self):
333 """Start the controller by profile_dir."""
356 """Start the controller by profile_dir."""
334 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
335 self.profile_dir = unicode(profile_dir)
336 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
357 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
337 return super(LocalControllerLauncher, self).start()
358 return super(LocalControllerLauncher, self).start()
338
359
339
360
340 class LocalEngineLauncher(LocalProcessLauncher):
361 class LocalEngineLauncher(LocalProcessLauncher, EngineMixin):
341 """Launch a single engine as a regular externall process."""
362 """Launch a single engine as a regular externall process."""
342
363
343 engine_cmd = List(ipengine_cmd_argv, config=True,
344 help="""command to launch the Engine.""")
345 # Command line arguments for ipengine.
346 engine_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
347 help="command-line arguments to pass to ipengine"
348 )
349
350 def find_args(self):
364 def find_args(self):
351 return self.engine_cmd + self.engine_args
365 return self.engine_cmd + self.cluster_args + self.engine_args
352
366
353 def start(self, profile_dir):
354 """Start the engine by profile_dir."""
355 self.engine_args.extend(['--profile-dir=%s'%profile_dir])
356 self.profile_dir = unicode(profile_dir)
357 return super(LocalEngineLauncher, self).start()
358
367
359
368 class LocalEngineSetLauncher(LocalEngineLauncher):
360 class LocalEngineSetLauncher(BaseLauncher):
361 """Launch a set of engines as regular external processes."""
369 """Launch a set of engines as regular external processes."""
362
370
363 # Command line arguments for ipengine.
364 engine_args = List(
365 ['--log-to-file','--log-level=%i'%logging.INFO], config=True,
366 help="command-line arguments to pass to ipengine"
367 )
368 delay = CFloat(0.1, config=True,
371 delay = CFloat(0.1, config=True,
369 help="""delay (in seconds) between starting each engine after the first.
372 help="""delay (in seconds) between starting each engine after the first.
370 This can help force the engines to get their ids in order, or limit
373 This can help force the engines to get their ids in order, or limit
371 process flood when starting many engines."""
374 process flood when starting many engines."""
372 )
375 )
373
376
374 # launcher class
377 # launcher class
375 launcher_class = LocalEngineLauncher
378 launcher_class = LocalEngineLauncher
376
379
377 launchers = Dict()
380 launchers = Dict()
378 stop_data = Dict()
381 stop_data = Dict()
379
382
380 def __init__(self, work_dir=u'.', config=None, **kwargs):
383 def __init__(self, work_dir=u'.', config=None, **kwargs):
381 super(LocalEngineSetLauncher, self).__init__(
384 super(LocalEngineSetLauncher, self).__init__(
382 work_dir=work_dir, config=config, **kwargs
385 work_dir=work_dir, config=config, **kwargs
383 )
386 )
384 self.stop_data = {}
387 self.stop_data = {}
385
388
386 def start(self, n, profile_dir):
389 def start(self, n):
387 """Start n engines by profile or profile_dir."""
390 """Start n engines by profile or profile_dir."""
388 self.profile_dir = unicode(profile_dir)
389 dlist = []
391 dlist = []
390 for i in range(n):
392 for i in range(n):
391 if i > 0:
393 if i > 0:
392 time.sleep(self.delay)
394 time.sleep(self.delay)
393 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
395 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
396 profile_dir=self.profile_dir, cluster_id=self.cluster_id,
397 )
398
394 # Copy the engine args over to each engine launcher.
399 # Copy the engine args over to each engine launcher.
400 el.engine_cmd = copy.deepcopy(self.engine_cmd)
395 el.engine_args = copy.deepcopy(self.engine_args)
401 el.engine_args = copy.deepcopy(self.engine_args)
396 el.on_stop(self._notice_engine_stopped)
402 el.on_stop(self._notice_engine_stopped)
397 d = el.start(profile_dir)
403 d = el.start()
398 if i==0:
404 if i==0:
399 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
405 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
400 self.launchers[i] = el
406 self.launchers[i] = el
401 dlist.append(d)
407 dlist.append(d)
402 self.notify_start(dlist)
408 self.notify_start(dlist)
403 # The consumeErrors here could be dangerous
404 # dfinal = gatherBoth(dlist, consumeErrors=True)
405 # dfinal.addCallback(self.notify_start)
406 return dlist
409 return dlist
407
410
408 def find_args(self):
411 def find_args(self):
409 return ['engine set']
412 return ['engine set']
410
413
411 def signal(self, sig):
414 def signal(self, sig):
412 dlist = []
415 dlist = []
413 for el in self.launchers.itervalues():
416 for el in self.launchers.itervalues():
414 d = el.signal(sig)
417 d = el.signal(sig)
415 dlist.append(d)
418 dlist.append(d)
416 # dfinal = gatherBoth(dlist, consumeErrors=True)
417 return dlist
419 return dlist
418
420
419 def interrupt_then_kill(self, delay=1.0):
421 def interrupt_then_kill(self, delay=1.0):
420 dlist = []
422 dlist = []
421 for el in self.launchers.itervalues():
423 for el in self.launchers.itervalues():
422 d = el.interrupt_then_kill(delay)
424 d = el.interrupt_then_kill(delay)
423 dlist.append(d)
425 dlist.append(d)
424 # dfinal = gatherBoth(dlist, consumeErrors=True)
425 return dlist
426 return dlist
426
427
427 def stop(self):
428 def stop(self):
428 return self.interrupt_then_kill()
429 return self.interrupt_then_kill()
429
430
430 def _notice_engine_stopped(self, data):
431 def _notice_engine_stopped(self, data):
431 pid = data['pid']
432 pid = data['pid']
432 for idx,el in self.launchers.iteritems():
433 for idx,el in self.launchers.iteritems():
433 if el.process.pid == pid:
434 if el.process.pid == pid:
434 break
435 break
435 self.launchers.pop(idx)
436 self.launchers.pop(idx)
436 self.stop_data[idx] = data
437 self.stop_data[idx] = data
437 if not self.launchers:
438 if not self.launchers:
438 self.notify_stop(self.stop_data)
439 self.notify_stop(self.stop_data)
439
440
440
441
441 #-----------------------------------------------------------------------------
442 #-----------------------------------------------------------------------------
442 # MPIExec launchers
443 # MPIExec launchers
443 #-----------------------------------------------------------------------------
444 #-----------------------------------------------------------------------------
444
445
445
446
446 class MPIExecLauncher(LocalProcessLauncher):
447 class MPIExecLauncher(LocalProcessLauncher):
447 """Launch an external process using mpiexec."""
448 """Launch an external process using mpiexec."""
448
449
449 mpi_cmd = List(['mpiexec'], config=True,
450 mpi_cmd = List(['mpiexec'], config=True,
450 help="The mpiexec command to use in starting the process."
451 help="The mpiexec command to use in starting the process."
451 )
452 )
452 mpi_args = List([], config=True,
453 mpi_args = List([], config=True,
453 help="The command line arguments to pass to mpiexec."
454 help="The command line arguments to pass to mpiexec."
454 )
455 )
455 program = List(['date'], config=True,
456 program = List(['date'],
456 help="The program to start via mpiexec.")
457 help="The program to start via mpiexec.")
457 program_args = List([], config=True,
458 program_args = List([],
458 help="The command line argument to the program."
459 help="The command line argument to the program."
459 )
460 )
460 n = Int(1)
461 n = Int(1)
461
462
462 def find_args(self):
463 def find_args(self):
463 """Build self.args using all the fields."""
464 """Build self.args using all the fields."""
464 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
465 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
465 self.program + self.program_args
466 self.program + self.program_args
466
467
467 def start(self, n):
468 def start(self, n):
468 """Start n instances of the program using mpiexec."""
469 """Start n instances of the program using mpiexec."""
469 self.n = n
470 self.n = n
470 return super(MPIExecLauncher, self).start()
471 return super(MPIExecLauncher, self).start()
471
472
472
473
473 class MPIExecControllerLauncher(MPIExecLauncher):
474 class MPIExecControllerLauncher(MPIExecLauncher, ControllerMixin):
474 """Launch a controller using mpiexec."""
475 """Launch a controller using mpiexec."""
475
476
476 controller_cmd = List(ipcontroller_cmd_argv, config=True,
477 # alias back to *non-configurable* program[_args] for use in find_args()
477 help="Popen command to launch the Contropper"
478 # this way all Controller/EngineSetLaunchers have the same form, rather
478 )
479 # than *some* having `program_args` and others `controller_args`
479 controller_args = List(['--log-to-file','--log-level=%i'%logging.INFO], config=True,
480 @property
480 help="Command line arguments to pass to ipcontroller."
481 def program(self):
481 )
482 return self.controller_cmd
482 n = Int(1)
483
484 @property
485 def program_args(self):
486 return self.cluster_args + self.controller_args
483
487
484 def start(self, profile_dir):
488 def start(self):
485 """Start the controller by profile_dir."""
489 """Start the controller by profile_dir."""
486 self.controller_args.extend(['--profile-dir=%s'%profile_dir])
487 self.profile_dir = unicode(profile_dir)
488 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
490 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
489 return super(MPIExecControllerLauncher, self).start(1)
491 return super(MPIExecControllerLauncher, self).start(1)
490
492
491 def find_args(self):
492 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
493 self.controller_cmd + self.controller_args
494
495
493
class MPIExecEngineSetLauncher(MPIExecLauncher, EngineMixin):
    """Launch engines using mpiexec"""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        """The ipengine command (non-configurable alias)."""
        return self.engine_cmd

    @property
    def program_args(self):
        """Full argv tail: cluster-wide args followed by engine args."""
        return self.cluster_args + self.engine_args

    def start(self, n):
        """Start n engines by profile or profile_dir."""
        self.n = n
        self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)
514
513
515 #-----------------------------------------------------------------------------
514 #-----------------------------------------------------------------------------
516 # SSH launchers
515 # SSH launchers
517 #-----------------------------------------------------------------------------
516 #-----------------------------------------------------------------------------
518
517
519 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
518 # TODO: Get SSH Launcher back to level of sshx in 0.10.2
520
519
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables.  There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True,
        help="command for starting ssh")
    ssh_args = List(['-tt'], config=True,
        help="args to pass to ssh")
    # program/program_args are *not* configurable here; subclasses alias them
    # to their controller/engine commands via properties.
    program = List(['date'],
        help="Program to launch via ssh")
    program_args = List([],
        help="args to pass to remote program")
    hostname = Unicode('', config=True,
        help="hostname on which to launch the program")
    user = Unicode('', config=True,
        help="username for ssh")
    location = Unicode('', config=True,
        help="user@hostname location for ssh in one setting")

    def _hostname_changed(self, name, old, new):
        # keep `location` in sync when hostname changes
        if self.user:
            self.location = u'%s@%s' % (self.user, new)
        else:
            self.location = new

    def _user_changed(self, name, old, new):
        # keep `location` in sync when user changes
        self.location = u'%s@%s' % (new, self.hostname)

    def find_args(self):
        """Build the full ssh command line to run the remote program."""
        return self.ssh_cmd + self.ssh_args + [self.location] + \
               self.program + self.program_args

    def start(self, hostname=None, user=None):
        """Start the remote program, optionally overriding hostname/user."""
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user

        return super(SSHLauncher, self).start()

    def signal(self, sig):
        # NOTE: `sig` is ignored; any signal request closes the connection.
        if self.state == 'running':
            # send escaped ssh connection-closer
            self.process.stdin.write('~.')
            self.process.stdin.flush()
571
569
572
570
573
571
class SSHControllerLauncher(SSHLauncher, ControllerMixin):
    """Launch a controller on a remote host via ssh."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        """The ipcontroller command (non-configurable alias)."""
        return self.controller_cmd

    @property
    def program_args(self):
        """Full argv tail: cluster-wide args followed by controller args."""
        return self.cluster_args + self.controller_args
580
584
581
585
class SSHEngineLauncher(SSHLauncher, EngineMixin):
    """Launch a single engine on a remote host via ssh."""

    # alias back to *non-configurable* program[_args] for use in find_args()
    # this way all Controller/EngineSetLaunchers have the same form, rather
    # than *some* having `program_args` and others `controller_args`
    @property
    def program(self):
        """The ipengine command (non-configurable alias)."""
        return self.engine_cmd

    @property
    def program_args(self):
        """Full argv tail: cluster-wide args followed by engine args."""
        return self.cluster_args + self.engine_args
598
590
599
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Launch a set of engines across multiple hosts via ssh."""

    launcher_class = SSHEngineLauncher
    engines = Dict(config=True,
        help="""dict of engines to launch.  This is a dict by hostname of ints,
        corresponding to the number of engines to start on that host.""")

    def start(self, n):
        """Start engines by profile or profile_dir.

        `n` is ignored, and the `engines` config property is used instead.
        """

        dlist = []
        for host, n in self.engines.iteritems():
            # an entry may be either an int, or a (n, args) pair with
            # per-host engine arguments
            if isinstance(n, (tuple, list)):
                n, args = n
            else:
                args = copy.deepcopy(self.engine_args)

            if '@' in host:
                user,host = host.split('@',1)
            else:
                user=None
            for i in range(n):
                if i > 0:
                    # stagger engine starts to avoid thundering herd
                    time.sleep(self.delay)
                el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log,
                                        profile_dir=self.profile_dir, cluster_id=self.cluster_id,
                )

                # Copy the engine args over to each engine launcher.
                el.engine_cmd = self.engine_cmd
                el.engine_args = args
                el.on_stop(self._notice_engine_stopped)
                d = el.start(user=user, hostname=host)
                if i==0:
                    self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
                self.launchers[host+str(i)] = el
                dlist.append(d)
        self.notify_start(dlist)
        return dlist
639 return dlist
630
640
631
641
632
642
633 #-----------------------------------------------------------------------------
643 #-----------------------------------------------------------------------------
634 # Windows HPC Server 2008 scheduler launchers
644 # Windows HPC Server 2008 scheduler launchers
635 #-----------------------------------------------------------------------------
645 #-----------------------------------------------------------------------------
636
646
637
647
638 # This is only used on Windows.
648 # This is only used on Windows.
# This is only used on Windows.
def find_job_cmd():
    """Return the path to the Windows HPC ``job`` command.

    Falls back to the bare name 'job' when it cannot be located
    (or on non-Windows platforms, where it is never used).
    """
    if WINDOWS:
        try:
            return find_cmd('job')
        except (FindCmdError, ImportError):
            # ImportError will be raised if win32api is not installed
            return 'job'
    else:
        return 'job'
648
658
649
659
class WindowsHPCLauncher(BaseLauncher):
    """Base launcher for the Windows HPC Server 2008 job scheduler."""

    job_id_regexp = Unicode(r'\d+', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command. """
    )
    job_file_name = Unicode(u'ipython_job.xml', config=True,
        help="The filename of the instantiated job script.")
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    # NOTE: the `job_file` property below shadows this trait.
    job_file = Unicode(u'')
    scheduler = Unicode('', config=True,
        help="The hostname of the scheduler to submit the job to.")
    job_cmd = Unicode(find_job_cmd(), config=True,
        help="The command for submitting jobs.")

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the XML job description; subclasses must implement."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job started with job id: %r' % job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))

        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the submitted job; best-effort if it is already gone."""
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
        except:
            # deliberate best-effort: cancelling an already-finished job fails
            output = 'The job already appears to be stopped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
728
738
729
739
class WindowsHPCControllerLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch an ipcontroller via the Windows HPC scheduler."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
        help="WinHPC xml job file.")
    controller_args = List([], config=False,
        help="extra args to pass to ipcontroller")

    def write_job_file(self, n):
        """Write the single-task controller job description XML."""
        job = IPControllerJob(config=self.config)

        t = IPControllerTask(config=self.config)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        t.work_directory = self.profile_dir
        # Add the profile_dir and cluster_id args from the mixin.
        t.controller_args.extend(self.cluster_args)
        t.controller_args.extend(self.controller_args)
        job.add_task(t)

        self.log.info("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # the job file lives in the profile dir, not work_dir
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self):
        """Start the controller by profile_dir."""
        return super(WindowsHPCControllerLauncher, self).start(1)
761
770
762
771
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher, ClusterAppMixin):
    """Launch a set of ipengines via the Windows HPC scheduler."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True,
        help="jobfile for ipengines job")
    engine_args = List([], config=False,
        help="extra args to pass to ipengine")

    def write_job_file(self, n):
        """Write a job description XML containing n engine tasks."""
        job = IPEngineSetJob(config=self.config)

        for i in range(n):
            t = IPEngineTask(config=self.config)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.profile_dir
            # Add the profile_dir and cluster_id args from the mixin.
            # BUGFIX: these must extend the engine task's *engine_args*,
            # not controller_args (copy/paste error in the merge).
            t.engine_args.extend(self.cluster_args)
            t.engine_args.extend(self.engine_args)
            job.add_task(t)

        self.log.info("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # the job file lives in the profile dir, not work_dir
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, n):
        """Start n engines by profile_dir."""
        return super(WindowsHPCEngineSetLauncher, self).start(n)
795
803
796
804
797 #-----------------------------------------------------------------------------
805 #-----------------------------------------------------------------------------
798 # Batch (PBS) system launchers
806 # Batch (PBS) system launchers
799 #-----------------------------------------------------------------------------
807 #-----------------------------------------------------------------------------
800
808
class BatchClusterAppMixin(ClusterAppMixin):
    """ClusterApp mixin that updates the self.context dict, rather than cl-args."""

    def _profile_dir_changed(self, name, old, new):
        # mirror the trait value into the template-format context
        self.context[name] = new
    _cluster_id_changed = _profile_dir_changed

    def _profile_dir_default(self):
        # seed the context so {profile_dir} always formats
        self.context['profile_dir'] = ''
        return ''

    def _cluster_id_default(self):
        # seed the context so {cluster_id} always formats
        self.context['cluster_id'] = ''
        return ''
821
822
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc.  The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using string formatting. Thus the template can
    use {n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in. See PBSEngineSet
    submit_command = List([''], config=True,
        help="The name of the command line program used to submit jobs.")
    delete_command = List([''], config=True,
        help="The name of the command line program used to delete jobs.")
    job_id_regexp = Unicode('', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command.""")
    batch_template = Unicode('', config=True,
        help="The string that is the batch script template itself.")
    batch_template_file = Unicode(u'', config=True,
        help="The file that contains the batch template.")
    batch_file_name = Unicode(u'batch_script', config=True,
        help="The filename of the instantiated batch script.")
    queue = Unicode(u'', config=True,
        help="The PBS Queue.")

    def _queue_changed(self, name, old, new):
        # mirror into the template context so {queue} formats
        self.context[name] = new

    n = Int(1)
    _n_changed = _queue_changed

    # not configurable, override in subclasses
    # PBS Job Array regex
    job_array_regexp = Unicode('')
    job_array_template = Unicode('')
    # PBS Queue regex
    queue_regexp = Unicode('')
    queue_template = Unicode('')
    # The default batch template, override in subclasses
    default_template = Unicode('')
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')
    # the format dict used with batch_template:
    context = Dict()
    # the Formatter instance for rendering the templates:
    formatter = Instance(EvalFormatter, (), {})


    def find_args(self):
        return self.submit_command + [self.batch_file]

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BatchSystemLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job submitted with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir."""
        # setting n triggers _n_changed, which puts it in self.context
        self.n = n
        # first priority is batch_template if set
        if self.batch_template_file and not self.batch_template:
            # second priority is batch_template_file
            with open(self.batch_template_file) as f:
                self.batch_template = f.read()
        if not self.batch_template:
            # third (last) priority is default_template
            self.batch_template = self.default_template

        # add jobarray or queue lines to user-specified template
        # note that this is *only* when user did not specify a template.
        regex = re.compile(self.job_array_regexp)
        # print regex.search(self.batch_template)
        if not regex.search(self.batch_template):
            self.log.info("adding job array settings to batch script")
            firstline, rest = self.batch_template.split('\n',1)
            self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])

        regex = re.compile(self.queue_regexp)
        # print regex.search(self.batch_template)
        if self.queue and not regex.search(self.batch_template):
            self.log.info("adding PBS queue settings to batch script")
            firstline, rest = self.batch_template.split('\n',1)
            self.batch_template = u'\n'.join([firstline, self.queue_template, rest])

        script_as_string = self.formatter.format(self.batch_template, **self.context)
        self.log.info('Writing instantiated batch script: %s' % self.batch_file)

        with open(self.batch_file, 'w') as f:
            f.write(script_as_string)
        # make the script executable by the owner
        os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

    def start(self, n):
        """Start n copies of the process using a batch system."""
        # profile_dir/cluster_id reach the script via self.context
        # (maintained by BatchClusterAppMixin trait notifications)
        self.write_batch_script(n)
        output = check_output(self.args, env=os.environ)

        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        output = check_output(self.delete_command+[self.job_id], env=os.environ)
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
922
947
923
948
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = List(['qsub'], config=True,
        help="The PBS submit command ['qsub']")
    delete_command = List(['qdel'], config=True,
        help="The PBS delete command ['qdel']")
    job_id_regexp = Unicode(r'\d+', config=True,
        help="Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # matches an existing '#PBS -t' job-array directive in the template
    job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = Unicode('#PBS -t 1-{n}')
    # matches an existing '#PBS -q' queue directive in the template
    queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
    queue_template = Unicode('#PBS -q {queue}')
939
964
940
965
class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_controller', config=True,
        help="batch file name for the controller job.")
    default_template= Unicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
"""%(' '.join(ipcontroller_cmd_argv)))


    def start(self):
        """Start the controller by profile or profile_dir."""
        self.log.info("Starting PBSControllerLauncher: %r" % self.args)
        return super(PBSControllerLauncher, self).start(1)
956
982
957
983
class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):
    """Launch Engines using PBS"""
    batch_file_name = Unicode(u'pbs_engines', config=True,
        help="batch file name for the engine(s) job.")
    default_template = Unicode(
        u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(ipengine_cmd_argv))

    def start(self, n):
        """Submit a job array of n engines via PBS."""
        self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
        return super(PBSEngineSetLauncher, self).start(n)
972
998
973 #SGE is very similar to PBS
999 #SGE is very similar to PBS
974
1000
class SGELauncher(PBSLauncher):
    """Sun GridEngine is a PBS clone with slightly different syntax"""
    # Matches an existing '#$ -t' job-array directive in a user-supplied template.
    job_array_regexp = Unicode('#\$\W+\-t')
    job_array_template = Unicode('#$ -t 1-{n}')
    # Matches an existing '#$ -q' queue directive in a user-supplied template.
    queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
    queue_template = Unicode('#$ -q {queue}')
981
1007
class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch a controller using SGE."""

    # Fixed typo in help text: "ipontroller" -> "ipcontroller".
    batch_file_name = Unicode(u'sge_controller', config=True,
        help="batch file name for the ipcontroller job.")
    default_template = Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % (' '.join(ipcontroller_cmd_argv)))

    def start(self):
        """Start the controller by profile or profile_dir."""
        # Fixed copy-paste bug: the log message previously named
        # PBSControllerLauncher instead of this class.
        self.log.info("Starting SGEControllerLauncher: %r" % self.args)
        return super(SGEControllerLauncher, self).start(1)
997
1023
class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
    """Launch Engines with SGE"""
    batch_file_name = Unicode(u'sge_engines', config=True,
        help="batch file name for the engine(s) job.")
    default_template = Unicode(
        """#$ -V
#$ -S /bin/sh
#$ -N ipengine
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(ipengine_cmd_argv))

    def start(self, n):
        """Submit a job array of n engines via SGE."""
        self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
        return super(SGEEngineSetLauncher, self).start(n)
1012
1038
1013
1039
1014 # LSF launchers
1040 # LSF launchers
1015
1041
class LSFLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for LSF."""

    # Fixed help text: these are LSF commands, not PBS.
    submit_command = List(['bsub'], config=True,
        help="The LSF submit command ['bsub']")
    delete_command = List(['bkill'], config=True,
        help="The LSF delete command ['bkill']")
    job_id_regexp = Unicode(r'\d+', config=True,
        help="Regular expression for identifying the job ID [r'\d+']")

    # Path of the batch script written prior to submission.
    batch_file = Unicode(u'')
    # Matches an existing '#BSUB -J name[a-b]' job-array directive in a template.
    job_array_regexp = Unicode('#BSUB[ \t]-J+\w+\[\d+-\d+\]')
    job_array_template = Unicode('#BSUB -J ipengine[1-{n}]')
    # Matches an existing '#BSUB -q' queue directive in a template.
    queue_regexp = Unicode('#BSUB[ \t]+-q[ \t]+\w+')
    queue_template = Unicode('#BSUB -q {queue}')

    def start(self, n):
        """Start n copies of the process using the LSF batch system.

        This can't inherit from the base class because bsub expects
        to be piped a shell script in order to honor the #BSUB directives:
            bsub < script
        """
        self.write_batch_script(n)
        # bsub must read the script on stdin, so build a shell redirection
        # instead of passing the script path as an argument.
        piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
        p = Popen(piped_cmd, shell=True, env=os.environ, stdout=PIPE)
        output, err = p.communicate()
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id
1050
1074
1051
1075
class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch a controller using LSF."""

    batch_file_name = Unicode(u'lsf_controller', config=True,
        help="batch file name for the controller job.")
    default_template = Unicode(
        """#!/bin/sh
#BSUB -J ipcontroller
#BSUB -oo ipcontroller.o.%%J
#BSUB -eo ipcontroller.e.%%J
%s --log-to-file --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(ipcontroller_cmd_argv))

    def start(self):
        """Submit a single ipcontroller job via LSF."""
        self.log.info("Starting LSFControllerLauncher: %r" % self.args)
        # A controller is always exactly one process.
        return super(LSFControllerLauncher, self).start(1)
1068
1092
1069
1093
class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):
    """Launch Engines using LSF"""
    batch_file_name = Unicode(u'lsf_engines', config=True,
        help="batch file name for the engine(s) job.")
    default_template = Unicode(
        u"""#!/bin/sh
#BSUB -oo ipengine.o.%%J
#BSUB -eo ipengine.e.%%J
%s --profile-dir="{profile_dir}" --cluster-id="{cluster_id}"
""" % ' '.join(ipengine_cmd_argv))

    def start(self, n):
        """Submit a job array of n engines via LSF."""
        self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
        return super(LSFEngineSetLauncher, self).start(n)
1084
1108
1085
1109
1086 #-----------------------------------------------------------------------------
1110 #-----------------------------------------------------------------------------
1087 # A launcher for ipcluster itself!
1111 # A launcher for ipcluster itself!
1088 #-----------------------------------------------------------------------------
1112 #-----------------------------------------------------------------------------
1089
1113
1090
1114
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
        help="Popen command for ipcluster")
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', '--log-level=%i'%logging.INFO], config=True,
        help="Command line arguments to pass to ipcluster.")
    # Subcommand passed to ipcluster (e.g. 'start').
    ipcluster_subcommand = Unicode('start')
    # Number of engines requested via --n.
    ipcluster_n = Int(2)

    def find_args(self):
        """Assemble the full ipcluster command line."""
        argv = list(self.ipcluster_cmd)
        argv.append(self.ipcluster_subcommand)
        argv.append('--n=%i' % self.ipcluster_n)
        argv.extend(self.ipcluster_args)
        return argv

    def start(self):
        """Launch the external ipcluster process."""
        self.log.info("Starting ipcluster: %r" % self.args)
        return super(IPClusterLauncher, self).start()
1109
1133
1110 #-----------------------------------------------------------------------------
1134 #-----------------------------------------------------------------------------
1111 # Collections of launchers
1135 # Collections of launchers
1112 #-----------------------------------------------------------------------------
1136 #-----------------------------------------------------------------------------
1113
1137
# Collections of launchers, grouped by backend, for registration/config lookup.
local_launchers = [
    LocalControllerLauncher,
    LocalEngineLauncher,
    LocalEngineSetLauncher,
]
mpi_launchers = [
    MPIExecLauncher,
    MPIExecControllerLauncher,
    MPIExecEngineSetLauncher,
]
ssh_launchers = [
    SSHLauncher,
    SSHControllerLauncher,
    SSHEngineLauncher,
    SSHEngineSetLauncher,
]
winhpc_launchers = [
    WindowsHPCLauncher,
    WindowsHPCControllerLauncher,
    WindowsHPCEngineSetLauncher,
]
pbs_launchers = [
    PBSLauncher,
    PBSControllerLauncher,
    PBSEngineSetLauncher,
]
sge_launchers = [
    SGELauncher,
    SGEControllerLauncher,
    SGEEngineSetLauncher,
]
lsf_launchers = [
    LSFLauncher,
    LSFControllerLauncher,
    LSFEngineSetLauncher,
]
# Every launcher known to this module, in backend order.
all_launchers = (local_launchers + mpi_launchers + ssh_launchers
                 + winhpc_launchers + pbs_launchers + sge_launchers
                 + lsf_launchers)
1152
1176
General Comments 0
You need to be logged in to leave comments. Login now