logwatcher.py
115 lines
| 3.7 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3604 | #!/usr/bin/env python | ||
MinRK
|
r4018 | """ | ||
A simple logger object that consolidates messages incoming from ipcluster processes. | ||||
Authors: | ||||
* MinRK | ||||
""" | ||||
MinRK
|
r3604 | |||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2011 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
import logging | ||||
MinRK
|
r3631 | import sys | ||
MinRK
|
r3604 | |||
import zmq | ||||
from zmq.eventloop import ioloop, zmqstream | ||||
MinRK
|
r3631 | |||
MinRK
|
r4016 | from IPython.config.configurable import LoggingConfigurable | ||
MinRK
|
r3988 | from IPython.utils.traitlets import Int, Unicode, Instance, List | ||
MinRK
|
r3604 | |||
#----------------------------------------------------------------------------- | ||||
# Classes | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r4016 | class LogWatcher(LoggingConfigurable): | ||
MinRK
|
r3604 | """A simple class that receives messages on a SUB socket, as published | ||
by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself. | ||||
This can subscribe to multiple topics, but defaults to all topics. | ||||
""" | ||||
MinRK
|
r4007 | |||
MinRK
|
r3604 | # configurables | ||
MinRK
|
r3989 | topics = List([''], config=True, | ||
help="The ZMQ topics to subscribe to. Default is to subscribe to all messages") | ||||
url = Unicode('tcp://127.0.0.1:20202', config=True, | ||||
help="ZMQ url on which to listen for log messages") | ||||
MinRK
|
r3604 | |||
# internals | ||||
stream = Instance('zmq.eventloop.zmqstream.ZMQStream') | ||||
MinRK
|
r3989 | |||
context = Instance(zmq.Context) | ||||
def _context_default(self): | ||||
return zmq.Context.instance() | ||||
loop = Instance(zmq.eventloop.ioloop.IOLoop) | ||||
MinRK
|
r3604 | def _loop_default(self): | ||
return ioloop.IOLoop.instance() | ||||
MinRK
|
r3610 | def __init__(self, **kwargs): | ||
super(LogWatcher, self).__init__(**kwargs) | ||||
MinRK
|
r3604 | s = self.context.socket(zmq.SUB) | ||
s.bind(self.url) | ||||
self.stream = zmqstream.ZMQStream(s, self.loop) | ||||
self.subscribe() | ||||
self.on_trait_change(self.subscribe, 'topics') | ||||
MinRK
|
r3610 | |||
def start(self): | ||||
MinRK
|
r3604 | self.stream.on_recv(self.log_message) | ||
MinRK
|
r3610 | def stop(self): | ||
self.stream.stop_on_recv() | ||||
MinRK
|
r3604 | def subscribe(self): | ||
"""Update our SUB socket's subscriptions.""" | ||||
self.stream.setsockopt(zmq.UNSUBSCRIBE, '') | ||||
MinRK
|
r3989 | if '' in self.topics: | ||
self.log.debug("Subscribing to: everything") | ||||
self.stream.setsockopt(zmq.SUBSCRIBE, '') | ||||
else: | ||||
for topic in self.topics: | ||||
self.log.debug("Subscribing to: %r"%(topic)) | ||||
self.stream.setsockopt(zmq.SUBSCRIBE, topic) | ||||
MinRK
|
r3604 | |||
def _extract_level(self, topic_str): | ||||
"""Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')""" | ||||
topics = topic_str.split('.') | ||||
for idx,t in enumerate(topics): | ||||
level = getattr(logging, t, None) | ||||
if level is not None: | ||||
break | ||||
if level is None: | ||||
level = logging.INFO | ||||
else: | ||||
topics.pop(idx) | ||||
return level, '.'.join(topics) | ||||
def log_message(self, raw): | ||||
"""receive and parse a message, then log it.""" | ||||
if len(raw) != 2 or '.' not in raw[0]: | ||||
MinRK
|
r3610 | self.log.error("Invalid log message: %s"%raw) | ||
MinRK
|
r3604 | return | ||
else: | ||||
topic, msg = raw | ||||
# don't newline, since log messages always newline: | ||||
topic,level_name = topic.rsplit('.',1) | ||||
level,topic = self._extract_level(topic) | ||||
if msg[-1] == '\n': | ||||
msg = msg[:-1] | ||||
MinRK
|
r3989 | self.log.log(level, "[%s] %s" % (topic, msg)) | ||
MinRK
|
r3604 | |||