##// END OF EJS Templates
Basic raw_input implementation is now working.
epatters -
Show More
@@ -179,10 +179,12 b' class FrontendWidget(HistoryConsoleWidget):'
179 179 # Disconnect the old kernel manager's channels.
180 180 sub = self._kernel_manager.sub_channel
181 181 xreq = self._kernel_manager.xreq_channel
182 rep = self._kernel_manager.rep_channel
182 183 sub.message_received.disconnect(self._handle_sub)
183 184 xreq.execute_reply.disconnect(self._handle_execute_reply)
184 185 xreq.complete_reply.disconnect(self._handle_complete_reply)
185 186 xreq.object_info_reply.disconnect(self._handle_object_info_reply)
187 rep.readline_requested.disconnect(self._handle_req)
186 188
187 189 # Handle the case where the old kernel manager is still listening.
188 190 if self._kernel_manager.channels_running:
@@ -200,10 +202,12 b' class FrontendWidget(HistoryConsoleWidget):'
200 202 # Connect the new kernel manager's channels.
201 203 sub = kernel_manager.sub_channel
202 204 xreq = kernel_manager.xreq_channel
205 rep = kernel_manager.rep_channel
203 206 sub.message_received.connect(self._handle_sub)
204 207 xreq.execute_reply.connect(self._handle_execute_reply)
205 208 xreq.complete_reply.connect(self._handle_complete_reply)
206 209 xreq.object_info_reply.connect(self._handle_object_info_reply)
210 rep.readline_requested.connect(self._handle_req)
207 211
208 212 # Handle the case where the kernel manager started channels before
209 213 # we connected.
@@ -292,10 +296,9 b' class FrontendWidget(HistoryConsoleWidget):'
292 296 if position == self.textCursor().position():
293 297 self._call_tip()
294 298
295 def _handle_req(self):
299 def _handle_req(self, req):
296 300 def callback(line):
297 print repr(line)
298 self._show_prompt()
301 self.kernel_manager.rep_channel.readline(line)
299 302 self._readline(callback=callback)
300 303
301 304 def _handle_sub(self, omsg):
@@ -95,6 +95,12 b' class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):'
95 95
96 96 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
97 97
98 # Emitted when any message is received.
99 message_received = QtCore.pyqtSignal(object)
100
101 # Emitted when a readline request is received.
102 readline_requested = QtCore.pyqtSignal(object)
103
98 104 #---------------------------------------------------------------------------
99 105 # 'object' interface
100 106 #---------------------------------------------------------------------------
@@ -105,6 +111,21 b' class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):'
105 111 QtCore.QObject.__init__(self)
106 112 RepSocketChannel.__init__(self, *args, **kw)
107 113
114 #---------------------------------------------------------------------------
115 # 'RepSocketChannel' interface
116 #---------------------------------------------------------------------------
117
118 def call_handlers(self, msg):
119 """ Reimplemented to emit signals instead of making callbacks.
120 """
121 # Emit the generic signal.
122 self.message_received.emit(msg)
123
124 # Emit signals for specialized message types.
125 msg_type = msg['msg_type']
126 if msg_type == 'readline_request':
127 self.readline_requested.emit(msg)
128
108 129
109 130 class QtKernelManager(KernelManager, QtCore.QObject):
110 131 """ A KernelManager that provides signals and slots.
@@ -55,9 +55,10 b' class InStream(object):'
55 55 if self.socket is None:
56 56 raise ValueError(u'I/O operation on closed file')
57 57 else:
58 content = { u'size' : unicode(size) }
59 msg = self.session.msg(u'readline', content=content)
60 return self._request(msg)
58 content = dict(size=size)
59 msg = self.session.msg('readline_request', content=content)
60 reply = self._request(msg)
61 return reply['content']['line']
61 62
62 63 def readlines(self, size=-1):
63 64 raise NotImplementedError
@@ -83,7 +84,7 b' class InStream(object):'
83 84 raise
84 85 else:
85 86 break
86 return reply[u'content'][u'data']
87 return reply
87 88
88 89
89 90 class OutStream(object):
@@ -199,7 +199,6 b' class XReqSocketChannel(ZmqSocketChannel):'
199 199 Returns
200 200 -------
201 201 The msg_id of the message sent.
202
203 202 """
204 203 content = dict(text=text, line=line)
205 204 msg = self.session.msg('complete_request', content)
@@ -338,24 +337,84 b' class SubSocketChannel(ZmqSocketChannel):'
338 337 class RepSocketChannel(ZmqSocketChannel):
339 338 """A reply channel to handle raw_input requests that the kernel makes."""
340 339
340 msg_queue = None
341
342 def __init__(self, context, session, address):
343 self.msg_queue = Queue()
344 super(RepSocketChannel, self).__init__(context, session, address)
345
341 346 def run(self):
342 347 """The thread's main activity. Call start() instead."""
348 self.socket = self.context.socket(zmq.XREQ)
349 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
350 self.socket.connect('tcp://%s:%i' % self.address)
343 351 self.ioloop = ioloop.IOLoop()
352 self.iostate = POLLERR|POLLIN
353 self.ioloop.add_handler(self.socket, self._handle_events,
354 self.iostate)
344 355 self.ioloop.start()
345 356
346 357 def stop(self):
347 358 self.ioloop.stop()
348 359 super(RepSocketChannel, self).stop()
349 360
350 def on_raw_input(self):
361 def call_handlers(self, msg):
362 """This method is called in the ioloop thread when a message arrives.
363
364 Subclasses should override this method to handle incoming messages.
365 It is important to remember that this method is called in the thread
366 so that some logic must be done to ensure that the application leve
367 handlers are called in the application thread.
368 """
369 raise NotImplementedError('call_handlers must be defined in a subclass.')
370
371 def readline(self, line):
372 """A send a line of raw input to the kernel.
373
374 Parameters
375 ----------
376 line : str
377 The line of the input.
378 """
379 content = dict(line=line)
380 msg = self.session.msg('readline_reply', content)
381 self._queue_reply(msg)
382
383 def _handle_events(self, socket, events):
384 if events & POLLERR:
385 self._handle_err()
386 if events & POLLOUT:
387 self._handle_send()
388 if events & POLLIN:
389 self._handle_recv()
390
391 def _handle_recv(self):
392 msg = self.socket.recv_json()
393 self.call_handlers(msg)
394
395 def _handle_send(self):
396 try:
397 msg = self.msg_queue.get(False)
398 except Empty:
351 399 pass
400 else:
401 self.socket.send_json(msg)
402 if self.msg_queue.empty():
403 self.drop_io_state(POLLOUT)
404
405 def _handle_err(self):
406 # We don't want to let this go silently, so eventually we should log.
407 raise zmq.ZMQError()
408
409 def _queue_reply(self, msg):
410 self.msg_queue.put(msg)
411 self.add_io_state(POLLOUT)
352 412
353 413
354 414 #-----------------------------------------------------------------------------
355 415 # Main kernel manager class
356 416 #-----------------------------------------------------------------------------
357 417
358
359 418 class KernelManager(HasTraits):
360 419 """ Manages a kernel for a frontend.
361 420
General Comments 0
You need to be logged in to leave comments. Login now