From ce1ad5399216155b99dfd02bc5723e5c85c5b0da 2013-04-25 17:04:47 From: Brian E. Granger Date: 2013-04-25 17:04:47 Subject: [PATCH] Merge pull request #3011 from minrk/kernelclient IPEP 12: add KernelClient --- diff --git a/IPython/core/interactiveshell.py b/IPython/core/interactiveshell.py index b9459fb..df011e9 100644 --- a/IPython/core/interactiveshell.py +++ b/IPython/core/interactiveshell.py @@ -421,11 +421,11 @@ class InteractiveShell(SingletonConfigurable): def __init__(self, config=None, ipython_dir=None, profile_dir=None, user_module=None, user_ns=None, - custom_exceptions=((), None)): + custom_exceptions=((), None), **kwargs): # This is where traits with a config_key argument are updated # from the values on config. - super(InteractiveShell, self).__init__(config=config) + super(InteractiveShell, self).__init__(config=config, **kwargs) self.configurables = [self] # These are relatively independent and stateless diff --git a/IPython/frontend/consoleapp.py b/IPython/frontend/consoleapp.py index c8b3393..67148f7 100644 --- a/IPython/frontend/consoleapp.py +++ b/IPython/frontend/consoleapp.py @@ -34,8 +34,8 @@ import uuid from IPython.config.application import boolean_flag from IPython.config.configurable import Configurable from IPython.core.profiledir import ProfileDir -from IPython.kernel.blockingkernelmanager import BlockingKernelManager -from IPython.kernel.kernelmanager import KernelManager +from IPython.kernel.blocking import BlockingKernelClient +from IPython.kernel import KernelManager from IPython.kernel import tunnel_to_kernel, find_connection_file, swallow_argv from IPython.utils.path import filefind from IPython.utils.py3compat import str_to_bytes @@ -144,7 +144,8 @@ class IPythonConsoleApp(Configurable): classes = classes flags = Dict(flags) aliases = Dict(aliases) - kernel_manager_class = BlockingKernelManager + kernel_manager_class = KernelManager + kernel_client_class = BlockingKernelClient kernel_argv = List(Unicode) # frontend flags&aliases 
to be stripped when building kernel_argv @@ -328,6 +329,9 @@ class IPythonConsoleApp(Configurable): def init_kernel_manager(self): # Don't let Qt or ZMQ swallow KeyboardInterupts. + if self.existing: + self.kernel_manager = None + return signal.signal(signal.SIGINT, signal.SIG_DFL) # Create a KernelManager and start a kernel. @@ -339,15 +343,39 @@ class IPythonConsoleApp(Configurable): connection_file=self.connection_file, config=self.config, ) - # start the kernel - if not self.existing: - self.kernel_manager.start_kernel(extra_arguments=self.kernel_argv) - atexit.register(self.kernel_manager.cleanup_ipc_files) - elif self.sshserver: + self.kernel_manager.client_factory = self.kernel_client_class + self.kernel_manager.start_kernel(extra_arguments=self.kernel_argv) + atexit.register(self.kernel_manager.cleanup_ipc_files) + + if self.sshserver: # ssh, write new connection file self.kernel_manager.write_connection_file() + + # in case KM defaults / ssh writing changes things: + km = self.kernel_manager + self.shell_port=km.shell_port + self.iopub_port=km.iopub_port + self.stdin_port=km.stdin_port + self.hb_port=km.hb_port + self.connection_file = km.connection_file + atexit.register(self.kernel_manager.cleanup_connection_file) - self.kernel_manager.start_channels() + + def init_kernel_client(self): + if self.kernel_manager is not None: + self.kernel_client = self.kernel_manager.client() + else: + self.kernel_client = self.kernel_client_class( + shell_port=self.shell_port, + iopub_port=self.iopub_port, + stdin_port=self.stdin_port, + hb_port=self.hb_port, + connection_file=self.connection_file, + config=self.config, + ) + + self.kernel_client.start_channels() + def initialize(self, argv=None): @@ -359,4 +387,5 @@ class IPythonConsoleApp(Configurable): default_secure(self.config) self.init_ssh() self.init_kernel_manager() + self.init_kernel_client() diff --git a/IPython/frontend/html/notebook/handlers.py b/IPython/frontend/html/notebook/handlers.py index 
d6536f8..9b08859 100644 --- a/IPython/frontend/html/notebook/handlers.py +++ b/IPython/frontend/html/notebook/handlers.py @@ -407,6 +407,9 @@ class ZMQStreamHandler(websocket.WebSocketHandler): return jsonapi.dumps(msg, default=date_default) def _on_zmq_reply(self, msg_list): + # Sometimes this gets triggered when the on_close method is scheduled in the + # eventloop but hasn't been called. + if self.stream.closed(): return try: msg = self._reserialize_reply(msg_list) except Exception: @@ -466,10 +469,7 @@ class AuthenticatedZMQStreamHandler(ZMQStreamHandler): class IOPubHandler(AuthenticatedZMQStreamHandler): def initialize(self, *args, **kwargs): - self._kernel_alive = True - self._beating = False self.iopub_stream = None - self.hb_stream = None def on_first_message(self, msg): try: @@ -478,12 +478,11 @@ class IOPubHandler(AuthenticatedZMQStreamHandler): self.close() return km = self.application.kernel_manager - self.time_to_dead = km.time_to_dead - self.first_beat = km.first_beat kernel_id = self.kernel_id + km.add_restart_callback(kernel_id, self.on_kernel_restarted) + km.add_restart_callback(kernel_id, self.on_restart_failed, 'dead') try: - self.iopub_stream = km.create_iopub_stream(kernel_id) - self.hb_stream = km.create_hb_stream(kernel_id) + self.iopub_stream = km.connect_iopub(kernel_id) except web.HTTPError: # WebSockets don't response to traditional error codes so we # close the connection. 
@@ -492,81 +491,39 @@ class IOPubHandler(AuthenticatedZMQStreamHandler): self.close() else: self.iopub_stream.on_recv(self._on_zmq_reply) - self.start_hb(self.kernel_died) def on_message(self, msg): pass + def _send_status_message(self, status): + msg = self.session.msg("status", + {'execution_state': status} + ) + self.write_message(jsonapi.dumps(msg, default=date_default)) + + def on_kernel_restarted(self): + logging.warn("kernel %s restarted", self.kernel_id) + self._send_status_message('restarting') + + def on_restart_failed(self): + logging.error("kernel %s restarted failed!", self.kernel_id) + self._send_status_message('dead') + def on_close(self): # This method can be called twice, once by self.kernel_died and once # from the WebSocket close event. If the WebSocket connection is # closed before the ZMQ streams are setup, they could be None. - self.stop_hb() + km = self.application.kernel_manager + if self.kernel_id in km: + km.remove_restart_callback( + self.kernel_id, self.on_kernel_restarted, + ) + km.remove_restart_callback( + self.kernel_id, self.on_restart_failed, 'dead', + ) if self.iopub_stream is not None and not self.iopub_stream.closed(): self.iopub_stream.on_recv(None) self.iopub_stream.close() - if self.hb_stream is not None and not self.hb_stream.closed(): - self.hb_stream.close() - - def start_hb(self, callback): - """Start the heartbeating and call the callback if the kernel dies.""" - if not self._beating: - self._kernel_alive = True - - def ping_or_dead(): - self.hb_stream.flush() - if self._kernel_alive: - self._kernel_alive = False - self.hb_stream.send(b'ping') - # flush stream to force immediate socket send - self.hb_stream.flush() - else: - try: - callback() - except: - pass - finally: - self.stop_hb() - - def beat_received(msg): - self._kernel_alive = True - - self.hb_stream.on_recv(beat_received) - loop = ioloop.IOLoop.instance() - self._hb_periodic_callback = ioloop.PeriodicCallback(ping_or_dead, self.time_to_dead*1000, loop) - 
loop.add_timeout(time.time()+self.first_beat, self._really_start_hb) - self._beating= True - - def _really_start_hb(self): - """callback for delayed heartbeat start - - Only start the hb loop if we haven't been closed during the wait. - """ - if self._beating and not self.hb_stream.closed(): - self._hb_periodic_callback.start() - - def stop_hb(self): - """Stop the heartbeating and cancel all related callbacks.""" - if self._beating: - self._beating = False - self._hb_periodic_callback.stop() - if not self.hb_stream.closed(): - self.hb_stream.on_recv(None) - - def _delete_kernel_data(self): - """Remove the kernel data and notebook mapping.""" - self.application.kernel_manager.delete_mapping_for_kernel(self.kernel_id) - - def kernel_died(self): - self._delete_kernel_data() - self.application.log.error("Kernel died: %s" % self.kernel_id) - self.write_message( - {'header': {'msg_type': 'status'}, - 'parent_header': {}, - 'content': {'execution_state':'dead'} - } - ) - self.on_close() class ShellHandler(AuthenticatedZMQStreamHandler): @@ -584,7 +541,7 @@ class ShellHandler(AuthenticatedZMQStreamHandler): self.max_msg_size = km.max_msg_size kernel_id = self.kernel_id try: - self.shell_stream = km.create_shell_stream(kernel_id) + self.shell_stream = km.connect_shell(kernel_id) except web.HTTPError: # WebSockets don't response to traditional error codes so we # close the connection. 
diff --git a/IPython/frontend/html/notebook/kernelmanager.py b/IPython/frontend/html/notebook/kernelmanager.py index d769921..7a5423d 100644 --- a/IPython/frontend/html/notebook/kernelmanager.py +++ b/IPython/frontend/html/notebook/kernelmanager.py @@ -20,8 +20,9 @@ from tornado import web from IPython.kernel.multikernelmanager import MultiKernelManager from IPython.utils.traitlets import ( - Dict, List, Unicode, Float, Integer, + Dict, List, Unicode, Integer, ) + #----------------------------------------------------------------------------- # Classes #----------------------------------------------------------------------------- @@ -30,11 +31,11 @@ from IPython.utils.traitlets import ( class MappingKernelManager(MultiKernelManager): """A KernelManager that handles notebok mapping and HTTP error handling""" + def _kernel_manager_class_default(self): + return "IPython.kernel.ioloop.IOLoopKernelManager" + kernel_argv = List(Unicode) - time_to_dead = Float(3.0, config=True, help="""Kernel heartbeat interval in seconds.""") - first_beat = Float(5.0, config=True, help="Delay (in seconds) before sending first heartbeat.") - max_msg_size = Integer(65536, config=True, help=""" The max raw message size accepted from the browser over a WebSocket connection. 
@@ -57,11 +58,10 @@ class MappingKernelManager(MultiKernelManager): def notebook_for_kernel(self, kernel_id): """Return the notebook_id for a kernel_id or None.""" - notebook_ids = [k for k, v in self._notebook_mapping.iteritems() if v == kernel_id] - if len(notebook_ids) == 1: - return notebook_ids[0] - else: - return None + for notebook_id, kid in self._notebook_mapping.iteritems(): + if kernel_id == kid: + return notebook_id + return None def delete_mapping_for_kernel(self, kernel_id): """Remove the kernel/notebook mapping for kernel_id.""" @@ -69,8 +69,14 @@ class MappingKernelManager(MultiKernelManager): if notebook_id is not None: del self._notebook_mapping[notebook_id] + def _handle_kernel_died(self, kernel_id): + """notice that a kernel died""" + self.log.warn("Kernel %s died, removing from map.", kernel_id) + self.delete_mapping_for_kernel(kernel_id) + self.remove_kernel(kernel_id, now=True) + def start_kernel(self, notebook_id=None, **kwargs): - """Start a kernel for a notebok an return its kernel_id. + """Start a kernel for a notebook an return its kernel_id. 
Parameters ---------- @@ -86,46 +92,22 @@ class MappingKernelManager(MultiKernelManager): self.set_kernel_for_notebook(notebook_id, kernel_id) self.log.info("Kernel started: %s" % kernel_id) self.log.debug("Kernel args: %r" % kwargs) + # register callback for failed auto-restart + self.add_restart_callback(kernel_id, + lambda : self._handle_kernel_died(kernel_id), + 'dead', + ) else: self.log.info("Using existing kernel: %s" % kernel_id) + return kernel_id def shutdown_kernel(self, kernel_id, now=False): - """Shutdown a kernel and remove its notebook association.""" - self._check_kernel_id(kernel_id) - super(MappingKernelManager, self).shutdown_kernel( - kernel_id, now=now - ) + """Shutdown a kernel by kernel_id""" + super(MappingKernelManager, self).shutdown_kernel(kernel_id, now=now) self.delete_mapping_for_kernel(kernel_id) - self.log.info("Kernel shutdown: %s" % kernel_id) - - def interrupt_kernel(self, kernel_id): - """Interrupt a kernel.""" - self._check_kernel_id(kernel_id) - super(MappingKernelManager, self).interrupt_kernel(kernel_id) - self.log.info("Kernel interrupted: %s" % kernel_id) - - def restart_kernel(self, kernel_id): - """Restart a kernel while keeping clients connected.""" - self._check_kernel_id(kernel_id) - super(MappingKernelManager, self).restart_kernel(kernel_id) - self.log.info("Kernel restarted: %s" % kernel_id) - - def create_iopub_stream(self, kernel_id): - """Create a new iopub stream.""" - self._check_kernel_id(kernel_id) - return super(MappingKernelManager, self).create_iopub_stream(kernel_id) - - def create_shell_stream(self, kernel_id): - """Create a new shell stream.""" - self._check_kernel_id(kernel_id) - return super(MappingKernelManager, self).create_shell_stream(kernel_id) - - def create_hb_stream(self, kernel_id): - """Create a new hb stream.""" - self._check_kernel_id(kernel_id) - return super(MappingKernelManager, self).create_hb_stream(kernel_id) + # override _check_kernel_id to raise 404 instead of KeyError def 
_check_kernel_id(self, kernel_id): """Check a that a kernel_id exists and raise 404 if not.""" if kernel_id not in self: diff --git a/IPython/frontend/html/notebook/static/js/kernel.js b/IPython/frontend/html/notebook/static/js/kernel.js index f39839b..ee304db 100644 --- a/IPython/frontend/html/notebook/static/js/kernel.js +++ b/IPython/frontend/html/notebook/static/js/kernel.js @@ -104,8 +104,6 @@ var IPython = (function (IPython) { this.ws_url = json.ws_url; this.kernel_url = this.base_url + "/" + this.kernel_id; this.start_channels(); - this.shell_channel.onmessage = $.proxy(this._handle_shell_reply,this); - this.iopub_channel.onmessage = $.proxy(this._handle_iopub_reply,this); $([IPython.events]).trigger('status_started.Kernel', {kernel: this}); }; @@ -165,6 +163,8 @@ var IPython = (function (IPython) { that.iopub_channel.onclose = ws_closed_late; } }, 1000); + this.shell_channel.onmessage = $.proxy(this._handle_shell_reply,this); + this.iopub_channel.onmessage = $.proxy(this._handle_iopub_reply,this); }; /** @@ -418,6 +418,8 @@ var IPython = (function (IPython) { $([IPython.events]).trigger('status_busy.Kernel', {kernel: this}); } else if (content.execution_state === 'idle') { $([IPython.events]).trigger('status_idle.Kernel', {kernel: this}); + } else if (content.execution_state === 'restarting') { + $([IPython.events]).trigger('status_restarting.Kernel', {kernel: this}); } else if (content.execution_state === 'dead') { this.stop_channels(); $([IPython.events]).trigger('status_dead.Kernel', {kernel: this}); diff --git a/IPython/frontend/html/notebook/static/js/notificationarea.js b/IPython/frontend/html/notebook/static/js/notificationarea.js index 48066b3..b15354b 100644 --- a/IPython/frontend/html/notebook/static/js/notificationarea.js +++ b/IPython/frontend/html/notebook/static/js/notificationarea.js @@ -84,7 +84,7 @@ var IPython = (function (IPython) { $([IPython.events]).on('status_restarting.Kernel',function () { 
IPython.save_widget.update_document_title(); - knw.set_message("Restarting kernel",1000); + knw.set_message("Restarting kernel", 2000); }); $([IPython.events]).on('status_interrupting.Kernel',function () { @@ -93,9 +93,10 @@ var IPython = (function (IPython) { $([IPython.events]).on('status_dead.Kernel',function () { var dialog = $('
'); - dialog.html('The kernel has died, would you like to restart it?' + - ' If you do not restart the kernel, you will be able to save' + - ' the notebook, but running code will not work until the notebook' + + dialog.html('The kernel has died, and the automatic restart has failed.' + + ' It is possible the kernel cannot be restarted.' + + ' If you are not able to restart the kernel, you will still be able to save' + + ' the notebook, but running code will no longer work until the notebook' + ' is reopened.' ); $(document).append(dialog); @@ -105,7 +106,7 @@ var IPython = (function (IPython) { title: "Dead kernel", close: function(event, ui) {$(this).dialog('destroy').remove();}, buttons : { - "Restart": function () { + "Manual Restart": function () { $([IPython.events]).trigger('status_restarting.Kernel'); IPython.notebook.start_kernel(); $(this).dialog('close'); diff --git a/IPython/frontend/qt/base_frontend_mixin.py b/IPython/frontend/qt/base_frontend_mixin.py index 7897723..d03be9b 100644 --- a/IPython/frontend/qt/base_frontend_mixin.py +++ b/IPython/frontend/qt/base_frontend_mixin.py @@ -12,56 +12,73 @@ class BaseFrontendMixin(object): #--------------------------------------------------------------------------- # 'BaseFrontendMixin' concrete interface #--------------------------------------------------------------------------- - - def _get_kernel_manager(self): - """ Returns the current kernel manager. + _kernel_client = None + _kernel_manager = None + + @property + def kernel_client(self): + """Returns the current kernel client.""" + return self._kernel_client + + @kernel_client.setter + def kernel_client(self, kernel_client): + """Disconnect from the current kernel client (if any) and set a new + kernel client. """ - return self._kernel_manager - - def _set_kernel_manager(self, kernel_manager): - """ Disconnect from the current kernel manager (if any) and set a new - kernel manager. - """ - # Disconnect the old kernel manager, if necessary. 
- old_manager = self._kernel_manager - if old_manager is not None: - old_manager.started_kernel.disconnect(self._started_kernel) - old_manager.started_channels.disconnect(self._started_channels) - old_manager.stopped_channels.disconnect(self._stopped_channels) - - # Disconnect the old kernel manager's channels. - old_manager.iopub_channel.message_received.disconnect(self._dispatch) - old_manager.shell_channel.message_received.disconnect(self._dispatch) - old_manager.stdin_channel.message_received.disconnect(self._dispatch) - old_manager.hb_channel.kernel_died.disconnect( + # Disconnect the old kernel client, if necessary. + old_client = self._kernel_client + if old_client is not None: + old_client.started_channels.disconnect(self._started_channels) + old_client.stopped_channels.disconnect(self._stopped_channels) + + # Disconnect the old kernel client's channels. + old_client.iopub_channel.message_received.disconnect(self._dispatch) + old_client.shell_channel.message_received.disconnect(self._dispatch) + old_client.stdin_channel.message_received.disconnect(self._dispatch) + old_client.hb_channel.kernel_died.disconnect( self._handle_kernel_died) - # Handle the case where the old kernel manager is still listening. - if old_manager.channels_running: + # Handle the case where the old kernel client is still listening. + if old_client.channels_running: self._stopped_channels() - # Set the new kernel manager. - self._kernel_manager = kernel_manager - if kernel_manager is None: + # Set the new kernel client. + self._kernel_client = kernel_client + if kernel_client is None: return - # Connect the new kernel manager. - kernel_manager.started_kernel.connect(self._started_kernel) - kernel_manager.started_channels.connect(self._started_channels) - kernel_manager.stopped_channels.connect(self._stopped_channels) + # Connect the new kernel client. 
+ kernel_client.started_channels.connect(self._started_channels) + kernel_client.stopped_channels.connect(self._stopped_channels) - # Connect the new kernel manager's channels. - kernel_manager.iopub_channel.message_received.connect(self._dispatch) - kernel_manager.shell_channel.message_received.connect(self._dispatch) - kernel_manager.stdin_channel.message_received.connect(self._dispatch) - kernel_manager.hb_channel.kernel_died.connect(self._handle_kernel_died) + # Connect the new kernel client's channels. + kernel_client.iopub_channel.message_received.connect(self._dispatch) + kernel_client.shell_channel.message_received.connect(self._dispatch) + kernel_client.stdin_channel.message_received.connect(self._dispatch) + # hb_channel + kernel_client.hb_channel.kernel_died.connect(self._handle_kernel_died) - # Handle the case where the kernel manager started channels before + # Handle the case where the kernel client started channels before # we connected. - if kernel_manager.channels_running: + if kernel_client.channels_running: self._started_channels() - kernel_manager = property(_get_kernel_manager, _set_kernel_manager) + @property + def kernel_manager(self): + """The kernel manager, if any""" + return self._kernel_manager + + @kernel_manager.setter + def kernel_manager(self, kernel_manager): + old_man = self._kernel_manager + if old_man is not None: + old_man.kernel_restarted.disconnect(self._handle_kernel_restarted) + + self._kernel_manager = kernel_manager + if kernel_manager is None: + return + + kernel_manager.kernel_restarted.connect(self._handle_kernel_restarted) #--------------------------------------------------------------------------- # 'BaseFrontendMixin' abstract interface @@ -71,8 +88,9 @@ class BaseFrontendMixin(object): """ This is called when the ``kernel_died`` signal is emitted. This method is called when the kernel heartbeat has not been - active for a certain amount of time. 
The typical action will be to - give the user the option of restarting the kernel. + active for a certain amount of time. + This is a strictly passive notification - + the kernel is likely being restarted by its KernelManager. Parameters ---------- @@ -80,6 +98,17 @@ class BaseFrontendMixin(object): The time since the heartbeat was last received. """ + def _handle_kernel_restarted(self): + """ This is called when the ``kernel_restarted`` signal is emitted. + + This method is called when the kernel has been restarted by the + autorestart mechanism. + + Parameters + ---------- + since_last_heartbeat : float + The time since the heartbeat was last received. + """ def _started_kernel(self): """Called when the KernelManager starts (or restarts) the kernel subprocess. Channels may or may not be running at this point. @@ -112,7 +141,7 @@ class BaseFrontendMixin(object): """ Returns whether a reply from the kernel originated from a request from this frontend. """ - session = self._kernel_manager.session.session + session = self._kernel_client.session.session parent = msg['parent_header'] if not parent: # if the message has no parent, assume it is meant for all frontends diff --git a/IPython/frontend/qt/kernelmanager.py b/IPython/frontend/qt/client.py similarity index 58% rename from IPython/frontend/qt/kernelmanager.py rename to IPython/frontend/qt/client.py index bded7fc..0aa927a 100644 --- a/IPython/frontend/qt/kernelmanager.py +++ b/IPython/frontend/qt/client.py @@ -1,13 +1,18 @@ -""" Defines a KernelManager that provides signals and slots. +""" Defines a KernelClient that provides signals and slots. """ -# Local imports. 
+# Local imports from IPython.utils.traitlets import Type -from IPython.kernel.kernelmanager import ShellChannel, IOPubChannel, \ - StdInChannel, HBChannel, KernelManager -from base_kernelmanager import QtShellChannelMixin, QtIOPubChannelMixin, \ - QtStdInChannelMixin, QtHBChannelMixin, QtKernelManagerMixin +from IPython.kernel.channels import ( + ShellChannel, IOPubChannel, StdInChannel, HBChannel +) +from IPython.kernel import KernelClient +from .kernel_mixins import ( + QtShellChannelMixin, QtIOPubChannelMixin, + QtStdInChannelMixin, QtHBChannelMixin, + QtKernelClientMixin +) class QtShellChannel(QtShellChannelMixin, ShellChannel): pass @@ -22,8 +27,8 @@ class QtHBChannel(QtHBChannelMixin, HBChannel): pass -class QtKernelManager(QtKernelManagerMixin, KernelManager): - """ A KernelManager that provides signals and slots. +class QtKernelClient(QtKernelClientMixin, KernelClient): + """ A KernelClient that provides signals and slots. """ iopub_channel_class = Type(QtIOPubChannel) diff --git a/IPython/frontend/qt/console/frontend_widget.py b/IPython/frontend/qt/console/frontend_widget.py index 2fe1cfa..d36b628 100644 --- a/IPython/frontend/qt/console/frontend_widget.py +++ b/IPython/frontend/qt/console/frontend_widget.py @@ -148,6 +148,7 @@ class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): self._highlighter = FrontendHighlighter(self) self._input_splitter = self._input_splitter_class() self._kernel_manager = None + self._kernel_client = None self._request_info = {} self._request_info['execute'] = {}; self._callback_dict = {} @@ -215,7 +216,7 @@ class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): See parent class :meth:`execute` docstring for full details. 
""" - msg_id = self.kernel_manager.shell_channel.execute(source, hidden) + msg_id = self.kernel_client.execute(source, hidden) self._request_info['execute'][msg_id] = self._ExecutionRequest(msg_id, 'user') self._hidden = hidden if not hidden: @@ -357,7 +358,7 @@ class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): # generate uuid, which would be used as an indication of whether or # not the unique request originated from here (can use msg id ?) local_uuid = str(uuid.uuid1()) - msg_id = self.kernel_manager.shell_channel.execute('', + msg_id = self.kernel_client.execute('', silent=True, user_expressions={ local_uuid:expr }) self._callback_dict[local_uuid] = callback self._request_info['execute'][msg_id] = self._ExecutionRequest(msg_id, 'silent_exec_callback') @@ -400,7 +401,7 @@ class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): if info and info.kind == 'user' and not self._hidden: # Make sure that all output from the SUB channel has been processed # before writing a new prompt. - self.kernel_manager.iopub_channel.flush() + self.kernel_client.iopub_channel.flush() # Reset the ANSI style information to prevent bad text in stdout # from messing up our colors. We're not a true terminal so we're @@ -435,27 +436,39 @@ class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): # Make sure that all output from the SUB channel has been processed # before entering readline mode. - self.kernel_manager.iopub_channel.flush() + self.kernel_client.iopub_channel.flush() def callback(line): - self.kernel_manager.stdin_channel.input(line) + self.kernel_client.stdin_channel.input(line) if self._reading: self.log.debug("Got second input request, assuming first was interrupted.") self._reading = False self._readline(msg['content']['prompt'], callback=callback) + def _kernel_restarted_message(self, died=True): + msg = "Kernel died, restarting" if died else "Kernel restarting" + self._append_html("
<br>%s<hr><br>
" % msg, + before_prompt=False + ) + def _handle_kernel_died(self, since_last_heartbeat): - """ Handle the kernel's death by asking if the user wants to restart. + """Handle the kernel's death (if we do not own the kernel). """ - self.log.debug("kernel died: %s", since_last_heartbeat) + self.log.warn("kernel died: %s", since_last_heartbeat) if self.custom_restart: self.custom_restart_kernel_died.emit(since_last_heartbeat) else: - message = 'The kernel heartbeat has been inactive for %.2f ' \ - 'seconds. Do you want to restart the kernel? You may ' \ - 'first want to check the network connection.' % \ - since_last_heartbeat - self.restart_kernel(message, now=True) + self._kernel_restarted_message(died=True) + self.reset() + + def _handle_kernel_restarted(self, died=True): + """Notice that the autorestarter restarted the kernel. + + There's nothing to do but show a message. + """ + self.log.warn("kernel restarted") + self._kernel_restarted_message(died=died) + self.reset() def _handle_object_info_reply(self, rep): """ Handle replies for call tips. @@ -505,37 +518,42 @@ class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): def _handle_shutdown_reply(self, msg): """ Handle shutdown signal, only if from other console. """ - self.log.debug("shutdown: %s", msg.get('content', '')) + self.log.warn("shutdown: %s", msg.get('content', '')) + restart = msg.get('content', {}).get('restart', False) if not self._hidden and not self._is_from_this_session(msg): - if self._local_kernel: - if not msg['content']['restart']: + # got shutdown reply, request came from session other than ours + if restart: + # someone restarted the kernel, handle it + self._handle_kernel_restarted(died=False) + else: + # kernel was shutdown permanently + # this triggers exit_requested if the kernel was local, + # and a dialog if the kernel was remote, + # so we don't suddenly clear the qtconsole without asking. 
+ if self._local_kernel: self.exit_requested.emit(self) else: - # we just got notified of a restart! - time.sleep(0.25) # wait 1/4 sec to reset - # lest the request for a new prompt - # goes to the old kernel - self.reset() - else: # remote kernel, prompt on Kernel shutdown/reset - title = self.window().windowTitle() - if not msg['content']['restart']: + title = self.window().windowTitle() reply = QtGui.QMessageBox.question(self, title, "Kernel has been shutdown permanently. " "Close the Console?", QtGui.QMessageBox.Yes,QtGui.QMessageBox.No) if reply == QtGui.QMessageBox.Yes: self.exit_requested.emit(self) - else: - # XXX: remove message box in favor of using the - # clear_on_kernel_restart setting? - reply = QtGui.QMessageBox.question(self, title, - "Kernel has been reset. Clear the Console?", - QtGui.QMessageBox.Yes,QtGui.QMessageBox.No) - if reply == QtGui.QMessageBox.Yes: - time.sleep(0.25) # wait 1/4 sec to reset - # lest the request for a new prompt - # goes to the old kernel - self.reset() + + def _handle_status(self, msg): + """Handle status message""" + # This is where a busy/idle indicator would be triggered, + # when we make one. + state = msg['content'].get('execution_state', '') + if state == 'starting': + # kernel started while we were running + if self._executing: + self._handle_kernel_restarted(died=True) + elif state == 'idle': + pass + elif state == 'busy': + pass def _started_channels(self): """ Called when the KernelManager channels have started listening or @@ -568,16 +586,15 @@ class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): if self.custom_interrupt: self._reading = False self.custom_interrupt_requested.emit() - elif self.kernel_manager.has_kernel: + elif self.kernel_manager: self._reading = False self.kernel_manager.interrupt_kernel() else: - self._append_plain_text('Kernel process is either remote or ' - 'unspecified. 
Cannot interrupt.\n') + self._append_plain_text('Cannot interrupt a kernel I did not start.\n') def reset(self, clear=False): - """ Resets the widget to its initial state if ``clear`` parameter or - ``clear_on_kernel_restart`` configuration setting is True, otherwise + """ Resets the widget to its initial state if ``clear`` parameter + is True, otherwise prints a visual indication of the fact that the kernel restarted, but does not clear the traces from previous usage of the kernel before it was restarted. With ``clear=True``, it is similar to ``%clear``, but @@ -589,15 +606,9 @@ class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): self._reading = False self._highlighter.highlighting_on = False - if self.clear_on_kernel_restart or clear: + if clear: self._control.clear() self._append_plain_text(self.banner) - else: - self._append_plain_text("# restarting kernel...") - self._append_html("
<hr><br>
") - # XXX: Reprinting the full banner may be too much, but once #1680 is - # addressed, that will mitigate it. - #self._append_plain_text(self.banner) # update output marker for stdout/stderr, so that startup # messages appear after banner: self._append_before_prompt_pos = self._get_cursor().position() @@ -614,10 +625,11 @@ class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): if self.custom_restart: self.custom_restart_requested.emit() + return - elif self.kernel_manager.has_kernel: + if self.kernel_manager: # Pause the heart beat channel to prevent further warnings. - self.kernel_manager.hb_channel.pause() + self.kernel_client.hb_channel.pause() # Prompt the user to restart the kernel. Un-pause the heartbeat if # they decline. (If they accept, the heartbeat will be un-paused @@ -634,21 +646,23 @@ class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): if do_restart: try: self.kernel_manager.restart_kernel(now=now) - except RuntimeError: - self._append_plain_text('Kernel started externally. ' - 'Cannot restart.\n', - before_prompt=True - ) + except RuntimeError as e: + self._append_plain_text( + 'Error restarting kernel: %s\n' % e, + before_prompt=True + ) else: - self.reset() + self._append_html("
Restarting kernel...\n

", + before_prompt=True, + ) else: - self.kernel_manager.hb_channel.unpause() + self.kernel_client.hb_channel.unpause() else: - self._append_plain_text('Kernel process is either remote or ' - 'unspecified. Cannot restart.\n', - before_prompt=True - ) + self._append_plain_text( + 'Cannot restart a Kernel I did not start\n', + before_prompt=True + ) #--------------------------------------------------------------------------- # 'FrontendWidget' protected interface @@ -670,7 +684,7 @@ class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): # Send the metadata request to the kernel name = '.'.join(context) - msg_id = self.kernel_manager.shell_channel.object_info(name) + msg_id = self.kernel_client.object_info(name) pos = self._get_cursor().position() self._request_info['call_tip'] = self._CallTipRequest(msg_id, pos) return True @@ -681,7 +695,7 @@ class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin): context = self._get_context() if context: # Send the completion request to the kernel - msg_id = self.kernel_manager.shell_channel.complete( + msg_id = self.kernel_client.complete( '.'.join(context), # text self._get_input_buffer_cursor_line(), # line self._get_input_buffer_cursor_column(), # cursor_pos diff --git a/IPython/frontend/qt/console/history_console_widget.py b/IPython/frontend/qt/console/history_console_widget.py index 9201c7a..672df05 100644 --- a/IPython/frontend/qt/console/history_console_widget.py +++ b/IPython/frontend/qt/console/history_console_widget.py @@ -224,7 +224,7 @@ class HistoryConsoleWidget(ConsoleWidget): return self._history[-n:] def _request_update_session_history_length(self): - msg_id = self.kernel_manager.shell_channel.execute('', + msg_id = self.kernel_client.shell_channel.execute('', silent=True, user_expressions={ 'hlen':'len(get_ipython().history_manager.input_hist_raw)', diff --git a/IPython/frontend/qt/console/ipython_widget.py b/IPython/frontend/qt/console/ipython_widget.py index dfacd93..a68ec0f 100644 --- 
a/IPython/frontend/qt/console/ipython_widget.py +++ b/IPython/frontend/qt/console/ipython_widget.py @@ -194,7 +194,7 @@ class IPythonWidget(FrontendWidget): self._retrying_history_request = True # wait out the kernel's queue flush, which is currently timed at 0.1s time.sleep(0.25) - self.kernel_manager.shell_channel.history(hist_access_type='tail',n=1000) + self.kernel_client.shell_channel.history(hist_access_type='tail',n=1000) else: self._retrying_history_request = False return @@ -261,7 +261,7 @@ class IPythonWidget(FrontendWidget): """Reimplemented to make a history request and load %guiref.""" super(IPythonWidget, self)._started_channels() self._load_guiref_magic() - self.kernel_manager.shell_channel.history(hist_access_type='tail', + self.kernel_client.shell_channel.history(hist_access_type='tail', n=1000) def _started_kernel(self): @@ -269,12 +269,12 @@ class IPythonWidget(FrontendWidget): Principally triggered by kernel restart. """ - if self.kernel_manager.shell_channel is not None: + if self.kernel_client.shell_channel is not None: self._load_guiref_magic() def _load_guiref_magic(self): """Load %guiref magic.""" - self.kernel_manager.shell_channel.execute('\n'.join([ + self.kernel_client.shell_channel.execute('\n'.join([ "try:", " _usage", "except:", @@ -330,7 +330,7 @@ class IPythonWidget(FrontendWidget): text = '' # Send the completion request to the kernel - msg_id = self.kernel_manager.shell_channel.complete( + msg_id = self.kernel_client.shell_channel.complete( text, # text self._get_input_buffer_cursor_line(), # line self._get_input_buffer_cursor_column(), # cursor_pos @@ -376,7 +376,7 @@ class IPythonWidget(FrontendWidget): """ # If a number was not specified, make a prompt number request. 
if number is None: - msg_id = self.kernel_manager.shell_channel.execute('', silent=True) + msg_id = self.kernel_client.shell_channel.execute('', silent=True) info = self._ExecutionRequest(msg_id, 'prompt') self._request_info['execute'][msg_id] = info return diff --git a/IPython/frontend/qt/console/mainwindow.py b/IPython/frontend/qt/console/mainwindow.py index 4778b1b..2d73355 100644 --- a/IPython/frontend/qt/console/mainwindow.py +++ b/IPython/frontend/qt/console/mainwindow.py @@ -176,6 +176,7 @@ class MainWindow(QtGui.QMainWindow): self.update_tab_bar_visibility() return + kernel_client = closing_widget.kernel_client kernel_manager = closing_widget.kernel_manager if keepkernel is None and not closing_widget._confirm_exit: @@ -183,7 +184,7 @@ class MainWindow(QtGui.QMainWindow): # or leave it alone if we don't keepkernel = closing_widget._existing if keepkernel is None: #show prompt - if kernel_manager and kernel_manager.channels_running: + if kernel_client and kernel_client.channels_running: title = self.window().windowTitle() cancel = QtGui.QMessageBox.Cancel okay = QtGui.QMessageBox.Ok @@ -209,17 +210,17 @@ class MainWindow(QtGui.QMainWindow): reply = box.exec_() if reply == 1: # close All for slave in slave_tabs: - background(slave.kernel_manager.stop_channels) + background(slave.kernel_client.stop_channels) self.tab_widget.removeTab(self.tab_widget.indexOf(slave)) closing_widget.execute("exit") self.tab_widget.removeTab(current_tab) - background(kernel_manager.stop_channels) + background(kernel_client.stop_channels) elif reply == 0: # close Console if not closing_widget._existing: # Have kernel: don't quit, just close the tab closing_widget.execute("exit True") self.tab_widget.removeTab(current_tab) - background(kernel_manager.stop_channels) + background(kernel_client.stop_channels) else: reply = QtGui.QMessageBox.question(self, title, "Are you sure you want to close this Console?"+ @@ -231,15 +232,16 @@ class MainWindow(QtGui.QMainWindow): 
self.tab_widget.removeTab(current_tab) elif keepkernel: #close console but leave kernel running (no prompt) self.tab_widget.removeTab(current_tab) - background(kernel_manager.stop_channels) + background(kernel_client.stop_channels) else: #close console and kernel (no prompt) self.tab_widget.removeTab(current_tab) - if kernel_manager and kernel_manager.channels_running: + if kernel_client and kernel_client.channels_running: for slave in slave_tabs: - background(slave.kernel_manager.stop_channels) + background(slave.kernel_client.stop_channels) self.tab_widget.removeTab(self.tab_widget.indexOf(slave)) - kernel_manager.shutdown_kernel() - background(kernel_manager.stop_channels) + if kernel_manager: + kernel_manager.shutdown_kernel() + background(kernel_client.stop_channels) self.update_tab_bar_visibility() @@ -284,7 +286,7 @@ class MainWindow(QtGui.QMainWindow): #convert from/to int/richIpythonWidget if needed if isinstance(tab, int): tab = self.tab_widget.widget(tab) - km=tab.kernel_manager + km=tab.kernel_client #build list of all widgets widget_list = [self.tab_widget.widget(i) for i in range(self.tab_widget.count())] @@ -292,7 +294,7 @@ class MainWindow(QtGui.QMainWindow): # widget that are candidate to be the owner of the kernel does have all the same port of the curent widget # And should have a _may_close attribute filtered_widget_list = [ widget for widget in widget_list if - widget.kernel_manager.connection_file == km.connection_file and + widget.kernel_client.connection_file == km.connection_file and hasattr(widget,'_may_close') ] # the master widget is the one that may close the kernel master_widget= [ widget for widget in filtered_widget_list if widget._may_close] @@ -315,14 +317,14 @@ class MainWindow(QtGui.QMainWindow): #convert from/to int/richIpythonWidget if needed if isinstance(tab, int): tab = self.tab_widget.widget(tab) - km=tab.kernel_manager + km=tab.kernel_client #build list of all widgets widget_list = [self.tab_widget.widget(i) for i in 
range(self.tab_widget.count())] # widget that are candidate not to be the owner of the kernel does have all the same port of the curent widget filtered_widget_list = ( widget for widget in widget_list if - widget.kernel_manager.connection_file == km.connection_file) + widget.kernel_client.connection_file == km.connection_file) # Get a list of all widget owning the same kernel and removed it from # the previous cadidate. (better using sets ?) master_widget_list = self.find_master_tab(tab, as_list=True) diff --git a/IPython/frontend/qt/console/qtconsoleapp.py b/IPython/frontend/qt/console/qtconsoleapp.py index a7b219b..3aba384 100644 --- a/IPython/frontend/qt/console/qtconsoleapp.py +++ b/IPython/frontend/qt/console/qtconsoleapp.py @@ -20,11 +20,9 @@ Authors: #----------------------------------------------------------------------------- # stdlib imports -import json import os import signal import sys -import uuid # If run on Windows, install an exception hook which pops up a # message box. 
Pythonw.exe hides the console, so without this @@ -59,21 +57,17 @@ from IPython.external.qt import QtCore, QtGui from IPython.config.application import boolean_flag, catch_config_error from IPython.core.application import BaseIPythonApplication from IPython.core.profiledir import ProfileDir -from IPython.frontend.qt.console.frontend_widget import FrontendWidget from IPython.frontend.qt.console.ipython_widget import IPythonWidget from IPython.frontend.qt.console.rich_ipython_widget import RichIPythonWidget from IPython.frontend.qt.console import styles from IPython.frontend.qt.console.mainwindow import MainWindow -from IPython.frontend.qt.kernelmanager import QtKernelManager +from IPython.frontend.qt.client import QtKernelClient +from IPython.frontend.qt.manager import QtKernelManager from IPython.kernel import tunnel_to_kernel, find_connection_file -from IPython.utils.path import filefind -from IPython.utils.py3compat import str_to_bytes from IPython.utils.traitlets import ( - Dict, List, Unicode, Integer, CaselessStrEnum, CBool, Any + Dict, List, Unicode, CBool, Any ) -from IPython.kernel.zmq.kernelapp import IPKernelApp -from IPython.kernel.zmq.session import Session, default_secure -from IPython.kernel.zmq.zmqshell import ZMQInteractiveShell +from IPython.kernel.zmq.session import default_secure from IPython.frontend.consoleapp import ( IPythonConsoleApp, app_aliases, app_flags, flags, aliases @@ -166,6 +160,7 @@ class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp): aliases = Dict(aliases) frontend_flags = Any(qt_flags) frontend_aliases = Any(qt_aliases) + kernel_client_class = QtKernelClient kernel_manager_class = QtKernelManager stylesheet = Unicode('', config=True, @@ -196,16 +191,20 @@ class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp): kernel_manager = self.kernel_manager_class( connection_file=self._new_connection_file(), config=self.config, + autorestart=True, ) # start the kernel kwargs = dict() kwargs['extra_arguments'] 
= self.kernel_argv kernel_manager.start_kernel(**kwargs) - kernel_manager.start_channels() + kernel_manager.client_factory = self.kernel_client_class + kernel_client = kernel_manager.client() + kernel_client.start_channels(shell=True, iopub=True) widget = self.widget_factory(config=self.config, local_kernel=True) self.init_colors(widget) widget.kernel_manager = kernel_manager + widget.kernel_client = kernel_client widget._existing = False widget._may_close = True widget._confirm_exit = self.confirm_exit @@ -219,24 +218,28 @@ class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp): current_widget : IPythonWidget The IPythonWidget whose kernel this frontend is to share """ - kernel_manager = self.kernel_manager_class( - connection_file=current_widget.kernel_manager.connection_file, + kernel_client = self.kernel_client_class( + connection_file=current_widget.kernel_client.connection_file, config = self.config, ) - kernel_manager.load_connection_file() - kernel_manager.start_channels() + kernel_client.load_connection_file() + kernel_client.start_channels() widget = self.widget_factory(config=self.config, local_kernel=False) self.init_colors(widget) widget._existing = True widget._may_close = False widget._confirm_exit = False - widget.kernel_manager = kernel_manager + widget.kernel_client = kernel_client + widget.kernel_manager = current_widget.kernel_manager return widget + def init_qt_app(self): + # separate from qt_elements, because it must run first + self.app = QtGui.QApplication([]) + def init_qt_elements(self): # Create the widget. 
- self.app = QtGui.QApplication([]) base_path = os.path.abspath(os.path.dirname(__file__)) icon_path = os.path.join(base_path, 'resources', 'icon', 'IPythonConsole.svg') @@ -256,6 +259,7 @@ class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp): self.widget._confirm_exit = self.confirm_exit self.widget.kernel_manager = self.kernel_manager + self.widget.kernel_client = self.kernel_client self.window = MainWindow(self.app, confirm_exit=self.confirm_exit, new_frontend_factory=self.new_frontend_master, @@ -342,6 +346,7 @@ class IPythonQtConsoleApp(BaseIPythonApplication, IPythonConsoleApp): @catch_config_error def initialize(self, argv=None): + self.init_qt_app() super(IPythonQtConsoleApp, self).initialize(argv) IPythonConsoleApp.initialize(self,argv) self.init_qt_elements() diff --git a/IPython/frontend/qt/inprocess_kernelmanager.py b/IPython/frontend/qt/inprocess.py similarity index 70% rename from IPython/frontend/qt/inprocess_kernelmanager.py rename to IPython/frontend/qt/inprocess.py index 326a5e1..8de6051 100644 --- a/IPython/frontend/qt/inprocess_kernelmanager.py +++ b/IPython/frontend/qt/inprocess.py @@ -2,12 +2,17 @@ """ # Local imports. 
-from IPython.kernel.inprocess.kernelmanager import \ - InProcessShellChannel, InProcessIOPubChannel, InProcessStdInChannel, \ - InProcessHBChannel, InProcessKernelManager +from IPython.kernel.inprocess import ( + InProcessShellChannel, InProcessIOPubChannel, InProcessStdInChannel, + InProcessHBChannel, InProcessKernelClient, InProcessKernelManager, +) + from IPython.utils.traitlets import Type -from base_kernelmanager import QtShellChannelMixin, QtIOPubChannelMixin, \ - QtStdInChannelMixin, QtHBChannelMixin, QtKernelManagerMixin +from .kernel_mixins import ( + QtShellChannelMixin, QtIOPubChannelMixin, + QtStdInChannelMixin, QtHBChannelMixin, QtKernelClientMixin, + QtKernelManagerMixin, +) class QtInProcessShellChannel(QtShellChannelMixin, InProcessShellChannel): @@ -22,8 +27,7 @@ class QtInProcessStdInChannel(QtStdInChannelMixin, InProcessStdInChannel): class QtInProcessHBChannel(QtHBChannelMixin, InProcessHBChannel): pass - -class QtInProcessKernelManager(QtKernelManagerMixin, InProcessKernelManager): +class QtInProcessKernelClient(QtKernelClientMixin, InProcessKernelClient): """ An in-process KernelManager with signals and slots. """ @@ -31,3 +35,6 @@ class QtInProcessKernelManager(QtKernelManagerMixin, InProcessKernelManager): shell_channel_class = Type(QtInProcessShellChannel) stdin_channel_class = Type(QtInProcessStdInChannel) hb_channel_class = Type(QtInProcessHBChannel) + +class QtInProcessKernelManager(QtKernelManagerMixin, InProcessKernelManager): + client_class = __module__ + '.QtInProcessKernelClient' diff --git a/IPython/frontend/qt/base_kernelmanager.py b/IPython/frontend/qt/kernel_mixins.py similarity index 78% rename from IPython/frontend/qt/base_kernelmanager.py rename to IPython/frontend/qt/kernel_mixins.py index 0ca589f..e97b7b6 100644 --- a/IPython/frontend/qt/base_kernelmanager.py +++ b/IPython/frontend/qt/kernel_mixins.py @@ -54,20 +54,12 @@ class QtShellChannelMixin(ChannelQObject): # Emitted when any message is received. 
message_received = QtCore.Signal(object) - # Emitted when a reply has been received for the corresponding request - # type. + # Emitted when a reply has been received for the corresponding request type. execute_reply = QtCore.Signal(object) complete_reply = QtCore.Signal(object) object_info_reply = QtCore.Signal(object) history_reply = QtCore.Signal(object) - # Emitted when the first reply comes back. - first_reply = QtCore.Signal() - - # Used by the first_reply signal logic to determine if a reply is the - # first. - _handlers_called = False - #--------------------------------------------------------------------------- # 'ShellChannel' interface #--------------------------------------------------------------------------- @@ -84,19 +76,6 @@ class QtShellChannelMixin(ChannelQObject): if signal: signal.emit(msg) - if not self._handlers_called: - self.first_reply.emit() - self._handlers_called = True - - #--------------------------------------------------------------------------- - # 'QtShellChannelMixin' interface - #--------------------------------------------------------------------------- - - def reset_first_reply(self): - """ Reset the first_reply signal to fire again on the next reply. - """ - self._handlers_called = False - class QtIOPubChannelMixin(ChannelQObject): @@ -189,19 +168,31 @@ class QtHBChannelMixin(ChannelQObject): self.kernel_died.emit(since_last_heartbeat) +class QtKernelRestarterMixin(HasTraits, SuperQObject): + + __metaclass__ = MetaQObjectHasTraits + _timer = None + + class QtKernelManagerMixin(HasTraits, SuperQObject): - """ A KernelManager that provides signals and slots. + """ A KernelClient that provides signals and slots. """ __metaclass__ = MetaQObjectHasTraits - # Emitted when the kernel manager has started listening. - started_kernel = QtCore.Signal() + kernel_restarted = QtCore.Signal() + - # Emitted when the kernel manager has started listening. 
+class QtKernelClientMixin(HasTraits, SuperQObject): + """ A KernelClient that provides signals and slots. + """ + + __metaclass__ = MetaQObjectHasTraits + + # Emitted when the kernel client has started listening. started_channels = QtCore.Signal() - # Emitted when the kernel manager has stopped listening. + # Emitted when the kernel client has stopped listening. stopped_channels = QtCore.Signal() # Use Qt-specific channel classes that emit signals. @@ -211,50 +202,19 @@ class QtKernelManagerMixin(HasTraits, SuperQObject): hb_channel_class = Type(QtHBChannelMixin) #--------------------------------------------------------------------------- - # 'KernelManager' interface + # 'KernelClient' interface #--------------------------------------------------------------------------- - #------ Kernel process management ------------------------------------------ - - def start_kernel(self, *args, **kw): - """ Reimplemented for proper heartbeat management. - """ - if self._shell_channel is not None: - self._shell_channel.reset_first_reply() - super(QtKernelManagerMixin, self).start_kernel(*args, **kw) - self.started_kernel.emit() - #------ Channel management ------------------------------------------------- def start_channels(self, *args, **kw): """ Reimplemented to emit signal. """ - super(QtKernelManagerMixin, self).start_channels(*args, **kw) + super(QtKernelClientMixin, self).start_channels(*args, **kw) self.started_channels.emit() def stop_channels(self): """ Reimplemented to emit signal. """ - super(QtKernelManagerMixin, self).stop_channels() + super(QtKernelClientMixin, self).stop_channels() self.stopped_channels.emit() - - @property - def shell_channel(self): - """ Reimplemented for proper heartbeat management. 
- """ - if self._shell_channel is None: - self._shell_channel = super(QtKernelManagerMixin,self).shell_channel - self._shell_channel.first_reply.connect(self._first_reply) - return self._shell_channel - - #--------------------------------------------------------------------------- - # Protected interface - #--------------------------------------------------------------------------- - - def _first_reply(self): - """ Unpauses the heartbeat channel when the first reply is received on - the execute channel. Note that this will *not* start the heartbeat - channel if it is not already running! - """ - if self._hb_channel is not None: - self._hb_channel.unpause() diff --git a/IPython/frontend/qt/manager.py b/IPython/frontend/qt/manager.py new file mode 100644 index 0000000..e7b877b --- /dev/null +++ b/IPython/frontend/qt/manager.py @@ -0,0 +1,52 @@ +""" Defines a KernelClient that provides signals and slots. +""" + +from IPython.external.qt import QtCore + +# Local imports +from IPython.utils.traitlets import Bool, Instance + +from IPython.kernel import KernelManager +from IPython.kernel.restarter import KernelRestarter + +from .kernel_mixins import QtKernelManagerMixin, QtKernelRestarterMixin + + +class QtKernelRestarter(KernelRestarter, QtKernelRestarterMixin): + + def start(self): + if self._timer is None: + self._timer = QtCore.QTimer() + self._timer.timeout.connect(self.poll) + self._timer.start(self.time_to_dead * 1000) + + def stop(self): + self._timer.stop() + + def poll(self): + super(QtKernelRestarter, self).poll() + + +class QtKernelManager(KernelManager, QtKernelManagerMixin): + """A KernelManager with Qt signals for restart""" + + autorestart = Bool(True, config=True) + + def start_restarter(self): + if self.autorestart and self.has_kernel: + if self._restarter is None: + self._restarter = QtKernelRestarter( + kernel_manager=self, + config=self.config, + log=self.log, + ) + self._restarter.add_callback(self._handle_kernel_restarted) + self._restarter.start() 
+ + def stop_restarter(self): + if self.autorestart: + if self._restarter is not None: + self._restarter.stop() + + def _handle_kernel_restarted(self): + self.kernel_restarted.emit() diff --git a/IPython/frontend/terminal/console/app.py b/IPython/frontend/terminal/console/app.py index 7606ef9..bc60879 100644 --- a/IPython/frontend/terminal/console/app.py +++ b/IPython/frontend/terminal/console/app.py @@ -114,7 +114,10 @@ class ZMQTerminalIPythonApp(TerminalIPythonApp, IPythonConsoleApp): signal.signal(signal.SIGINT, self.handle_sigint) self.shell = ZMQTerminalInteractiveShell.instance(config=self.config, display_banner=False, profile_dir=self.profile_dir, - ipython_dir=self.ipython_dir, kernel_manager=self.kernel_manager) + ipython_dir=self.ipython_dir, + manager=self.kernel_manager, + client=self.kernel_client, + ) def init_gui_pylab(self): # no-op, because we don't want to import matplotlib in the frontend. @@ -122,7 +125,7 @@ class ZMQTerminalIPythonApp(TerminalIPythonApp, IPythonConsoleApp): def handle_sigint(self, *args): if self.shell._executing: - if self.kernel_manager.has_kernel: + if self.kernel_manager: # interrupt already gets passed to subprocess by signal handler. # Only if we prevent that should we need to explicitly call # interrupt_kernel, until which time, this would result in a diff --git a/IPython/frontend/terminal/console/completer.py b/IPython/frontend/terminal/console/completer.py index 817ee18..ca76e21 100644 --- a/IPython/frontend/terminal/console/completer.py +++ b/IPython/frontend/terminal/console/completer.py @@ -9,9 +9,9 @@ class ZMQCompleter(object): state=0,1,2,... 
When state=0 it should compute ALL the completion matches, and then return them for each value of state.""" - def __init__(self, shell, km): + def __init__(self, shell, client): self.shell = shell - self.km = km + self.client = client self.matches = [] def complete_request(self,text): @@ -20,10 +20,10 @@ class ZMQCompleter(object): # send completion request to kernel # Give the kernel up to 0.5s to respond - msg_id = self.km.shell_channel.complete(text=text, line=line, + msg_id = self.client.shell_channel.complete(text=text, line=line, cursor_pos=cursor_pos) - msg = self.km.shell_channel.get_msg(timeout=0.5) + msg = self.client.shell_channel.get_msg(timeout=0.5) if msg['parent_header']['msg_id'] == msg_id: return msg["content"]["matches"] return [] diff --git a/IPython/frontend/terminal/console/interactiveshell.py b/IPython/frontend/terminal/console/interactiveshell.py index 743537a..c723fcb 100644 --- a/IPython/frontend/terminal/console/interactiveshell.py +++ b/IPython/frontend/terminal/console/interactiveshell.py @@ -1,12 +1,9 @@ # -*- coding: utf-8 -*- -"""Frontend of ipython working with python-zmq +"""terminal client to the IPython kernel -Ipython's frontend, is a ipython interface that send request to kernel and proccess the kernel's outputs. - -For more details, see the ipython-zmq design """ #----------------------------------------------------------------------------- -# Copyright (C) 2011 The IPython Development Team +# Copyright (C) 2013 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. 
@@ -37,7 +34,7 @@ from IPython.core.alias import AliasManager, AliasError from IPython.core import page from IPython.utils.warn import warn, error, fatal from IPython.utils import io -from IPython.utils.traitlets import List, Enum, Any +from IPython.utils.traitlets import List, Enum, Any, Instance, Unicode from IPython.utils.tempdir import NamedFileInTemporaryDirectory from IPython.frontend.terminal.interactiveshell import TerminalInteractiveShell @@ -105,11 +102,12 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell): """ ) - def __init__(self, *args, **kwargs): - self.km = kwargs.pop('kernel_manager') - self.session_id = self.km.session.session - super(ZMQTerminalInteractiveShell, self).__init__(*args, **kwargs) - + manager = Instance('IPython.kernel.KernelManager') + client = Instance('IPython.kernel.KernelClient') + def _client_changed(self, name, old, new): + self.session_id = new.session.session + session_id = Unicode() + def init_completer(self): """Initialize the completion machinery. 
@@ -121,7 +119,7 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell): from IPython.core.completerlib import (module_completer, magic_run_completer, cd_completer) - self.Completer = ZMQCompleter(self, self.km) + self.Completer = ZMQCompleter(self, self.client) self.set_hook('complete_command', module_completer, str_key = 'import') @@ -156,18 +154,18 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell): self._executing = True # flush stale replies, which could have been ignored, due to missed heartbeats - while self.km.shell_channel.msg_ready(): - self.km.shell_channel.get_msg() + while self.client.shell_channel.msg_ready(): + self.client.shell_channel.get_msg() # shell_channel.execute takes 'hidden', which is the inverse of store_hist - msg_id = self.km.shell_channel.execute(cell, not store_history) - while not self.km.shell_channel.msg_ready() and self.km.is_alive: + msg_id = self.client.shell_channel.execute(cell, not store_history) + while not self.client.shell_channel.msg_ready() and self.client.is_alive(): try: self.handle_stdin_request(timeout=0.05) except Empty: # display intermediate print statements, etc. self.handle_iopub() pass - if self.km.shell_channel.msg_ready(): + if self.client.shell_channel.msg_ready(): self.handle_execute_reply(msg_id) self._executing = False @@ -176,7 +174,7 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell): #----------------- def handle_execute_reply(self, msg_id): - msg = self.km.shell_channel.get_msg() + msg = self.client.shell_channel.get_msg() if msg["parent_header"].get("msg_id", None) == msg_id: self.handle_iopub() @@ -211,8 +209,8 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell): sub_msg: message receive from kernel in the sub socket channel capture by kernel manager. 
""" - while self.km.iopub_channel.msg_ready(): - sub_msg = self.km.iopub_channel.get_msg() + while self.client.iopub_channel.msg_ready(): + sub_msg = self.client.iopub_channel.get_msg() msg_type = sub_msg['header']['msg_type'] parent = sub_msg["parent_header"] if (not parent) or self.session_id == parent['session']: @@ -298,7 +296,7 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell): def handle_stdin_request(self, timeout=0.1): """ Method to capture raw_input """ - msg_rep = self.km.stdin_channel.get_msg(timeout=timeout) + msg_rep = self.client.stdin_channel.get_msg(timeout=timeout) # in case any iopub came while we were waiting: self.handle_iopub() if self.session_id == msg_rep["parent_header"].get("session"): @@ -325,8 +323,8 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell): # only send stdin reply if there *was not* another request # or execution finished while we were reading. - if not (self.km.stdin_channel.msg_ready() or self.km.shell_channel.msg_ready()): - self.km.stdin_channel.input(raw_data) + if not (self.client.stdin_channel.msg_ready() or self.client.shell_channel.msg_ready()): + self.client.stdin_channel.input(raw_data) def mainloop(self, display_banner=False): while True: @@ -344,10 +342,10 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell): def wait_for_kernel(self, timeout=None): """method to wait for a kernel to be ready""" tic = time.time() - self.km.hb_channel.unpause() + self.client.hb_channel.unpause() while True: self.run_cell('1', False) - if self.km.hb_channel.is_beating(): + if self.client.hb_channel.is_beating(): # heart failure was not the reason this returned break else: @@ -389,13 +387,14 @@ class ZMQTerminalInteractiveShell(TerminalInteractiveShell): # ask_exit callback. 
while not self.exit_now: - if not self.km.is_alive: + if not self.client.is_alive(): # kernel died, prompt for action or exit - action = "restart" if self.km.has_kernel else "wait for restart" + + action = "restart" if self.manager else "wait for restart" ans = self.ask_yes_no("kernel died, %s ([y]/n)?" % action, default='y') if ans: - if self.km.has_kernel: - self.km.restart_kernel(True) + if self.manager: + self.manager.restart_kernel(True) self.wait_for_kernel(3) else: self.exit_now = True diff --git a/IPython/frontend/terminal/console/tests/test_image_handler.py b/IPython/frontend/terminal/console/tests/test_image_handler.py index 5de7195..21f615a 100644 --- a/IPython/frontend/terminal/console/tests/test_image_handler.py +++ b/IPython/frontend/terminal/console/tests/test_image_handler.py @@ -10,7 +10,7 @@ import sys import unittest import base64 -from IPython.kernel.kernelmanager import KernelManager +from IPython.kernel import KernelClient from IPython.frontend.terminal.console.interactiveshell \ import ZMQTerminalInteractiveShell from IPython.utils.tempdir import TemporaryDirectory @@ -26,8 +26,8 @@ SCRIPT_PATH = os.path.join( class ZMQTerminalInteractiveShellTestCase(unittest.TestCase): def setUp(self): - km = KernelManager() - self.shell = ZMQTerminalInteractiveShell(kernel_manager=km) + client = KernelClient() + self.shell = ZMQTerminalInteractiveShell(kernel_client=client) self.raw = b'dummy data' self.mime = 'image/png' self.data = {self.mime: base64.encodestring(self.raw).decode('ascii')} diff --git a/IPython/frontend/terminal/interactiveshell.py b/IPython/frontend/terminal/interactiveshell.py index 19ce96b..d6f40f4 100644 --- a/IPython/frontend/terminal/interactiveshell.py +++ b/IPython/frontend/terminal/interactiveshell.py @@ -368,11 +368,13 @@ class TerminalInteractiveShell(InteractiveShell): def __init__(self, config=None, ipython_dir=None, profile_dir=None, user_ns=None, user_module=None, custom_exceptions=((),None), - usage=None, banner1=None, 
banner2=None, display_banner=None): + usage=None, banner1=None, banner2=None, display_banner=None, + **kwargs): super(TerminalInteractiveShell, self).__init__( config=config, ipython_dir=ipython_dir, profile_dir=profile_dir, user_ns=user_ns, - user_module=user_module, custom_exceptions=custom_exceptions + user_module=user_module, custom_exceptions=custom_exceptions, + **kwargs ) # use os.system instead of utils.process.system by default, # because piped system doesn't make sense in the Terminal: diff --git a/IPython/kernel/__init__.py b/IPython/kernel/__init__.py index b7b32ff..daa15d1 100644 --- a/IPython/kernel/__init__.py +++ b/IPython/kernel/__init__.py @@ -5,6 +5,7 @@ from . import zmq from .connect import * from .launcher import * -from .kernelmanager import KernelManager -from .blockingkernelmanager import BlockingKernelManager +from .client import KernelClient +from .manager import KernelManager +from .blocking import BlockingKernelClient from .multikernelmanager import MultiKernelManager diff --git a/IPython/kernel/blocking/__init__.py b/IPython/kernel/blocking/__init__.py new file mode 100644 index 0000000..dc38f24 --- /dev/null +++ b/IPython/kernel/blocking/__init__.py @@ -0,0 +1 @@ +from .client import BlockingKernelClient \ No newline at end of file diff --git a/IPython/kernel/blockingkernelmanager.py b/IPython/kernel/blocking/channels.py similarity index 89% rename from IPython/kernel/blockingkernelmanager.py rename to IPython/kernel/blocking/channels.py index c80709d..6eaceda 100644 --- a/IPython/kernel/blockingkernelmanager.py +++ b/IPython/kernel/blocking/channels.py @@ -1,9 +1,9 @@ -""" Implements a fully blocking kernel manager. +"""Blocking channels Useful for test suites and blocking terminal interfaces. """ #----------------------------------------------------------------------------- -# Copyright (C) 2010-2012 The IPython Development Team +# Copyright (C) 2013 The IPython Development Team # # Distributed under the terms of the BSD License. 
The full license is in # the file COPYING.txt, distributed as part of this software. @@ -15,8 +15,7 @@ Useful for test suites and blocking terminal interfaces. import Queue -from IPython.utils.traitlets import Type -from .kernelmanager import KernelManager, IOPubChannel, HBChannel, \ +from IPython.kernel.channels import IOPubChannel, HBChannel, \ ShellChannel, StdInChannel #----------------------------------------------------------------------------- @@ -25,14 +24,14 @@ from .kernelmanager import KernelManager, IOPubChannel, HBChannel, \ class BlockingChannelMixin(object): - + def __init__(self, *args, **kwds): super(BlockingChannelMixin, self).__init__(*args, **kwds) self._in_queue = Queue.Queue() - + def call_handlers(self, msg): self._in_queue.put(msg) - + def get_msg(self, block=True, timeout=None): """ Gets a message if there is one that is ready. """ if timeout is None: @@ -40,7 +39,7 @@ class BlockingChannelMixin(object): # behavior, so wait for a week instead timeout = 604800 return self._in_queue.get(block, timeout) - + def get_msgs(self): """ Get all messages that are currently ready. """ msgs = [] @@ -50,7 +49,7 @@ class BlockingChannelMixin(object): except Queue.Empty: break return msgs - + def msg_ready(self): """ Is there a message that has been received? """ return not self._in_queue.empty() @@ -69,7 +68,7 @@ class BlockingStdInChannel(BlockingChannelMixin, StdInChannel): class BlockingHBChannel(HBChannel): - + # This kernel needs quicker monitoring, shorten to 1 sec. # less than 0.5s is unreliable, and will get occasional # false reports of missed beats. @@ -78,13 +77,3 @@ class BlockingHBChannel(HBChannel): def call_handlers(self, since_last_heartbeat): """ Pause beating on missed heartbeat. """ pass - - -class BlockingKernelManager(KernelManager): - - # The classes to use for the various channels. 
- shell_channel_class = Type(BlockingShellChannel) - iopub_channel_class = Type(BlockingIOPubChannel) - stdin_channel_class = Type(BlockingStdInChannel) - hb_channel_class = Type(BlockingHBChannel) - diff --git a/IPython/kernel/blocking/client.py b/IPython/kernel/blocking/client.py new file mode 100644 index 0000000..971cf0c --- /dev/null +++ b/IPython/kernel/blocking/client.py @@ -0,0 +1,33 @@ +"""Implements a fully blocking kernel client. + +Useful for test suites and blocking terminal interfaces. +""" +#----------------------------------------------------------------------------- +# Copyright (C) 2013 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING.txt, distributed as part of this software. +#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +from IPython.utils.traitlets import Type +from IPython.kernel.client import KernelClient +from .channels import ( + BlockingIOPubChannel, BlockingHBChannel, + BlockingShellChannel, BlockingStdInChannel +) + +#----------------------------------------------------------------------------- +# Blocking kernel manager +#----------------------------------------------------------------------------- + +class BlockingKernelClient(KernelClient): + + # The classes to use for the various channels + shell_channel_class = Type(BlockingShellChannel) + iopub_channel_class = Type(BlockingIOPubChannel) + stdin_channel_class = Type(BlockingStdInChannel) + hb_channel_class = Type(BlockingHBChannel) diff --git a/IPython/kernel/kernelmanager.py b/IPython/kernel/channels.py similarity index 59% rename from IPython/kernel/kernelmanager.py rename to IPython/kernel/channels.py index 384f183..023a5eb 100644 --- a/IPython/kernel/kernelmanager.py +++ 
b/IPython/kernel/channels.py @@ -1,11 +1,8 @@ -"""Base classes to manage the interaction with a running kernel. - -TODO -* Create logger to handle debugging and console messages. +"""Base classes to manage a Client's interaction with a running kernel """ #----------------------------------------------------------------------------- -# Copyright (C) 2008-2011 The IPython Development Team +# Copyright (C) 2013 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. @@ -20,15 +17,9 @@ from __future__ import absolute_import # Standard library imports import atexit import errno -import json -from subprocess import Popen -import os -import signal -import sys from threading import Thread import time -# System library imports import zmq # import ZMQError in top-level namespace, to avoid ugly attribute-error messages # during garbage collection of threads at exit: @@ -36,25 +27,11 @@ from zmq import ZMQError from zmq.eventloop import ioloop, zmqstream # Local imports -from IPython.config.configurable import Configurable -from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS -from IPython.utils.traitlets import ( - Any, Instance, Type, Unicode, List, Integer, Bool, CaselessStrEnum -) -from IPython.utils.py3compat import str_to_bytes -from IPython.kernel import ( - write_connection_file, - make_ipkernel_cmd, - launch_kernel, -) -from .zmq.session import Session -from .kernelmanagerabc import ( +from .channelsabc import ( ShellChannelABC, IOPubChannelABC, HBChannelABC, StdInChannelABC, - KernelManagerABC ) - #----------------------------------------------------------------------------- # Constants and exceptions #----------------------------------------------------------------------------- @@ -104,6 +81,7 @@ class ZMQSocketChannel(Thread): stream = None _address = None _exiting = False + proxy_methods = [] def __init__(self, context, session, address): """Create a 
channel. @@ -129,7 +107,7 @@ class ZMQSocketChannel(Thread): address = "tcp://%s:%i" % address self._address = address atexit.register(self._notice_exit) - + def _notice_exit(self): self._exiting = True @@ -170,11 +148,11 @@ class ZMQSocketChannel(Thread): def _queue_send(self, msg): """Queue a message to be sent from the IOLoop's thread. - + Parameters ---------- msg : message to send - + This is threadsafe, as it uses IOLoop.add_callback to give the loop's thread control of the action. """ @@ -189,7 +167,7 @@ class ZMQSocketChannel(Thread): """ ident,smsg = self.session.feed_identities(msg) self.call_handlers(self.session.unserialize(smsg)) - + class ShellChannel(ZMQSocketChannel): @@ -198,6 +176,14 @@ class ShellChannel(ZMQSocketChannel): command_queue = None # flag for whether execute requests should be allowed to call raw_input: allow_stdin = True + proxy_methods = [ + 'execute', + 'complete', + 'object_info', + 'history', + 'kernel_info', + 'shutdown', + ] def __init__(self, context, session, address): super(ShellChannel, self).__init__(context, session, address) @@ -226,7 +212,7 @@ class ShellChannel(ZMQSocketChannel): Subclasses should override this method to handle incoming messages. It is important to remember that this method is called in the thread - so that some logic must be done to ensure that the application leve + so that some logic must be done to ensure that the application level handlers are called in the application thread. """ raise NotImplementedError('call_handlers must be defined in a subclass.') @@ -261,7 +247,7 @@ class ShellChannel(ZMQSocketChannel): allow_stdin : bool, optional (default self.allow_stdin) Flag for whether the kernel can send stdin requests to frontends. - Some frontends (e.g. the Notebook) do not support stdin requests. + Some frontends (e.g. the Notebook) do not support stdin requests. If raw_input is called from code executed from such a frontend, a StdinNotImplementedError will be raised. 
@@ -275,8 +261,8 @@ class ShellChannel(ZMQSocketChannel): user_expressions = {} if allow_stdin is None: allow_stdin = self.allow_stdin - - + + # Don't waste network traffic if inputs are invalid if not isinstance(code, basestring): raise ValueError('code %r must be a string' % code) @@ -474,6 +460,7 @@ class StdInChannel(ZMQSocketChannel): """The stdin channel to handle raw_input requests that the kernel makes.""" msg_queue = None + proxy_methods = ['input'] def __init__(self, context, session, address): super(StdInChannel, self).__init__(context, session, address) @@ -543,17 +530,17 @@ class HBChannel(ZMQSocketChannel): self.socket = self.context.socket(zmq.REQ) self.socket.setsockopt(zmq.LINGER, 0) self.socket.connect(self.address) - + self.poller.register(self.socket, zmq.POLLIN) - + def _poll(self, start_time): """poll for heartbeat replies until we reach self.time_to_dead. - + Ignores interrupts, and returns the result of poll(), which will be an empty list if no messages arrived before the timeout, or the event tuple if there is a message to receive. 
""" - + until_dead = self.time_to_dead - (time.time() - start_time) # ensure poll at least once until_dead = max(until_dead, 1e-3) @@ -584,13 +571,13 @@ class HBChannel(ZMQSocketChannel): self._create_socket() self._running = True self._beating = True - + while self._running: if self._pause: # just sleep, and skip the rest of the loop time.sleep(self.time_to_dead) continue - + since_last_heartbeat = 0.0 # io.rprint('Ping from HB channel') # dbg # no need to catch EFSM here, because the previous event was @@ -651,474 +638,7 @@ class HBChannel(ZMQSocketChannel): raise NotImplementedError('call_handlers must be defined in a subclass.') -#----------------------------------------------------------------------------- -# Main kernel manager class -#----------------------------------------------------------------------------- - -class KernelManager(Configurable): - """Manages a single kernel on this host along with its channels. - - There are four channels associated with each kernel: - - * shell: for request/reply calls to the kernel. - * iopub: for the kernel to publish results to frontends. - * hb: for monitoring the kernel's heartbeat. - * stdin: for frontends to reply to raw_input calls in the kernel. - - The usage of the channels that this class manages is optional. It is - entirely possible to connect to the kernels directly using ZeroMQ - sockets. These channels are useful primarily for talking to a kernel - whose :class:`KernelManager` is in the same process. - - This version manages kernels started using Popen. - """ - # The PyZMQ Context to use for communication with the kernel. - context = Instance(zmq.Context) - def _context_default(self): - return zmq.Context.instance() - - # The Session to use for communication with the kernel. - session = Instance(Session) - def _session_default(self): - return Session(config=self.config) - - # The kernel process with which the KernelManager is communicating. 
- # generally a Popen instance - kernel = Any() - - kernel_cmd = List(Unicode, config=True, - help="""The Popen Command to launch the kernel. - Override this if you have a custom - """ - ) - def _kernel_cmd_changed(self, name, old, new): - self.ipython_kernel = False - - ipython_kernel = Bool(True) - - - # The addresses for the communication channels. - connection_file = Unicode('') - - transport = CaselessStrEnum(['tcp', 'ipc'], default_value='tcp', config=True) - - ip = Unicode(LOCALHOST, config=True, - help="""Set the kernel\'s IP address [default localhost]. - If the IP address is something other than localhost, then - Consoles on other machines will be able to connect - to the Kernel, so be careful!""" - ) - def _ip_default(self): - if self.transport == 'ipc': - if self.connection_file: - return os.path.splitext(self.connection_file)[0] + '-ipc' - else: - return 'kernel-ipc' - else: - return LOCALHOST - def _ip_changed(self, name, old, new): - if new == '*': - self.ip = '0.0.0.0' - shell_port = Integer(0) - iopub_port = Integer(0) - stdin_port = Integer(0) - hb_port = Integer(0) - - # The classes to use for the various channels. - shell_channel_class = Type(ShellChannel) - iopub_channel_class = Type(IOPubChannel) - stdin_channel_class = Type(StdInChannel) - hb_channel_class = Type(HBChannel) - - # Protected traits. - _launch_args = Any - _shell_channel = Any - _iopub_channel = Any - _stdin_channel = Any - _hb_channel = Any - _connection_file_written=Bool(False) - - def __del__(self): - self.cleanup_connection_file() - - #-------------------------------------------------------------------------- - # Channel management methods: - #-------------------------------------------------------------------------- - - def start_channels(self, shell=True, iopub=True, stdin=True, hb=True): - """Starts the channels for this kernel. - - This will create the channels if they do not exist and then start - them (their activity runs in a thread). 
If port numbers of 0 are - being used (random ports) then you must first call - :method:`start_kernel`. If the channels have been stopped and you - call this, :class:`RuntimeError` will be raised. - """ - if shell: - self.shell_channel.start() - if iopub: - self.iopub_channel.start() - if stdin: - self.stdin_channel.start() - self.shell_channel.allow_stdin = True - else: - self.shell_channel.allow_stdin = False - if hb: - self.hb_channel.start() - - def stop_channels(self): - """Stops all the running channels for this kernel. - - This stops their event loops and joins their threads. - """ - if self.shell_channel.is_alive(): - self.shell_channel.stop() - if self.iopub_channel.is_alive(): - self.iopub_channel.stop() - if self.stdin_channel.is_alive(): - self.stdin_channel.stop() - if self.hb_channel.is_alive(): - self.hb_channel.stop() - - @property - def channels_running(self): - """Are any of the channels created and running?""" - return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or - self.stdin_channel.is_alive() or self.hb_channel.is_alive()) - - def _make_url(self, port): - """Make a zmq url with a port. 
- - There are two cases that this handles: - - * tcp: tcp://ip:port - * ipc: ipc://ip-port - """ - if self.transport == 'tcp': - return "tcp://%s:%i" % (self.ip, port) - else: - return "%s://%s-%s" % (self.transport, self.ip, port) - - @property - def shell_channel(self): - """Get the shell channel object for this kernel.""" - if self._shell_channel is None: - self._shell_channel = self.shell_channel_class( - self.context, self.session, self._make_url(self.shell_port) - ) - return self._shell_channel - - @property - def iopub_channel(self): - """Get the iopub channel object for this kernel.""" - if self._iopub_channel is None: - self._iopub_channel = self.iopub_channel_class( - self.context, self.session, self._make_url(self.iopub_port) - ) - return self._iopub_channel - - @property - def stdin_channel(self): - """Get the stdin channel object for this kernel.""" - if self._stdin_channel is None: - self._stdin_channel = self.stdin_channel_class( - self.context, self.session, self._make_url(self.stdin_port) - ) - return self._stdin_channel - - @property - def hb_channel(self): - """Get the hb channel object for this kernel.""" - if self._hb_channel is None: - self._hb_channel = self.hb_channel_class( - self.context, self.session, self._make_url(self.hb_port) - ) - return self._hb_channel - - #-------------------------------------------------------------------------- - # Connection and ipc file management - #-------------------------------------------------------------------------- - - def cleanup_connection_file(self): - """Cleanup connection file *if we wrote it* - - Will not raise if the connection file was already removed somehow. 
- """ - if self._connection_file_written: - # cleanup connection files on full shutdown of kernel we started - self._connection_file_written = False - try: - os.remove(self.connection_file) - except (IOError, OSError, AttributeError): - pass - - def cleanup_ipc_files(self): - """Cleanup ipc files if we wrote them.""" - if self.transport != 'ipc': - return - for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port): - ipcfile = "%s-%i" % (self.ip, port) - try: - os.remove(ipcfile) - except (IOError, OSError): - pass - - def load_connection_file(self): - """Load connection info from JSON dict in self.connection_file.""" - with open(self.connection_file) as f: - cfg = json.loads(f.read()) - - from pprint import pprint - pprint(cfg) - self.transport = cfg.get('transport', 'tcp') - self.ip = cfg['ip'] - self.shell_port = cfg['shell_port'] - self.stdin_port = cfg['stdin_port'] - self.iopub_port = cfg['iopub_port'] - self.hb_port = cfg['hb_port'] - self.session.key = str_to_bytes(cfg['key']) - - def write_connection_file(self): - """Write connection info to JSON dict in self.connection_file.""" - if self._connection_file_written: - return - self.connection_file,cfg = write_connection_file(self.connection_file, - transport=self.transport, ip=self.ip, key=self.session.key, - stdin_port=self.stdin_port, iopub_port=self.iopub_port, - shell_port=self.shell_port, hb_port=self.hb_port) - # write_connection_file also sets default ports: - self.shell_port = cfg['shell_port'] - self.stdin_port = cfg['stdin_port'] - self.iopub_port = cfg['iopub_port'] - self.hb_port = cfg['hb_port'] - - self._connection_file_written = True - - #-------------------------------------------------------------------------- - # Kernel management - #-------------------------------------------------------------------------- - - def format_kernel_cmd(self, **kw): - """format templated args (e.g. 
{connection_file})""" - if self.kernel_cmd: - cmd = self.kernel_cmd - else: - cmd = make_ipkernel_cmd( - 'from IPython.kernel.zmq.kernelapp import main; main()', - **kw - ) - ns = dict(connection_file=self.connection_file) - ns.update(self._launch_args) - return [ c.format(**ns) for c in cmd ] - - def _launch_kernel(self, kernel_cmd, **kw): - """actually launch the kernel - - override in a subclass to launch kernel subprocesses differently - """ - return launch_kernel(kernel_cmd, **kw) - - def start_kernel(self, **kw): - """Starts a kernel on this host in a separate process. - - If random ports (port=0) are being used, this method must be called - before the channels are created. - - Parameters: - ----------- - **kw : optional - keyword arguments that are passed down to build the kernel_cmd - and launching the kernel (e.g. Popen kwargs). - """ - if self.transport == 'tcp' and self.ip not in LOCAL_IPS: - raise RuntimeError("Can only launch a kernel on a local interface. " - "Make sure that the '*_address' attributes are " - "configured properly. " - "Currently valid addresses are: %s"%LOCAL_IPS - ) - - # write connection file / get default ports - self.write_connection_file() - - # save kwargs for use in restart - self._launch_args = kw.copy() - # build the Popen cmd - kernel_cmd = self.format_kernel_cmd(**kw) - # launch the kernel subprocess - self.kernel = self._launch_kernel(kernel_cmd, - ipython_kernel=self.ipython_kernel, - **kw) - - def shutdown_kernel(self, now=False, restart=False): - """Attempts to the stop the kernel process cleanly. - - This attempts to shutdown the kernels cleanly by: - - 1. Sending it a shutdown message over the shell channel. - 2. If that fails, the kernel is shutdown forcibly by sending it - a signal. - - Parameters: - ----------- - now : bool - Should the kernel be forcible killed *now*. This skips the - first, nice shutdown attempt. - restart: bool - Will this kernel be restarted after it is shutdown. 
When this - is True, connection files will not be cleaned up. - """ - # FIXME: Shutdown does not work on Windows due to ZMQ errors! - if sys.platform == 'win32': - self._kill_kernel() - return - - # Pause the heart beat channel if it exists. - if self._hb_channel is not None: - self._hb_channel.pause() - - if now: - if self.has_kernel: - self._kill_kernel() - else: - # Don't send any additional kernel kill messages immediately, to give - # the kernel a chance to properly execute shutdown actions. Wait for at - # most 1s, checking every 0.1s. - self.shell_channel.shutdown(restart=restart) - for i in range(10): - if self.is_alive: - time.sleep(0.1) - else: - break - else: - # OK, we've waited long enough. - if self.has_kernel: - self._kill_kernel() - - if not restart: - self.cleanup_connection_file() - self.cleanup_ipc_files() - else: - self.cleanup_ipc_files() - - def restart_kernel(self, now=False, **kw): - """Restarts a kernel with the arguments that were used to launch it. - - If the old kernel was launched with random ports, the same ports will be - used for the new kernel. The same connection file is used again. - - Parameters - ---------- - now : bool, optional - If True, the kernel is forcefully restarted *immediately*, without - having a chance to do any cleanup action. Otherwise the kernel is - given 1s to clean up before a forceful restart is issued. - - In all cases the kernel is restarted, the only difference is whether - it is given a chance to perform a clean shutdown or not. - - **kw : optional - Any options specified here will overwrite those used to launch the - kernel. - """ - if self._launch_args is None: - raise RuntimeError("Cannot restart the kernel. " - "No previous call to 'start_kernel'.") - else: - # Stop currently running kernel. - self.shutdown_kernel(now=now, restart=True) - - # Start new kernel. 
- self._launch_args.update(kw) - self.start_kernel(**self._launch_args) - - # FIXME: Messages get dropped in Windows due to probable ZMQ bug - # unless there is some delay here. - if sys.platform == 'win32': - time.sleep(0.2) - - @property - def has_kernel(self): - """Has a kernel been started that we are managing.""" - return self.kernel is not None - - def _kill_kernel(self): - """Kill the running kernel. - - This is a private method, callers should use shutdown_kernel(now=True). - """ - if self.has_kernel: - # Pause the heart beat channel if it exists. - if self._hb_channel is not None: - self._hb_channel.pause() - - # Signal the kernel to terminate (sends SIGKILL on Unix and calls - # TerminateProcess() on Win32). - try: - self.kernel.kill() - except OSError as e: - # In Windows, we will get an Access Denied error if the process - # has already terminated. Ignore it. - if sys.platform == 'win32': - if e.winerror != 5: - raise - # On Unix, we may get an ESRCH error if the process has already - # terminated. Ignore it. - else: - from errno import ESRCH - if e.errno != ESRCH: - raise - - # Block until the kernel terminates. - self.kernel.wait() - self.kernel = None - else: - raise RuntimeError("Cannot kill kernel. No kernel is running!") - - def interrupt_kernel(self): - """Interrupts the kernel by sending it a signal. - - Unlike ``signal_kernel``, this operation is well supported on all - platforms. - """ - if self.has_kernel: - if sys.platform == 'win32': - from .zmq.parentpoller import ParentPollerWindows as Poller - Poller.send_interrupt(self.kernel.win32_interrupt_event) - else: - self.kernel.send_signal(signal.SIGINT) - else: - raise RuntimeError("Cannot interrupt kernel. No kernel is running!") - - def signal_kernel(self, signum): - """Sends a signal to the kernel. - - Note that since only SIGTERM is supported on Windows, this function is - only useful on Unix systems. 
- """ - if self.has_kernel: - self.kernel.send_signal(signum) - else: - raise RuntimeError("Cannot signal kernel. No kernel is running!") - - @property - def is_alive(self): - """Is the kernel process still running?""" - if self.has_kernel: - if self.kernel.poll() is None: - return True - else: - return False - elif self._hb_channel is not None: - # We didn't start the kernel with this KernelManager so we - # use the heartbeat. - return self._hb_channel.is_beating() - else: - # no heartbeat and not local, we can't tell if it's running, - # so naively return True - return True - - -#----------------------------------------------------------------------------- +#---------------------------------------------------------------------#----------------------------------------------------------------------------- # ABC Registration #----------------------------------------------------------------------------- @@ -1126,5 +646,3 @@ ShellChannelABC.register(ShellChannel) IOPubChannelABC.register(IOPubChannel) HBChannelABC.register(HBChannel) StdInChannelABC.register(StdInChannel) -KernelManagerABC.register(KernelManager) - diff --git a/IPython/kernel/channelsabc.py b/IPython/kernel/channelsabc.py new file mode 100644 index 0000000..82302e7 --- /dev/null +++ b/IPython/kernel/channelsabc.py @@ -0,0 +1,126 @@ +"""Abstract base classes for kernel client channels""" + +#----------------------------------------------------------------------------- +# Copyright (C) 2013 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. 
+#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +# Standard library imports +import abc + +#----------------------------------------------------------------------------- +# Channels +#----------------------------------------------------------------------------- + + +class ChannelABC(object): + """A base class for all channel ABCs.""" + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def start(self): + pass + + @abc.abstractmethod + def stop(self): + pass + + @abc.abstractmethod + def is_alive(self): + pass + + +class ShellChannelABC(ChannelABC): + """ShellChannel ABC. + + The docstrings for this class can be found in the base implementation: + + `IPython.kernel.channels.ShellChannel` + """ + + @abc.abstractproperty + def allow_stdin(self): + pass + + @abc.abstractmethod + def execute(self, code, silent=False, store_history=True, + user_variables=None, user_expressions=None, allow_stdin=None): + pass + + @abc.abstractmethod + def complete(self, text, line, cursor_pos, block=None): + pass + + @abc.abstractmethod + def object_info(self, oname, detail_level=0): + pass + + @abc.abstractmethod + def history(self, raw=True, output=False, hist_access_type='range', **kwargs): + pass + + @abc.abstractmethod + def kernel_info(self): + pass + + @abc.abstractmethod + def shutdown(self, restart=False): + pass + + +class IOPubChannelABC(ChannelABC): + """IOPubChannel ABC. + + The docstrings for this class can be found in the base implementation: + + `IPython.kernel.channels.IOPubChannel` + """ + + @abc.abstractmethod + def flush(self, timeout=1.0): + pass + + +class StdInChannelABC(ChannelABC): + """StdInChannel ABC. 
+ + The docstrings for this class can be found in the base implementation: + + `IPython.kernel.channels.StdInChannel` + """ + + @abc.abstractmethod + def input(self, string): + pass + + +class HBChannelABC(ChannelABC): + """HBChannel ABC. + + The docstrings for this class can be found in the base implementation: + + `IPython.kernel.channels.HBChannel` + """ + + @abc.abstractproperty + def time_to_dead(self): + pass + + @abc.abstractmethod + def pause(self): + pass + + @abc.abstractmethod + def unpause(self): + pass + + @abc.abstractmethod + def is_beating(self): + pass diff --git a/IPython/kernel/client.py b/IPython/kernel/client.py new file mode 100644 index 0000000..1798548 --- /dev/null +++ b/IPython/kernel/client.py @@ -0,0 +1,198 @@ +"""Base class to manage the interaction with a running kernel +""" + +#----------------------------------------------------------------------------- +# Copyright (C) 2013 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. 
+#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +from __future__ import absolute_import + +import zmq + +# Local imports +from IPython.config.configurable import LoggingConfigurable +from IPython.utils.traitlets import ( + Any, Instance, Type, +) + +from .zmq.session import Session +from .channels import ( + ShellChannel, IOPubChannel, + HBChannel, StdInChannel, +) +from .clientabc import KernelClientABC +from .connect import ConnectionFileMixin + + +#----------------------------------------------------------------------------- +# Main kernel client class +#----------------------------------------------------------------------------- + +class KernelClient(LoggingConfigurable, ConnectionFileMixin): + """Communicates with a single kernel on any host via zmq channels. + + There are four channels associated with each kernel: + + * shell: for request/reply calls to the kernel. + * iopub: for the kernel to publish results to frontends. + * hb: for monitoring the kernel's heartbeat. + * stdin: for frontends to reply to raw_input calls in the kernel. + + The methods of the channels are exposed as methods of the client itself + (KernelClient.execute, complete, history, etc.). + See the channels themselves for documentation of these methods. + + """ + + # The PyZMQ Context to use for communication with the kernel. + context = Instance(zmq.Context) + def _context_default(self): + return zmq.Context.instance() + + # The Session to use for communication with the kernel. 
+ session = Instance(Session) + def _session_default(self): + return Session(config=self.config) + + # The classes to use for the various channels + shell_channel_class = Type(ShellChannel) + iopub_channel_class = Type(IOPubChannel) + stdin_channel_class = Type(StdInChannel) + hb_channel_class = Type(HBChannel) + + # Protected traits + _shell_channel = Any + _iopub_channel = Any + _stdin_channel = Any + _hb_channel = Any + + #-------------------------------------------------------------------------- + # Channel proxy methods + #-------------------------------------------------------------------------- + + def _get_msg(channel, *args, **kwargs): + return channel.get_msg(*args, **kwargs) + + def get_shell_msg(self, *args, **kwargs): + """Get a message from the shell channel""" + return self.shell_channel.get_msg(*args, **kwargs) + + def get_iopub_msg(self, *args, **kwargs): + """Get a message from the iopub channel""" + return self.iopub_channel.get_msg(*args, **kwargs) + + def get_stdin_msg(self, *args, **kwargs): + """Get a message from the stdin channel""" + return self.stdin_channel.get_msg(*args, **kwargs) + + #-------------------------------------------------------------------------- + # Channel management methods + #-------------------------------------------------------------------------- + + def start_channels(self, shell=True, iopub=True, stdin=True, hb=True): + """Starts the channels for this kernel. + + This will create the channels if they do not exist and then start + them (their activity runs in a thread). If port numbers of 0 are + being used (random ports) then you must first call + :method:`start_kernel`. If the channels have been stopped and you + call this, :class:`RuntimeError` will be raised. 
+ """ + if shell: + self.shell_channel.start() + for method in self.shell_channel.proxy_methods: + setattr(self, method, getattr(self.shell_channel, method)) + if iopub: + self.iopub_channel.start() + for method in self.iopub_channel.proxy_methods: + setattr(self, method, getattr(self.iopub_channel, method)) + if stdin: + self.stdin_channel.start() + for method in self.stdin_channel.proxy_methods: + setattr(self, method, getattr(self.stdin_channel, method)) + self.shell_channel.allow_stdin = True + else: + self.shell_channel.allow_stdin = False + if hb: + self.hb_channel.start() + + def stop_channels(self): + """Stops all the running channels for this kernel. + + This stops their event loops and joins their threads. + """ + if self.shell_channel.is_alive(): + self.shell_channel.stop() + if self.iopub_channel.is_alive(): + self.iopub_channel.stop() + if self.stdin_channel.is_alive(): + self.stdin_channel.stop() + if self.hb_channel.is_alive(): + self.hb_channel.stop() + + @property + def channels_running(self): + """Are any of the channels created and running?""" + return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or + self.stdin_channel.is_alive() or self.hb_channel.is_alive()) + + @property + def shell_channel(self): + """Get the shell channel object for this kernel.""" + if self._shell_channel is None: + self._shell_channel = self.shell_channel_class( + self.context, self.session, self._make_url('shell') + ) + return self._shell_channel + + @property + def iopub_channel(self): + """Get the iopub channel object for this kernel.""" + if self._iopub_channel is None: + self._iopub_channel = self.iopub_channel_class( + self.context, self.session, self._make_url('iopub') + ) + return self._iopub_channel + + @property + def stdin_channel(self): + """Get the stdin channel object for this kernel.""" + if self._stdin_channel is None: + self._stdin_channel = self.stdin_channel_class( + self.context, self.session, self._make_url('stdin') + ) + return 
self._stdin_channel + + @property + def hb_channel(self): + """Get the hb channel object for this kernel.""" + if self._hb_channel is None: + self._hb_channel = self.hb_channel_class( + self.context, self.session, self._make_url('hb') + ) + return self._hb_channel + + def is_alive(self): + """Is the kernel process still running?""" + if self._hb_channel is not None: + # We didn't start the kernel with this KernelClient so we + # use the heartbeat. + return self._hb_channel.is_beating() + else: + # no heartbeat and not local, we can't tell if it's running, + # so naively return True + return True + + +#----------------------------------------------------------------------------- +# ABC Registration +#----------------------------------------------------------------------------- + +KernelClientABC.register(KernelClient) diff --git a/IPython/kernel/clientabc.py b/IPython/kernel/clientabc.py new file mode 100644 index 0000000..a4c9580 --- /dev/null +++ b/IPython/kernel/clientabc.py @@ -0,0 +1,81 @@ +"""Abstract base class for kernel clients""" + +#----------------------------------------------------------------------------- +# Copyright (C) 2013 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. +#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +# Standard library imports +import abc + +#----------------------------------------------------------------------------- +# Main kernel client class +#----------------------------------------------------------------------------- + +class KernelClientABC(object): + """KernelClient ABC.
+ + The docstrings for this class can be found in the base implementation: + + `IPython.kernel.client.KernelClient` + """ + + __metaclass__ = abc.ABCMeta + + @abc.abstractproperty + def kernel(self): + pass + + @abc.abstractproperty + def shell_channel_class(self): + pass + + @abc.abstractproperty + def iopub_channel_class(self): + pass + + @abc.abstractproperty + def hb_channel_class(self): + pass + + @abc.abstractproperty + def stdin_channel_class(self): + pass + + #-------------------------------------------------------------------------- + # Channel management methods + #-------------------------------------------------------------------------- + + @abc.abstractmethod + def start_channels(self, shell=True, iopub=True, stdin=True, hb=True): + pass + + @abc.abstractmethod + def stop_channels(self): + pass + + @abc.abstractproperty + def channels_running(self): + pass + + @abc.abstractproperty + def shell_channel(self): + pass + + @abc.abstractproperty + def iopub_channel(self): + pass + + @abc.abstractproperty + def stdin_channel(self): + pass + + @abc.abstractproperty + def hb_channel(self): + pass diff --git a/IPython/kernel/connect.py b/IPython/kernel/connect.py index e895948..0a431df 100644 --- a/IPython/kernel/connect.py +++ b/IPython/kernel/connect.py @@ -17,6 +17,8 @@ Authors: # Imports #----------------------------------------------------------------------------- +from __future__ import absolute_import + import glob import json import os @@ -26,14 +28,21 @@ from getpass import getpass from subprocess import Popen, PIPE import tempfile +import zmq + # external imports from IPython.external.ssh import tunnel # IPython imports +# from IPython.config import Configurable from IPython.core.profiledir import ProfileDir from IPython.utils.localinterfaces import LOCALHOST from IPython.utils.path import filefind, get_ipython_dir from IPython.utils.py3compat import str_to_bytes, bytes_to_str +from IPython.utils.traitlets import ( + Bool, Integer, Unicode, 
CaselessStrEnum, + HasTraits, +) #----------------------------------------------------------------------------- @@ -41,7 +50,7 @@ from IPython.utils.py3compat import str_to_bytes, bytes_to_str #----------------------------------------------------------------------------- def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, hb_port=0, - ip=LOCALHOST, key=b'', transport='tcp'): + control_port=0, ip=LOCALHOST, key=b'', transport='tcp'): """Generates a JSON config file, including the selection of random ports. Parameters @@ -51,16 +60,19 @@ def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, The path to the file to write shell_port : int, optional - The port to use for ROUTER channel. + The port to use for ROUTER (shell) channel. iopub_port : int, optional The port to use for the SUB channel. stdin_port : int, optional - The port to use for the REQ (raw input) channel. + The port to use for the ROUTER (raw input) channel. + + control_port : int, optional + The port to use for the ROUTER (control) channel. hb_port : int, optional - The port to use for the hearbeat REP channel. + The port to use for the heartbeat REP channel. ip : str, optional The ip address the kernel will bind to. @@ -76,8 +88,11 @@ def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, # Find open ports as necessary. 
ports = [] - ports_needed = int(shell_port <= 0) + int(iopub_port <= 0) + \ - int(stdin_port <= 0) + int(hb_port <= 0) + ports_needed = int(shell_port <= 0) + \ + int(iopub_port <= 0) + \ + int(stdin_port <= 0) + \ + int(control_port <= 0) + \ + int(hb_port <= 0) if transport == 'tcp': for i in range(ports_needed): sock = socket.socket() @@ -100,12 +115,15 @@ def write_connection_file(fname=None, shell_port=0, iopub_port=0, stdin_port=0, iopub_port = ports.pop(0) if stdin_port <= 0: stdin_port = ports.pop(0) + if control_port <= 0: + control_port = ports.pop(0) if hb_port <= 0: hb_port = ports.pop(0) cfg = dict( shell_port=shell_port, iopub_port=iopub_port, stdin_port=stdin_port, + control_port=control_port, hb_port=hb_port, ) cfg['ip'] = ip @@ -286,7 +304,9 @@ def connect_qtconsole(connection_file=None, argv=None, profile=None): "qtconsoleapp.main()" ]) - return Popen([sys.executable, '-c', cmd, '--existing', cf] + argv, stdout=PIPE, stderr=PIPE) + return Popen([sys.executable, '-c', cmd, '--existing', cf] + argv, + stdout=PIPE, stderr=PIPE, close_fds=True, + ) def tunnel_to_kernel(connection_info, sshserver, sshkey=None): @@ -337,6 +357,179 @@ def tunnel_to_kernel(connection_info, sshserver, sshkey=None): return tuple(lports) + +#----------------------------------------------------------------------------- +# Mixin for classes that work with connection files +#----------------------------------------------------------------------------- + +channel_socket_types = { + 'hb' : zmq.REQ, + 'shell' : zmq.DEALER, + 'iopub' : zmq.SUB, + 'stdin' : zmq.DEALER, + 'control': zmq.DEALER, +} + +port_names = [ "%s_port" % channel for channel in ('shell', 'stdin', 'iopub', 'hb', 'control')] + +class ConnectionFileMixin(HasTraits): + """Mixin for configurable classes that work with connection files""" + + # The addresses for the communication channels + connection_file = Unicode('') + _connection_file_written = Bool(False) + + transport = CaselessStrEnum(['tcp', 'ipc'], 
default_value='tcp', config=True) + + ip = Unicode(LOCALHOST, config=True, + help="""Set the kernel\'s IP address [default localhost]. + If the IP address is something other than localhost, then + Consoles on other machines will be able to connect + to the Kernel, so be careful!""" + ) + + def _ip_default(self): + if self.transport == 'ipc': + if self.connection_file: + return os.path.splitext(self.connection_file)[0] + '-ipc' + else: + return 'kernel-ipc' + else: + return LOCALHOST + + def _ip_changed(self, name, old, new): + if new == '*': + self.ip = '0.0.0.0' + + # protected traits + + shell_port = Integer(0) + iopub_port = Integer(0) + stdin_port = Integer(0) + control_port = Integer(0) + hb_port = Integer(0) + + @property + def ports(self): + return [ getattr(self, name) for name in port_names ] + + #-------------------------------------------------------------------------- + # Connection and ipc file management + #-------------------------------------------------------------------------- + + def get_connection_info(self): + """return the connection info as a dict""" + return dict( + transport=self.transport, + ip=self.ip, + shell_port=self.shell_port, + iopub_port=self.iopub_port, + stdin_port=self.stdin_port, + hb_port=self.hb_port, + control_port=self.control_port, + ) + + def cleanup_connection_file(self): + """Cleanup connection file *if we wrote it* + + Will not raise if the connection file was already removed somehow. 
+ """ + if self._connection_file_written: + # cleanup connection files on full shutdown of kernel we started + self._connection_file_written = False + try: + os.remove(self.connection_file) + except (IOError, OSError, AttributeError): + pass + + def cleanup_ipc_files(self): + """Cleanup ipc files if we wrote them.""" + if self.transport != 'ipc': + return + for port in self.ports: + ipcfile = "%s-%i" % (self.ip, port) + try: + os.remove(ipcfile) + except (IOError, OSError): + pass + + def write_connection_file(self): + """Write connection info to JSON dict in self.connection_file.""" + if self._connection_file_written: + return + + self.connection_file, cfg = write_connection_file(self.connection_file, + transport=self.transport, ip=self.ip, key=self.session.key, + stdin_port=self.stdin_port, iopub_port=self.iopub_port, + shell_port=self.shell_port, hb_port=self.hb_port, + control_port=self.control_port, + ) + # write_connection_file also sets default ports: + for name in port_names: + setattr(self, name, cfg[name]) + + self._connection_file_written = True + + def load_connection_file(self): + """Load connection info from JSON dict in self.connection_file.""" + with open(self.connection_file) as f: + cfg = json.loads(f.read()) + + self.transport = cfg.get('transport', 'tcp') + self.ip = cfg['ip'] + for name in port_names: + setattr(self, name, cfg[name]) + self.session.key = str_to_bytes(cfg['key']) + + #-------------------------------------------------------------------------- + # Creating connected sockets + #-------------------------------------------------------------------------- + + def _make_url(self, channel): + """Make a ZeroMQ URL for a given channel.""" + transport = self.transport + ip = self.ip + port = getattr(self, '%s_port' % channel) + + if transport == 'tcp': + return "tcp://%s:%i" % (ip, port) + else: + return "%s://%s-%s" % (transport, ip, port) + + def _create_connected_socket(self, channel, identity=None): + """Create a zmq Socket and connect 
it to the kernel.""" + url = self._make_url(channel) + socket_type = channel_socket_types[channel] + self.log.info("Connecting to: %s" % url) + sock = self.context.socket(socket_type) + if identity: + sock.identity = identity + sock.connect(url) + return sock + + def connect_iopub(self, identity=None): + """return zmq Socket connected to the IOPub channel""" + sock = self._create_connected_socket('iopub', identity=identity) + sock.setsockopt(zmq.SUBSCRIBE, b'') + return sock + + def connect_shell(self, identity=None): + """return zmq Socket connected to the Shell channel""" + return self._create_connected_socket('shell', identity=identity) + + def connect_stdin(self, identity=None): + """return zmq Socket connected to the StdIn channel""" + return self._create_connected_socket('stdin', identity=identity) + + def connect_hb(self, identity=None): + """return zmq Socket connected to the Heartbeat channel""" + return self._create_connected_socket('hb', identity=identity) + + def connect_control(self, identity=None): + """return zmq Socket connected to the Control channel""" + return self._create_connected_socket('control', identity=identity) + + __all__ = [ 'write_connection_file', 'get_connection_file', @@ -344,4 +537,4 @@ __all__ = [ 'get_connection_info', 'connect_qtconsole', 'tunnel_to_kernel', -] \ No newline at end of file +] diff --git a/IPython/kernel/inprocess/__init__.py b/IPython/kernel/inprocess/__init__.py index e69de29..6070a7c 100644 --- a/IPython/kernel/inprocess/__init__.py +++ b/IPython/kernel/inprocess/__init__.py @@ -0,0 +1,10 @@ +from .channels import ( + InProcessShellChannel, + InProcessIOPubChannel, + InProcessStdInChannel, + InProcessHBChannel, +) + +from .client import InProcessKernelClient +from .manager import InProcessKernelManager +from .blocking import BlockingInProcessKernelClient diff --git a/IPython/kernel/inprocess/blockingkernelmanager.py b/IPython/kernel/inprocess/blocking.py similarity index 78% rename from
IPython/kernel/inprocess/blockingkernelmanager.py rename to IPython/kernel/inprocess/blocking.py index e001073..91042c9 100644 --- a/IPython/kernel/inprocess/blockingkernelmanager.py +++ b/IPython/kernel/inprocess/blocking.py @@ -1,4 +1,4 @@ -""" Implements a fully blocking kernel manager. +""" Implements a fully blocking kernel client. Useful for test suites and blocking terminal interfaces. """ @@ -12,15 +12,19 @@ Useful for test suites and blocking terminal interfaces. #----------------------------------------------------------------------------- # Imports #----------------------------------------------------------------------------- -from __future__ import print_function -# Local imports. +# IPython imports from IPython.utils.io import raw_print from IPython.utils.traitlets import Type -from kernelmanager import InProcessKernelManager, InProcessShellChannel, \ - InProcessIOPubChannel, InProcessStdInChannel -from IPython.kernel.blockingkernelmanager import BlockingChannelMixin +from IPython.kernel.blocking.channels import BlockingChannelMixin +# Local imports +from .channels import ( + InProcessShellChannel, + InProcessIOPubChannel, + InProcessStdInChannel, +) +from .client import InProcessKernelClient #----------------------------------------------------------------------------- # Blocking kernel manager @@ -33,7 +37,7 @@ class BlockingInProcessIOPubChannel(BlockingChannelMixin, InProcessIOPubChannel) pass class BlockingInProcessStdInChannel(BlockingChannelMixin, InProcessStdInChannel): - + def call_handlers(self, msg): """ Overridden for the in-process channel. 
@@ -41,12 +45,12 @@ class BlockingInProcessStdInChannel(BlockingChannelMixin, InProcessStdInChannel) """ msg_type = msg['header']['msg_type'] if msg_type == 'input_request': - _raw_input = self.manager.kernel._sys_raw_input + _raw_input = self.client.kernel._sys_raw_input prompt = msg['content']['prompt'] raw_print(prompt, end='') self.input(_raw_input()) -class BlockingInProcessKernelManager(InProcessKernelManager): +class BlockingInProcessKernelClient(InProcessKernelClient): # The classes to use for the various channels. shell_channel_class = Type(BlockingInProcessShellChannel) diff --git a/IPython/kernel/inprocess/kernelmanager.py b/IPython/kernel/inprocess/channels.py similarity index 61% rename from IPython/kernel/inprocess/kernelmanager.py rename to IPython/kernel/inprocess/channels.py index b31f271..6f8ba67 100644 --- a/IPython/kernel/inprocess/kernelmanager.py +++ b/IPython/kernel/inprocess/channels.py @@ -1,4 +1,4 @@ -""" A kernel manager for in-process kernels. """ +""" A kernel client for in-process kernels. """ #----------------------------------------------------------------------------- # Copyright (C) 2012 The IPython Development Team @@ -11,15 +11,13 @@ # Imports #----------------------------------------------------------------------------- -# Local imports. 
-from IPython.config.configurable import Configurable -from IPython.utils.traitlets import Any, Instance, Type -from IPython.kernel.kernelmanagerabc import ( +# IPython imports +from IPython.kernel.channelsabc import ( ShellChannelABC, IOPubChannelABC, HBChannelABC, StdInChannelABC, - KernelManagerABC ) +# Local imports from .socket import DummySocket #----------------------------------------------------------------------------- @@ -28,10 +26,11 @@ from .socket import DummySocket class InProcessChannel(object): """Base class for in-process channels.""" + proxy_methods = [] - def __init__(self, manager): + def __init__(self, client): super(InProcessChannel, self).__init__() - self.manager = manager + self.client = client self._is_alive = False #-------------------------------------------------------------------------- @@ -77,10 +76,17 @@ class InProcessChannel(object): class InProcessShellChannel(InProcessChannel): - """See `IPython.kernel.kernelmanager.ShellChannel` for docstrings.""" + """See `IPython.kernel.channels.ShellChannel` for docstrings.""" # flag for whether execute requests should be allowed to call raw_input allow_stdin = True + proxy_methods = [ + 'execute', + 'complete', + 'object_info', + 'history', + 'shutdown', + ] #-------------------------------------------------------------------------- # ShellChannel interface @@ -94,26 +100,26 @@ class InProcessShellChannel(InProcessChannel): user_variables=user_variables, user_expressions=user_expressions, allow_stdin=allow_stdin) - msg = self.manager.session.msg('execute_request', content) + msg = self.client.session.msg('execute_request', content) self._dispatch_to_kernel(msg) return msg['header']['msg_id'] def complete(self, text, line, cursor_pos, block=None): content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos) - msg = self.manager.session.msg('complete_request', content) + msg = self.client.session.msg('complete_request', content) self._dispatch_to_kernel(msg) return 
msg['header']['msg_id'] def object_info(self, oname, detail_level=0): content = dict(oname=oname, detail_level=detail_level) - msg = self.manager.session.msg('object_info_request', content) + msg = self.client.session.msg('object_info_request', content) self._dispatch_to_kernel(msg) return msg['header']['msg_id'] def history(self, raw=True, output=False, hist_access_type='range', **kwds): content = dict(raw=raw, output=output, hist_access_type=hist_access_type, **kwds) - msg = self.manager.session.msg('history_request', content) + msg = self.client.session.msg('history_request', content) self._dispatch_to_kernel(msg) return msg['header']['msg_id'] @@ -128,38 +134,40 @@ class InProcessShellChannel(InProcessChannel): def _dispatch_to_kernel(self, msg): """ Send a message to the kernel and handle a reply. """ - kernel = self.manager.kernel + kernel = self.client.kernel if kernel is None: raise RuntimeError('Cannot send request. No kernel exists.') stream = DummySocket() - self.manager.session.send(stream, msg) + self.client.session.send(stream, msg) msg_parts = stream.recv_multipart() kernel.dispatch_shell(stream, msg_parts) - idents, reply_msg = self.manager.session.recv(stream, copy=False) + idents, reply_msg = self.client.session.recv(stream, copy=False) self.call_handlers_later(reply_msg) class InProcessIOPubChannel(InProcessChannel): - """See `IPython.kernel.kernelmanager.IOPubChannel` for docstrings.""" + """See `IPython.kernel.channels.IOPubChannel` for docstrings.""" def flush(self, timeout=1.0): pass class InProcessStdInChannel(InProcessChannel): - """See `IPython.kernel.kernelmanager.StdInChannel` for docstrings.""" + """See `IPython.kernel.channels.StdInChannel` for docstrings.""" + + proxy_methods = ['input'] def input(self, string): - kernel = self.manager.kernel + kernel = self.client.kernel if kernel is None: raise RuntimeError('Cannot send input reply. 
No kernel exists.') kernel.raw_input_str = string class InProcessHBChannel(InProcessChannel): - """See `IPython.kernel.kernelmanager.HBChannel` for docstrings.""" + """See `IPython.kernel.channels.HBChannel` for docstrings.""" time_to_dead = 3.0 @@ -176,133 +184,6 @@ class InProcessHBChannel(InProcessChannel): def is_beating(self): return not self._pause - -#----------------------------------------------------------------------------- -# Main kernel manager class -#----------------------------------------------------------------------------- - -class InProcessKernelManager(Configurable): - """A manager for an in-process kernel. - - This class implements the interface of - `IPython.kernel.kernelmanagerabc.KernelManagerABC` and allows - (asynchronous) frontends to be used seamlessly with an in-process kernel. - - See `IPython.kernel.kernelmanager.KernelManager` for docstrings. - """ - - # The Session to use for building messages. - session = Instance('IPython.kernel.zmq.session.Session') - def _session_default(self): - from IPython.kernel.zmq.session import Session - return Session(config=self.config) - - # The kernel process with which the KernelManager is communicating. - kernel = Instance('IPython.kernel.inprocess.ipkernel.InProcessKernel') - - # The classes to use for the various channels. - shell_channel_class = Type(InProcessShellChannel) - iopub_channel_class = Type(InProcessIOPubChannel) - stdin_channel_class = Type(InProcessStdInChannel) - hb_channel_class = Type(InProcessHBChannel) - - # Protected traits. - _shell_channel = Any - _iopub_channel = Any - _stdin_channel = Any - _hb_channel = Any - - #-------------------------------------------------------------------------- - # Channel management methods. 
- #-------------------------------------------------------------------------- - - def start_channels(self, shell=True, iopub=True, stdin=True, hb=True): - if shell: - self.shell_channel.start() - if iopub: - self.iopub_channel.start() - if stdin: - self.stdin_channel.start() - self.shell_channel.allow_stdin = True - else: - self.shell_channel.allow_stdin = False - if hb: - self.hb_channel.start() - - def stop_channels(self): - if self.shell_channel.is_alive(): - self.shell_channel.stop() - if self.iopub_channel.is_alive(): - self.iopub_channel.stop() - if self.stdin_channel.is_alive(): - self.stdin_channel.stop() - if self.hb_channel.is_alive(): - self.hb_channel.stop() - - @property - def channels_running(self): - return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or - self.stdin_channel.is_alive() or self.hb_channel.is_alive()) - - @property - def shell_channel(self): - if self._shell_channel is None: - self._shell_channel = self.shell_channel_class(self) - return self._shell_channel - - @property - def iopub_channel(self): - if self._iopub_channel is None: - self._iopub_channel = self.iopub_channel_class(self) - return self._iopub_channel - - @property - def stdin_channel(self): - if self._stdin_channel is None: - self._stdin_channel = self.stdin_channel_class(self) - return self._stdin_channel - - @property - def hb_channel(self): - if self._hb_channel is None: - self._hb_channel = self.hb_channel_class(self) - return self._hb_channel - - #-------------------------------------------------------------------------- - # Kernel management methods: - #-------------------------------------------------------------------------- - - def start_kernel(self, **kwds): - from IPython.kernel.inprocess.ipkernel import InProcessKernel - self.kernel = InProcessKernel() - self.kernel.frontends.append(self) - - def shutdown_kernel(self): - self._kill_kernel() - - def restart_kernel(self, now=False, **kwds): - self.shutdown_kernel() - self.start_kernel(**kwds) - 
- @property - def has_kernel(self): - return self.kernel is not None - - def _kill_kernel(self): - self.kernel.frontends.remove(self) - self.kernel = None - - def interrupt_kernel(self): - raise NotImplementedError("Cannot interrupt in-process kernel.") - - def signal_kernel(self, signum): - raise NotImplementedError("Cannot signal in-process kernel.") - - @property - def is_alive(self): - return True - - #----------------------------------------------------------------------------- # ABC Registration #----------------------------------------------------------------------------- @@ -311,4 +192,3 @@ ShellChannelABC.register(InProcessShellChannel) IOPubChannelABC.register(InProcessIOPubChannel) HBChannelABC.register(InProcessHBChannel) StdInChannelABC.register(InProcessStdInChannel) -KernelManagerABC.register(InProcessKernelManager) diff --git a/IPython/kernel/inprocess/client.py b/IPython/kernel/inprocess/client.py new file mode 100644 index 0000000..2ec276e --- /dev/null +++ b/IPython/kernel/inprocess/client.py @@ -0,0 +1,87 @@ +"""A client for in-process kernels.""" + +#----------------------------------------------------------------------------- +# Copyright (C) 2012 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. 
+#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +# IPython imports +from IPython.utils.traitlets import Type, Instance +from IPython.kernel.clientabc import KernelClientABC +from IPython.kernel.client import KernelClient + +# Local imports +from .channels import ( + InProcessShellChannel, + InProcessIOPubChannel, + InProcessHBChannel, + InProcessStdInChannel, + +) + +#----------------------------------------------------------------------------- +# Main kernel Client class +#----------------------------------------------------------------------------- + +class InProcessKernelClient(KernelClient): + """A client for an in-process kernel. + + This class implements the interface of + `IPython.kernel.clientabc.KernelClientABC` and allows + (asynchronous) frontends to be used seamlessly with an in-process kernel. + + See `IPython.kernel.client.KernelClient` for docstrings. + """ + + # The classes to use for the various channels. 
+ shell_channel_class = Type(InProcessShellChannel) + iopub_channel_class = Type(InProcessIOPubChannel) + stdin_channel_class = Type(InProcessStdInChannel) + hb_channel_class = Type(InProcessHBChannel) + + kernel = Instance('IPython.kernel.inprocess.ipkernel.Kernel') + + #-------------------------------------------------------------------------- + # Channel management methods + #-------------------------------------------------------------------------- + + def start_channels(self, *args, **kwargs): + super(InProcessKernelClient, self).start_channels(*args, **kwargs) + self.kernel.frontends.append(self) + + @property + def shell_channel(self): + if self._shell_channel is None: + self._shell_channel = self.shell_channel_class(self) + return self._shell_channel + + @property + def iopub_channel(self): + if self._iopub_channel is None: + self._iopub_channel = self.iopub_channel_class(self) + return self._iopub_channel + + @property + def stdin_channel(self): + if self._stdin_channel is None: + self._stdin_channel = self.stdin_channel_class(self) + return self._stdin_channel + + @property + def hb_channel(self): + if self._hb_channel is None: + self._hb_channel = self.hb_channel_class(self) + return self._hb_channel + + +#----------------------------------------------------------------------------- +# ABC Registration +#----------------------------------------------------------------------------- + +KernelClientABC.register(InProcessKernelClient) diff --git a/IPython/kernel/inprocess/ipkernel.py b/IPython/kernel/inprocess/ipkernel.py index 33b3cc5..c367fd4 100644 --- a/IPython/kernel/inprocess/ipkernel.py +++ b/IPython/kernel/inprocess/ipkernel.py @@ -37,7 +37,8 @@ class InProcessKernel(Kernel): # The frontends connected to this kernel. frontends = List( - Instance('IPython.kernel.inprocess.kernelmanager.InProcessKernelManager')) + Instance('IPython.kernel.inprocess.client.InProcessKernelClient') + ) # The GUI environment that the kernel is running under.
This need not be # specified for the normal operation for the kernel, but is required for diff --git a/IPython/kernel/inprocess/manager.py b/IPython/kernel/inprocess/manager.py new file mode 100644 index 0000000..db833ea --- /dev/null +++ b/IPython/kernel/inprocess/manager.py @@ -0,0 +1,77 @@ +"""A kernel manager for in-process kernels.""" + +#----------------------------------------------------------------------------- +# Copyright (C) 2013 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. +#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +from IPython.utils.traitlets import Instance, DottedObjectName +from IPython.kernel.managerabc import KernelManagerABC +from IPython.kernel.manager import KernelManager + +#----------------------------------------------------------------------------- +# Main kernel manager class +#----------------------------------------------------------------------------- + +class InProcessKernelManager(KernelManager): + """A manager for an in-process kernel. + + This class implements the interface of + `IPython.kernel.managerabc.KernelManagerABC` and allows + (asynchronous) frontends to be used seamlessly with an in-process kernel. + + See `IPython.kernel.manager.KernelManager` for docstrings. + """ + + # The kernel process with which the KernelManager is communicating.
+ kernel = Instance('IPython.kernel.inprocess.ipkernel.InProcessKernel') + # the client class for KM.client() shortcut + client_class = DottedObjectName('IPython.kernel.inprocess.BlockingInProcessKernelClient') + + #-------------------------------------------------------------------------- + # Kernel management methods + #-------------------------------------------------------------------------- + + def start_kernel(self, **kwds): + from IPython.kernel.inprocess.ipkernel import InProcessKernel + self.kernel = InProcessKernel() + + def shutdown_kernel(self): + self._kill_kernel() + + def restart_kernel(self, now=False, **kwds): + self.shutdown_kernel() + self.start_kernel(**kwds) + + @property + def has_kernel(self): + return self.kernel is not None + + def _kill_kernel(self): + self.kernel = None + + def interrupt_kernel(self): + raise NotImplementedError("Cannot interrupt in-process kernel.") + + def signal_kernel(self, signum): + raise NotImplementedError("Cannot signal in-process kernel.") + + def is_alive(self): + return self.kernel is not None + + def client(self, **kwargs): + kwargs['kernel'] = self.kernel + return super(InProcessKernelManager, self).client(**kwargs) + + +#----------------------------------------------------------------------------- +# ABC Registration +#----------------------------------------------------------------------------- + +KernelManagerABC.register(InProcessKernelManager) diff --git a/IPython/kernel/inprocess/tests/test_kernel.py b/IPython/kernel/inprocess/tests/test_kernel.py index bc0657f..a5ec8c1 100644 --- a/IPython/kernel/inprocess/tests/test_kernel.py +++ b/IPython/kernel/inprocess/tests/test_kernel.py @@ -16,8 +16,8 @@ import sys import unittest # Local imports -from IPython.kernel.inprocess.blockingkernelmanager import \ - BlockingInProcessKernelManager +from IPython.kernel.inprocess.blocking import BlockingInProcessKernelClient +from IPython.kernel.inprocess.manager import InProcessKernelManager from 
IPython.kernel.inprocess.ipkernel import InProcessKernel from IPython.testing.decorators import skipif_not_matplotlib from IPython.utils.io import capture_output @@ -29,33 +29,35 @@ from IPython.utils import py3compat class InProcessKernelTestCase(unittest.TestCase): + def setUp(self): + self.km = InProcessKernelManager() + self.km.start_kernel() + self.kc = BlockingInProcessKernelClient(kernel=self.km.kernel) + self.kc.start_channels() + @skipif_not_matplotlib def test_pylab(self): """ Does pylab work in the in-process kernel? """ - km = BlockingInProcessKernelManager() - km.start_kernel() - km.shell_channel.execute('%pylab') - msg = get_stream_message(km) + kc = self.kc + kc.execute('%pylab') + msg = get_stream_message(kc) self.assert_('Welcome to pylab' in msg['content']['data']) def test_raw_input(self): """ Does the in-process kernel handle raw_input correctly? """ - km = BlockingInProcessKernelManager() - km.start_kernel() - io = StringIO('foobar\n') sys_stdin = sys.stdin sys.stdin = io try: if py3compat.PY3: - km.shell_channel.execute('x = input()') + self.kc.execute('x = input()') else: - km.shell_channel.execute('x = raw_input()') + self.kc.execute('x = raw_input()') finally: sys.stdin = sys_stdin - self.assertEqual(km.kernel.shell.user_ns.get('x'), 'foobar') + self.assertEqual(self.km.kernel.shell.user_ns.get('x'), 'foobar') def test_stdout(self): """ Does the in-process kernel correctly capture IO? 
def get_stream_message(kernel_client, timeout=5):
    """Return the next 'stream' message from the client's iopub channel.

    Messages of any other type are read and discarded.  ``timeout`` is
    forwarded unchanged to every ``get_iopub_msg`` call.
    """
    reply = kernel_client.get_iopub_msg(timeout=timeout)
    while reply['header']['msg_type'] != 'stream':
        # not a stream message: drop it and fetch the next one
        reply = kernel_client.get_iopub_msg(timeout=timeout)
    return reply
in-process kernel manager implement the basic KM interface? """ - km = BlockingInProcessKernelManager() - self.assert_(not km.channels_running) + km = InProcessKernelManager() self.assert_(not km.has_kernel) - km.start_channels() - self.assert_(km.channels_running) - km.start_kernel() self.assert_(km.has_kernel) self.assert_(km.kernel is not None) + kc = BlockingInProcessKernelClient(kernel=km.kernel) + self.assert_(not kc.channels_running) + + kc.start_channels() + self.assert_(kc.channels_running) + old_kernel = km.kernel km.restart_kernel() self.assert_(km.kernel is not None) @@ -49,37 +51,43 @@ class InProcessKernelManagerTestCase(unittest.TestCase): self.assertRaises(NotImplementedError, km.interrupt_kernel) self.assertRaises(NotImplementedError, km.signal_kernel, 9) - km.stop_channels() - self.assert_(not km.channels_running) + kc.stop_channels() + self.assert_(not kc.channels_running) def test_execute(self): """ Does executing code in an in-process kernel work? """ - km = BlockingInProcessKernelManager() + km = InProcessKernelManager() km.start_kernel() - km.shell_channel.execute('foo = 1') + kc = BlockingInProcessKernelClient(kernel=km.kernel) + kc.start_channels() + kc.execute('foo = 1') self.assertEquals(km.kernel.shell.user_ns['foo'], 1) def test_complete(self): """ Does requesting completion from an in-process kernel work? 
""" - km = BlockingInProcessKernelManager() + km = InProcessKernelManager() km.start_kernel() + kc = BlockingInProcessKernelClient(kernel=km.kernel) + kc.start_channels() km.kernel.shell.push({'my_bar': 0, 'my_baz': 1}) - km.shell_channel.complete('my_ba', 'my_ba', 5) - msg = km.shell_channel.get_msg() - self.assertEquals(msg['header']['msg_type'], 'complete_reply') - self.assertEquals(sorted(msg['content']['matches']), + kc.complete('my_ba', 'my_ba', 5) + msg = kc.get_shell_msg() + self.assertEqual(msg['header']['msg_type'], 'complete_reply') + self.assertEqual(sorted(msg['content']['matches']), ['my_bar', 'my_baz']) def test_object_info(self): """ Does requesting object information from an in-process kernel work? """ - km = BlockingInProcessKernelManager() + km = InProcessKernelManager() km.start_kernel() + kc = BlockingInProcessKernelClient(kernel=km.kernel) + kc.start_channels() km.kernel.shell.user_ns['foo'] = 1 - km.shell_channel.object_info('foo') - msg = km.shell_channel.get_msg() + kc.object_info('foo') + msg = kc.get_shell_msg() self.assertEquals(msg['header']['msg_type'], 'object_info_reply') self.assertEquals(msg['content']['name'], 'foo') self.assertEquals(msg['content']['type_name'], 'int') @@ -87,11 +95,13 @@ class InProcessKernelManagerTestCase(unittest.TestCase): def test_history(self): """ Does requesting history from an in-process kernel work? 
""" - km = BlockingInProcessKernelManager() + km = InProcessKernelManager() km.start_kernel() - km.shell_channel.execute('%who') - km.shell_channel.history(hist_access_type='tail', n=1) - msg = km.shell_channel.get_msgs()[-1] + kc = BlockingInProcessKernelClient(kernel=km.kernel) + kc.start_channels() + kc.execute('%who') + kc.history(hist_access_type='tail', n=1) + msg = kc.shell_channel.get_msgs()[-1] self.assertEquals(msg['header']['msg_type'], 'history_reply') history = msg['content']['history'] self.assertEquals(len(history), 1) diff --git a/IPython/kernel/ioloop/__init__.py b/IPython/kernel/ioloop/__init__.py new file mode 100644 index 0000000..d64f06d --- /dev/null +++ b/IPython/kernel/ioloop/__init__.py @@ -0,0 +1,2 @@ +from .manager import IOLoopKernelManager +from .restarter import IOLoopKernelRestarter diff --git a/IPython/kernel/ioloop/manager.py b/IPython/kernel/ioloop/manager.py new file mode 100644 index 0000000..a62445f --- /dev/null +++ b/IPython/kernel/ioloop/manager.py @@ -0,0 +1,63 @@ +"""A kernel manager with a tornado IOLoop""" + +#----------------------------------------------------------------------------- +# Copyright (C) 2013 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. 
def as_zmqstream(f):
    """Decorate a socket-returning method so it returns a ZMQStream.

    The returned wrapper calls ``f`` with the same arguments and wraps the
    socket it returns in a ``ZMQStream`` bound to ``self.loop``.  The
    wrapper keeps the name and docstring of ``f``.
    """
    # local import: keeps this decorator self-contained
    from functools import wraps

    @wraps(f)
    def wrapped(self, *args, **kwargs):
        socket = f(self, *args, **kwargs)
        return ZMQStream(socket, self.loop)
    return wrapped
class IOLoopKernelRestarter(KernelRestarter):
    """Monitor and autorestart a kernel."""

    # event loop on which the periodic poll is scheduled
    loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
    def _loop_default(self):
        return ioloop.IOLoop.instance()

    # the active PeriodicCallback, or None while polling is stopped
    _pcallback = None

    def start(self):
        """Start the polling of the kernel."""
        if self._pcallback is not None:
            # already polling
            return
        interval_ms = 1000*self.time_to_dead
        periodic = ioloop.PeriodicCallback(self.poll, interval_ms, self.loop)
        self._pcallback = periodic
        periodic.start()

    def stop(self):
        """Stop the kernel polling."""
        periodic = self._pcallback
        if periodic is None:
            return
        periodic.stop()
        self._pcallback = None
terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. +#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +from __future__ import absolute_import + +# Standard library imports +import signal +import sys +import time + +import zmq + +# Local imports +from IPython.config.configurable import LoggingConfigurable +from IPython.utils.importstring import import_item +from IPython.utils.localinterfaces import LOCAL_IPS +from IPython.utils.traitlets import ( + Any, Instance, Unicode, List, Bool, Type, DottedObjectName +) +from IPython.kernel import ( + make_ipkernel_cmd, + launch_kernel, +) +from .connect import ConnectionFileMixin +from .zmq.session import Session +from .managerabc import ( + KernelManagerABC +) + +#----------------------------------------------------------------------------- +# Main kernel manager class +#----------------------------------------------------------------------------- + +class KernelManager(LoggingConfigurable, ConnectionFileMixin): + """Manages a single kernel in a subprocess on this host. + + This version starts kernels with Popen. + """ + + # The PyZMQ Context to use for communication with the kernel. + context = Instance(zmq.Context) + def _context_default(self): + return zmq.Context.instance() + + # The Session to use for communication with the kernel. + session = Instance(Session) + def _session_default(self): + return Session(config=self.config) + + # the class to create with our `client` method + client_class = DottedObjectName('IPython.kernel.client.KernelClient') + client_factory = Type() + def _client_class_changed(self, name, old, new): + self.client_factory = import_item(str(new)) + + # The kernel process with which the KernelManager is communicating. 
+ # generally a Popen instance + kernel = Any() + + kernel_cmd = List(Unicode, config=True, + help="""The Popen Command to launch the kernel. + Override this if you have a custom + """ + ) + + def _kernel_cmd_changed(self, name, old, new): + self.ipython_kernel = False + + ipython_kernel = Bool(True) + + # Protected traits + _launch_args = Any() + _control_socket = Any() + + _restarter = Any() + + autorestart = Bool(False, config=True, + help="""Should we autorestart the kernel if it dies.""" + ) + + def __del__(self): + self._close_control_socket() + self.cleanup_connection_file() + + #-------------------------------------------------------------------------- + # Kernel restarter + #-------------------------------------------------------------------------- + + def start_restarter(self): + pass + + def stop_restarter(self): + pass + + def add_restart_callback(self, callback, event='restart'): + """register a callback to be called when a kernel is restarted""" + if self._restarter is None: + return + self._restarter.add_callback(callback, event) + + def remove_restart_callback(self, callback, event='restart'): + """unregister a callback to be called when a kernel is restarted""" + if self._restarter is None: + return + self._restarter.remove_callback(callback, event) + + #-------------------------------------------------------------------------- + # create a Client connected to our Kernel + #-------------------------------------------------------------------------- + + def client(self, **kwargs): + """Create a client configured to connect to our kernel""" + if self.client_factory is None: + self.client_factory = import_item(self.client_class) + + kw = {} + kw.update(self.get_connection_info()) + kw.update(dict( + connection_file=self.connection_file, + session=self.session, + config=self.config, + )) + + # add kwargs last, for manual overrides + kw.update(kwargs) + return self.client_factory(**kw) + + 
def format_kernel_cmd(self, **kw):
    """format templated args (e.g. {connection_file})

    Uses ``self.kernel_cmd`` when it is non-empty; otherwise the default
    command is built with ``make_ipkernel_cmd``, forwarding ``**kw``.
    Every argument is passed through ``str.format`` with a namespace
    holding ``connection_file`` plus any saved launch arguments (which
    take precedence on a name clash).

    Returns
    -------
    list of str
        The command with all template fields filled in.
    """
    if self.kernel_cmd:
        cmd = self.kernel_cmd
    else:
        cmd = make_ipkernel_cmd(
            'from IPython.kernel.zmq.kernelapp import main; main()',
            **kw
        )
    ns = dict(connection_file=self.connection_file)
    # _launch_args is None until start_kernel() has stored the launch
    # kwargs; dict.update(None) raises TypeError, so skip it in that case.
    if self._launch_args:
        ns.update(self._launch_args)
    return [c.format(**ns) for c in cmd]
" + "Currently valid addresses are: %s"%LOCAL_IPS + ) + + # write connection file / get default ports + self.write_connection_file() + + # save kwargs for use in restart + self._launch_args = kw.copy() + # build the Popen cmd + kernel_cmd = self.format_kernel_cmd(**kw) + # launch the kernel subprocess + self.kernel = self._launch_kernel(kernel_cmd, + ipython_kernel=self.ipython_kernel, + **kw) + self.start_restarter() + self._connect_control_socket() + + def _send_shutdown_request(self, restart=False): + """TODO: send a shutdown request via control channel""" + content = dict(restart=restart) + msg = self.session.msg("shutdown_request", content=content) + self.session.send(self._control_socket, msg) + + def shutdown_kernel(self, now=False, restart=False): + """Attempts to the stop the kernel process cleanly. + + This attempts to shutdown the kernels cleanly by: + + 1. Sending it a shutdown message over the shell channel. + 2. If that fails, the kernel is shutdown forcibly by sending it + a signal. + + Parameters: + ----------- + now : bool + Should the kernel be forcible killed *now*. This skips the + first, nice shutdown attempt. + restart: bool + Will this kernel be restarted after it is shutdown. When this + is True, connection files will not be cleaned up. + """ + # Stop monitoring for restarting while we shutdown. + self.stop_restarter() + + # FIXME: Shutdown does not work on Windows due to ZMQ errors! + if sys.platform == 'win32': + self._kill_kernel() + return + + if now: + if self.has_kernel: + self._kill_kernel() + else: + # Don't send any additional kernel kill messages immediately, to give + # the kernel a chance to properly execute shutdown actions. Wait for at + # most 1s, checking every 0.1s. + self._send_shutdown_request(restart=restart) + for i in range(10): + if self.is_alive(): + time.sleep(0.1) + else: + break + else: + # OK, we've waited long enough. 
+ if self.has_kernel: + self._kill_kernel() + + if not restart: + self.cleanup_connection_file() + self.cleanup_ipc_files() + else: + self.cleanup_ipc_files() + + def restart_kernel(self, now=False, **kw): + """Restarts a kernel with the arguments that were used to launch it. + + If the old kernel was launched with random ports, the same ports will be + used for the new kernel. The same connection file is used again. + + Parameters + ---------- + now : bool, optional + If True, the kernel is forcefully restarted *immediately*, without + having a chance to do any cleanup action. Otherwise the kernel is + given 1s to clean up before a forceful restart is issued. + + In all cases the kernel is restarted, the only difference is whether + it is given a chance to perform a clean shutdown or not. + + **kw : optional + Any options specified here will overwrite those used to launch the + kernel. + """ + if self._launch_args is None: + raise RuntimeError("Cannot restart the kernel. " + "No previous call to 'start_kernel'.") + else: + # Stop currently running kernel. + self.shutdown_kernel(now=now, restart=True) + + # Start new kernel. + self._launch_args.update(kw) + self.start_kernel(**self._launch_args) + + # FIXME: Messages get dropped in Windows due to probable ZMQ bug + # unless there is some delay here. + if sys.platform == 'win32': + time.sleep(0.2) + + @property + def has_kernel(self): + """Has a kernel been started that we are managing.""" + return self.kernel is not None + + def _kill_kernel(self): + """Kill the running kernel. + + This is a private method, callers should use shutdown_kernel(now=True). + """ + if self.has_kernel: + + # Signal the kernel to terminate (sends SIGKILL on Unix and calls + # TerminateProcess() on Win32). + try: + self.kernel.kill() + except OSError as e: + # In Windows, we will get an Access Denied error if the process + # has already terminated. Ignore it. 
+ if sys.platform == 'win32': + if e.winerror != 5: + raise + # On Unix, we may get an ESRCH error if the process has already + # terminated. Ignore it. + else: + from errno import ESRCH + if e.errno != ESRCH: + raise + + # Block until the kernel terminates. + self.kernel.wait() + self.kernel = None + else: + raise RuntimeError("Cannot kill kernel. No kernel is running!") + + def interrupt_kernel(self): + """Interrupts the kernel by sending it a signal. + + Unlike ``signal_kernel``, this operation is well supported on all + platforms. + """ + if self.has_kernel: + if sys.platform == 'win32': + from .zmq.parentpoller import ParentPollerWindows as Poller + Poller.send_interrupt(self.kernel.win32_interrupt_event) + else: + self.kernel.send_signal(signal.SIGINT) + else: + raise RuntimeError("Cannot interrupt kernel. No kernel is running!") + + def signal_kernel(self, signum): + """Sends a signal to the kernel. + + Note that since only SIGTERM is supported on Windows, this function is + only useful on Unix systems. + """ + if self.has_kernel: + self.kernel.send_signal(signum) + else: + raise RuntimeError("Cannot signal kernel. 
def kernel_method(f):
    """decorator for proxying MKM.method(kernel_id) to individual KMs by ID

    The wrapper fetches the kernel manager for ``kernel_id`` with
    ``self.get_kernel``, calls the method named ``f.__name__`` on it with
    the remaining arguments, then calls ``f`` itself (for anything defined
    in the decorated method, such as logging) and returns the kernel
    manager method's result.  The return value of ``f`` is discarded.

    The wrapper keeps the name and docstring of ``f``.
    """
    # local import: keeps this decorator self-contained
    from functools import wraps

    @wraps(f)
    def wrapped(self, kernel_id, *args, **kwargs):
        # get the kernel
        km = self.get_kernel(kernel_id)
        method = getattr(km, f.__name__)
        # call the kernel's method
        r = method(*args, **kwargs)
        # last thing, call anything defined in the actual class method
        # such as logging messages
        f(self, kernel_id, *args, **kwargs)
        # return the method result
        return r
    return wrapped
+ """ + return self._kernels.pop(kernel_id) def shutdown_all(self, now=False): """Shutdown all kernels.""" for kid in self.list_kernel_ids(): self.shutdown_kernel(kid, now=now) + @kernel_method def interrupt_kernel(self, kernel_id): """Interrupt (SIGINT) the kernel by its uuid. @@ -129,8 +154,9 @@ class MultiKernelManager(LoggingConfigurable): kernel_id : uuid The id of the kernel to interrupt. """ - return self.get_kernel(kernel_id).interrupt_kernel() + self.log.info("Kernel interrupted: %s" % kernel_id) + @kernel_method def signal_kernel(self, kernel_id, signum): """Sends a signal to the kernel by its uuid. @@ -142,8 +168,9 @@ class MultiKernelManager(LoggingConfigurable): kernel_id : uuid The id of the kernel to signal. """ - return self.get_kernel(kernel_id).signal_kernel(signum) + self.log.info("Signaled Kernel %s with %s" % (kernel_id, signum)) + @kernel_method def restart_kernel(self, kernel_id): """Restart a kernel by its uuid, keeping the same ports. @@ -152,7 +179,25 @@ class MultiKernelManager(LoggingConfigurable): kernel_id : uuid The id of the kernel to interrupt. """ - return self.get_kernel(kernel_id).restart_kernel() + self.log.info("Kernel restarted: %s" % kernel_id) + + @kernel_method + def is_alive(self, kernel_id): + """Is the kernel alive. + + This calls KernelManager.is_alive() which calls Popen.poll on the + actual kernel subprocess. + + Parameters + ========== + kernel_id : uuid + The id of the kernel. + """ + + def _check_kernel_id(self, kernel_id): + """check that a kernel id is valid""" + if kernel_id not in self: + raise KeyError("Kernel with id not found: %s" % kernel_id) def get_kernel(self, kernel_id): """Get the single KernelManager object for a kernel by its uuid. @@ -162,12 +207,18 @@ class MultiKernelManager(LoggingConfigurable): kernel_id : uuid The id of the kernel. 
""" - km = self._kernels.get(kernel_id) - if km is not None: - return km - else: - raise KeyError("Kernel with id not found: %s" % kernel_id) + self._check_kernel_id(kernel_id) + return self._kernels[kernel_id] + + @kernel_method + def add_restart_callback(self, kernel_id, callback, event='restart'): + """add a callback for the KernelRestarter""" + + @kernel_method + def remove_restart_callback(self, kernel_id, callback, event='restart'): + """remove a callback for the KernelRestarter""" + @kernel_method def get_connection_info(self, kernel_id): """Return a dictionary of connection data for a kernel. @@ -184,76 +235,67 @@ class MultiKernelManager(LoggingConfigurable): numbers of the different channels (stdin_port, iopub_port, shell_port, hb_port). """ - km = self.get_kernel(kernel_id) - return dict(transport=km.transport, - ip=km.ip, - shell_port=km.shell_port, - iopub_port=km.iopub_port, - stdin_port=km.stdin_port, - hb_port=km.hb_port, - ) - - def _make_url(self, transport, ip, port): - """Make a ZeroMQ URL for a given transport, ip and port.""" - if transport == 'tcp': - return "tcp://%s:%i" % (ip, port) - else: - return "%s://%s-%s" % (transport, ip, port) - - def _create_connected_stream(self, kernel_id, socket_type, channel): - """Create a connected ZMQStream for a kernel.""" - cinfo = self.get_connection_info(kernel_id) - url = self._make_url(cinfo['transport'], cinfo['ip'], - cinfo['%s_port' % channel] - ) - sock = self.context.socket(socket_type) - self.log.info("Connecting to: %s" % url) - sock.connect(url) - return ZMQStream(sock) - def create_iopub_stream(self, kernel_id): - """Return a ZMQStream object connected to the iopub channel. + @kernel_method + def connect_iopub(self, kernel_id, identity=None): + """Return a zmq Socket connected to the iopub channel. Parameters ========== kernel_id : uuid - The id of the kernel. 
+ The id of the kernel + identity : bytes (optional) + The zmq identity of the socket Returns ======= - stream : ZMQStream + stream : zmq Socket or ZMQStream """ - iopub_stream = self._create_connected_stream(kernel_id, zmq.SUB, 'iopub') - iopub_stream.socket.setsockopt(zmq.SUBSCRIBE, b'') - return iopub_stream - def create_shell_stream(self, kernel_id): - """Return a ZMQStream object connected to the shell channel. + @kernel_method + def connect_shell(self, kernel_id, identity=None): + """Return a zmq Socket connected to the shell channel. Parameters ========== kernel_id : uuid - The id of the kernel. + The id of the kernel + identity : bytes (optional) + The zmq identity of the socket Returns ======= - stream : ZMQStream + stream : zmq Socket or ZMQStream """ - shell_stream = self._create_connected_stream(kernel_id, zmq.DEALER, 'shell') - return shell_stream - def create_hb_stream(self, kernel_id): - """Return a ZMQStream object connected to the hb channel. + @kernel_method + def connect_stdin(self, kernel_id, identity=None): + """Return a zmq Socket connected to the stdin channel. Parameters ========== kernel_id : uuid - The id of the kernel. + The id of the kernel + identity : bytes (optional) + The zmq identity of the socket Returns ======= - stream : ZMQStream + stream : zmq Socket or ZMQStream """ - hb_stream = self._create_connected_stream(kernel_id, zmq.REQ, 'hb') - return hb_stream + @kernel_method + def connect_hb(self, kernel_id, identity=None): + """Return a zmq Socket connected to the hb channel. + + Parameters + ========== + kernel_id : uuid + The id of the kernel + identity : bytes (optional) + The zmq identity of the socket + + Returns + ======= + stream : zmq Socket or ZMQStream + """ diff --git a/IPython/kernel/restarter.py b/IPython/kernel/restarter.py new file mode 100644 index 0000000..6dec73e --- /dev/null +++ b/IPython/kernel/restarter.py @@ -0,0 +1,114 @@ +"""A basic kernel monitor with autorestarting. 
class KernelRestarter(LoggingConfigurable):
    """Monitor and autorestart a kernel.

    This is an incomplete base class: ``start`` and ``stop`` raise
    NotImplementedError and must be provided by a subclass, which is also
    responsible for calling ``poll``.
    """

    kernel_manager = Instance('IPython.kernel.KernelManager')

    time_to_dead = Float(3.0, config=True,
        help="""Kernel heartbeat interval in seconds."""
    )

    restart_limit = Integer(5, config=True,
        help="""The number of consecutive autorestarts before the kernel is presumed dead."""
    )
    # True from the moment a restart is issued until a poll sees the kernel alive
    _restarting = Bool(False)
    # number of consecutive restart attempts in the current failure streak
    _restart_count = Integer(0)

    # maps event name ('restart' or 'dead') to its list of callbacks
    callbacks = Dict()
    def _callbacks_default(self):
        return dict(restart=[], dead=[])

    def start(self):
        """Start the polling of the kernel."""
        raise NotImplementedError("Must be implemented in a subclass")

    def stop(self):
        """Stop the kernel polling."""
        raise NotImplementedError("Must be implemented in a subclass")

    def add_callback(self, f, event='restart'):
        """register a callback to fire on a particular event

        Possible values for event:

          'restart' (default): kernel has died, and will be restarted.
          'dead': restart has failed, kernel will be left dead.

        """
        self.callbacks[event].append(f)

    def remove_callback(self, f, event='restart'):
        """unregister a callback to fire on a particular event

        Possible values for event:

          'restart' (default): kernel has died, and will be restarted.
          'dead': restart has failed, kernel will be left dead.

        Removing a callback that is not registered is a no-op.
        """
        try:
            self.callbacks[event].remove(f)
        except ValueError:
            pass

    def _fire_callbacks(self, event):
        """fire our callbacks for a particular event

        A callback that raises is logged and does not prevent the
        remaining callbacks from running.
        """
        # Iterate over a snapshot: a callback may add or remove callbacks
        # (including itself), and mutating the list being iterated would
        # silently skip entries.
        for callback in list(self.callbacks[event]):
            try:
                callback()
            except Exception:
                self.log.error("KernelRestarter: %s callback %r failed", event, callback, exc_info=True)

    def poll(self):
        """Check the kernel once; restart it if it has died.

        While the kernel stays dead across consecutive polls the restart
        counter grows; once it reaches ``restart_limit`` the 'dead'
        callbacks fire, the counters are reset and polling is stopped.
        Otherwise the 'restart' callbacks fire and the kernel manager is
        asked to restart immediately.
        """
        self.log.debug('Polling kernel...')
        if not self.kernel_manager.is_alive():
            if self._restarting:
                self._restart_count += 1
            else:
                self._restart_count = 1

            if self._restart_count >= self.restart_limit:
                self.log.warning("KernelRestarter: restart failed")
                self._fire_callbacks('dead')
                self._restarting = False
                self._restart_count = 0
                self.stop()
            else:
                self.log.info('KernelRestarter: restarting kernel (%i/%i)',
                    self._restart_count,
                    self.restart_limit
                )
                self._fire_callbacks('restart')
                self.kernel_manager.restart_kernel(now=True)
                self._restarting = True
        else:
            if self._restarting:
                self.log.debug("KernelRestarter: restart apparently succeeded")
            self._restarting = False
KernelManager(config=c) + return km def _get_ipc_km(self): c = Config() @@ -23,8 +25,9 @@ class TestKernelManager(TestCase): def _run_lifecycle(self, km): km.start_kernel(stdout=PIPE, stderr=PIPE) - km.start_channels(shell=True, iopub=False, stdin=False, hb=False) + self.assertTrue(km.is_alive()) km.restart_kernel() + self.assertTrue(km.is_alive()) # We need a delay here to give the restarting kernel a chance to # restart. Otherwise, the interrupt will kill it, causing the test # suite to hang. The reason it *hangs* is that the shutdown @@ -35,7 +38,6 @@ class TestKernelManager(TestCase): km.interrupt_kernel() self.assertTrue(isinstance(km, KernelManager)) km.shutdown_kernel() - km.shell_channel.stop() def test_tcp_lifecycle(self): km = self._get_tcp_km() diff --git a/IPython/kernel/tests/test_message_spec.py b/IPython/kernel/tests/test_message_spec.py index 71fb1a7..72389ac 100644 --- a/IPython/kernel/tests/test_message_spec.py +++ b/IPython/kernel/tests/test_message_spec.py @@ -15,7 +15,7 @@ from Queue import Empty import nose.tools as nt -from ..blockingkernelmanager import BlockingKernelManager +from IPython.kernel import KernelManager, BlockingKernelClient from IPython.testing import decorators as dec @@ -29,28 +29,29 @@ from IPython.utils.traitlets import ( #----------------------------------------------------------------------------- def setup(): - global KM - KM = BlockingKernelManager() - + global KM, KC + KM = KernelManager() + KM.client_factory = BlockingKernelClient KM.start_kernel(stdout=PIPE, stderr=PIPE) - KM.start_channels() + KC = KM.client() + KC.start_channels() # wait for kernel to be ready - KM.shell_channel.execute("pass") - KM.shell_channel.get_msg(block=True, timeout=5) + KC.execute("pass") + KC.get_shell_msg(block=True, timeout=5) flush_channels() def teardown(): - KM.stop_channels() + KC.stop_channels() KM.shutdown_kernel() -def flush_channels(km=None): - if km is None: - km = KM +def flush_channels(kc=None): """flush any messages waiting 
on the queue""" - for channel in (km.shell_channel, km.iopub_channel): + if kc is None: + kc = KC + for channel in (kc.shell_channel, kc.iopub_channel): while True: try: msg = channel.get_msg(block=True, timeout=0.1) @@ -60,22 +61,17 @@ def flush_channels(km=None): list(validate_message(msg)) -def execute(code='', km=None, **kwargs): +def execute(code='', kc=None, **kwargs): """wrapper for doing common steps for validating an execution request""" - if km is None: - km = KM - shell = km.shell_channel - sub = km.iopub_channel - - msg_id = shell.execute(code=code, **kwargs) - reply = shell.get_msg(timeout=2) + msg_id = KC.execute(code=code, **kwargs) + reply = KC.get_shell_msg(timeout=2) list(validate_message(reply, 'execute_reply', msg_id)) - busy = sub.get_msg(timeout=2) + busy = KC.get_iopub_msg(timeout=2) list(validate_message(busy, 'status', msg_id)) nt.assert_equal(busy['content']['execution_state'], 'busy') if not kwargs.get('silent'): - pyin = sub.get_msg(timeout=2) + pyin = KC.get_iopub_msg(timeout=2) list(validate_message(pyin, 'pyin', msg_id)) nt.assert_equal(pyin['content']['code'], code) @@ -192,7 +188,7 @@ class ArgSpec(Reference): class Status(Reference): - execution_state = Enum((u'busy', u'idle')) + execution_state = Enum((u'busy', u'idle', u'starting')) class CompleteReply(Reference): @@ -301,9 +297,8 @@ def validate_message(msg, msg_type=None, parent=None): def test_execute(): flush_channels() - shell = KM.shell_channel - msg_id = shell.execute(code='x=1') - reply = shell.get_msg(timeout=2) + msg_id = KC.execute(code='x=1') + reply = KC.get_shell_msg(timeout=2) for tst in validate_message(reply, 'execute_reply', msg_id): yield tst @@ -314,23 +309,23 @@ def test_execute_silent(): msg_id, reply = execute(code='x=1', silent=True) # flush status=idle - status = KM.iopub_channel.get_msg(timeout=2) + status = KC.iopub_channel.get_msg(timeout=2) for tst in validate_message(status, 'status', msg_id): yield tst 
nt.assert_equal(status['content']['execution_state'], 'idle') - yield nt.assert_raises(Empty, KM.iopub_channel.get_msg, timeout=0.1) + yield nt.assert_raises(Empty, KC.iopub_channel.get_msg, timeout=0.1) count = reply['execution_count'] msg_id, reply = execute(code='x=2', silent=True) # flush status=idle - status = KM.iopub_channel.get_msg(timeout=2) + status = KC.iopub_channel.get_msg(timeout=2) for tst in validate_message(status, 'status', msg_id): yield tst yield nt.assert_equal(status['content']['execution_state'], 'idle') - yield nt.assert_raises(Empty, KM.iopub_channel.get_msg, timeout=0.1) + yield nt.assert_raises(Empty, KC.iopub_channel.get_msg, timeout=0.1) count_2 = reply['execution_count'] yield nt.assert_equal(count_2, count) @@ -343,7 +338,7 @@ def test_execute_error(): yield nt.assert_equal(reply['status'], 'error') yield nt.assert_equal(reply['ename'], 'ZeroDivisionError') - pyerr = KM.iopub_channel.get_msg(timeout=2) + pyerr = KC.iopub_channel.get_msg(timeout=2) for tst in validate_message(pyerr, 'pyerr', msg_id): yield tst @@ -382,10 +377,8 @@ def test_user_expressions(): def test_oinfo(): flush_channels() - shell = KM.shell_channel - - msg_id = shell.object_info('a') - reply = shell.get_msg(timeout=2) + msg_id = KC.object_info('a') + reply = KC.get_shell_msg(timeout=2) for tst in validate_message(reply, 'object_info_reply', msg_id): yield tst @@ -394,12 +387,10 @@ def test_oinfo(): def test_oinfo_found(): flush_channels() - shell = KM.shell_channel - msg_id, reply = execute(code='a=5') - msg_id = shell.object_info('a') - reply = shell.get_msg(timeout=2) + msg_id = KC.object_info('a') + reply = KC.get_shell_msg(timeout=2) for tst in validate_message(reply, 'object_info_reply', msg_id): yield tst content = reply['content'] @@ -412,12 +403,10 @@ def test_oinfo_found(): def test_oinfo_detail(): flush_channels() - shell = KM.shell_channel - msg_id, reply = execute(code='ip=get_ipython()') - msg_id = shell.object_info('ip.object_inspect', 
detail_level=2) - reply = shell.get_msg(timeout=2) + msg_id = KC.object_info('ip.object_inspect', detail_level=2) + reply = KC.get_shell_msg(timeout=2) for tst in validate_message(reply, 'object_info_reply', msg_id): yield tst content = reply['content'] @@ -431,10 +420,8 @@ def test_oinfo_detail(): def test_oinfo_not_found(): flush_channels() - shell = KM.shell_channel - - msg_id = shell.object_info('dne') - reply = shell.get_msg(timeout=2) + msg_id = KC.object_info('dne') + reply = KC.get_shell_msg(timeout=2) for tst in validate_message(reply, 'object_info_reply', msg_id): yield tst content = reply['content'] @@ -445,12 +432,10 @@ def test_oinfo_not_found(): def test_complete(): flush_channels() - shell = KM.shell_channel - msg_id, reply = execute(code="alpha = albert = 5") - msg_id = shell.complete('al', 'al', 2) - reply = shell.get_msg(timeout=2) + msg_id = KC.complete('al', 'al', 2) + reply = KC.get_shell_msg(timeout=2) for tst in validate_message(reply, 'complete_reply', msg_id): yield tst matches = reply['content']['matches'] @@ -462,10 +447,8 @@ def test_complete(): def test_kernel_info_request(): flush_channels() - shell = KM.shell_channel - - msg_id = shell.kernel_info() - reply = shell.get_msg(timeout=2) + msg_id = KC.kernel_info() + reply = KC.get_shell_msg(timeout=2) for tst in validate_message(reply, 'kernel_info_reply', msg_id): yield tst @@ -479,7 +462,7 @@ def test_stream(): msg_id, reply = execute("print('hi')") - stdout = KM.iopub_channel.get_msg(timeout=2) + stdout = KC.iopub_channel.get_msg(timeout=2) for tst in validate_message(stdout, 'stream', msg_id): yield tst content = stdout['content'] @@ -493,7 +476,7 @@ def test_display_data(): msg_id, reply = execute("from IPython.core.display import display; display(1)") - display = KM.iopub_channel.get_msg(timeout=2) + display = KC.iopub_channel.get_msg(timeout=2) for tst in validate_message(display, 'display_data', parent=msg_id): yield tst data = display['content']['data'] diff --git 
a/IPython/kernel/tests/test_multikernelmanager.py b/IPython/kernel/tests/test_multikernelmanager.py index 49a5ac2..0c6ffe5 100644 --- a/IPython/kernel/tests/test_multikernelmanager.py +++ b/IPython/kernel/tests/test_multikernelmanager.py @@ -8,13 +8,15 @@ from IPython.testing import decorators as dec from IPython.config.loader import Config from IPython.utils.localinterfaces import LOCALHOST -from IPython.kernel.kernelmanager import KernelManager +from IPython.kernel import KernelManager from IPython.kernel.multikernelmanager import MultiKernelManager class TestKernelManager(TestCase): def _get_tcp_km(self): - return MultiKernelManager() + c = Config() + km = MultiKernelManager(config=c) + return km def _get_ipc_km(self): c = Config() @@ -25,10 +27,12 @@ class TestKernelManager(TestCase): def _run_lifecycle(self, km): kid = km.start_kernel(stdout=PIPE, stderr=PIPE) + self.assertTrue(km.is_alive(kid)) self.assertTrue(kid in km) self.assertTrue(kid in km.list_kernel_ids()) self.assertEqual(len(km),1) km.restart_kernel(kid) + self.assertTrue(km.is_alive(kid)) self.assertTrue(kid in km.list_kernel_ids()) # We need a delay here to give the restarting kernel a chance to # restart. 
Otherwise, the interrupt will kill it, causing the test @@ -51,13 +55,13 @@ class TestKernelManager(TestCase): self.assertEqual(ip, cinfo['ip']) self.assertTrue('stdin_port' in cinfo) self.assertTrue('iopub_port' in cinfo) - stream = km.create_iopub_stream(kid) + stream = km.connect_iopub(kid) stream.close() self.assertTrue('shell_port' in cinfo) - stream = km.create_shell_stream(kid) + stream = km.connect_shell(kid) stream.close() self.assertTrue('hb_port' in cinfo) - stream = km.create_hb_stream(kid) + stream = km.connect_hb(kid) stream.close() km.shutdown_kernel(kid) diff --git a/IPython/kernel/tests/test_public_api.py b/IPython/kernel/tests/test_public_api.py index aca6cca..39cd7e7 100644 --- a/IPython/kernel/tests/test_public_api.py +++ b/IPython/kernel/tests/test_public_api.py @@ -25,11 +25,17 @@ from IPython import kernel @dec.parametric def test_kms(): - for base in ("", "Blocking", "Multi"): + for base in ("", "Multi"): KM = base + "KernelManager" yield nt.assert_true(KM in dir(kernel), KM) @dec.parametric +def test_kcs(): + for base in ("", "Blocking"): + KM = base + "KernelClient" + yield nt.assert_true(KM in dir(kernel), KM) + +@dec.parametric def test_launcher(): for name in launcher.__all__: yield nt.assert_true(name in dir(kernel), name) diff --git a/IPython/kernel/zmq/ipkernel.py b/IPython/kernel/zmq/ipkernel.py index d69ef9c..ffa5349 100755 --- a/IPython/kernel/zmq/ipkernel.py +++ b/IPython/kernel/zmq/ipkernel.py @@ -275,6 +275,9 @@ class Kernel(Configurable): for s in self.shell_streams: s.on_recv(make_dispatcher(s), copy=False) + + # publish idle status + self._publish_status('starting') def do_one_iteration(self): """step eventloop just once""" diff --git a/IPython/kernel/zmq/kernelapp.py b/IPython/kernel/zmq/kernelapp.py index fafc7b1..5e64edf 100644 --- a/IPython/kernel/zmq/kernelapp.py +++ b/IPython/kernel/zmq/kernelapp.py @@ -69,6 +69,7 @@ kernel_aliases.update({ 'shell' : 'IPKernelApp.shell_port', 'iopub' : 'IPKernelApp.iopub_port', 'stdin' 
: 'IPKernelApp.stdin_port', + 'control' : 'IPKernelApp.control_port', 'f' : 'IPKernelApp.connection_file', 'parent': 'IPKernelApp.parent', 'transport': 'IPKernelApp.transport', @@ -145,7 +146,8 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp): hb_port = Integer(0, config=True, help="set the heartbeat port [default: random]") shell_port = Integer(0, config=True, help="set the shell (ROUTER) port [default: random]") iopub_port = Integer(0, config=True, help="set the iopub (PUB) port [default: random]") - stdin_port = Integer(0, config=True, help="set the stdin (DEALER) port [default: random]") + stdin_port = Integer(0, config=True, help="set the stdin (ROUTER) port [default: random]") + control_port = Integer(0, config=True, help="set the control (ROUTER) port [default: random]") connection_file = Unicode('', config=True, help="""JSON file in which to store connection info [default: kernel-.json] @@ -227,7 +229,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp): if self.ip == self._ip_default() and 'ip' in cfg: # not overridden by config or cl_args self.ip = cfg['ip'] - for channel in ('hb', 'shell', 'iopub', 'stdin'): + for channel in ('hb', 'shell', 'iopub', 'stdin', 'control'): name = channel + '_port' if getattr(self, name) == 0 and name in cfg: # not overridden by config or cl_args @@ -241,7 +243,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp): self.log.debug("Writing connection file: %s", cf) write_connection_file(cf, ip=self.ip, key=self.session.key, transport=self.transport, shell_port=self.shell_port, stdin_port=self.stdin_port, hb_port=self.hb_port, - iopub_port=self.iopub_port) + iopub_port=self.iopub_port, control_port=self.control_port) def cleanup_connection_file(self): cf = self.abs_connection_file @@ -257,7 +259,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp): """cleanup ipc files if we wrote them""" if self.transport != 'ipc': return - for port in (self.shell_port, 
self.iopub_port, self.stdin_port, self.hb_port): + for port in (self.shell_port, self.iopub_port, self.stdin_port, self.hb_port, self.control_port): ipcfile = "%s-%i" % (self.ip, port) try: os.remove(ipcfile) @@ -282,15 +284,19 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp): self.shell_socket = context.socket(zmq.ROUTER) self.shell_port = self._bind_socket(self.shell_socket, self.shell_port) - self.log.debug("shell ROUTER Channel on port: %i"%self.shell_port) + self.log.debug("shell ROUTER Channel on port: %i" % self.shell_port) self.iopub_socket = context.socket(zmq.PUB) self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port) - self.log.debug("iopub PUB Channel on port: %i"%self.iopub_port) + self.log.debug("iopub PUB Channel on port: %i" % self.iopub_port) self.stdin_socket = context.socket(zmq.ROUTER) self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port) - self.log.debug("stdin ROUTER Channel on port: %i"%self.stdin_port) + self.log.debug("stdin ROUTER Channel on port: %i" % self.stdin_port) + + self.control_socket = context.socket(zmq.ROUTER) + self.control_port = self._bind_socket(self.control_socket, self.control_port) + self.log.debug("control ROUTER Channel on port: %i" % self.control_port) def init_heartbeat(self): """start the heart beating""" @@ -299,7 +305,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp): hb_ctx = zmq.Context() self.heartbeat = Heartbeat(hb_ctx, (self.transport, self.ip, self.hb_port)) self.hb_port = self.heartbeat.port - self.log.debug("Heartbeat REP Channel on port: %i"%self.hb_port) + self.log.debug("Heartbeat REP Channel on port: %i" % self.hb_port) self.heartbeat.start() # Helper to make it easier to connect to an existing kernel. 
@@ -321,7 +327,8 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp): self.ports = dict(shell=self.shell_port, iopub=self.iopub_port, - stdin=self.stdin_port, hb=self.hb_port) + stdin=self.stdin_port, hb=self.hb_port, + control=self.control_port) def init_session(self): """create our session object""" @@ -353,11 +360,12 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp): def init_kernel(self): """Create the Kernel object itself""" shell_stream = ZMQStream(self.shell_socket) + control_stream = ZMQStream(self.control_socket) kernel_factory = import_item(str(self.kernel_class)) kernel = kernel_factory(config=self.config, session=self.session, - shell_streams=[shell_stream], + shell_streams=[shell_stream, control_stream], iopub_socket=self.iopub_socket, stdin_socket=self.stdin_socket, log=self.log, diff --git a/IPython/kernel/zmq/tests/test_embed_kernel.py b/IPython/kernel/zmq/tests/test_embed_kernel.py index e1cddab..da1dc61 100644 --- a/IPython/kernel/zmq/tests/test_embed_kernel.py +++ b/IPython/kernel/zmq/tests/test_embed_kernel.py @@ -22,7 +22,7 @@ from subprocess import Popen, PIPE import nose.tools as nt -from IPython.kernel.blockingkernelmanager import BlockingKernelManager +from IPython.kernel import BlockingKernelClient from IPython.utils import path, py3compat #------------------------------------------------------------------------------- @@ -83,14 +83,14 @@ def setup_kernel(cmd): kernel.terminate() raise IOError("Connection file %r never arrived" % connection_file) - km = BlockingKernelManager(connection_file=connection_file) - km.load_connection_file() - km.start_channels() + client = BlockingKernelClient(connection_file=connection_file) + client.load_connection_file() + client.start_channels() try: - yield km + yield client finally: - km.stop_channels() + client.stop_channels() kernel.terminate() def test_embed_kernel_basic(): @@ -105,23 +105,21 @@ def test_embed_kernel_basic(): '', ]) - with setup_kernel(cmd) as km: - shell = 
km.shell_channel - + with setup_kernel(cmd) as client: # oinfo a (int) - msg_id = shell.object_info('a') - msg = shell.get_msg(block=True, timeout=2) + msg_id = client.object_info('a') + msg = client.get_shell_msg(block=True, timeout=2) content = msg['content'] nt.assert_true(content['found']) - msg_id = shell.execute("c=a*2") - msg = shell.get_msg(block=True, timeout=2) + msg_id = client.execute("c=a*2") + msg = client.get_shell_msg(block=True, timeout=2) content = msg['content'] nt.assert_equal(content['status'], u'ok') # oinfo c (should be 10) - msg_id = shell.object_info('c') - msg = shell.get_msg(block=True, timeout=2) + msg_id = client.object_info('c') + msg = client.get_shell_msg(block=True, timeout=2) content = msg['content'] nt.assert_true(content['found']) nt.assert_equal(content['string_form'], u'10') @@ -138,26 +136,24 @@ def test_embed_kernel_namespace(): '', ]) - with setup_kernel(cmd) as km: - shell = km.shell_channel - + with setup_kernel(cmd) as client: # oinfo a (int) - msg_id = shell.object_info('a') - msg = shell.get_msg(block=True, timeout=2) + msg_id = client.object_info('a') + msg = client.get_shell_msg(block=True, timeout=2) content = msg['content'] nt.assert_true(content['found']) nt.assert_equal(content['string_form'], u'5') # oinfo b (str) - msg_id = shell.object_info('b') - msg = shell.get_msg(block=True, timeout=2) + msg_id = client.object_info('b') + msg = client.get_shell_msg(block=True, timeout=2) content = msg['content'] nt.assert_true(content['found']) nt.assert_equal(content['string_form'], u'hi there') # oinfo c (undefined) - msg_id = shell.object_info('c') - msg = shell.get_msg(block=True, timeout=2) + msg_id = client.object_info('c') + msg = client.get_shell_msg(block=True, timeout=2) content = msg['content'] nt.assert_false(content['found']) @@ -176,18 +172,17 @@ def test_embed_kernel_reentrant(): '', ]) - with setup_kernel(cmd) as km: - shell = km.shell_channel + with setup_kernel(cmd) as client: for i in range(5): - msg_id = 
shell.object_info('count') - msg = shell.get_msg(block=True, timeout=2) + msg_id = client.object_info('count') + msg = client.get_shell_msg(block=True, timeout=2) content = msg['content'] nt.assert_true(content['found']) nt.assert_equal(content['string_form'], unicode(i)) # exit from embed_kernel - shell.execute("get_ipython().exit_now = True") - msg = shell.get_msg(block=True, timeout=2) + client.execute("get_ipython().exit_now = True") + msg = client.get_shell_msg(block=True, timeout=2) time.sleep(0.2) diff --git a/docs/source/development/messaging.txt b/docs/source/development/messaging.txt index 3e18964..5b150a5 100644 --- a/docs/source/development/messaging.txt +++ b/docs/source/development/messaging.txt @@ -195,7 +195,7 @@ The ``user_`` fields deserve a detailed explanation. In the past, IPython had the notion of a prompt string that allowed arbitrary code to be evaluated, and this was put to good use by many in creating prompts that displayed system status, path information, and even more esoteric uses like remote instrument -status aqcuired over the network. But now that IPython has a clean separation +status acquired over the network. But now that IPython has a clean separation between the kernel and the clients, the kernel has no prompt knowledge; prompts are a frontend-side feature, and it should be even possible for different frontends to display different prompts while interacting with the same kernel. @@ -934,7 +934,8 @@ Message type: ``status``:: content = { # When the kernel starts to execute code, it will enter the 'busy' # state and when it finishes, it will enter the 'idle' state. - execution_state : ('busy', 'idle') + # The kernel will publish state 'starting' exactly once at process startup. 
+ execution_state : ('busy', 'idle', 'starting') } Kernel crashes diff --git a/examples/inprocess/embedded_qtconsole.py b/examples/inprocess/embedded_qtconsole.py new file mode 100644 index 0000000..f453b2d --- /dev/null +++ b/examples/inprocess/embedded_qtconsole.py @@ -0,0 +1,45 @@ +import os + +from IPython.frontend.qt.console.rich_ipython_widget import RichIPythonWidget +from IPython.frontend.qt.inprocess import QtInProcessKernelManager +from IPython.lib import guisupport + + +def print_process_id(): + print 'Process ID is:', os.getpid() + + +def main(): + # Print the ID of the main process + print_process_id() + + app = guisupport.get_app_qt4() + + # Create an in-process kernel + # >>> print_process_id() + # will print the same process ID as the main process + kernel_manager = QtInProcessKernelManager() + kernel_manager.start_kernel() + kernel = kernel_manager.kernel + kernel.gui = 'qt4' + kernel.shell.push({'foo': 43, 'print_process_id': print_process_id}) + + kernel_client = kernel_manager.client() + kernel_client.start_channels() + + def stop(): + kernel_client.stop_channels() + kernel_manager.shutdown_kernel() + app.exit() + + control = RichIPythonWidget() + control.kernel_manager = kernel_manager + control.kernel_client = kernel_client + control.exit_requested.connect(stop) + control.show() + + guisupport.start_event_loop_qt4(app) + + +if __name__ == '__main__': + main() diff --git a/examples/inprocess/embedded_terminal.py b/examples/inprocess/embedded_terminal.py new file mode 100644 index 0000000..469f2e1 --- /dev/null +++ b/examples/inprocess/embedded_terminal.py @@ -0,0 +1,30 @@ +import os + +from IPython.kernel.inprocess import InProcessKernelManager +from IPython.frontend.terminal.console.interactiveshell import ZMQTerminalInteractiveShell + + +def print_process_id(): + print 'Process ID is:', os.getpid() + + +def main(): + print_process_id() + + # Create an in-process kernel + # >>> print_process_id() + # will print the same process ID as the main 
process + kernel_manager = InProcessKernelManager() + kernel_manager.start_kernel() + kernel = kernel_manager.kernel + kernel.gui = 'qt4' + kernel.shell.push({'foo': 43, 'print_process_id': print_process_id}) + client = kernel_manager.client() + client.start_channels() + + shell = ZMQTerminalInteractiveShell(manager=kernel_manager, client=client) + shell.mainloop() + + +if __name__ == '__main__': + main()