##// END OF EJS Templates
add Client.resubmit for re-running tasks...
add Client.resubmit for re-running tasks closes gh-411 * allow `content` in session.serialize to be a unicode object, because mongo+JSON cannot be relied upon to produce encoded bytes.

File last commit:

r3673:b9f54806
r3874:0c043a69
Show More
logwatcher.py
98 lines | 3.3 KiB | text/x-python | PythonLexer
MinRK
Refactor newparallel to use Config system...
r3604 #!/usr/bin/env python
MinRK
rebase IPython.parallel after removal of IPython.kernel...
r3672 """A simple logger object that consolidates messages incoming from ipcluster processes."""
MinRK
Refactor newparallel to use Config system...
r3604
#-----------------------------------------------------------------------------
# Copyright (C) 2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
import logging
MinRK
resort imports in a cleaner order
r3631 import sys
MinRK
Refactor newparallel to use Config system...
r3604
import zmq
from zmq.eventloop import ioloop, zmqstream
MinRK
resort imports in a cleaner order
r3631
MinRK
Refactor newparallel to use Config system...
r3604 from IPython.utils.traitlets import Int, Str, Instance, List
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel.factory import LoggingFactory
MinRK
rework logging connections
r3610
MinRK
Refactor newparallel to use Config system...
r3604 #-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
MinRK
rework logging connections
class LogWatcher(LoggingFactory):
    """A simple class that receives messages on a SUB socket, as published
    by subclasses of `zmq.log.handlers.PUBHandler`, and logs them itself.

    This can subscribe to multiple topics, but defaults to all topics.
    """

    # configurables
    # topics passed to zmq.SUBSCRIBE, one setsockopt call per entry
    topics = List([''], config=True)
    # address the SUB socket is bound to in __init__
    url = Str('tcp://127.0.0.1:20202', config=True)

    # internals
    context = Instance(zmq.Context, (), {})
    stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    def _loop_default(self):
        """Default value for `loop`: the result of `IOLoop.instance()`."""
        return ioloop.IOLoop.instance()

    def __init__(self, **kwargs):
        """Bind a SUB socket to `url`, wrap it in a ZMQStream and subscribe.

        Keyword arguments are forwarded unchanged to the parent constructor.
        """
        super(LogWatcher, self).__init__(**kwargs)
        s = self.context.socket(zmq.SUB)
        s.bind(self.url)
        self.stream = zmqstream.ZMQStream(s, self.loop)
        self.subscribe()
        # re-apply subscriptions whenever the `topics` trait changes
        self.on_trait_change(self.subscribe, 'topics')

    def start(self):
        """Register `log_message` as the stream's receive callback."""
        self.stream.on_recv(self.log_message)

    def stop(self):
        """Remove the stream's receive callback."""
        self.stream.stop_on_recv()

    def subscribe(self):
        """Update our SUB socket's subscriptions."""
        # NOTE(review): only the '' subscription is removed here; topics from
        # an earlier value of `self.topics` are not unsubscribed -- confirm
        # whether stale subscriptions are acceptable.
        self.stream.setsockopt(zmq.UNSUBSCRIBE, '')
        for topic in self.topics:
            self.log.debug("Subscribing to: %r", topic)
            self.stream.setsockopt(zmq.SUBSCRIBE, topic)

    def _extract_level(self, topic_str):
        """Turn 'engine.0.INFO.extra' into (logging.INFO, 'engine.0.extra')

        The first dot-separated component that names a logging level is
        removed from the topic and returned as the numeric level.  When no
        component names a level, `logging.INFO` is returned and the topic is
        left unchanged.
        """
        topics = topic_str.split('.')
        for idx, t in enumerate(topics):
            # Only upper-case names bound to an int are levels; a plain
            # getattr would also match modules, classes and functions of
            # `logging` (e.g. 'os', 'handlers', 'Logger').
            level = getattr(logging, t, None) if t.isupper() else None
            if isinstance(level, int):
                topics.pop(idx)
                break
        else:
            level = logging.INFO
        return level, '.'.join(topics)

    def log_message(self, raw):
        """receive and parse a message, then log it.

        `raw` is the list of message frames; it must hold exactly two items,
        a dotted topic and the message text.  Anything else is reported as
        an error on `self.log` and dropped.
        """
        if len(raw) != 2 or '.' not in raw[0]:
            # lazy argument: safe even if `raw` is a tuple
            self.log.error("Invalid log message: %s", raw)
            return
        topic, msg = raw
        # NOTE(review): the last topic component is discarded without being
        # checked for a level name, so a topic like 'engine.0.ERROR' is
        # logged at INFO -- confirm against the publisher's topic layout.
        topic = topic.rsplit('.', 1)[0]
        level, topic = self._extract_level(topic)
        # don't newline, since log messages always newline:
        # (endswith also tolerates an empty message frame)
        if msg.endswith('\n'):
            msg = msg[:-1]
        # use the instance logger, like every other method of this class
        self.log.log(level, "[%s] %s", topic, msg)