##// END OF EJS Templates
tweak dagdeps for new AsyncResult objects
tweak dagdeps for new AsyncResult objects

File last commit:

r3604:2c044319
r3606:9f1a03ab
Show More
logwatcher.py
92 lines | 3.2 KiB | text/x-python | PythonLexer
#!/usr/bin/env python
"""A simple logger object that consolidates messages incoming from ipclusterz processes."""
#-----------------------------------------------------------------------------
# Copyright (C) 2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
import sys
import logging
import zmq
from zmq.eventloop import ioloop, zmqstream
from IPython.config.configurable import Configurable
from IPython.utils.traitlets import Int, Str, Instance, List
#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
class LogWatcher(Configurable):
"""A simple class that receives messages on a SUB socket, as published
by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.
This can subscribe to multiple topics, but defaults to all topics.
"""
# configurables
topics = List([''], config=True)
url = Str('tcp://127.0.0.1:20202', config=True)
# internals
context = Instance(zmq.Context, (), {})
stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
loop = Instance('zmq.eventloop.ioloop.IOLoop')
def _loop_default(self):
return ioloop.IOLoop.instance()
def __init__(self, config=None):
super(LogWatcher, self).__init__(config=config)
s = self.context.socket(zmq.SUB)
s.bind(self.url)
self.stream = zmqstream.ZMQStream(s, self.loop)
self.subscribe()
self.on_trait_change(self.subscribe, 'topics')
self.stream.on_recv(self.log_message)
def subscribe(self):
"""Update our SUB socket's subscriptions."""
self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
for topic in self.topics:
logging.debug("Subscribing to: %r"%topic)
self.stream.setsockopt(zmq.SUBSCRIBE, topic)
def _extract_level(self, topic_str):
"""Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')"""
topics = topic_str.split('.')
for idx,t in enumerate(topics):
level = getattr(logging, t, None)
if level is not None:
break
if level is None:
level = logging.INFO
else:
topics.pop(idx)
return level, '.'.join(topics)
def log_message(self, raw):
"""receive and parse a message, then log it."""
if len(raw) != 2 or '.' not in raw[0]:
logging.error("Invalid log message: %s"%raw)
return
else:
topic, msg = raw
# don't newline, since log messages always newline:
topic,level_name = topic.rsplit('.',1)
level,topic = self._extract_level(topic)
if msg[-1] == '\n':
msg = msg[:-1]
logging.log(level, "[%s] %s" % (topic, msg))