From 1f9a029b6574f3973521d582faa0cd4258f79b9a 2011-12-06 01:40:57 From: Thomas Kluyver Date: 2011-12-06 01:40:57 Subject: [PATCH] Refactor and simplification of zmqterminal. --- diff --git a/IPython/frontend/zmqterminal/completer.py b/IPython/frontend/zmqterminal/completer.py index cd75059..5d21a1e 100644 --- a/IPython/frontend/zmqterminal/completer.py +++ b/IPython/frontend/zmqterminal/completer.py @@ -1,11 +1,6 @@ # -*- coding: utf-8 -*- import readline -import time -import sys -stdout = sys.stdout - -class TimeoutError(Exception): - pass +from Queue import Empty class ClientCompleter2p(object): """Client-side completion machinery. @@ -27,22 +22,18 @@ class ClientCompleter2p(object): dict(text=text, line=line)) # send completion request to kernel # Give the kernel up to 0.5s to respond - for i in range(5): - if self.km.xreq_channel.was_called(): - msg_xreq = self.km.xreq_channel.get_msg() - if msg["header"]['session'] == msg_xreq["parent_header"]['session'] and \ - msg_xreq["content"]["status"] == 'ok' and \ - msg_xreq["msg_type"] == "complete_reply" : - return msg_xreq["content"]["matches"] - - time.sleep(0.1) - raise TimeoutError + msg_xreq = self.km.xreq_channel.get_msg(timeout=0.5) + if msg["header"]['session'] == msg_xreq["parent_header"]['session'] and \ + msg_xreq["content"]["status"] == 'ok' and \ + msg_xreq["msg_type"] == "complete_reply" : + return msg_xreq["content"]["matches"] + return [] def complete(self, text, state): if state == 0: try: self.matches = self.complete_request(text) - except TimeoutError: + except Empty: print('WARNING: Kernel timeout on tab completion.') try: diff --git a/IPython/frontend/zmqterminal/frontend.py b/IPython/frontend/zmqterminal/frontend.py index b827a31..3ee50e2 100755 --- a/IPython/frontend/zmqterminal/frontend.py +++ b/IPython/frontend/zmqterminal/frontend.py @@ -17,32 +17,18 @@ For more details, see the ipython-zmq design #----------------------------------------------------------------------------- import __builtin__ -from contextlib import nested -import time import sys import os -import signal -import uuid -import cPickle as pickle -import code -import zmq +from Queue import Empty import readline import rlcompleter -import time #----------------------------------------------------------------------------- # Imports from ipython #----------------------------------------------------------------------------- from IPython.external.argparse import ArgumentParser -from IPython.utils.traitlets import ( -Int, Str, CBool, CaselessStrEnum, Enum, List, Unicode -) -from IPython.core.interactiveshell import get_default_colors -from IPython.core.excolors import exception_colors -from IPython.utils import PyColorize from IPython.core.inputsplitter import IPythonInputSplitter -from IPython.frontend.zmqterminal.kernelmanager import KernelManager2p as KernelManager -from IPython.zmq.session import Session +from IPython.zmq.blockingkernelmanager import BlockingKernelManager as KernelManager from IPython.frontend.zmqterminal.completer import ClientCompleter2p #----------------------------------------------------------------------------- @@ -51,7 +37,7 @@ from IPython.frontend.zmqterminal.completer import ClientCompleter2p from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS class Frontend(object): - """This class is a simple frontend to ipython-zmq + """This class is a simple frontend to ipython-zmq NOTE: this class uses kernelmanager to manipulate sockets @@ -60,16 +46,16 @@ class Frontend(object): kernelmanager : object instantiated object from class KernelManager in module kernelmanager - """ + """ - def __init__(self, kernelmanager): + def __init__(self, kernelmanager): self.km = kernelmanager self.session = kernelmanager.session self.request_socket = self.km.xreq_channel.socket self.sub_socket = self.km.sub_channel.socket self.reply_socket = self.km.rep_channel.socket self.msg_header = self.km.session.msg_header() - self.completer = ClientCompleter2p(self,self.km) + self.completer = ClientCompleter2p(self, self.km) readline.parse_and_bind("tab: complete") readline.parse_and_bind('set show-all-if-ambiguous on') readline.set_completer(self.completer.complete) @@ -88,10 +74,10 @@ class Frontend(object): self.prompt_count = 0 self._get_initial_prompt() - def _get_initial_prompt(self): + def _get_initial_prompt(self): self._execute('', hidden=True) - def interact(self): + def interact(self): """Gets input from console using inputsplitter, then while you enter code it can indent and set index id to any input """ @@ -108,7 +94,7 @@ class Frontend(object): pass - def start(self): + def start(self): """Start the interaction loop, calling the .interact() method for each input cell. """ @@ -128,37 +114,39 @@ class Frontend(object): elif answer == 'n': break - def _execute(self, source, hidden = True): - """ Execute 'source'. If 'hidden', do not show any output. + def _execute(self, source, hidden = True): + """ Execute 'source'. If 'hidden', do not show any output. See parent class :meth:`execute` docstring for full details. - """ - self.km.xreq_channel.execute(source, hidden) - self.handle_xreq_channel() - self.handle_rep_channel() + """ + self.km.xreq_channel.execute(source, hidden) + while not self.km.xreq_channel.msg_ready(): + try: + self.handle_rep_channel(timeout=0.1) + except Empty: + pass + self.handle_xreq_channel() - def handle_xreq_channel(self): - # Give the kernel up to 0.5s to respond - for i in range(5): - if self.km.xreq_channel.was_called(): - self.msg_xreq = self.km.xreq_channel.get_msg() - if self.msg_header["session"] == self.msg_xreq["parent_header"]["session"] : - if self.msg_xreq["content"]["status"] == 'ok' : - if self.msg_xreq["msg_type"] == "execute_reply" : - self.handle_sub_channel() - self.prompt_count = self.msg_xreq["content"]["execution_count"]+1 - - else: - etb = self.msg_xreq["content"]["traceback"] - print >> sys.stderr, etb[0] - print >> sys.stderr, etb[1] - print >> sys.stderr, etb[2] - self.prompt_count = self.msg_xreq["content"]["execution_count"]+1 - break - time.sleep(0.1) + def handle_xreq_channel(self): + self.msg_xreq = self.km.xreq_channel.get_msg() + if self.msg_header["session"] == self.msg_xreq["parent_header"]["session"]: + if self.msg_xreq["content"]["status"] == 'ok' : + if self.msg_xreq["msg_type"] == "execute_reply" : + self.handle_sub_channel() + self.prompt_count = self.msg_xreq["content"]["execution_count"]+1 + + else: + etb = self.msg_xreq["content"]["traceback"] + print >> sys.stderr, etb[0] + try: # These bits aren't there for a SyntaxError + print >> sys.stderr, etb[1] + print >> sys.stderr, etb[2] + except IndexError: + pass + self.prompt_count = self.msg_xreq["content"]["execution_count"]+1 - def handle_sub_channel(self): + def handle_sub_channel(self): """ Method to procces subscribe channel's messages This method reads a message and processes the content in different @@ -168,7 +156,7 @@ class Frontend(object): sub_msg: message receive from kernel in the sub socket channel capture by kernel manager. """ - while self.km.sub_channel.was_called(): + while self.km.sub_channel.msg_ready(): sub_msg = self.km.sub_channel.get_msg() if self.msg_header["username"] == sub_msg['parent_header']['username'] and \ self.km.session.session == sub_msg['parent_header']['session']: @@ -188,14 +176,13 @@ class Frontend(object): print >> sys.stdout,"Out[%i]:"%sub_msg["content"]["execution_count"], sub_msg["content"]["data"]["text/plain"] sys.stdout.flush() - def handle_rep_channel(self): - """ Method to capture raw_input - """ - if self.km.rep_channel.was_called() : - self.msg_rep = self.km.rep_channel.get_msg() - if self.msg_header["session"] == self.msg_rep["parent_header"]["session"] : - raw_data = raw_input(self.msg_rep["content"]["prompt"]) - self.km.rep_channel.input(raw_data) + def handle_rep_channel(self, timeout=0.1): + """ Method to capture raw_input + """ + self.msg_rep = self.km.rep_channel.get_msg(timeout=timeout) + if self.msg_header["session"] == self.msg_rep["parent_header"]["session"] : + raw_data = raw_input(self.msg_rep["content"]["prompt"]) + self.km.rep_channel.input(raw_data) @@ -270,7 +257,6 @@ def start_frontend(): kernel_manager.start_channels() - time.sleep(4) frontend=Frontend(kernel_manager) return frontend diff --git a/IPython/frontend/zmqterminal/kernelmanager.py b/IPython/frontend/zmqterminal/kernelmanager.py deleted file mode 100644 index 5bf657c..0000000 --- a/IPython/frontend/zmqterminal/kernelmanager.py +++ /dev/null @@ -1,92 +0,0 @@ -# -*- coding: utf-8 -*- - -from Queue import Queue -from IPython.zmq.session import Session, Message, extract_header -from IPython.utils.traitlets import Type, HasTraits, TraitType -from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \ -XReqSocketChannel, RepSocketChannel, HBSocketChannel -MetaHasTraits = type(HasTraits) - - -class SubSocketChannel2p(SubSocketChannel): - #--------------------------------------------------------------------------- - # 'SubSocketChannel' interface - #--------------------------------------------------------------------------- - _msg = None - queue = Queue(-1) - def call_handlers(self, msg): - self.queue.put(Message(msg)) - - def get_msg(self): - return self.queue.get() - - def was_called(self): - return not self.queue.empty() - -class XReqSocketChannel2p(XReqSocketChannel): - #--------------------------------------------------------------------------- - # 'XReqSocketChannel' interface - #--------------------------------------------------------------------------- - _msg = None - _called = False - queue = Queue(-1) - def call_handlers(self, msg): - self.queue.put(Message(msg)) - - def get_msg(self): - return self.queue.get() - - def was_called(self): - return not self.queue.empty() - -class RepSocketChannel2p(RepSocketChannel): - #--------------------------------------------------------------------------- - # 'XReqSocketChannel' interface - #--------------------------------------------------------------------------- - _msg = None - _called = False - def call_handlers(self, msg): - self._called = True - self._msg = Message(msg) - - def get_msg(self): - self._called = False - return self._msg - - def was_called(self): - return self._called - -class HBSocketChannel2p(HBSocketChannel): - #--------------------------------------------------------------------------- - # 'XReqSocketChannel' interface - #--------------------------------------------------------------------------- - _msg = None - _called = False - def call_handlers(self, msg): - self._called = True - self._msg = Message(msg) - - def get_msg(self): - self._called = False - return self._msg - - def was_called(self): - return self._called - -class KernelManager2p(KernelManager): - sub_channel_class = Type(SubSocketChannel2p) - xreq_channel_class = Type(XReqSocketChannel2p) - rep_channel_class = Type(RepSocketChannel2p) - hb_channel_class = Type(HBSocketChannel2p) - - def start_kernel(self, *args, **kw): - """ Reimplemented for proper heartbeat management. - """ - if self._xreq_channel is not None: - self._xreq_channel.reset_first_reply() - super(KernelManager2p, self).start_kernel(*args, **kw) - - - - - \ No newline at end of file diff --git a/IPython/zmq/blockingkernelmanager.py b/IPython/zmq/blockingkernelmanager.py index 9dbe1c3..ed849b6 100644 --- a/IPython/zmq/blockingkernelmanager.py +++ b/IPython/zmq/blockingkernelmanager.py @@ -16,6 +16,7 @@ from __future__ import print_function # Stdlib from Queue import Queue, Empty +from threading import Event # Our own from IPython.utils import io @@ -104,9 +105,31 @@ class BlockingShellSocketChannel(ShellSocketChannel): class BlockingStdInSocketChannel(StdInSocketChannel): + def __init__(self, context, session, address=None): + super(BlockingRepSocketChannel, self).__init__(context, session, address) + self._in_queue = Queue() + def call_handlers(self, msg): #io.rprint('[[Rep]]', msg) # dbg - pass + self._in_queue.put(msg) + + def get_msg(self, block=True, timeout=None): + "Gets a message if there is one that is ready." + return self._in_queue.get(block, timeout) + + def get_msgs(self): + """Get all messages that are currently ready.""" + msgs = [] + while True: + try: + msgs.append(self.get_msg(block=False)) + except Empty: + break + return msgs + + def msg_ready(self): + "Is there a message that has been received?" + return not self._in_queue.empty() class BlockingHBSocketChannel(HBSocketChannel):