#!/usr/bin/env python
"""A simple logger object that consolidates messages incoming from ipclusterz processes."""

#-----------------------------------------------------------------------------
#  Copyright (C) 2011  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------


import sys
import logging

import zmq
from zmq.eventloop import ioloop, zmqstream

from IPython.config.configurable import Configurable
from IPython.utils.traitlets import Int, Str, Instance, List

#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------


class LogWatcher(Configurable):
    """A simple class that receives messages on a SUB socket, as published
    by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.

    This can subscribe to multiple topics, but defaults to all topics.
    """
    # configurables
    topics = List([''], config=True)
    url = Str('tcp://127.0.0.1:20202', config=True)

    # internals
    context = Instance(zmq.Context, (), {})
    stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    # Topics currently applied to the SUB socket by `subscribe`, so that a
    # later call can remove exactly what an earlier call added.
    _subscribed = ()

    def _loop_default(self):
        """Default the `loop` trait to the global IOLoop instance."""
        return ioloop.IOLoop.instance()

    def __init__(self, config=None):
        """Bind a SUB socket on `url`, subscribe to `topics`, and register
        `log_message` as the receive callback on the resulting stream.

        Re-subscription is triggered automatically whenever the `topics`
        trait is reassigned.
        """
        super(LogWatcher, self).__init__(config=config)
        s = self.context.socket(zmq.SUB)
        s.bind(self.url)
        self.stream = zmqstream.ZMQStream(s, self.loop)
        self.subscribe()
        self.on_trait_change(self.subscribe, 'topics')
        self.stream.on_recv(self.log_message)

    def subscribe(self):
        """Update our SUB socket's subscriptions.

        Every topic applied by a previous call is unsubscribed first, then
        each entry of `topics` is subscribed.  Without the first step a
        topic removed from `topics` would keep being delivered, because a
        subscription stays active until it is explicitly unsubscribed.
        """
        for topic in self._subscribed:
            self.stream.setsockopt(zmq.UNSUBSCRIBE, topic)
        # Record each topic only once its SUBSCRIBE succeeded, so the
        # bookkeeping stays accurate even if a later setsockopt raises.
        applied = []
        self._subscribed = applied
        for topic in self.topics:
            logging.debug("Subscribing to: %r", topic)
            self.stream.setsockopt(zmq.SUBSCRIBE, topic)
            applied.append(topic)

    def _extract_level(self, topic_str):
        """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')

        The first dot-separated component that names a logging level (an
        upper-case attribute of `logging` holding an integer, e.g. 'DEBUG'
        or 'ERROR') is removed from the topic and returned as the level.
        If no component names a level, `logging.INFO` is returned and the
        topic is left unchanged.
        """
        parts = topic_str.split('.')
        for idx, name in enumerate(parts):
            level = getattr(logging, name, None)
            # Plain attribute lookup is not enough: names such as 'info',
            # 'root' or 'handlers' also exist on the logging module but are
            # functions/objects, which logging.log() would reject.
            if name.isupper() and isinstance(level, int):
                del parts[idx]
                return level, '.'.join(parts)
        return logging.INFO, '.'.join(parts)

    def log_message(self, raw):
        """receive and parse a message, then log it.

        `raw` is expected to be a two-element sequence `(topic, msg)` whose
        topic contains at least one '.'; anything else is reported through
        `logging.error` and dropped.  The level embedded in the topic
        decides the level at which `msg` is logged, prefixed with the
        remaining topic in square brackets.
        """
        if len(raw) != 2 or '.' not in raw[0]:
            logging.error("Invalid log message: %s", raw)
            return
        topic, msg = raw
        # Hand the complete topic to _extract_level.  Chopping off the last
        # component first would throw the level away for topics of the form
        # 'engine.0.ERROR' (logging them at INFO) and would lose the
        # trailing sub-topic of 'engine.0.INFO.extra'.
        level, topic = self._extract_level(topic)
        # don't newline, since log messages always newline:
        # (endswith also copes with an empty message body)
        if msg.endswith('\n'):
            msg = msg[:-1]
        logging.log(level, "[%s] %s", topic, msg)