get default logger from Application.instance()
MinRK
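The diff below drops the hard-coded root logger from the launchers and lets the `log` trait default to the logger of the running Application. A minimal sketch of that traitlets dynamic-default pattern, assuming the IPython.config APIs imported in this file (the class name is illustrative only, not part of this commit):

    # Sketch only: LauncherExample is a made-up name for illustration.
    from IPython.config.application import Application
    from IPython.config.configurable import Configurable
    from IPython.utils.traitlets import Instance

    class LauncherExample(Configurable):
        # No eagerly-bound logger; the default is computed on first access.
        log = Instance('logging.Logger')

        def _log_default(self):
            # Reuse whatever logger the enclosing application configured,
            # instead of always logging to the root logger.
            return Application.instance().log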
@@ -1,1069 +1,1072 @@
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Facilities for launching IPython processes asynchronously.
4 Facilities for launching IPython processes asynchronously.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import copy
18 import copy
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import stat
22 import stat
23
23
24 # signal imports, handling various platforms, versions
24 # signal imports, handling various platforms, versions
25
25
26 from signal import SIGINT, SIGTERM
26 from signal import SIGINT, SIGTERM
27 try:
27 try:
28 from signal import SIGKILL
28 from signal import SIGKILL
29 except ImportError:
29 except ImportError:
30 # Windows
30 # Windows
31 SIGKILL=SIGTERM
31 SIGKILL=SIGTERM
32
32
33 try:
33 try:
34 # Windows >= 2.7, 3.2
34 # Windows >= 2.7, 3.2
35 from signal import CTRL_C_EVENT as SIGINT
35 from signal import CTRL_C_EVENT as SIGINT
36 except ImportError:
36 except ImportError:
37 pass
37 pass
38
38
39 from subprocess import Popen, PIPE, STDOUT
39 from subprocess import Popen, PIPE, STDOUT
40 try:
40 try:
41 from subprocess import check_output
41 from subprocess import check_output
42 except ImportError:
42 except ImportError:
43 # pre-2.7, define check_output with Popen
43 # pre-2.7, define check_output with Popen
44 def check_output(*args, **kwargs):
44 def check_output(*args, **kwargs):
45 kwargs.update(dict(stdout=PIPE))
45 kwargs.update(dict(stdout=PIPE))
46 p = Popen(*args, **kwargs)
46 p = Popen(*args, **kwargs)
47 out,err = p.communicate()
47 out,err = p.communicate()
48 return out
48 return out
49
49
50 from zmq.eventloop import ioloop
50 from zmq.eventloop import ioloop
51
51
52 from IPython.config.application import Application
52 from IPython.config.configurable import Configurable
53 from IPython.config.configurable import Configurable
53 from IPython.utils.text import EvalFormatter
54 from IPython.utils.text import EvalFormatter
54 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
55 from IPython.utils.traitlets import Any, Int, List, Unicode, Dict, Instance
55 from IPython.utils.path import get_ipython_module_path
56 from IPython.utils.path import get_ipython_module_path
56 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
57 from IPython.utils.process import find_cmd, pycmd2argv, FindCmdError
57
58
58 from .win32support import forward_read_events
59 from .win32support import forward_read_events
59
60
60 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
61 from .winhpcjob import IPControllerTask, IPEngineTask, IPControllerJob, IPEngineSetJob
61
62
62 WINDOWS = os.name == 'nt'
63 WINDOWS = os.name == 'nt'
63
64
64 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
65 # Paths to the kernel apps
66 # Paths to the kernel apps
66 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
67
68
68
69
69 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
70 ipcluster_cmd_argv = pycmd2argv(get_ipython_module_path(
70 'IPython.parallel.apps.ipclusterapp'
71 'IPython.parallel.apps.ipclusterapp'
71 ))
72 ))
72
73
73 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
74 ipengine_cmd_argv = pycmd2argv(get_ipython_module_path(
74 'IPython.parallel.apps.ipengineapp'
75 'IPython.parallel.apps.ipengineapp'
75 ))
76 ))
76
77
77 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
78 ipcontroller_cmd_argv = pycmd2argv(get_ipython_module_path(
78 'IPython.parallel.apps.ipcontrollerapp'
79 'IPython.parallel.apps.ipcontrollerapp'
79 ))
80 ))
80
81
81 #-----------------------------------------------------------------------------
82 #-----------------------------------------------------------------------------
82 # Base launchers and errors
83 # Base launchers and errors
83 #-----------------------------------------------------------------------------
84 #-----------------------------------------------------------------------------
84
85
85
86
86 class LauncherError(Exception):
87 class LauncherError(Exception):
87 pass
88 pass
88
89
89
90
90 class ProcessStateError(LauncherError):
91 class ProcessStateError(LauncherError):
91 pass
92 pass
92
93
93
94
94 class UnknownStatus(LauncherError):
95 class UnknownStatus(LauncherError):
95 pass
96 pass
96
97
97
98
98 class BaseLauncher(Configurable):
99 class BaseLauncher(Configurable):
99 """An asbtraction for starting, stopping and signaling a process."""
100 """An asbtraction for starting, stopping and signaling a process."""
100
101
101 # In all of the launchers, the work_dir is where child processes will be
102 # In all of the launchers, the work_dir is where child processes will be
102 # run. This will usually be the profile_dir, but may not be. Any work_dir
103 # run. This will usually be the profile_dir, but may not be. Any work_dir
103 # passed into the __init__ method will override the config value.
104 # passed into the __init__ method will override the config value.
104 # This should not be used to set the work_dir for the actual engine
105 # This should not be used to set the work_dir for the actual engine
105 # and controller. Instead, use their own config files or the
106 # and controller. Instead, use their own config files or the
106 # controller_args, engine_args attributes of the launchers to add
107 # controller_args, engine_args attributes of the launchers to add
107 # the work_dir option.
108 # the work_dir option.
108 work_dir = Unicode(u'.')
109 work_dir = Unicode(u'.')
109 loop = Instance('zmq.eventloop.ioloop.IOLoop')
110 loop = Instance('zmq.eventloop.ioloop.IOLoop')
110 log = Instance('logging.Logger', ('root',))
111 log = Instance('logging.Logger')
112 def _log_default(self):
113 return Application.instance().log
111
114
112 start_data = Any()
115 start_data = Any()
113 stop_data = Any()
116 stop_data = Any()
114
117
115 def _loop_default(self):
118 def _loop_default(self):
116 return ioloop.IOLoop.instance()
119 return ioloop.IOLoop.instance()
117
120
118 def __init__(self, work_dir=u'.', config=None, **kwargs):
121 def __init__(self, work_dir=u'.', config=None, **kwargs):
119 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
122 super(BaseLauncher, self).__init__(work_dir=work_dir, config=config, **kwargs)
120 self.state = 'before' # can be before, running, after
123 self.state = 'before' # can be before, running, after
121 self.stop_callbacks = []
124 self.stop_callbacks = []
122 self.start_data = None
125 self.start_data = None
123 self.stop_data = None
126 self.stop_data = None
124
127
125 @property
128 @property
126 def args(self):
129 def args(self):
127 """A list of cmd and args that will be used to start the process.
130 """A list of cmd and args that will be used to start the process.
128
131
129 This is what is passed to :func:`spawnProcess` and the first element
132 This is what is passed to :func:`spawnProcess` and the first element
130 will be the process name.
133 will be the process name.
131 """
134 """
132 return self.find_args()
135 return self.find_args()
133
136
134 def find_args(self):
137 def find_args(self):
135 """The ``.args`` property calls this to find the args list.
138 """The ``.args`` property calls this to find the args list.
136
139
137 Subclasses should implement this to construct the cmd and args.
140 Subclasses should implement this to construct the cmd and args.
138 """
141 """
139 raise NotImplementedError('find_args must be implemented in a subclass')
142 raise NotImplementedError('find_args must be implemented in a subclass')
140
143
141 @property
144 @property
142 def arg_str(self):
145 def arg_str(self):
143 """The string form of the program arguments."""
146 """The string form of the program arguments."""
144 return ' '.join(self.args)
147 return ' '.join(self.args)
145
148
146 @property
149 @property
147 def running(self):
150 def running(self):
148 """Am I running."""
151 """Am I running."""
149 if self.state == 'running':
152 if self.state == 'running':
150 return True
153 return True
151 else:
154 else:
152 return False
155 return False
153
156
154 def start(self):
157 def start(self):
155 """Start the process.
158 """Start the process.
156
159
157 This must return a deferred that fires with information about the
160 This must return a deferred that fires with information about the
158 process starting (like a pid, job id, etc.).
161 process starting (like a pid, job id, etc.).
159 """
162 """
160 raise NotImplementedError('start must be implemented in a subclass')
163 raise NotImplementedError('start must be implemented in a subclass')
161
164
162 def stop(self):
165 def stop(self):
163 """Stop the process and notify observers of stopping.
166 """Stop the process and notify observers of stopping.
164
167
165 This must return a deferred that fires with information about the
168 This must return a deferred that fires with information about the
166 processing stopping, like errors that occur while the process is
169 processing stopping, like errors that occur while the process is
167 attempting to be shut down. This deferred won't fire when the process
170 attempting to be shut down. This deferred won't fire when the process
168 actually stops. To observe the actual process stopping, see
171 actually stops. To observe the actual process stopping, see
169 :func:`on_stop`.
172 :func:`on_stop`.
170 """
173 """
171 raise NotImplementedError('stop must be implemented in a subclass')
174 raise NotImplementedError('stop must be implemented in a subclass')
172
175
173 def on_stop(self, f):
176 def on_stop(self, f):
174 """Get a deferred that will fire when the process stops.
177 """Get a deferred that will fire when the process stops.
175
178
176 The deferred will fire with data that contains information about
179 The deferred will fire with data that contains information about
177 the exit status of the process.
180 the exit status of the process.
178 """
181 """
179 if self.state=='after':
182 if self.state=='after':
180 return f(self.stop_data)
183 return f(self.stop_data)
181 else:
184 else:
182 self.stop_callbacks.append(f)
185 self.stop_callbacks.append(f)
183
186
184 def notify_start(self, data):
187 def notify_start(self, data):
185 """Call this to trigger startup actions.
188 """Call this to trigger startup actions.
186
189
187 This logs the process startup and sets the state to 'running'. It is
190 This logs the process startup and sets the state to 'running'. It is
188 a pass-through so it can be used as a callback.
191 a pass-through so it can be used as a callback.
189 """
192 """
190
193
191 self.log.info('Process %r started: %r' % (self.args[0], data))
194 self.log.info('Process %r started: %r' % (self.args[0], data))
192 self.start_data = data
195 self.start_data = data
193 self.state = 'running'
196 self.state = 'running'
194 return data
197 return data
195
198
196 def notify_stop(self, data):
199 def notify_stop(self, data):
197 """Call this to trigger process stop actions.
200 """Call this to trigger process stop actions.
198
201
199 This logs the process stopping and sets the state to 'after'. Call
202 This logs the process stopping and sets the state to 'after'. Call
200 this to trigger all the deferreds from :func:`on_stop`."""
203 this to trigger all the deferreds from :func:`on_stop`."""
201
204
202 self.log.info('Process %r stopped: %r' % (self.args[0], data))
205 self.log.info('Process %r stopped: %r' % (self.args[0], data))
203 self.stop_data = data
206 self.stop_data = data
204 self.state = 'after'
207 self.state = 'after'
205 for i in range(len(self.stop_callbacks)):
208 for i in range(len(self.stop_callbacks)):
206 d = self.stop_callbacks.pop()
209 d = self.stop_callbacks.pop()
207 d(data)
210 d(data)
208 return data
211 return data
209
212
210 def signal(self, sig):
213 def signal(self, sig):
211 """Signal the process.
214 """Signal the process.
212
215
213 Return a semi-meaningless deferred after signaling the process.
216 Return a semi-meaningless deferred after signaling the process.
214
217
215 Parameters
218 Parameters
216 ----------
219 ----------
217 sig : str or int
220 sig : str or int
218 'KILL', 'INT', etc., or any signal number
221 'KILL', 'INT', etc., or any signal number
219 """
222 """
220 raise NotImplementedError('signal must be implemented in a subclass')
223 raise NotImplementedError('signal must be implemented in a subclass')
221
224
222
225
223 #-----------------------------------------------------------------------------
226 #-----------------------------------------------------------------------------
224 # Local process launchers
227 # Local process launchers
225 #-----------------------------------------------------------------------------
228 #-----------------------------------------------------------------------------
226
229
227
230
228 class LocalProcessLauncher(BaseLauncher):
231 class LocalProcessLauncher(BaseLauncher):
229 """Start and stop an external process in an asynchronous manner.
232 """Start and stop an external process in an asynchronous manner.
230
233
231 This will launch the external process with a working directory of
234 This will launch the external process with a working directory of
232 ``self.work_dir``.
235 ``self.work_dir``.
233 """
236 """
234
237
235 # This is used to construct self.args, which is passed to
238 # This is used to construct self.args, which is passed to
236 # spawnProcess.
239 # spawnProcess.
237 cmd_and_args = List([])
240 cmd_and_args = List([])
238 poll_frequency = Int(100) # in ms
241 poll_frequency = Int(100) # in ms
239
242
240 def __init__(self, work_dir=u'.', config=None, **kwargs):
243 def __init__(self, work_dir=u'.', config=None, **kwargs):
241 super(LocalProcessLauncher, self).__init__(
244 super(LocalProcessLauncher, self).__init__(
242 work_dir=work_dir, config=config, **kwargs
245 work_dir=work_dir, config=config, **kwargs
243 )
246 )
244 self.process = None
247 self.process = None
245 self.start_deferred = None
248 self.start_deferred = None
246 self.poller = None
249 self.poller = None
247
250
248 def find_args(self):
251 def find_args(self):
249 return self.cmd_and_args
252 return self.cmd_and_args
250
253
251 def start(self):
254 def start(self):
252 if self.state == 'before':
255 if self.state == 'before':
253 self.process = Popen(self.args,
256 self.process = Popen(self.args,
254 stdout=PIPE,stderr=PIPE,stdin=PIPE,
257 stdout=PIPE,stderr=PIPE,stdin=PIPE,
255 env=os.environ,
258 env=os.environ,
256 cwd=self.work_dir
259 cwd=self.work_dir
257 )
260 )
258 if WINDOWS:
261 if WINDOWS:
259 self.stdout = forward_read_events(self.process.stdout)
262 self.stdout = forward_read_events(self.process.stdout)
260 self.stderr = forward_read_events(self.process.stderr)
263 self.stderr = forward_read_events(self.process.stderr)
261 else:
264 else:
262 self.stdout = self.process.stdout.fileno()
265 self.stdout = self.process.stdout.fileno()
263 self.stderr = self.process.stderr.fileno()
266 self.stderr = self.process.stderr.fileno()
264 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
267 self.loop.add_handler(self.stdout, self.handle_stdout, self.loop.READ)
265 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
268 self.loop.add_handler(self.stderr, self.handle_stderr, self.loop.READ)
266 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
269 self.poller = ioloop.PeriodicCallback(self.poll, self.poll_frequency, self.loop)
267 self.poller.start()
270 self.poller.start()
268 self.notify_start(self.process.pid)
271 self.notify_start(self.process.pid)
269 else:
272 else:
270 s = 'The process was already started and has state: %r' % self.state
273 s = 'The process was already started and has state: %r' % self.state
271 raise ProcessStateError(s)
274 raise ProcessStateError(s)
272
275
273 def stop(self):
276 def stop(self):
274 return self.interrupt_then_kill()
277 return self.interrupt_then_kill()
275
278
276 def signal(self, sig):
279 def signal(self, sig):
277 if self.state == 'running':
280 if self.state == 'running':
278 if WINDOWS and sig != SIGINT:
281 if WINDOWS and sig != SIGINT:
279 # use Windows tree-kill for better child cleanup
282 # use Windows tree-kill for better child cleanup
280 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
283 check_output(['taskkill', '-pid', str(self.process.pid), '-t', '-f'])
281 else:
284 else:
282 self.process.send_signal(sig)
285 self.process.send_signal(sig)
283
286
284 def interrupt_then_kill(self, delay=2.0):
287 def interrupt_then_kill(self, delay=2.0):
285 """Send INT, wait a delay and then send KILL."""
288 """Send INT, wait a delay and then send KILL."""
286 try:
289 try:
287 self.signal(SIGINT)
290 self.signal(SIGINT)
288 except Exception:
291 except Exception:
289 self.log.debug("interrupt failed")
292 self.log.debug("interrupt failed")
290 pass
293 pass
291 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
294 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
292 self.killer.start()
295 self.killer.start()
293
296
294 # callbacks, etc:
297 # callbacks, etc:
295
298
296 def handle_stdout(self, fd, events):
299 def handle_stdout(self, fd, events):
297 if WINDOWS:
300 if WINDOWS:
298 line = self.stdout.recv()
301 line = self.stdout.recv()
299 else:
302 else:
300 line = self.process.stdout.readline()
303 line = self.process.stdout.readline()
301 # a stopped process will be readable but return empty strings
304 # a stopped process will be readable but return empty strings
302 if line:
305 if line:
303 self.log.info(line[:-1])
306 self.log.info(line[:-1])
304 else:
307 else:
305 self.poll()
308 self.poll()
306
309
307 def handle_stderr(self, fd, events):
310 def handle_stderr(self, fd, events):
308 if WINDOWS:
311 if WINDOWS:
309 line = self.stderr.recv()
312 line = self.stderr.recv()
310 else:
313 else:
311 line = self.process.stderr.readline()
314 line = self.process.stderr.readline()
312 # a stopped process will be readable but return empty strings
315 # a stopped process will be readable but return empty strings
313 if line:
316 if line:
314 self.log.error(line[:-1])
317 self.log.error(line[:-1])
315 else:
318 else:
316 self.poll()
319 self.poll()
317
320
318 def poll(self):
321 def poll(self):
319 status = self.process.poll()
322 status = self.process.poll()
320 if status is not None:
323 if status is not None:
321 self.poller.stop()
324 self.poller.stop()
322 self.loop.remove_handler(self.stdout)
325 self.loop.remove_handler(self.stdout)
323 self.loop.remove_handler(self.stderr)
326 self.loop.remove_handler(self.stderr)
324 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
327 self.notify_stop(dict(exit_code=status, pid=self.process.pid))
325 return status
328 return status
326
329
327 class LocalControllerLauncher(LocalProcessLauncher):
330 class LocalControllerLauncher(LocalProcessLauncher):
328 """Launch a controller as a regular external process."""
331 """Launch a controller as a regular external process."""
329
332
330 controller_cmd = List(ipcontroller_cmd_argv, config=True,
333 controller_cmd = List(ipcontroller_cmd_argv, config=True,
331 help="""Popen command to launch ipcontroller.""")
334 help="""Popen command to launch ipcontroller.""")
332 # Command line arguments to ipcontroller.
335 # Command line arguments to ipcontroller.
333 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
336 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
334 help="""command-line args to pass to ipcontroller""")
337 help="""command-line args to pass to ipcontroller""")
335
338
336 def find_args(self):
339 def find_args(self):
337 return self.controller_cmd + self.controller_args
340 return self.controller_cmd + self.controller_args
338
341
339 def start(self, profile_dir):
342 def start(self, profile_dir):
340 """Start the controller by profile_dir."""
343 """Start the controller by profile_dir."""
341 self.controller_args.extend(['profile_dir=%s'%profile_dir])
344 self.controller_args.extend(['profile_dir=%s'%profile_dir])
342 self.profile_dir = unicode(profile_dir)
345 self.profile_dir = unicode(profile_dir)
343 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
346 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
344 return super(LocalControllerLauncher, self).start()
347 return super(LocalControllerLauncher, self).start()
345
348
346
349
347 class LocalEngineLauncher(LocalProcessLauncher):
350 class LocalEngineLauncher(LocalProcessLauncher):
348 """Launch a single engine as a regular externall process."""
351 """Launch a single engine as a regular externall process."""
349
352
350 engine_cmd = List(ipengine_cmd_argv, config=True,
353 engine_cmd = List(ipengine_cmd_argv, config=True,
351 help="""command to launch the Engine.""")
354 help="""command to launch the Engine.""")
352 # Command line arguments for ipengine.
355 # Command line arguments for ipengine.
353 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
356 engine_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
354 help="command-line arguments to pass to ipengine"
357 help="command-line arguments to pass to ipengine"
355 )
358 )
356
359
357 def find_args(self):
360 def find_args(self):
358 return self.engine_cmd + self.engine_args
361 return self.engine_cmd + self.engine_args
359
362
360 def start(self, profile_dir):
363 def start(self, profile_dir):
361 """Start the engine by profile_dir."""
364 """Start the engine by profile_dir."""
362 self.engine_args.extend(['profile_dir=%s'%profile_dir])
365 self.engine_args.extend(['profile_dir=%s'%profile_dir])
363 self.profile_dir = unicode(profile_dir)
366 self.profile_dir = unicode(profile_dir)
364 return super(LocalEngineLauncher, self).start()
367 return super(LocalEngineLauncher, self).start()
365
368
366
369
367 class LocalEngineSetLauncher(BaseLauncher):
370 class LocalEngineSetLauncher(BaseLauncher):
368 """Launch a set of engines as regular external processes."""
371 """Launch a set of engines as regular external processes."""
369
372
370 # Command line arguments for ipengine.
373 # Command line arguments for ipengine.
371 engine_args = List(
374 engine_args = List(
372 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
375 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
373 help="command-line arguments to pass to ipengine"
376 help="command-line arguments to pass to ipengine"
374 )
377 )
375 # launcher class
378 # launcher class
376 launcher_class = LocalEngineLauncher
379 launcher_class = LocalEngineLauncher
377
380
378 launchers = Dict()
381 launchers = Dict()
379 stop_data = Dict()
382 stop_data = Dict()
380
383
381 def __init__(self, work_dir=u'.', config=None, **kwargs):
384 def __init__(self, work_dir=u'.', config=None, **kwargs):
382 super(LocalEngineSetLauncher, self).__init__(
385 super(LocalEngineSetLauncher, self).__init__(
383 work_dir=work_dir, config=config, **kwargs
386 work_dir=work_dir, config=config, **kwargs
384 )
387 )
385 self.stop_data = {}
388 self.stop_data = {}
386
389
387 def start(self, n, profile_dir):
390 def start(self, n, profile_dir):
388 """Start n engines by profile or profile_dir."""
391 """Start n engines by profile or profile_dir."""
389 self.profile_dir = unicode(profile_dir)
392 self.profile_dir = unicode(profile_dir)
390 dlist = []
393 dlist = []
391 for i in range(n):
394 for i in range(n):
392 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
395 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
393 # Copy the engine args over to each engine launcher.
396 # Copy the engine args over to each engine launcher.
394 el.engine_args = copy.deepcopy(self.engine_args)
397 el.engine_args = copy.deepcopy(self.engine_args)
395 el.on_stop(self._notice_engine_stopped)
398 el.on_stop(self._notice_engine_stopped)
396 d = el.start(profile_dir)
399 d = el.start(profile_dir)
397 if i==0:
400 if i==0:
398 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
401 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
399 self.launchers[i] = el
402 self.launchers[i] = el
400 dlist.append(d)
403 dlist.append(d)
401 self.notify_start(dlist)
404 self.notify_start(dlist)
402 # The consumeErrors here could be dangerous
405 # The consumeErrors here could be dangerous
403 # dfinal = gatherBoth(dlist, consumeErrors=True)
406 # dfinal = gatherBoth(dlist, consumeErrors=True)
404 # dfinal.addCallback(self.notify_start)
407 # dfinal.addCallback(self.notify_start)
405 return dlist
408 return dlist
406
409
407 def find_args(self):
410 def find_args(self):
408 return ['engine set']
411 return ['engine set']
409
412
410 def signal(self, sig):
413 def signal(self, sig):
411 dlist = []
414 dlist = []
412 for el in self.launchers.itervalues():
415 for el in self.launchers.itervalues():
413 d = el.signal(sig)
416 d = el.signal(sig)
414 dlist.append(d)
417 dlist.append(d)
415 # dfinal = gatherBoth(dlist, consumeErrors=True)
418 # dfinal = gatherBoth(dlist, consumeErrors=True)
416 return dlist
419 return dlist
417
420
418 def interrupt_then_kill(self, delay=1.0):
421 def interrupt_then_kill(self, delay=1.0):
419 dlist = []
422 dlist = []
420 for el in self.launchers.itervalues():
423 for el in self.launchers.itervalues():
421 d = el.interrupt_then_kill(delay)
424 d = el.interrupt_then_kill(delay)
422 dlist.append(d)
425 dlist.append(d)
423 # dfinal = gatherBoth(dlist, consumeErrors=True)
426 # dfinal = gatherBoth(dlist, consumeErrors=True)
424 return dlist
427 return dlist
425
428
426 def stop(self):
429 def stop(self):
427 return self.interrupt_then_kill()
430 return self.interrupt_then_kill()
428
431
429 def _notice_engine_stopped(self, data):
432 def _notice_engine_stopped(self, data):
430 pid = data['pid']
433 pid = data['pid']
431 for idx,el in self.launchers.iteritems():
434 for idx,el in self.launchers.iteritems():
432 if el.process.pid == pid:
435 if el.process.pid == pid:
433 break
436 break
434 self.launchers.pop(idx)
437 self.launchers.pop(idx)
435 self.stop_data[idx] = data
438 self.stop_data[idx] = data
436 if not self.launchers:
439 if not self.launchers:
437 self.notify_stop(self.stop_data)
440 self.notify_stop(self.stop_data)
438
441
439
442
440 #-----------------------------------------------------------------------------
443 #-----------------------------------------------------------------------------
441 # MPIExec launchers
444 # MPIExec launchers
442 #-----------------------------------------------------------------------------
445 #-----------------------------------------------------------------------------
443
446
444
447
445 class MPIExecLauncher(LocalProcessLauncher):
448 class MPIExecLauncher(LocalProcessLauncher):
446 """Launch an external process using mpiexec."""
449 """Launch an external process using mpiexec."""
447
450
448 mpi_cmd = List(['mpiexec'], config=True,
451 mpi_cmd = List(['mpiexec'], config=True,
449 help="The mpiexec command to use in starting the process."
452 help="The mpiexec command to use in starting the process."
450 )
453 )
451 mpi_args = List([], config=True,
454 mpi_args = List([], config=True,
452 help="The command line arguments to pass to mpiexec."
455 help="The command line arguments to pass to mpiexec."
453 )
456 )
454 program = List(['date'], config=True,
457 program = List(['date'], config=True,
455 help="The program to start via mpiexec.")
458 help="The program to start via mpiexec.")
456 program_args = List([], config=True,
459 program_args = List([], config=True,
457 help="The command line argument to the program."
460 help="The command line argument to the program."
458 )
461 )
459 n = Int(1)
462 n = Int(1)
460
463
461 def find_args(self):
464 def find_args(self):
462 """Build self.args using all the fields."""
465 """Build self.args using all the fields."""
463 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
466 return self.mpi_cmd + ['-n', str(self.n)] + self.mpi_args + \
464 self.program + self.program_args
467 self.program + self.program_args
465
468
466 def start(self, n):
469 def start(self, n):
467 """Start n instances of the program using mpiexec."""
470 """Start n instances of the program using mpiexec."""
468 self.n = n
471 self.n = n
469 return super(MPIExecLauncher, self).start()
472 return super(MPIExecLauncher, self).start()
470
473
471
474
472 class MPIExecControllerLauncher(MPIExecLauncher):
475 class MPIExecControllerLauncher(MPIExecLauncher):
473 """Launch a controller using mpiexec."""
476 """Launch a controller using mpiexec."""
474
477
475 controller_cmd = List(ipcontroller_cmd_argv, config=True,
478 controller_cmd = List(ipcontroller_cmd_argv, config=True,
476 help="Popen command to launch the Contropper"
479 help="Popen command to launch the Contropper"
477 )
480 )
478 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
481 controller_args = List(['--log-to-file','log_level=%i'%logging.INFO], config=True,
479 help="Command line arguments to pass to ipcontroller."
482 help="Command line arguments to pass to ipcontroller."
480 )
483 )
481 n = Int(1)
484 n = Int(1)
482
485
483 def start(self, profile_dir):
486 def start(self, profile_dir):
484 """Start the controller by profile_dir."""
487 """Start the controller by profile_dir."""
485 self.controller_args.extend(['profile_dir=%s'%profile_dir])
488 self.controller_args.extend(['profile_dir=%s'%profile_dir])
486 self.profile_dir = unicode(profile_dir)
489 self.profile_dir = unicode(profile_dir)
487 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
490 self.log.info("Starting MPIExecControllerLauncher: %r" % self.args)
488 return super(MPIExecControllerLauncher, self).start(1)
491 return super(MPIExecControllerLauncher, self).start(1)
489
492
490 def find_args(self):
493 def find_args(self):
491 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
494 return self.mpi_cmd + ['-n', self.n] + self.mpi_args + \
492 self.controller_cmd + self.controller_args
495 self.controller_cmd + self.controller_args
493
496
494
497
495 class MPIExecEngineSetLauncher(MPIExecLauncher):
498 class MPIExecEngineSetLauncher(MPIExecLauncher):
496
499
497 program = List(ipengine_cmd_argv, config=True,
500 program = List(ipengine_cmd_argv, config=True,
498 help="Popen command for ipengine"
501 help="Popen command for ipengine"
499 )
502 )
500 program_args = List(
503 program_args = List(
501 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
504 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
502 help="Command line arguments for ipengine."
505 help="Command line arguments for ipengine."
503 )
506 )
504 n = Int(1)
507 n = Int(1)
505
508
506 def start(self, n, profile_dir):
509 def start(self, n, profile_dir):
507 """Start n engines by profile or profile_dir."""
510 """Start n engines by profile or profile_dir."""
508 self.program_args.extend(['profile_dir=%s'%profile_dir])
511 self.program_args.extend(['profile_dir=%s'%profile_dir])
509 self.profile_dir = unicode(profile_dir)
512 self.profile_dir = unicode(profile_dir)
510 self.n = n
513 self.n = n
511 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
514 self.log.info('Starting MPIExecEngineSetLauncher: %r' % self.args)
512 return super(MPIExecEngineSetLauncher, self).start(n)
515 return super(MPIExecEngineSetLauncher, self).start(n)
513
516
514 #-----------------------------------------------------------------------------
517 #-----------------------------------------------------------------------------
515 # SSH launchers
518 # SSH launchers
516 #-----------------------------------------------------------------------------
519 #-----------------------------------------------------------------------------
517
520
518 # TODO: Get SSH Launcher working again.
521 # TODO: Get SSH Launcher working again.
519
522
520 class SSHLauncher(LocalProcessLauncher):
523 class SSHLauncher(LocalProcessLauncher):
521 """A minimal launcher for ssh.
524 """A minimal launcher for ssh.
522
525
523 To be useful this will probably have to be extended to use the ``sshx``
526 To be useful this will probably have to be extended to use the ``sshx``
524 idea for environment variables. There could be other things this needs
527 idea for environment variables. There could be other things this needs
525 as well.
528 as well.
526 """
529 """
527
530
528 ssh_cmd = List(['ssh'], config=True,
531 ssh_cmd = List(['ssh'], config=True,
529 help="command for starting ssh")
532 help="command for starting ssh")
530 ssh_args = List(['-tt'], config=True,
533 ssh_args = List(['-tt'], config=True,
531 help="args to pass to ssh")
534 help="args to pass to ssh")
532 program = List(['date'], config=True,
535 program = List(['date'], config=True,
533 help="Program to launch via ssh")
536 help="Program to launch via ssh")
534 program_args = List([], config=True,
537 program_args = List([], config=True,
535 help="args to pass to remote program")
538 help="args to pass to remote program")
536 hostname = Unicode('', config=True,
539 hostname = Unicode('', config=True,
537 help="hostname on which to launch the program")
540 help="hostname on which to launch the program")
538 user = Unicode('', config=True,
541 user = Unicode('', config=True,
539 help="username for ssh")
542 help="username for ssh")
540 location = Unicode('', config=True,
543 location = Unicode('', config=True,
541 help="user@hostname location for ssh in one setting")
544 help="user@hostname location for ssh in one setting")
542
545
543 def _hostname_changed(self, name, old, new):
546 def _hostname_changed(self, name, old, new):
544 if self.user:
547 if self.user:
545 self.location = u'%s@%s' % (self.user, new)
548 self.location = u'%s@%s' % (self.user, new)
546 else:
549 else:
547 self.location = new
550 self.location = new
548
551
549 def _user_changed(self, name, old, new):
552 def _user_changed(self, name, old, new):
550 self.location = u'%s@%s' % (new, self.hostname)
553 self.location = u'%s@%s' % (new, self.hostname)
551
554
552 def find_args(self):
555 def find_args(self):
553 return self.ssh_cmd + self.ssh_args + [self.location] + \
556 return self.ssh_cmd + self.ssh_args + [self.location] + \
554 self.program + self.program_args
557 self.program + self.program_args
555
558
556 def start(self, profile_dir, hostname=None, user=None):
559 def start(self, profile_dir, hostname=None, user=None):
557 self.profile_dir = unicode(profile_dir)
560 self.profile_dir = unicode(profile_dir)
558 if hostname is not None:
561 if hostname is not None:
559 self.hostname = hostname
562 self.hostname = hostname
560 if user is not None:
563 if user is not None:
561 self.user = user
564 self.user = user
562
565
563 return super(SSHLauncher, self).start()
566 return super(SSHLauncher, self).start()
564
567
565 def signal(self, sig):
568 def signal(self, sig):
566 if self.state == 'running':
569 if self.state == 'running':
567 # send escaped ssh connection-closer
570 # send escaped ssh connection-closer
568 self.process.stdin.write('~.')
571 self.process.stdin.write('~.')
569 self.process.stdin.flush()
572 self.process.stdin.flush()
570
573
571
574
572
575
573 class SSHControllerLauncher(SSHLauncher):
576 class SSHControllerLauncher(SSHLauncher):
574
577
575 program = List(ipcontroller_cmd_argv, config=True,
578 program = List(ipcontroller_cmd_argv, config=True,
576 help="remote ipcontroller command.")
579 help="remote ipcontroller command.")
577 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
580 program_args = List(['--reuse-files', '--log-to-file','log_level=%i'%logging.INFO], config=True,
578 help="Command line arguments to ipcontroller.")
581 help="Command line arguments to ipcontroller.")
579
582
580
583
581 class SSHEngineLauncher(SSHLauncher):
584 class SSHEngineLauncher(SSHLauncher):
582 program = List(ipengine_cmd_argv, config=True,
585 program = List(ipengine_cmd_argv, config=True,
583 help="remote ipengine command.")
586 help="remote ipengine command.")
584 # Command line arguments for ipengine.
587 # Command line arguments for ipengine.
585 program_args = List(
588 program_args = List(
586 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
589 ['--log-to-file','log_level=%i'%logging.INFO], config=True,
587 help="Command line arguments to ipengine."
590 help="Command line arguments to ipengine."
588 )
591 )
589
592
590 class SSHEngineSetLauncher(LocalEngineSetLauncher):
593 class SSHEngineSetLauncher(LocalEngineSetLauncher):
591 launcher_class = SSHEngineLauncher
594 launcher_class = SSHEngineLauncher
592 engines = Dict(config=True,
595 engines = Dict(config=True,
593 help="""dict of engines to launch. This is a dict by hostname of ints,
596 help="""dict of engines to launch. This is a dict by hostname of ints,
594 corresponding to the number of engines to start on that host.""")
597 corresponding to the number of engines to start on that host.""")
595
598
596 def start(self, n, profile_dir):
599 def start(self, n, profile_dir):
597 """Start engines by profile or profile_dir.
600 """Start engines by profile or profile_dir.
598 `n` is ignored, and the `engines` config property is used instead.
601 `n` is ignored, and the `engines` config property is used instead.
599 """
602 """
600
603
601 self.profile_dir = unicode(profile_dir)
604 self.profile_dir = unicode(profile_dir)
602 dlist = []
605 dlist = []
603 for host, n in self.engines.iteritems():
606 for host, n in self.engines.iteritems():
604 if isinstance(n, (tuple, list)):
607 if isinstance(n, (tuple, list)):
605 n, args = n
608 n, args = n
606 else:
609 else:
607 args = copy.deepcopy(self.engine_args)
610 args = copy.deepcopy(self.engine_args)
608
611
609 if '@' in host:
612 if '@' in host:
610 user,host = host.split('@',1)
613 user,host = host.split('@',1)
611 else:
614 else:
612 user=None
615 user=None
613 for i in range(n):
616 for i in range(n):
614 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
617 el = self.launcher_class(work_dir=self.work_dir, config=self.config, log=self.log)
615
618
616 # Copy the engine args over to each engine launcher.
619 # Copy the engine args over to each engine launcher.
618 el.program_args = args
621 el.program_args = args
619 el.on_stop(self._notice_engine_stopped)
622 el.on_stop(self._notice_engine_stopped)
620 d = el.start(profile_dir, user=user, hostname=host)
623 d = el.start(profile_dir, user=user, hostname=host)
621 if i==0:
624 if i==0:
622 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
625 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
623 self.launchers[host+str(i)] = el
626 self.launchers[host+str(i)] = el
624 dlist.append(d)
627 dlist.append(d)
625 self.notify_start(dlist)
628 self.notify_start(dlist)
626 return dlist
629 return dlist
627
630
628
631
629
632
630 #-----------------------------------------------------------------------------
633 #-----------------------------------------------------------------------------
631 # Windows HPC Server 2008 scheduler launchers
634 # Windows HPC Server 2008 scheduler launchers
632 #-----------------------------------------------------------------------------
635 #-----------------------------------------------------------------------------
633
636
634
637
635 # This is only used on Windows.
638 # This is only used on Windows.
636 def find_job_cmd():
639 def find_job_cmd():
637 if WINDOWS:
640 if WINDOWS:
638 try:
641 try:
639 return find_cmd('job')
642 return find_cmd('job')
640 except (FindCmdError, ImportError):
643 except (FindCmdError, ImportError):
641 # ImportError will be raised if win32api is not installed
644 # ImportError will be raised if win32api is not installed
642 return 'job'
645 return 'job'
643 else:
646 else:
644 return 'job'
647 return 'job'
645
648
646
649
647 class WindowsHPCLauncher(BaseLauncher):
650 class WindowsHPCLauncher(BaseLauncher):
648
651
649 job_id_regexp = Unicode(r'\d+', config=True,
652 job_id_regexp = Unicode(r'\d+', config=True,
650 help="""A regular expression used to get the job id from the output of the
653 help="""A regular expression used to get the job id from the output of the
651 submit_command. """
654 submit_command. """
652 )
655 )
653 job_file_name = Unicode(u'ipython_job.xml', config=True,
656 job_file_name = Unicode(u'ipython_job.xml', config=True,
654 help="The filename of the instantiated job script.")
657 help="The filename of the instantiated job script.")
655 # The full path to the instantiated job script. This gets made dynamically
658 # The full path to the instantiated job script. This gets made dynamically
656 # by combining the work_dir with the job_file_name.
659 # by combining the work_dir with the job_file_name.
657 job_file = Unicode(u'')
660 job_file = Unicode(u'')
658 scheduler = Unicode('', config=True,
661 scheduler = Unicode('', config=True,
659 help="The hostname of the scheduler to submit the job to.")
662 help="The hostname of the scheduler to submit the job to.")
660 job_cmd = Unicode(find_job_cmd(), config=True,
663 job_cmd = Unicode(find_job_cmd(), config=True,
661 help="The command for submitting jobs.")
664 help="The command for submitting jobs.")
662
665
663 def __init__(self, work_dir=u'.', config=None, **kwargs):
666 def __init__(self, work_dir=u'.', config=None, **kwargs):
664 super(WindowsHPCLauncher, self).__init__(
667 super(WindowsHPCLauncher, self).__init__(
665 work_dir=work_dir, config=config, **kwargs
668 work_dir=work_dir, config=config, **kwargs
666 )
669 )
667
670
668 @property
671 @property
669 def job_file(self):
672 def job_file(self):
670 return os.path.join(self.work_dir, self.job_file_name)
673 return os.path.join(self.work_dir, self.job_file_name)
671
674
672 def write_job_file(self, n):
675 def write_job_file(self, n):
673 raise NotImplementedError("Implement write_job_file in a subclass.")
676 raise NotImplementedError("Implement write_job_file in a subclass.")
674
677
675 def find_args(self):
678 def find_args(self):
676 return [u'job.exe']
679 return [u'job.exe']
677
680
678 def parse_job_id(self, output):
681 def parse_job_id(self, output):
679 """Take the output of the submit command and return the job id."""
682 """Take the output of the submit command and return the job id."""
680 m = re.search(self.job_id_regexp, output)
683 m = re.search(self.job_id_regexp, output)
681 if m is not None:
684 if m is not None:
682 job_id = m.group()
685 job_id = m.group()
683 else:
686 else:
684 raise LauncherError("Job id couldn't be determined: %s" % output)
687 raise LauncherError("Job id couldn't be determined: %s" % output)
685 self.job_id = job_id
688 self.job_id = job_id
686 self.log.info('Job started with job id: %r' % job_id)
689 self.log.info('Job started with job id: %r' % job_id)
687 return job_id
690 return job_id
688
691
689 def start(self, n):
692 def start(self, n):
690 """Start n copies of the process using the Win HPC job scheduler."""
693 """Start n copies of the process using the Win HPC job scheduler."""
691 self.write_job_file(n)
694 self.write_job_file(n)
692 args = [
695 args = [
693 'submit',
696 'submit',
694 '/jobfile:%s' % self.job_file,
697 '/jobfile:%s' % self.job_file,
695 '/scheduler:%s' % self.scheduler
698 '/scheduler:%s' % self.scheduler
696 ]
699 ]
697 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
700 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
698 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
701 # Twisted will raise DeprecationWarnings if we try to pass unicode to this
699 output = check_output([self.job_cmd]+args,
702 output = check_output([self.job_cmd]+args,
700 env=os.environ,
703 env=os.environ,
701 cwd=self.work_dir,
704 cwd=self.work_dir,
702 stderr=STDOUT
705 stderr=STDOUT
703 )
706 )
704 job_id = self.parse_job_id(output)
707 job_id = self.parse_job_id(output)
705 self.notify_start(job_id)
708 self.notify_start(job_id)
706 return job_id
709 return job_id
707
710
708 def stop(self):
711 def stop(self):
709 args = [
712 args = [
710 'cancel',
713 'cancel',
711 self.job_id,
714 self.job_id,
712 '/scheduler:%s' % self.scheduler
715 '/scheduler:%s' % self.scheduler
713 ]
716 ]
714 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
717 self.log.info("Stopping Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
715 try:
718 try:
716 output = check_output([self.job_cmd]+args,
719 output = check_output([self.job_cmd]+args,
717 env=os.environ,
720 env=os.environ,
718 cwd=self.work_dir,
721 cwd=self.work_dir,
719 stderr=STDOUT
722 stderr=STDOUT
720 )
723 )
721 except:
724 except:
722 output = 'The job already appears to be stopped: %r' % self.job_id
725 output = 'The job already appears to be stopped: %r' % self.job_id
723 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
726 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
724 return output
727 return output
725
728
726
729
727 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
730 class WindowsHPCControllerLauncher(WindowsHPCLauncher):
728
731
729 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
732 job_file_name = Unicode(u'ipcontroller_job.xml', config=True,
730 help="WinHPC xml job file.")
733 help="WinHPC xml job file.")
731 extra_args = List([], config=False,
734 extra_args = List([], config=False,
732 help="extra args to pass to ipcontroller")
735 help="extra args to pass to ipcontroller")
733
736
734 def write_job_file(self, n):
737 def write_job_file(self, n):
735 job = IPControllerJob(config=self.config)
738 job = IPControllerJob(config=self.config)
736
739
737 t = IPControllerTask(config=self.config)
740 t = IPControllerTask(config=self.config)
738 # The tasks work directory is *not* the actual work directory of
741 # The tasks work directory is *not* the actual work directory of
739 # the controller. It is used as the base path for the stdout/stderr
742 # the controller. It is used as the base path for the stdout/stderr
740 # files that the scheduler redirects to.
743 # files that the scheduler redirects to.
741 t.work_directory = self.profile_dir
744 t.work_directory = self.profile_dir
742 # Add the profile_dir arg from self.start().
745 # Add the profile_dir arg from self.start().
743 t.controller_args.extend(self.extra_args)
746 t.controller_args.extend(self.extra_args)
744 job.add_task(t)
747 job.add_task(t)
745
748
746 self.log.info("Writing job description file: %s" % self.job_file)
749 self.log.info("Writing job description file: %s" % self.job_file)
747 job.write(self.job_file)
750 job.write(self.job_file)
748
751
749 @property
752 @property
750 def job_file(self):
753 def job_file(self):
751 return os.path.join(self.profile_dir, self.job_file_name)
754 return os.path.join(self.profile_dir, self.job_file_name)
752
755
753 def start(self, profile_dir):
756 def start(self, profile_dir):
754 """Start the controller by profile_dir."""
757 """Start the controller by profile_dir."""
755 self.extra_args = ['profile_dir=%s'%profile_dir]
758 self.extra_args = ['profile_dir=%s'%profile_dir]
756 self.profile_dir = unicode(profile_dir)
759 self.profile_dir = unicode(profile_dir)
757 return super(WindowsHPCControllerLauncher, self).start(1)
760 return super(WindowsHPCControllerLauncher, self).start(1)
758
761
759
762
760 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
763 class WindowsHPCEngineSetLauncher(WindowsHPCLauncher):
761
764
762 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
765 job_file_name = Unicode(u'ipengineset_job.xml', config=True,
763 help="jobfile for ipengines job")
766 help="jobfile for ipengines job")
764 extra_args = List([], config=False,
767 extra_args = List([], config=False,
765 help="extra args to pas to ipengine")
768 help="extra args to pas to ipengine")
766
769
767 def write_job_file(self, n):
770 def write_job_file(self, n):
768 job = IPEngineSetJob(config=self.config)
771 job = IPEngineSetJob(config=self.config)
769
772
770 for i in range(n):
773 for i in range(n):
771 t = IPEngineTask(config=self.config)
774 t = IPEngineTask(config=self.config)
772 # The tasks work directory is *not* the actual work directory of
775 # The tasks work directory is *not* the actual work directory of
773 # the engine. It is used as the base path for the stdout/stderr
776 # the engine. It is used as the base path for the stdout/stderr
774 # files that the scheduler redirects to.
777 # files that the scheduler redirects to.
775 t.work_directory = self.profile_dir
778 t.work_directory = self.profile_dir
776 # Add the profile_dir arg from self.start().
779 # Add the profile_dir arg from self.start().
777 t.engine_args.extend(self.extra_args)
780 t.engine_args.extend(self.extra_args)
778 job.add_task(t)
781 job.add_task(t)
779
782
780 self.log.info("Writing job description file: %s" % self.job_file)
783 self.log.info("Writing job description file: %s" % self.job_file)
781 job.write(self.job_file)
784 job.write(self.job_file)
782
785
783 @property
786 @property
784 def job_file(self):
787 def job_file(self):
785 return os.path.join(self.profile_dir, self.job_file_name)
788 return os.path.join(self.profile_dir, self.job_file_name)
786
789
787 def start(self, n, profile_dir):
790 def start(self, n, profile_dir):
788 """Start the controller by profile_dir."""
791 """Start the controller by profile_dir."""
789 self.extra_args = ['profile_dir=%s'%profile_dir]
792 self.extra_args = ['profile_dir=%s'%profile_dir]
790 self.profile_dir = unicode(profile_dir)
793 self.profile_dir = unicode(profile_dir)
791 return super(WindowsHPCEngineSetLauncher, self).start(n)
794 return super(WindowsHPCEngineSetLauncher, self).start(n)
792
795
793
796
794 #-----------------------------------------------------------------------------
797 #-----------------------------------------------------------------------------
795 # Batch (PBS) system launchers
798 # Batch (PBS) system launchers
796 #-----------------------------------------------------------------------------
799 #-----------------------------------------------------------------------------
797
800
798 class BatchSystemLauncher(BaseLauncher):
801 class BatchSystemLauncher(BaseLauncher):
799 """Launch an external process using a batch system.
802 """Launch an external process using a batch system.
800
803
801 This class is designed to work with UNIX batch systems like PBS, LSF,
804 This class is designed to work with UNIX batch systems like PBS, LSF,
802 GridEngine, etc. The overall model is that there are different commands
805 GridEngine, etc. The overall model is that there are different commands
803 like qsub, qdel, etc. that handle the starting and stopping of the process.
806 like qsub, qdel, etc. that handle the starting and stopping of the process.
804
807
805 This class also has the notion of a batch script. The ``batch_template``
808 This class also has the notion of a batch script. The ``batch_template``
806 attribute can be set to a string that is a template for the batch script.
809 attribute can be set to a string that is a template for the batch script.
807 This template is instantiated using string formatting. Thus the template can
810 This template is instantiated using string formatting. Thus the template can
808 use {n} for the number of instances. Subclasses can add additional variables
811 use {n} for the number of instances. Subclasses can add additional variables
809 to the template dict.
812 to the template dict.
810 """
813 """
811
814
812 # Subclasses must fill these in. See PBSEngineSet
815 # Subclasses must fill these in. See PBSEngineSet
813 submit_command = List([''], config=True,
816 submit_command = List([''], config=True,
814 help="The name of the command line program used to submit jobs.")
817 help="The name of the command line program used to submit jobs.")
815 delete_command = List([''], config=True,
818 delete_command = List([''], config=True,
816 help="The name of the command line program used to delete jobs.")
819 help="The name of the command line program used to delete jobs.")
817 job_id_regexp = Unicode('', config=True,
820 job_id_regexp = Unicode('', config=True,
818 help="""A regular expression used to get the job id from the output of the
821 help="""A regular expression used to get the job id from the output of the
819 submit_command.""")
822 submit_command.""")
820 batch_template = Unicode('', config=True,
823 batch_template = Unicode('', config=True,
821 help="The string that is the batch script template itself.")
824 help="The string that is the batch script template itself.")
822 batch_template_file = Unicode(u'', config=True,
825 batch_template_file = Unicode(u'', config=True,
823 help="The file that contains the batch template.")
826 help="The file that contains the batch template.")
824 batch_file_name = Unicode(u'batch_script', config=True,
827 batch_file_name = Unicode(u'batch_script', config=True,
825 help="The filename of the instantiated batch script.")
828 help="The filename of the instantiated batch script.")
826 queue = Unicode(u'', config=True,
829 queue = Unicode(u'', config=True,
827 help="The PBS Queue.")
830 help="The PBS Queue.")
828
831
829 # not configurable, override in subclasses
832 # not configurable, override in subclasses
830 # PBS Job Array regex
833 # PBS Job Array regex
831 job_array_regexp = Unicode('')
834 job_array_regexp = Unicode('')
832 job_array_template = Unicode('')
835 job_array_template = Unicode('')
833 # PBS Queue regex
836 # PBS Queue regex
834 queue_regexp = Unicode('')
837 queue_regexp = Unicode('')
835 queue_template = Unicode('')
838 queue_template = Unicode('')
836 # The default batch template, override in subclasses
839 # The default batch template, override in subclasses
837 default_template = Unicode('')
840 default_template = Unicode('')
838 # The full path to the instantiated batch script.
841 # The full path to the instantiated batch script.
839 batch_file = Unicode(u'')
842 batch_file = Unicode(u'')
840 # the format dict used with batch_template:
843 # the format dict used with batch_template:
841 context = Dict()
844 context = Dict()
842 # the Formatter instance for rendering the templates:
845 # the Formatter instance for rendering the templates:
843 formatter = Instance(EvalFormatter, (), {})
846 formatter = Instance(EvalFormatter, (), {})
844
847
845
848
846 def find_args(self):
849 def find_args(self):
847 return self.submit_command + [self.batch_file]
850 return self.submit_command + [self.batch_file]
848
851
849 def __init__(self, work_dir=u'.', config=None, **kwargs):
852 def __init__(self, work_dir=u'.', config=None, **kwargs):
850 super(BatchSystemLauncher, self).__init__(
853 super(BatchSystemLauncher, self).__init__(
851 work_dir=work_dir, config=config, **kwargs
854 work_dir=work_dir, config=config, **kwargs
852 )
855 )
853 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
856 self.batch_file = os.path.join(self.work_dir, self.batch_file_name)
854
857
855 def parse_job_id(self, output):
858 def parse_job_id(self, output):
856 """Take the output of the submit command and return the job id."""
859 """Take the output of the submit command and return the job id."""
857 m = re.search(self.job_id_regexp, output)
860 m = re.search(self.job_id_regexp, output)
858 if m is not None:
861 if m is not None:
859 job_id = m.group()
862 job_id = m.group()
860 else:
863 else:
861 raise LauncherError("Job id couldn't be determined: %s" % output)
864 raise LauncherError("Job id couldn't be determined: %s" % output)
862 self.job_id = job_id
865 self.job_id = job_id
863 self.log.info('Job submitted with job id: %r' % job_id)
866 self.log.info('Job submitted with job id: %r' % job_id)
864 return job_id
867 return job_id
865
868
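# For illustration (host name below is made up): with PBSLauncher's
# job_id_regexp of r'\d+', qsub output such as "12345.pbs-master.example.org"
# gives re.search(r'\d+', output).group() -> '12345', which is what
# parse_job_id stores as self.job_id.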
866 def write_batch_script(self, n):
869 def write_batch_script(self, n):
867 """Instantiate and write the batch script to the work_dir."""
870 """Instantiate and write the batch script to the work_dir."""
868 self.context['n'] = n
871 self.context['n'] = n
869 self.context['queue'] = self.queue
872 self.context['queue'] = self.queue
870 # first priority is batch_template if set
873 # first priority is batch_template if set
871 if self.batch_template_file and not self.batch_template:
874 if self.batch_template_file and not self.batch_template:
872 # second priority is batch_template_file
875 # second priority is batch_template_file
873 with open(self.batch_template_file) as f:
876 with open(self.batch_template_file) as f:
874 self.batch_template = f.read()
877 self.batch_template = f.read()
875 if not self.batch_template:
878 if not self.batch_template:
876 # third (last) priority is default_template
879 # third (last) priority is default_template
877 self.batch_template = self.default_template
880 self.batch_template = self.default_template
878
881
879 regex = re.compile(self.job_array_regexp)
882 regex = re.compile(self.job_array_regexp)
880 # print regex.search(self.batch_template)
883 # print regex.search(self.batch_template)
881 if not regex.search(self.batch_template):
884 if not regex.search(self.batch_template):
882 self.log.info("adding job array settings to batch script")
885 self.log.info("adding job array settings to batch script")
883 firstline, rest = self.batch_template.split('\n',1)
886 firstline, rest = self.batch_template.split('\n',1)
884 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
887 self.batch_template = u'\n'.join([firstline, self.job_array_template, rest])
885
888
886 regex = re.compile(self.queue_regexp)
889 regex = re.compile(self.queue_regexp)
887 # print regex.search(self.batch_template)
890 # print regex.search(self.batch_template)
888 if self.queue and not regex.search(self.batch_template):
891 if self.queue and not regex.search(self.batch_template):
889 self.log.info("adding PBS queue settings to batch script")
892 self.log.info("adding PBS queue settings to batch script")
890 firstline, rest = self.batch_template.split('\n',1)
893 firstline, rest = self.batch_template.split('\n',1)
891 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
894 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
892
895
893 script_as_string = self.formatter.format(self.batch_template, **self.context)
896 script_as_string = self.formatter.format(self.batch_template, **self.context)
894 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
897 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
895
898
896 with open(self.batch_file, 'w') as f:
899 with open(self.batch_file, 'w') as f:
897 f.write(script_as_string)
900 f.write(script_as_string)
898 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
901 os.chmod(self.batch_file, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
899
902
900 def start(self, n, profile_dir):
903 def start(self, n, profile_dir):
901 """Start n copies of the process using a batch system."""
904 """Start n copies of the process using a batch system."""
902 # Here we save profile_dir in the context so it
905 # Here we save profile_dir in the context so it
903 # can be used in the batch script template as {profile_dir}
906 # can be used in the batch script template as {profile_dir}
904 self.context['profile_dir'] = profile_dir
907 self.context['profile_dir'] = profile_dir
905 self.profile_dir = unicode(profile_dir)
908 self.profile_dir = unicode(profile_dir)
906 self.write_batch_script(n)
909 self.write_batch_script(n)
907 output = check_output(self.args, env=os.environ)
910 output = check_output(self.args, env=os.environ)
908
911
909 job_id = self.parse_job_id(output)
912 job_id = self.parse_job_id(output)
910 self.notify_start(job_id)
913 self.notify_start(job_id)
911 return job_id
914 return job_id
912
915
913 def stop(self):
916 def stop(self):
914 output = check_output(self.delete_command+[self.job_id], env=os.environ)
917 output = check_output(self.delete_command+[self.job_id], env=os.environ)
915 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
918 self.notify_stop(dict(job_id=self.job_id, output=output)) # Pass the output of the kill cmd
916 return output
919 return output
917
920
918
921
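# A minimal illustrative sketch (values are made up) of how write_batch_script
# above renders batch_template: the EvalFormatter imported at the top of this
# module fills {n}, {queue} and {profile_dir} from self.context, much like
# string.Formatter:
#
#     from IPython.utils.text import EvalFormatter
#     template = u"#!/bin/sh\n#PBS -q {queue}\n#PBS -t 1-{n}\nipengine profile_dir={profile_dir}\n"
#     context = dict(n=4, queue=u'batch', profile_dir=u'/home/user/.ipython/profile_default')
#     EvalFormatter().format(template, **context)
#     # -> u'#!/bin/sh\n#PBS -q batch\n#PBS -t 1-4\nipengine profile_dir=/home/user/.ipython/profile_default\n'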
919 class PBSLauncher(BatchSystemLauncher):
922 class PBSLauncher(BatchSystemLauncher):
920 """A BatchSystemLauncher subclass for PBS."""
923 """A BatchSystemLauncher subclass for PBS."""
921
924
922 submit_command = List(['qsub'], config=True,
925 submit_command = List(['qsub'], config=True,
923 help="The PBS submit command ['qsub']")
926 help="The PBS submit command ['qsub']")
924 delete_command = List(['qdel'], config=True,
927 delete_command = List(['qdel'], config=True,
925 help="The PBS delete command ['qdel']")
928 help="The PBS delete command ['qdel']")
926 job_id_regexp = Unicode(r'\d+', config=True,
929 job_id_regexp = Unicode(r'\d+', config=True,
927 help="Regular expression for identifying the job ID [r'\d+']")
930 help="Regular expression for identifying the job ID [r'\d+']")
928
931
929 batch_file = Unicode(u'')
932 batch_file = Unicode(u'')
930 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
933 job_array_regexp = Unicode('#PBS\W+-t\W+[\w\d\-\$]+')
931 job_array_template = Unicode('#PBS -t 1-{n}')
934 job_array_template = Unicode('#PBS -t 1-{n}')
932 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
935 queue_regexp = Unicode('#PBS\W+-q\W+\$?\w+')
933 queue_template = Unicode('#PBS -q {queue}')
936 queue_template = Unicode('#PBS -q {queue}')
934
937
935
938
936 class PBSControllerLauncher(PBSLauncher):
939 class PBSControllerLauncher(PBSLauncher):
937 """Launch a controller using PBS."""
940 """Launch a controller using PBS."""
938
941
939 batch_file_name = Unicode(u'pbs_controller', config=True,
942 batch_file_name = Unicode(u'pbs_controller', config=True,
940 help="batch file name for the controller job.")
943 help="batch file name for the controller job.")
941 default_template= Unicode("""#!/bin/sh
944 default_template= Unicode("""#!/bin/sh
942 #PBS -V
945 #PBS -V
943 #PBS -N ipcontroller
946 #PBS -N ipcontroller
944 %s --log-to-file profile_dir={profile_dir}
947 %s --log-to-file profile_dir={profile_dir}
945 """%(' '.join(ipcontroller_cmd_argv)))
948 """%(' '.join(ipcontroller_cmd_argv)))
946
949
947 def start(self, profile_dir):
950 def start(self, profile_dir):
948 """Start the controller by profile or profile_dir."""
951 """Start the controller by profile or profile_dir."""
949 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
952 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
950 return super(PBSControllerLauncher, self).start(1, profile_dir)
953 return super(PBSControllerLauncher, self).start(1, profile_dir)
951
954
952
955
953 class PBSEngineSetLauncher(PBSLauncher):
956 class PBSEngineSetLauncher(PBSLauncher):
954 """Launch Engines using PBS"""
957 """Launch Engines using PBS"""
955 batch_file_name = Unicode(u'pbs_engines', config=True,
958 batch_file_name = Unicode(u'pbs_engines', config=True,
956 help="batch file name for the engine(s) job.")
959 help="batch file name for the engine(s) job.")
957 default_template= Unicode(u"""#!/bin/sh
960 default_template= Unicode(u"""#!/bin/sh
958 #PBS -V
961 #PBS -V
959 #PBS -N ipengine
962 #PBS -N ipengine
960 %s profile_dir={profile_dir}
963 %s profile_dir={profile_dir}
961 """%(' '.join(ipengine_cmd_argv)))
964 """%(' '.join(ipengine_cmd_argv)))
962
965
963 def start(self, n, profile_dir):
966 def start(self, n, profile_dir):
964 """Start n engines by profile or profile_dir."""
967 """Start n engines by profile or profile_dir."""
965 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
968 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
966 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
969 return super(PBSEngineSetLauncher, self).start(n, profile_dir)
967
970
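# A hypothetical ipcluster_config.py fragment showing how the configurable
# traits above might be set (the config mechanism is IPython's standard one;
# the queue name, template file name, and template contents are illustrative):
#
#     c = get_config()
#     c.PBSEngineSetLauncher.queue = u'short'
#     c.PBSEngineSetLauncher.batch_template_file = u'pbs.engine.template'
#     c.PBSControllerLauncher.batch_template = u"""#!/bin/sh
#     #PBS -V
#     #PBS -N ipcontroller
#     ipcontroller --log-to-file profile_dir={profile_dir}
#     """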
968 #SGE is very similar to PBS
971 #SGE is very similar to PBS
969
972
970 class SGELauncher(PBSLauncher):
973 class SGELauncher(PBSLauncher):
971 """Sun GridEngine is a PBS clone with slightly different syntax"""
974 """Sun GridEngine is a PBS clone with slightly different syntax"""
972 job_array_regexp = Unicode('#\$\W+\-t')
975 job_array_regexp = Unicode('#\$\W+\-t')
973 job_array_template = Unicode('#$ -t 1-{n}')
976 job_array_template = Unicode('#$ -t 1-{n}')
974 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
977 queue_regexp = Unicode('#\$\W+-q\W+\$?\w+')
975 queue_template = Unicode('#$ -q {queue}')
978 queue_template = Unicode('#$ -q {queue}')
976
979
977 class SGEControllerLauncher(SGELauncher):
980 class SGEControllerLauncher(SGELauncher):
978 """Launch a controller using SGE."""
981 """Launch a controller using SGE."""
979
982
980 batch_file_name = Unicode(u'sge_controller', config=True,
983 batch_file_name = Unicode(u'sge_controller', config=True,
981 help="batch file name for the ipcontroller job.")
984 help="batch file name for the ipcontroller job.")
982 default_template= Unicode(u"""#$ -V
985 default_template= Unicode(u"""#$ -V
983 #$ -S /bin/sh
986 #$ -S /bin/sh
984 #$ -N ipcontroller
987 #$ -N ipcontroller
985 %s --log-to-file profile_dir={profile_dir}
988 %s --log-to-file profile_dir={profile_dir}
986 """%(' '.join(ipcontroller_cmd_argv)))
989 """%(' '.join(ipcontroller_cmd_argv)))
987
990
988 def start(self, profile_dir):
991 def start(self, profile_dir):
989 """Start the controller by profile or profile_dir."""
992 """Start the controller by profile or profile_dir."""
990 self.log.info("Starting SGEControllerLauncher: %r" % self.args)
993 self.log.info("Starting SGEControllerLauncher: %r" % self.args)
991 return super(SGEControllerLauncher, self).start(1, profile_dir)
994 return super(SGEControllerLauncher, self).start(1, profile_dir)
992
995
993 class SGEEngineSetLauncher(SGELauncher):
996 class SGEEngineSetLauncher(SGELauncher):
994 """Launch Engines with SGE"""
997 """Launch Engines with SGE"""
995 batch_file_name = Unicode(u'sge_engines', config=True,
998 batch_file_name = Unicode(u'sge_engines', config=True,
996 help="batch file name for the engine(s) job.")
999 help="batch file name for the engine(s) job.")
997 default_template = Unicode("""#$ -V
1000 default_template = Unicode("""#$ -V
998 #$ -S /bin/sh
1001 #$ -S /bin/sh
999 #$ -N ipengine
1002 #$ -N ipengine
1000 %s profile_dir={profile_dir}
1003 %s profile_dir={profile_dir}
1001 """%(' '.join(ipengine_cmd_argv)))
1004 """%(' '.join(ipengine_cmd_argv)))
1002
1005
1003 def start(self, n, profile_dir):
1006 def start(self, n, profile_dir):
1004 """Start n engines by profile or profile_dir."""
1007 """Start n engines by profile or profile_dir."""
1005 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1008 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1006 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1009 return super(SGEEngineSetLauncher, self).start(n, profile_dir)
1007
1010
1008
1011
1009 #-----------------------------------------------------------------------------
1012 #-----------------------------------------------------------------------------
1010 # A launcher for ipcluster itself!
1013 # A launcher for ipcluster itself!
1011 #-----------------------------------------------------------------------------
1014 #-----------------------------------------------------------------------------
1012
1015
1013
1016
1014 class IPClusterLauncher(LocalProcessLauncher):
1017 class IPClusterLauncher(LocalProcessLauncher):
1015 """Launch the ipcluster program in an external process."""
1018 """Launch the ipcluster program in an external process."""
1016
1019
1017 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1020 ipcluster_cmd = List(ipcluster_cmd_argv, config=True,
1018 help="Popen command for ipcluster")
1021 help="Popen command for ipcluster")
1019 ipcluster_args = List(
1022 ipcluster_args = List(
1020 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1023 ['--clean-logs', '--log-to-file', 'log_level=%i'%logging.INFO], config=True,
1021 help="Command line arguments to pass to ipcluster.")
1024 help="Command line arguments to pass to ipcluster.")
1022 ipcluster_subcommand = Unicode('start')
1025 ipcluster_subcommand = Unicode('start')
1023 ipcluster_n = Int(2)
1026 ipcluster_n = Int(2)
1024
1027
1025 def find_args(self):
1028 def find_args(self):
1026 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
1029 return self.ipcluster_cmd + ['--'+self.ipcluster_subcommand] + \
1027 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
1030 ['n=%i'%self.ipcluster_n] + self.ipcluster_args
1028
1031
1029 def start(self):
1032 def start(self):
1030 self.log.info("Starting ipcluster: %r" % self.args)
1033 self.log.info("Starting ipcluster: %r" % self.args)
1031 return super(IPClusterLauncher, self).start()
1034 return super(IPClusterLauncher, self).start()
1032
1035
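# For illustration, with the defaults above find_args() produces a command
# line roughly like the following (the leading interpreter/script entries come
# from ipcluster_cmd_argv, and logging.INFO is 20):
#
#     [..., 'ipcluster', '--start', 'n=2',
#      '--clean-logs', '--log-to-file', 'log_level=20']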
1033 #-----------------------------------------------------------------------------
1036 #-----------------------------------------------------------------------------
1034 # Collections of launchers
1037 # Collections of launchers
1035 #-----------------------------------------------------------------------------
1038 #-----------------------------------------------------------------------------
1036
1039
1037 local_launchers = [
1040 local_launchers = [
1038 LocalControllerLauncher,
1041 LocalControllerLauncher,
1039 LocalEngineLauncher,
1042 LocalEngineLauncher,
1040 LocalEngineSetLauncher,
1043 LocalEngineSetLauncher,
1041 ]
1044 ]
1042 mpi_launchers = [
1045 mpi_launchers = [
1043 MPIExecLauncher,
1046 MPIExecLauncher,
1044 MPIExecControllerLauncher,
1047 MPIExecControllerLauncher,
1045 MPIExecEngineSetLauncher,
1048 MPIExecEngineSetLauncher,
1046 ]
1049 ]
1047 ssh_launchers = [
1050 ssh_launchers = [
1048 SSHLauncher,
1051 SSHLauncher,
1049 SSHControllerLauncher,
1052 SSHControllerLauncher,
1050 SSHEngineLauncher,
1053 SSHEngineLauncher,
1051 SSHEngineSetLauncher,
1054 SSHEngineSetLauncher,
1052 ]
1055 ]
1053 winhpc_launchers = [
1056 winhpc_launchers = [
1054 WindowsHPCLauncher,
1057 WindowsHPCLauncher,
1055 WindowsHPCControllerLauncher,
1058 WindowsHPCControllerLauncher,
1056 WindowsHPCEngineSetLauncher,
1059 WindowsHPCEngineSetLauncher,
1057 ]
1060 ]
1058 pbs_launchers = [
1061 pbs_launchers = [
1059 PBSLauncher,
1062 PBSLauncher,
1060 PBSControllerLauncher,
1063 PBSControllerLauncher,
1061 PBSEngineSetLauncher,
1064 PBSEngineSetLauncher,
1062 ]
1065 ]
1063 sge_launchers = [
1066 sge_launchers = [
1064 SGELauncher,
1067 SGELauncher,
1065 SGEControllerLauncher,
1068 SGEControllerLauncher,
1066 SGEEngineSetLauncher,
1069 SGEEngineSetLauncher,
1067 ]
1070 ]
1068 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1071 all_launchers = local_launchers + mpi_launchers + ssh_launchers + winhpc_launchers\
1069 + pbs_launchers + sge_launchers
1072 + pbs_launchers + sge_launchers
@@ -1,110 +1,113 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple logger object that consolidates messages incoming from ipcluster processes."""
2 """A simple logger object that consolidates messages incoming from ipcluster processes."""
3
3
4 #-----------------------------------------------------------------------------
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2011 The IPython Development Team
5 # Copyright (C) 2011 The IPython Development Team
6 #
6 #
7 # Distributed under the terms of the BSD License. The full license is in
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Imports
12 # Imports
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15
15
16 import logging
16 import logging
17 import sys
17 import sys
18
18
19 import zmq
19 import zmq
20 from zmq.eventloop import ioloop, zmqstream
20 from zmq.eventloop import ioloop, zmqstream
21
21
22 from IPython.config.application import Application
22 from IPython.config.configurable import Configurable
23 from IPython.config.configurable import Configurable
23 from IPython.utils.traitlets import Int, Unicode, Instance, List
24 from IPython.utils.traitlets import Int, Unicode, Instance, List
24
25
25 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
26 # Classes
27 # Classes
27 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
28
29
29
30
30 class LogWatcher(Configurable):
31 class LogWatcher(Configurable):
31 """A simple class that receives messages on a SUB socket, as published
32 """A simple class that receives messages on a SUB socket, as published
32 by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.
33 by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.
33
34
34 This can subscribe to multiple topics, but defaults to all topics.
35 This can subscribe to multiple topics, but defaults to all topics.
35 """
36 """
36
37
37 log = Instance('logging.Logger', ('root',))
38 log = Instance('logging.Logger')
39 def _log_default(self):
40 return Application.instance().log
38
41
39 # configurables
42 # configurables
40 topics = List([''], config=True,
43 topics = List([''], config=True,
41 help="The ZMQ topics to subscribe to. Default is to subscribe to all messages")
44 help="The ZMQ topics to subscribe to. Default is to subscribe to all messages")
42 url = Unicode('tcp://127.0.0.1:20202', config=True,
45 url = Unicode('tcp://127.0.0.1:20202', config=True,
43 help="ZMQ url on which to listen for log messages")
46 help="ZMQ url on which to listen for log messages")
44
47
45 # internals
48 # internals
46 stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
49 stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
47
50
48 context = Instance(zmq.Context)
51 context = Instance(zmq.Context)
49 def _context_default(self):
52 def _context_default(self):
50 return zmq.Context.instance()
53 return zmq.Context.instance()
51
54
52 loop = Instance(zmq.eventloop.ioloop.IOLoop)
55 loop = Instance(zmq.eventloop.ioloop.IOLoop)
53 def _loop_default(self):
56 def _loop_default(self):
54 return ioloop.IOLoop.instance()
57 return ioloop.IOLoop.instance()
55
58
56 def __init__(self, **kwargs):
59 def __init__(self, **kwargs):
57 super(LogWatcher, self).__init__(**kwargs)
60 super(LogWatcher, self).__init__(**kwargs)
58 s = self.context.socket(zmq.SUB)
61 s = self.context.socket(zmq.SUB)
59 s.bind(self.url)
62 s.bind(self.url)
60 self.stream = zmqstream.ZMQStream(s, self.loop)
63 self.stream = zmqstream.ZMQStream(s, self.loop)
61 self.subscribe()
64 self.subscribe()
62 self.on_trait_change(self.subscribe, 'topics')
65 self.on_trait_change(self.subscribe, 'topics')
63
66
64 def start(self):
67 def start(self):
65 self.stream.on_recv(self.log_message)
68 self.stream.on_recv(self.log_message)
66
69
67 def stop(self):
70 def stop(self):
68 self.stream.stop_on_recv()
71 self.stream.stop_on_recv()
69
72
70 def subscribe(self):
73 def subscribe(self):
71 """Update our SUB socket's subscriptions."""
74 """Update our SUB socket's subscriptions."""
72 self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
75 self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
73 if '' in self.topics:
76 if '' in self.topics:
74 self.log.debug("Subscribing to: everything")
77 self.log.debug("Subscribing to: everything")
75 self.stream.setsockopt(zmq.SUBSCRIBE, '')
78 self.stream.setsockopt(zmq.SUBSCRIBE, '')
76 else:
79 else:
77 for topic in self.topics:
80 for topic in self.topics:
78 self.log.debug("Subscribing to: %r"%(topic))
81 self.log.debug("Subscribing to: %r"%(topic))
79 self.stream.setsockopt(zmq.SUBSCRIBE, topic)
82 self.stream.setsockopt(zmq.SUBSCRIBE, topic)
80
83
81 def _extract_level(self, topic_str):
84 def _extract_level(self, topic_str):
82 """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')"""
85 """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')"""
83 topics = topic_str.split('.')
86 topics = topic_str.split('.')
84 for idx,t in enumerate(topics):
87 for idx,t in enumerate(topics):
85 level = getattr(logging, t, None)
88 level = getattr(logging, t, None)
86 if level is not None:
89 if level is not None:
87 break
90 break
88
91
89 if level is None:
92 if level is None:
90 level = logging.INFO
93 level = logging.INFO
91 else:
94 else:
92 topics.pop(idx)
95 topics.pop(idx)
93
96
94 return level, '.'.join(topics)
97 return level, '.'.join(topics)
95
98
96
99
97 def log_message(self, raw):
100 def log_message(self, raw):
98 """receive and parse a message, then log it."""
101 """receive and parse a message, then log it."""
99 if len(raw) != 2 or '.' not in raw[0]:
102 if len(raw) != 2 or '.' not in raw[0]:
100 self.log.error("Invalid log message: %s"%raw)
103 self.log.error("Invalid log message: %s"%raw)
101 return
104 return
102 else:
105 else:
103 topic, msg = raw
106 topic, msg = raw
104 # don't add a newline, since log messages already end with one:
107 # don't add a newline, since log messages already end with one:
105 topic,level_name = topic.rsplit('.',1)
108 topic,level_name = topic.rsplit('.',1)
106 level,topic = self._extract_level(topic)
109 level,topic = self._extract_level(topic)
107 if msg[-1] == '\n':
110 if msg[-1] == '\n':
108 msg = msg[:-1]
111 msg = msg[:-1]
109 self.log.log(level, "[%s] %s" % (topic, msg))
112 self.log.log(level, "[%s] %s" % (topic, msg))
110
113
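As an illustration of the publishing side of this watcher, a minimal sketch of a process sending log records to it (assuming the zmq.log.handlers.PUBHandler API referenced in the docstring above; the topic and message are made up):

    import logging
    import zmq
    from zmq.log.handlers import PUBHandler

    ctx = zmq.Context.instance()
    pub = ctx.socket(zmq.PUB)
    pub.connect('tcp://127.0.0.1:20202')   # the LogWatcher's default url

    handler = PUBHandler(pub)
    handler.root_topic = 'engine.0'        # topics arrive as e.g. 'engine.0.INFO'
    logger = logging.getLogger('engine.0')
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    # allow a moment for the SUB side to connect before relying on delivery
    logger.info('hello from an engine')    # watcher logs "[engine.0] hello from an engine"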
@@ -1,181 +1,184 b''
1 """A Task logger that presents our DB interface,
1 """A Task logger that presents our DB interface,
2 but exists entirely in memory and implemented with dicts.
2 but exists entirely in memory and implemented with dicts.
3
3
4 TaskRecords are dicts of the form:
4 TaskRecords are dicts of the form:
5 {
5 {
6 'msg_id' : str(uuid),
6 'msg_id' : str(uuid),
7 'client_uuid' : str(uuid),
7 'client_uuid' : str(uuid),
8 'engine_uuid' : str(uuid) or None,
8 'engine_uuid' : str(uuid) or None,
9 'header' : dict(header),
9 'header' : dict(header),
10 'content': dict(content),
10 'content': dict(content),
11 'buffers': list(buffers),
11 'buffers': list(buffers),
12 'submitted': datetime,
12 'submitted': datetime,
13 'started': datetime or None,
13 'started': datetime or None,
14 'completed': datetime or None,
14 'completed': datetime or None,
15 'resubmitted': datetime or None,
15 'resubmitted': datetime or None,
16 'result_header' : dict(header) or None,
16 'result_header' : dict(header) or None,
17 'result_content' : dict(content) or None,
17 'result_content' : dict(content) or None,
18 'result_buffers' : list(buffers) or None,
18 'result_buffers' : list(buffers) or None,
19 }
19 }
20 With this info, many of the special categories of tasks can be defined by query:
20 With this info, many of the special categories of tasks can be defined by query:
21
21
22 pending: completed is None
22 pending: completed is None
23 client's outstanding: client_uuid = uuid && completed is None
23 client's outstanding: client_uuid = uuid && completed is None
24 MIA: arrived is None (and completed is None)
24 MIA: arrived is None (and completed is None)
25 etc.
25 etc.
26
26
27 EngineRecords are dicts of the form:
27 EngineRecords are dicts of the form:
28 {
28 {
29 'eid' : int(id),
29 'eid' : int(id),
30 'uuid': str(uuid)
30 'uuid': str(uuid)
31 }
31 }
32 This format may be extended in the future, but this is all that is stored at present.
32 This format may be extended in the future, but this is all that is stored at present.
33
33
34 We support a subset of mongodb operators:
34 We support a subset of mongodb operators:
35 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
35 $lt,$gt,$lte,$gte,$ne,$in,$nin,$all,$mod,$exists
36 """
36 """
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38 # Copyright (C) 2010 The IPython Development Team
38 # Copyright (C) 2010 The IPython Development Team
39 #
39 #
40 # Distributed under the terms of the BSD License. The full license is in
40 # Distributed under the terms of the BSD License. The full license is in
41 # the file COPYING, distributed as part of this software.
41 # the file COPYING, distributed as part of this software.
42 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
43
43
44
44
45 from datetime import datetime
45 from datetime import datetime
46
46
47 from IPython.config.application import Application
47 from IPython.config.configurable import Configurable
48 from IPython.config.configurable import Configurable
48
49
49 from IPython.utils.traitlets import Dict, Unicode, Instance
50 from IPython.utils.traitlets import Dict, Unicode, Instance
50
51
51 filters = {
52 filters = {
52 '$lt' : lambda a,b: a < b,
53 '$lt' : lambda a,b: a < b,
53 '$gt' : lambda a,b: a > b,
54 '$gt' : lambda a,b: a > b,
54 '$eq' : lambda a,b: a == b,
55 '$eq' : lambda a,b: a == b,
55 '$ne' : lambda a,b: a != b,
56 '$ne' : lambda a,b: a != b,
56 '$lte': lambda a,b: a <= b,
57 '$lte': lambda a,b: a <= b,
57 '$gte': lambda a,b: a >= b,
58 '$gte': lambda a,b: a >= b,
58 '$in' : lambda a,b: a in b,
59 '$in' : lambda a,b: a in b,
59 '$nin': lambda a,b: a not in b,
60 '$nin': lambda a,b: a not in b,
60 '$all': lambda a,b: all([ a in bb for bb in b ]),
61 '$all': lambda a,b: all([ a in bb for bb in b ]),
61 '$mod': lambda a,b: a%b[0] == b[1],
62 '$mod': lambda a,b: a%b[0] == b[1],
62 '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
63 '$exists' : lambda a,b: (b and a is not None) or (a is None and not b)
63 }
64 }
64
65
65
66
66 class CompositeFilter(object):
67 class CompositeFilter(object):
67 """Composite filter for matching multiple properties."""
68 """Composite filter for matching multiple properties."""
68
69
69 def __init__(self, dikt):
70 def __init__(self, dikt):
70 self.tests = []
71 self.tests = []
71 self.values = []
72 self.values = []
72 for key, value in dikt.iteritems():
73 for key, value in dikt.iteritems():
73 self.tests.append(filters[key])
74 self.tests.append(filters[key])
74 self.values.append(value)
75 self.values.append(value)
75
76
76 def __call__(self, value):
77 def __call__(self, value):
77 for test,check in zip(self.tests, self.values):
78 for test,check in zip(self.tests, self.values):
78 if not test(value, check):
79 if not test(value, check):
79 return False
80 return False
80 return True
81 return True
81
82
82 class BaseDB(Configurable):
83 class BaseDB(Configurable):
83 """Empty Parent class so traitlets work on DB."""
84 """Empty Parent class so traitlets work on DB."""
84 # base configurable traits:
85 # base configurable traits:
85 session = Unicode("")
86 session = Unicode("")
86 log = Instance('logging.Logger', ('root',))
87 log = Instance('logging.Logger')
88 def _log_default(self):
89 return Application.instance().log
87
90
88 class DictDB(BaseDB):
91 class DictDB(BaseDB):
89 """Basic in-memory dict-based object for saving Task Records.
92 """Basic in-memory dict-based object for saving Task Records.
90
93
91 This is the first object to present the DB interface
94 This is the first object to present the DB interface
92 for logging tasks out of memory.
95 for logging tasks out of memory.
93
96
94 The interface is based on MongoDB, so adding a MongoDB
97 The interface is based on MongoDB, so adding a MongoDB
95 backend should be straightforward.
98 backend should be straightforward.
96 """
99 """
97
100
98 _records = Dict()
101 _records = Dict()
99
102
100 def _match_one(self, rec, tests):
103 def _match_one(self, rec, tests):
101 """Check if a specific record matches tests."""
104 """Check if a specific record matches tests."""
102 for key,test in tests.iteritems():
105 for key,test in tests.iteritems():
103 if not test(rec.get(key, None)):
106 if not test(rec.get(key, None)):
104 return False
107 return False
105 return True
108 return True
106
109
107 def _match(self, check):
110 def _match(self, check):
108 """Find all the matches for a check dict."""
111 """Find all the matches for a check dict."""
109 matches = []
112 matches = []
110 tests = {}
113 tests = {}
111 for k,v in check.iteritems():
114 for k,v in check.iteritems():
112 if isinstance(v, dict):
115 if isinstance(v, dict):
113 tests[k] = CompositeFilter(v)
116 tests[k] = CompositeFilter(v)
114 else:
117 else:
115 tests[k] = lambda o, v=v: o==v # bind v now, so each key tests its own value
118 tests[k] = lambda o, v=v: o==v # bind v now, so each key tests its own value
116
119
117 for rec in self._records.itervalues():
120 for rec in self._records.itervalues():
118 if self._match_one(rec, tests):
121 if self._match_one(rec, tests):
119 matches.append(rec)
122 matches.append(rec)
120 return matches
123 return matches
121
124
122 def _extract_subdict(self, rec, keys):
125 def _extract_subdict(self, rec, keys):
123 """extract subdict of keys"""
126 """extract subdict of keys"""
124 d = {}
127 d = {}
125 d['msg_id'] = rec['msg_id']
128 d['msg_id'] = rec['msg_id']
126 for key in keys:
129 for key in keys:
127 d[key] = rec[key]
130 d[key] = rec[key]
128 return d
131 return d
129
132
130 def add_record(self, msg_id, rec):
133 def add_record(self, msg_id, rec):
131 """Add a new Task Record, by msg_id."""
134 """Add a new Task Record, by msg_id."""
132 if self._records.has_key(msg_id):
135 if self._records.has_key(msg_id):
133 raise KeyError("Already have msg_id %r"%(msg_id))
136 raise KeyError("Already have msg_id %r"%(msg_id))
134 self._records[msg_id] = rec
137 self._records[msg_id] = rec
135
138
136 def get_record(self, msg_id):
139 def get_record(self, msg_id):
137 """Get a specific Task Record, by msg_id."""
140 """Get a specific Task Record, by msg_id."""
138 if not self._records.has_key(msg_id):
141 if not self._records.has_key(msg_id):
139 raise KeyError("No such msg_id %r"%(msg_id))
142 raise KeyError("No such msg_id %r"%(msg_id))
140 return self._records[msg_id]
143 return self._records[msg_id]
141
144
142 def update_record(self, msg_id, rec):
145 def update_record(self, msg_id, rec):
143 """Update the data in an existing record."""
146 """Update the data in an existing record."""
144 self._records[msg_id].update(rec)
147 self._records[msg_id].update(rec)
145
148
146 def drop_matching_records(self, check):
149 def drop_matching_records(self, check):
147 """Remove a record from the DB."""
150 """Remove a record from the DB."""
148 matches = self._match(check)
151 matches = self._match(check)
149 for m in matches:
152 for m in matches:
150 del self._records[m['msg_id']]
153 del self._records[m['msg_id']]
151
154
152 def drop_record(self, msg_id):
155 def drop_record(self, msg_id):
153 """Remove a record from the DB."""
156 """Remove a record from the DB."""
154 del self._records[msg_id]
157 del self._records[msg_id]
155
158
156
159
157 def find_records(self, check, keys=None):
160 def find_records(self, check, keys=None):
158 """Find records matching a query dict, optionally extracting subset of keys.
161 """Find records matching a query dict, optionally extracting subset of keys.
159
162
160 Returns dict keyed by msg_id of matching records.
163 Returns dict keyed by msg_id of matching records.
161
164
162 Parameters
165 Parameters
163 ----------
166 ----------
164
167
165 check: dict
168 check: dict
166 mongodb-style query argument
169 mongodb-style query argument
167 keys: list of strs [optional]
170 keys: list of strs [optional]
168 if specified, the subset of keys to extract. msg_id will *always* be
171 if specified, the subset of keys to extract. msg_id will *always* be
169 included.
172 included.
170 """
173 """
171 matches = self._match(check)
174 matches = self._match(check)
172 if keys:
175 if keys:
173 return [ self._extract_subdict(rec, keys) for rec in matches ]
176 return [ self._extract_subdict(rec, keys) for rec in matches ]
174 else:
177 else:
175 return matches
178 return matches
176
179
177
180
178 def get_history(self):
181 def get_history(self):
179 """get all msg_ids, ordered by time submitted."""
182 """get all msg_ids, ordered by time submitted."""
180 msg_ids = self._records.keys()
183 msg_ids = self._records.keys()
181 return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])
184 return sorted(msg_ids, key=lambda m: self._records[m]['submitted'])
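A brief illustrative sketch of the query interface above (record contents and msg_ids are made up):

    from datetime import datetime

    db = DictDB()
    db.add_record('a1', dict(msg_id='a1', client_uuid='c1',
                             submitted=datetime(2011, 1, 1), completed=None))
    db.add_record('b2', dict(msg_id='b2', client_uuid='c1',
                             submitted=datetime(2011, 1, 2), completed=datetime(2011, 1, 3)))

    pending = db.find_records({'completed': None})            # -> [the 'a1' record]
    recent = db.find_records({'submitted': {'$gte': datetime(2011, 1, 2)}},
                             keys=['submitted'])              # -> [{'msg_id': 'b2', 'submitted': ...}]
    db.get_history()                                          # -> ['a1', 'b2'], ordered by submission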
@@ -1,166 +1,170 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 A multi-heart Heartbeat system using PUB and XREP sockets. Pings are sent out on the PUB socket,
3 A multi-heart Heartbeat system using PUB and XREP sockets. Pings are sent out on the PUB socket,
4 and hearts are tracked based on their XREQ identities.
4 and hearts are tracked based on their XREQ identities.
5 """
5 """
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010-2011 The IPython Development Team
7 # Copyright (C) 2010-2011 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from __future__ import print_function
13 from __future__ import print_function
14 import time
14 import time
15 import uuid
15 import uuid
16
16
17 import zmq
17 import zmq
18 from zmq.devices import ThreadDevice
18 from zmq.devices import ThreadDevice
19 from zmq.eventloop import ioloop, zmqstream
19 from zmq.eventloop import ioloop, zmqstream
20
20
21 from IPython.config.application import Application
21 from IPython.config.configurable import Configurable
22 from IPython.config.configurable import Configurable
22 from IPython.utils.traitlets import Set, Instance, CFloat
23 from IPython.utils.traitlets import Set, Instance, CFloat
23
24
24 class Heart(object):
25 class Heart(object):
25 """A basic heart object for responding to a HeartMonitor.
26 """A basic heart object for responding to a HeartMonitor.
26 This is a simple wrapper with defaults for the most common
27 This is a simple wrapper with defaults for the most common
27 Device model for responding to heartbeats.
28 Device model for responding to heartbeats.
28
29
29 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
30 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
30 SUB/XREQ for in/out.
31 SUB/XREQ for in/out.
31
32
32 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
33 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
33 device=None
34 device=None
34 id=None
35 id=None
35 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
36 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
36 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
37 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
37 self.device.daemon=True
38 self.device.daemon=True
38 self.device.connect_in(in_addr)
39 self.device.connect_in(in_addr)
39 self.device.connect_out(out_addr)
40 self.device.connect_out(out_addr)
40 if in_type == zmq.SUB:
41 if in_type == zmq.SUB:
41 self.device.setsockopt_in(zmq.SUBSCRIBE, "")
42 self.device.setsockopt_in(zmq.SUBSCRIBE, "")
42 if heart_id is None:
43 if heart_id is None:
43 heart_id = str(uuid.uuid4())
44 heart_id = str(uuid.uuid4())
44 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
45 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
45 self.id = heart_id
46 self.id = heart_id
46
47
47 def start(self):
48 def start(self):
48 return self.device.start()
49 return self.device.start()
49
50
50 class HeartMonitor(Configurable):
51 class HeartMonitor(Configurable):
51 """A basic HeartMonitor class
52 """A basic HeartMonitor class
52 pingstream: a PUB stream
53 pingstream: a PUB stream
53 pongstream: an XREP stream
54 pongstream: an XREP stream
54 period: the period of the heartbeat in milliseconds"""
55 period: the period of the heartbeat in milliseconds"""
55
56
56 period=CFloat(1000, config=True,
57 period=CFloat(1000, config=True,
57 help='The period at which the Hub pings the engines for heartbeats '
58 help='The period at which the Hub pings the engines for heartbeats '
58 ' (in ms) [default: 1000]',
59 ' (in ms) [default: 1000]',
59 )
60 )
60
61
61 log = Instance('logging.Logger', ('root',))
62 log = Instance('logging.Logger')
63 def _log_default(self):
64 return Application.instance().log
65
62 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
66 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
63 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
67 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
64 loop = Instance('zmq.eventloop.ioloop.IOLoop')
68 loop = Instance('zmq.eventloop.ioloop.IOLoop')
65 def _loop_default(self):
69 def _loop_default(self):
66 return ioloop.IOLoop.instance()
70 return ioloop.IOLoop.instance()
67
71
68 # not settable:
72 # not settable:
69 hearts=Set()
73 hearts=Set()
70 responses=Set()
74 responses=Set()
71 on_probation=Set()
75 on_probation=Set()
72 last_ping=CFloat(0)
76 last_ping=CFloat(0)
73 _new_handlers = Set()
77 _new_handlers = Set()
74 _failure_handlers = Set()
78 _failure_handlers = Set()
75 lifetime = CFloat(0)
79 lifetime = CFloat(0)
76 tic = CFloat(0)
80 tic = CFloat(0)
77
81
78 def __init__(self, **kwargs):
82 def __init__(self, **kwargs):
79 super(HeartMonitor, self).__init__(**kwargs)
83 super(HeartMonitor, self).__init__(**kwargs)
80
84
81 self.pongstream.on_recv(self.handle_pong)
85 self.pongstream.on_recv(self.handle_pong)
82
86
83 def start(self):
87 def start(self):
84 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
88 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
85 self.caller.start()
89 self.caller.start()
86
90
87 def add_new_heart_handler(self, handler):
91 def add_new_heart_handler(self, handler):
88 """add a new handler for new hearts"""
92 """add a new handler for new hearts"""
89 self.log.debug("heartbeat::new_heart_handler: %s"%handler)
93 self.log.debug("heartbeat::new_heart_handler: %s"%handler)
90 self._new_handlers.add(handler)
94 self._new_handlers.add(handler)
91
95
92 def add_heart_failure_handler(self, handler):
96 def add_heart_failure_handler(self, handler):
93 """add a new handler for heart failure"""
97 """add a new handler for heart failure"""
94 self.log.debug("heartbeat::new heart failure handler: %s"%handler)
98 self.log.debug("heartbeat::new heart failure handler: %s"%handler)
95 self._failure_handlers.add(handler)
99 self._failure_handlers.add(handler)
96
100
97 def beat(self):
101 def beat(self):
98 self.pongstream.flush()
102 self.pongstream.flush()
99 self.last_ping = self.lifetime
103 self.last_ping = self.lifetime
100
104
101 toc = time.time()
105 toc = time.time()
102 self.lifetime += toc-self.tic
106 self.lifetime += toc-self.tic
103 self.tic = toc
107 self.tic = toc
104 # self.log.debug("heartbeat::%s"%self.lifetime)
108 # self.log.debug("heartbeat::%s"%self.lifetime)
105 goodhearts = self.hearts.intersection(self.responses)
109 goodhearts = self.hearts.intersection(self.responses)
106 missed_beats = self.hearts.difference(goodhearts)
110 missed_beats = self.hearts.difference(goodhearts)
107 heartfailures = self.on_probation.intersection(missed_beats)
111 heartfailures = self.on_probation.intersection(missed_beats)
108 newhearts = self.responses.difference(goodhearts)
112 newhearts = self.responses.difference(goodhearts)
109 map(self.handle_new_heart, newhearts)
113 map(self.handle_new_heart, newhearts)
110 map(self.handle_heart_failure, heartfailures)
114 map(self.handle_heart_failure, heartfailures)
111 self.on_probation = missed_beats.intersection(self.hearts)
115 self.on_probation = missed_beats.intersection(self.hearts)
112 self.responses = set()
116 self.responses = set()
113 # print self.on_probation, self.hearts
117 # print self.on_probation, self.hearts
114 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
118 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
115 self.pingstream.send(str(self.lifetime))
119 self.pingstream.send(str(self.lifetime))
116
120
117 def handle_new_heart(self, heart):
121 def handle_new_heart(self, heart):
118 if self._new_handlers:
122 if self._new_handlers:
119 for handler in self._new_handlers:
123 for handler in self._new_handlers:
120 handler(heart)
124 handler(heart)
121 else:
125 else:
122 self.log.info("heartbeat::yay, got new heart %s!"%heart)
126 self.log.info("heartbeat::yay, got new heart %s!"%heart)
123 self.hearts.add(heart)
127 self.hearts.add(heart)
124
128
125 def handle_heart_failure(self, heart):
129 def handle_heart_failure(self, heart):
126 if self._failure_handlers:
130 if self._failure_handlers:
127 for handler in self._failure_handlers:
131 for handler in self._failure_handlers:
128 try:
132 try:
129 handler(heart)
133 handler(heart)
130 except Exception as e:
134 except Exception as e:
131 self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
135 self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
132 pass
136 pass
133 else:
137 else:
134 self.log.info("heartbeat::Heart %s failed :("%heart)
138 self.log.info("heartbeat::Heart %s failed :("%heart)
135 self.hearts.remove(heart)
139 self.hearts.remove(heart)
136
140
137
141
138 def handle_pong(self, msg):
142 def handle_pong(self, msg):
139 "a heart just beat"
143 "a heart just beat"
140 if msg[1] == str(self.lifetime):
144 if msg[1] == str(self.lifetime):
141 delta = time.time()-self.tic
145 delta = time.time()-self.tic
142 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
146 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
143 self.responses.add(msg[0])
147 self.responses.add(msg[0])
144 elif msg[1] == str(self.last_ping):
148 elif msg[1] == str(self.last_ping):
145 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
149 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
146 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
150 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
147 self.responses.add(msg[0])
151 self.responses.add(msg[0])
148 else:
152 else:
149 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
153 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
150 (msg[1],self.lifetime))
154 (msg[1],self.lifetime))
151
155
152
156
153 if __name__ == '__main__':
157 if __name__ == '__main__':
154 loop = ioloop.IOLoop.instance()
158 loop = ioloop.IOLoop.instance()
155 context = zmq.Context()
159 context = zmq.Context()
156 pub = context.socket(zmq.PUB)
160 pub = context.socket(zmq.PUB)
157 pub.bind('tcp://127.0.0.1:5555')
161 pub.bind('tcp://127.0.0.1:5555')
158 xrep = context.socket(zmq.XREP)
162 xrep = context.socket(zmq.XREP)
159 xrep.bind('tcp://127.0.0.1:5556')
163 xrep.bind('tcp://127.0.0.1:5556')
160
164
161 outstream = zmqstream.ZMQStream(pub, loop)
165 outstream = zmqstream.ZMQStream(pub, loop)
162 instream = zmqstream.ZMQStream(xrep, loop)
166 instream = zmqstream.ZMQStream(xrep, loop)
163
167
164 hb = HeartMonitor(loop=loop, pingstream=outstream, pongstream=instream)
168 hb = HeartMonitor(loop=loop, pingstream=outstream, pongstream=instream)
165
169
166 loop.start()
170 loop.start()
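To make the set arithmetic in beat() above concrete, a small worked example (heart identities are made up):

    hearts       = set(['A', 'B', 'C'])   # hearts known from previous rounds
    responses    = set(['A', 'C', 'D'])   # pongs received since the last beat
    on_probation = set()                  # nothing missed the previous beat

    goodhearts    = hearts.intersection(responses)           # {'A', 'C'}: still beating
    missed_beats  = hearts.difference(goodhearts)            # {'B'}: missed this round
    heartfailures = on_probation.intersection(missed_beats)  # empty: 'B' only just went quiet
    newhearts     = responses.difference(goodhearts)         # {'D'}: first pong -> new-heart handlers
    on_probation  = missed_beats.intersection(hearts)        # {'B'}: one more miss and it is declared failed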
@@ -1,535 +1,538 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """edited session.py to work with streams, and move msg_type to the header
2 """edited session.py to work with streams, and move msg_type to the header
3 """
3 """
4 #-----------------------------------------------------------------------------
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2010-2011 The IPython Development Team
5 # Copyright (C) 2010-2011 The IPython Development Team
6 #
6 #
7 # Distributed under the terms of the BSD License. The full license is in
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Imports
12 # Imports
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 import hmac
15 import hmac
16 import logging
16 import logging
17 import os
17 import os
18 import pprint
18 import pprint
19 import uuid
19 import uuid
20 from datetime import datetime
20 from datetime import datetime
21
21
22 try:
22 try:
23 import cPickle
23 import cPickle
24 pickle = cPickle
24 pickle = cPickle
25 except ImportError:
25 except ImportError:
26 cPickle = None
26 cPickle = None
27 import pickle
27 import pickle
28
28
29 import zmq
29 import zmq
30 from zmq.utils import jsonapi
30 from zmq.utils import jsonapi
31 from zmq.eventloop.ioloop import IOLoop
31 from zmq.eventloop.ioloop import IOLoop
32 from zmq.eventloop.zmqstream import ZMQStream
32 from zmq.eventloop.zmqstream import ZMQStream
33
33
34 from IPython.config.application import Application
34 from IPython.config.configurable import Configurable
35 from IPython.config.configurable import Configurable
35 from IPython.utils.importstring import import_item
36 from IPython.utils.importstring import import_item
36 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
37 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
37 from IPython.utils.traitlets import CStr, Unicode, Bool, Any, Instance, Set
38 from IPython.utils.traitlets import CStr, Unicode, Bool, Any, Instance, Set
38
39
39 #-----------------------------------------------------------------------------
40 #-----------------------------------------------------------------------------
40 # utility functions
41 # utility functions
41 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
42
43
43 def squash_unicode(obj):
44 def squash_unicode(obj):
44 """coerce unicode back to bytestrings."""
45 """coerce unicode back to bytestrings."""
45 if isinstance(obj,dict):
46 if isinstance(obj,dict):
46 for key in obj.keys():
47 for key in obj.keys():
47 obj[key] = squash_unicode(obj[key])
48 obj[key] = squash_unicode(obj[key])
48 if isinstance(key, unicode):
49 if isinstance(key, unicode):
49 obj[squash_unicode(key)] = obj.pop(key)
50 obj[squash_unicode(key)] = obj.pop(key)
50 elif isinstance(obj, list):
51 elif isinstance(obj, list):
51 for i,v in enumerate(obj):
52 for i,v in enumerate(obj):
52 obj[i] = squash_unicode(v)
53 obj[i] = squash_unicode(v)
53 elif isinstance(obj, unicode):
54 elif isinstance(obj, unicode):
54 obj = obj.encode('utf8')
55 obj = obj.encode('utf8')
55 return obj
56 return obj
56
57
57 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
58 # globals and defaults
59 # globals and defaults
59 #-----------------------------------------------------------------------------
60 #-----------------------------------------------------------------------------
60 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
61 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
61 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
62 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
62 json_unpacker = lambda s: squash_unicode(extract_dates(jsonapi.loads(s)))
63 json_unpacker = lambda s: squash_unicode(extract_dates(jsonapi.loads(s)))
63
64
64 pickle_packer = lambda o: pickle.dumps(o,-1)
65 pickle_packer = lambda o: pickle.dumps(o,-1)
65 pickle_unpacker = pickle.loads
66 pickle_unpacker = pickle.loads
66
67
67 default_packer = json_packer
68 default_packer = json_packer
68 default_unpacker = json_unpacker
69 default_unpacker = json_unpacker
69
70
70
71
71 DELIM="<IDS|MSG>"
72 DELIM="<IDS|MSG>"
72
73
73 #-----------------------------------------------------------------------------
74 #-----------------------------------------------------------------------------
74 # Classes
75 # Classes
75 #-----------------------------------------------------------------------------
76 #-----------------------------------------------------------------------------
76
77
77 class SessionFactory(Configurable):
78 class SessionFactory(Configurable):
78 """The Base class for configurables that have a Session, Context, logger,
79 """The Base class for configurables that have a Session, Context, logger,
79 and IOLoop.
80 and IOLoop.
80 """
81 """
81
82
82 log = Instance('logging.Logger', ('', logging.WARN))
83 log = Instance('logging.Logger')
84 def _log_default(self):
85 return Application.instance().log
83
86
84 logname = Unicode('')
87 logname = Unicode('')
85 def _logname_changed(self, name, old, new):
88 def _logname_changed(self, name, old, new):
86 self.log = logging.getLogger(new)
89 self.log = logging.getLogger(new)
87
90
88 # not configurable:
91 # not configurable:
89 context = Instance('zmq.Context')
92 context = Instance('zmq.Context')
90 def _context_default(self):
93 def _context_default(self):
91 return zmq.Context.instance()
94 return zmq.Context.instance()
92
95
93 session = Instance('IPython.zmq.session.Session')
96 session = Instance('IPython.zmq.session.Session')
94
97
95 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
98 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
96 def _loop_default(self):
99 def _loop_default(self):
97 return IOLoop.instance()
100 return IOLoop.instance()
98
101
99 def __init__(self, **kwargs):
102 def __init__(self, **kwargs):
100 super(SessionFactory, self).__init__(**kwargs)
103 super(SessionFactory, self).__init__(**kwargs)
101
104
102 if self.session is None:
105 if self.session is None:
103 # construct the session
106 # construct the session
104 self.session = Session(**kwargs)
107 self.session = Session(**kwargs)
105
108
106
109
107 class Message(object):
110 class Message(object):
108 """A simple message object that maps dict keys to attributes.
111 """A simple message object that maps dict keys to attributes.
109
112
110 A Message can be created from a dict and a dict from a Message instance
113 A Message can be created from a dict and a dict from a Message instance
111 simply by calling dict(msg_obj)."""
114 simply by calling dict(msg_obj)."""
112
115
113 def __init__(self, msg_dict):
116 def __init__(self, msg_dict):
114 dct = self.__dict__
117 dct = self.__dict__
115 for k, v in dict(msg_dict).iteritems():
118 for k, v in dict(msg_dict).iteritems():
116 if isinstance(v, dict):
119 if isinstance(v, dict):
117 v = Message(v)
120 v = Message(v)
118 dct[k] = v
121 dct[k] = v
119
122
120 # Having this iterator lets dict(msg_obj) work out of the box.
123 # Having this iterator lets dict(msg_obj) work out of the box.
121 def __iter__(self):
124 def __iter__(self):
122 return iter(self.__dict__.iteritems())
125 return iter(self.__dict__.iteritems())
123
126
124 def __repr__(self):
127 def __repr__(self):
125 return repr(self.__dict__)
128 return repr(self.__dict__)
126
129
127 def __str__(self):
130 def __str__(self):
128 return pprint.pformat(self.__dict__)
131 return pprint.pformat(self.__dict__)
129
132
130 def __contains__(self, k):
133 def __contains__(self, k):
131 return k in self.__dict__
134 return k in self.__dict__
132
135
133 def __getitem__(self, k):
136 def __getitem__(self, k):
134 return self.__dict__[k]
137 return self.__dict__[k]
135
138
136
139
137 def msg_header(msg_id, msg_type, username, session):
140 def msg_header(msg_id, msg_type, username, session):
138 date = datetime.now()
141 date = datetime.now()
139 return locals()
142 return locals()
140
143
141 def extract_header(msg_or_header):
144 def extract_header(msg_or_header):
142 """Given a message or header, return the header."""
145 """Given a message or header, return the header."""
143 if not msg_or_header:
146 if not msg_or_header:
144 return {}
147 return {}
145 try:
148 try:
146 # See if msg_or_header is the entire message.
149 # See if msg_or_header is the entire message.
147 h = msg_or_header['header']
150 h = msg_or_header['header']
148 except KeyError:
151 except KeyError:
149 try:
152 try:
150 # See if msg_or_header is just the header
153 # See if msg_or_header is just the header
151 h = msg_or_header['msg_id']
154 h = msg_or_header['msg_id']
152 except KeyError:
155 except KeyError:
153 raise
156 raise
154 else:
157 else:
155 h = msg_or_header
158 h = msg_or_header
156 if not isinstance(h, dict):
159 if not isinstance(h, dict):
157 h = dict(h)
160 h = dict(h)
158 return h
161 return h
159
162
class Session(Configurable):
    """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
    debug = Bool(False, config=True, help="""Debug output in the Session""")
    packer = Unicode('json', config=True,
        help="""The name of the packer for serializing messages.
        Should be one of 'json', 'pickle', or an import name
        for a custom callable serializer.""")
    def _packer_changed(self, name, old, new):
        if new.lower() == 'json':
            self.pack = json_packer
            self.unpack = json_unpacker
        elif new.lower() == 'pickle':
            self.pack = pickle_packer
            self.unpack = pickle_unpacker
        else:
            self.pack = import_item(str(new))

    unpacker = Unicode('json', config=True,
        help="""The name of the unpacker for deserializing messages.
        Only used with custom functions for `packer`.""")
    def _unpacker_changed(self, name, old, new):
        if new.lower() == 'json':
            self.pack = json_packer
            self.unpack = json_unpacker
        elif new.lower() == 'pickle':
            self.pack = pickle_packer
            self.unpack = pickle_unpacker
        else:
            self.unpack = import_item(str(new))

    session = CStr('', config=True,
        help="""The UUID identifying this session.""")
    def _session_default(self):
        return bytes(uuid.uuid4())
    username = Unicode(os.environ.get('USER', 'username'), config=True,
        help="""Username for the Session. Default is your system username.""")

    # message signature related traits:
    key = CStr('', config=True,
        help="""execution key, for extra authentication.""")
    def _key_changed(self, name, old, new):
        if new:
            self.auth = hmac.HMAC(new)
        else:
            self.auth = None
    auth = Instance(hmac.HMAC)
    counters = Instance('collections.defaultdict', (int,))
    digest_history = Set()

    keyfile = Unicode('', config=True,
        help="""path to file containing execution key.""")
    def _keyfile_changed(self, name, old, new):
        with open(new, 'rb') as f:
            self.key = f.read().strip()

    pack = Any(default_packer)  # the actual packer function
    def _pack_changed(self, name, old, new):
        if not callable(new):
            raise TypeError("packer must be callable, not %s" % type(new))

    unpack = Any(default_unpacker)  # the actual unpacker function
    def _unpack_changed(self, name, old, new):
        # unpacker is not checked - it is assumed to be
        if not callable(new):
            raise TypeError("unpacker must be callable, not %s" % type(new))

    def __init__(self, **kwargs):
        super(Session, self).__init__(**kwargs)
        self._check_packers()
        self.none = self.pack({})

    @property
    def msg_id(self):
        """always return new uuid"""
        return str(uuid.uuid4())

    def _check_packers(self):
        """check packers for binary data and datetime support."""
        pack = self.pack
        unpack = self.unpack

        # check simple serialization
        msg = dict(a=[1, 'hi'])
        try:
            packed = pack(msg)
        except Exception:
            raise ValueError("packer could not serialize a simple message")

        # ensure packed message is bytes
        if not isinstance(packed, bytes):
            raise ValueError("message packed to %r, but bytes are required" % type(packed))

        # check that unpack is pack's inverse
        try:
            unpacked = unpack(packed)
        except Exception:
            raise ValueError("unpacker could not handle the packer's output")

        # check datetime support
        msg = dict(t=datetime.now())
        try:
            unpacked = unpack(pack(msg))
        except Exception:
            self.pack = lambda o: pack(squash_dates(o))
            self.unpack = lambda s: extract_dates(unpack(s))

    def msg_header(self, msg_type):
        return msg_header(self.msg_id, msg_type, self.username, self.session)

    def msg(self, msg_type, content=None, parent=None, subheader=None):
        msg = {}
        msg['header'] = self.msg_header(msg_type)
        msg['msg_id'] = msg['header']['msg_id']
        msg['parent_header'] = {} if parent is None else extract_header(parent)
        msg['msg_type'] = msg_type
        msg['content'] = {} if content is None else content
        sub = {} if subheader is None else subheader
        msg['header'].update(sub)
        return msg

    def sign(self, msg):
        """Sign a message with HMAC digest. If no auth, return b''."""
        if self.auth is None:
            return b''
        h = self.auth.copy()
        for m in msg:
            h.update(m)
        return h.hexdigest()

    def serialize(self, msg, ident=None):
        content = msg.get('content', {})
        if content is None:
            content = self.none
        elif isinstance(content, dict):
            content = self.pack(content)
        elif isinstance(content, bytes):
            # content is already packed, as in a relayed message
            pass
        elif isinstance(content, unicode):
            # should be bytes, but JSON often spits out unicode
            content = content.encode('utf8')
        else:
            raise TypeError("Content incorrect type: %s" % type(content))

        real_message = [self.pack(msg['header']),
                        self.pack(msg['parent_header']),
                        content,
                        ]

        to_send = []

        if isinstance(ident, list):
            # accept list of idents
            to_send.extend(ident)
        elif ident is not None:
            to_send.append(ident)
        to_send.append(DELIM)

        signature = self.sign(real_message)
        to_send.append(signature)

        to_send.extend(real_message)

        return to_send

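    # Editor's note on the frame layout produced by serialize() above: the returned
    # list of frames is
    #   [ident, ..., DELIM, signature, packed header, packed parent_header, packed content]
    # where the signature is the HMAC hex digest of the three packed frames (or b''
    # when no key/auth is configured). Any binary buffers are appended after these
    # frames by send() below.
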
    def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
             buffers=None, subheader=None, track=False):
        """Build and send a message via stream or socket.

        Parameters
        ----------

        stream : zmq.Socket or ZMQStream
            the socket-like object used to send the data
        msg_or_type : str or Message/dict
            Normally, msg_or_type will be a msg_type unless a message is being sent more
            than once.

        content : dict or None
            the content of the message (ignored if msg_or_type is a message)
        parent : Message or dict or None
            the parent or parent header describing the parent of this message
        ident : bytes or list of bytes
            the zmq.IDENTITY routing path
        subheader : dict or None
            extra header keys for this message's header
        buffers : list or None
            the already-serialized buffers to be appended to the message
        track : bool
            whether to track.  Only for use with Sockets,
            because ZMQStream objects cannot track messages.

        Returns
        -------
        msg : message dict
            the constructed message
        (msg, tracker) : (message dict, MessageTracker)
            if track=True, then a 2-tuple will be returned,
            the first element being the constructed
            message, and the second being the MessageTracker

        """

        if not isinstance(stream, (zmq.Socket, ZMQStream)):
            raise TypeError("stream must be Socket or ZMQStream, not %r" % type(stream))
        elif track and isinstance(stream, ZMQStream):
            raise TypeError("ZMQStream cannot track messages")

        if isinstance(msg_or_type, (Message, dict)):
            # we got a Message, not a msg_type
            # don't build a new Message
            msg = msg_or_type
        else:
            msg = self.msg(msg_or_type, content, parent, subheader)

        buffers = [] if buffers is None else buffers
        to_send = self.serialize(msg, ident)
        flag = 0
        if buffers:
            flag = zmq.SNDMORE
            _track = False
        else:
            _track = track
        if track:
            tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
        else:
            tracker = stream.send_multipart(to_send, flag, copy=False)
        for b in buffers[:-1]:
            stream.send(b, flag, copy=False)
        if buffers:
            if track:
                tracker = stream.send(buffers[-1], copy=False, track=track)
            else:
                tracker = stream.send(buffers[-1], copy=False)

        # omsg = Message(msg)
        if self.debug:
            pprint.pprint(msg)
            pprint.pprint(to_send)
            pprint.pprint(buffers)

        msg['tracker'] = tracker

        return msg

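    # Editor's illustrative sketch (hypothetical names, not from the original file):
    # typical client-side use of send() over an existing zmq socket or ZMQStream:
    #
    #   session = Session()
    #   session.send(query_socket, 'connection_request', content={})
    #   idents, reply = session.recv(query_socket, mode=0)   # mode=0 blocks
    #
    # send() builds the message dict unless it is handed one, serializes it, and
    # pushes the frames (plus any buffers) down the stream as one multipart send.
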
    def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
        """Send a raw message via ident path.

        Parameters
        ----------
        msg : list of sendable buffers"""
        to_send = []
        if isinstance(ident, bytes):
            ident = [ident]
        if ident is not None:
            to_send.extend(ident)

        to_send.append(DELIM)
        to_send.append(self.sign(msg))
        to_send.extend(msg)
        stream.send_multipart(to_send, flags, copy=copy)

    def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
        """receives and unpacks a message
        returns [idents], msg"""
        if isinstance(socket, ZMQStream):
            socket = socket.socket
        try:
            msg = socket.recv_multipart(mode)
        except zmq.ZMQError as e:
            if e.errno == zmq.EAGAIN:
                # We can convert EAGAIN to None as we know in this case
                # recv_multipart won't return None.
                return None, None
            else:
                raise
        # return an actual Message object
        # determine the number of idents by trying to unpack them.
        # this is terrible:
        idents, msg = self.feed_identities(msg, copy)
        try:
            return idents, self.unpack_message(msg, content=content, copy=copy)
        except Exception as e:
            print (idents, msg)
            # TODO: handle it
            raise e

    def feed_identities(self, msg, copy=True):
        """feed until DELIM is reached, then return the prefix as idents and remainder as
        msg. This is easily broken by setting an IDENT to DELIM, but that would be silly.

        Parameters
        ----------
        msg : a list of Message or bytes objects
            the message to be split
        copy : bool
            flag determining whether the arguments are bytes or Messages

        Returns
        -------
        (idents, msg) : two lists
            idents will always be a list of bytes - the identity prefix
            msg will be a list of bytes or Messages, unchanged from input
            msg should be unpackable via self.unpack_message at this point.
        """
        if copy:
            idx = msg.index(DELIM)
            return msg[:idx], msg[idx+1:]
        else:
            failed = True
            for idx, m in enumerate(msg):
                if m.bytes == DELIM:
                    failed = False
                    break
            if failed:
                raise ValueError("DELIM not in msg")
            idents, msg = msg[:idx], msg[idx+1:]
            return [m.bytes for m in idents], msg

    def unpack_message(self, msg, content=True, copy=True):
        """Return a message object from the format
        sent by self.send.

        Parameters
        ----------

        content : bool (True)
            whether to unpack the content dict (True),
            or leave it serialized (False)

        copy : bool (True)
            whether to return the bytes (True),
            or the non-copying Message object in each place (False)

        """
        minlen = 4
        message = {}
        if not copy:
            for i in range(minlen):
                msg[i] = msg[i].bytes
        if self.auth is not None:
            signature = msg[0]
            if signature in self.digest_history:
                raise ValueError("Duplicate Signature: %r" % signature)
            self.digest_history.add(signature)
            check = self.sign(msg[1:4])
            if not signature == check:
                raise ValueError("Invalid Signature: %r" % signature)
        if not len(msg) >= minlen:
            raise TypeError("malformed message, must have at least %i elements" % minlen)
        message['header'] = self.unpack(msg[1])
        message['msg_type'] = message['header']['msg_type']
        message['parent_header'] = self.unpack(msg[2])
        if content:
            message['content'] = self.unpack(msg[3])
        else:
            message['content'] = msg[3]

        message['buffers'] = msg[4:]
        return message

def test_msg2obj():
    am = dict(x=1)
    ao = Message(am)
    assert ao.x == am['x']

    am['y'] = dict(z=1)
    ao = Message(am)
    assert ao.y.z == am['y']['z']

    k1, k2 = 'y', 'z'
    assert ao[k1][k2] == am[k1][k2]

    am2 = dict(ao)
    assert am['x'] == am2['x']
    assert am['y']['z'] == am2['y']['z']
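
# Editor's illustrative sketch (not part of the original module): a local round trip
# through serialize / feed_identities / unpack_message, assuming the default json
# packer defined earlier in this file and no signing key. No socket is involved.
def test_serialize_roundtrip():
    s = Session()
    msg = s.msg('apply_request', content=dict(a=10))
    wire = s.serialize(msg, ident=b'engine-1')
    idents, frames = s.feed_identities(wire)
    assert idents == [b'engine-1']
    msg2 = s.unpack_message(frames)
    assert msg2['msg_type'] == 'apply_request'
    assert msg2['content'] == dict(a=10)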