From 7c84fd02d45d79ed2cf68c7383ec60c708a74870 2012-04-05 19:26:33 From: MinRK Date: 2012-04-05 19:26:33 Subject: [PATCH] add log_errors decorator for on_recv callbacks Logs uncaught exceptions, rather than allowing them to raise, which would cause the socket to be closed, preventing further requests. Reduces impact of bugs like the one fixed in the previous commit. --- diff --git a/IPython/parallel/controller/heartmonitor.py b/IPython/parallel/controller/heartmonitor.py index a27536b..83d967d 100755 --- a/IPython/parallel/controller/heartmonitor.py +++ b/IPython/parallel/controller/heartmonitor.py @@ -25,7 +25,7 @@ from zmq.eventloop import ioloop, zmqstream from IPython.config.configurable import LoggingConfigurable from IPython.utils.traitlets import Set, Instance, CFloat, Integer -from IPython.parallel.util import asbytes +from IPython.parallel.util import asbytes, log_errors class Heart(object): """A basic heart object for responding to a HeartMonitor. @@ -148,6 +148,7 @@ class HeartMonitor(LoggingConfigurable): self.hearts.remove(heart) + @log_errors def handle_pong(self, msg): "a heart just beat" current = asbytes(str(self.lifetime)) diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index eea28ed..f35d8e8 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -454,6 +454,7 @@ class Hub(SessionFactory): #----------------------------------------------------------------------------- + @util.log_errors def dispatch_monitor_traffic(self, msg): """all ME and Task queue messages come through here, as well as IOPub traffic.""" @@ -473,6 +474,7 @@ class Hub(SessionFactory): self.log.error("Invalid monitor topic: %r", switch) + @util.log_errors def dispatch_query(self, msg): """Route registration requests and queries from clients.""" try: diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index 400e16c..bc417f0 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -43,7 +43,7 @@ from IPython.config.application import Application from IPython.config.loader import Config from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes -from IPython.parallel import error +from IPython.parallel import error, util from IPython.parallel.factory import SessionFactory from IPython.parallel.util import connect_logger, local_logger, asbytes @@ -240,6 +240,8 @@ class TaskScheduler(SessionFactory): # [Un]Registration Handling #----------------------------------------------------------------------- + + @util.log_errors def dispatch_notification(self, msg): """dispatch register/unregister events.""" try: @@ -343,6 +345,9 @@ class TaskScheduler(SessionFactory): #----------------------------------------------------------------------- # Job Submission #----------------------------------------------------------------------- + + + @util.log_errors def dispatch_submission(self, raw_msg): """Dispatch job submission to appropriate handlers.""" # ensure targets up to date: @@ -560,6 +565,9 @@ class TaskScheduler(SessionFactory): #----------------------------------------------------------------------- # Result Handling #----------------------------------------------------------------------- + + + @util.log_errors def dispatch_result(self, raw_msg): """dispatch method for result replies""" try: diff --git a/IPython/parallel/util.py b/IPython/parallel/util.py index 8cf4c70..f96704a 100644 --- a/IPython/parallel/util.py +++ b/IPython/parallel/util.py @@ -39,6 +39,8 @@ except: import zmq from zmq.log import handlers +from IPython.external.decorator import decorator + # IPython imports from IPython.config.application import Application from IPython.utils import py3compat @@ -106,6 +108,19 @@ class ReverseDict(dict): # Functions #----------------------------------------------------------------------------- +@decorator +def log_errors(f, self, *args, **kwargs): + """decorator to log unhandled exceptions raised in a method. + + For use wrapping on_recv callbacks, so that exceptions + do not cause the stream to be closed. + """ + try: + return f(self, *args, **kwargs) + except Exception: + self.log.error("Uncaught exception in %r" % f, exc_info=True) + + def asbytes(s): """ensure that an object is ascii bytes""" if isinstance(s, unicode):