diff --git a/IPython/parallel/controller/heartmonitor.py b/IPython/parallel/controller/heartmonitor.py index a27536b..83d967d 100755 --- a/IPython/parallel/controller/heartmonitor.py +++ b/IPython/parallel/controller/heartmonitor.py @@ -25,7 +25,7 @@ from zmq.eventloop import ioloop, zmqstream from IPython.config.configurable import LoggingConfigurable from IPython.utils.traitlets import Set, Instance, CFloat, Integer -from IPython.parallel.util import asbytes +from IPython.parallel.util import asbytes, log_errors class Heart(object): """A basic heart object for responding to a HeartMonitor. @@ -148,6 +148,7 @@ class HeartMonitor(LoggingConfigurable): self.hearts.remove(heart) + @log_errors def handle_pong(self, msg): "a heart just beat" current = asbytes(str(self.lifetime)) diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index eea28ed..f35d8e8 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -454,6 +454,7 @@ class Hub(SessionFactory): #----------------------------------------------------------------------------- + @util.log_errors def dispatch_monitor_traffic(self, msg): """all ME and Task queue messages come through here, as well as IOPub traffic.""" @@ -473,6 +474,7 @@ class Hub(SessionFactory): self.log.error("Invalid monitor topic: %r", switch) + @util.log_errors def dispatch_query(self, msg): """Route registration requests and queries from clients.""" try: diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index 400e16c..bc417f0 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -43,7 +43,7 @@ from IPython.config.application import Application from IPython.config.loader import Config from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes -from IPython.parallel import error +from IPython.parallel import error, util from IPython.parallel.factory import SessionFactory from IPython.parallel.util import connect_logger, local_logger, asbytes @@ -240,6 +240,8 @@ class TaskScheduler(SessionFactory): # [Un]Registration Handling #----------------------------------------------------------------------- + + @util.log_errors def dispatch_notification(self, msg): """dispatch register/unregister events.""" try: @@ -343,6 +345,9 @@ class TaskScheduler(SessionFactory): #----------------------------------------------------------------------- # Job Submission #----------------------------------------------------------------------- + + + @util.log_errors def dispatch_submission(self, raw_msg): """Dispatch job submission to appropriate handlers.""" # ensure targets up to date: @@ -560,6 +565,9 @@ class TaskScheduler(SessionFactory): #----------------------------------------------------------------------- # Result Handling #----------------------------------------------------------------------- + + + @util.log_errors def dispatch_result(self, raw_msg): """dispatch method for result replies""" try: diff --git a/IPython/parallel/util.py b/IPython/parallel/util.py index 8cf4c70..f96704a 100644 --- a/IPython/parallel/util.py +++ b/IPython/parallel/util.py @@ -39,6 +39,8 @@ except: import zmq from zmq.log import handlers +from IPython.external.decorator import decorator + # IPython imports from IPython.config.application import Application from IPython.utils import py3compat @@ -106,6 +108,19 @@ class ReverseDict(dict): # Functions #----------------------------------------------------------------------------- +@decorator +def log_errors(f, self, *args, **kwargs): + """decorator to log unhandled exceptions raised in a method. + + For use wrapping on_recv callbacks, so that exceptions + do not cause the stream to be closed. + """ + try: + return f(self, *args, **kwargs) + except Exception: + self.log.error("Uncaught exception in %r" % f, exc_info=True) + + def asbytes(s): """ensure that an object is ascii bytes""" if isinstance(s, unicode):