kernelmanager.py
593 lines
| 19.6 KiB
| text/x-python
|
PythonLexer
Brian Granger
|
"""Classes to manage the interaction with a running kernel.

Todo
====

* Create logger to handle debugging and console messages.
"""
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
# Copyright (C) 2008-2010 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
epatters
|
r2611 | # Standard library imports. | ||
Brian Granger
|
r2606 | from Queue import Queue, Empty | ||
epatters
|
r2667 | from subprocess import Popen | ||
Brian Granger
|
r2606 | from threading import Thread | ||
epatters
|
r2614 | import time | ||
Brian Granger
|
r2606 | |||
epatters
|
r2611 | # System library imports. | ||
Brian Granger
|
r2606 | import zmq | ||
from zmq import POLLIN, POLLOUT, POLLERR | ||||
from zmq.eventloop import ioloop | ||||
epatters
|
r2611 | |||
# Local imports. | ||||
Brian Granger
|
r2699 | from IPython.utils.traitlets import HasTraits, Any, Instance, Type | ||
epatters
|
r2667 | from kernel import launch_kernel | ||
Brian Granger
|
r2606 | from session import Session | ||
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
# Constants and exceptions | ||||
#----------------------------------------------------------------------------- | ||||
epatters
|
r2667 | |||
# Loopback address: the default host for every channel, and the only host on
# which a kernel process may be launched by the manager in this module.
LOCALHOST = '127.0.0.1'
Brian Granger
|
r2606 | |||
Brian Granger
|
class InvalidPortNumber(Exception):
    """Raised when a channel is given an address whose port number is 0."""
    pass
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
# ZMQ Socket Channel classes | ||||
#----------------------------------------------------------------------------- | ||||
Brian Granger
|
r2606 | |||
class ZmqSocketChannel(Thread):
    """The base class for the channels that use ZMQ sockets.

    Every channel is a daemon thread.  This base class only stores the
    context, session and address, and provides helpers that change the IO
    state registered with the channel's event loop.  ``socket``, ``ioloop``
    and ``iostate`` are left as ``None`` here; they must be assigned before
    :meth:`add_io_state` or :meth:`drop_io_state` is used.
    """
    context = None
    session = None
    socket = None
    ioloop = None
    iostate = None
    _address = None

    def __init__(self, context, session, address):
        """Create a channel

        Parameters
        ----------
        context : zmq.Context
            The ZMQ context to use.
        session : session.Session
            The session to use.
        address : tuple
            Standard (ip, port) tuple that the kernel is listening on.

        Raises
        ------
        InvalidPortNumber
            If the port in `address` is 0.
        """
        super(ZmqSocketChannel, self).__init__()
        # Daemon thread: a channel must never keep the interpreter alive.
        self.daemon = True

        self.context = context
        self.session = session
        if address[1] == 0:
            message = 'The port number for a channel cannot be 0.'
            raise InvalidPortNumber(message)
        self._address = address

    def stop(self):
        """Stop the channel's activity.

        This calls :meth:`Thread.join` and returns when the thread
        terminates. :class:`RuntimeError` will be raised if
        :meth:`self.start` is called again.
        """
        self.join()

    @property
    def address(self):
        """Get the channel's address as an (ip, port) tuple.

        The port is never 0, because the constructor rejects such addresses.
        """
        return self._address

    def add_io_state(self, state):
        """Add IO state to the eventloop.

        Parameters
        ----------
        state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
            The IO state flag to set.

        This is thread safe as it uses the thread safe IOLoop.add_callback.
        """
        def add_io_state_callback():
            # Only touch the handler when the flag is not already set.
            if not self.iostate & state:
                self.iostate = self.iostate | state
                self.ioloop.update_handler(self.socket, self.iostate)
        self.ioloop.add_callback(add_io_state_callback)

    def drop_io_state(self, state):
        """Drop IO state from the eventloop.

        Parameters
        ----------
        state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
            The IO state flag to clear.

        This is thread safe as it uses the thread safe IOLoop.add_callback.
        """
        def drop_io_state_callback():
            # Only touch the handler when the flag is currently set.
            if self.iostate & state:
                self.iostate = self.iostate & (~state)
                self.ioloop.update_handler(self.socket, self.iostate)
        self.ioloop.add_callback(drop_io_state_callback)
Brian Granger
|
r2606 | |||
Brian Granger
|
class XReqSocketChannel(ZmqSocketChannel):
    """The XREQ channel for issuing requests to, and receiving replies from,
    the kernel.

    Requests are placed on ``command_queue`` by the calling thread and are
    written to the socket by the channel's own event loop thread.
    """

    command_queue = None

    def __init__(self, context, session, address):
        """Create the channel; takes the same arguments as the base class."""
        # The queue is created first so that it exists for the whole
        # lifetime of the object.
        self.command_queue = Queue()
        super(XReqSocketChannel, self).__init__(context, session, address)

    def run(self):
        """The thread's main activity.  Call start() instead."""
        self.socket = self.context.socket(zmq.XREQ)
        self.socket.setsockopt(zmq.IDENTITY, self.session.session)
        self.socket.connect('tcp://%s:%i' % self.address)
        self.ioloop = ioloop.IOLoop()
        # POLLOUT is only switched on while there is a request to send.
        self.iostate = POLLERR|POLLIN
        self.ioloop.add_handler(self.socket, self._handle_events,
                                self.iostate)
        self.ioloop.start()

    def stop(self):
        """Stop the event loop, then wait for the thread to terminate."""
        self.ioloop.stop()
        super(XReqSocketChannel, self).stop()

    def call_handlers(self, msg):
        """This method is called in the ioloop thread when a message arrives.

        Subclasses should override this method to handle incoming messages.
        It is important to remember that this method is called in the ioloop
        thread, so some logic must be done to ensure that the application
        level handlers are called in the application thread.
        """
        raise NotImplementedError('call_handlers must be defined in a subclass.')

    def execute(self, code):
        """Execute code in the kernel.

        Parameters
        ----------
        code : str
            A string of Python code.

        Returns
        -------
        The msg_id of the message sent.
        """
        # Create class for content/msg creation. Related to, but possibly
        # not in Session.
        content = dict(code=code)
        msg = self.session.msg('execute_request', content)
        self._queue_request(msg)
        return msg['header']['msg_id']

    def complete(self, text, line, block=None):
        """Tab complete text, line, block in the kernel's namespace.

        Parameters
        ----------
        text : str
            The text to complete.
        line : str
            The full line of text that is the surrounding context for the
            text to complete.
        block : str
            The full block of code in which the completion is being requested.

        Returns
        -------
        The msg_id of the message sent.
        """
        # `block` is forwarded with the request; previously it was accepted
        # and documented but silently dropped.
        content = dict(text=text, line=line, block=block)
        msg = self.session.msg('complete_request', content)
        self._queue_request(msg)
        return msg['header']['msg_id']

    def object_info(self, oname):
        """Get metadata information about an object.

        Parameters
        ----------
        oname : str
            A string specifying the object name.

        Returns
        -------
        The msg_id of the message sent.
        """
        content = dict(oname=oname)
        msg = self.session.msg('object_info_request', content)
        self._queue_request(msg)
        return msg['header']['msg_id']

    def _handle_events(self, socket, events):
        """Dispatch the poll events reported for the socket."""
        if events & POLLERR:
            self._handle_err()
        if events & POLLOUT:
            self._handle_send()
        if events & POLLIN:
            self._handle_recv()

    def _handle_recv(self):
        """Read one JSON message and pass it to :meth:`call_handlers`."""
        msg = self.socket.recv_json()
        self.call_handlers(msg)

    def _handle_send(self):
        """Send one queued request, if there is one."""
        try:
            msg = self.command_queue.get(False)
        except Empty:
            pass
        else:
            self.socket.send_json(msg)
        # Checked on every call (also when the queue was already empty) so
        # that POLLOUT never stays registered with nothing left to send.
        if self.command_queue.empty():
            self.drop_io_state(POLLOUT)

    def _handle_err(self):
        # We don't want to let this go silently, so eventually we should log.
        raise zmq.ZMQError()

    def _queue_request(self, msg):
        """Queue `msg` and ask the event loop to watch for POLLOUT."""
        self.command_queue.put(msg)
        self.add_io_state(POLLOUT)
Brian Granger
|
r2606 | |||
Brian Granger
|
class SubSocketChannel(ZmqSocketChannel):
    """The SUB channel which listens for messages that the kernel publishes.
    """

    # Set by the ioloop thread in _flush(); polled by flush().
    _flushed = False

    def __init__(self, context, session, address):
        """Create the channel; takes the same arguments as the base class."""
        super(SubSocketChannel, self).__init__(context, session, address)

    def run(self):
        """The thread's main activity.  Call start() instead."""
        self.socket = self.context.socket(zmq.SUB)
        # An empty prefix subscribes to every published message.
        self.socket.setsockopt(zmq.SUBSCRIBE,'')
        self.socket.setsockopt(zmq.IDENTITY, self.session.session)
        self.socket.connect('tcp://%s:%i' % self.address)
        self.ioloop = ioloop.IOLoop()
        self.iostate = POLLIN|POLLERR
        self.ioloop.add_handler(self.socket, self._handle_events,
                                self.iostate)
        self.ioloop.start()

    def stop(self):
        """Stop the event loop, then wait for the thread to terminate."""
        self.ioloop.stop()
        super(SubSocketChannel, self).stop()

    def call_handlers(self, msg):
        """This method is called in the ioloop thread when a message arrives.

        Subclasses should override this method to handle incoming messages.
        It is important to remember that this method is called in the ioloop
        thread, so some logic must be done to ensure that the application
        level handlers are called in the application thread.
        """
        raise NotImplementedError('call_handlers must be defined in a subclass.')

    def flush(self, timeout=1.0):
        """Immediately processes all pending messages on the SUB channel.

        This method is thread safe.

        Parameters
        ----------
        timeout : float, optional
            The maximum amount of time to spend flushing, in seconds. The
            default is one second.
        """
        # We do the IOLoop callback process twice to ensure that the IOLoop
        # gets to perform at least one full poll.
        stop_time = time.time() + timeout
        for _ in range(2):
            self._flushed = False
            self.ioloop.add_callback(self._flush)
            # Wait until the ioloop thread has run the callback, or give up
            # once the overall deadline has passed.
            while not self._flushed and time.time() < stop_time:
                time.sleep(0.01)

    def _handle_events(self, socket, events):
        """Dispatch the poll events reported for the socket."""
        # This channel only receives, so there is no POLLOUT handling.
        if events & POLLERR:
            self._handle_err()
        if events & POLLIN:
            self._handle_recv()

    def _handle_err(self):
        # We don't want to let this go silently, so eventually we should log.
        raise zmq.ZMQError()

    def _handle_recv(self):
        """Read messages until a non-blocking receive fails."""
        # Get all of the messages we can
        while True:
            try:
                msg = self.socket.recv_json(zmq.NOBLOCK)
            except zmq.ZMQError:
                # Check the errno?
                # Will this trigger POLLERR?
                break
            else:
                self.call_handlers(msg)

    def _flush(self):
        """Callback for :meth:`self.flush`."""
        self._flushed = True
Brian Granger
|
r2606 | |||
class RepSocketChannel(ZmqSocketChannel):
    """A reply channel to handle raw_input requests that the kernel makes.

    Despite the name, the socket created in :meth:`run` is of type
    ``zmq.XREQ``.  Replies are placed on ``msg_queue`` by the calling thread
    and written to the socket by the channel's own event loop thread.
    """

    msg_queue = None

    def __init__(self, context, session, address):
        """Create the channel; takes the same arguments as the base class."""
        self.msg_queue = Queue()
        super(RepSocketChannel, self).__init__(context, session, address)

    def run(self):
        """The thread's main activity.  Call start() instead."""
        self.socket = self.context.socket(zmq.XREQ)
        self.socket.setsockopt(zmq.IDENTITY, self.session.session)
        self.socket.connect('tcp://%s:%i' % self.address)
        self.ioloop = ioloop.IOLoop()
        # POLLOUT is only switched on while there is a reply to send.
        self.iostate = POLLERR|POLLIN
        self.ioloop.add_handler(self.socket, self._handle_events,
                                self.iostate)
        self.ioloop.start()

    def stop(self):
        """Stop the event loop, then wait for the thread to terminate."""
        self.ioloop.stop()
        super(RepSocketChannel, self).stop()

    def call_handlers(self, msg):
        """This method is called in the ioloop thread when a message arrives.

        Subclasses should override this method to handle incoming messages.
        It is important to remember that this method is called in the ioloop
        thread, so some logic must be done to ensure that the application
        level handlers are called in the application thread.
        """
        raise NotImplementedError('call_handlers must be defined in a subclass.')

    def readline(self, line):
        """Send a line of raw input to the kernel.

        Parameters
        ----------
        line : str
            The line of the input.
        """
        content = dict(line=line)
        msg = self.session.msg('readline_reply', content)
        self._queue_reply(msg)

    def _handle_events(self, socket, events):
        """Dispatch the poll events reported for the socket."""
        if events & POLLERR:
            self._handle_err()
        if events & POLLOUT:
            self._handle_send()
        if events & POLLIN:
            self._handle_recv()

    def _handle_recv(self):
        """Read one JSON message and pass it to :meth:`call_handlers`."""
        msg = self.socket.recv_json()
        self.call_handlers(msg)

    def _handle_send(self):
        """Send one queued reply, if there is one."""
        try:
            msg = self.msg_queue.get(False)
        except Empty:
            pass
        else:
            self.socket.send_json(msg)
        # Checked on every call (also when the queue was already empty) so
        # that POLLOUT never stays registered with nothing left to send.
        if self.msg_queue.empty():
            self.drop_io_state(POLLOUT)

    def _handle_err(self):
        # We don't want to let this go silently, so eventually we should log.
        raise zmq.ZMQError()

    def _queue_reply(self, msg):
        """Queue `msg` and ask the event loop to watch for POLLOUT."""
        self.msg_queue.put(msg)
        self.add_io_state(POLLOUT)
epatters
|
r2611 | |||
Brian Granger
|
r2699 | #----------------------------------------------------------------------------- | ||
# Main kernel manager class | ||||
#----------------------------------------------------------------------------- | ||||
epatters
|
class KernelManager(HasTraits):
    """ Manages a kernel for a frontend.

    The SUB channel is for the frontend to receive messages published by the
    kernel.

    The REQ channel is for the frontend to make requests of the kernel.

    The REP channel is for the kernel to request stdin (raw_input) from the
    frontend.
    """

    # The PyZMQ Context to use for communication with the kernel.
    context = Instance(zmq.Context)

    # The Session to use for communication with the kernel.
    session = Instance(Session)

    # The classes to use for the various channels.
    xreq_channel_class = Type(XReqSocketChannel)
    sub_channel_class = Type(SubSocketChannel)
    rep_channel_class = Type(RepSocketChannel)

    # Protected traits.
    _kernel = Instance(Popen)
    _xreq_address = Any
    _sub_address = Any
    _rep_address = Any
    _xreq_channel = Any
    _sub_channel = Any
    _rep_channel = Any

    def __init__(self, xreq_address=None, sub_address=None, rep_address=None,
                 context=None, session=None):
        """Create a kernel manager.

        Parameters
        ----------
        xreq_address, sub_address, rep_address : tuple, optional
            (ip, port) tuples for the three channels.  Each defaults to
            (LOCALHOST, 0).
        context : zmq.Context, optional
            The ZMQ context to use; a new one is created if omitted.
        session : Session, optional
            The session to use; a new one is created if omitted.
        """
        super(KernelManager, self).__init__()
        self._xreq_address = (LOCALHOST, 0) if xreq_address is None else xreq_address
        self._sub_address = (LOCALHOST, 0) if sub_address is None else sub_address
        self._rep_address = (LOCALHOST, 0) if rep_address is None else rep_address
        self.context = zmq.Context() if context is None else context
        self.session = Session() if session is None else session

    #--------------------------------------------------------------------------
    # Channel management methods:
    #--------------------------------------------------------------------------

    def start_channels(self):
        """Starts the channels for this kernel.

        This will create the channels if they do not exist and then start
        them. If port numbers of 0 are being used (random ports) then you
        must first call :meth:`start_kernel`. If the channels have been
        stopped and you call this, :class:`RuntimeError` will be raised.
        """
        self.xreq_channel.start()
        self.sub_channel.start()
        self.rep_channel.start()

    def stop_channels(self):
        """Stops the channels for this kernel.

        This stops the channels by joining their threads. If the channels
        were not started, :class:`RuntimeError` will be raised.
        """
        self.xreq_channel.stop()
        self.sub_channel.stop()
        self.rep_channel.stop()

    @property
    def channels_running(self):
        """Are all of the channels created and running?"""
        return self.xreq_channel.is_alive() \
            and self.sub_channel.is_alive() \
            and self.rep_channel.is_alive()

    #--------------------------------------------------------------------------
    # Kernel process management methods:
    #--------------------------------------------------------------------------

    def start_kernel(self):
        """Starts a kernel process and configures the manager to use it.

        If random ports (port=0) are being used, this method must be called
        before the channels are created.

        Raises
        ------
        RuntimeError
            If any of the configured addresses is not on LOCALHOST.
        """
        xreq, sub, rep = self.xreq_address, self.sub_address, self.rep_address
        if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or rep[0] != LOCALHOST:
            raise RuntimeError("Can only launch a kernel on localhost. "
                               "Make sure that the '*_address' attributes are "
                               "configured properly.")

        kernel, xrep, pub, req = launch_kernel(
            xrep_port=xreq[1], pub_port=sub[1], req_port=rep[1])
        self._kernel = kernel
        # Store the ports reported by launch_kernel as the channel addresses.
        self._xreq_address = (LOCALHOST, xrep)
        self._sub_address = (LOCALHOST, pub)
        self._rep_address = (LOCALHOST, req)

    @property
    def has_kernel(self):
        """Returns whether a kernel process has been specified for the kernel
        manager.

        A kernel process is set by :meth:`start_kernel`.
        """
        return self._kernel is not None

    def kill_kernel(self):
        """ Kill the running kernel.

        Raises :class:`RuntimeError` if this manager has no kernel process.
        """
        if self._kernel is not None:
            self._kernel.kill()
            self._kernel = None
        else:
            raise RuntimeError("Cannot kill kernel. No kernel is running!")

    def signal_kernel(self, signum):
        """ Sends a signal to the kernel.

        Raises :class:`RuntimeError` if this manager has no kernel process.
        """
        if self._kernel is not None:
            self._kernel.send_signal(signum)
        else:
            raise RuntimeError("Cannot signal kernel. No kernel is running!")

    @property
    def is_alive(self):
        """Is the kernel process still running?"""
        if self._kernel is not None:
            # poll() returns None while the process has not terminated.
            return self._kernel.poll() is None
        # We didn't start the kernel with this KernelManager so we don't
        # know if it is running. We should use a heartbeat for this case.
        return True

    #--------------------------------------------------------------------------
    # Channels used for communication with the kernel:
    #--------------------------------------------------------------------------

    @property
    def xreq_channel(self):
        """Get the REQ socket channel object to make requests of the kernel.

        The channel is created on first access.
        """
        if self._xreq_channel is None:
            self._xreq_channel = self.xreq_channel_class(self.context,
                                                         self.session,
                                                         self.xreq_address)
        return self._xreq_channel

    @property
    def sub_channel(self):
        """Get the SUB socket channel object.

        The channel is created on first access.
        """
        if self._sub_channel is None:
            self._sub_channel = self.sub_channel_class(self.context,
                                                       self.session,
                                                       self.sub_address)
        return self._sub_channel

    @property
    def rep_channel(self):
        """Get the REP socket channel object to handle stdin (raw_input).

        The channel is created on first access.
        """
        if self._rep_channel is None:
            self._rep_channel = self.rep_channel_class(self.context,
                                                       self.session,
                                                       self.rep_address)
        return self._rep_channel

    @property
    def xreq_address(self):
        """The (ip, port) tuple used for the XREQ channel."""
        return self._xreq_address

    @property
    def sub_address(self):
        """The (ip, port) tuple used for the SUB channel."""
        return self._sub_address

    @property
    def rep_address(self):
        """The (ip, port) tuple used for the REP channel."""
        return self._rep_address
epatters
|
r2632 | |||