From 18dcf1ded288afbaceb1b69d68a3a4b05e33ec49 2010-08-10 19:24:30 From: epatters Date: 2010-08-10 19:24:30 Subject: [PATCH] * Change input mechanism: replace raw_input instead of stdin. * Made 'kernel' a public attribute of KernelManager. --- diff --git a/IPython/frontend/qt/console/frontend_widget.py b/IPython/frontend/qt/console/frontend_widget.py index c3ae3ea..9387046 100644 --- a/IPython/frontend/qt/console/frontend_widget.py +++ b/IPython/frontend/qt/console/frontend_widget.py @@ -192,7 +192,7 @@ class FrontendWidget(HistoryConsoleWidget): xreq.execute_reply.disconnect(self._handle_execute_reply) xreq.complete_reply.disconnect(self._handle_complete_reply) xreq.object_info_reply.disconnect(self._handle_object_info_reply) - rep.readline_requested.disconnect(self._handle_req) + rep.input_requested.disconnect(self._handle_req) # Handle the case where the old kernel manager is still listening. if self._kernel_manager.channels_running: @@ -215,7 +215,7 @@ class FrontendWidget(HistoryConsoleWidget): xreq.execute_reply.connect(self._handle_execute_reply) xreq.complete_reply.connect(self._handle_complete_reply) xreq.object_info_reply.connect(self._handle_object_info_reply) - rep.readline_requested.connect(self._handle_req) + rep.input_requested.connect(self._handle_req) # Handle the case where the kernel manager started channels before # we connected. @@ -325,8 +325,8 @@ class FrontendWidget(HistoryConsoleWidget): self.kernel_manager.sub_channel.flush() def callback(line): - self.kernel_manager.rep_channel.readline(line) - self._readline(callback=callback) + self.kernel_manager.rep_channel.input(line) + self._readline(req['content']['prompt'], callback=callback) def _handle_sub(self, omsg): if self._hidden: diff --git a/IPython/frontend/qt/kernelmanager.py b/IPython/frontend/qt/kernelmanager.py index f2e21ea..9d3550b 100644 --- a/IPython/frontend/qt/kernelmanager.py +++ b/IPython/frontend/qt/kernelmanager.py @@ -98,8 +98,8 @@ class QtRepSocketChannel(RepSocketChannel, QtCore.QObject): # Emitted when any message is received. message_received = QtCore.pyqtSignal(object) - # Emitted when a readline request is received. - readline_requested = QtCore.pyqtSignal(object) + # Emitted when an input request is received. + input_requested = QtCore.pyqtSignal(object) #--------------------------------------------------------------------------- # 'object' interface @@ -123,8 +123,8 @@ class QtRepSocketChannel(RepSocketChannel, QtCore.QObject): # Emit signals for specialized message types. msg_type = msg['msg_type'] - if msg_type == 'readline_request': - self.readline_requested.emit(msg) + if msg_type == 'input_request': + self.input_requested.emit(msg) class QtKernelManager(KernelManager, QtCore.QObject): diff --git a/IPython/zmq/kernel.py b/IPython/zmq/kernel.py index 285fd66..75bf654 100755 --- a/IPython/zmq/kernel.py +++ b/IPython/zmq/kernel.py @@ -37,93 +37,6 @@ from completer import KernelCompleter # Kernel and stream classes #----------------------------------------------------------------------------- -class InStream(object): - """ A file like object that reads from a 0MQ XREQ socket.""" - - def __init__(self, session, socket): - self.session = session - self.socket = socket - - def close(self): - self.socket = None - - def flush(self): - if self.socket is None: - raise ValueError('I/O operation on closed file') - - def isatty(self): - return False - - def next(self): - raise IOError('Seek not supported.') - - def read(self, size=-1): - # FIXME: Do we want another request for this? - string = '\n'.join(self.readlines()) - return self._truncate(string, size) - - def readline(self, size=-1): - if self.socket is None: - raise ValueError('I/O operation on closed file') - else: - content = dict(size=size) - msg = self.session.msg('readline_request', content=content) - reply = self._request(msg) - line = reply['content']['line'] - return self._truncate(line, size) - - def readlines(self, sizehint=-1): - # Sizehint is ignored, as is permitted. - if self.socket is None: - raise ValueError('I/O operation on closed file') - else: - lines = [] - while True: - line = self.readline() - if line: - lines.append(line) - else: - break - return lines - - def seek(self, offset, whence=None): - raise IOError('Seek not supported.') - - def write(self, string): - raise IOError('Write not supported on a read only stream.') - - def writelines(self, sequence): - raise IOError('Write not supported on a read only stream.') - - def _request(self, msg): - # Flush output before making the request. This ensures, for example, - # that raw_input(prompt) actually gets a prompt written. - sys.stderr.flush() - sys.stdout.flush() - - self.socket.send_json(msg) - while True: - try: - reply = self.socket.recv_json(zmq.NOBLOCK) - except zmq.ZMQError, e: - if e.errno == zmq.EAGAIN: - pass - else: - raise - else: - break - return reply - - def _truncate(self, string, size): - if size >= 0: - if isinstance(string, str): - return string[:size] - elif isinstance(string, unicode): - encoded = string.encode('utf-8')[:size] - return encoded.decode('utf-8', 'ignore') - return string - - class OutStream(object): """A file like object that publishes the stream to a 0MQ PUB socket.""" @@ -215,10 +128,11 @@ class DisplayHook(object): class Kernel(object): - def __init__(self, session, reply_socket, pub_socket): + def __init__(self, session, reply_socket, pub_socket, req_socket): self.session = session self.reply_socket = reply_socket self.pub_socket = pub_socket + self.req_socket = req_socket self.user_ns = {} self.history = [] self.compiler = CommandCompiler() @@ -265,7 +179,15 @@ class Kernel(object): try: comp_code = self.compiler(code, '') + + # Replace raw_input. Note that is not sufficient to replace + # raw_input in the user namespace. + raw_input = lambda prompt='': self.raw_input(prompt, ident, parent) + __builtin__.raw_input = raw_input + + # Configure the display hook. sys.displayhook.set_parent(parent) + exec comp_code in self.user_ns, self.user_ns except: result = u'error' @@ -295,6 +217,26 @@ class Kernel(object): if reply_msg['content']['status'] == u'error': self.abort_queue() + def raw_input(self, prompt, ident, parent): + # Flush output before making the request. + sys.stderr.flush() + sys.stdout.flush() + + # Send the input request. + content = dict(prompt=prompt) + msg = self.session.msg(u'input_request', content, parent) + self.req_socket.send_json(msg) + + # Await a response. + reply = self.req_socket.recv_json() + try: + value = reply['content']['value'] + except: + print>>sys.__stderr__, "Got bad raw_input reply: " + print>>sys.__stderr__, Message(parent) + value = '' + return value + def complete_request(self, ident, parent): matches = {'matches' : self.complete(parent), 'status' : 'ok'} @@ -345,7 +287,7 @@ class Kernel(object): def start(self): while True: ident = self.reply_socket.recv() - assert self.reply_socket.rcvmore(), "Unexpected missing message part." + assert self.reply_socket.rcvmore(), "Missing message part." msg = self.reply_socket.recv_json() omsg = Message(msg) print>>sys.__stdout__ @@ -362,7 +304,7 @@ class Kernel(object): class ExitPollerUnix(Thread): """ A Unix-specific daemon thread that terminates the program immediately - when this process' parent process no longer exists. + when the parent process no longer exists. """ def __init__(self): @@ -452,13 +394,12 @@ def main(): print >>sys.__stdout__, "REQ Channel on port", req_port # Redirect input streams and set a display hook. - sys.stdin = InStream(session, req_socket) sys.stdout = OutStream(session, pub_socket, u'stdout') sys.stderr = OutStream(session, pub_socket, u'stderr') sys.displayhook = DisplayHook(session, pub_socket) # Create the kernel. - kernel = Kernel(session, reply_socket, pub_socket) + kernel = Kernel(session, reply_socket, pub_socket, req_socket) # Configure this kernel/process to die on parent termination, if necessary. if namespace.parent: diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py index e0089e2..d40023b 100644 --- a/IPython/zmq/kernelmanager.py +++ b/IPython/zmq/kernelmanager.py @@ -368,16 +368,10 @@ class RepSocketChannel(ZmqSocketChannel): """ raise NotImplementedError('call_handlers must be defined in a subclass.') - def readline(self, line): - """A send a line of raw input to the kernel. - - Parameters - ---------- - line : str - The line of the input. - """ - content = dict(line=line) - msg = self.session.msg('readline_reply', content) + def input(self, string): + """Send a string of raw input to the kernel.""" + content = dict(value=string) + msg = self.session.msg('input_reply', content) self._queue_reply(msg) def _handle_events(self, socket, events): @@ -432,13 +426,15 @@ class KernelManager(HasTraits): # The Session to use for communication with the kernel. session = Instance(Session) + # The kernel process with which the KernelManager is communicating. + kernel = Instance(Popen) + # The classes to use for the various channels. xreq_channel_class = Type(XReqSocketChannel) sub_channel_class = Type(SubSocketChannel) rep_channel_class = Type(RepSocketChannel) # Protected traits. - _kernel = Instance(Popen) _xreq_address = Any _sub_address = Any _rep_address = Any @@ -504,9 +500,8 @@ class KernelManager(HasTraits): "Make sure that the '*_address' attributes are " "configured properly.") - kernel, xrep, pub, req = launch_kernel( + self.kernel, xrep, pub, req = launch_kernel( xrep_port=xreq[1], pub_port=sub[1], req_port=rep[1]) - self._kernel = kernel self._xreq_address = (LOCALHOST, xrep) self._sub_address = (LOCALHOST, pub) self._rep_address = (LOCALHOST, req) @@ -515,31 +510,29 @@ class KernelManager(HasTraits): def has_kernel(self): """Returns whether a kernel process has been specified for the kernel manager. - - A kernel process can be set via 'start_kernel' or 'set_kernel'. """ - return self._kernel is not None + return self.kernel is not None def kill_kernel(self): """ Kill the running kernel. """ - if self._kernel is not None: - self._kernel.kill() - self._kernel = None + if self.kernel is not None: + self.kernel.kill() + self.kernel = None else: raise RuntimeError("Cannot kill kernel. No kernel is running!") def signal_kernel(self, signum): """ Sends a signal to the kernel. """ - if self._kernel is not None: - self._kernel.send_signal(signum) + if self.kernel is not None: + self.kernel.send_signal(signum) else: raise RuntimeError("Cannot signal kernel. No kernel is running!") @property def is_alive(self): """Is the kernel process still running?""" - if self._kernel is not None: - if self._kernel.poll() is None: + if self.kernel is not None: + if self.kernel.poll() is None: return True else: return False