logwatcher.py
92 lines
| 3.2 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3604 | #!/usr/bin/env python | ||
"""A simple logger object that consolidates messages incoming from ipclusterz processes.""" | ||||
#-----------------------------------------------------------------------------
#  Copyright (C) 2011  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
import sys | ||||
import logging | ||||
import zmq | ||||
from zmq.eventloop import ioloop, zmqstream | ||||
from IPython.config.configurable import Configurable | ||||
from IPython.utils.traitlets import Int, Str, Instance, List | ||||
#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
class LogWatcher(Configurable):
    """Receive log messages on a SUB socket and re-emit them via `logging`.

    Messages are expected as two-part messages (topic, text), as published
    by subclasses of `zmq.log.handlers.PUBHandler`.  The topic is a
    dot-separated string that may carry a logging level name as one of its
    components (e.g. 'engine.0.INFO').

    This can subscribe to multiple topics, but defaults to all topics.
    """

    # configurables
    # topic prefixes to subscribe to; the empty prefix matches everything
    topics = List([''], config=True)
    # address the SUB socket is bound to
    url = Str('tcp://127.0.0.1:20202', config=True)

    # internals
    context = Instance(zmq.Context, (), {})
    stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    # Topic filters currently installed on the socket.  Tracked so that
    # subscribe() can remove exactly these before installing a new set.
    _subscribed = ()

    def _loop_default(self):
        """Return the global IOLoop instance (default value hook for `loop`)."""
        return ioloop.IOLoop.instance()

    def __init__(self, config=None):
        """Bind the SUB socket to `url` and start handling incoming messages.

        Parameters
        ----------
        config : optional
            Configuration object passed through to `Configurable`.
        """
        super(LogWatcher, self).__init__(config=config)
        s = self.context.socket(zmq.SUB)
        s.bind(self.url)
        self.stream = zmqstream.ZMQStream(s, self.loop)
        self.subscribe()
        # keep the socket's filters in sync when `topics` is reassigned
        self.on_trait_change(self.subscribe, 'topics')
        self.stream.on_recv(self.log_message)

    def subscribe(self):
        """Update our SUB socket's subscriptions to match `self.topics`.

        Subscription filters on a SUB socket are additive, so every filter
        installed by a previous call is removed first; otherwise topics
        dropped from `self.topics` would keep being delivered and repeated
        calls would stack duplicate filters.
        """
        stream = self.stream
        for topic in self._subscribed:
            stream.setsockopt(zmq.UNSUBSCRIBE, topic)
        # record filters one by one so the bookkeeping stays accurate even
        # if a setsockopt call fails part-way through
        installed = []
        self._subscribed = installed
        for topic in self.topics:
            logging.debug("Subscribing to: %r", topic)
            stream.setsockopt(zmq.SUBSCRIBE, topic)
            installed.append(topic)

    def _extract_level(self, topic_str):
        """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra').

        The first dot-separated component that names a logging level is
        removed from the topic and its numeric value returned.  If no
        component names a level, the level defaults to `logging.INFO` and
        the topic is returned unchanged.

        Returns
        -------
        (level, topic) : (int, str)
        """
        parts = topic_str.split('.')
        for idx, name in enumerate(parts):
            candidate = getattr(logging, name, None)
            # Only upper-case integer attributes are level constants; this
            # rejects functions and submodules of `logging` ('log', 'info',
            # 'handlers', ...) that a plain getattr would also find.
            if name.isupper() and isinstance(candidate, int):
                del parts[idx]
                return candidate, '.'.join(parts)
        return logging.INFO, topic_str

    def log_message(self, raw):
        """Receive and parse a message, then log it.

        Parameters
        ----------
        raw : list
            The message parts as delivered by the stream; must be exactly
            [topic, message] with a dotted topic, otherwise an error is
            logged and the message is dropped.
        """
        if len(raw) != 2 or '.' not in raw[0]:
            logging.error("Invalid log message: %s", raw)
            return
        topic, msg = raw
        # Extract the level from the *whole* topic: the level name is
        # commonly the last component ('engine.0.ERROR'), so it must not be
        # stripped off before looking for it.
        level, topic = self._extract_level(topic)
        # don't newline, since log messages always newline
        # (endswith also copes with an empty message body)
        if msg.endswith('\n'):
            msg = msg[:-1]
        logging.log(level, "[%s] %s", topic, msg)