From 49041c426f8885510cf3c45e8474fe9505547e95 2010-09-01 05:28:03 From: Brian Granger Date: 2010-09-01 05:28:03 Subject: [PATCH] Merge branch 'mynewkernel' into upstream-newkernel --- diff --git a/IPython/frontend/qt/base_frontend_mixin.py b/IPython/frontend/qt/base_frontend_mixin.py index b752bfa..4b5372f 100644 --- a/IPython/frontend/qt/base_frontend_mixin.py +++ b/IPython/frontend/qt/base_frontend_mixin.py @@ -31,8 +31,9 @@ class BaseFrontendMixin(object): # Disconnect the old kernel manager's channels. old_manager.sub_channel.message_received.disconnect(self._dispatch) old_manager.xreq_channel.message_received.disconnect(self._dispatch) - old_manager.rep_channel.message_received.connect(self._dispatch) - + old_manager.rep_channel.message_received.disconnect(self._dispatch) + old_manager.hb_channel.kernel_died.disconnect(self._handle_kernel_died) + # Handle the case where the old kernel manager is still listening. if old_manager.channels_running: self._stopped_channels() @@ -50,7 +51,8 @@ class BaseFrontendMixin(object): kernel_manager.sub_channel.message_received.connect(self._dispatch) kernel_manager.xreq_channel.message_received.connect(self._dispatch) kernel_manager.rep_channel.message_received.connect(self._dispatch) - + kernel_manager.hb_channel.kernel_died.connect(self._handle_kernel_died) + # Handle the case where the kernel manager started channels before # we connected. if kernel_manager.channels_running: @@ -91,3 +93,17 @@ class BaseFrontendMixin(object): """ session = self._kernel_manager.session.session return msg['parent_header']['session'] == session + + def _handle_kernel_died(self, since_last_heartbeat): + """ This is called when the ``kernel_died`` signal is emitted. + + This method is called when the kernel heartbeat has not been + active for a certain amount of time. The typical action will be to + give the user the option of restarting the kernel. + + Parameters + ---------- + since_last_heartbeat : float + The time since the heartbeat was last received. + """ + pass diff --git a/IPython/frontend/qt/console/frontend_widget.py b/IPython/frontend/qt/console/frontend_widget.py index 31503a9..0acfb07 100644 --- a/IPython/frontend/qt/console/frontend_widget.py +++ b/IPython/frontend/qt/console/frontend_widget.py @@ -90,6 +90,7 @@ class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): # Protected class variables. _input_splitter_class = InputSplitter + _possible_kernel_restart = Bool(False) #--------------------------------------------------------------------------- # 'object' interface @@ -176,7 +177,8 @@ class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): self._kernel_interrupt() return True elif key == QtCore.Qt.Key_Period: - self._kernel_restart() + message = 'Are you sure you want to restart the kernel?' + self._kernel_restart(message) return True return super(FrontendWidget, self)._event_filter_console_keypress(event) @@ -348,29 +350,38 @@ class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): self._append_plain_text('Kernel process is either remote or ' 'unspecified. Cannot interrupt.\n') - def _kernel_restart(self): + def _kernel_restart(self, message): """ Attempts to restart the running kernel. """ - if self.custom_restart: - self.custom_restart_requested.emit() - elif self.kernel_manager.has_kernel: - message = 'Are you sure you want to restart the kernel?' - buttons = QtGui.QMessageBox.Yes | QtGui.QMessageBox.No - result = QtGui.QMessageBox.question(self, 'Restart kernel?', - message, buttons) - if result == QtGui.QMessageBox.Yes: - try: - self.kernel_manager.restart_kernel() - except RuntimeError: - message = 'Kernel started externally. Cannot restart.\n' - self._append_plain_text(message) - else: - self._stopped_channels() - self._append_plain_text('Kernel restarting...\n') - self._show_interpreter_prompt() - else: - self._append_plain_text('Kernel process is either remote or ' - 'unspecified. Cannot restart.\n') + # We want to make sure that if this dialog is already happening, that + # other signals don't trigger it again. This can happen when the + # kernel_died heartbeat signal is emitted and the user is slow to + # respond to the dialog. + if not self._possible_kernel_restart: + if self.custom_restart: + self.custom_restart_requested.emit() + elif self.kernel_manager.has_kernel: + # Setting this to True will prevent this logic from happening + # again until the current pass is completed. + self._possible_kernel_restart = True + buttons = QtGui.QMessageBox.Yes | QtGui.QMessageBox.No + result = QtGui.QMessageBox.question(self, 'Restart kernel?', + message, buttons) + if result == QtGui.QMessageBox.Yes: + try: + self.kernel_manager.restart_kernel() + except RuntimeError: + message = 'Kernel started externally. Cannot restart.\n' + self._append_plain_text(message) + else: + self._stopped_channels() + self._append_plain_text('Kernel restarting...\n') + self._show_interpreter_prompt() + # This might need to be moved to another location? + self._possible_kernel_restart = False + else: + self._append_plain_text('Kernel process is either remote or ' + 'unspecified. Cannot restart.\n') def _process_execute_abort(self, msg): """ Process a reply for an aborted execution request. diff --git a/IPython/frontend/qt/console/ipython_widget.py b/IPython/frontend/qt/console/ipython_widget.py index 0ee71c2..7ecd7d8 100644 --- a/IPython/frontend/qt/console/ipython_widget.py +++ b/IPython/frontend/qt/console/ipython_widget.py @@ -154,6 +154,14 @@ class IPythonWidget(FrontendWidget): # FIXME: Disabled until history requests are properly implemented. #self.kernel_manager.xreq_channel.history(raw=True, output=False) + def _handle_kernel_died(self, since_last_heartbeat): + """ Handle the kernel's death by asking if the user wants to restart. + """ + message = 'The kernel heartbeat has been inactive for %.2f ' \ + 'seconds. Do you want to restart the kernel? You may ' \ + 'first want to check the network connection.' % since_last_heartbeat + self._kernel_restart(message) + #--------------------------------------------------------------------------- # 'FrontendWidget' interface #--------------------------------------------------------------------------- diff --git a/IPython/frontend/qt/console/scripts/ipythonqt.py b/IPython/frontend/qt/console/scripts/ipythonqt.py index 4123cb4..4229a77 100755 --- a/IPython/frontend/qt/console/scripts/ipythonqt.py +++ b/IPython/frontend/qt/console/scripts/ipythonqt.py @@ -33,6 +33,8 @@ def main(): help='set the SUB channel port [default random]') kgroup.add_argument('--rep', type=int, metavar='PORT', default=0, help='set the REP channel port [default random]') + kgroup.add_argument('--hb', type=int, metavar='PORT', default=0, + help='set the heartbeat port [default: random]') egroup = kgroup.add_mutually_exclusive_group() egroup.add_argument('--pure', action='store_true', help = \ @@ -61,7 +63,8 @@ def main(): # Create a KernelManager and start a kernel. kernel_manager = QtKernelManager(xreq_address=(args.ip, args.xreq), sub_address=(args.ip, args.sub), - rep_address=(args.ip, args.rep)) + rep_address=(args.ip, args.rep), + hb_address=(args.ip, args.hb)) if args.ip == LOCALHOST and not args.existing: if args.pure: kernel_manager.start_kernel(ipython=False) diff --git a/IPython/frontend/qt/kernelmanager.py b/IPython/frontend/qt/kernelmanager.py index 7467849..eadaf3a 100644 --- a/IPython/frontend/qt/kernelmanager.py +++ b/IPython/frontend/qt/kernelmanager.py @@ -8,7 +8,7 @@ import zmq # IPython imports. from IPython.utils.traitlets import Type from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \ - XReqSocketChannel, RepSocketChannel + XReqSocketChannel, RepSocketChannel, HBSocketChannel from util import MetaQObjectHasTraits # When doing multiple inheritance from QtCore.QObject and other classes @@ -149,6 +149,32 @@ class QtRepSocketChannel(RepSocketChannel, QtCore.QObject): self.input_requested.emit(msg) +class QtHBSocketChannel(HBSocketChannel, QtCore.QObject): + + # Emitted when the kernel has died. + kernel_died = QtCore.pyqtSignal(object) + + #--------------------------------------------------------------------------- + # 'object' interface + #--------------------------------------------------------------------------- + + def __init__(self, *args, **kw): + """ Reimplemented to ensure that QtCore.QObject is initialized first. + """ + QtCore.QObject.__init__(self) + HBSocketChannel.__init__(self, *args, **kw) + + #--------------------------------------------------------------------------- + # 'RepSocketChannel' interface + #--------------------------------------------------------------------------- + + def call_handlers(self, since_last_heartbeat): + """ Reimplemented to emit signals instead of making callbacks. + """ + # Emit the generic signal. + self.kernel_died.emit(since_last_heartbeat) + + class QtKernelManager(KernelManager, QtCore.QObject): """ A KernelManager that provides signals and slots. """ @@ -165,6 +191,7 @@ class QtKernelManager(KernelManager, QtCore.QObject): sub_channel_class = Type(QtSubSocketChannel) xreq_channel_class = Type(QtXReqSocketChannel) rep_channel_class = Type(QtRepSocketChannel) + hb_channel_class = Type(QtHBSocketChannel) #--------------------------------------------------------------------------- # 'object' interface diff --git a/IPython/zmq/entry_point.py b/IPython/zmq/entry_point.py index 0ce3112..f063cd4 100644 --- a/IPython/zmq/entry_point.py +++ b/IPython/zmq/entry_point.py @@ -19,7 +19,7 @@ from exitpoller import ExitPollerUnix, ExitPollerWindows from displayhook import DisplayHook from iostream import OutStream from session import Session - +from heartbeat import Heartbeat def bind_port(socket, ip, port): """ Binds the specified ZMQ socket. If the port is zero, a random port is @@ -47,6 +47,8 @@ def make_argument_parser(): help='set the PUB channel port [default: random]') parser.add_argument('--req', type=int, metavar='PORT', default=0, help='set the REQ channel port [default: random]') + parser.add_argument('--hb', type=int, metavar='PORT', default=0, + help='set the heartbeat port [default: random]') if sys.platform == 'win32': parser.add_argument('--parent', type=int, metavar='HANDLE', @@ -84,6 +86,10 @@ def make_kernel(namespace, kernel_factory, req_port = bind_port(req_socket, namespace.ip, namespace.req) io.raw_print("REQ Channel on port", req_port) + hb = Heartbeat(context, (namespace.ip, namespace.hb)) + hb.start() + io.raw_print("Heartbeat REP Channel on port", hb.port) + # Redirect input streams and set a display hook. if out_stream_factory: pass @@ -122,7 +128,7 @@ def make_default_main(kernel_factory): return main -def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0, +def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0, hb_port=0, independent=False, extra_arguments=[]): """ Launches a localhost kernel, binding to the specified ports. @@ -140,6 +146,9 @@ def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0, req_port : int, optional The port to use for the REQ (raw input) channel. + hb_port : int, optional + The port to use for the hearbeat REP channel. + independent : bool, optional (default False) If set, the kernel process is guaranteed to survive if this process dies. If not set, an effort is made to ensure that the kernel is killed @@ -157,7 +166,8 @@ def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0, """ # Find open ports as necessary. ports = [] - ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0) + ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + \ + int(req_port <= 0) + int(hb_port <= 0) for i in xrange(ports_needed): sock = socket.socket() sock.bind(('', 0)) @@ -172,10 +182,13 @@ def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0, pub_port = ports.pop(0) if req_port <= 0: req_port = ports.pop(0) + if hb_port <= 0: + hb_port = ports.pop(0) # Build the kernel launch command. arguments = [ sys.executable, '-c', code, '--xrep', str(xrep_port), - '--pub', str(pub_port), '--req', str(req_port) ] + '--pub', str(pub_port), '--req', str(req_port), + '--hb', str(hb_port) ] arguments.extend(extra_arguments) # Spawn a kernel. @@ -196,4 +209,4 @@ def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0, else: proc = Popen(arguments + ['--parent']) - return proc, xrep_port, pub_port, req_port + return proc, xrep_port, pub_port, req_port, hb_port diff --git a/IPython/zmq/heartbeat.py b/IPython/zmq/heartbeat.py new file mode 100644 index 0000000..28c85d0 --- /dev/null +++ b/IPython/zmq/heartbeat.py @@ -0,0 +1,43 @@ +"""The client and server for a basic ping-pong style heartbeat. +""" + +#----------------------------------------------------------------------------- +# Copyright (C) 2008-2010 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. +#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +import sys +from threading import Thread + +import zmq + +#----------------------------------------------------------------------------- +# Code +#----------------------------------------------------------------------------- + + +class Heartbeat(Thread): + "A simple ping-pong style heartbeat that runs in a thread." + + def __init__(self, context, addr=('127.0.0.1', 0)): + Thread.__init__(self) + self.context = context + self.addr = addr + self.ip = addr[0] + self.port = addr[1] + self.daemon = True + + def run(self): + self.socket = self.context.socket(zmq.REP) + if self.port == 0: + self.port = self.socket.bind_to_random_port('tcp://%s' % self.ip) + else: + self.socket.bind('tcp://%s:%i' % self.addr) + zmq.device(zmq.FORWARDER, self.socket, self.socket) + diff --git a/IPython/zmq/ipkernel.py b/IPython/zmq/ipkernel.py index 1df405c..b08c9bf 100755 --- a/IPython/zmq/ipkernel.py +++ b/IPython/zmq/ipkernel.py @@ -377,8 +377,8 @@ class TkKernel(Kernel): # Kernel main and launch functions #----------------------------------------------------------------------------- -def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False, - pylab=False): +def launch_kernel(xrep_port=0, pub_port=0, req_port=0, hb_port=0, + independent=False, pylab=False): """ Launches a localhost kernel, binding to the specified ports. Parameters @@ -392,6 +392,9 @@ def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False, req_port : int, optional The port to use for the REQ (raw input) channel. + hb_port : int, optional + The port to use for the hearbeat REP channel. + independent : bool, optional (default False) If set, the kernel process is guaranteed to survive if this process dies. If not set, an effort is made to ensure that the kernel is killed @@ -415,8 +418,8 @@ def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False, if isinstance(pylab, basestring): extra_arguments.append(pylab) return base_launch_kernel('from IPython.zmq.ipkernel import main; main()', - xrep_port, pub_port, req_port, independent, - extra_arguments) + xrep_port, pub_port, req_port, hb_port, + independent, extra_arguments) def main(): """ The IPython kernel main entry point. diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py index 1aaf510..5cc55e8 100644 --- a/IPython/zmq/kernelmanager.py +++ b/IPython/zmq/kernelmanager.py @@ -447,6 +447,71 @@ class RepSocketChannel(ZmqSocketChannel): self.add_io_state(POLLOUT) +class HBSocketChannel(ZmqSocketChannel): + """The heartbeat channel which monitors the kernel heartbeat. + """ + + time_to_dead = 5.0 + socket = None + poller = None + + def __init__(self, context, session, address): + super(HBSocketChannel, self).__init__(context, session, address) + + def _create_socket(self): + self.socket = self.context.socket(zmq.REQ) + self.socket.setsockopt(zmq.IDENTITY, self.session.session) + self.socket.connect('tcp://%s:%i' % self.address) + self.poller = zmq.Poller() + self.poller.register(self.socket, zmq.POLLIN) + + def run(self): + """The thread's main activity. Call start() instead.""" + self._create_socket() + + while True: + since_last_heartbeat = 0.0 + request_time = time.time() + try: + self.socket.send_json('ping') + except zmq.ZMQError, e: + if e.errno == zmq.EFSM: + time.sleep(self.time_to_dead) + self._create_socket() + else: + raise + else: + while True: + try: + reply = self.socket.recv_json(zmq.NOBLOCK) + except zmq.ZMQError, e: + if e.errno == zmq.EAGAIN: + until_dead = self.time_to_dead-(time.time()-request_time) + self.poller.poll(until_dead) + since_last_heartbeat = time.time() - request_time + if since_last_heartbeat > self.time_to_dead: + self.call_handlers(since_last_heartbeat) + break + else: + # We should probably log this instead + raise + else: + until_dead = self.time_to_dead-(time.time()-request_time) + if until_dead > 0.0: + time.sleep(until_dead) + break + + def call_handlers(self, since_last_heartbeat): + """This method is called in the ioloop thread when a message arrives. + + Subclasses should override this method to handle incoming messages. + It is important to remember that this method is called in the thread + so that some logic must be done to ensure that the application leve + handlers are called in the application thread. + """ + raise NotImplementedError('call_handlers must be defined in a subclass.') + + #----------------------------------------------------------------------------- # Main kernel manager class #----------------------------------------------------------------------------- @@ -475,17 +540,20 @@ class KernelManager(HasTraits): xreq_address = TCPAddress((LOCALHOST, 0)) sub_address = TCPAddress((LOCALHOST, 0)) rep_address = TCPAddress((LOCALHOST, 0)) + hb_address = TCPAddress((LOCALHOST, 0)) # The classes to use for the various channels. xreq_channel_class = Type(XReqSocketChannel) sub_channel_class = Type(SubSocketChannel) rep_channel_class = Type(RepSocketChannel) + hb_channel_class = Type(HBSocketChannel) # Protected traits. _launch_args = Any _xreq_channel = Any _sub_channel = Any _rep_channel = Any + _hb_channel = Any #-------------------------------------------------------------------------- # Channel management methods: @@ -502,6 +570,7 @@ class KernelManager(HasTraits): self.xreq_channel.start() self.sub_channel.start() self.rep_channel.start() + self.hb_channel.start() def stop_channels(self): """Stops the channels for this kernel. @@ -512,13 +581,15 @@ class KernelManager(HasTraits): self.xreq_channel.stop() self.sub_channel.stop() self.rep_channel.stop() + self.hb_channel.stop() @property def channels_running(self): """Are all of the channels created and running?""" return self.xreq_channel.is_alive() \ and self.sub_channel.is_alive() \ - and self.rep_channel.is_alive() + and self.rep_channel.is_alive() \ + and self.hb_channel.is_alive() #-------------------------------------------------------------------------- # Kernel process management methods: @@ -535,8 +606,9 @@ class KernelManager(HasTraits): ipython : bool, optional (default True) Whether to use an IPython kernel instead of a plain Python kernel. """ - xreq, sub, rep = self.xreq_address, self.sub_address, self.rep_address - if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or rep[0] != LOCALHOST: + xreq, sub, rep, hb = self.xreq_address, self.sub_address, \ + self.rep_address, self.hb_address + if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or rep[0] != LOCALHOST or hb[0] != LOCALHOST: raise RuntimeError("Can only launch a kernel on localhost." "Make sure that the '*_address' attributes are " "configured properly.") @@ -546,11 +618,13 @@ class KernelManager(HasTraits): from ipkernel import launch_kernel as launch else: from pykernel import launch_kernel as launch - self.kernel, xrep, pub, req = launch(xrep_port=xreq[1], pub_port=sub[1], - req_port=rep[1], **kw) + self.kernel, xrep, pub, req, hb = launch( + xrep_port=xreq[1], pub_port=sub[1], req_port=rep[1], + hb_port=hb[1], **kw) self.xreq_address = (LOCALHOST, xrep) self.sub_address = (LOCALHOST, pub) self.rep_address = (LOCALHOST, req) + self.hb_address = (LOCALHOST, hb) def restart_kernel(self): """Restarts a kernel with the same arguments that were used to launch @@ -630,3 +704,12 @@ class KernelManager(HasTraits): self.session, self.rep_address) return self._rep_channel + + @property + def hb_channel(self): + """Get the REP socket channel object to handle stdin (raw_input).""" + if self._hb_channel is None: + self._hb_channel = self.hb_channel_class(self.context, + self.session, + self.hb_address) + return self._hb_channel