##// END OF EJS Templates
add LoggingConfigurable base class
MinRK -
Show More
@@ -1,263 +1,278 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 A base class for objects that are configurable.
4 A base class for objects that are configurable.
5
5
6 Authors:
6 Authors:
7
7
8 * Brian Granger
8 * Brian Granger
9 * Fernando Perez
9 * Fernando Perez
10 """
10 """
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Copyright (C) 2008-2010 The IPython Development Team
13 # Copyright (C) 2008-2010 The IPython Development Team
14 #
14 #
15 # Distributed under the terms of the BSD License. The full license is in
15 # Distributed under the terms of the BSD License. The full license is in
16 # the file COPYING, distributed as part of this software.
16 # the file COPYING, distributed as part of this software.
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 #-----------------------------------------------------------------------------
19 #-----------------------------------------------------------------------------
20 # Imports
20 # Imports
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22
22
23 from copy import deepcopy
23 from copy import deepcopy
24 import datetime
24 import datetime
25
25
26 from loader import Config
26 from loader import Config
27 from IPython.utils.traitlets import HasTraits, Instance
27 from IPython.utils.traitlets import HasTraits, Instance
28 from IPython.utils.text import indent
28 from IPython.utils.text import indent
29
29
30
30
31 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
32 # Helper classes for Configurables
32 # Helper classes for Configurables
33 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
34
34
35
35
36 class ConfigurableError(Exception):
36 class ConfigurableError(Exception):
37 pass
37 pass
38
38
39
39
40 class MultipleInstanceError(ConfigurableError):
40 class MultipleInstanceError(ConfigurableError):
41 pass
41 pass
42
42
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44 # Configurable implementation
44 # Configurable implementation
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46
46
47 class Configurable(HasTraits):
47 class Configurable(HasTraits):
48
48
49 config = Instance(Config,(),{})
49 config = Instance(Config,(),{})
50 created = None
50 created = None
51
51
52 def __init__(self, **kwargs):
52 def __init__(self, **kwargs):
53 """Create a conigurable given a config config.
53 """Create a conigurable given a config config.
54
54
55 Parameters
55 Parameters
56 ----------
56 ----------
57 config : Config
57 config : Config
58 If this is empty, default values are used. If config is a
58 If this is empty, default values are used. If config is a
59 :class:`Config` instance, it will be used to configure the
59 :class:`Config` instance, it will be used to configure the
60 instance.
60 instance.
61
61
62 Notes
62 Notes
63 -----
63 -----
64 Subclasses of Configurable must call the :meth:`__init__` method of
64 Subclasses of Configurable must call the :meth:`__init__` method of
65 :class:`Configurable` *before* doing anything else and using
65 :class:`Configurable` *before* doing anything else and using
66 :func:`super`::
66 :func:`super`::
67
67
68 class MyConfigurable(Configurable):
68 class MyConfigurable(Configurable):
69 def __init__(self, config=None):
69 def __init__(self, config=None):
70 super(MyConfigurable, self).__init__(config)
70 super(MyConfigurable, self).__init__(config)
71 # Then any other code you need to finish initialization.
71 # Then any other code you need to finish initialization.
72
72
73 This ensures that instances will be configured properly.
73 This ensures that instances will be configured properly.
74 """
74 """
75 config = kwargs.pop('config', None)
75 config = kwargs.pop('config', None)
76 if config is not None:
76 if config is not None:
77 # We used to deepcopy, but for now we are trying to just save
77 # We used to deepcopy, but for now we are trying to just save
78 # by reference. This *could* have side effects as all components
78 # by reference. This *could* have side effects as all components
79 # will share config. In fact, I did find such a side effect in
79 # will share config. In fact, I did find such a side effect in
80 # _config_changed below. If a config attribute value was a mutable type
80 # _config_changed below. If a config attribute value was a mutable type
81 # all instances of a component were getting the same copy, effectively
81 # all instances of a component were getting the same copy, effectively
82 # making that a class attribute.
82 # making that a class attribute.
83 # self.config = deepcopy(config)
83 # self.config = deepcopy(config)
84 self.config = config
84 self.config = config
85 # This should go second so individual keyword arguments override
85 # This should go second so individual keyword arguments override
86 # the values in config.
86 # the values in config.
87 super(Configurable, self).__init__(**kwargs)
87 super(Configurable, self).__init__(**kwargs)
88 self.created = datetime.datetime.now()
88 self.created = datetime.datetime.now()
89
89
90 #-------------------------------------------------------------------------
90 #-------------------------------------------------------------------------
91 # Static trait notifiations
91 # Static trait notifiations
92 #-------------------------------------------------------------------------
92 #-------------------------------------------------------------------------
93
93
94 def _config_changed(self, name, old, new):
94 def _config_changed(self, name, old, new):
95 """Update all the class traits having ``config=True`` as metadata.
95 """Update all the class traits having ``config=True`` as metadata.
96
96
97 For any class trait with a ``config`` metadata attribute that is
97 For any class trait with a ``config`` metadata attribute that is
98 ``True``, we update the trait with the value of the corresponding
98 ``True``, we update the trait with the value of the corresponding
99 config entry.
99 config entry.
100 """
100 """
101 # Get all traits with a config metadata entry that is True
101 # Get all traits with a config metadata entry that is True
102 traits = self.traits(config=True)
102 traits = self.traits(config=True)
103
103
104 # We auto-load config section for this class as well as any parent
104 # We auto-load config section for this class as well as any parent
105 # classes that are Configurable subclasses. This starts with Configurable
105 # classes that are Configurable subclasses. This starts with Configurable
106 # and works down the mro loading the config for each section.
106 # and works down the mro loading the config for each section.
107 section_names = [cls.__name__ for cls in \
107 section_names = [cls.__name__ for cls in \
108 reversed(self.__class__.__mro__) if
108 reversed(self.__class__.__mro__) if
109 issubclass(cls, Configurable) and issubclass(self.__class__, cls)]
109 issubclass(cls, Configurable) and issubclass(self.__class__, cls)]
110
110
111 for sname in section_names:
111 for sname in section_names:
112 # Don't do a blind getattr as that would cause the config to
112 # Don't do a blind getattr as that would cause the config to
113 # dynamically create the section with name self.__class__.__name__.
113 # dynamically create the section with name self.__class__.__name__.
114 if new._has_section(sname):
114 if new._has_section(sname):
115 my_config = new[sname]
115 my_config = new[sname]
116 for k, v in traits.iteritems():
116 for k, v in traits.iteritems():
117 # Don't allow traitlets with config=True to start with
117 # Don't allow traitlets with config=True to start with
118 # uppercase. Otherwise, they are confused with Config
118 # uppercase. Otherwise, they are confused with Config
119 # subsections. But, developers shouldn't have uppercase
119 # subsections. But, developers shouldn't have uppercase
120 # attributes anyways! (PEP 6)
120 # attributes anyways! (PEP 6)
121 if k[0].upper()==k[0] and not k.startswith('_'):
121 if k[0].upper()==k[0] and not k.startswith('_'):
122 raise ConfigurableError('Configurable traitlets with '
122 raise ConfigurableError('Configurable traitlets with '
123 'config=True must start with a lowercase so they are '
123 'config=True must start with a lowercase so they are '
124 'not confused with Config subsections: %s.%s' % \
124 'not confused with Config subsections: %s.%s' % \
125 (self.__class__.__name__, k))
125 (self.__class__.__name__, k))
126 try:
126 try:
127 # Here we grab the value from the config
127 # Here we grab the value from the config
128 # If k has the naming convention of a config
128 # If k has the naming convention of a config
129 # section, it will be auto created.
129 # section, it will be auto created.
130 config_value = my_config[k]
130 config_value = my_config[k]
131 except KeyError:
131 except KeyError:
132 pass
132 pass
133 else:
133 else:
134 # print "Setting %s.%s from %s.%s=%r" % \
134 # print "Setting %s.%s from %s.%s=%r" % \
135 # (self.__class__.__name__,k,sname,k,config_value)
135 # (self.__class__.__name__,k,sname,k,config_value)
136 # We have to do a deepcopy here if we don't deepcopy the entire
136 # We have to do a deepcopy here if we don't deepcopy the entire
137 # config object. If we don't, a mutable config_value will be
137 # config object. If we don't, a mutable config_value will be
138 # shared by all instances, effectively making it a class attribute.
138 # shared by all instances, effectively making it a class attribute.
139 setattr(self, k, deepcopy(config_value))
139 setattr(self, k, deepcopy(config_value))
140
140
141 @classmethod
141 @classmethod
142 def class_get_help(cls):
142 def class_get_help(cls):
143 """Get the help string for this class in ReST format."""
143 """Get the help string for this class in ReST format."""
144 cls_traits = cls.class_traits(config=True)
144 cls_traits = cls.class_traits(config=True)
145 final_help = []
145 final_help = []
146 final_help.append(u'%s options' % cls.__name__)
146 final_help.append(u'%s options' % cls.__name__)
147 final_help.append(len(final_help[0])*u'-')
147 final_help.append(len(final_help[0])*u'-')
148 for k,v in cls.class_traits(config=True).iteritems():
148 for k,v in cls.class_traits(config=True).iteritems():
149 help = cls.class_get_trait_help(v)
149 help = cls.class_get_trait_help(v)
150 final_help.append(help)
150 final_help.append(help)
151 return '\n'.join(final_help)
151 return '\n'.join(final_help)
152
152
153 @classmethod
153 @classmethod
154 def class_get_trait_help(cls, trait):
154 def class_get_trait_help(cls, trait):
155 """Get the help string for a single """
155 """Get the help string for a single """
156 lines = []
156 lines = []
157 header = "%s.%s : %s" % (cls.__name__, trait.name, trait.__class__.__name__)
157 header = "%s.%s : %s" % (cls.__name__, trait.name, trait.__class__.__name__)
158 try:
158 try:
159 dvr = repr(trait.get_default_value())
159 dvr = repr(trait.get_default_value())
160 except Exception:
160 except Exception:
161 dvr = None # ignore defaults we can't construct
161 dvr = None # ignore defaults we can't construct
162 if dvr is not None:
162 if dvr is not None:
163 header += ' [default: %s]'%dvr
163 header += ' [default: %s]'%dvr
164 lines.append(header)
164 lines.append(header)
165
165
166 help = trait.get_metadata('help')
166 help = trait.get_metadata('help')
167 if help is not None:
167 if help is not None:
168 lines.append(indent(help.strip(), flatten=True))
168 lines.append(indent(help.strip(), flatten=True))
169 if 'Enum' in trait.__class__.__name__:
169 if 'Enum' in trait.__class__.__name__:
170 # include Enum choices
170 # include Enum choices
171 lines.append(indent('Choices: %r'%(trait.values,), flatten=True))
171 lines.append(indent('Choices: %r'%(trait.values,), flatten=True))
172 return '\n'.join(lines)
172 return '\n'.join(lines)
173
173
174 @classmethod
174 @classmethod
175 def class_print_help(cls):
175 def class_print_help(cls):
176 print cls.class_get_help()
176 print cls.class_get_help()
177
177
178
178
179 class SingletonConfigurable(Configurable):
179 class SingletonConfigurable(Configurable):
180 """A configurable that only allows one instance.
180 """A configurable that only allows one instance.
181
181
182 This class is for classes that should only have one instance of itself
182 This class is for classes that should only have one instance of itself
183 or *any* subclass. To create and retrieve such a class use the
183 or *any* subclass. To create and retrieve such a class use the
184 :meth:`SingletonConfigurable.instance` method.
184 :meth:`SingletonConfigurable.instance` method.
185 """
185 """
186
186
187 _instance = None
187 _instance = None
188
188
189 @classmethod
189 @classmethod
190 def _walk_mro(cls):
190 def _walk_mro(cls):
191 """Walk the cls.mro() for parent classes that are also singletons
191 """Walk the cls.mro() for parent classes that are also singletons
192
192
193 For use in instance()
193 For use in instance()
194 """
194 """
195
195
196 for subclass in cls.mro():
196 for subclass in cls.mro():
197 if issubclass(cls, subclass) and \
197 if issubclass(cls, subclass) and \
198 issubclass(subclass, SingletonConfigurable) and \
198 issubclass(subclass, SingletonConfigurable) and \
199 subclass != SingletonConfigurable:
199 subclass != SingletonConfigurable:
200 yield subclass
200 yield subclass
201
201
202 @classmethod
202 @classmethod
203 def clear_instance(cls):
203 def clear_instance(cls):
204 """unset _instance for this class and singleton parents.
204 """unset _instance for this class and singleton parents.
205 """
205 """
206 if not cls.initialized():
206 if not cls.initialized():
207 return
207 return
208 for subclass in cls._walk_mro():
208 for subclass in cls._walk_mro():
209 if isinstance(subclass._instance, cls):
209 if isinstance(subclass._instance, cls):
210 # only clear instances that are instances
210 # only clear instances that are instances
211 # of the calling class
211 # of the calling class
212 subclass._instance = None
212 subclass._instance = None
213
213
214 @classmethod
214 @classmethod
215 def instance(cls, *args, **kwargs):
215 def instance(cls, *args, **kwargs):
216 """Returns a global instance of this class.
216 """Returns a global instance of this class.
217
217
218 This method create a new instance if none have previously been created
218 This method create a new instance if none have previously been created
219 and returns a previously created instance is one already exists.
219 and returns a previously created instance is one already exists.
220
220
221 The arguments and keyword arguments passed to this method are passed
221 The arguments and keyword arguments passed to this method are passed
222 on to the :meth:`__init__` method of the class upon instantiation.
222 on to the :meth:`__init__` method of the class upon instantiation.
223
223
224 Examples
224 Examples
225 --------
225 --------
226
226
227 Create a singleton class using instance, and retrieve it::
227 Create a singleton class using instance, and retrieve it::
228
228
229 >>> from IPython.config.configurable import SingletonConfigurable
229 >>> from IPython.config.configurable import SingletonConfigurable
230 >>> class Foo(SingletonConfigurable): pass
230 >>> class Foo(SingletonConfigurable): pass
231 >>> foo = Foo.instance()
231 >>> foo = Foo.instance()
232 >>> foo == Foo.instance()
232 >>> foo == Foo.instance()
233 True
233 True
234
234
235 Create a subclass that is retrived using the base class instance::
235 Create a subclass that is retrived using the base class instance::
236
236
237 >>> class Bar(SingletonConfigurable): pass
237 >>> class Bar(SingletonConfigurable): pass
238 >>> class Bam(Bar): pass
238 >>> class Bam(Bar): pass
239 >>> bam = Bam.instance()
239 >>> bam = Bam.instance()
240 >>> bam == Bar.instance()
240 >>> bam == Bar.instance()
241 True
241 True
242 """
242 """
243 # Create and save the instance
243 # Create and save the instance
244 if cls._instance is None:
244 if cls._instance is None:
245 inst = cls(*args, **kwargs)
245 inst = cls(*args, **kwargs)
246 # Now make sure that the instance will also be returned by
246 # Now make sure that the instance will also be returned by
247 # parent classes' _instance attribute.
247 # parent classes' _instance attribute.
248 for subclass in cls._walk_mro():
248 for subclass in cls._walk_mro():
249 subclass._instance = inst
249 subclass._instance = inst
250
250
251 if isinstance(cls._instance, cls):
251 if isinstance(cls._instance, cls):
252 return cls._instance
252 return cls._instance
253 else:
253 else:
254 raise MultipleInstanceError(
254 raise MultipleInstanceError(
255 'Multiple incompatible subclass instances of '
255 'Multiple incompatible subclass instances of '
256 '%s are being created.' % cls.__name__
256 '%s are being created.' % cls.__name__
257 )
257 )
258
258
259 @classmethod
259 @classmethod
260 def initialized(cls):
260 def initialized(cls):
261 """Has an instance been created?"""
261 """Has an instance been created?"""
262 return hasattr(cls, "_instance") and cls._instance is not None
262 return hasattr(cls, "_instance") and cls._instance is not None
263
263
264
265 class LoggingConfigurable(Configurable):
266 """A parent class for Configurables that log.
267
268 Subclasses have a log trait, and the default behavior
269 is to get the logger from the currently running Application
270 via Application.instance().log.
271 """
272
273 log = Instance('logging.Logger')
274 def _log_default(self):
275 from IPython.config.application import Application
276 return Application.instance().log
277
278 No newline at end of file
@@ -1,1072 +1,1069 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import copy
18 import copy
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import stat
22 import stat
23
23
24 # signal imports, handling various platforms, versions
24 # signal imports, handling various platforms, versions
25
25
26 from signal import SIGINT, SIGTERM
26 from signal import SIGINT, SIGTERM
27 try:
27 try:
28 from signal import SIGKILL
28 from signal import SIGKILL
29 except ImportError:
29 except ImportError:
30 # Windows
30 # Windows
31 SIGKILL=SIGTERM
31 SIGKILL=SIGTERM
32
32
33 try:
33 try:
34 # Windows >= 2.7, 3.2
34 # Windows >= 2.7, 3.2
35 from signal import CTRL_C_EVENT as SIGINT
35 from signal import CTRL_C_EVENT as SIGINT
36 except ImportError:
36 except ImportError:
37 pass
37 pass
38
38
39 from subprocess import Popen, PIPE, STDOUT
39 from subprocess import Popen, PIPE, STDOUT
40 try:
40 try:
41 from subprocess import check_output
41 from subprocess import check_output
42 except ImportError:
42 except ImportError:
43 # pre-2.7, define check_output with Popen
43 # pre-2.7, define check_output with Popen
44 def check_output(*args, **kwargs):
44 def check_output(*args, **kwargs):
45 kwargs.update(dict(stdout=PIPE))
45 kwargs.update(dict(stdout=PIPE))
46 p = Popen(*args, **kwargs)
46 p = Popen(*args, **kwargs)
47 out,err = p.communicate()
47 out,err = p.communicate()
48 return out
48 return out
49
49
50 from zmq.eventloop import ioloop
50 from zmq.eventloop import ioloop
51
51
52 from IPython.config.application import Application
52 from IPython.config.application import Application
53 from IPython.config.configurable import Configurable
53 from IPython.config.configurable import LoggingConfigurable
54 from IPython.utils.text import EvalFormatter
54 from IPython.utils.text import EvalFormatter
55 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
55 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
56 from IPython.utils.path import get_ipython_module_path
56 from IPython.utils.path import get_ipython_module_path
57 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
57 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
58
58
59 from .win32support import forward_read_events
59 from .win32support import forward_read_events
60
60
61 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
61 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
62
62
63 WINDOWS = os.name == 'nt'
63 WINDOWS = os.name == 'nt'
64
64
65 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
66 # Paths to the kernel apps
66 # Paths to the kernel apps
67 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
68
68
69
69
70 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
70 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
71 'IPython.parallel.apps.ipclusterapp'
71 'IPython.parallel.apps.ipclusterapp'
72 ))
72 ))
73
73
74 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
74 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
75 'IPython.parallel.apps.ipengineapp'
75 'IPython.parallel.apps.ipengineapp'
76 ))
76 ))
77
77
78 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
78 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
79 'IPython.parallel.apps.ipcontrollerapp'
79 'IPython.parallel.apps.ipcontrollerapp'
80 ))
80 ))
81
81
82 #-----------------------------------------------------------------------------
82 #-----------------------------------------------------------------------------
83 # Base launchers and errors
83 # Base launchers and errors
84 #-----------------------------------------------------------------------------
84 #-----------------------------------------------------------------------------
85
85
86
86
87 class LauncherError(Exception):
87 class LauncherError(Exception):
88 pass
88 pass
89
89
90
90
91 class ProcessStateError(LauncherError):
91 class ProcessStateError(LauncherError):
92 pass
92 pass
93
93
94
94
95 class UnknownStatus(LauncherError):
95 class UnknownStatus(LauncherError):
96 pass
96 pass
97
97
98
98
99 class BaseLauncher(Configurable):
99 class BaseLauncher(LoggingConfigurable):
100 """An asbtraction for starting, stopping and signaling a process."""
100 """An asbtraction for starting, stopping and signaling a process."""
101
101
102 # In all of the launchers, the work_dir is where child processes will be
102 # In all of the launchers, the work_dir is where child processes will be
103 # run. This will usually be the profile_dir, but may not be. any work_dir
103 # run. This will usually be the profile_dir, but may not be. any work_dir
104 # passed into the __init__ method will override the config value.
104 # passed into the __init__ method will override the config value.
105 # This should not be used to set the work_dir for the actual engine
105 # This should not be used to set the work_dir for the actual engine
106 # and controller. Instead, use their own config files or the
106 # and controller. Instead, use their own config files or the
107 # controller_args, engine_args attributes of the launchers to add
107 # controller_args, engine_args attributes of the launchers to add
108 # the work_dir option.
108 # the work_dir option.
109 work_dir = Unicode(u'.')
109 work_dir = Unicode(u'.')
110 loop = Instance('zmq.eventloop.ioloop.IOLoop')
110 loop = Instance('zmq.eventloop.ioloop.IOLoop')
111 log = Instance('logging.Logger')
112 def _log_default(self):
113 return Application.instance().log
114
111
115 start_data = Any()
112 start_data = Any()
116 stop_data = Any()
113 stop_data = Any()
117
114
118 def _loop_default(self):
115 def _loop_default(self):
119 return ioloop.IOLoop.instance()
116 return ioloop.IOLoop.instance()
120
117
121 def __init__(self, work_dir=u'.', config=None, **kwargs):
118 def __init__(self, work_dir=u'.', config=None, **kwargs):
122 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
119 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
123 self.state = 'before' # can be before, running, after
120 self.state = 'before' # can be before, running, after
124 self.stop_callbacks = []
121 self.stop_callbacks = []
125 self.start_data = None
122 self.start_data = None
126 self.stop_data = None
123 self.stop_data = None
127
124
128 @property
125 @property
129 def args(self):
126 def args(self):
130 """A list of cmd and args that will be used to start the process.
127 """A list of cmd and args that will be used to start the process.
131
128
132 This is what is passed to :func:`spawnProcess` and the first element
129 This is what is passed to :func:`spawnProcess` and the first element
133 will be the process name.
130 will be the process name.
134 """
131 """
135 return self.find_args()
132 return self.find_args()
136
133
137 def find_args(self):
134 def find_args(self):
138 """The ``.args`` property calls this to find the args list.
135 """The ``.args`` property calls this to find the args list.
139
136
140 Subcommand should implement this to construct the cmd and args.
137 Subcommand should implement this to construct the cmd and args.
141 """
138 """
142 raise NotImplementedError('find_args must be implemented in a subclass')
139 raise NotImplementedError('find_args must be implemented in a subclass')
143
140
144 @property
141 @property
145 def arg_str(self):
142 def arg_str(self):
146 """The string form of the program arguments."""
143 """The string form of the program arguments."""
147 return ' '.join(self.args)
144 return ' '.join(self.args)
148
145
149 @property
146 @property
150 def running(self):
147 def running(self):
151 """Am I running."""
148 """Am I running."""
152 if self.state == 'running':
149 if self.state == 'running':
153 return True
150 return True
154 else:
151 else:
155 return False
152 return False
156
153
157 def start(self):
154 def start(self):
158 """Start the process.
155 """Start the process.
159
156
160 This must return a deferred that fires with information about the
157 This must return a deferred that fires with information about the
161 process starting (like a pid, job id, etc.).
158 process starting (like a pid, job id, etc.).
162 """
159 """
163 raise NotImplementedError('start must be implemented in a subclass')
160 raise NotImplementedError('start must be implemented in a subclass')
164
161
165 def stop(self):
162 def stop(self):
166 """Stop the process and notify observers of stopping.
163 """Stop the process and notify observers of stopping.
167
164
168 This must return a deferred that fires with information about the
165 This must return a deferred that fires with information about the
169 processing stopping, like errors that occur while the process is
166 processing stopping, like errors that occur while the process is
170 attempting to be shut down. This deferred won't fire when the process
167 attempting to be shut down. This deferred won't fire when the process
171 actually stops. To observe the actual process stopping, see
168 actually stops. To observe the actual process stopping, see
172 :func:`observe_stop`.
169 :func:`observe_stop`.
173 """
170 """
174 raise NotImplementedError('stop must be implemented in a subclass')
171 raise NotImplementedError('stop must be implemented in a subclass')
175
172
176 def on_stop(self, f):
173 def on_stop(self, f):
177 """Get a deferred that will fire when the process stops.
174 """Get a deferred that will fire when the process stops.
178
175
179 The deferred will fire with data that contains information about
176 The deferred will fire with data that contains information about
180 the exit status of the process.
177 the exit status of the process.
181 """
178 """
182 if self.state=='after':
179 if self.state=='after':
183 return f(self.stop_data)
180 return f(self.stop_data)
184 else:
181 else:
185 self.stop_callbacks.append(f)
182 self.stop_callbacks.append(f)
186
183
187 def notify_start(self, data):
184 def notify_start(self, data):
188 """Call this to trigger startup actions.
185 """Call this to trigger startup actions.
189
186
190 This logs the process startup and sets the state to 'running'. It is
187 This logs the process startup and sets the state to 'running'. It is
191 a pass-through so it can be used as a callback.
188 a pass-through so it can be used as a callback.
192 """
189 """
193
190
194 self.log.info('Process %r started: %r' % (self.args[0], data))
191 self.log.info('Process %r started: %r' % (self.args[0], data))
195 self.start_data = data
192 self.start_data = data
196 self.state = 'running'
193 self.state = 'running'
197 return data
194 return data
198
195
199 def notify_stop(self, data):
196 def notify_stop(self, data):
200 """Call this to trigger process stop actions.
197 """Call this to trigger process stop actions.
201
198
202 This logs the process stopping and sets the state to 'after'. Call
199 This logs the process stopping and sets the state to 'after'. Call
203 this to trigger all the deferreds from :func:`observe_stop`."""
200 this to trigger all the deferreds from :func:`observe_stop`."""
204
201
205 self.log.info('Process %r stopped: %r' % (self.args[0], data))
202 self.log.info('Process %r stopped: %r' % (self.args[0], data))
206 self.stop_data = data
203 self.stop_data = data
207 self.state = 'after'
204 self.state = 'after'
208 for i in range(len(self.stop_callbacks)):
205 for i in range(len(self.stop_callbacks)):
209 d = self.stop_callbacks.pop()
206 d = self.stop_callbacks.pop()
210 d(data)
207 d(data)
211 return data
208 return data
212
209
213 def signal(self, sig):
210 def signal(self, sig):
214 """Signal the process.
211 """Signal the process.
215
212
216 Return a semi-meaningless deferred after signaling the process.
213 Return a semi-meaningless deferred after signaling the process.
217
214
218 Parameters
215 Parameters
219 ----------
216 ----------
220 sig : str or int
217 sig : str or int
221 'KILL', 'INT', etc., or any signal number
218 'KILL', 'INT', etc., or any signal number
222 """
219 """
223 raise NotImplementedError('signal must be implemented in a subclass')
220 raise NotImplementedError('signal must be implemented in a subclass')
224
221
225
222
226 #-----------------------------------------------------------------------------
223 #-----------------------------------------------------------------------------
227 # Local process launchers
224 # Local process launchers
228 #-----------------------------------------------------------------------------
225 #-----------------------------------------------------------------------------
229
226
230
227
231 class LocalProcessLauncher(BaseLauncher):
228 class LocalProcessLauncher(BaseLauncher):
232 """Start and stop an external process in an asynchronous manner.
229 """Start and stop an external process in an asynchronous manner.
233
230
234 This will launch the external process with a working directory of
231 This will launch the external process with a working directory of
235 ``self.work_dir``.
232 ``self.work_dir``.
236 """
233 """
237
234
238 # This is used to to construct self.args, which is passed to
235 # This is used to to construct self.args, which is passed to
239 # spawnProcess.
236 # spawnProcess.
240 cmd_and_args = List([])
237 cmd_and_args = List([])
241 poll_frequency = Int(100) # in ms
238 poll_frequency = Int(100) # in ms
242
239
243 def __init__(self, work_dir=u'.', config=None, **kwargs):
240 def __init__(self, work_dir=u'.', config=None, **kwargs):
244 super(LocalProcessLauncher, self).__init__(
241 super(LocalProcessLauncher, self).__init__(
245 work_dir=work_dir, config=config, **kwargs
242 work_dir=work_dir, config=config, **kwargs
246 )
243 )
247 self.process = None
244 self.process = None
248 self.start_deferred = None
245 self.start_deferred = None
249 self.poller = None
246 self.poller = None
250
247
251 def find_args(self):
248 def find_args(self):
252 return self.cmd_and_args
249 return self.cmd_and_args
253
250
254 def start(self):
251 def start(self):
255 if self.state == 'before':
252 if self.state == 'before':
256 self.process = Popen(self.args,
253 self.process = Popen(self.args,
257 stdout=PIPE,stderr=PIPE,stdin=PIPE,
254 stdout=PIPE,stderr=PIPE,stdin=PIPE,
258 env=os.environ,
255 env=os.environ,
259 cwd=self.work_dir
256 cwd=self.work_dir
260 )
257 )
261 if WINDOWS:
258 if WINDOWS:
262 self.stdout = forward_read_events(self.process.stdout)
259 self.stdout = forward_read_events(self.process.stdout)
263 self.stderr = forward_read_events(self.process.stderr)
260 self.stderr = forward_read_events(self.process.stderr)
264 else:
261 else:
265 self.stdout = self.process.stdout.fileno()
262 self.stdout = self.process.stdout.fileno()
266 self.stderr = self.process.stderr.fileno()
263 self.stderr = self.process.stderr.fileno()
267 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
264 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
268 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
265 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
269 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
266 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
270 self.poller.start()
267 self.poller.start()
271 self.notify_start(self.process.pid)
268 self.notify_start(self.process.pid)
272 else:
269 else:
273 s = 'The process was already started and has state: %r' % self.state
270 s = 'The process was already started and has state: %r' % self.state
274 raise ProcessStateError(s)
271 raise ProcessStateError(s)
275
272
276 def stop(self):
273 def stop(self):
277 return self.interrupt_then_kill()
274 return self.interrupt_then_kill()
278
275
279 def signal(self, sig):
276 def signal(self, sig):
280 if self.state == 'running':
277 if self.state == 'running':
281 if WINDOWS and sig != SIGINT:
278 if WINDOWS and sig != SIGINT:
282 # use Windows tree-kill for better child cleanup
279 # use Windows tree-kill for better child cleanup
283 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
280 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
284 else:
281 else:
285 self.process.send_signal(sig)
282 self.process.send_signal(sig)
286
283
287 def interrupt_then_kill(self, delay=2.0):
284 def interrupt_then_kill(self, delay=2.0):
288 """Send INT, wait a delay and then send KILL."""
285 """Send INT, wait a delay and then send KILL."""
289 try:
286 try:
290 self.signal(SIGINT)
287 self.signal(SIGINT)
291 except Exception:
288 except Exception:
292 self.log.debug("interrupt failed")
289 self.log.debug("interrupt failed")
293 pass
290 pass
294 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
291 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
295 self.killer.start()
292 self.killer.start()
296
293
297 # callbacks, etc:
294 # callbacks, etc:
298
295
299 def handle_stdout(self, fd, events):
296 def handle_stdout(self, fd, events):
300 if WINDOWS:
297 if WINDOWS:
301 line = self.stdout.recv()
298 line = self.stdout.recv()
302 else:
299 else:
303 line = self.process.stdout.readline()
300 line = self.process.stdout.readline()
304 # a stopped process will be readable but return empty strings
301 # a stopped process will be readable but return empty strings
305 if line:
302 if line:
306 self.log.info(line[:-1])
303 self.log.info(line[:-1])
307 else:
304 else:
308 self.poll()
305 self.poll()
309
306
310 def handle_stderr(self, fd, events):
307 def handle_stderr(self, fd, events):
311 if WINDOWS:
308 if WINDOWS:
312 line = self.stderr.recv()
309 line = self.stderr.recv()
313 else:
310 else:
314 line = self.process.stderr.readline()
311 line = self.process.stderr.readline()
315 # a stopped process will be readable but return empty strings
312 # a stopped process will be readable but return empty strings
316 if line:
313 if line:
317 self.log.error(line[:-1])
314 self.log.error(line[:-1])
318 else:
315 else:
319 self.poll()
316 self.poll()
320
317
321 def poll(self):
318 def poll(self):
322 status = self.process.poll()
319 status = self.process.poll()
323 if status is not None:
320 if status is not None:
324 self.poller.stop()
321 self.poller.stop()
325 self.loop.remove_handler(self.stdout)
322 self.loop.remove_handler(self.stdout)
326 self.loop.remove_handler(self.stderr)
323 self.loop.remove_handler(self.stderr)
327 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
324 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
328 return status
325 return status
329
326
330 class LocalControllerLauncher(LocalProcessLauncher):
327 class LocalControllerLauncher(LocalProcessLauncher):
331 """Launch a controller as a regular external process."""
328 """Launch a controller as a regular external process."""
332
329
333 controller_cmd = List(ipcontroller_cmd_argv, config=True,
330 controller_cmd = List(ipcontroller_cmd_argv, config=True,
334 help="""Popen command to launch ipcontroller.""")
331 help="""Popen command to launch ipcontroller.""")
335 # Command line arguments to ipcontroller.
332 # Command line arguments to ipcontroller.
336 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
333 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
337 help="""command-line args to pass to ipcontroller""")
334 help="""command-line args to pass to ipcontroller""")
338
335
339 def find_args(self):
336 def find_args(self):
340 return self.controller_cmd + self.controller_args
337 return self.controller_cmd + self.controller_args
341
338
342 def start(self, profile_dir):
339 def start(self, profile_dir):
343 """Start the controller by profile_dir."""
340 """Start the controller by profile_dir."""
344 self.controller_args.extend(['profile_dir=%s'%profile_dir])
341 self.controller_args.extend(['profile_dir=%s'%profile_dir])
345 self.profile_dir = unicode(profile_dir)
342 self.profile_dir = unicode(profile_dir)
346 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
343 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
347 return super(LocalControllerLauncher, self).start()
344 return super(LocalControllerLauncher, self).start()
348
345
349
346
350 class LocalEngineLauncher(LocalProcessLauncher):
347 class LocalEngineLauncher(LocalProcessLauncher):
351 """Launch a single engine as a regular externall process."""
348 """Launch a single engine as a regular externall process."""
352
349
353 engine_cmd = List(ipengine_cmd_argv, config=True,
350 engine_cmd = List(ipengine_cmd_argv, config=True,
354 help="""command to launch the Engine.""")
351 help="""command to launch the Engine.""")
355 # Command line arguments for ipengine.
352 # Command line arguments for ipengine.
356 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
353 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
357 help="command-line arguments to pass to ipengine"
354 help="command-line arguments to pass to ipengine"
358 )
355 )
359
356
360 def find_args(self):
357 def find_args(self):
361 return self.engine_cmd + self.engine_args
358 return self.engine_cmd + self.engine_args
362
359
363 def start(self, profile_dir):
360 def start(self, profile_dir):
364 """Start the engine by profile_dir."""
361 """Start the engine by profile_dir."""
365 self.engine_args.extend(['profile_dir=%s'%profile_dir])
362 self.engine_args.extend(['profile_dir=%s'%profile_dir])
366 self.profile_dir = unicode(profile_dir)
363 self.profile_dir = unicode(profile_dir)
367 return super(LocalEngineLauncher, self).start()
364 return super(LocalEngineLauncher, self).start()
368
365
369
366
370 class LocalEngineSetLauncher(BaseLauncher):
367 class LocalEngineSetLauncher(BaseLauncher):
371 """Launch a set of engines as regular external processes."""
368 """Launch a set of engines as regular external processes."""
372
369
373 # Command line arguments for ipengine.
370 # Command line arguments for ipengine.
374 engine_args = List(
371 engine_args = List(
375 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
372 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
376 help="command-line arguments to pass to ipengine"
373 help="command-line arguments to pass to ipengine"
377 )
374 )
378 # launcher class
375 # launcher class
379 launcher_class = LocalEngineLauncher
376 launcher_class = LocalEngineLauncher
380
377
381 launchers = Dict()
378 launchers = Dict()
382 stop_data = Dict()
379 stop_data = Dict()
383
380
384 def __init__(self, work_dir=u'.', config=None, **kwargs):
381 def __init__(self, work_dir=u'.', config=None, **kwargs):
385 super(LocalEngineSetLauncher, self).__init__(
382 super(LocalEngineSetLauncher, self).__init__(
386 work_dir=work_dir, config=config, **kwargs
383 work_dir=work_dir, config=config, **kwargs
387 )
384 )
388 self.stop_data = {}
385 self.stop_data = {}
389
386
390 def start(self, n, profile_dir):
387 def start(self, n, profile_dir):
391 """Start n engines by profile or profile_dir."""
388 """Start n engines by profile or profile_dir."""
392 self.profile_dir = unicode(profile_dir)
389 self.profile_dir = unicode(profile_dir)
393 dlist = []
390 dlist = []
394 for i in range(n):
391 for i in range(n):
395 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
392 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
396 # Copy the engine args over to each engine launcher.
393 # Copy the engine args over to each engine launcher.
397 el.engine_args = copy.deepcopy(self.engine_args)
394 el.engine_args = copy.deepcopy(self.engine_args)
398 el.on_stop(self._notice_engine_stopped)
395 el.on_stop(self._notice_engine_stopped)
399 d = el.start(profile_dir)
396 d = el.start(profile_dir)
400 if i==0:
397 if i==0:
401 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
398 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
402 self.launchers[i] = el
399 self.launchers[i] = el
403 dlist.append(d)
400 dlist.append(d)
404 self.notify_start(dlist)
401 self.notify_start(dlist)
405 # The consumeErrors here could be dangerous
402 # The consumeErrors here could be dangerous
406 # dfinal = gatherBoth(dlist, consumeErrors=True)
403 # dfinal = gatherBoth(dlist, consumeErrors=True)
407 # dfinal.addCallback(self.notify_start)
404 # dfinal.addCallback(self.notify_start)
408 return dlist
405 return dlist
409
406
410 def find_args(self):
407 def find_args(self):
411 return ['engine set']
408 return ['engine set']
412
409
413 def signal(self, sig):
410 def signal(self, sig):
414 dlist = []
411 dlist = []
415 for el in self.launchers.itervalues():
412 for el in self.launchers.itervalues():
416 d = el.signal(sig)
413 d = el.signal(sig)
417 dlist.append(d)
414 dlist.append(d)
418 # dfinal = gatherBoth(dlist, consumeErrors=True)
415 # dfinal = gatherBoth(dlist, consumeErrors=True)
419 return dlist
416 return dlist
420
417
421 def interrupt_then_kill(self, delay=1.0):
418 def interrupt_then_kill(self, delay=1.0):
422 dlist = []
419 dlist = []
423 for el in self.launchers.itervalues():
420 for el in self.launchers.itervalues():
424 d = el.interrupt_then_kill(delay)
421 d = el.interrupt_then_kill(delay)
425 dlist.append(d)
422 dlist.append(d)
426 # dfinal = gatherBoth(dlist, consumeErrors=True)
423 # dfinal = gatherBoth(dlist, consumeErrors=True)
427 return dlist
424 return dlist
428
425
429 def stop(self):
426 def stop(self):
430 return self.interrupt_then_kill()
427 return self.interrupt_then_kill()
431
428
432 def _notice_engine_stopped(self, data):
429 def _notice_engine_stopped(self, data):
433 pid = data['pid']
430 pid = data['pid']
434 for idx,el in self.launchers.iteritems():
431 for idx,el in self.launchers.iteritems():
435 if el.process.pid == pid:
432 if el.process.pid == pid:
436 break
433 break
437 self.launchers.pop(idx)
434 self.launchers.pop(idx)
438 self.stop_data[idx] = data
435 self.stop_data[idx] = data
439 if not self.launchers:
436 if not self.launchers:
440 self.notify_stop(self.stop_data)
437 self.notify_stop(self.stop_data)
441
438
442
439
443 #-----------------------------------------------------------------------------
440 #-----------------------------------------------------------------------------
444 # MPIExec launchers
441 # MPIExec launchers
445 #-----------------------------------------------------------------------------
442 #-----------------------------------------------------------------------------
446
443
447
444
448 class MPIExecLauncher(LocalProcessLauncher):
445 class MPIExecLauncher(LocalProcessLauncher):
449 """Launch an external process using mpiexec."""
446 """Launch an external process using mpiexec."""
450
447
451 mpi_cmd = List(['mpiexec'], config=True,
448 mpi_cmd = List(['mpiexec'], config=True,
452 help="The mpiexec command to use in starting the process."
449 help="The mpiexec command to use in starting the process."
453 )
450 )
454 mpi_args = List([], config=True,
451 mpi_args = List([], config=True,
455 help="The command line arguments to pass to mpiexec."
452 help="The command line arguments to pass to mpiexec."
456 )
453 )
457 program = List(['date'], config=True,
454 program = List(['date'], config=True,
458 help="The program to start via mpiexec.")
455 help="The program to start via mpiexec.")
459 program_args = List([], config=True,
456 program_args = List([], config=True,
460 help="The command line argument to the program."
457 help="The command line argument to the program."
461 )
458 )
462 n = Int(1)
459 n = Int(1)
463
460
464 def find_args(self):
461 def find_args(self):
465 """Build self.args using all the fields."""
462 """Build self.args using all the fields."""
466 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
463 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
467 self.program + self.program_args
464 self.program + self.program_args
468
465
469 def start(self, n):
466 def start(self, n):
470 """Start n instances of the program using mpiexec."""
467 """Start n instances of the program using mpiexec."""
471 self.n = n
468 self.n = n
472 return super(MPIExecLauncher, self).start()
469 return super(MPIExecLauncher, self).start()
473
470
474
471
475 class MPIExecControllerLauncher(MPIExecLauncher):
472 class MPIExecControllerLauncher(MPIExecLauncher):
476 """Launch a controller using mpiexec."""
473 """Launch a controller using mpiexec."""
477
474
478 controller_cmd = List(ipcontroller_cmd_argv, config=True,
475 controller_cmd = List(ipcontroller_cmd_argv, config=True,
479 help="Popen command to launch the Contropper"
476 help="Popen command to launch the Contropper"
480 )
477 )
481 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
478 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
482 help="Command line arguments to pass to ipcontroller."
479 help="Command line arguments to pass to ipcontroller."
483 )
480 )
484 n = Int(1)
481 n = Int(1)
485
482
486 def start(self, profile_dir):
483 def start(self, profile_dir):
487 """Start the controller by profile_dir."""
484 """Start the controller by profile_dir."""
488 self.controller_args.extend(['profile_dir=%s'%profile_dir])
485 self.controller_args.extend(['profile_dir=%s'%profile_dir])
489 self.profile_dir = unicode(profile_dir)
486 self.profile_dir = unicode(profile_dir)
490 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
487 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
491 return super(MPIExecControllerLauncher, self).start(1)
488 return super(MPIExecControllerLauncher, self).start(1)
492
489
493 def find_args(self):
490 def find_args(self):
494 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
491 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
495 self.controller_cmd + self.controller_args
492 self.controller_cmd + self.controller_args
496
493
497
494
498 class MPIExecEngineSetLauncher(MPIExecLauncher):
495 class MPIExecEngineSetLauncher(MPIExecLauncher):
499
496
500 program = List(ipengine_cmd_argv, config=True,
497 program = List(ipengine_cmd_argv, config=True,
501 help="Popen command for ipengine"
498 help="Popen command for ipengine"
502 )
499 )
503 program_args = List(
500 program_args = List(
504 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
501 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
505 help="Command line arguments for ipengine."
502 help="Command line arguments for ipengine."
506 )
503 )
507 n = Int(1)
504 n = Int(1)
508
505
509 def start(self, n, profile_dir):
506 def start(self, n, profile_dir):
510 """Start n engines by profile or profile_dir."""
507 """Start n engines by profile or profile_dir."""
511 self.program_args.extend(['profile_dir=%s'%profile_dir])
508 self.program_args.extend(['profile_dir=%s'%profile_dir])
512 self.profile_dir = unicode(profile_dir)
509 self.profile_dir = unicode(profile_dir)
513 self.n = n
510 self.n = n
514 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
511 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
515 return super(MPIExecEngineSetLauncher, self).start(n)
512 return super(MPIExecEngineSetLauncher, self).start(n)
516
513
517 #-----------------------------------------------------------------------------
514 #-----------------------------------------------------------------------------
518 # SSH launchers
515 # SSH launchers
519 #-----------------------------------------------------------------------------
516 #-----------------------------------------------------------------------------
520
517
521 # TODO: Get SSH Launcher working again.
518 # TODO: Get SSH Launcher working again.
522
519
523 class SSHLauncher(LocalProcessLauncher):
520 class SSHLauncher(LocalProcessLauncher):
524 """A minimal launcher for ssh.
521 """A minimal launcher for ssh.
525
522
526 To be useful this will probably have to be extended to use the ``sshx``
523 To be useful this will probably have to be extended to use the ``sshx``
527 idea for environment variables. There could be other things this needs
524 idea for environment variables. There could be other things this needs
528 as well.
525 as well.
529 """
526 """
530
527
531 ssh_cmd = List(['ssh'], config=True,
528 ssh_cmd = List(['ssh'], config=True,
532 help="command for starting ssh")
529 help="command for starting ssh")
533 ssh_args = List(['-tt'], config=True,
530 ssh_args = List(['-tt'], config=True,
534 help="args to pass to ssh")
531 help="args to pass to ssh")
535 program = List(['date'], config=True,
532 program = List(['date'], config=True,
536 help="Program to launch via ssh")
533 help="Program to launch via ssh")
537 program_args = List([], config=True,
534 program_args = List([], config=True,
538 help="args to pass to remote program")
535 help="args to pass to remote program")
539 hostname = Unicode('', config=True,
536 hostname = Unicode('', config=True,
540 help="hostname on which to launch the program")
537 help="hostname on which to launch the program")
541 user = Unicode('', config=True,
538 user = Unicode('', config=True,
542 help="username for ssh")
539 help="username for ssh")
543 location = Unicode('', config=True,
540 location = Unicode('', config=True,
544 help="user@hostname location for ssh in one setting")
541 help="user@hostname location for ssh in one setting")
545
542
546 def _hostname_changed(self, name, old, new):
543 def _hostname_changed(self, name, old, new):
547 if self.user:
544 if self.user:
548 self.location = u'%s@%s' % (self.user, new)
545 self.location = u'%s@%s' % (self.user, new)
549 else:
546 else:
550 self.location = new
547 self.location = new
551
548
552 def _user_changed(self, name, old, new):
549 def _user_changed(self, name, old, new):
553 self.location = u'%s@%s' % (new, self.hostname)
550 self.location = u'%s@%s' % (new, self.hostname)
554
551
555 def find_args(self):
552 def find_args(self):
556 return self.ssh_cmd + self.ssh_args + [self.location] + \
553 return self.ssh_cmd + self.ssh_args + [self.location] + \
557 self.program + self.program_args
554 self.program + self.program_args
558
555
559 def start(self, profile_dir, hostname=None, user=None):
556 def start(self, profile_dir, hostname=None, user=None):
560 self.profile_dir = unicode(profile_dir)
557 self.profile_dir = unicode(profile_dir)
561 if hostname is not None:
558 if hostname is not None:
562 self.hostname = hostname
559 self.hostname = hostname
563 if user is not None:
560 if user is not None:
564 self.user = user
561 self.user = user
565
562
566 return super(SSHLauncher, self).start()
563 return super(SSHLauncher, self).start()
567
564
568 def signal(self, sig):
565 def signal(self, sig):
569 if self.state == 'running':
566 if self.state == 'running':
570 # send escaped ssh connection-closer
567 # send escaped ssh connection-closer
571 self.process.stdin.write('~.')
568 self.process.stdin.write('~.')
572 self.process.stdin.flush()
569 self.process.stdin.flush()
573
570
574
571
575
572
576 class SSHControllerLauncher(SSHLauncher):
573 class SSHControllerLauncher(SSHLauncher):
577
574
578 program = List(ipcontroller_cmd_argv, config=True,
575 program = List(ipcontroller_cmd_argv, config=True,
579 help="remote ipcontroller command.")
576 help="remote ipcontroller command.")
580 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
577 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
581 help="Command line arguments to ipcontroller.")
578 help="Command line arguments to ipcontroller.")
582
579
583
580
584 class SSHEngineLauncher(SSHLauncher):
581 class SSHEngineLauncher(SSHLauncher):
585 program = List(ipengine_cmd_argv, config=True,
582 program = List(ipengine_cmd_argv, config=True,
586 help="remote ipengine command.")
583 help="remote ipengine command.")
587 # Command line arguments for ipengine.
584 # Command line arguments for ipengine.
588 program_args = List(
585 program_args = List(
589 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
586 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
590 help="Command line arguments to ipengine."
587 help="Command line arguments to ipengine."
591 )
588 )
592
589
593 class SSHEngineSetLauncher(LocalEngineSetLauncher):
590 class SSHEngineSetLauncher(LocalEngineSetLauncher):
594 launcher_class = SSHEngineLauncher
591 launcher_class = SSHEngineLauncher
595 engines = Dict(config=True,
592 engines = Dict(config=True,
596 help="""dict of engines to launch. This is a dict by hostname of ints,
593 help="""dict of engines to launch. This is a dict by hostname of ints,
597 corresponding to the number of engines to start on that host.""")
594 corresponding to the number of engines to start on that host.""")
598
595
599 def start(self, n, profile_dir):
596 def start(self, n, profile_dir):
600 """Start engines by profile or profile_dir.
597 """Start engines by profile or profile_dir.
601 `n` is ignored, and the `engines` config property is used instead.
598 `n` is ignored, and the `engines` config property is used instead.
602 """
599 """
603
600
604 self.profile_dir = unicode(profile_dir)
601 self.profile_dir = unicode(profile_dir)
605 dlist = []
602 dlist = []
606 for host, n in self.engines.iteritems():
603 for host, n in self.engines.iteritems():
607 if isinstance(n, (tuple, list)):
604 if isinstance(n, (tuple, list)):
608 n, args = n
605 n, args = n
609 else:
606 else:
610 args = copy.deepcopy(self.engine_args)
607 args = copy.deepcopy(self.engine_args)
611
608
612 if '@' in host:
609 if '@' in host:
613 user,host = host.split('@',1)
610 user,host = host.split('@',1)
614 else:
611 else:
615 user=None
612 user=None
616 for i in range(n):
613 for i in range(n):
617 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
614 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
618
615
619 # Copy the engine args over to each engine launcher.
616 # Copy the engine args over to each engine launcher.
620 i
617 i
621 el.program_args = args
618 el.program_args = args
622 el.on_stop(self._notice_engine_stopped)
619 el.on_stop(self._notice_engine_stopped)
623 d = el.start(profile_dir, user=user, hostname=host)
620 d = el.start(profile_dir, user=user, hostname=host)
624 if i==0:
621 if i==0:
625 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
622 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
626 self.launchers[host+str(i)] = el
623 self.launchers[host+str(i)] = el
627 dlist.append(d)
624 dlist.append(d)
628 self.notify_start(dlist)
625 self.notify_start(dlist)
629 return dlist
626 return dlist
630
627
631
628
632
629
633 #-----------------------------------------------------------------------------
630 #-----------------------------------------------------------------------------
634 # Windows HPC Server 2008 scheduler launchers
631 # Windows HPC Server 2008 scheduler launchers
635 #-----------------------------------------------------------------------------
632 #-----------------------------------------------------------------------------
636
633
637
634
638 # This is only used on Windows.
635 # This is only used on Windows.
639 def find_job_cmd():
636 def find_job_cmd():
640 if WINDOWS:
637 if WINDOWS:
641 try:
638 try:
642 return find_cmd('job')
639 return find_cmd('job')
643 except (FindCmdError, ImportError):
640 except (FindCmdError, ImportError):
644 # ImportError will be raised if win32api is not installed
641 # ImportError will be raised if win32api is not installed
645 return 'job'
642 return 'job'
646 else:
643 else:
647 return 'job'
644 return 'job'
648
645
649
646
650 class WindowsHPCLauncher(BaseLauncher):
647 class WindowsHPCLauncher(BaseLauncher):
651
648
652 job_id_regexp = Unicode(r'\d+', config=True,
649 job_id_regexp = Unicode(r'\d+', config=True,
653 help="""A regular expression used to get the job id from the output of the
650 help="""A regular expression used to get the job id from the output of the
654 submit_command. """
651 submit_command. """
655 )
652 )
656 job_file_name = Unicode(u'ipython_job.xml', config=True,
653 job_file_name = Unicode(u'ipython_job.xml', config=True,
657 help="The filename of the instantiated job script.")
654 help="The filename of the instantiated job script.")
658 # The full path to the instantiated job script. This gets made dynamically
655 # The full path to the instantiated job script. This gets made dynamically
659 # by combining the work_dir with the job_file_name.
656 # by combining the work_dir with the job_file_name.
660 job_file = Unicode(u'')
657 job_file = Unicode(u'')
661 scheduler = Unicode('', config=True,
658 scheduler = Unicode('', config=True,
662 help="The hostname of the scheduler to submit the job to.")
659 help="The hostname of the scheduler to submit the job to.")
663 job_cmd = Unicode(find_job_cmd(), config=True,
660 job_cmd = Unicode(find_job_cmd(), config=True,
664 help="The command for submitting jobs.")
661 help="The command for submitting jobs.")
665
662
666 def __init__(self, work_dir=u'.', config=None, **kwargs):
663 def __init__(self, work_dir=u'.', config=None, **kwargs):
667 super(WindowsHPCLauncher, self).__init__(
664 super(WindowsHPCLauncher, self).__init__(
668 work_dir=work_dir, config=config, **kwargs
665 work_dir=work_dir, config=config, **kwargs
669 )
666 )
670
667
671 @property
668 @property
672 def job_file(self):
669 def job_file(self):
673 return os.path.join(self.work_dir, self.job_file_name)
670 return os.path.join(self.work_dir, self.job_file_name)
674
671
675 def write_job_file(self, n):
672 def write_job_file(self, n):
676 raise NotImplementedError("Implement write_job_file in a subclass.")
673 raise NotImplementedError("Implement write_job_file in a subclass.")
677
674
678 def find_args(self):
675 def find_args(self):
679 return [u'job.exe']
676 return [u'job.exe']
680
677
681 def parse_job_id(self, output):
678 def parse_job_id(self, output):
682 """Take the output of the submit command and return the job id."""
679 """Take the output of the submit command and return the job id."""
683 m = re.search(self.job_id_regexp, output)
680 m = re.search(self.job_id_regexp, output)
684 if m is not None:
681 if m is not None:
685 job_id = m.group()
682 job_id = m.group()
686 else:
683 else:
687 raise LauncherError("Job id couldn't be determined: %s" % output)
684 raise LauncherError("Job id couldn't be determined: %s" % output)
688 self.job_id = job_id
685 self.job_id = job_id
689 self.log.info('Job started with job id: %r' % job_id)
686 self.log.info('Job started with job id: %r' % job_id)
690 return job_id
687 return job_id
691
688
692 def start(self, n):
689 def start(self, n):
693 """Start n copies of the process using the Win HPC job scheduler."""
690 """Start n copies of the process using the Win HPC job scheduler."""
694 self.write_job_file(n)
691 self.write_job_file(n)
695 args = [
692 args = [
696 'submit',
693 'submit',
697 '/jobfile:%s' % self.job_file,
694 '/jobfile:%s' % self.job_file,
698 '/scheduler:%s' % self.scheduler
695 '/scheduler:%s' % self.scheduler
699 ]
696 ]
700 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
697 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
701 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
698 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
702 output = check_output([self.job_cmd]+args,
699 output = check_output([self.job_cmd]+args,
703 env=os.environ,
700 env=os.environ,
704 cwd=self.work_dir,
701 cwd=self.work_dir,
705 stderr=STDOUT
702 stderr=STDOUT
706 )
703 )
707 job_id = self.parse_job_id(output)
704 job_id = self.parse_job_id(output)
708 self.notify_start(job_id)
705 self.notify_start(job_id)
709 return job_id
706 return job_id
710
707
711 def stop(self):
708 def stop(self):
712 args = [
709 args = [
713 'cancel',
710 'cancel',
714 self.job_id,
711 self.job_id,
715 '/scheduler:%s' % self.scheduler
712 '/scheduler:%s' % self.scheduler
716 ]
713 ]
717 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
714 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
718 try:
715 try:
719 output = check_output([self.job_cmd]+args,
716 output = check_output([self.job_cmd]+args,
720 env=os.environ,
717 env=os.environ,
721 cwd=self.work_dir,
718 cwd=self.work_dir,
722 stderr=STDOUT
719 stderr=STDOUT
723 )
720 )
724 except:
721 except:
725 output = 'The job already appears to be stoppped: %r' % self.job_id
722 output = 'The job already appears to be stoppped: %r' % self.job_id
726 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
723 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
727 return output
724 return output
728
725
729
726
730 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
727 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
731
728
732 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
729 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
733 help="WinHPC xml job file.")
730 help="WinHPC xml job file.")
734 extra_args = List([], config=False,
731 extra_args = List([], config=False,
735 help="extra args to pass to ipcontroller")
732 help="extra args to pass to ipcontroller")
736
733
737 def write_job_file(self, n):
734 def write_job_file(self, n):
738 job = IPControllerJob(config=self.config)
735 job = IPControllerJob(config=self.config)
739
736
740 t = IPControllerTask(config=self.config)
737 t = IPControllerTask(config=self.config)
741 # The tasks work directory is *not* the actual work directory of
738 # The tasks work directory is *not* the actual work directory of
742 # the controller. It is used as the base path for the stdout/stderr
739 # the controller. It is used as the base path for the stdout/stderr
743 # files that the scheduler redirects to.
740 # files that the scheduler redirects to.
744 t.work_directory = self.profile_dir
741 t.work_directory = self.profile_dir
745 # Add the profile_dir and from self.start().
742 # Add the profile_dir and from self.start().
746 t.controller_args.extend(self.extra_args)
743 t.controller_args.extend(self.extra_args)
747 job.add_task(t)
744 job.add_task(t)
748
745
749 self.log.info("Writing job description file: %s" % self.job_file)
746 self.log.info("Writing job description file: %s" % self.job_file)
750 job.write(self.job_file)
747 job.write(self.job_file)
751
748
752 @property
749 @property
753 def job_file(self):
750 def job_file(self):
754 return os.path.join(self.profile_dir, self.job_file_name)
751 return os.path.join(self.profile_dir, self.job_file_name)
755
752
756 def start(self, profile_dir):
753 def start(self, profile_dir):
757 """Start the controller by profile_dir."""
754 """Start the controller by profile_dir."""
758 self.extra_args = ['profile_dir=%s'%profile_dir]
755 self.extra_args = ['profile_dir=%s'%profile_dir]
759 self.profile_dir = unicode(profile_dir)
756 self.profile_dir = unicode(profile_dir)
760 return super(WindowsHPCControllerLauncher, self).start(1)
757 return super(WindowsHPCControllerLauncher, self).start(1)
761
758
762
759
763 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
760 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
764
761
765 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
762 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
766 help="jobfile for ipengines job")
763 help="jobfile for ipengines job")
767 extra_args = List([], config=False,
764 extra_args = List([], config=False,
768 help="extra args to pas to ipengine")
765 help="extra args to pas to ipengine")
769
766
770 def write_job_file(self, n):
767 def write_job_file(self, n):
771 job = IPEngineSetJob(config=self.config)
768 job = IPEngineSetJob(config=self.config)
772
769
773 for i in range(n):
770 for i in range(n):
774 t = IPEngineTask(config=self.config)
771 t = IPEngineTask(config=self.config)
775 # The tasks work directory is *not* the actual work directory of
772 # The tasks work directory is *not* the actual work directory of
776 # the engine. It is used as the base path for the stdout/stderr
773 # the engine. It is used as the base path for the stdout/stderr
777 # files that the scheduler redirects to.
774 # files that the scheduler redirects to.
778 t.work_directory = self.profile_dir
775 t.work_directory = self.profile_dir
779 # Add the profile_dir and from self.start().
776 # Add the profile_dir and from self.start().
780 t.engine_args.extend(self.extra_args)
777 t.engine_args.extend(self.extra_args)
781 job.add_task(t)
778 job.add_task(t)
782
779
783 self.log.info("Writing job description file: %s" % self.job_file)
780 self.log.info("Writing job description file: %s" % self.job_file)
784 job.write(self.job_file)
781 job.write(self.job_file)
785
782
786 @property
783 @property
787 def job_file(self):
784 def job_file(self):
788 return os.path.join(self.profile_dir, self.job_file_name)
785 return os.path.join(self.profile_dir, self.job_file_name)
789
786
790 def start(self, n, profile_dir):
787 def start(self, n, profile_dir):
791 """Start the controller by profile_dir."""
788 """Start the controller by profile_dir."""
792 self.extra_args = ['profile_dir=%s'%profile_dir]
789 self.extra_args = ['profile_dir=%s'%profile_dir]
793 self.profile_dir = unicode(profile_dir)
790 self.profile_dir = unicode(profile_dir)
794 return super(WindowsHPCEngineSetLauncher, self).start(n)
791 return super(WindowsHPCEngineSetLauncher, self).start(n)
795
792
796
793
797 #-----------------------------------------------------------------------------
794 #-----------------------------------------------------------------------------
798 # Batch (PBS) system launchers
795 # Batch (PBS) system launchers
799 #-----------------------------------------------------------------------------
796 #-----------------------------------------------------------------------------
800
797
801 class BatchSystemLauncher(BaseLauncher):
798 class BatchSystemLauncher(BaseLauncher):
802 """Launch an external process using a batch system.
799 """Launch an external process using a batch system.
803
800
804 This class is designed to work with UNIX batch systems like PBS, LSF,
801 This class is designed to work with UNIX batch systems like PBS, LSF,
805 GridEngine, etc. The overall model is that there are different commands
802 GridEngine, etc. The overall model is that there are different commands
806 like qsub, qdel, etc. that handle the starting and stopping of the process.
803 like qsub, qdel, etc. that handle the starting and stopping of the process.
807
804
808 This class also has the notion of a batch script. The ``batch_template``
805 This class also has the notion of a batch script. The ``batch_template``
809 attribute can be set to a string that is a template for the batch script.
806 attribute can be set to a string that is a template for the batch script.
810 This template is instantiated using string formatting. Thus the template can
807 This template is instantiated using string formatting. Thus the template can
811 use {n} fot the number of instances. Subclasses can add additional variables
808 use {n} fot the number of instances. Subclasses can add additional variables
812 to the template dict.
809 to the template dict.
813 """
810 """
814
811
815 # Subclasses must fill these in. See PBSEngineSet
812 # Subclasses must fill these in. See PBSEngineSet
816 submit_command = List([''], config=True,
813 submit_command = List([''], config=True,
817 help="The name of the command line program used to submit jobs.")
814 help="The name of the command line program used to submit jobs.")
818 delete_command = List([''], config=True,
815 delete_command = List([''], config=True,
819 help="The name of the command line program used to delete jobs.")
816 help="The name of the command line program used to delete jobs.")
820 job_id_regexp = Unicode('', config=True,
817 job_id_regexp = Unicode('', config=True,
821 help="""A regular expression used to get the job id from the output of the
818 help="""A regular expression used to get the job id from the output of the
822 submit_command.""")
819 submit_command.""")
823 batch_template = Unicode('', config=True,
820 batch_template = Unicode('', config=True,
824 help="The string that is the batch script template itself.")
821 help="The string that is the batch script template itself.")
825 batch_template_file = Unicode(u'', config=True,
822 batch_template_file = Unicode(u'', config=True,
826 help="The file that contains the batch template.")
823 help="The file that contains the batch template.")
827 batch_file_name = Unicode(u'batch_script', config=True,
824 batch_file_name = Unicode(u'batch_script', config=True,
828 help="The filename of the instantiated batch script.")
825 help="The filename of the instantiated batch script.")
829 queue = Unicode(u'', config=True,
826 queue = Unicode(u'', config=True,
830 help="The PBS Queue.")
827 help="The PBS Queue.")
831
828
832 # not configurable, override in subclasses
829 # not configurable, override in subclasses
833 # PBS Job Array regex
830 # PBS Job Array regex
834 job_array_regexp = Unicode('')
831 job_array_regexp = Unicode('')
835 job_array_template = Unicode('')
832 job_array_template = Unicode('')
836 # PBS Queue regex
833 # PBS Queue regex
837 queue_regexp = Unicode('')
834 queue_regexp = Unicode('')
838 queue_template = Unicode('')
835 queue_template = Unicode('')
839 # The default batch template, override in subclasses
836 # The default batch template, override in subclasses
840 default_template = Unicode('')
837 default_template = Unicode('')
841 # The full path to the instantiated batch script.
838 # The full path to the instantiated batch script.
842 batch_file = Unicode(u'')
839 batch_file = Unicode(u'')
843 # the format dict used with batch_template:
840 # the format dict used with batch_template:
844 context = Dict()
841 context = Dict()
845 # the Formatter instance for rendering the templates:
842 # the Formatter instance for rendering the templates:
846 formatter = Instance(EvalFormatter, (), {})
843 formatter = Instance(EvalFormatter, (), {})
847
844
848
845
849 def find_args(self):
846 def find_args(self):
850 return self.submit_command + [self.batch_file]
847 return self.submit_command + [self.batch_file]
851
848
852 def __init__(self, work_dir=u'.', config=None, **kwargs):
849 def __init__(self, work_dir=u'.', config=None, **kwargs):
853 super(BatchSystemLauncher, self).__init__(
850 super(BatchSystemLauncher, self).__init__(
854 work_dir=work_dir, config=config, **kwargs
851 work_dir=work_dir, config=config, **kwargs
855 )
852 )
856 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
853 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
857
854
858 def parse_job_id(self, output):
855 def parse_job_id(self, output):
859 """Take the output of the submit command and return the job id."""
856 """Take the output of the submit command and return the job id."""
860 m = re.search(self.job_id_regexp, output)
857 m = re.search(self.job_id_regexp, output)
861 if m is not None:
858 if m is not None:
862 job_id = m.group()
859 job_id = m.group()
863 else:
860 else:
864 raise LauncherError("Job id couldn't be determined: %s" % output)
861 raise LauncherError("Job id couldn't be determined: %s" % output)
865 self.job_id = job_id
862 self.job_id = job_id
866 self.log.info('Job submitted with job id: %r' % job_id)
863 self.log.info('Job submitted with job id: %r' % job_id)
867 return job_id
864 return job_id
868
865
869 def write_batch_script(self, n):
866 def write_batch_script(self, n):
870 """Instantiate and write the batch script to the work_dir."""
867 """Instantiate and write the batch script to the work_dir."""
871 self.context['n'] = n
868 self.context['n'] = n
872 self.context['queue'] = self.queue
869 self.context['queue'] = self.queue
873 # first priority is batch_template if set
870 # first priority is batch_template if set
874 if self.batch_template_file and not self.batch_template:
871 if self.batch_template_file and not self.batch_template:
875 # second priority is batch_template_file
872 # second priority is batch_template_file
876 with open(self.batch_template_file) as f:
873 with open(self.batch_template_file) as f:
877 self.batch_template = f.read()
874 self.batch_template = f.read()
878 if not self.batch_template:
875 if not self.batch_template:
879 # third (last) priority is default_template
876 # third (last) priority is default_template
880 self.batch_template = self.default_template
877 self.batch_template = self.default_template
881
878
882 regex = re.compile(self.job_array_regexp)
879 regex = re.compile(self.job_array_regexp)
883 # print regex.search(self.batch_template)
880 # print regex.search(self.batch_template)
884 if not regex.search(self.batch_template):
881 if not regex.search(self.batch_template):
885 self.log.info("adding job array settings to batch script")
882 self.log.info("adding job array settings to batch script")
886 firstline, rest = self.batch_template.split('\n',1)
883 firstline, rest = self.batch_template.split('\n',1)
887 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
884 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
888
885
889 regex = re.compile(self.queue_regexp)
886 regex = re.compile(self.queue_regexp)
890 # print regex.search(self.batch_template)
887 # print regex.search(self.batch_template)
891 if self.queue and not regex.search(self.batch_template):
888 if self.queue and not regex.search(self.batch_template):
892 self.log.info("adding PBS queue settings to batch script")
889 self.log.info("adding PBS queue settings to batch script")
893 firstline, rest = self.batch_template.split('\n',1)
890 firstline, rest = self.batch_template.split('\n',1)
894 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
891 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
895
892
896 script_as_string = self.formatter.format(self.batch_template, **self.context)
893 script_as_string = self.formatter.format(self.batch_template, **self.context)
897 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
894 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
898
895
899 with open(self.batch_file, 'w') as f:
896 with open(self.batch_file, 'w') as f:
900 f.write(script_as_string)
897 f.write(script_as_string)
901 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
898 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
902
899
903 def start(self, n, profile_dir):
900 def start(self, n, profile_dir):
904 """Start n copies of the process using a batch system."""
901 """Start n copies of the process using a batch system."""
905 # Here we save profile_dir in the context so they
902 # Here we save profile_dir in the context so they
906 # can be used in the batch script template as {profile_dir}
903 # can be used in the batch script template as {profile_dir}
907 self.context['profile_dir'] = profile_dir
904 self.context['profile_dir'] = profile_dir
908 self.profile_dir = unicode(profile_dir)
905 self.profile_dir = unicode(profile_dir)
909 self.write_batch_script(n)
906 self.write_batch_script(n)
910 output = check_output(self.args, env=os.environ)
907 output = check_output(self.args, env=os.environ)
911
908
912 job_id = self.parse_job_id(output)
909 job_id = self.parse_job_id(output)
913 self.notify_start(job_id)
910 self.notify_start(job_id)
914 return job_id
911 return job_id
915
912
916 def stop(self):
913 def stop(self):
917 output = check_output(self.delete_command+[self.job_id], env=os.environ)
914 output = check_output(self.delete_command+[self.job_id], env=os.environ)
918 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
915 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
919 return output
916 return output
920
917
921
918
922 class PBSLauncher(BatchSystemLauncher):
919 class PBSLauncher(BatchSystemLauncher):
923 """A BatchSystemLauncher subclass for PBS."""
920 """A BatchSystemLauncher subclass for PBS."""
924
921
925 submit_command = List(['qsub'], config=True,
922 submit_command = List(['qsub'], config=True,
926 help="The PBS submit command ['qsub']")
923 help="The PBS submit command ['qsub']")
927 delete_command = List(['qdel'], config=True,
924 delete_command = List(['qdel'], config=True,
928 help="The PBS delete command ['qsub']")
925 help="The PBS delete command ['qsub']")
929 job_id_regexp = Unicode(r'\d+', config=True,
926 job_id_regexp = Unicode(r'\d+', config=True,
930 help="Regular expresion for identifying the job ID [r'\d+']")
927 help="Regular expresion for identifying the job ID [r'\d+']")
931
928
932 batch_file = Unicode(u'')
929 batch_file = Unicode(u'')
933 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
930 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
934 job_array_template = Unicode('#PBS -t 1-{n}')
931 job_array_template = Unicode('#PBS -t 1-{n}')
935 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
932 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
936 queue_template = Unicode('#PBS -q {queue}')
933 queue_template = Unicode('#PBS -q {queue}')
937
934
938
935
939 class PBSControllerLauncher(PBSLauncher):
936 class PBSControllerLauncher(PBSLauncher):
940 """Launch a controller using PBS."""
937 """Launch a controller using PBS."""
941
938
942 batch_file_name = Unicode(u'pbs_controller', config=True,
939 batch_file_name = Unicode(u'pbs_controller', config=True,
943 help="batch file name for the controller job.")
940 help="batch file name for the controller job.")
944 default_template= Unicode("""#!/bin/sh
941 default_template= Unicode("""#!/bin/sh
945 #PBS -V
942 #PBS -V
946 #PBS -N ipcontroller
943 #PBS -N ipcontroller
947 %s --log-to-file profile_dir={profile_dir}
944 %s --log-to-file profile_dir={profile_dir}
948 """%(' '.join(ipcontroller_cmd_argv)))
945 """%(' '.join(ipcontroller_cmd_argv)))
949
946
950 def start(self, profile_dir):
947 def start(self, profile_dir):
951 """Start the controller by profile or profile_dir."""
948 """Start the controller by profile or profile_dir."""
952 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
949 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
953 return super(PBSControllerLauncher, self).start(1, profile_dir)
950 return super(PBSControllerLauncher, self).start(1, profile_dir)
954
951
955
952
956 class PBSEngineSetLauncher(PBSLauncher):
953 class PBSEngineSetLauncher(PBSLauncher):
957 """Launch Engines using PBS"""
954 """Launch Engines using PBS"""
958 batch_file_name = Unicode(u'pbs_engines', config=True,
955 batch_file_name = Unicode(u'pbs_engines', config=True,
959 help="batch file name for the engine(s) job.")
956 help="batch file name for the engine(s) job.")
960 default_template= Unicode(u"""#!/bin/sh
957 default_template= Unicode(u"""#!/bin/sh
961 #PBS -V
958 #PBS -V
962 #PBS -N ipengine
959 #PBS -N ipengine
963 %s profile_dir={profile_dir}
960 %s profile_dir={profile_dir}
964 """%(' '.join(ipengine_cmd_argv)))
961 """%(' '.join(ipengine_cmd_argv)))
965
962
966 def start(self, n, profile_dir):
963 def start(self, n, profile_dir):
967 """Start n engines by profile or profile_dir."""
964 """Start n engines by profile or profile_dir."""
968 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
965 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
969 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
966 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
970
967
971 #SGE is very similar to PBS
968 #SGE is very similar to PBS
972
969
973 class SGELauncher(PBSLauncher):
970 class SGELauncher(PBSLauncher):
974 """Sun GridEngine is a PBS clone with slightly different syntax"""
971 """Sun GridEngine is a PBS clone with slightly different syntax"""
975 job_array_regexp = Unicode('#\$\W+\-t')
972 job_array_regexp = Unicode('#\$\W+\-t')
976 job_array_template = Unicode('#$ -t 1-{n}')
973 job_array_template = Unicode('#$ -t 1-{n}')
977 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
974 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
978 queue_template = Unicode('#$ -q $queue')
975 queue_template = Unicode('#$ -q $queue')
979
976
980 class SGEControllerLauncher(SGELauncher):
977 class SGEControllerLauncher(SGELauncher):
981 """Launch a controller using SGE."""
978 """Launch a controller using SGE."""
982
979
983 batch_file_name = Unicode(u'sge_controller', config=True,
980 batch_file_name = Unicode(u'sge_controller', config=True,
984 help="batch file name for the ipontroller job.")
981 help="batch file name for the ipontroller job.")
985 default_template= Unicode(u"""#$ -V
982 default_template= Unicode(u"""#$ -V
986 #$ -S /bin/sh
983 #$ -S /bin/sh
987 #$ -N ipcontroller
984 #$ -N ipcontroller
988 %s --log-to-file profile_dir={profile_dir}
985 %s --log-to-file profile_dir={profile_dir}
989 """%(' '.join(ipcontroller_cmd_argv)))
986 """%(' '.join(ipcontroller_cmd_argv)))
990
987
991 def start(self, profile_dir):
988 def start(self, profile_dir):
992 """Start the controller by profile or profile_dir."""
989 """Start the controller by profile or profile_dir."""
993 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
990 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
994 return super(SGEControllerLauncher, self).start(1, profile_dir)
991 return super(SGEControllerLauncher, self).start(1, profile_dir)
995
992
996 class SGEEngineSetLauncher(SGELauncher):
993 class SGEEngineSetLauncher(SGELauncher):
997 """Launch Engines with SGE"""
994 """Launch Engines with SGE"""
998 batch_file_name = Unicode(u'sge_engines', config=True,
995 batch_file_name = Unicode(u'sge_engines', config=True,
999 help="batch file name for the engine(s) job.")
996 help="batch file name for the engine(s) job.")
1000 default_template = Unicode("""#$ -V
997 default_template = Unicode("""#$ -V
1001 #$ -S /bin/sh
998 #$ -S /bin/sh
1002 #$ -N ipengine
999 #$ -N ipengine
1003 %s profile_dir={profile_dir}
1000 %s profile_dir={profile_dir}
1004 """%(' '.join(ipengine_cmd_argv)))
1001 """%(' '.join(ipengine_cmd_argv)))
1005
1002
1006 def start(self, n, profile_dir):
1003 def start(self, n, profile_dir):
1007 """Start n engines by profile or profile_dir."""
1004 """Start n engines by profile or profile_dir."""
1008 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1005 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1009 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1006 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1010
1007
1011
1008
1012 #-----------------------------------------------------------------------------
1009 #-----------------------------------------------------------------------------
1013 # A launcher for ipcluster itself!
1010 # A launcher for ipcluster itself!
1014 #-----------------------------------------------------------------------------
1011 #-----------------------------------------------------------------------------
1015
1012
1016
1013
1017 class IPClusterLauncher(LocalProcessLauncher):
1014 class IPClusterLauncher(LocalProcessLauncher):
1018 """Launch the ipcluster program in an external process."""
1015 """Launch the ipcluster program in an external process."""
1019
1016
1020 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1017 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1021 help="Popen command for ipcluster")
1018 help="Popen command for ipcluster")
1022 ipcluster_args = List(
1019 ipcluster_args = List(
1023 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1020 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1024 help="Command line arguments to pass to ipcluster.")
1021 help="Command line arguments to pass to ipcluster.")
1025 ipcluster_subcommand = Unicode('start')
1022 ipcluster_subcommand = Unicode('start')
1026 ipcluster_n = Int(2)
1023 ipcluster_n = Int(2)
1027
1024
1028 def find_args(self):
1025 def find_args(self):
1029 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
1026 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
1030 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
1027 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
1031
1028
1032 def start(self):
1029 def start(self):
1033 self.log.info("Starting ipcluster: %r" % self.args)
1030 self.log.info("Starting ipcluster: %r" % self.args)
1034 return super(IPClusterLauncher, self).start()
1031 return super(IPClusterLauncher, self).start()
1035
1032
1036 #-----------------------------------------------------------------------------
1033 #-----------------------------------------------------------------------------
1037 # Collections of launchers
1034 # Collections of launchers
1038 #-----------------------------------------------------------------------------
1035 #-----------------------------------------------------------------------------
1039
1036
1040 local_launchers = [
1037 local_launchers = [
1041 LocalControllerLauncher,
1038 LocalControllerLauncher,
1042 LocalEngineLauncher,
1039 LocalEngineLauncher,
1043 LocalEngineSetLauncher,
1040 LocalEngineSetLauncher,
1044 ]
1041 ]
1045 mpi_launchers = [
1042 mpi_launchers = [
1046 MPIExecLauncher,
1043 MPIExecLauncher,
1047 MPIExecControllerLauncher,
1044 MPIExecControllerLauncher,
1048 MPIExecEngineSetLauncher,
1045 MPIExecEngineSetLauncher,
1049 ]
1046 ]
1050 ssh_launchers = [
1047 ssh_launchers = [
1051 SSHLauncher,
1048 SSHLauncher,
1052 SSHControllerLauncher,
1049 SSHControllerLauncher,
1053 SSHEngineLauncher,
1050 SSHEngineLauncher,
1054 SSHEngineSetLauncher,
1051 SSHEngineSetLauncher,
1055 ]
1052 ]
1056 winhpc_launchers = [
1053 winhpc_launchers = [
1057 WindowsHPCLauncher,
1054 WindowsHPCLauncher,
1058 WindowsHPCControllerLauncher,
1055 WindowsHPCControllerLauncher,
1059 WindowsHPCEngineSetLauncher,
1056 WindowsHPCEngineSetLauncher,
1060 ]
1057 ]
1061 pbs_launchers = [
1058 pbs_launchers = [
1062 PBSLauncher,
1059 PBSLauncher,
1063 PBSControllerLauncher,
1060 PBSControllerLauncher,
1064 PBSEngineSetLauncher,
1061 PBSEngineSetLauncher,
1065 ]
1062 ]
1066 sge_launchers = [
1063 sge_launchers = [
1067 SGELauncher,
1064 SGELauncher,
1068 SGEControllerLauncher,
1065 SGEControllerLauncher,
1069 SGEEngineSetLauncher,
1066 SGEEngineSetLauncher,
1070 ]
1067 ]
1071 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1068 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1072 + pbs_launchers + sge_launchers
1069 + pbs_launchers + sge_launchers
@@ -1,113 +1,108 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple logger object that consolidates messages incoming from ipcluster processes."""
2 """A simple logger object that consolidates messages incoming from ipcluster processes."""
3
3
4 #-----------------------------------------------------------------------------
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2011 The IPython Development Team
5 # Copyright (C) 2011 The IPython Development Team
6 #
6 #
7 # Distributed under the terms of the BSD License. The full license is in
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Imports
12 # Imports
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15
15
16 import logging
16 import logging
17 import sys
17 import sys
18
18
19 import zmq
19 import zmq
20 from zmq.eventloop import ioloop, zmqstream
20 from zmq.eventloop import ioloop, zmqstream
21
21
22 from IPython.config.application import Application
22 from IPython.config.configurable import LoggingConfigurable
23 from IPython.config.configurable import Configurable
24 from IPython.utils.traitlets import Int, Unicode, Instance, List
23 from IPython.utils.traitlets import Int, Unicode, Instance, List
25
24
26 #-----------------------------------------------------------------------------
25 #-----------------------------------------------------------------------------
27 # Classes
26 # Classes
28 #-----------------------------------------------------------------------------
27 #-----------------------------------------------------------------------------
29
28
30
29
31 class LogWatcher(Configurable):
30 class LogWatcher(LoggingConfigurable):
32 """A simple class that receives messages on a SUB socket, as published
31 """A simple class that receives messages on a SUB socket, as published
33 by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.
32 by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.
34
33
35 This can subscribe to multiple topics, but defaults to all topics.
34 This can subscribe to multiple topics, but defaults to all topics.
36 """
35 """
37
36
38 log = Instance('logging.Logger')
39 def _log_default(self):
40 return Application.instance().log
41
42 # configurables
37 # configurables
43 topics = List([''], config=True,
38 topics = List([''], config=True,
44 help="The ZMQ topics to subscribe to. Default is to subscribe to all messages")
39 help="The ZMQ topics to subscribe to. Default is to subscribe to all messages")
45 url = Unicode('tcp://127.0.0.1:20202', config=True,
40 url = Unicode('tcp://127.0.0.1:20202', config=True,
46 help="ZMQ url on which to listen for log messages")
41 help="ZMQ url on which to listen for log messages")
47
42
48 # internals
43 # internals
49 stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
44 stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
50
45
51 context = Instance(zmq.Context)
46 context = Instance(zmq.Context)
52 def _context_default(self):
47 def _context_default(self):
53 return zmq.Context.instance()
48 return zmq.Context.instance()
54
49
55 loop = Instance(zmq.eventloop.ioloop.IOLoop)
50 loop = Instance(zmq.eventloop.ioloop.IOLoop)
56 def _loop_default(self):
51 def _loop_default(self):
57 return ioloop.IOLoop.instance()
52 return ioloop.IOLoop.instance()
58
53
59 def __init__(self, **kwargs):
54 def __init__(self, **kwargs):
60 super(LogWatcher, self).__init__(**kwargs)
55 super(LogWatcher, self).__init__(**kwargs)
61 s = self.context.socket(zmq.SUB)
56 s = self.context.socket(zmq.SUB)
62 s.bind(self.url)
57 s.bind(self.url)
63 self.stream = zmqstream.ZMQStream(s, self.loop)
58 self.stream = zmqstream.ZMQStream(s, self.loop)
64 self.subscribe()
59 self.subscribe()
65 self.on_trait_change(self.subscribe, 'topics')
60 self.on_trait_change(self.subscribe, 'topics')
66
61
67 def start(self):
62 def start(self):
68 self.stream.on_recv(self.log_message)
63 self.stream.on_recv(self.log_message)
69
64
70 def stop(self):
65 def stop(self):
71 self.stream.stop_on_recv()
66 self.stream.stop_on_recv()
72
67
73 def subscribe(self):
68 def subscribe(self):
74 """Update our SUB socket's subscriptions."""
69 """Update our SUB socket's subscriptions."""
75 self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
70 self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
76 if '' in self.topics:
71 if '' in self.topics:
77 self.log.debug("Subscribing to: everything")
72 self.log.debug("Subscribing to: everything")
78 self.stream.setsockopt(zmq.SUBSCRIBE, '')
73 self.stream.setsockopt(zmq.SUBSCRIBE, '')
79 else:
74 else:
80 for topic in self.topics:
75 for topic in self.topics:
81 self.log.debug("Subscribing to: %r"%(topic))
76 self.log.debug("Subscribing to: %r"%(topic))
82 self.stream.setsockopt(zmq.SUBSCRIBE, topic)
77 self.stream.setsockopt(zmq.SUBSCRIBE, topic)
83
78
84 def _extract_level(self, topic_str):
79 def _extract_level(self, topic_str):
85 """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')"""
80 """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')"""
86 topics = topic_str.split('.')
81 topics = topic_str.split('.')
87 for idx,t in enumerate(topics):
82 for idx,t in enumerate(topics):
88 level = getattr(logging, t, None)
83 level = getattr(logging, t, None)
89 if level is not None:
84 if level is not None:
90 break
85 break
91
86
92 if level is None:
87 if level is None:
93 level = logging.INFO
88 level = logging.INFO
94 else:
89 else:
95 topics.pop(idx)
90 topics.pop(idx)
96
91
97 return level, '.'.join(topics)
92 return level, '.'.join(topics)
98
93
99
94
100 def log_message(self, raw):
95 def log_message(self, raw):
101 """receive and parse a message, then log it."""
96 """receive and parse a message, then log it."""
102 if len(raw) != 2 or '.' not in raw[0]:
97 if len(raw) != 2 or '.' not in raw[0]:
103 self.log.error("Invalid log message: %s"%raw)
98 self.log.error("Invalid log message: %s"%raw)
104 return
99 return
105 else:
100 else:
106 topic, msg = raw
101 topic, msg = raw
107 # don't newline, since log messages always newline:
102 # don't newline, since log messages always newline:
108 topic,level_name = topic.rsplit('.',1)
103 topic,level_name = topic.rsplit('.',1)
109 level,topic = self._extract_level(topic)
104 level,topic = self._extract_level(topic)
110 if msg[-1] == '\n':
105 if msg[-1] == '\n':
111 msg = msg[:-1]
106 msg = msg[:-1]
112 self.log.log(level, "[%s] %s" % (topic, msg))
107 self.log.log(level, "[%s] %s" % (topic, msg))
113
108
@@ -1,184 +1,180 b''
1 """A Task logger that presents our DB interface,
1 """A Task logger that presents our DB interface,
2 but exists entirely in memory and implemented with dicts.
2 but exists entirely in memory and implemented with dicts.
3
3
4 TaskRecords are dicts of the form:
4 TaskRecords are dicts of the form:
5 {
5 {
6 'msg_id' : str(uuid),
6 'msg_id' : str(uuid),
7 'client_uuid' : str(uuid),
7 'client_uuid' : str(uuid),
8 'engine_uuid' : str(uuid) or None,
8 'engine_uuid' : str(uuid) or None,
9 'header' : dict(header),
9 'header' : dict(header),
10 'content': dict(content),
10 'content': dict(content),
11 'buffers': list(buffers),
11 'buffers': list(buffers),
12 'submitted': datetime,
12 'submitted': datetime,
13 'started': datetime or None,
13 'started': datetime or None,
14 'completed': datetime or None,
14 'completed': datetime or None,
15 'resubmitted': datetime or None,
15 'resubmitted': datetime or None,
16 'result_header' : dict(header) or None,
16 'result_header' : dict(header) or None,
17 'result_content' : dict(content) or None,
17 'result_content' : dict(content) or None,
18 'result_buffers' : list(buffers) or None,
18 'result_buffers' : list(buffers) or None,
19 }
19 }
20 With this info, many of the special categories of tasks can be defined by query:
20 With this info, many of the special categories of tasks can be defined by query:
21
21
22 pending: completed is None
22 pending: completed is None
23 client's outstanding: client_uuid = uuid && completed is None
23 client's outstanding: client_uuid = uuid && completed is None
24 MIA: arrived is None (and completed is None)
24 MIA: arrived is None (and completed is None)
25 etc.
25 etc.
26
26
27 EngineRecords are dicts of the form:
27 EngineRecords are dicts of the form:
28 {
28 {
29 'eid' : int(id),
29 'eid' : int(id),
30 'uuid': str(uuid)
30 'uuid': str(uuid)
31 }
31 }
32 This may be extended, but is currently.
32 This may be extended, but is currently.
33
33
34 We support a subset of mongodb operators:
34 We support a subset of mongodb operators:
35 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
35 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
36 """
36 """
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38 # Copyright (C) 2010 The IPython Development Team
38 # Copyright (C) 2010 The IPython Development Team
39 #
39 #
40 # Distributed under the terms of the BSD License. The full license is in
40 # Distributed under the terms of the BSD License. The full license is in
41 # the file COPYING, distributed as part of this software.
41 # the file COPYING, distributed as part of this software.
42 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
43
43
44
44
45 from datetime import datetime
45 from datetime import datetime
46
46
47 from IPython.config.application import Application
47 from IPython.config.configurable import LoggingConfigurable
48 from IPython.config.configurable import Configurable
49
48
50 from IPython.utils.traitlets import Dict, Unicode, Instance
49 from IPython.utils.traitlets import Dict, Unicode, Instance
51
50
52 filters = {
51 filters = {
53 '$lt' : lambda a,b: a < b,
52 '$lt' : lambda a,b: a < b,
54 '$gt' : lambda a,b: b > a,
53 '$gt' : lambda a,b: b > a,
55 '$eq' : lambda a,b: a == b,
54 '$eq' : lambda a,b: a == b,
56 '$ne' : lambda a,b: a != b,
55 '$ne' : lambda a,b: a != b,
57 '$lte': lambda a,b: a <= b,
56 '$lte': lambda a,b: a <= b,
58 '$gte': lambda a,b: a >= b,
57 '$gte': lambda a,b: a >= b,
59 '$in' : lambda a,b: a in b,
58 '$in' : lambda a,b: a in b,
60 '$nin': lambda a,b: a not in b,
59 '$nin': lambda a,b: a not in b,
61 '$all': lambda a,b: all([ a in bb for bb in b ]),
60 '$all': lambda a,b: all([ a in bb for bb in b ]),
62 '$mod': lambda a,b: a%b[0] == b[1],
61 '$mod': lambda a,b: a%b[0] == b[1],
63 '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
62 '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
64 }
63 }
65
64
66
65
67 class CompositeFilter(object):
66 class CompositeFilter(object):
68 """Composite filter for matching multiple properties."""
67 """Composite filter for matching multiple properties."""
69
68
70 def __init__(self, dikt):
69 def __init__(self, dikt):
71 self.tests = []
70 self.tests = []
72 self.values = []
71 self.values = []
73 for key, value in dikt.iteritems():
72 for key, value in dikt.iteritems():
74 self.tests.append(filters[key])
73 self.tests.append(filters[key])
75 self.values.append(value)
74 self.values.append(value)
76
75
77 def __call__(self, value):
76 def __call__(self, value):
78 for test,check in zip(self.tests, self.values):
77 for test,check in zip(self.tests, self.values):
79 if not test(value, check):
78 if not test(value, check):
80 return False
79 return False
81 return True
80 return True
82
81
83 class BaseDB(Configurable):
82 class BaseDB(LoggingConfigurable):
84 """Empty Parent class so traitlets work on DB."""
83 """Empty Parent class so traitlets work on DB."""
85 # base configurable traits:
84 # base configurable traits:
86 session = Unicode("")
85 session = Unicode("")
87 log = Instance('logging.Logger')
88 def _log_default(self):
89 return Application.instance().log
90
86
91 class DictDB(BaseDB):
87 class DictDB(BaseDB):
92 """Basic in-memory dict-based object for saving Task Records.
88 """Basic in-memory dict-based object for saving Task Records.
93
89
94 This is the first object to present the DB interface
90 This is the first object to present the DB interface
95 for logging tasks out of memory.
91 for logging tasks out of memory.
96
92
97 The interface is based on MongoDB, so adding a MongoDB
93 The interface is based on MongoDB, so adding a MongoDB
98 backend should be straightforward.
94 backend should be straightforward.
99 """
95 """
100
96
101 _records = Dict()
97 _records = Dict()
102
98
103 def _match_one(self, rec, tests):
99 def _match_one(self, rec, tests):
104 """Check if a specific record matches tests."""
100 """Check if a specific record matches tests."""
105 for key,test in tests.iteritems():
101 for key,test in tests.iteritems():
106 if not test(rec.get(key, None)):
102 if not test(rec.get(key, None)):
107 return False
103 return False
108 return True
104 return True
109
105
110 def _match(self, check):
106 def _match(self, check):
111 """Find all the matches for a check dict."""
107 """Find all the matches for a check dict."""
112 matches = []
108 matches = []
113 tests = {}
109 tests = {}
114 for k,v in check.iteritems():
110 for k,v in check.iteritems():
115 if isinstance(v, dict):
111 if isinstance(v, dict):
116 tests[k] = CompositeFilter(v)
112 tests[k] = CompositeFilter(v)
117 else:
113 else:
118 tests[k] = lambda o: o==v
114 tests[k] = lambda o: o==v
119
115
120 for rec in self._records.itervalues():
116 for rec in self._records.itervalues():
121 if self._match_one(rec, tests):
117 if self._match_one(rec, tests):
122 matches.append(rec)
118 matches.append(rec)
123 return matches
119 return matches
124
120
125 def _extract_subdict(self, rec, keys):
121 def _extract_subdict(self, rec, keys):
126 """extract subdict of keys"""
122 """extract subdict of keys"""
127 d = {}
123 d = {}
128 d['msg_id'] = rec['msg_id']
124 d['msg_id'] = rec['msg_id']
129 for key in keys:
125 for key in keys:
130 d[key] = rec[key]
126 d[key] = rec[key]
131 return d
127 return d
132
128
133 def add_record(self, msg_id, rec):
129 def add_record(self, msg_id, rec):
134 """Add a new Task Record, by msg_id."""
130 """Add a new Task Record, by msg_id."""
135 if self._records.has_key(msg_id):
131 if self._records.has_key(msg_id):
136 raise KeyError("Already have msg_id %r"%(msg_id))
132 raise KeyError("Already have msg_id %r"%(msg_id))
137 self._records[msg_id] = rec
133 self._records[msg_id] = rec
138
134
139 def get_record(self, msg_id):
135 def get_record(self, msg_id):
140 """Get a specific Task Record, by msg_id."""
136 """Get a specific Task Record, by msg_id."""
141 if not self._records.has_key(msg_id):
137 if not self._records.has_key(msg_id):
142 raise KeyError("No such msg_id %r"%(msg_id))
138 raise KeyError("No such msg_id %r"%(msg_id))
143 return self._records[msg_id]
139 return self._records[msg_id]
144
140
145 def update_record(self, msg_id, rec):
141 def update_record(self, msg_id, rec):
146 """Update the data in an existing record."""
142 """Update the data in an existing record."""
147 self._records[msg_id].update(rec)
143 self._records[msg_id].update(rec)
148
144
149 def drop_matching_records(self, check):
145 def drop_matching_records(self, check):
150 """Remove a record from the DB."""
146 """Remove a record from the DB."""
151 matches = self._match(check)
147 matches = self._match(check)
152 for m in matches:
148 for m in matches:
153 del self._records[m['msg_id']]
149 del self._records[m['msg_id']]
154
150
155 def drop_record(self, msg_id):
151 def drop_record(self, msg_id):
156 """Remove a record from the DB."""
152 """Remove a record from the DB."""
157 del self._records[msg_id]
153 del self._records[msg_id]
158
154
159
155
160 def find_records(self, check, keys=None):
156 def find_records(self, check, keys=None):
161 """Find records matching a query dict, optionally extracting subset of keys.
157 """Find records matching a query dict, optionally extracting subset of keys.
162
158
163 Returns dict keyed by msg_id of matching records.
159 Returns dict keyed by msg_id of matching records.
164
160
165 Parameters
161 Parameters
166 ----------
162 ----------
167
163
168 check: dict
164 check: dict
169 mongodb-style query argument
165 mongodb-style query argument
170 keys: list of strs [optional]
166 keys: list of strs [optional]
171 if specified, the subset of keys to extract. msg_id will *always* be
167 if specified, the subset of keys to extract. msg_id will *always* be
172 included.
168 included.
173 """
169 """
174 matches = self._match(check)
170 matches = self._match(check)
175 if keys:
171 if keys:
176 return [ self._extract_subdict(rec, keys) for rec in matches ]
172 return [ self._extract_subdict(rec, keys) for rec in matches ]
177 else:
173 else:
178 return matches
174 return matches
179
175
180
176
181 def get_history(self):
177 def get_history(self):
182 """get all msg_ids, ordered by time submitted."""
178 """get all msg_ids, ordered by time submitted."""
183 msg_ids = self._records.keys()
179 msg_ids = self._records.keys()
184 return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])
180 return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])
@@ -1,170 +1,165 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
4 and hearts are tracked based on their XREQ identities.
4 and hearts are tracked based on their XREQ identities.
5 """
5 """
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010-2011 The IPython Development Team
7 # Copyright (C) 2010-2011 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from __future__ import print_function
13 from __future__ import print_function
14 import time
14 import time
15 import uuid
15 import uuid
16
16
17 import zmq
17 import zmq
18 from zmq.devices import ThreadDevice
18 from zmq.devices import ThreadDevice
19 from zmq.eventloop import ioloop, zmqstream
19 from zmq.eventloop import ioloop, zmqstream
20
20
21 from IPython.config.application import Application
21 from IPython.config.configurable import LoggingConfigurable
22 from IPython.config.configurable import Configurable
23 from IPython.utils.traitlets import Set, Instance, CFloat
22 from IPython.utils.traitlets import Set, Instance, CFloat
24
23
25 class Heart(object):
24 class Heart(object):
26 """A basic heart object for responding to a HeartMonitor.
25 """A basic heart object for responding to a HeartMonitor.
27 This is a simple wrapper with defaults for the most common
26 This is a simple wrapper with defaults for the most common
28 Device model for responding to heartbeats.
27 Device model for responding to heartbeats.
29
28
30 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
29 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
31 SUB/XREQ for in/out.
30 SUB/XREQ for in/out.
32
31
33 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
32 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
34 device=None
33 device=None
35 id=None
34 id=None
36 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
35 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
37 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
36 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
38 self.device.daemon=True
37 self.device.daemon=True
39 self.device.connect_in(in_addr)
38 self.device.connect_in(in_addr)
40 self.device.connect_out(out_addr)
39 self.device.connect_out(out_addr)
41 if in_type == zmq.SUB:
40 if in_type == zmq.SUB:
42 self.device.setsockopt_in(zmq.SUBSCRIBE, "")
41 self.device.setsockopt_in(zmq.SUBSCRIBE, "")
43 if heart_id is None:
42 if heart_id is None:
44 heart_id = str(uuid.uuid4())
43 heart_id = str(uuid.uuid4())
45 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
44 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
46 self.id = heart_id
45 self.id = heart_id
47
46
48 def start(self):
47 def start(self):
49 return self.device.start()
48 return self.device.start()
50
49
51 class HeartMonitor(Configurable):
50 class HeartMonitor(LoggingConfigurable):
52 """A basic HeartMonitor class
51 """A basic HeartMonitor class
53 pingstream: a PUB stream
52 pingstream: a PUB stream
54 pongstream: an XREP stream
53 pongstream: an XREP stream
55 period: the period of the heartbeat in milliseconds"""
54 period: the period of the heartbeat in milliseconds"""
56
55
57 period=CFloat(1000, config=True,
56 period=CFloat(1000, config=True,
58 help='The frequency at which the Hub pings the engines for heartbeats '
57 help='The frequency at which the Hub pings the engines for heartbeats '
59 ' (in ms) [default: 100]',
58 ' (in ms) [default: 100]',
60 )
59 )
61
60
62 log = Instance('logging.Logger')
63 def _log_default(self):
64 return Application.instance().log
65
66 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
61 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
67 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
62 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
68 loop = Instance('zmq.eventloop.ioloop.IOLoop')
63 loop = Instance('zmq.eventloop.ioloop.IOLoop')
69 def _loop_default(self):
64 def _loop_default(self):
70 return ioloop.IOLoop.instance()
65 return ioloop.IOLoop.instance()
71
66
72 # not settable:
67 # not settable:
73 hearts=Set()
68 hearts=Set()
74 responses=Set()
69 responses=Set()
75 on_probation=Set()
70 on_probation=Set()
76 last_ping=CFloat(0)
71 last_ping=CFloat(0)
77 _new_handlers = Set()
72 _new_handlers = Set()
78 _failure_handlers = Set()
73 _failure_handlers = Set()
79 lifetime = CFloat(0)
74 lifetime = CFloat(0)
80 tic = CFloat(0)
75 tic = CFloat(0)
81
76
82 def __init__(self, **kwargs):
77 def __init__(self, **kwargs):
83 super(HeartMonitor, self).__init__(**kwargs)
78 super(HeartMonitor, self).__init__(**kwargs)
84
79
85 self.pongstream.on_recv(self.handle_pong)
80 self.pongstream.on_recv(self.handle_pong)
86
81
87 def start(self):
82 def start(self):
88 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
83 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
89 self.caller.start()
84 self.caller.start()
90
85
91 def add_new_heart_handler(self, handler):
86 def add_new_heart_handler(self, handler):
92 """add a new handler for new hearts"""
87 """add a new handler for new hearts"""
93 self.log.debug("heartbeat::new_heart_handler: %s"%handler)
88 self.log.debug("heartbeat::new_heart_handler: %s"%handler)
94 self._new_handlers.add(handler)
89 self._new_handlers.add(handler)
95
90
96 def add_heart_failure_handler(self, handler):
91 def add_heart_failure_handler(self, handler):
97 """add a new handler for heart failure"""
92 """add a new handler for heart failure"""
98 self.log.debug("heartbeat::new heart failure handler: %s"%handler)
93 self.log.debug("heartbeat::new heart failure handler: %s"%handler)
99 self._failure_handlers.add(handler)
94 self._failure_handlers.add(handler)
100
95
101 def beat(self):
96 def beat(self):
102 self.pongstream.flush()
97 self.pongstream.flush()
103 self.last_ping = self.lifetime
98 self.last_ping = self.lifetime
104
99
105 toc = time.time()
100 toc = time.time()
106 self.lifetime += toc-self.tic
101 self.lifetime += toc-self.tic
107 self.tic = toc
102 self.tic = toc
108 # self.log.debug("heartbeat::%s"%self.lifetime)
103 # self.log.debug("heartbeat::%s"%self.lifetime)
109 goodhearts = self.hearts.intersection(self.responses)
104 goodhearts = self.hearts.intersection(self.responses)
110 missed_beats = self.hearts.difference(goodhearts)
105 missed_beats = self.hearts.difference(goodhearts)
111 heartfailures = self.on_probation.intersection(missed_beats)
106 heartfailures = self.on_probation.intersection(missed_beats)
112 newhearts = self.responses.difference(goodhearts)
107 newhearts = self.responses.difference(goodhearts)
113 map(self.handle_new_heart, newhearts)
108 map(self.handle_new_heart, newhearts)
114 map(self.handle_heart_failure, heartfailures)
109 map(self.handle_heart_failure, heartfailures)
115 self.on_probation = missed_beats.intersection(self.hearts)
110 self.on_probation = missed_beats.intersection(self.hearts)
116 self.responses = set()
111 self.responses = set()
117 # print self.on_probation, self.hearts
112 # print self.on_probation, self.hearts
118 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
113 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
119 self.pingstream.send(str(self.lifetime))
114 self.pingstream.send(str(self.lifetime))
120
115
121 def handle_new_heart(self, heart):
116 def handle_new_heart(self, heart):
122 if self._new_handlers:
117 if self._new_handlers:
123 for handler in self._new_handlers:
118 for handler in self._new_handlers:
124 handler(heart)
119 handler(heart)
125 else:
120 else:
126 self.log.info("heartbeat::yay, got new heart %s!"%heart)
121 self.log.info("heartbeat::yay, got new heart %s!"%heart)
127 self.hearts.add(heart)
122 self.hearts.add(heart)
128
123
129 def handle_heart_failure(self, heart):
124 def handle_heart_failure(self, heart):
130 if self._failure_handlers:
125 if self._failure_handlers:
131 for handler in self._failure_handlers:
126 for handler in self._failure_handlers:
132 try:
127 try:
133 handler(heart)
128 handler(heart)
134 except Exception as e:
129 except Exception as e:
135 self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
130 self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
136 pass
131 pass
137 else:
132 else:
138 self.log.info("heartbeat::Heart %s failed :("%heart)
133 self.log.info("heartbeat::Heart %s failed :("%heart)
139 self.hearts.remove(heart)
134 self.hearts.remove(heart)
140
135
141
136
142 def handle_pong(self, msg):
137 def handle_pong(self, msg):
143 "a heart just beat"
138 "a heart just beat"
144 if msg[1] == str(self.lifetime):
139 if msg[1] == str(self.lifetime):
145 delta = time.time()-self.tic
140 delta = time.time()-self.tic
146 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
141 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
147 self.responses.add(msg[0])
142 self.responses.add(msg[0])
148 elif msg[1] == str(self.last_ping):
143 elif msg[1] == str(self.last_ping):
149 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
144 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
150 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
145 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
151 self.responses.add(msg[0])
146 self.responses.add(msg[0])
152 else:
147 else:
153 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
148 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
154 (msg[1],self.lifetime))
149 (msg[1],self.lifetime))
155
150
156
151
157 if __name__ == '__main__':
152 if __name__ == '__main__':
158 loop = ioloop.IOLoop.instance()
153 loop = ioloop.IOLoop.instance()
159 context = zmq.Context()
154 context = zmq.Context()
160 pub = context.socket(zmq.PUB)
155 pub = context.socket(zmq.PUB)
161 pub.bind('tcp://127.0.0.1:5555')
156 pub.bind('tcp://127.0.0.1:5555')
162 xrep = context.socket(zmq.XREP)
157 xrep = context.socket(zmq.XREP)
163 xrep.bind('tcp://127.0.0.1:5556')
158 xrep.bind('tcp://127.0.0.1:5556')
164
159
165 outstream = zmqstream.ZMQStream(pub, loop)
160 outstream = zmqstream.ZMQStream(pub, loop)
166 instream = zmqstream.ZMQStream(xrep, loop)
161 instream = zmqstream.ZMQStream(xrep, loop)
167
162
168 hb = HeartMonitor(loop, outstream, instream)
163 hb = HeartMonitor(loop, outstream, instream)
169
164
170 loop.start()
165 loop.start()
General Comments 0
You need to be logged in to leave comments. Login now