logwatcher.py
98 lines
| 3.3 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3604 | #!/usr/bin/env python | ||
"""A simple logger object that consolidates messages incoming from ipclusterz processes.""" | ||||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2011 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
import logging | ||||
MinRK
|
r3631 | import sys | ||
MinRK
|
r3604 | |||
import zmq | ||||
from zmq.eventloop import ioloop, zmqstream | ||||
MinRK
|
r3631 | |||
MinRK
|
r3604 | from IPython.utils.traitlets import Int, Str, Instance, List | ||
MinRK
|
r3610 | from factory import LoggingFactory | ||
MinRK
|
r3604 | #----------------------------------------------------------------------------- | ||
# Classes | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
class LogWatcher(LoggingFactory):
    """A simple class that receives messages on a SUB socket, as published
    by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.

    This can subscribe to multiple topics, but defaults to all topics.
    """
    # configurables
    topics = List([''], config=True)
    url = Str('tcp://127.0.0.1:20202', config=True)

    # internals
    context = Instance(zmq.Context, (), {})
    stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')
    # Topic filters installed on the socket by the last subscribe() call;
    # rebound per instance, so this shared empty tuple is never mutated.
    _subscribed = ()

    def _loop_default(self):
        """Default for the `loop` trait: the global IOLoop instance."""
        return ioloop.IOLoop.instance()

    def __init__(self, **kwargs):
        """Bind a SUB socket on `url`, wrap it in a ZMQStream on `loop`,
        install the initial topic filters, and re-run `subscribe` whenever
        the `topics` trait is reassigned.
        """
        super(LogWatcher, self).__init__(**kwargs)
        s = self.context.socket(zmq.SUB)
        s.bind(self.url)
        self.stream = zmqstream.ZMQStream(s, self.loop)
        self.subscribe()
        self.on_trait_change(self.subscribe, 'topics')

    def start(self):
        """Begin handling incoming messages with `log_message`."""
        self.stream.on_recv(self.log_message)

    def stop(self):
        """Stop handling incoming messages."""
        self.stream.stop_on_recv()

    def subscribe(self):
        """Update our SUB socket's subscriptions.

        Every filter installed by the previous call is removed before the
        current `topics` are subscribed. Unsubscribing only from '' (as was
        done previously) left filters for topics that had since been dropped
        from `topics` in place.
        """
        for topic in self._subscribed:
            self.stream.setsockopt(zmq.UNSUBSCRIBE, topic)
        installed = []
        # Rebind before subscribing so the record stays accurate even if a
        # setsockopt call below raises part-way through.
        self._subscribed = installed
        for topic in self.topics:
            self.log.debug("Subscribing to: %r", topic)
            self.stream.setsockopt(zmq.SUBSCRIBE, topic)
            installed.append(topic)

    def _extract_level(self, topic_str):
        """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')

        The first dot-separated component that names an integer attribute of
        the `logging` module is taken as the level and removed from the
        topic.  If no component qualifies, the level defaults to
        `logging.INFO` and the topic is returned unchanged.
        """
        topics = topic_str.split('.')
        for idx, t in enumerate(topics):
            level = getattr(logging, t, None)
            # Only integers are usable as levels: `logging` also exposes
            # functions ('info', 'log'), submodules and boolean flags under
            # names that could appear as ordinary topic components.
            if isinstance(level, int) and not isinstance(level, bool):
                topics.pop(idx)
                return level, '.'.join(topics)
        return logging.INFO, '.'.join(topics)

    def log_message(self, raw):
        """receive and parse a message, then log it.

        `raw` is the multipart message handed over by the stream; it must
        consist of exactly two parts, a dotted topic and the message text.
        Anything else is reported through `self.log.error` and dropped.
        """
        if len(raw) != 2 or '.' not in raw[0]:
            self.log.error("Invalid log message: %s", raw)
            return
        topic, msg = raw
        # Hand the complete topic to _extract_level: stripping the last
        # component first discarded the level whenever it came last
        # (e.g. 'engine.0.DEBUG'), so such messages were logged as INFO.
        level, topic = self._extract_level(topic)
        # don't newline, since log messages always newline:
        if msg.endswith('\n'):
            msg = msg[:-1]
        # Log through our own logger, like the rest of this class, rather
        # than through the root logger.
        self.log.log(level, "[%s] %s", topic, msg)