##// END OF EJS Templates
add `KernelManager.client()`
add `KernelManager.client()`

File last commit:

r10294:a1e189cc
r10300:548f9a02
Show More
client.py
216 lines | 7.8 KiB | text/x-python | PythonLexer
"""Base class to manage the interaction with a running kernel
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2013 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import absolute_import
import zmq
# Local imports
from IPython.config.configurable import LoggingConfigurable
from IPython.utils.traitlets import (
Any, Instance, Type,
)
from .zmq.session import Session
from .channels import (
ShellChannel, IOPubChannel,
HBChannel, StdInChannel,
)
from .clientabc import KernelClientABC
from .connect import ConnectionFileMixin
#-----------------------------------------------------------------------------
# Main kernel client class
#-----------------------------------------------------------------------------
class KernelClient(LoggingConfigurable, ConnectionFileMixin):
"""Communicates with a single kernel on any host via zmq channels.
There are four channels associated with each kernel:
* shell: for request/reply calls to the kernel.
* iopub: for the kernel to publish results to frontends.
* hb: for monitoring the kernel's heartbeat.
* stdin: for frontends to reply to raw_input calls in the kernel.
"""
# The PyZMQ Context to use for communication with the kernel.
context = Instance(zmq.Context)
def _context_default(self):
return zmq.Context.instance()
# The Session to use for communication with the kernel.
session = Instance(Session)
def _session_default(self):
return Session(config=self.config)
# The classes to use for the various channels
shell_channel_class = Type(ShellChannel)
iopub_channel_class = Type(IOPubChannel)
stdin_channel_class = Type(StdInChannel)
hb_channel_class = Type(HBChannel)
# Protected traits
_shell_channel = Any
_iopub_channel = Any
_stdin_channel = Any
_hb_channel = Any
# def __init__(self, *args, **kwargs):
# super(KernelClient, self).__init__(*args, **kwargs)
# # setup channel proxy methods, e.g.
# # Client.execute => shell_channel.execute
# for channel in ['shell', 'iopub', 'stdin', 'hb']:
# cls = getattr(self, '%s_channel_class' % channel)
# for method in cls.proxy_methods:
# setattr(self, method, self._proxy_method(channel, method))
#
#--------------------------------------------------------------------------
# Channel proxy methods
#--------------------------------------------------------------------------
def _get_msg(channel, *args, **kwargs):
return channel.get_msg(*args, **kwargs)
def get_shell_msg(self, *args, **kwargs):
"""Get a message from the shell channel"""
return self.shell_channel.get_msg(*args, **kwargs)
def get_iopub_msg(self, *args, **kwargs):
"""Get a message from the iopub channel"""
return self.iopub_channel.get_msg(*args, **kwargs)
def get_stdin_msg(self, *args, **kwargs):
"""Get a message from the stdin channel"""
return self.stdin_channel.get_msg(*args, **kwargs)
#--------------------------------------------------------------------------
# Channel management methods
#--------------------------------------------------------------------------
def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
"""Starts the channels for this kernel.
This will create the channels if they do not exist and then start
them (their activity runs in a thread). If port numbers of 0 are
being used (random ports) then you must first call
:method:`start_kernel`. If the channels have been stopped and you
call this, :class:`RuntimeError` will be raised.
"""
if shell:
self.shell_channel.start()
for method in self.shell_channel.proxy_methods:
setattr(self, method, getattr(self.shell_channel, method))
if iopub:
self.iopub_channel.start()
for method in self.iopub_channel.proxy_methods:
setattr(self, method, getattr(self.iopub_channel, method))
if stdin:
self.stdin_channel.start()
for method in self.stdin_channel.proxy_methods:
setattr(self, method, getattr(self.stdin_channel, method))
self.shell_channel.allow_stdin = True
else:
self.shell_channel.allow_stdin = False
if hb:
self.hb_channel.start()
def stop_channels(self):
"""Stops all the running channels for this kernel.
This stops their event loops and joins their threads.
"""
if self.shell_channel.is_alive():
self.shell_channel.stop()
if self.iopub_channel.is_alive():
self.iopub_channel.stop()
if self.stdin_channel.is_alive():
self.stdin_channel.stop()
if self.hb_channel.is_alive():
self.hb_channel.stop()
@property
def channels_running(self):
"""Are any of the channels created and running?"""
return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
self.stdin_channel.is_alive() or self.hb_channel.is_alive())
def _make_url(self, port):
"""Make a zmq url with a port.
There are two cases that this handles:
* tcp: tcp://ip:port
* ipc: ipc://ip-port
"""
if self.transport == 'tcp':
return "tcp://%s:%i" % (self.ip, port)
else:
return "%s://%s-%s" % (self.transport, self.ip, port)
@property
def shell_channel(self):
"""Get the shell channel object for this kernel."""
if self._shell_channel is None:
self._shell_channel = self.shell_channel_class(
self.context, self.session, self._make_url(self.shell_port)
)
return self._shell_channel
@property
def iopub_channel(self):
"""Get the iopub channel object for this kernel."""
if self._iopub_channel is None:
self._iopub_channel = self.iopub_channel_class(
self.context, self.session, self._make_url(self.iopub_port)
)
return self._iopub_channel
@property
def stdin_channel(self):
"""Get the stdin channel object for this kernel."""
if self._stdin_channel is None:
self._stdin_channel = self.stdin_channel_class(
self.context, self.session, self._make_url(self.stdin_port)
)
return self._stdin_channel
@property
def hb_channel(self):
"""Get the hb channel object for this kernel."""
if self._hb_channel is None:
self._hb_channel = self.hb_channel_class(
self.context, self.session, self._make_url(self.hb_port)
)
return self._hb_channel
def is_alive(self):
"""Is the kernel process still running?"""
if self._hb_channel is not None:
# We didn't start the kernel with this KernelManager so we
# use the heartbeat.
return self._hb_channel.is_beating()
else:
# no heartbeat and not local, we can't tell if it's running,
# so naively return True
return True
#-----------------------------------------------------------------------------
# ABC Registration
#-----------------------------------------------------------------------------
KernelClientABC.register(KernelClient)