##// END OF EJS Templates
Move methods to send specific messages from channels to KernelClient
Move methods to send specific messages from channels to KernelClient

File last commit:

r19215:03729e7b
r19215:03729e7b
Show More
channels.py
192 lines | 5.9 KiB | text/x-python | PythonLexer
MinRK
split KernelManager into KernelManager + KernelClient
r10285 """Blocking channels
Fernando Perez
Rework messaging to better conform to our spec....
r2926
Useful for test suites and blocking terminal interfaces.
"""
Thomas Kluyver
Implement blocking channels without Python threads
r19208 # Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
Thomas Kluyver
Update imports for Python 3...
r13354 try:
from queue import Queue, Empty # Py 3
except ImportError:
from Queue import Queue, Empty # Py 2
Brian Granger
Creating an ABC for kernel managers and channels.
r9121
Thomas Kluyver
Remove some dead code
r19212 from IPython.kernel.channelsabc import ShellChannelABC, IOPubChannelABC, \
StdInChannelABC
Thomas Kluyver
Factor out code to set up ZMQ sockets
r19211 from IPython.kernel.channels import HBChannel,\
make_iopub_socket, make_shell_socket, make_stdin_socket,\
InvalidPortNumber, major_protocol_version
Thomas Kluyver
Implement blocking channels without Python threads
r19208 from IPython.utils.py3compat import string_types, iteritems
# Some utilities to validate message structure; these might be moved elsewhere
# if they prove to have more generic utility.
def validate_string_list(lst):
    """Validate that the input is a list of strings.

    Parameters
    ----------
    lst : object
        The value to check.

    Raises
    ------
    ValueError
        If ``lst`` is not a ``list``, or if any element is not an instance
        of ``string_types``.
    """
    if not isinstance(lst, list):
        # Wrap the operand in a 1-tuple: with a bare ``% lst`` a tuple input
        # is unpacked as the format arguments, giving a TypeError (or a
        # misleading repr) instead of the documented ValueError.
        raise ValueError('input %r must be a list' % (lst,))
    for x in lst:
        if not isinstance(x, string_types):
            # Same 1-tuple wrapping, so a tuple element is reported verbatim.
            raise ValueError('element %r in list must be a string' % (x,))
def validate_string_dict(dct):
    """Validate that the input is a dict with string keys and values.

    Every key and every value yielded by ``iteritems(dct)`` must be an
    instance of ``string_types``.

    Parameters
    ----------
    dct : dict
        The mapping to check.

    Raises
    ------
    ValueError
        If any key or any value is not a string.
    """
    for k, v in iteritems(dct):
        if not isinstance(k, string_types):
            # Wrap the operand in a 1-tuple: a tuple key passed bare to ``%``
            # would be unpacked as format arguments and raise TypeError
            # instead of the documented ValueError.
            raise ValueError('key %r in dict must be a string' % (k,))
        if not isinstance(v, string_types):
            # Same 1-tuple wrapping for tuple values.
            raise ValueError('value %r in dict must be a string' % (v,))
Brian Granger
Added initial draft of blockingkernelmanager.py.
r2693
Thomas Kluyver
Implement blocking channels without Python threads
class ZMQSocketChannel(object):
    """The base class for the channels that use ZMQ sockets.

    The constructor only records the context, session and address; it does
    not create a socket.  ``socket`` stays ``None`` until something assigns
    it, and the receive/poll methods below expect it to be set.
    """

    # Class-level defaults; instances overwrite the ones they use.
    context = None
    session = None
    socket = None
    ioloop = None
    stream = None
    _address = None
    _exiting = False
    proxy_methods = []

    def __init__(self, context, session, address):
        """Create a channel.

        Parameters
        ----------
        context : :class:`zmq.Context`
            The ZMQ context to use.
        session : :class:`session.Session`
            The session to use.
        address : zmq url or tuple
            Either a url string, which is stored unchanged, or an
            ``(ip, port)`` tuple, which is formatted as ``tcp://ip:port``.

        Raises
        ------
        InvalidPortNumber
            If ``address`` is a tuple whose port is 0.
        """
        super(ZMQSocketChannel, self).__init__()
        # Not read anywhere in this class; kept for compatibility.
        self.daemon = True

        self.context = context
        self.session = session

        if isinstance(address, tuple):
            port = address[1]
            if port == 0:
                raise InvalidPortNumber(
                    'The port number for a channel cannot be 0.')
            address = "tcp://%s:%i" % address
        self._address = address

    def _recv(self, **kwargs):
        """Receive one multipart message and return it deserialized.

        Keyword arguments are forwarded to ``socket.recv_multipart``.
        """
        raw_parts = self.socket.recv_multipart(**kwargs)
        # The identities half of the pair is not used here.
        _idents, payload = self.session.feed_identities(raw_parts)
        return self.session.deserialize(payload)

    def get_msg(self, block=True, timeout=None):
        """Get a message if there is one that is ready.

        Parameters
        ----------
        block : bool
            If true, poll the socket using ``timeout``; otherwise poll with
            a timeout of 0.
        timeout : number or None
            Seconds to wait when ``block`` is true.  It is converted to
            milliseconds for ``socket.poll``; ``None`` is passed through
            unchanged (for a pyzmq socket this waits indefinitely).

        Raises
        ------
        Empty
            If the poll reports that nothing is ready.
        """
        if block:
            poll_ms = None if timeout is None else timeout * 1000
            ready = self.socket.poll(poll_ms)
        else:
            ready = self.socket.poll(timeout=0)

        if not ready:
            raise Empty
        return self._recv()

    def get_msgs(self):
        """Get all messages that are currently ready, as a list."""
        collected = []
        try:
            # Drain without blocking until get_msg reports nothing is left.
            while True:
                collected.append(self.get_msg(block=False))
        except Empty:
            pass
        return collected

    def msg_ready(self):
        """Is there a message that has been received?"""
        pending = self.socket.poll(timeout=0)
        return bool(pending)

    def close(self):
        """Close the socket, if there is one, and forget it.

        Errors raised while closing are deliberately ignored.
        """
        sock = self.socket
        if sock is None:
            return
        try:
            sock.close(linger=0)
        except Exception:
            pass
        self.socket = None

    # ``stop`` is an alias for ``close``.
    stop = close

    def is_alive(self):
        """Return True while the channel holds a socket."""
        return self.socket is not None

    @property
    def address(self):
        """Get the channel's address as a zmq url string.

        These URLS have the form: 'tcp://127.0.0.1:5555'.
        """
        return self._address

    def _queue_send(self, msg):
        """Pass a message to the ZMQ socket to send, via the session."""
        self.session.send(self.socket, msg)
class BlockingShellChannel(ZMQSocketChannel):
    """The shell channel for issuing request/replies to the kernel."""

    def start(self):
        """Create this channel's socket from its context, session and address.

        Uses ``make_shell_socket``: this is the shell channel, so it should
        not be built with the stdin socket factory.
        """
        self.socket = make_shell_socket(self.context, self.session.bsession, self.address)

    def _handle_kernel_info_reply(self, msg):
        """Handle a kernel info reply.

        Reads the major component of ``msg['content']['protocol_version']``
        and, when it differs from ``major_protocol_version``, stores it on
        the session as ``adapt_version``.
        """
        adapt_version = int(msg['content']['protocol_version'].split('.')[0])
        if adapt_version != major_protocol_version:
            self.session.adapt_version = adapt_version

    def _recv(self, **kwargs):
        """Receive a message, inspecting it for protocol adaptation first.

        Keyword arguments are forwarded to ``ZMQSocketChannel._recv``.
        """
        # Listen for kernel_info_reply message to do protocol adaptation
        msg = ZMQSocketChannel._recv(self, **kwargs)
        if msg['msg_type'] == 'kernel_info_reply':
            self._handle_kernel_info_reply(msg)
        return msg
class BlockingIOPubChannel(ZMQSocketChannel):
    """The iopub channel which listens for messages that the kernel publishes.

    This channel is where all output is published to frontends.
    """

    def start(self):
        """Create the iopub socket and store it on the channel."""
        identity = self.session.bsession
        sock = make_iopub_socket(self.context, identity, self.address)
        self.socket = sock
Thomas Kluyver
Implement blocking channels without Python threads
r19208
class BlockingStdInChannel(ZMQSocketChannel):
    """The stdin channel to handle raw_input requests that the kernel makes."""

    # Overrides the base class's empty proxy_methods list.
    proxy_methods = ['input']
    msg_queue = None

    def start(self):
        """Create the stdin socket and store it on the channel."""
        identity = self.session.bsession
        sock = make_stdin_socket(self.context, identity, self.address)
        self.socket = sock
Brian Granger
Cleanup naming and organization of channels....
r9120
Thomas Kluyver
Remove some dead code
# Register each blocking channel class with its corresponding channel ABC.
for _abc, _channel in (
    (ShellChannelABC, BlockingShellChannel),
    (IOPubChannelABC, BlockingIOPubChannel),
    (StdInChannelABC, BlockingStdInChannel),
):
    _abc.register(_channel)
# Do not leave the loop variables behind in the module namespace.
del _abc, _channel
Brian Granger
Cleanup naming and organization of channels....
r9120
class BlockingHBChannel(HBChannel):
    """Heartbeat channel with a shortened ``time_to_dead`` and a no-op handler."""

    # This kernel needs quicker monitoring, shorten to 1 sec.
    # less than 0.5s is unreliable, and will get occasional
    # false reports of missed beats.
    time_to_dead = 1.0

    def call_handlers(self, since_last_heartbeat):
        """Handle a missed heartbeat; this implementation does nothing.

        ``since_last_heartbeat`` is accepted but ignored.
        """
        return None