##// END OF EJS Templates
* Refactored KernelManager to use Traitlets and to have its channels as attributes...
epatters -
Show More
@@ -9,9 +9,6 b' from pygments.lexers import PythonLexer'
9 9 from PyQt4 import QtCore, QtGui
10 10 import zmq
11 11
12 # IPython imports.
13 from IPython.zmq.session import Message, Session
14
15 12 # Local imports
16 13 from call_tip_widget import CallTipWidget
17 14 from completion_lexer import CompletionLexer
@@ -19,48 +16,6 b' from console_widget import HistoryConsoleWidget'
19 16 from pygments_highlighter import PygmentsHighlighter
20 17
21 18
22 class FrontendReplyThread(Thread, QtCore.QObject):
23 """ A Thread that receives a reply from the kernel for the frontend.
24 """
25
26 finished = QtCore.pyqtSignal()
27 output_received = QtCore.pyqtSignal(Message)
28 reply_received = QtCore.pyqtSignal(Message)
29
30 def __init__(self, parent):
31 """ Create a FrontendReplyThread for the specified frontend.
32 """
33 assert isinstance(parent, FrontendWidget)
34 QtCore.QObject.__init__(self, parent)
35 Thread.__init__(self)
36
37 self.sleep_time = 0.05
38
39 def run(self):
40 """ The starting point for the thread.
41 """
42 frontend = self.parent()
43 while True:
44 rep = frontend._recv_reply()
45 if rep is not None:
46 self._recv_output()
47 self.reply_received.emit(rep)
48 break
49
50 self._recv_output()
51 time.sleep(self.sleep_time)
52
53 self.finished.emit()
54
55 def _recv_output(self):
56 """ Send any output to the frontend.
57 """
58 frontend = self.parent()
59 omsgs = frontend._recv_output()
60 for omsg in omsgs:
61 self.output_received.emit(omsg)
62
63
64 19 class FrontendHighlighter(PygmentsHighlighter):
65 20 """ A Python PygmentsHighlighter that can be turned on and off and which
66 21 knows about continuation prompts.
@@ -96,7 +51,7 b' class FrontendWidget(HistoryConsoleWidget):'
96 51 """
97 52
98 53 # Emitted when an 'execute_reply' is received from the kernel.
99 executed = QtCore.pyqtSignal(Message)
54 executed = QtCore.pyqtSignal(object)
100 55
101 56 #---------------------------------------------------------------------------
102 57 # 'QWidget' interface
@@ -109,6 +64,7 b' class FrontendWidget(HistoryConsoleWidget):'
109 64 self._compile = CommandCompiler()
110 65 self._completion_lexer = CompletionLexer(PythonLexer())
111 66 self._highlighter = FrontendHighlighter(self)
67 self._kernel_manager = None
112 68
113 69 self.document().contentsChange.connect(self._document_contents_change)
114 70
@@ -183,14 +139,7 b' class FrontendWidget(HistoryConsoleWidget):'
183 139
184 140 executed = code is not None
185 141 if executed:
186 msg = self.session.send(self.request_socket, 'execute_request',
187 dict(code=source))
188 thread = FrontendReplyThread(self)
189 if not hidden:
190 thread.output_received.connect(self._handle_output)
191 thread.reply_received.connect(self._handle_reply)
192 thread.finished.connect(thread.deleteLater)
193 thread.start()
142 self.kernel_manager.xreq_channel.execute(source)
194 143 else:
195 144 space = 0
196 145 for char in lines[-1]:
@@ -221,9 +170,25 b' class FrontendWidget(HistoryConsoleWidget):'
221 170 def _set_kernel_manager(self, kernel_manager):
222 171 """ Sets a new kernel manager, configuring its channels as necessary.
223 172 """
173 # Disconnect the old kernel manager.
174 if self._kernel_manager is not None:
175 sub = self._kernel_manager.sub_channel
176 xreq = self._kernel_manager.xreq_channel
177 sub.message_received.disconnect(self._handle_sub)
178 xreq.execute_reply.disconnect(self._handle_execute_reply)
179 xreq.complete_reply.disconnect(self._handle_complete_reply)
180 xreq.object_info_reply.disconnect(self._handle_object_info_reply)
181
182 # Connect the new kernel manager.
224 183 self._kernel_manager = kernel_manager
225 self._sub_channel = kernel_manager.get_sub_channel()
226 self._xreq_channel = kernel_manager.get_xreq_channel()
184 sub = kernel_manager.sub_channel
185 xreq = kernel_manager.xreq_channel
186 sub.message_received.connect(self._handle_sub)
187 xreq.execute_reply.connect(self._handle_execute_reply)
188 #xreq.complete_reply.connect(self._handle_complete_reply)
189 #xreq.object_info_repy.connect(self._handle_object_info_reply)
190
191 self._show_prompt('>>> ')
227 192
228 193 kernel_manager = property(_get_kernel_manager, _set_kernel_manager)
229 194
@@ -282,11 +247,6 b' class FrontendWidget(HistoryConsoleWidget):'
282 247 self._complete_with_items(cursor, matches)
283 248 return True
284 249
285 def _kernel_connected(self):
286 """ Called when the frontend is connected to a kernel.
287 """
288 self._show_prompt('>>> ')
289
290 250 def _get_context(self, cursor=None):
291 251 """ Gets the context at the current cursor location.
292 252 """
@@ -310,46 +270,32 b' class FrontendWidget(HistoryConsoleWidget):'
310 270 if position == self.textCursor().position():
311 271 self._call_tip()
312 272
313 def _handle_output(self, omsg):
314 handler = getattr(self, '_handle_%s' % omsg.msg_type, None)
273 def _handle_sub(self, omsg):
274 handler = getattr(self, '_handle_%s' % omsg['msg_type'], None)
315 275 if handler is not None:
316 276 handler(omsg)
317 277
318 278 def _handle_pyout(self, omsg):
319 if omsg.parent_header.session == self.session.session:
320 self.appendPlainText(omsg.content.data + '\n')
279 session = omsg['parent_header']['session']
280 if session == self.kernel_manager.session.session:
281 self.appendPlainText(omsg['content']['data'] + '\n')
321 282
322 283 def _handle_stream(self, omsg):
323 self.appendPlainText(omsg.content.data)
284 self.appendPlainText(omsg['content']['data'])
324 285
325 def _handle_reply(self, rep):
326 if rep is not None:
327 if rep.msg_type == 'execute_reply':
328 if rep.content.status == 'error':
329 self.appendPlainText(rep.content.traceback[-1])
330 elif rep.content.status == 'aborted':
331 text = "ERROR: ABORTED\n"
332 ab = self.messages[rep.parent_header.msg_id].content
333 if 'code' in ab:
334 text += ab.code
335 else:
336 text += ab
337 self.appendPlainText(text)
338 self._show_prompt('>>> ')
339 self.executed.emit(rep)
286 def _handle_execute_reply(self, rep):
287 content = rep['content']
288 status = content['status']
289 if status == 'error':
290 self.appendPlainText(content['traceback'][-1])
291 elif status == 'aborted':
292 text = "ERROR: ABORTED\n"
293 self.appendPlainText(text)
294 self._show_prompt('>>> ')
295 self.executed.emit(rep)
340 296
341 297 #------ Communication methods ----------------------------------------------
342 298
343 def _recv_output(self):
344 omsgs = []
345 while True:
346 omsg = self.session.recv(self.sub_socket)
347 if omsg is None:
348 break
349 else:
350 omsgs.append(omsg)
351 return omsgs
352
353 299 def _recv_reply(self):
354 300 return self.session.recv(self.request_socket)
355 301
@@ -364,26 +310,19 b' class FrontendWidget(HistoryConsoleWidget):'
364 310
365 311 if __name__ == '__main__':
366 312 import sys
313 from IPython.frontend.qt.kernelmanager import QtKernelManager
367 314
368 # Defaults
369 ip = '127.0.0.1'
370 port_base = 5555
371 connection = ('tcp://%s' % ip) + ':%i'
372 req_conn = connection % port_base
373 sub_conn = connection % (port_base+1)
374
375 # Create initial sockets
376 c = zmq.Context()
377 request_socket = c.socket(zmq.XREQ)
378 request_socket.connect(req_conn)
379 sub_socket = c.socket(zmq.SUB)
380 sub_socket.connect(sub_conn)
381 sub_socket.setsockopt(zmq.SUBSCRIBE, '')
315 # Create KernelManager
316 xreq_addr = ('127.0.0.1', 5575)
317 sub_addr = ('127.0.0.1', 5576)
318 rep_addr = ('127.0.0.1', 5577)
319 kernel_manager = QtKernelManager(xreq_addr, sub_addr, rep_addr)
320 kernel_manager.sub_channel.start()
321 kernel_manager.xreq_channel.start()
382 322
383 323 # Launch application
384 324 app = QtGui.QApplication(sys.argv)
385 widget = FrontendWidget(request_socket=request_socket,
386 sub_socket=sub_socket)
325 widget = FrontendWidget(kernel_manager)
387 326 widget.setWindowTitle('Python')
388 327 widget.resize(640, 480)
389 328 widget.show()
@@ -5,29 +5,30 b''
5 5 from PyQt4 import QtCore
6 6
7 7 # IPython imports.
8 from IPython.zmq.kernel_manager import KernelManager, SubSocketChannel, \
8 from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \
9 9 XReqSocketChannel, RepSocketChannel
10 10
11 11
12 class QtKernelManager(KernelManager):
13 """ A KernelManager that provides channels that use signals and slots.
14 """
15
16 sub_channel_class = QtSubSocketChannel
17 xreq_channel_class = QtXReqSocketChannel
18 rep_channel_class = QtRepSocketChannel
19
20
21 12 class QtSubSocketChannel(SubSocketChannel, QtCore.QObject):
22 13
23 14 # Emitted when any message is received.
24 message_received = QtCore.pyqtSignal(dict)
15 message_received = QtCore.pyqtSignal(object)
25 16
26 17 # Emitted when a message of type 'pyout' or 'stdout' is received.
27 output_received = QtCore.pyqtSignal(dict)
18 output_received = QtCore.pyqtSignal(object)
28 19
29 20 # Emitted when a message of type 'pyerr' or 'stderr' is received.
30 error_received = QtCore.pyqtSignal(dict)
21 error_received = QtCore.pyqtSignal(object)
22
23 #---------------------------------------------------------------------------
24 # 'object' interface
25 #---------------------------------------------------------------------------
26
27 def __init__(self, *args, **kw):
28 """ Reimplemented to ensure that QtCore.QObject is initialized first.
29 """
30 QtCore.QObject.__init__(self)
31 SubSocketChannel.__init__(self, *args, **kw)
31 32
32 33 #---------------------------------------------------------------------------
33 34 # 'SubSocketChannel' interface
@@ -50,12 +51,22 b' class QtSubSocketChannel(SubSocketChannel, QtCore.QObject):'
50 51 class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):
51 52
52 53 # Emitted when any message is received.
53 message_received = QtCore.pyqtSignal(dict)
54 message_received = QtCore.pyqtSignal(object)
54 55
55 56 # Emitted when a reply has been received for the corresponding request type.
56 execute_reply = QtCore.pyqtSignal(dict)
57 complete_reply = QtCore.pyqtSignal(dict)
58 object_info_reply = QtCore.pyqtSignal(dict)
57 execute_reply = QtCore.pyqtSignal(object)
58 complete_reply = QtCore.pyqtSignal(object)
59 object_info_reply = QtCore.pyqtSignal(object)
60
61 #---------------------------------------------------------------------------
62 # 'object' interface
63 #---------------------------------------------------------------------------
64
65 def __init__(self, *args, **kw):
66 """ Reimplemented to ensure that QtCore.QObject is initialized first.
67 """
68 QtCore.QObject.__init__(self)
69 XReqSocketChannel.__init__(self, *args, **kw)
59 70
60 71 #---------------------------------------------------------------------------
61 72 # 'XReqSocketChannel' interface
@@ -73,7 +84,29 b' class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):'
73 84 if signal:
74 85 signal.emit(msg)
75 86
87 def _queue_request(self, msg, callback):
88 """ Reimplemented to skip callback handling.
89 """
90 self.command_queue.put(msg)
91
76 92
77 93 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
78 94
79 pass
95 #---------------------------------------------------------------------------
96 # 'object' interface
97 #---------------------------------------------------------------------------
98
99 def __init__(self, *args, **kw):
100 """ Reimplemented to ensure that QtCore.QObject is initialized first.
101 """
102 QtCore.QObject.__init__(self)
103 RepSocketChannel.__init__(self, *args, **kw)
104
105
106 class QtKernelManager(KernelManager):
107 """ A KernelManager that provides channels that use signals and slots.
108 """
109
110 sub_channel_class = QtSubSocketChannel
111 xreq_channel_class = QtXReqSocketChannel
112 rep_channel_class = QtRepSocketChannel
@@ -4,13 +4,18 b' TODO: Create logger to handle debugging and console messages.'
4 4
5 5 """
6 6
7 # Standard library imports.
7 8 from Queue import Queue, Empty
8 9 from threading import Thread
9 10 import traceback
10 11
12 # System library imports.
11 13 import zmq
12 14 from zmq import POLLIN, POLLOUT, POLLERR
13 15 from zmq.eventloop import ioloop
16
17 # Local imports.
18 from IPython.utils.traitlets import HasTraits, Any, Int, Instance, Str, Type
14 19 from session import Session
15 20
16 21
@@ -18,56 +23,6 b' class MissingHandlerError(Exception):'
18 23 pass
19 24
20 25
21 class KernelManager(object):
22
23 sub_channel_class = SubSocketChannel
24 xreq_channel_class = XReqSocketChannel
25 rep_channel_class = RepSocketChannel
26
27 def __init__(self, xreq_addr, sub_addr, rep_addr,
28 context=None, session=None):
29 self.context = zmq.Context() if context is None else context
30 self.session = Session() if session is None else session
31 self.xreq_addr = xreq_addr
32 self.sub_addr = sub_addr
33 self.rep_addr = rep_addr
34
35 def start_kernel(self):
36 """Start a localhost kernel on ip and port.
37
38 The SUB channel is for the frontend to receive messages published by
39 the kernel.
40
41 The REQ channel is for the frontend to make requests of the kernel.
42
43 The REP channel is for the kernel to request stdin (raw_input) from
44 the frontend.
45 """
46
47 def kill_kernel(self):
48 """Kill the running kernel"""
49
50 def is_alive(self):
51 """Is the kernel alive?"""
52 return True
53
54 def signal_kernel(self, signum):
55 """Send signum to the kernel."""
56
57 def get_sub_channel(self):
58 """Get the SUB socket channel object."""
59 return self.sub_channel_class(self.context, self.session, self.sub_addr)
60
61 def get_xreq_channel(self):
62 """Get the REQ socket channel object to make requests of the kernel."""
63 return self.xreq_channel_class(self.context, self.session,
64 self.xreq_addr)
65
66 def get_rep_channel(self):
67 """Get the REP socket channel object to handle stdin (raw_input)."""
68 return self.rep_channel_class(self.context, self.session, self.rep_addr)
69
70
71 26 class ZmqSocketChannel(Thread):
72 27
73 28 socket = None
@@ -231,7 +186,8 b' class XReqSocketChannel(ZmqSocketChannel):'
231 186 return callback
232 187 handler = self.handlers.get(name)
233 188 if handler is None:
234 raise MissingHandlerError('No handler defined for method: %s' % name)
189 raise MissingHandlerError(
190 'No handler defined for method: %s' % name)
235 191 return handler
236 192
237 193 def override_call_handler(self, func):
@@ -273,3 +229,90 b' class RepSocketChannel(ZmqSocketChannel):'
273 229
274 230 def on_raw_input():
275 231 pass
232
233
234 class KernelManager(HasTraits):
235
236 # The addresses to use for the various channels. Should be tuples of form
237 # (ip_address, port).
238 sub_address = Any
239 xreq_address = Any
240 rep_address = Any
241 # FIXME: Add Tuple to Traitlets.
242 #sub_address = Tuple(Str, Int)
243 #xreq_address = Tuple(Str, Int)
244 #rep_address = Tuple(Str, Int)
245
246 # The PyZMQ Context to use for communication with the kernel.
247 context = Instance(zmq.Context, ())
248
249 # The Session to use for communication with the kernel.
250 session = Instance(Session, ())
251
252 # The classes to use for the various channels.
253 sub_channel_class = Type(SubSocketChannel)
254 xreq_channel_class = Type(XReqSocketChannel)
255 rep_channel_class = Type(RepSocketChannel)
256
257 # Protected traits.
258 _sub_channel = Any
259 _xreq_channel = Any
260 _rep_channel = Any
261
262 def __init__(self, xreq_address, sub_address, rep_address, **traits):
263 super(KernelManager, self).__init__()
264
265 self.xreq_address = xreq_address
266 self.sub_address = sub_address
267 self.rep_address = rep_address
268
269 # FIXME: This should be the business of HasTraits. The convention is:
270 # HasTraits.__init__(self, **traits_to_be_initialized.)
271 for trait in traits:
272 setattr(self, trait, traits[trait])
273
274 def start_kernel(self):
275 """Start a localhost kernel on ip and port.
276
277 The SUB channel is for the frontend to receive messages published by
278 the kernel.
279
280 The REQ channel is for the frontend to make requests of the kernel.
281
282 The REP channel is for the kernel to request stdin (raw_input) from
283 the frontend.
284 """
285
286 def kill_kernel(self):
287 """Kill the running kernel"""
288
289 def is_alive(self):
290 """Is the kernel alive?"""
291 return True
292
293 def signal_kernel(self, signum):
294 """Send signum to the kernel."""
295
296 @property
297 def sub_channel(self):
298 """Get the SUB socket channel object."""
299 if self._sub_channel is None:
300 self._sub_channel = self.sub_channel_class(
301 self.context, self.session, self.sub_address)
302 return self._sub_channel
303
304 @property
305 def xreq_channel(self):
306 """Get the REQ socket channel object to make requests of the kernel."""
307 if self._xreq_channel is None:
308 self._xreq_channel = self.xreq_channel_class(
309 self.context, self.session, self.xreq_address)
310 return self._xreq_channel
311
312 @property
313 def rep_channel(self):
314 """Get the REP socket channel object to handle stdin (raw_input)."""
315 if self._rep_channel is None:
316 self._rep_channel = self.rep_channel_class(
317 self.context, self.session, self.rep_address)
318 return self._rep_channel
@@ -9,8 +9,8 b" rep_addr = ('127.0.0.1', 5577)"
9 9
10 10
11 11 km = KernelManager(xreq_addr, sub_addr, rep_addr)
12 # xreq_channel = km.get_xreq_channel()
13 sub_channel = km.get_sub_channel()
12 # xreq_channel = km.xreq_channel
13 sub_channel = km.sub_channel
14 14
15 15 # xreq_channel.start()
16 16 sub_channel.start()
@@ -49,4 +49,4 b' for i in range(100):'
49 49 time.sleep(1)
50 50
51 51 # xreq_channel.join()
52 sub_channel.join() No newline at end of file
52 sub_channel.join()
General Comments 0
You need to be logged in to leave comments. Login now