##// END OF EJS Templates
* Refactored KernelManager to use Traitlets and to have its channels as attributes...
epatters -
Show More
@@ -9,9 +9,6 b' from pygments.lexers import PythonLexer'
9 from PyQt4 import QtCore, QtGui
9 from PyQt4 import QtCore, QtGui
10 import zmq
10 import zmq
11
11
12 # IPython imports.
13 from IPython.zmq.session import Message, Session
14
15 # Local imports
12 # Local imports
16 from call_tip_widget import CallTipWidget
13 from call_tip_widget import CallTipWidget
17 from completion_lexer import CompletionLexer
14 from completion_lexer import CompletionLexer
@@ -19,48 +16,6 b' from console_widget import HistoryConsoleWidget'
19 from pygments_highlighter import PygmentsHighlighter
16 from pygments_highlighter import PygmentsHighlighter
20
17
21
18
22 class FrontendReplyThread(Thread, QtCore.QObject):
23 """ A Thread that receives a reply from the kernel for the frontend.
24 """
25
26 finished = QtCore.pyqtSignal()
27 output_received = QtCore.pyqtSignal(Message)
28 reply_received = QtCore.pyqtSignal(Message)
29
30 def __init__(self, parent):
31 """ Create a FrontendReplyThread for the specified frontend.
32 """
33 assert isinstance(parent, FrontendWidget)
34 QtCore.QObject.__init__(self, parent)
35 Thread.__init__(self)
36
37 self.sleep_time = 0.05
38
39 def run(self):
40 """ The starting point for the thread.
41 """
42 frontend = self.parent()
43 while True:
44 rep = frontend._recv_reply()
45 if rep is not None:
46 self._recv_output()
47 self.reply_received.emit(rep)
48 break
49
50 self._recv_output()
51 time.sleep(self.sleep_time)
52
53 self.finished.emit()
54
55 def _recv_output(self):
56 """ Send any output to the frontend.
57 """
58 frontend = self.parent()
59 omsgs = frontend._recv_output()
60 for omsg in omsgs:
61 self.output_received.emit(omsg)
62
63
64 class FrontendHighlighter(PygmentsHighlighter):
19 class FrontendHighlighter(PygmentsHighlighter):
65 """ A Python PygmentsHighlighter that can be turned on and off and which
20 """ A Python PygmentsHighlighter that can be turned on and off and which
66 knows about continuation prompts.
21 knows about continuation prompts.
@@ -96,7 +51,7 b' class FrontendWidget(HistoryConsoleWidget):'
96 """
51 """
97
52
98 # Emitted when an 'execute_reply' is received from the kernel.
53 # Emitted when an 'execute_reply' is received from the kernel.
99 executed = QtCore.pyqtSignal(Message)
54 executed = QtCore.pyqtSignal(object)
100
55
101 #---------------------------------------------------------------------------
56 #---------------------------------------------------------------------------
102 # 'QWidget' interface
57 # 'QWidget' interface
@@ -109,6 +64,7 b' class FrontendWidget(HistoryConsoleWidget):'
109 self._compile = CommandCompiler()
64 self._compile = CommandCompiler()
110 self._completion_lexer = CompletionLexer(PythonLexer())
65 self._completion_lexer = CompletionLexer(PythonLexer())
111 self._highlighter = FrontendHighlighter(self)
66 self._highlighter = FrontendHighlighter(self)
67 self._kernel_manager = None
112
68
113 self.document().contentsChange.connect(self._document_contents_change)
69 self.document().contentsChange.connect(self._document_contents_change)
114
70
@@ -183,14 +139,7 b' class FrontendWidget(HistoryConsoleWidget):'
183
139
184 executed = code is not None
140 executed = code is not None
185 if executed:
141 if executed:
186 msg = self.session.send(self.request_socket, 'execute_request',
142 self.kernel_manager.xreq_channel.execute(source)
187 dict(code=source))
188 thread = FrontendReplyThread(self)
189 if not hidden:
190 thread.output_received.connect(self._handle_output)
191 thread.reply_received.connect(self._handle_reply)
192 thread.finished.connect(thread.deleteLater)
193 thread.start()
194 else:
143 else:
195 space = 0
144 space = 0
196 for char in lines[-1]:
145 for char in lines[-1]:
@@ -221,9 +170,25 b' class FrontendWidget(HistoryConsoleWidget):'
221 def _set_kernel_manager(self, kernel_manager):
170 def _set_kernel_manager(self, kernel_manager):
222 """ Sets a new kernel manager, configuring its channels as necessary.
171 """ Sets a new kernel manager, configuring its channels as necessary.
223 """
172 """
173 # Disconnect the old kernel manager.
174 if self._kernel_manager is not None:
175 sub = self._kernel_manager.sub_channel
176 xreq = self._kernel_manager.xreq_channel
177 sub.message_received.disconnect(self._handle_sub)
178 xreq.execute_reply.disconnect(self._handle_execute_reply)
179 xreq.complete_reply.disconnect(self._handle_complete_reply)
180 xreq.object_info_reply.disconnect(self._handle_object_info_reply)
181
182 # Connect the new kernel manager.
224 self._kernel_manager = kernel_manager
183 self._kernel_manager = kernel_manager
225 self._sub_channel = kernel_manager.get_sub_channel()
184 sub = kernel_manager.sub_channel
226 self._xreq_channel = kernel_manager.get_xreq_channel()
185 xreq = kernel_manager.xreq_channel
186 sub.message_received.connect(self._handle_sub)
187 xreq.execute_reply.connect(self._handle_execute_reply)
188 #xreq.complete_reply.connect(self._handle_complete_reply)
189 #xreq.object_info_repy.connect(self._handle_object_info_reply)
190
191 self._show_prompt('>>> ')
227
192
228 kernel_manager = property(_get_kernel_manager, _set_kernel_manager)
193 kernel_manager = property(_get_kernel_manager, _set_kernel_manager)
229
194
@@ -282,11 +247,6 b' class FrontendWidget(HistoryConsoleWidget):'
282 self._complete_with_items(cursor, matches)
247 self._complete_with_items(cursor, matches)
283 return True
248 return True
284
249
285 def _kernel_connected(self):
286 """ Called when the frontend is connected to a kernel.
287 """
288 self._show_prompt('>>> ')
289
290 def _get_context(self, cursor=None):
250 def _get_context(self, cursor=None):
291 """ Gets the context at the current cursor location.
251 """ Gets the context at the current cursor location.
292 """
252 """
@@ -310,46 +270,32 b' class FrontendWidget(HistoryConsoleWidget):'
310 if position == self.textCursor().position():
270 if position == self.textCursor().position():
311 self._call_tip()
271 self._call_tip()
312
272
313 def _handle_output(self, omsg):
273 def _handle_sub(self, omsg):
314 handler = getattr(self, '_handle_%s' % omsg.msg_type, None)
274 handler = getattr(self, '_handle_%s' % omsg['msg_type'], None)
315 if handler is not None:
275 if handler is not None:
316 handler(omsg)
276 handler(omsg)
317
277
318 def _handle_pyout(self, omsg):
278 def _handle_pyout(self, omsg):
319 if omsg.parent_header.session == self.session.session:
279 session = omsg['parent_header']['session']
320 self.appendPlainText(omsg.content.data + '\n')
280 if session == self.kernel_manager.session.session:
281 self.appendPlainText(omsg['content']['data'] + '\n')
321
282
322 def _handle_stream(self, omsg):
283 def _handle_stream(self, omsg):
323 self.appendPlainText(omsg.content.data)
284 self.appendPlainText(omsg['content']['data'])
324
285
325 def _handle_reply(self, rep):
286 def _handle_execute_reply(self, rep):
326 if rep is not None:
287 content = rep['content']
327 if rep.msg_type == 'execute_reply':
288 status = content['status']
328 if rep.content.status == 'error':
289 if status == 'error':
329 self.appendPlainText(rep.content.traceback[-1])
290 self.appendPlainText(content['traceback'][-1])
330 elif rep.content.status == 'aborted':
291 elif status == 'aborted':
331 text = "ERROR: ABORTED\n"
292 text = "ERROR: ABORTED\n"
332 ab = self.messages[rep.parent_header.msg_id].content
333 if 'code' in ab:
334 text += ab.code
335 else:
336 text += ab
337 self.appendPlainText(text)
293 self.appendPlainText(text)
338 self._show_prompt('>>> ')
294 self._show_prompt('>>> ')
339 self.executed.emit(rep)
295 self.executed.emit(rep)
340
296
341 #------ Communication methods ----------------------------------------------
297 #------ Communication methods ----------------------------------------------
342
298
343 def _recv_output(self):
344 omsgs = []
345 while True:
346 omsg = self.session.recv(self.sub_socket)
347 if omsg is None:
348 break
349 else:
350 omsgs.append(omsg)
351 return omsgs
352
353 def _recv_reply(self):
299 def _recv_reply(self):
354 return self.session.recv(self.request_socket)
300 return self.session.recv(self.request_socket)
355
301
@@ -364,26 +310,19 b' class FrontendWidget(HistoryConsoleWidget):'
364
310
365 if __name__ == '__main__':
311 if __name__ == '__main__':
366 import sys
312 import sys
313 from IPython.frontend.qt.kernelmanager import QtKernelManager
367
314
368 # Defaults
315 # Create KernelManager
369 ip = '127.0.0.1'
316 xreq_addr = ('127.0.0.1', 5575)
370 port_base = 5555
317 sub_addr = ('127.0.0.1', 5576)
371 connection = ('tcp://%s' % ip) + ':%i'
318 rep_addr = ('127.0.0.1', 5577)
372 req_conn = connection % port_base
319 kernel_manager = QtKernelManager(xreq_addr, sub_addr, rep_addr)
373 sub_conn = connection % (port_base+1)
320 kernel_manager.sub_channel.start()
374
321 kernel_manager.xreq_channel.start()
375 # Create initial sockets
376 c = zmq.Context()
377 request_socket = c.socket(zmq.XREQ)
378 request_socket.connect(req_conn)
379 sub_socket = c.socket(zmq.SUB)
380 sub_socket.connect(sub_conn)
381 sub_socket.setsockopt(zmq.SUBSCRIBE, '')
382
322
383 # Launch application
323 # Launch application
384 app = QtGui.QApplication(sys.argv)
324 app = QtGui.QApplication(sys.argv)
385 widget = FrontendWidget(request_socket=request_socket,
325 widget = FrontendWidget(kernel_manager)
386 sub_socket=sub_socket)
387 widget.setWindowTitle('Python')
326 widget.setWindowTitle('Python')
388 widget.resize(640, 480)
327 widget.resize(640, 480)
389 widget.show()
328 widget.show()
@@ -5,29 +5,30 b''
5 from PyQt4 import QtCore
5 from PyQt4 import QtCore
6
6
7 # IPython imports.
7 # IPython imports.
8 from IPython.zmq.kernel_manager import KernelManager, SubSocketChannel, \
8 from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \
9 XReqSocketChannel, RepSocketChannel
9 XReqSocketChannel, RepSocketChannel
10
10
11
11
12 class QtKernelManager(KernelManager):
13 """ A KernelManager that provides channels that use signals and slots.
14 """
15
16 sub_channel_class = QtSubSocketChannel
17 xreq_channel_class = QtXReqSocketChannel
18 rep_channel_class = QtRepSocketChannel
19
20
21 class QtSubSocketChannel(SubSocketChannel, QtCore.QObject):
12 class QtSubSocketChannel(SubSocketChannel, QtCore.QObject):
22
13
23 # Emitted when any message is received.
14 # Emitted when any message is received.
24 message_received = QtCore.pyqtSignal(dict)
15 message_received = QtCore.pyqtSignal(object)
25
16
26 # Emitted when a message of type 'pyout' or 'stdout' is received.
17 # Emitted when a message of type 'pyout' or 'stdout' is received.
27 output_received = QtCore.pyqtSignal(dict)
18 output_received = QtCore.pyqtSignal(object)
28
19
29 # Emitted when a message of type 'pyerr' or 'stderr' is received.
20 # Emitted when a message of type 'pyerr' or 'stderr' is received.
30 error_received = QtCore.pyqtSignal(dict)
21 error_received = QtCore.pyqtSignal(object)
22
23 #---------------------------------------------------------------------------
24 # 'object' interface
25 #---------------------------------------------------------------------------
26
27 def __init__(self, *args, **kw):
28 """ Reimplemented to ensure that QtCore.QObject is initialized first.
29 """
30 QtCore.QObject.__init__(self)
31 SubSocketChannel.__init__(self, *args, **kw)
31
32
32 #---------------------------------------------------------------------------
33 #---------------------------------------------------------------------------
33 # 'SubSocketChannel' interface
34 # 'SubSocketChannel' interface
@@ -50,12 +51,22 b' class QtSubSocketChannel(SubSocketChannel, QtCore.QObject):'
50 class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):
51 class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):
51
52
52 # Emitted when any message is received.
53 # Emitted when any message is received.
53 message_received = QtCore.pyqtSignal(dict)
54 message_received = QtCore.pyqtSignal(object)
54
55
55 # Emitted when a reply has been received for the corresponding request type.
56 # Emitted when a reply has been received for the corresponding request type.
56 execute_reply = QtCore.pyqtSignal(dict)
57 execute_reply = QtCore.pyqtSignal(object)
57 complete_reply = QtCore.pyqtSignal(dict)
58 complete_reply = QtCore.pyqtSignal(object)
58 object_info_reply = QtCore.pyqtSignal(dict)
59 object_info_reply = QtCore.pyqtSignal(object)
60
61 #---------------------------------------------------------------------------
62 # 'object' interface
63 #---------------------------------------------------------------------------
64
65 def __init__(self, *args, **kw):
66 """ Reimplemented to ensure that QtCore.QObject is initialized first.
67 """
68 QtCore.QObject.__init__(self)
69 XReqSocketChannel.__init__(self, *args, **kw)
59
70
60 #---------------------------------------------------------------------------
71 #---------------------------------------------------------------------------
61 # 'XReqSocketChannel' interface
72 # 'XReqSocketChannel' interface
@@ -73,7 +84,29 b' class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):'
73 if signal:
84 if signal:
74 signal.emit(msg)
85 signal.emit(msg)
75
86
87 def _queue_request(self, msg, callback):
88 """ Reimplemented to skip callback handling.
89 """
90 self.command_queue.put(msg)
91
76
92
77 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
93 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
78
94
79 pass
95 #---------------------------------------------------------------------------
96 # 'object' interface
97 #---------------------------------------------------------------------------
98
99 def __init__(self, *args, **kw):
100 """ Reimplemented to ensure that QtCore.QObject is initialized first.
101 """
102 QtCore.QObject.__init__(self)
103 RepSocketChannel.__init__(self, *args, **kw)
104
105
106 class QtKernelManager(KernelManager):
107 """ A KernelManager that provides channels that use signals and slots.
108 """
109
110 sub_channel_class = QtSubSocketChannel
111 xreq_channel_class = QtXReqSocketChannel
112 rep_channel_class = QtRepSocketChannel
@@ -4,13 +4,18 b' TODO: Create logger to handle debugging and console messages.'
4
4
5 """
5 """
6
6
7 # Standard library imports.
7 from Queue import Queue, Empty
8 from Queue import Queue, Empty
8 from threading import Thread
9 from threading import Thread
9 import traceback
10 import traceback
10
11
12 # System library imports.
11 import zmq
13 import zmq
12 from zmq import POLLIN, POLLOUT, POLLERR
14 from zmq import POLLIN, POLLOUT, POLLERR
13 from zmq.eventloop import ioloop
15 from zmq.eventloop import ioloop
16
17 # Local imports.
18 from IPython.utils.traitlets import HasTraits, Any, Int, Instance, Str, Type
14 from session import Session
19 from session import Session
15
20
16
21
@@ -18,56 +23,6 b' class MissingHandlerError(Exception):'
18 pass
23 pass
19
24
20
25
21 class KernelManager(object):
22
23 sub_channel_class = SubSocketChannel
24 xreq_channel_class = XReqSocketChannel
25 rep_channel_class = RepSocketChannel
26
27 def __init__(self, xreq_addr, sub_addr, rep_addr,
28 context=None, session=None):
29 self.context = zmq.Context() if context is None else context
30 self.session = Session() if session is None else session
31 self.xreq_addr = xreq_addr
32 self.sub_addr = sub_addr
33 self.rep_addr = rep_addr
34
35 def start_kernel(self):
36 """Start a localhost kernel on ip and port.
37
38 The SUB channel is for the frontend to receive messages published by
39 the kernel.
40
41 The REQ channel is for the frontend to make requests of the kernel.
42
43 The REP channel is for the kernel to request stdin (raw_input) from
44 the frontend.
45 """
46
47 def kill_kernel(self):
48 """Kill the running kernel"""
49
50 def is_alive(self):
51 """Is the kernel alive?"""
52 return True
53
54 def signal_kernel(self, signum):
55 """Send signum to the kernel."""
56
57 def get_sub_channel(self):
58 """Get the SUB socket channel object."""
59 return self.sub_channel_class(self.context, self.session, self.sub_addr)
60
61 def get_xreq_channel(self):
62 """Get the REQ socket channel object to make requests of the kernel."""
63 return self.xreq_channel_class(self.context, self.session,
64 self.xreq_addr)
65
66 def get_rep_channel(self):
67 """Get the REP socket channel object to handle stdin (raw_input)."""
68 return self.rep_channel_class(self.context, self.session, self.rep_addr)
69
70
71 class ZmqSocketChannel(Thread):
26 class ZmqSocketChannel(Thread):
72
27
73 socket = None
28 socket = None
@@ -231,7 +186,8 b' class XReqSocketChannel(ZmqSocketChannel):'
231 return callback
186 return callback
232 handler = self.handlers.get(name)
187 handler = self.handlers.get(name)
233 if handler is None:
188 if handler is None:
234 raise MissingHandlerError('No handler defined for method: %s' % name)
189 raise MissingHandlerError(
190 'No handler defined for method: %s' % name)
235 return handler
191 return handler
236
192
237 def override_call_handler(self, func):
193 def override_call_handler(self, func):
@@ -273,3 +229,90 b' class RepSocketChannel(ZmqSocketChannel):'
273
229
274 def on_raw_input():
230 def on_raw_input():
275 pass
231 pass
232
233
234 class KernelManager(HasTraits):
235
236 # The addresses to use for the various channels. Should be tuples of form
237 # (ip_address, port).
238 sub_address = Any
239 xreq_address = Any
240 rep_address = Any
241 # FIXME: Add Tuple to Traitlets.
242 #sub_address = Tuple(Str, Int)
243 #xreq_address = Tuple(Str, Int)
244 #rep_address = Tuple(Str, Int)
245
246 # The PyZMQ Context to use for communication with the kernel.
247 context = Instance(zmq.Context, ())
248
249 # The Session to use for communication with the kernel.
250 session = Instance(Session, ())
251
252 # The classes to use for the various channels.
253 sub_channel_class = Type(SubSocketChannel)
254 xreq_channel_class = Type(XReqSocketChannel)
255 rep_channel_class = Type(RepSocketChannel)
256
257 # Protected traits.
258 _sub_channel = Any
259 _xreq_channel = Any
260 _rep_channel = Any
261
262 def __init__(self, xreq_address, sub_address, rep_address, **traits):
263 super(KernelManager, self).__init__()
264
265 self.xreq_address = xreq_address
266 self.sub_address = sub_address
267 self.rep_address = rep_address
268
269 # FIXME: This should be the business of HasTraits. The convention is:
270 # HasTraits.__init__(self, **traits_to_be_initialized.)
271 for trait in traits:
272 setattr(self, trait, traits[trait])
273
274 def start_kernel(self):
275 """Start a localhost kernel on ip and port.
276
277 The SUB channel is for the frontend to receive messages published by
278 the kernel.
279
280 The REQ channel is for the frontend to make requests of the kernel.
281
282 The REP channel is for the kernel to request stdin (raw_input) from
283 the frontend.
284 """
285
286 def kill_kernel(self):
287 """Kill the running kernel"""
288
289 def is_alive(self):
290 """Is the kernel alive?"""
291 return True
292
293 def signal_kernel(self, signum):
294 """Send signum to the kernel."""
295
296 @property
297 def sub_channel(self):
298 """Get the SUB socket channel object."""
299 if self._sub_channel is None:
300 self._sub_channel = self.sub_channel_class(
301 self.context, self.session, self.sub_address)
302 return self._sub_channel
303
304 @property
305 def xreq_channel(self):
306 """Get the REQ socket channel object to make requests of the kernel."""
307 if self._xreq_channel is None:
308 self._xreq_channel = self.xreq_channel_class(
309 self.context, self.session, self.xreq_address)
310 return self._xreq_channel
311
312 @property
313 def rep_channel(self):
314 """Get the REP socket channel object to handle stdin (raw_input)."""
315 if self._rep_channel is None:
316 self._rep_channel = self.rep_channel_class(
317 self.context, self.session, self.rep_address)
318 return self._rep_channel
@@ -9,8 +9,8 b" rep_addr = ('127.0.0.1', 5577)"
9
9
10
10
11 km = KernelManager(xreq_addr, sub_addr, rep_addr)
11 km = KernelManager(xreq_addr, sub_addr, rep_addr)
12 # xreq_channel = km.get_xreq_channel()
12 # xreq_channel = km.xreq_channel
13 sub_channel = km.get_sub_channel()
13 sub_channel = km.sub_channel
14
14
15 # xreq_channel.start()
15 # xreq_channel.start()
16 sub_channel.start()
16 sub_channel.start()
General Comments 0
You need to be logged in to leave comments. Login now