##// END OF EJS Templates
Merge pull request #1548 from minrk/ar_sugar...
Merge pull request #1548 from minrk/ar_sugar Add sugar methods/properties to AsyncResult that are generically useful: * `ar.wall_time` = received - submitted * `ar.serial_time` = sum of serial computation time * `ar.elapsed` = time since submission (wall_time if done) * `ar.progress` = (int) number of sub-tasks that have completed * `len(ar)` = # of tasks * `ar.wait_interactive()`: prints progress These are simple methods derived from the metadata/timestamps already created. But I've been persuaded by @wesm's practice of including simple methods that do useful (and/or cool) things. This also required/revealed some minor fixes/cleanup to clear_output in some cases: * dedent base `core.displaypub.clear_output`, so it's actually defined in the class * clear_output publishes `'\r\b'`, so it will clear terminal-like frontends that don't understand full clear_output behavior. * `core.display.clear_output()` still works, even outside an IPython session. Added a new notebook that shows how to use these new methods and how to do simple animations/progress bars using `clear_output()`. Added `Client.spin_thread(interval)` / `stop_spin_thread()` for running spin in a background thread, to keep zmq queue clear. This can be used to ensure that timing information is as accurate as possible (at the cost of having a background thread active).

File last commit:

r6324:7c84fd02
r6485:e0b43119 merge
Show More
heartmonitor.py
181 lines | 6.3 KiB | text/x-python | PythonLexer
#!/usr/bin/env python
"""
A multi-heart Heartbeat system using PUB and XREP (ROUTER) sockets. Pings are sent out on the PUB socket,
and hearts are tracked based on their XREQ (DEALER) identities.
Authors:
* Min RK
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
from __future__ import print_function
import time
import uuid
import zmq
from zmq.devices import ThreadDevice
from zmq.eventloop import ioloop, zmqstream
from IPython.config.configurable import LoggingConfigurable
from IPython.utils.traitlets import Set, Instance, CFloat, Integer
from IPython.parallel.util import asbytes, log_errors
class Heart(object):
    """Responder side of the heartbeat: echoes pings back to a HeartMonitor.

    Thin wrapper, with defaults for the most common configuration, around a
    threadsafe zmq.FORWARDER device.  By default the device reads on a SUB
    socket and writes on an XREQ (DEALER) socket.

    The IDENTITY of the outgoing socket can be chosen with the optional
    ``heart_id`` argument; when it is omitted, the raw bytes of a random
    UUID4 are used.  The identity in use is available afterwards as
    ``self.id``.
    """

    device = None
    id = None

    def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
        # Settle the identity first so it can be handed to the socket and
        # recorded on the instance from one place.
        identity = uuid.uuid4().bytes if heart_id is None else heart_id

        forwarder = ThreadDevice(zmq.FORWARDER, in_type, out_type)
        # do not allow the device to share global Context.instance,
        # which is the default behavior in pyzmq > 2.1.10
        forwarder.context_factory = zmq.Context
        forwarder.daemon = True
        self.device = forwarder

        forwarder.connect_in(in_addr)
        forwarder.connect_out(out_addr)
        if in_type == zmq.SUB:
            # An empty prefix subscribes the SUB socket to every message.
            forwarder.setsockopt_in(zmq.SUBSCRIBE, b"")
        forwarder.setsockopt_out(zmq.IDENTITY, identity)
        self.id = identity

    def start(self):
        """Start the underlying device and return whatever its start() returns."""
        return self.device.start()
class HeartMonitor(LoggingConfigurable):
    """A basic HeartMonitor class

    On every beat the monitor publishes its current ``lifetime`` on
    ``pingstream`` and collects the echoes that arrive on ``pongstream``.
    A responder that is not yet known is registered as a new heart; a known
    heart that misses one beat is put on probation, and a heart that misses
    a second consecutive beat is reported as failed and dropped.

    pingstream: a PUB stream
    pongstream: an XREP stream
    period: the period of the heartbeat in milliseconds
    """

    period = Integer(3000, config=True,
        help='The frequency at which the Hub pings the engines for heartbeats '
        '(in ms)',
    )

    pingstream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    pongstream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    def _loop_default(self):
        """Default ``loop``: the global IOLoop instance."""
        return ioloop.IOLoop.instance()

    # not settable:
    hearts = Set()             # identities of the hearts currently tracked
    responses = Set()          # identities that answered since the last beat
    on_probation = Set()       # tracked hearts that missed the latest beat
    last_ping = CFloat(0)      # value of ``lifetime`` sent with the previous ping
    _new_handlers = Set()      # callbacks invoked with each new heart
    _failure_handlers = Set()  # callbacks invoked with each failed heart
    lifetime = CFloat(0)       # seconds accumulated by beat() since start()
    tic = CFloat(0)            # wall-clock time of the most recent beat

    def __init__(self, **kwargs):
        super(HeartMonitor, self).__init__(**kwargs)
        # Every message arriving on the pong stream is treated as a heartbeat reply.
        self.pongstream.on_recv(self.handle_pong)

    def start(self):
        """Begin calling beat() every ``period`` milliseconds on ``loop``."""
        self.tic = time.time()
        self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
        self.caller.start()

    def add_new_heart_handler(self, handler):
        """add a new handler for new hearts"""
        self.log.debug("heartbeat::new_heart_handler: %s", handler)
        self._new_handlers.add(handler)

    def add_heart_failure_handler(self, handler):
        """add a new handler for heart failure"""
        self.log.debug("heartbeat::new heart failure handler: %s", handler)
        self._failure_handlers.add(handler)

    def beat(self):
        """Evaluate the replies to the previous ping, then send the next one."""
        # Process any pongs that are already waiting before judging hearts.
        self.pongstream.flush()
        self.last_ping = self.lifetime

        toc = time.time()
        self.lifetime += toc - self.tic
        self.tic = toc
        self.log.debug("heartbeat::sending %s", self.lifetime)

        goodhearts = self.hearts.intersection(self.responses)
        missed_beats = self.hearts.difference(goodhearts)
        # Already on probation and missed again: two beats in a row.
        heartfailures = self.on_probation.intersection(missed_beats)
        # Responders that are not tracked yet.
        newhearts = self.responses.difference(goodhearts)

        # Explicit loops rather than map(): map() is lazy on Python 3, so a
        # bare map() call would never invoke the handlers there.
        for heart in newhearts:
            self.handle_new_heart(heart)
        for heart in heartfailures:
            self.handle_heart_failure(heart)

        # Failed hearts have just been removed from self.hearts, so only the
        # hearts that missed their first beat remain on probation.
        self.on_probation = missed_beats.intersection(self.hearts)
        self.responses = set()
        self.pingstream.send(asbytes(str(self.lifetime)))
        # flush stream to force immediate socket send
        self.pingstream.flush()

    def handle_new_heart(self, heart):
        """Notify the new-heart handlers (or log) and start tracking ``heart``."""
        if self._new_handlers:
            for handler in self._new_handlers:
                handler(heart)
        else:
            self.log.info("heartbeat::yay, got new heart %s!", heart)
        self.hearts.add(heart)

    def handle_heart_failure(self, heart):
        """Notify the failure handlers (or log) and stop tracking ``heart``.

        An exception raised by a handler is logged with its traceback and
        does not prevent the remaining handlers from running.
        """
        if self._failure_handlers:
            for handler in self._failure_handlers:
                try:
                    handler(heart)
                except Exception:
                    self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
        else:
            self.log.info("heartbeat::Heart %s failed :(", heart)
        self.hearts.remove(heart)

    @log_errors
    def handle_pong(self, msg):
        """a heart just beat

        ``msg[0]`` is recorded as the responding heart and ``msg[1]`` is the
        echoed ping payload, which is compared with the current and the
        previous ping values.
        """
        current = asbytes(str(self.lifetime))
        last = asbytes(str(self.last_ping))
        if msg[1] == current:
            self.responses.add(msg[0])
        elif msg[1] == last:
            # Reply to the previous ping: it still counts as a response, but
            # report the delay measured from when that ping was sent.
            delta = time.time() - self.tic + (self.lifetime - self.last_ping)
            self.log.warning("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
            self.responses.add(msg[0])
        else:
            self.log.warning("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
if __name__ == '__main__':
    # Stand-alone demo: run a HeartMonitor that pings on a PUB socket bound
    # to port 5555 and listens for replies on a ROUTER socket bound to 5556.
    loop = ioloop.IOLoop.instance()
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    pub.bind('tcp://127.0.0.1:5555')
    xrep = context.socket(zmq.ROUTER)
    xrep.bind('tcp://127.0.0.1:5556')

    outstream = zmqstream.ZMQStream(pub, loop)
    instream = zmqstream.ZMQStream(xrep, loop)

    # HeartMonitor.__init__ accepts keyword arguments only, so the traits
    # must be passed by name.
    hb = HeartMonitor(loop=loop, pingstream=outstream, pongstream=instream)
    # Register the periodic beat; without this the loop would run idle.
    hb.start()
    loop.start()