From c46f948dfce115a553578dae00178c4be6c35210 2010-08-04 14:25:24 From: epatters Date: 2010-08-04 14:25:24 Subject: [PATCH] Merge branch 'kernelmanager' of git://github.com/ellisonbg/ipython into qtfrontend. Fixed breakage and conflicts from merge. Conflicts: IPython/frontend/qt/console/ipython_widget.py --- diff --git a/IPython/frontend/qt/console/frontend_widget.py b/IPython/frontend/qt/console/frontend_widget.py index f5ab175..afaa3e1 100644 --- a/IPython/frontend/qt/console/frontend_widget.py +++ b/IPython/frontend/qt/console/frontend_widget.py @@ -161,10 +161,10 @@ class FrontendWidget(HistoryConsoleWidget): """ # Disconnect the old kernel manager, if necessary. if self._kernel_manager is not None: - self._kernel_manager.started_listening.disconnect( - self._started_listening) - self._kernel_manager.stopped_listening.disconnect( - self._stopped_listening) + self._kernel_manager.started_channels.disconnect( + self._started_channels) + self._kernel_manager.stopped_channels.disconnect( + self._stopped_channels) # Disconnect the old kernel manager's channels. sub = self._kernel_manager.sub_channel @@ -174,9 +174,9 @@ class FrontendWidget(HistoryConsoleWidget): xreq.complete_reply.disconnect(self._handle_complete_reply) xreq.object_info_reply.disconnect(self._handle_object_info_reply) - # Handle the case where the old kernel manager is still listening. - if self._kernel_manager.is_listening: - self._stopped_listening() + # Handle the case where the old kernel manager is still channels. + if self._kernel_manager.channels_running: + self._stopped_channels() # Set the new kernel manager. self._kernel_manager = kernel_manager @@ -184,8 +184,8 @@ class FrontendWidget(HistoryConsoleWidget): return # Connect the new kernel manager. - kernel_manager.started_listening.connect(self._started_listening) - kernel_manager.stopped_listening.connect(self._stopped_listening) + kernel_manager.started_channels.connect(self._started_channels) + kernel_manager.stopped_channels.connect(self._stopped_channels) # Connect the new kernel manager's channels. sub = kernel_manager.sub_channel @@ -195,10 +195,10 @@ class FrontendWidget(HistoryConsoleWidget): xreq.complete_reply.connect(self._handle_complete_reply) xreq.object_info_reply.connect(self._handle_object_info_reply) - # Handle the case where the kernel manager started listening before + # Handle the case where the kernel manager started channels before # we connected. - if kernel_manager.is_listening: - self._started_listening() + if kernel_manager.channels_running: + self._started_channels() kernel_manager = property(_get_kernel_manager, _set_kernel_manager) @@ -323,8 +323,8 @@ class FrontendWidget(HistoryConsoleWidget): if doc: self._call_tip_widget.show_docstring(doc) - def _started_listening(self): + def _started_channels(self): self.clear() - def _stopped_listening(self): + def _stopped_channels(self): pass diff --git a/IPython/frontend/qt/console/ipython_widget.py b/IPython/frontend/qt/console/ipython_widget.py index b40b20f..b26b639 100644 --- a/IPython/frontend/qt/console/ipython_widget.py +++ b/IPython/frontend/qt/console/ipython_widget.py @@ -82,7 +82,7 @@ if __name__ == '__main__': # Create a KernelManager. kernel_manager = QtKernelManager() kernel_manager.start_kernel() - kernel_manager.start_listening() + kernel_manager.start_channels() # Launch the application. app = QtGui.QApplication([]) diff --git a/IPython/frontend/qt/kernelmanager.py b/IPython/frontend/qt/kernelmanager.py index f546479..1ae72bf 100644 --- a/IPython/frontend/qt/kernelmanager.py +++ b/IPython/frontend/qt/kernelmanager.py @@ -93,12 +93,6 @@ class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject): if signal: signal.emit(msg) - def _queue_request(self, msg, callback): - """ Reimplemented to skip callback handling. - """ - self.command_queue.put(msg) - self.add_io_state(zmq.POLLOUT) - class QtRepSocketChannel(RepSocketChannel, QtCore.QObject): @@ -120,10 +114,10 @@ class QtKernelManager(KernelManager, QtCore.QObject): __metaclass__ = MetaQObjectHasTraits # Emitted when the kernel manager has started listening. - started_listening = QtCore.pyqtSignal() + started_channels = QtCore.pyqtSignal() # Emitted when the kernel manager has stopped listening. - stopped_listening = QtCore.pyqtSignal() + stopped_channels = QtCore.pyqtSignal() # Use Qt-specific channel classes that emit signals. sub_channel_class = QtSubSocketChannel @@ -131,17 +125,27 @@ class QtKernelManager(KernelManager, QtCore.QObject): rep_channel_class = QtRepSocketChannel #--------------------------------------------------------------------------- + # 'object' interface + #--------------------------------------------------------------------------- + + def __init__(self, *args, **kw): + """ Reimplemented to ensure that QtCore.QObject is initialized first. + """ + QtCore.QObject.__init__(self) + KernelManager.__init__(self, *args, **kw) + + #--------------------------------------------------------------------------- # 'KernelManager' interface #--------------------------------------------------------------------------- - def start_listening(self): + def start_channels(self): """ Reimplemented to emit signal. """ - super(QtKernelManager, self).start_listening() - self.started_listening.emit() + super(QtKernelManager, self).start_channels() + self.started_channels.emit() - def stop_listening(self): + def stop_channels(self): """ Reimplemented to emit signal. """ - super(QtKernelManager, self).stop_listening() - self.stopped_listening.emit() + super(QtKernelManager, self).stop_channels() + self.stopped_channels.emit() diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py index 0829c35..7b879e0 100644 --- a/IPython/zmq/kernelmanager.py +++ b/IPython/zmq/kernelmanager.py @@ -1,15 +1,27 @@ -"""Kernel frontend classes. +"""Classes to manage the interaction with a running kernel. -TODO: Create logger to handle debugging and console messages. +Todo +==== +* Create logger to handle debugging and console messages. """ +#----------------------------------------------------------------------------- +# Copyright (C) 2008-2010 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. +#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + # Standard library imports. from Queue import Queue, Empty from subprocess import Popen from threading import Thread import time -import traceback # System library imports. import zmq @@ -17,70 +29,80 @@ from zmq import POLLIN, POLLOUT, POLLERR from zmq.eventloop import ioloop # Local imports. -from IPython.utils.traitlets import HasTraits, Any, Bool, Int, Instance, Str, \ - Type +from IPython.utils.traitlets import HasTraits, Any, Instance, Type from kernel import launch_kernel from session import Session -# Constants. -LOCALHOST = '127.0.0.1' +#----------------------------------------------------------------------------- +# Constants and exceptions +#----------------------------------------------------------------------------- +LOCALHOST = '127.0.0.1' -class MissingHandlerError(Exception): +class InvalidPortNumber(Exception): pass +#----------------------------------------------------------------------------- +# ZMQ Socket Channel classes +#----------------------------------------------------------------------------- class ZmqSocketChannel(Thread): - """ The base class for the channels that use ZMQ sockets. + """The base class for the channels that use ZMQ sockets. """ - context = None session = None socket = None ioloop = None iostate = None + _address = None - def __init__(self, context, session, address=None): + def __init__(self, context, session, address): + """Create a channel + + Parameters + ---------- + context : zmq.Context + The ZMQ context to use. + session : session.Session + The session to use. + address : tuple + Standard (ip, port) tuple that the kernel is listening on. + """ super(ZmqSocketChannel, self).__init__() self.daemon = True self.context = context self.session = session - self.address = address + if address[1] == 0: + raise InvalidPortNumber('The port number for a channel cannot be 0.') + self._address = address def stop(self): - """Stop the thread's activity. Returns when the thread terminates. + """Stop the channel's activity. - The thread will raise :class:`RuntimeError` if :method:`self.start` - is called again. + This calls :method:`Thread.join` and returns when the thread + terminates. :class:`RuntimeError` will be raised if + :method:`self.start` is called again. """ self.join() - - def get_address(self): - """ Get the channel's address. By the default, a channel is on - localhost with no port specified (a negative port number). + @property + def address(self): + """Get the channel's address as an (ip, port) tuple. + + By the default, the address is (localhost, 0), where 0 means a random + port. """ return self._address - def set_adresss(self, address): - """ Set the channel's address. Should be a tuple of form: - (ip address [str], port [int]). - or None, in which case the address is reset to its default value. - """ - # FIXME: Validate address. - if self.is_alive(): # This is Thread.is_alive - raise RuntimeError("Cannot set address on a running channel!") - else: - if address is None: - address = (LOCALHOST, 0) - self._address = address - - address = property(get_address, set_adresss) - def add_io_state(self, state): """Add IO state to the eventloop. + Parameters + ---------- + state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR + The IO state flag to set. + This is thread safe as it uses the thread safe IOLoop.add_callback. """ def add_io_state_callback(): @@ -92,6 +114,11 @@ class ZmqSocketChannel(Thread): def drop_io_state(self, state): """Drop IO state from the eventloop. + Parameters + ---------- + state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR + The IO state flag to set. + This is thread safe as it uses the thread safe IOLoop.add_callback. """ def drop_io_state_callback(): @@ -101,48 +128,30 @@ class ZmqSocketChannel(Thread): self.ioloop.add_callback(drop_io_state_callback) -class SubSocketChannel(ZmqSocketChannel): +class XReqSocketChannel(ZmqSocketChannel): + """The XREQ channel for issues request/replies to the kernel. + """ - def __init__(self, context, session, address=None): - super(SubSocketChannel, self).__init__(context, session, address) + command_queue = None + + def __init__(self, context, session, address): + self.command_queue = Queue() + super(XReqSocketChannel, self).__init__(context, session, address) def run(self): - self.socket = self.context.socket(zmq.SUB) - self.socket.setsockopt(zmq.SUBSCRIBE,'') + """The thread's main activity. Call start() instead.""" + self.socket = self.context.socket(zmq.XREQ) self.socket.setsockopt(zmq.IDENTITY, self.session.session) self.socket.connect('tcp://%s:%i' % self.address) self.ioloop = ioloop.IOLoop() - self.iostate = POLLIN|POLLERR + self.iostate = POLLERR|POLLIN self.ioloop.add_handler(self.socket, self._handle_events, self.iostate) self.ioloop.start() def stop(self): self.ioloop.stop() - super(SubSocketChannel, self).stop() - - def _handle_events(self, socket, events): - # Turn on and off POLLOUT depending on if we have made a request - if events & POLLERR: - self._handle_err() - if events & POLLIN: - self._handle_recv() - - def _handle_err(self): - # We don't want to let this go silently, so eventually we should log. - raise zmq.ZMQError() - - def _handle_recv(self): - # Get all of the messages we can - while True: - try: - msg = self.socket.recv_json(zmq.NOBLOCK) - except zmq.ZMQError: - # Check the errno? - # Will this tigger POLLERR? - break - else: - self.call_handlers(msg) + super(XReqSocketChannel, self).stop() def call_handlers(self, msg): """This method is called in the ioloop thread when a message arrives. @@ -154,59 +163,65 @@ class SubSocketChannel(ZmqSocketChannel): """ raise NotImplementedError('call_handlers must be defined in a subclass.') - def flush(self, timeout=1.0): - """Immediately processes all pending messages on the SUB channel. - - This method is thread safe. + def execute(self, code): + """Execute code in the kernel. Parameters ---------- - timeout : float, optional - The maximum amount of time to spend flushing, in seconds. The - default is one second. - """ - # We do the IOLoop callback process twice to ensure that the IOLoop - # gets to perform at least one full poll. - stop_time = time.time() + timeout - for i in xrange(2): - self._flushed = False - self.ioloop.add_callback(self._flush) - while not self._flushed and time.time() < stop_time: - time.sleep(0.01) - - def _flush(self): - """Called in this thread by the IOLoop to indicate that all events have - been processed. + code : str + A string of Python code. + + Returns + ------- + The msg_id of the message sent. """ - self._flushed = True + # Create class for content/msg creation. Related to, but possibly + # not in Session. + content = dict(code=code) + msg = self.session.msg('execute_request', content) + self._queue_request(msg) + return msg['header']['msg_id'] + def complete(self, text, line, block=None): + """Tab complete text, line, block in the kernel's namespace. -class XReqSocketChannel(ZmqSocketChannel): + Parameters + ---------- + text : str + The text to complete. + line : str + The full line of text that is the surrounding context for the + text to complete. + block : str + The full block of code in which the completion is being requested. + + Returns + ------- + The msg_id of the message sent. - handler_queue = None - command_queue = None - handlers = None - _overriden_call_handler = None + """ + content = dict(text=text, line=line) + msg = self.session.msg('complete_request', content) + self._queue_request(msg) + return msg['header']['msg_id'] - def __init__(self, context, session, address=None): - self.handlers = {} - self.handler_queue = Queue() - self.command_queue = Queue() - super(XReqSocketChannel, self).__init__(context, session, address) + def object_info(self, oname): + """Get metadata information about an object. - def run(self): - self.socket = self.context.socket(zmq.XREQ) - self.socket.setsockopt(zmq.IDENTITY, self.session.session) - self.socket.connect('tcp://%s:%i' % self.address) - self.ioloop = ioloop.IOLoop() - self.iostate = POLLERR|POLLIN - self.ioloop.add_handler(self.socket, self._handle_events, - self.iostate) - self.ioloop.start() - - def stop(self): - self.ioloop.stop() - super(XReqSocketChannel, self).stop() + Parameters + ---------- + oname : str + A string specifying the object name. + + Returns + ------- + The msg_id of the message sent. + """ + print oname + content = dict(oname=oname) + msg = self.session.msg('object_info_request', content) + self._queue_request(msg) + return msg['header']['msg_id'] def _handle_events(self, socket, events): if events & POLLERR: @@ -234,82 +249,113 @@ class XReqSocketChannel(ZmqSocketChannel): # We don't want to let this go silently, so eventually we should log. raise zmq.ZMQError() - def _queue_request(self, msg, callback): - handler = self._find_handler(msg['msg_type'], callback) - self.handler_queue.put(handler) + def _queue_request(self, msg): self.command_queue.put(msg) self.add_io_state(POLLOUT) - def execute(self, code, callback=None): - # Create class for content/msg creation. Related to, but possibly - # not in Session. - content = dict(code=code) - msg = self.session.msg('execute_request', content) - self._queue_request(msg, callback) - return msg['header']['msg_id'] - def complete(self, text, line, block=None, callback=None): - content = dict(text=text, line=line) - msg = self.session.msg('complete_request', content) - self._queue_request(msg, callback) - return msg['header']['msg_id'] +class SubSocketChannel(ZmqSocketChannel): + """The SUB channel which listens for messages that the kernel publishes. + """ - def object_info(self, oname, callback=None): - content = dict(oname=oname) - msg = self.session.msg('object_info_request', content) - self._queue_request(msg, callback) - return msg['header']['msg_id'] + def __init__(self, context, session, address): + super(SubSocketChannel, self).__init__(context, session, address) - def _find_handler(self, name, callback): - if callback is not None: - return callback - handler = self.handlers.get(name) - if handler is None: - raise MissingHandlerError( - 'No handler defined for method: %s' % name) - return handler - - def override_call_handler(self, func): - """Permanently override the call_handler. - - The function func will be called as:: + def run(self): + """The thread's main activity. Call start() instead.""" + self.socket = self.context.socket(zmq.SUB) + self.socket.setsockopt(zmq.SUBSCRIBE,'') + self.socket.setsockopt(zmq.IDENTITY, self.session.session) + self.socket.connect('tcp://%s:%i' % self.address) + self.ioloop = ioloop.IOLoop() + self.iostate = POLLIN|POLLERR + self.ioloop.add_handler(self.socket, self._handle_events, + self.iostate) + self.ioloop.start() - func(handler, msg) + def stop(self): + self.ioloop.stop() + super(SubSocketChannel, self).stop() - And must call:: - - handler(msg) + def call_handlers(self, msg): + """This method is called in the ioloop thread when a message arrives. - in the main thread. + Subclasses should override this method to handle incoming messages. + It is important to remember that this method is called in the thread + so that some logic must be done to ensure that the application leve + handlers are called in the application thread. """ - assert callable(func), "not a callable: %r" % func - self._overriden_call_handler = func + raise NotImplementedError('call_handlers must be defined in a subclass.') - def call_handlers(self, msg): - try: - handler = self.handler_queue.get(False) - except Empty: - print "Message received with no handler!!!" - print msg - else: - self.call_handler(handler, msg) - - def call_handler(self, handler, msg): - if self._overriden_call_handler is not None: - self._overriden_call_handler(handler, msg) - elif hasattr(self, '_call_handler'): - call_handler = getattr(self, '_call_handler') - call_handler(handler, msg) - else: - raise RuntimeError('no handler!') + def flush(self, timeout=1.0): + """Immediately processes all pending messages on the SUB channel. + + This method is thread safe. + + Parameters + ---------- + timeout : float, optional + The maximum amount of time to spend flushing, in seconds. The + default is one second. + """ + # We do the IOLoop callback process twice to ensure that the IOLoop + # gets to perform at least one full poll. + stop_time = time.time() + timeout + for i in xrange(2): + self._flushed = False + self.ioloop.add_callback(self._flush) + while not self._flushed and time.time() < stop_time: + time.sleep(0.01) + + def _handle_events(self, socket, events): + # Turn on and off POLLOUT depending on if we have made a request + if events & POLLERR: + self._handle_err() + if events & POLLIN: + self._handle_recv() + + def _handle_err(self): + # We don't want to let this go silently, so eventually we should log. + raise zmq.ZMQError() + + def _handle_recv(self): + # Get all of the messages we can + while True: + try: + msg = self.socket.recv_json(zmq.NOBLOCK) + except zmq.ZMQError: + # Check the errno? + # Will this tigger POLLERR? + break + else: + self.call_handlers(msg) + + def _flush(self): + """Callback for :method:`self.flush`.""" + self._flushed = True class RepSocketChannel(ZmqSocketChannel): + """A reply channel to handle raw_input requests that the kernel makes.""" + + def run(self): + """The thread's main activity. Call start() instead.""" + self.ioloop = ioloop.IOLoop() + self.ioloop.start() + + def stop(self): + self.ioloop.stop() + super(SubSocketChannel, self).stop() def on_raw_input(self): pass +#----------------------------------------------------------------------------- +# Main kernel manager class +#----------------------------------------------------------------------------- + + class KernelManager(HasTraits): """ Manages a kernel for a frontend. @@ -321,59 +367,66 @@ class KernelManager(HasTraits): The REP channel is for the kernel to request stdin (raw_input) from the frontend. """ - - # Whether the kernel manager is currently listening on its channels. - is_listening = Bool(False) - # The PyZMQ Context to use for communication with the kernel. - context = Instance(zmq.Context, ()) + context = Instance(zmq.Context) # The Session to use for communication with the kernel. - session = Instance(Session, ()) + session = Instance(Session) # The classes to use for the various channels. - sub_channel_class = Type(SubSocketChannel) xreq_channel_class = Type(XReqSocketChannel) + sub_channel_class = Type(SubSocketChannel) rep_channel_class = Type(RepSocketChannel) # Protected traits. _kernel = Instance(Popen) - _sub_channel = Any + _xreq_address = Any + _sub_address = Any + _rep_address = Any _xreq_channel = Any + _sub_channel = Any _rep_channel = Any + def __init__(self, xreq_address=None, sub_address=None, rep_address=None, + context=None, session=None): + self._xreq_address = (LOCALHOST, 0) if xreq_address is None else xreq_address + self._sub_address = (LOCALHOST, 0) if sub_address is None else sub_address + self._rep_address = (LOCALHOST, 0) if rep_address is None else rep_address + self.context = zmq.Context() if context is None else context + self.session = Session() if session is None else session + #-------------------------------------------------------------------------- # Channel management methods: #-------------------------------------------------------------------------- - def start_listening(self): - """Starts listening on the specified ports. If already listening, raises - a RuntimeError. + def start_channels(self): + """Starts the channels for this kernel. + + This will create the channels if they do not exist and then start + them. If port numbers of 0 are being used (random ports) then you + must first call :method:`start_kernel`. If the channels have been + stopped and you call this, :class:`RuntimeError` will be raised. """ - if self.is_listening: - raise RuntimeError("Cannot start listening. Already listening!") - else: - self.is_listening = True - self.sub_channel.start() - self.xreq_channel.start() - self.rep_channel.start() + self.xreq_channel.start() + self.sub_channel.start() + self.rep_channel.start() - @property - def is_alive(self): - """ Returns whether the kernel is alive. """ - if self.is_listening: - # TODO: check if alive. - return True - else: - return False + def stop_channels(self): + """Stops the channels for this kernel. + + This stops the channels by joining their threads. If the channels + were not started, :class:`RuntimeError` will be raised. + """ + self.xreq_channel.stop() + self.sub_channel.stop() + self.rep_channel.stop() - def stop_listening(self): - """Stops listening. If not listening, does nothing. """ - if self.is_listening: - self.is_listening = False - self.sub_channel.stop() - self.xreq_channel.stop() - self.rep_channel.stop() + @property + def channels_running(self): + """Are all of the channels created and running?""" + return self.xreq_channel.is_alive() \ + and self.sub_channel.is_alive() \ + and self.rep_channel.is_alive() #-------------------------------------------------------------------------- # Kernel process management methods: @@ -382,9 +435,8 @@ class KernelManager(HasTraits): def start_kernel(self): """Starts a kernel process and configures the manager to use it. - If ports have been specified via the address attributes, they are used. - Otherwise, open ports are chosen by the OS and the channel port - attributes are configured as appropriate. + If random ports (port=0) are being used, this method must be called + before the channels are created. """ xreq, sub = self.xreq_address, self.sub_address if xreq[0] != LOCALHOST or sub[0] != LOCALHOST: @@ -393,24 +445,13 @@ class KernelManager(HasTraits): "configured properly.") kernel, xrep, pub = launch_kernel(xrep_port=xreq[1], pub_port=sub[1]) - self.set_kernel(kernel) - self.xreq_address = (LOCALHOST, xrep) - self.sub_address = (LOCALHOST, pub) - - def set_kernel(self, kernel): - """Sets the kernel manager's kernel to an existing kernel process. - - It is *not* necessary to a set a kernel to communicate with it via the - channels, and those objects must be configured separately. It - *is* necessary to set a kernel if you want to use the manager (or - frontends that use the manager) to signal and/or kill the kernel. - - Parameters: - ----------- - kernel : Popen - An existing kernel process. - """ self._kernel = kernel + self._xreq_address = (LOCALHOST, xrep) + self._sub_address = (LOCALHOST, pub) + # The rep channel is not fully working yet, but its base class makes + # sure the port is not 0. We set to -1 for now until the rep channel + # is fully working. + self._rep_address = (LOCALHOST, -1) @property def has_kernel(self): @@ -423,7 +464,7 @@ class KernelManager(HasTraits): def kill_kernel(self): """ Kill the running kernel. """ - if self._kernel: + if self._kernel is not None: self._kernel.kill() self._kernel = None else: @@ -431,67 +472,65 @@ class KernelManager(HasTraits): def signal_kernel(self, signum): """ Sends a signal to the kernel. """ - if self._kernel: + if self._kernel is not None: self._kernel.send_signal(signum) else: raise RuntimeError("Cannot signal kernel. No kernel is running!") + @property + def is_alive(self): + """Is the kernel process still running?""" + if self._kernel is not None: + if self._kernel.poll() is None: + return True + else: + return False + else: + # We didn't start the kernel with this KernelManager so we don't + # know if it is running. We should use a heartbeat for this case. + return True + #-------------------------------------------------------------------------- # Channels used for communication with the kernel: #-------------------------------------------------------------------------- @property - def sub_channel(self): - """Get the SUB socket channel object.""" - if self._sub_channel is None: - self._sub_channel = self.sub_channel_class(self.context, - self.session) - return self._sub_channel - - @property def xreq_channel(self): """Get the REQ socket channel object to make requests of the kernel.""" if self._xreq_channel is None: self._xreq_channel = self.xreq_channel_class(self.context, - self.session) + self.session, + self.xreq_address) return self._xreq_channel @property + def sub_channel(self): + """Get the SUB socket channel object.""" + if self._sub_channel is None: + self._sub_channel = self.sub_channel_class(self.context, + self.session, + self.sub_address) + return self._sub_channel + + @property def rep_channel(self): """Get the REP socket channel object to handle stdin (raw_input).""" if self._rep_channel is None: self._rep_channel = self.rep_channel_class(self.context, - self.session) + self.session, + self.rep_address) return self._rep_channel - #-------------------------------------------------------------------------- - # Delegates for the Channel address attributes: - #-------------------------------------------------------------------------- - - def get_sub_address(self): - return self.sub_channel.address - - def set_sub_address(self, address): - self.sub_channel.address = address - - sub_address = property(get_sub_address, set_sub_address, - doc="The address used by SUB socket channel.") - - def get_xreq_address(self): - return self.xreq_channel.address - - def set_xreq_address(self, address): - self.xreq_channel.address = address + @property + def xreq_address(self): + return self._xreq_address - xreq_address = property(get_xreq_address, set_xreq_address, - doc="The address used by XREQ socket channel.") - - def get_rep_address(self): - return self.rep_channel.address + @property + def sub_address(self): + return self._sub_address - def set_rep_address(self, address): - self.rep_channel.address = address + @property + def rep_address(self): + return self._rep_address - rep_address = property(get_rep_address, set_rep_address, - doc="The address used by REP socket channel.")