##// Changeset metadata (code-review page residue):
##//   Commit: add LoggingConfigurable base class
##//   Author: MinRK
@@ -1,263 +1,278 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 A base class for objects that are configurable.
5 5
6 6 Authors:
7 7
8 8 * Brian Granger
9 9 * Fernando Perez
10 10 """
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Copyright (C) 2008-2010 The IPython Development Team
14 14 #
15 15 # Distributed under the terms of the BSD License. The full license is in
16 16 # the file COPYING, distributed as part of this software.
17 17 #-----------------------------------------------------------------------------
18 18
19 19 #-----------------------------------------------------------------------------
20 20 # Imports
21 21 #-----------------------------------------------------------------------------
22 22
23 23 from copy import deepcopy
24 24 import datetime
25 25
26 26 from loader import Config
27 27 from IPython.utils.traitlets import HasTraits, Instance
28 28 from IPython.utils.text import indent
29 29
30 30
31 31 #-----------------------------------------------------------------------------
32 32 # Helper classes for Configurables
33 33 #-----------------------------------------------------------------------------
34 34
35 35
class ConfigurableError(Exception):
    """Base exception for errors raised by :class:`Configurable`."""
    pass
38 38
39 39
class MultipleInstanceError(ConfigurableError):
    """Raised when a second incompatible singleton instance is requested."""
    pass
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Configurable implementation
45 45 #-----------------------------------------------------------------------------
46 46
47 47 class Configurable(HasTraits):
48 48
49 49 config = Instance(Config,(),{})
50 50 created = None
51 51
52 52 def __init__(self, **kwargs):
53 53 """Create a conigurable given a config config.
54 54
55 55 Parameters
56 56 ----------
57 57 config : Config
58 58 If this is empty, default values are used. If config is a
59 59 :class:`Config` instance, it will be used to configure the
60 60 instance.
61 61
62 62 Notes
63 63 -----
64 64 Subclasses of Configurable must call the :meth:`__init__` method of
65 65 :class:`Configurable` *before* doing anything else and using
66 66 :func:`super`::
67 67
68 68 class MyConfigurable(Configurable):
69 69 def __init__(self, config=None):
70 70 super(MyConfigurable, self).__init__(config)
71 71 # Then any other code you need to finish initialization.
72 72
73 73 This ensures that instances will be configured properly.
74 74 """
75 75 config = kwargs.pop('config', None)
76 76 if config is not None:
77 77 # We used to deepcopy, but for now we are trying to just save
78 78 # by reference. This *could* have side effects as all components
79 79 # will share config. In fact, I did find such a side effect in
80 80 # _config_changed below. If a config attribute value was a mutable type
81 81 # all instances of a component were getting the same copy, effectively
82 82 # making that a class attribute.
83 83 # self.config = deepcopy(config)
84 84 self.config = config
85 85 # This should go second so individual keyword arguments override
86 86 # the values in config.
87 87 super(Configurable, self).__init__(**kwargs)
88 88 self.created = datetime.datetime.now()
89 89
90 90 #-------------------------------------------------------------------------
91 91 # Static trait notifiations
92 92 #-------------------------------------------------------------------------
93 93
94 94 def _config_changed(self, name, old, new):
95 95 """Update all the class traits having ``config=True`` as metadata.
96 96
97 97 For any class trait with a ``config`` metadata attribute that is
98 98 ``True``, we update the trait with the value of the corresponding
99 99 config entry.
100 100 """
101 101 # Get all traits with a config metadata entry that is True
102 102 traits = self.traits(config=True)
103 103
104 104 # We auto-load config section for this class as well as any parent
105 105 # classes that are Configurable subclasses. This starts with Configurable
106 106 # and works down the mro loading the config for each section.
107 107 section_names = [cls.__name__ for cls in \
108 108 reversed(self.__class__.__mro__) if
109 109 issubclass(cls, Configurable) and issubclass(self.__class__, cls)]
110 110
111 111 for sname in section_names:
112 112 # Don't do a blind getattr as that would cause the config to
113 113 # dynamically create the section with name self.__class__.__name__.
114 114 if new._has_section(sname):
115 115 my_config = new[sname]
116 116 for k, v in traits.iteritems():
117 117 # Don't allow traitlets with config=True to start with
118 118 # uppercase. Otherwise, they are confused with Config
119 119 # subsections. But, developers shouldn't have uppercase
120 120 # attributes anyways! (PEP 6)
121 121 if k[0].upper()==k[0] and not k.startswith('_'):
122 122 raise ConfigurableError('Configurable traitlets with '
123 123 'config=True must start with a lowercase so they are '
124 124 'not confused with Config subsections: %s.%s' % \
125 125 (self.__class__.__name__, k))
126 126 try:
127 127 # Here we grab the value from the config
128 128 # If k has the naming convention of a config
129 129 # section, it will be auto created.
130 130 config_value = my_config[k]
131 131 except KeyError:
132 132 pass
133 133 else:
134 134 # print "Setting %s.%s from %s.%s=%r" % \
135 135 # (self.__class__.__name__,k,sname,k,config_value)
136 136 # We have to do a deepcopy here if we don't deepcopy the entire
137 137 # config object. If we don't, a mutable config_value will be
138 138 # shared by all instances, effectively making it a class attribute.
139 139 setattr(self, k, deepcopy(config_value))
140 140
141 141 @classmethod
142 142 def class_get_help(cls):
143 143 """Get the help string for this class in ReST format."""
144 144 cls_traits = cls.class_traits(config=True)
145 145 final_help = []
146 146 final_help.append(u'%s options' % cls.__name__)
147 147 final_help.append(len(final_help[0])*u'-')
148 148 for k,v in cls.class_traits(config=True).iteritems():
149 149 help = cls.class_get_trait_help(v)
150 150 final_help.append(help)
151 151 return '\n'.join(final_help)
152 152
153 153 @classmethod
154 154 def class_get_trait_help(cls, trait):
155 155 """Get the help string for a single """
156 156 lines = []
157 157 header = "%s.%s : %s" % (cls.__name__, trait.name, trait.__class__.__name__)
158 158 try:
159 159 dvr = repr(trait.get_default_value())
160 160 except Exception:
161 161 dvr = None # ignore defaults we can't construct
162 162 if dvr is not None:
163 163 header += ' [default: %s]'%dvr
164 164 lines.append(header)
165 165
166 166 help = trait.get_metadata('help')
167 167 if help is not None:
168 168 lines.append(indent(help.strip(), flatten=True))
169 169 if 'Enum' in trait.__class__.__name__:
170 170 # include Enum choices
171 171 lines.append(indent('Choices: %r'%(trait.values,), flatten=True))
172 172 return '\n'.join(lines)
173 173
174 174 @classmethod
175 175 def class_print_help(cls):
176 176 print cls.class_get_help()
177 177
178 178
class SingletonConfigurable(Configurable):
    """A configurable that only allows one instance.

    This class is for classes that should only have one instance of itself
    or *any* subclass. To create and retrieve such a class use the
    :meth:`SingletonConfigurable.instance` method.
    """

    # The cached singleton instance, shared across the singleton hierarchy.
    _instance = None

    @classmethod
    def _walk_mro(cls):
        """Walk the cls.mro() for parent classes that are also singletons.

        For use in instance().
        """
        for klass in cls.mro():
            is_singleton_parent = (
                issubclass(cls, klass)
                and issubclass(klass, SingletonConfigurable)
                and klass != SingletonConfigurable
            )
            if is_singleton_parent:
                yield klass

    @classmethod
    def clear_instance(cls):
        """Unset ``_instance`` for this class and singleton parents."""
        if not cls.initialized():
            return
        for klass in cls._walk_mro():
            # only clear instances that are instances of the calling class
            if isinstance(klass._instance, cls):
                klass._instance = None

    @classmethod
    def instance(cls, *args, **kwargs):
        """Returns a global instance of this class.

        This method creates a new instance if none has previously been created
        and returns a previously created instance if one already exists.

        The arguments and keyword arguments passed to this method are passed
        on to the :meth:`__init__` method of the class upon instantiation.

        Examples
        --------

        Create a singleton class using instance, and retrieve it::

            >>> from IPython.config.configurable import SingletonConfigurable
            >>> class Foo(SingletonConfigurable): pass
            >>> foo = Foo.instance()
            >>> foo == Foo.instance()
            True

        Create a subclass that is retrieved using the base class instance::

            >>> class Bar(SingletonConfigurable): pass
            >>> class Bam(Bar): pass
            >>> bam = Bam.instance()
            >>> bam == Bar.instance()
            True
        """
        if cls._instance is None:
            # Create the instance and register it on every singleton parent
            # so the whole hierarchy returns the same object.
            inst = cls(*args, **kwargs)
            for klass in cls._walk_mro():
                klass._instance = inst

        if isinstance(cls._instance, cls):
            return cls._instance
        raise MultipleInstanceError(
            'Multiple incompatible subclass instances of '
            '%s are being created.' % cls.__name__
        )

    @classmethod
    def initialized(cls):
        """Has an instance been created?"""
        return hasattr(cls, "_instance") and cls._instance is not None
263 263
264
class LoggingConfigurable(Configurable):
    """A parent class for Configurables that log.

    Subclasses have a log trait, and the default behavior
    is to get the logger from the currently running Application
    via Application.instance().log.
    """

    # Logger used by this configurable; lazily defaulted below.
    log = Instance('logging.Logger')

    def _log_default(self):
        # Imported here (not at module level) to avoid a circular import
        # between configurable and application modules.
        from IPython.config.application import Application
        return Application.instance().log
@@ -1,1072 +1,1069 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Facilities for launching IPython processes asynchronously.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 import copy
19 19 import logging
20 20 import os
21 21 import re
22 22 import stat
23 23
24 24 # signal imports, handling various platforms, versions
25 25
26 26 from signal import SIGINT, SIGTERM
27 27 try:
28 28 from signal import SIGKILL
29 29 except ImportError:
30 30 # Windows
31 31 SIGKILL=SIGTERM
32 32
33 33 try:
34 34 # Windows >= 2.7, 3.2
35 35 from signal import CTRL_C_EVENT as SIGINT
36 36 except ImportError:
37 37 pass
38 38
39 39 from subprocess import Popen, PIPE, STDOUT
try:
    from subprocess import check_output
except ImportError:
    # Python < 2.7: emulate subprocess.check_output on top of Popen.
    def check_output(*args, **kwargs):
        """Run a command, returning its captured stdout as a byte string."""
        kwargs['stdout'] = PIPE
        proc = Popen(*args, **kwargs)
        stdout, _ = proc.communicate()
        return stdout
49 49
50 50 from zmq.eventloop import ioloop
51 51
52 52 from IPython.config.application import Application
53 from IPython.config.configurable import Configurable
53 from IPython.config.configurable import LoggingConfigurable
54 54 from IPython.utils.text import EvalFormatter
55 55 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
56 56 from IPython.utils.path import get_ipython_module_path
57 57 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
58 58
59 59 from .win32support import forward_read_events
60 60
61 61 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
62 62
63 63 WINDOWS = os.name == 'nt'
64 64
65 65 #-----------------------------------------------------------------------------
66 66 # Paths to the kernel apps
67 67 #-----------------------------------------------------------------------------
68 68
69 69
70 70 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
71 71 'IPython.parallel.apps.ipclusterapp'
72 72 ))
73 73
74 74 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
75 75 'IPython.parallel.apps.ipengineapp'
76 76 ))
77 77
78 78 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
79 79 'IPython.parallel.apps.ipcontrollerapp'
80 80 ))
81 81
82 82 #-----------------------------------------------------------------------------
83 83 # Base launchers and errors
84 84 #-----------------------------------------------------------------------------
85 85
86 86
class LauncherError(Exception):
    """Base exception for process-launcher failures."""
    pass
89 89
90 90
class ProcessStateError(LauncherError):
    """Raised when an operation is invalid for the launcher's current state."""
    pass
93 93
94 94
class UnknownStatus(LauncherError):
    """Raised when a launcher reports a status that cannot be interpreted."""
    pass
97 97
98 98
class BaseLauncher(LoggingConfigurable):
    """An abstraction for starting, stopping and signaling a process."""

    # In all of the launchers, the work_dir is where child processes will be
    # run. This will usually be the profile_dir, but may not be. any work_dir
    # passed into the __init__ method will override the config value.
    # This should not be used to set the work_dir for the actual engine
    # and controller. Instead, use their own config files or the
    # controller_args, engine_args attributes of the launchers to add
    # the work_dir option.
    work_dir = Unicode(u'.')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    start_data = Any()
    stop_data = Any()

    def _loop_default(self):
        return ioloop.IOLoop.instance()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
        # Lifecycle state: 'before' -> 'running' -> 'after'.
        self.state = 'before'
        self.stop_callbacks = []
        self.start_data = None
        self.stop_data = None

    @property
    def args(self):
        """A list of cmd and args that will be used to start the process.

        This is what is passed to :func:`spawnProcess` and the first element
        will be the process name.
        """
        return self.find_args()

    def find_args(self):
        """The ``.args`` property calls this to find the args list.

        Subclasses should implement this to construct the cmd and args.
        """
        raise NotImplementedError('find_args must be implemented in a subclass')

    @property
    def arg_str(self):
        """The string form of the program arguments."""
        return ' '.join(self.args)

    @property
    def running(self):
        """Am I running."""
        return self.state == 'running'

    def start(self):
        """Start the process.

        This must return a deferred that fires with information about the
        process starting (like a pid, job id, etc.).
        """
        raise NotImplementedError('start must be implemented in a subclass')

    def stop(self):
        """Stop the process and notify observers of stopping.

        This must return a deferred that fires with information about the
        processing stopping, like errors that occur while the process is
        attempting to be shut down. This deferred won't fire when the process
        actually stops. To observe the actual process stopping, see
        :func:`observe_stop`.
        """
        raise NotImplementedError('stop must be implemented in a subclass')

    def on_stop(self, f):
        """Register a callback to be fired when the process stops.

        The callback is called with data that contains information about
        the exit status of the process.
        """
        if self.state == 'after':
            # Already stopped: fire immediately with the recorded stop data.
            return f(self.stop_data)
        self.stop_callbacks.append(f)

    def notify_start(self, data):
        """Call this to trigger startup actions.

        This logs the process startup and sets the state to 'running'. It is
        a pass-through so it can be used as a callback.
        """
        self.log.info('Process %r started: %r' % (self.args[0], data))
        self.start_data = data
        self.state = 'running'
        return data

    def notify_stop(self, data):
        """Call this to trigger process stop actions.

        This logs the process stopping and sets the state to 'after'. Call
        this to trigger all the deferreds from :func:`observe_stop`."""
        self.log.info('Process %r stopped: %r' % (self.args[0], data))
        self.stop_data = data
        self.state = 'after'
        # Drain the callback list (LIFO), calling each with the stop data.
        while self.stop_callbacks:
            callback = self.stop_callbacks.pop()
            callback(data)
        return data

    def signal(self, sig):
        """Signal the process.

        Return a semi-meaningless deferred after signaling the process.

        Parameters
        ----------
        sig : str or int
            'KILL', 'INT', etc., or any signal number
        """
        raise NotImplementedError('signal must be implemented in a subclass')
224 221
225 222
226 223 #-----------------------------------------------------------------------------
227 224 # Local process launchers
228 225 #-----------------------------------------------------------------------------
229 226
230 227
class LocalProcessLauncher(BaseLauncher):
    """Start and stop an external process in an asynchronous manner.

    This will launch the external process with a working directory of
    ``self.work_dir``.
    """

    # This is used to construct self.args, which is passed to spawnProcess.
    cmd_and_args = List([])
    # How often (ms) to poll the child for exit.
    poll_frequency = Int(100)

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalProcessLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.process = None
        self.start_deferred = None
        self.poller = None

    def find_args(self):
        return self.cmd_and_args

    def start(self):
        """Spawn the subprocess and hook its pipes into the event loop."""
        if self.state != 'before':
            s = 'The process was already started and has state: %r' % self.state
            raise ProcessStateError(s)
        self.process = Popen(self.args,
            stdout=PIPE, stderr=PIPE, stdin=PIPE,
            env=os.environ,
            cwd=self.work_dir
        )
        if WINDOWS:
            # Pipe fds can't be watched directly on Windows; forward
            # their read events instead.
            self.stdout = forward_read_events(self.process.stdout)
            self.stderr = forward_read_events(self.process.stderr)
        else:
            self.stdout = self.process.stdout.fileno()
            self.stderr = self.process.stderr.fileno()
        self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
        self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
        self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
        self.poller.start()
        self.notify_start(self.process.pid)

    def stop(self):
        return self.interrupt_then_kill()

    def signal(self, sig):
        if self.state != 'running':
            return
        if WINDOWS and sig != SIGINT:
            # use Windows tree-kill for better child cleanup
            check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
        else:
            self.process.send_signal(sig)

    def interrupt_then_kill(self, delay=2.0):
        """Send INT, wait a delay and then send KILL."""
        try:
            self.signal(SIGINT)
        except Exception:
            self.log.debug("interrupt failed")
        self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
        self.killer.start()

    # callbacks, etc:

    def handle_stdout(self, fd, events):
        if WINDOWS:
            line = self.stdout.recv()
        else:
            line = self.process.stdout.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.info(line[:-1])
        else:
            self.poll()

    def handle_stderr(self, fd, events):
        if WINDOWS:
            line = self.stderr.recv()
        else:
            line = self.process.stderr.readline()
        # a stopped process will be readable but return empty strings
        if line:
            self.log.error(line[:-1])
        else:
            self.poll()

    def poll(self):
        """Check the child's status; tear down handlers once it exits."""
        status = self.process.poll()
        if status is not None:
            self.poller.stop()
            self.loop.remove_handler(self.stdout)
            self.loop.remove_handler(self.stderr)
            self.notify_stop(dict(exit_code=status, pid=self.process.pid))
        return status
329 326
class LocalControllerLauncher(LocalProcessLauncher):
    """Launch a controller as a regular external process."""

    controller_cmd = List(ipcontroller_cmd_argv, config=True,
        help="""Popen command to launch ipcontroller.""")
    # Command line arguments to ipcontroller.
    controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
        help="""command-line args to pass to ipcontroller""")

    def find_args(self):
        """Full argv: the controller command plus its arguments."""
        return self.controller_cmd + self.controller_args

    def start(self, profile_dir):
        """Start the controller by profile_dir."""
        self.controller_args.extend(['profile_dir=%s'%profile_dir])
        self.profile_dir = unicode(profile_dir)
        self.log.info("Starting LocalControllerLauncher: %r" % self.args)
        return super(LocalControllerLauncher, self).start()
348 345
349 346
class LocalEngineLauncher(LocalProcessLauncher):
    """Launch a single engine as a regular external process."""

    engine_cmd = List(ipengine_cmd_argv, config=True,
        help="""command to launch the Engine.""")
    # Command line arguments for ipengine.
    engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
        help="command-line arguments to pass to ipengine"
    )

    def find_args(self):
        """Full argv: the engine command plus its arguments."""
        return self.engine_cmd + self.engine_args

    def start(self, profile_dir):
        """Start the engine by profile_dir."""
        self.engine_args.extend(['profile_dir=%s'%profile_dir])
        self.profile_dir = unicode(profile_dir)
        return super(LocalEngineLauncher, self).start()
368 365
369 366
class LocalEngineSetLauncher(BaseLauncher):
    """Launch a set of engines as regular external processes."""

    # Command line arguments for ipengine.
    engine_args = List(
        ['--log-to-file','log_level=%i'%logging.INFO], config=True,
        help="command-line arguments to pass to ipengine"
    )
    # launcher class
    launcher_class = LocalEngineLauncher

    launchers = Dict()
    stop_data = Dict()

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(LocalEngineSetLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.stop_data = {}

    def start(self, n, profile_dir):
        """Start n engines by profile or profile_dir."""
        self.profile_dir = unicode(profile_dir)
        dlist = []
        for idx in range(n):
            launcher = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
            # Give each engine its own copy of the args, so one launcher's
            # mutations can't leak into its siblings.
            launcher.engine_args = copy.deepcopy(self.engine_args)
            launcher.on_stop(self._notice_engine_stopped)
            deferred = launcher.start(profile_dir)
            if idx == 0:
                self.log.info("Starting LocalEngineSetLauncher: %r" % launcher.args)
            self.launchers[idx] = launcher
            dlist.append(deferred)
        self.notify_start(dlist)
        return dlist

    def find_args(self):
        return ['engine set']

    def signal(self, sig):
        """Forward a signal to every engine launcher."""
        dlist = []
        for launcher in self.launchers.itervalues():
            dlist.append(launcher.signal(sig))
        return dlist

    def interrupt_then_kill(self, delay=1.0):
        """Interrupt, then kill after `delay` seconds, every engine."""
        dlist = []
        for launcher in self.launchers.itervalues():
            dlist.append(launcher.interrupt_then_kill(delay))
        return dlist

    def stop(self):
        return self.interrupt_then_kill()

    def _notice_engine_stopped(self, data):
        # Match the stopped pid back to its launcher, record its stop data,
        # and fire notify_stop once the last engine is gone.
        pid = data['pid']
        for idx, launcher in self.launchers.iteritems():
            if launcher.process.pid == pid:
                break
        self.launchers.pop(idx)
        self.stop_data[idx] = data
        if not self.launchers:
            self.notify_stop(self.stop_data)
441 438
442 439
443 440 #-----------------------------------------------------------------------------
444 441 # MPIExec launchers
445 442 #-----------------------------------------------------------------------------
446 443
447 444
class MPIExecLauncher(LocalProcessLauncher):
    """Launch an external process using mpiexec."""

    mpi_cmd = List(['mpiexec'], config=True,
        help="The mpiexec command to use in starting the process."
    )
    mpi_args = List([], config=True,
        help="The command line arguments to pass to mpiexec."
    )
    program = List(['date'], config=True,
        help="The program to start via mpiexec.")
    program_args = List([], config=True,
        help="The command line argument to the program."
    )
    # Number of instances to launch.
    n = Int(1)

    def find_args(self):
        """Build self.args using all the fields."""
        return (self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args
                + self.program + self.program_args)

    def start(self, n):
        """Start n instances of the program using mpiexec."""
        self.n = n
        return super(MPIExecLauncher, self).start()
473 470
474 471
class MPIExecControllerLauncher(MPIExecLauncher):
    """Launch a controller using mpiexec."""

    controller_cmd = List(ipcontroller_cmd_argv, config=True,
        help="Popen command to launch the Controller"
    )
    controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
        help="Command line arguments to pass to ipcontroller."
    )
    n = Int(1)

    def start(self, profile_dir):
        """Start the controller by profile_dir."""
        self.controller_args.extend(['profile_dir=%s'%profile_dir])
        self.profile_dir = unicode(profile_dir)
        self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
        return super(MPIExecControllerLauncher, self).start(1)

    def find_args(self):
        """Build the full mpiexec argv for the controller.

        BUG FIX: ``self.n`` is an Int trait and must be stringified here
        (as the parent class does); an int element in the argv list breaks
        ``' '.join(self.args)`` and subprocess invocation.
        """
        return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
               self.controller_cmd + self.controller_args
496 493
497 494
class MPIExecEngineSetLauncher(MPIExecLauncher):
    """Launch a set of engines under a single mpiexec invocation."""

    program = List(ipengine_cmd_argv, config=True,
        help="Popen command for ipengine"
    )
    program_args = List(
        ['--log-to-file','log_level=%i'%logging.INFO], config=True,
        help="Command line arguments for ipengine."
    )
    n = Int(1)

    def start(self, n, profile_dir):
        """Start n engines by profile or profile_dir."""
        self.program_args.extend(['profile_dir=%s'%profile_dir])
        self.profile_dir = unicode(profile_dir)
        self.n = n
        self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
        return super(MPIExecEngineSetLauncher, self).start(n)
516 513
517 514 #-----------------------------------------------------------------------------
518 515 # SSH launchers
519 516 #-----------------------------------------------------------------------------
520 517
521 518 # TODO: Get SSH Launcher working again.
522 519
class SSHLauncher(LocalProcessLauncher):
    """A minimal launcher for ssh.

    To be useful this will probably have to be extended to use the ``sshx``
    idea for environment variables. There could be other things this needs
    as well.
    """

    ssh_cmd = List(['ssh'], config=True,
        help="command for starting ssh")
    ssh_args = List(['-tt'], config=True,
        help="args to pass to ssh")
    program = List(['date'], config=True,
        help="Program to launch via ssh")
    program_args = List([], config=True,
        help="args to pass to remote program")
    hostname = Unicode('', config=True,
        help="hostname on which to launch the program")
    user = Unicode('', config=True,
        help="username for ssh")
    location = Unicode('', config=True,
        help="user@hostname location for ssh in one setting")

    def _hostname_changed(self, name, old, new):
        # Keep `location` in sync whenever hostname is (re)assigned.
        self.location = u'%s@%s' % (self.user, new) if self.user else new

    def _user_changed(self, name, old, new):
        # Keep `location` in sync whenever user is (re)assigned.
        self.location = u'%s@%s' % (new, self.hostname)

    def find_args(self):
        return self.ssh_cmd + self.ssh_args + [self.location] + \
               self.program + self.program_args

    def start(self, profile_dir, hostname=None, user=None):
        """Launch the remote program, optionally overriding host/user."""
        self.profile_dir = unicode(profile_dir)
        if hostname is not None:
            self.hostname = hostname
        if user is not None:
            self.user = user

        return super(SSHLauncher, self).start()

    def signal(self, sig):
        if self.state == 'running':
            # send escaped ssh connection-closer
            self.process.stdin.write('~.')
            self.process.stdin.flush()
573 570
574 571
575 572
class SSHControllerLauncher(SSHLauncher):
    """Launch an ipcontroller on a remote host over ssh."""

    program = List(ipcontroller_cmd_argv, config=True,
        help="remote ipcontroller command.")
    program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
        help="Command line arguments to ipcontroller.")
582 579
583 580
class SSHEngineLauncher(SSHLauncher):
    """Launch a single ipengine on a remote host over ssh."""

    program = List(ipengine_cmd_argv, config=True,
        help="remote ipengine command.")
    # Command line arguments for ipengine.
    program_args = List(
        ['--log-to-file','log_level=%i'%logging.INFO], config=True,
        help="Command line arguments to ipengine."
    )
592 589
class SSHEngineSetLauncher(LocalEngineSetLauncher):
    """Launch a set of engines across multiple hosts via ssh."""

    launcher_class = SSHEngineLauncher
    engines = Dict(config=True,
        help="""dict of engines to launch. This is a dict by hostname of ints,
        corresponding to the number of engines to start on that host.""")

    def start(self, n, profile_dir):
        """Start engines by profile or profile_dir.
        `n` is ignored, and the `engines` config property is used instead.
        """

        self.profile_dir = unicode(profile_dir)
        dlist = []
        for host, n in self.engines.iteritems():
            if isinstance(n, (tuple, list)):
                # An (engine_count, args) pair gives per-host engine args.
                n, args = n
            else:
                args = copy.deepcopy(self.engine_args)

            if '@' in host:
                user,host = host.split('@',1)
            else:
                user=None
            for i in range(n):
                el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)

                # Copy the engine args over to each engine launcher.
                # BUG FIX: removed a stray no-op expression statement (`i`)
                # that was dead code in the original.
                el.program_args = args
                el.on_stop(self._notice_engine_stopped)
                d = el.start(profile_dir, user=user, hostname=host)
                if i==0:
                    self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
                self.launchers[host+str(i)] = el
                dlist.append(d)
        self.notify_start(dlist)
        return dlist
630 627
631 628
632 629
633 630 #-----------------------------------------------------------------------------
634 631 # Windows HPC Server 2008 scheduler launchers
635 632 #-----------------------------------------------------------------------------
636 633
637 634
# This is only used on Windows.
def find_job_cmd():
    """Locate the Windows HPC ``job`` command, falling back to the bare name."""
    if not WINDOWS:
        return 'job'
    try:
        return find_cmd('job')
    except (FindCmdError, ImportError):
        # ImportError will be raised if win32api is not installed
        return 'job'
648 645
649 646
class WindowsHPCLauncher(BaseLauncher):
    """Base class for launchers submitting jobs to the Windows HPC scheduler."""

    job_id_regexp = Unicode(r'\d+', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command. """
    )
    job_file_name = Unicode(u'ipython_job.xml', config=True,
        help="The filename of the instantiated job script.")
    # The full path to the instantiated job script. This gets made dynamically
    # by combining the work_dir with the job_file_name.
    # NOTE(review): this trait is shadowed by the `job_file` property below,
    # so the trait declaration is effectively unused — confirm before removing.
    job_file = Unicode(u'')
    scheduler = Unicode('', config=True,
        help="The hostname of the scheduler to submit the job to.")
    job_cmd = Unicode(find_job_cmd(), config=True,
        help="The command for submitting jobs.")

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        super(WindowsHPCLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )

    @property
    def job_file(self):
        """Full path to the instantiated job description file."""
        return os.path.join(self.work_dir, self.job_file_name)

    def write_job_file(self, n):
        """Write the XML job description for `n` processes; subclass hook."""
        raise NotImplementedError("Implement write_job_file in a subclass.")

    def find_args(self):
        return [u'job.exe']

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = re.search(self.job_id_regexp, output)
        if m is None:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        job_id = m.group()
        self.job_id = job_id
        self.log.info('Job started with job id: %r' % job_id)
        return job_id

    def start(self, n):
        """Start n copies of the process using the Win HPC job scheduler."""
        self.write_job_file(n)
        args = [
            'submit',
            '/jobfile:%s' % self.job_file,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        # Twisted will raise DeprecationWarnings if we try to pass unicode to this
        output = check_output([self.job_cmd]+args,
            env=os.environ,
            cwd=self.work_dir,
            stderr=STDOUT
        )
        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Cancel the job via the scheduler; best-effort if it already exited."""
        args = [
            'cancel',
            self.job_id,
            '/scheduler:%s' % self.scheduler
        ]
        self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
        try:
            output = check_output([self.job_cmd]+args,
                env=os.environ,
                cwd=self.work_dir,
                stderr=STDOUT
            )
        except Exception:
            # was a bare `except:`, which also swallowed KeyboardInterrupt and
            # SystemExit; cancel failure is still treated as "already stopped"
            output = 'The job already appears to be stopped: %r' % self.job_id
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
728 725
729 726
class WindowsHPCControllerLauncher(WindowsHPCLauncher):
    """Launch an ipcontroller via the Windows HPC scheduler."""

    job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
        help="WinHPC xml job file.")
    # filled in by start(); deliberately not user-configurable
    extra_args = List([], config=False,
        help="extra args to pass to ipcontroller")

    def write_job_file(self, n):
        """Write the WinHPC XML job file containing a single controller task."""
        job = IPControllerJob(config=self.config)

        t = IPControllerTask(config=self.config)
        # The tasks work directory is *not* the actual work directory of
        # the controller. It is used as the base path for the stdout/stderr
        # files that the scheduler redirects to.
        t.work_directory = self.profile_dir
        # Add the profile_dir and from self.start().
        t.controller_args.extend(self.extra_args)
        job.add_task(t)

        self.log.info("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # the job file lives in the profile dir, not work_dir as in the base class
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, profile_dir):
        """Start the controller by profile_dir."""
        self.extra_args = ['profile_dir=%s'%profile_dir]
        self.profile_dir = unicode(profile_dir)
        return super(WindowsHPCControllerLauncher, self).start(1)
761 758
762 759
class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
    """Launch a set of ipengines via the Windows HPC scheduler."""

    job_file_name = Unicode(u'ipengineset_job.xml', config=True,
        help="jobfile for ipengines job")
    # filled in by start(); deliberately not user-configurable
    extra_args = List([], config=False,
        help="extra args to pass to ipengine")  # fixed typo: was "pas"

    def write_job_file(self, n):
        """Write the WinHPC XML job file containing one task per engine."""
        job = IPEngineSetJob(config=self.config)

        for i in range(n):
            t = IPEngineTask(config=self.config)
            # The tasks work directory is *not* the actual work directory of
            # the engine. It is used as the base path for the stdout/stderr
            # files that the scheduler redirects to.
            t.work_directory = self.profile_dir
            # Add the profile_dir and from self.start().
            t.engine_args.extend(self.extra_args)
            job.add_task(t)

        self.log.info("Writing job description file: %s" % self.job_file)
        job.write(self.job_file)

    @property
    def job_file(self):
        # the job file lives in the profile dir, not work_dir as in the base class
        return os.path.join(self.profile_dir, self.job_file_name)

    def start(self, n, profile_dir):
        """Start n engines by profile_dir."""
        # (docstring previously said "Start the controller")
        self.extra_args = ['profile_dir=%s'%profile_dir]
        self.profile_dir = unicode(profile_dir)
        return super(WindowsHPCEngineSetLauncher, self).start(n)
795 792
796 793
797 794 #-----------------------------------------------------------------------------
798 795 # Batch (PBS) system launchers
799 796 #-----------------------------------------------------------------------------
800 797
class BatchSystemLauncher(BaseLauncher):
    """Launch an external process using a batch system.

    This class is designed to work with UNIX batch systems like PBS, LSF,
    GridEngine, etc. The overall model is that there are different commands
    like qsub, qdel, etc. that handle the starting and stopping of the process.

    This class also has the notion of a batch script. The ``batch_template``
    attribute can be set to a string that is a template for the batch script.
    This template is instantiated using string formatting. Thus the template can
    use {n} for the number of instances. Subclasses can add additional variables
    to the template dict.
    """

    # Subclasses must fill these in. See PBSEngineSet
    submit_command = List([''], config=True,
        help="The name of the command line program used to submit jobs.")
    delete_command = List([''], config=True,
        help="The name of the command line program used to delete jobs.")
    job_id_regexp = Unicode('', config=True,
        help="""A regular expression used to get the job id from the output of the
        submit_command.""")
    batch_template = Unicode('', config=True,
        help="The string that is the batch script template itself.")
    batch_template_file = Unicode(u'', config=True,
        help="The file that contains the batch template.")
    batch_file_name = Unicode(u'batch_script', config=True,
        help="The filename of the instantiated batch script.")
    queue = Unicode(u'', config=True,
        help="The PBS Queue.")

    # not configurable, override in subclasses
    # PBS Job Array regex
    job_array_regexp = Unicode('')
    job_array_template = Unicode('')
    # PBS Queue regex
    queue_regexp = Unicode('')
    queue_template = Unicode('')
    # The default batch template, override in subclasses
    default_template = Unicode('')
    # The full path to the instantiated batch script.
    batch_file = Unicode(u'')
    # the format dict used with batch_template:
    context = Dict()
    # the Formatter instance for rendering the templates:
    formatter = Instance(EvalFormatter, (), {})


    def find_args(self):
        # submit command plus the path of the instantiated batch script
        return self.submit_command + [self.batch_file]

    def __init__(self, work_dir=u'.', config=None, **kwargs):
        """Initialize the launcher and compute the batch-script path."""
        super(BatchSystemLauncher, self).__init__(
            work_dir=work_dir, config=config, **kwargs
        )
        self.batch_file = os.path.join(self.work_dir, self.batch_file_name)

    def parse_job_id(self, output):
        """Take the output of the submit command and return the job id."""
        m = re.search(self.job_id_regexp, output)
        if m is not None:
            job_id = m.group()
        else:
            raise LauncherError("Job id couldn't be determined: %s" % output)
        self.job_id = job_id
        self.log.info('Job submitted with job id: %r' % job_id)
        return job_id

    def write_batch_script(self, n):
        """Instantiate and write the batch script to the work_dir."""
        self.context['n'] = n
        self.context['queue'] = self.queue
        # first priority is batch_template if set
        if self.batch_template_file and not self.batch_template:
            # second priority is batch_template_file
            with open(self.batch_template_file) as f:
                self.batch_template = f.read()
        if not self.batch_template:
            # third (last) priority is default_template
            self.batch_template = self.default_template

        # splice the job-array directive in after the shebang line
        # if the template doesn't already contain one
        regex = re.compile(self.job_array_regexp)
        if not regex.search(self.batch_template):
            self.log.info("adding job array settings to batch script")
            firstline, rest = self.batch_template.split('\n',1)
            self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])

        # likewise for the queue directive, only when a queue was requested
        regex = re.compile(self.queue_regexp)
        if self.queue and not regex.search(self.batch_template):
            self.log.info("adding PBS queue settings to batch script")
            firstline, rest = self.batch_template.split('\n',1)
            self.batch_template = u'\n'.join([firstline, self.queue_template, rest])

        script_as_string = self.formatter.format(self.batch_template, **self.context)
        self.log.info('Writing instantiated batch script: %s' % self.batch_file)

        with open(self.batch_file, 'w') as f:
            f.write(script_as_string)
        # owner read/write/execute only
        os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)

    def start(self, n, profile_dir):
        """Start n copies of the process using a batch system."""
        # Here we save profile_dir in the context so they
        # can be used in the batch script template as {profile_dir}
        self.context['profile_dir'] = profile_dir
        self.profile_dir = unicode(profile_dir)
        self.write_batch_script(n)
        output = check_output(self.args, env=os.environ)

        job_id = self.parse_job_id(output)
        self.notify_start(job_id)
        return job_id

    def stop(self):
        """Delete the job via delete_command and notify observers."""
        output = check_output(self.delete_command+[self.job_id], env=os.environ)
        self.notify_stop(dict(job_id=self.job_id, output=output))  # Pass the output of the kill cmd
        return output
920 917
921 918
class PBSLauncher(BatchSystemLauncher):
    """A BatchSystemLauncher subclass for PBS."""

    submit_command = List(['qsub'], config=True,
        help="The PBS submit command ['qsub']")
    # help text previously said ['qsub'] — the delete command is qdel
    delete_command = List(['qdel'], config=True,
        help="The PBS delete command ['qdel']")
    job_id_regexp = Unicode(r'\d+', config=True,
        help="Regular expression for identifying the job ID [r'\d+']")

    batch_file = Unicode(u'')
    # regex/template pair for the PBS job-array directive
    job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
    job_array_template = Unicode('#PBS -t 1-{n}')
    # regex/template pair for the PBS queue directive
    queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
    queue_template = Unicode('#PBS -q {queue}')
937 934
938 935
class PBSControllerLauncher(PBSLauncher):
    """Launch a controller using PBS."""

    batch_file_name = Unicode(u'pbs_controller', config=True,
        help="batch file name for the controller job.")
    # default script: export the environment, name the job, run ipcontroller
    default_template= Unicode("""#!/bin/sh
#PBS -V
#PBS -N ipcontroller
%s --log-to-file profile_dir={profile_dir}
"""%(' '.join(ipcontroller_cmd_argv)))

    def start(self, profile_dir):
        """Start the controller by profile or profile_dir."""
        self.log.info("Starting PBSControllerLauncher: %r" % self.args)
        return super(PBSControllerLauncher, self).start(1, profile_dir)
954 951
955 952
class PBSEngineSetLauncher(PBSLauncher):
    """Launch Engines using PBS"""

    batch_file_name = Unicode(u'pbs_engines', config=True,
        help="batch file name for the engine(s) job.")
    # default script: export the environment, name the job, run ipengine;
    # the job-array directive is spliced in by BatchSystemLauncher
    default_template= Unicode(u"""#!/bin/sh
#PBS -V
#PBS -N ipengine
%s profile_dir={profile_dir}
"""%(' '.join(ipengine_cmd_argv)))

    def start(self, n, profile_dir):
        """Start n engines by profile or profile_dir."""
        self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
        return super(PBSEngineSetLauncher, self).start(n, profile_dir)
970 967
971 968 #SGE is very similar to PBS
972 969
class SGELauncher(PBSLauncher):
    """Sun GridEngine is a PBS clone with slightly different syntax"""
    job_array_regexp = Unicode('#\$\W+\-t')
    job_array_template = Unicode('#$ -t 1-{n}')
    queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
    # bug fix: was '#$ -q $queue' -- the templates are rendered with
    # str.format-style {queue} substitution (see BatchSystemLauncher and the
    # PBS queue_template), so '$queue' was emitted literally and the
    # requested queue never made it into the batch script.
    queue_template = Unicode('#$ -q {queue}')
979 976
class SGEControllerLauncher(SGELauncher):
    """Launch a controller using SGE."""

    batch_file_name = Unicode(u'sge_controller', config=True,
        help="batch file name for the ipcontroller job.")  # fixed typo: "ipontroller"
    default_template= Unicode(u"""#$ -V
#$ -S /bin/sh
#$ -N ipcontroller
%s --log-to-file profile_dir={profile_dir}
"""%(' '.join(ipcontroller_cmd_argv)))

    def start(self, profile_dir):
        """Start the controller by profile or profile_dir."""
        # bug fix: the log message previously named PBSControllerLauncher
        self.log.info("Starting SGEControllerLauncher: %r" % self.args)
        return super(SGEControllerLauncher, self).start(1, profile_dir)
995 992
class SGEEngineSetLauncher(SGELauncher):
    """Launch Engines with SGE"""

    batch_file_name = Unicode(u'sge_engines', config=True,
        help="batch file name for the engine(s) job.")
    # default script: export environment, name the job, run ipengine;
    # the -t job-array directive is spliced in by BatchSystemLauncher
    default_template = Unicode("""#$ -V
#$ -S /bin/sh
#$ -N ipengine
%s profile_dir={profile_dir}
"""%(' '.join(ipengine_cmd_argv)))

    def start(self, n, profile_dir):
        """Start n engines by profile or profile_dir."""
        self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
        return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1010 1007
1011 1008
1012 1009 #-----------------------------------------------------------------------------
1013 1010 # A launcher for ipcluster itself!
1014 1011 #-----------------------------------------------------------------------------
1015 1012
1016 1013
class IPClusterLauncher(LocalProcessLauncher):
    """Launch the ipcluster program in an external process."""

    ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
        help="Popen command for ipcluster")
    ipcluster_args = List(
        ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
        help="Command line arguments to pass to ipcluster.")
    # subcommand passed to ipcluster as '--<subcommand>'
    ipcluster_subcommand = Unicode('start')
    # number of engines requested via 'n=<ipcluster_n>'
    ipcluster_n = Int(2)

    def find_args(self):
        """Build the full ipcluster command line."""
        return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
            ['n=%i'%self.ipcluster_n] + self.ipcluster_args

    def start(self):
        self.log.info("Starting ipcluster: %r" % self.args)
        return super(IPClusterLauncher, self).start()
1035 1032
1036 1033 #-----------------------------------------------------------------------------
1037 1034 # Collections of launchers
1038 1035 #-----------------------------------------------------------------------------
1039 1036
# Convenience collections of the launcher classes defined above,
# grouped by the mechanism used to start processes.
local_launchers = [
    LocalControllerLauncher,
    LocalEngineLauncher,
    LocalEngineSetLauncher,
]
mpi_launchers = [
    MPIExecLauncher,
    MPIExecControllerLauncher,
    MPIExecEngineSetLauncher,
]
ssh_launchers = [
    SSHLauncher,
    SSHControllerLauncher,
    SSHEngineLauncher,
    SSHEngineSetLauncher,
]
winhpc_launchers = [
    WindowsHPCLauncher,
    WindowsHPCControllerLauncher,
    WindowsHPCEngineSetLauncher,
]
pbs_launchers = [
    PBSLauncher,
    PBSControllerLauncher,
    PBSEngineSetLauncher,
]
sge_launchers = [
    SGELauncher,
    SGEControllerLauncher,
    SGEEngineSetLauncher,
]
# every launcher defined in this module
all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
                + pbs_launchers + sge_launchers
@@ -1,113 +1,108 b''
1 1 #!/usr/bin/env python
2 2 """A simple logger object that consolidates messages incoming from ipcluster processes."""
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Copyright (C) 2011 The IPython Development Team
6 6 #
7 7 # Distributed under the terms of the BSD License. The full license is in
8 8 # the file COPYING, distributed as part of this software.
9 9 #-----------------------------------------------------------------------------
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Imports
13 13 #-----------------------------------------------------------------------------
14 14
15 15
16 16 import logging
17 17 import sys
18 18
19 19 import zmq
20 20 from zmq.eventloop import ioloop, zmqstream
21 21
22 from IPython.config.application import Application
23 from IPython.config.configurable import Configurable
22 from IPython.config.configurable import LoggingConfigurable
24 23 from IPython.utils.traitlets import Int, Unicode, Instance, List
25 24
26 25 #-----------------------------------------------------------------------------
27 26 # Classes
28 27 #-----------------------------------------------------------------------------
29 28
30 29
class LogWatcher(LoggingConfigurable):
    """A simple class that receives messages on a SUB socket, as published
    by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.

    This can subscribe to multiple topics, but defaults to all topics.
    """

    # configurables
    topics = List([''], config=True,
        help="The ZMQ topics to subscribe to. Default is to subscribe to all messages")
    url = Unicode('tcp://127.0.0.1:20202', config=True,
        help="ZMQ url on which to listen for log messages")

    # internals
    # stream wrapping the bound SUB socket
    stream = Instance('zmq.eventloop.zmqstream.ZMQStream')

    context = Instance(zmq.Context)
    def _context_default(self):
        # share the process-global zmq context
        return zmq.Context.instance()

    loop = Instance(zmq.eventloop.ioloop.IOLoop)
    def _loop_default(self):
        return ioloop.IOLoop.instance()

    def __init__(self, **kwargs):
        """Bind the SUB socket and apply the initial topic subscriptions."""
        super(LogWatcher, self).__init__(**kwargs)
        s = self.context.socket(zmq.SUB)
        s.bind(self.url)
        self.stream = zmqstream.ZMQStream(s, self.loop)
        self.subscribe()
        # re-apply subscriptions whenever the topic list is reconfigured
        self.on_trait_change(self.subscribe, 'topics')

    def start(self):
        """Begin logging incoming messages."""
        self.stream.on_recv(self.log_message)

    def stop(self):
        """Stop delivering received messages to the logger."""
        self.stream.stop_on_recv()

    def subscribe(self):
        """Update our SUB socket's subscriptions."""
        # drop the catch-all '' subscription, if present
        self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
        if '' in self.topics:
            self.log.debug("Subscribing to: everything")
            self.stream.setsockopt(zmq.SUBSCRIBE, '')
        else:
            for topic in self.topics:
                self.log.debug("Subscribing to: %r"%(topic))
                self.stream.setsockopt(zmq.SUBSCRIBE, topic)

    def _extract_level(self, topic_str):
        """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')"""
        topics = topic_str.split('.')
        # find the first topic component that names a logging level
        for idx,t in enumerate(topics):
            level = getattr(logging, t, None)
            if level is not None:
                break

        if level is None:
            # no level in the topic: default to INFO
            level = logging.INFO
        else:
            # drop the level name from the topic
            topics.pop(idx)

        return level, '.'.join(topics)


    def log_message(self, raw):
        """receive and parse a message, then log it."""
        # expect exactly [topic, message] with a dotted topic
        if len(raw) != 2 or '.' not in raw[0]:
            self.log.error("Invalid log message: %s"%raw)
            return
        else:
            topic, msg = raw
            # don't newline, since log messages always newline:
            topic,level_name = topic.rsplit('.',1)
            level,topic = self._extract_level(topic)
            if msg[-1] == '\n':
                msg = msg[:-1]
            self.log.log(level, "[%s] %s" % (topic, msg))
113 108
@@ -1,184 +1,180 b''
1 1 """A Task logger that presents our DB interface,
2 2 but exists entirely in memory and implemented with dicts.
3 3
4 4 TaskRecords are dicts of the form:
5 5 {
6 6 'msg_id' : str(uuid),
7 7 'client_uuid' : str(uuid),
8 8 'engine_uuid' : str(uuid) or None,
9 9 'header' : dict(header),
10 10 'content': dict(content),
11 11 'buffers': list(buffers),
12 12 'submitted': datetime,
13 13 'started': datetime or None,
14 14 'completed': datetime or None,
15 15 'resubmitted': datetime or None,
16 16 'result_header' : dict(header) or None,
17 17 'result_content' : dict(content) or None,
18 18 'result_buffers' : list(buffers) or None,
19 19 }
20 20 With this info, many of the special categories of tasks can be defined by query:
21 21
22 22 pending: completed is None
23 23 client's outstanding: client_uuid = uuid && completed is None
24 24 MIA: arrived is None (and completed is None)
25 25 etc.
26 26
27 27 EngineRecords are dicts of the form:
28 28 {
29 29 'eid' : int(id),
30 30 'uuid': str(uuid)
31 31 }
32 32 This may be extended, but is currently.
33 33
34 34 We support a subset of mongodb operators:
35 35 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
36 36 """
37 37 #-----------------------------------------------------------------------------
38 38 # Copyright (C) 2010 The IPython Development Team
39 39 #
40 40 # Distributed under the terms of the BSD License. The full license is in
41 41 # the file COPYING, distributed as part of this software.
42 42 #-----------------------------------------------------------------------------
43 43
44 44
45 45 from datetime import datetime
46 46
47 from IPython.config.application import Application
48 from IPython.config.configurable import Configurable
47 from IPython.config.configurable import LoggingConfigurable
49 48
50 49 from IPython.utils.traitlets import Dict, Unicode, Instance
51 50
# Mapping of mongodb-style query operators to predicate functions.
# Each predicate takes (record_value, query_value) and returns a bool.
filters = {
 '$lt' : lambda a,b: a < b,
 # bug fix: was `lambda a,b: b > a`, which is equivalent to $lt
 '$gt' : lambda a,b: a > b,
 '$eq' : lambda a,b: a == b,
 '$ne' : lambda a,b: a != b,
 '$lte': lambda a,b: a <= b,
 '$gte': lambda a,b: a >= b,
 '$in' : lambda a,b: a in b,
 '$nin': lambda a,b: a not in b,
 '$all': lambda a,b: all([ a in bb for bb in b ]),
 '$mod': lambda a,b: a%b[0] == b[1],
 '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
}


class CompositeFilter(object):
    """Composite filter for matching multiple properties."""

    def __init__(self, dikt):
        """Compile a dict of {operator: value} into paired test/value lists."""
        self.tests = []
        self.values = []
        # .items() rather than .iteritems() for py2/py3 compatibility
        for key, value in dikt.items():
            self.tests.append(filters[key])
            self.values.append(value)

    def __call__(self, value):
        """Return True only if `value` satisfies every operator test."""
        for test, check in zip(self.tests, self.values):
            if not test(value, check):
                return False
        return True
82 81
class BaseDB(LoggingConfigurable):
    """Empty Parent class so traitlets work on DB."""
    # base configurable traits:
    # session id of the controller this DB backend belongs to
    session = Unicode("")
90 86
class DictDB(BaseDB):
    """Basic in-memory dict-based object for saving Task Records.

    This is the first object to present the DB interface
    for logging tasks out of memory.

    The interface is based on MongoDB, so adding a MongoDB
    backend should be straightforward.
    """

    # msg_id -> task record dict
    _records = Dict()

    def _match_one(self, rec, tests):
        """Check if a specific record matches tests."""
        for key, test in tests.iteritems():
            if not test(rec.get(key, None)):
                return False
        return True

    def _match(self, check):
        """Find all the matches for a check dict."""
        matches = []
        tests = {}
        for k, v in check.iteritems():
            if isinstance(v, dict):
                tests[k] = CompositeFilter(v)
            else:
                # bug fix: bind v as a default argument. The previous
                # `lambda o: o==v` late-bound v, so every plain equality
                # test compared against the *last* value in the check dict.
                tests[k] = lambda o, v=v: o == v

        for rec in self._records.itervalues():
            if self._match_one(rec, tests):
                matches.append(rec)
        return matches

    def _extract_subdict(self, rec, keys):
        """extract subdict of keys (msg_id is always included)"""
        d = {}
        d['msg_id'] = rec['msg_id']
        for key in keys:
            d[key] = rec[key]
        return d

    def add_record(self, msg_id, rec):
        """Add a new Task Record, by msg_id."""
        # `in` instead of deprecated dict.has_key
        if msg_id in self._records:
            raise KeyError("Already have msg_id %r"%(msg_id))
        self._records[msg_id] = rec

    def get_record(self, msg_id):
        """Get a specific Task Record, by msg_id."""
        if msg_id not in self._records:
            raise KeyError("No such msg_id %r"%(msg_id))
        return self._records[msg_id]

    def update_record(self, msg_id, rec):
        """Update the data in an existing record."""
        self._records[msg_id].update(rec)

    def drop_matching_records(self, check):
        """Remove all records matching a query dict from the DB."""
        matches = self._match(check)
        for m in matches:
            del self._records[m['msg_id']]

    def drop_record(self, msg_id):
        """Remove a record from the DB."""
        del self._records[msg_id]


    def find_records(self, check, keys=None):
        """Find records matching a query dict, optionally extracting subset of keys.

        Returns a list of matching records (or of subdicts when `keys` is given).

        Parameters
        ----------

        check: dict
            mongodb-style query argument
        keys: list of strs [optional]
            if specified, the subset of keys to extract. msg_id will *always* be
            included.
        """
        matches = self._match(check)
        if keys:
            return [ self._extract_subdict(rec, keys) for rec in matches ]
        else:
            return matches


    def get_history(self):
        """get all msg_ids, ordered by time submitted."""
        msg_ids = self._records.keys()
        return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])
@@ -1,170 +1,165 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
4 4 and hearts are tracked based on their XREQ identities.
5 5 """
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2010-2011 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 from __future__ import print_function
14 14 import time
15 15 import uuid
16 16
17 17 import zmq
18 18 from zmq.devices import ThreadDevice
19 19 from zmq.eventloop import ioloop, zmqstream
20 20
21 from IPython.config.application import Application
22 from IPython.config.configurable import Configurable
21 from IPython.config.configurable import LoggingConfigurable
23 22 from IPython.utils.traitlets import Set, Instance, CFloat
24 23
class Heart(object):
    """A basic heart object for responding to a HeartMonitor.
    This is a simple wrapper with defaults for the most common
    Device model for responding to heartbeats.

    It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
    SUB/XREQ for in/out.

    You can specify the XREQ's IDENTITY via the optional heart_id argument."""
    # the zmq.FORWARDER ThreadDevice that echoes pings back as pongs
    device = None
    # the XREQ IDENTITY used by this heart
    id = None

    def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
        self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
        self.device.daemon = True
        self.device.connect_in(in_addr)
        self.device.connect_out(out_addr)
        if in_type == zmq.SUB:
            # SUB sockets receive nothing until they subscribe
            self.device.setsockopt_in(zmq.SUBSCRIBE, "")
        self.id = heart_id if heart_id is not None else str(uuid.uuid4())
        self.device.setsockopt_out(zmq.IDENTITY, self.id)

    def start(self):
        """Start the underlying forwarding device (runs in a daemon thread)."""
        return self.device.start()
50 49
class HeartMonitor(LoggingConfigurable):
    """A basic HeartMonitor class
    pingstream: a PUB stream
    pongstream: an XREP stream
    period: the period of the heartbeat in milliseconds"""

    period = CFloat(1000, config=True,
        help='The frequency at which the Hub pings the engines for heartbeats '
        ' (in ms) [default: 1000]',  # was "[default: 100]", inconsistent with the 1000 default
    )

    pingstream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    pongstream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')
    def _loop_default(self):
        return ioloop.IOLoop.instance()

    # not settable:
    hearts = Set()          # ids currently considered alive
    responses = Set()       # ids that have ponged since the last beat
    on_probation = Set()    # ids that missed the previous beat (one-beat grace)
    last_ping = CFloat(0)
    _new_handlers = Set()
    _failure_handlers = Set()
    lifetime = CFloat(0)    # cumulative wall-clock time; doubles as the ping payload
    tic = CFloat(0)

    def __init__(self, **kwargs):
        super(HeartMonitor, self).__init__(**kwargs)

        self.pongstream.on_recv(self.handle_pong)

    def start(self):
        """Begin the periodic beat on our IOLoop."""
        self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
        self.caller.start()

    def add_new_heart_handler(self, handler):
        """add a new handler for new hearts"""
        self.log.debug("heartbeat::new_heart_handler: %s"%handler)
        self._new_handlers.add(handler)

    def add_heart_failure_handler(self, handler):
        """add a new handler for heart failure"""
        self.log.debug("heartbeat::new heart failure handler: %s"%handler)
        self._failure_handlers.add(handler)

    def beat(self):
        """Reconcile responses to the previous ping, then send the next one."""
        self.pongstream.flush()
        self.last_ping = self.lifetime

        toc = time.time()
        self.lifetime += toc-self.tic
        self.tic = toc
        # self.log.debug("heartbeat::%s"%self.lifetime)
        goodhearts = self.hearts.intersection(self.responses)
        missed_beats = self.hearts.difference(goodhearts)
        heartfailures = self.on_probation.intersection(missed_beats)
        newhearts = self.responses.difference(goodhearts)
        # explicit for-loops instead of map() for side effects:
        # under Python 3, map() is lazy and the handlers would never run
        for heart in newhearts:
            self.handle_new_heart(heart)
        for heart in heartfailures:
            self.handle_heart_failure(heart)
        # a heart that missed this beat gets one beat of probation before failing
        self.on_probation = missed_beats.intersection(self.hearts)
        self.responses = set()
        # print self.on_probation, self.hearts
        # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
        self.pingstream.send(str(self.lifetime))

    def handle_new_heart(self, heart):
        """Register a heart seen for the first time, notifying any handlers."""
        if self._new_handlers:
            for handler in self._new_handlers:
                handler(heart)
        else:
            self.log.info("heartbeat::yay, got new heart %s!"%heart)
        self.hearts.add(heart)

    def handle_heart_failure(self, heart):
        """Drop a heart that missed two consecutive beats, notifying any handlers."""
        if self._failure_handlers:
            for handler in self._failure_handlers:
                try:
                    handler(heart)
                except Exception:
                    # a broken handler must not prevent the remaining
                    # handlers from running, or the heart from being removed
                    self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
        else:
            self.log.info("heartbeat::Heart %s failed :("%heart)
        self.hearts.remove(heart)


    def handle_pong(self, msg):
        "a heart just beat"
        # msg is [identity, payload]; payload echoes the lifetime we pinged with
        if msg[1] == str(self.lifetime):
            delta = time.time()-self.tic
            # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
            self.responses.add(msg[0])
        elif msg[1] == str(self.last_ping):
            # late pong for the previous ping: still counts, but warn
            delta = time.time()-self.tic + (self.lifetime-self.last_ping)
            self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
            self.responses.add(msg[0])
        else:
            self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
                          (msg[1],self.lifetime))
155 150
156 151
if __name__ == '__main__':
    # standalone demo: run a HeartMonitor on tcp 5555 (ping PUB) / 5556 (pong XREP)
    loop = ioloop.IOLoop.instance()
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    pub.bind('tcp://127.0.0.1:5555')
    xrep = context.socket(zmq.XREP)
    xrep.bind('tcp://127.0.0.1:5556')

    outstream = zmqstream.ZMQStream(pub, loop)
    instream = zmqstream.ZMQStream(xrep, loop)

    # HeartMonitor is a Configurable: its __init__ takes **kwargs only,
    # so positional args (as before) would raise TypeError. Traits must
    # be passed by keyword, and the monitor must be started explicitly.
    hb = HeartMonitor(loop=loop, pingstream=outstream, pongstream=instream)
    hb.start()

    loop.start()
General Comments 0
You need to be logged in to leave comments. Login now