##// END OF EJS Templates
* Refactored KernelManager to use Traitlets and to have its channels as attributes...
epatters -
Show More
@@ -1,391 +1,330 b''
1 1 # Standard library imports
2 2 from codeop import CommandCompiler
3 3 from threading import Thread
4 4 import time
5 5 import types
6 6
7 7 # System library imports
8 8 from pygments.lexers import PythonLexer
9 9 from PyQt4 import QtCore, QtGui
10 10 import zmq
11 11
12 # IPython imports.
13 from IPython.zmq.session import Message, Session
14
15 12 # Local imports
16 13 from call_tip_widget import CallTipWidget
17 14 from completion_lexer import CompletionLexer
18 15 from console_widget import HistoryConsoleWidget
19 16 from pygments_highlighter import PygmentsHighlighter
20 17
21 18
22 class FrontendReplyThread(Thread, QtCore.QObject):
23 """ A Thread that receives a reply from the kernel for the frontend.
24 """
25
26 finished = QtCore.pyqtSignal()
27 output_received = QtCore.pyqtSignal(Message)
28 reply_received = QtCore.pyqtSignal(Message)
29
30 def __init__(self, parent):
31 """ Create a FrontendReplyThread for the specified frontend.
32 """
33 assert isinstance(parent, FrontendWidget)
34 QtCore.QObject.__init__(self, parent)
35 Thread.__init__(self)
36
37 self.sleep_time = 0.05
38
39 def run(self):
40 """ The starting point for the thread.
41 """
42 frontend = self.parent()
43 while True:
44 rep = frontend._recv_reply()
45 if rep is not None:
46 self._recv_output()
47 self.reply_received.emit(rep)
48 break
49
50 self._recv_output()
51 time.sleep(self.sleep_time)
52
53 self.finished.emit()
54
55 def _recv_output(self):
56 """ Send any output to the frontend.
57 """
58 frontend = self.parent()
59 omsgs = frontend._recv_output()
60 for omsg in omsgs:
61 self.output_received.emit(omsg)
62
63
64 19 class FrontendHighlighter(PygmentsHighlighter):
65 20 """ A Python PygmentsHighlighter that can be turned on and off and which
66 21 knows about continuation prompts.
67 22 """
68 23
69 24 def __init__(self, frontend):
70 25 PygmentsHighlighter.__init__(self, frontend.document(), PythonLexer())
71 26 self._current_offset = 0
72 27 self._frontend = frontend
73 28 self.highlighting_on = False
74 29
75 30 def highlightBlock(self, qstring):
76 31 """ Highlight a block of text. Reimplemented to highlight selectively.
77 32 """
78 33 if self.highlighting_on:
79 34 for prompt in (self._frontend._prompt,
80 35 self._frontend.continuation_prompt):
81 36 if qstring.startsWith(prompt):
82 37 qstring.remove(0, len(prompt))
83 38 self._current_offset = len(prompt)
84 39 break
85 40 PygmentsHighlighter.highlightBlock(self, qstring)
86 41
87 42 def setFormat(self, start, count, format):
88 43 """ Reimplemented to avoid highlighting continuation prompts.
89 44 """
90 45 start += self._current_offset
91 46 PygmentsHighlighter.setFormat(self, start, count, format)
92 47
93 48
94 49 class FrontendWidget(HistoryConsoleWidget):
95 50 """ A Qt frontend for an IPython kernel.
96 51 """
97 52
98 53 # Emitted when an 'execute_reply' is received from the kernel.
99 executed = QtCore.pyqtSignal(Message)
54 executed = QtCore.pyqtSignal(object)
100 55
101 56 #---------------------------------------------------------------------------
102 57 # 'QWidget' interface
103 58 #---------------------------------------------------------------------------
104 59
105 60 def __init__(self, kernel_manager, parent=None):
106 61 super(FrontendWidget, self).__init__(parent)
107 62
108 63 self._call_tip_widget = CallTipWidget(self)
109 64 self._compile = CommandCompiler()
110 65 self._completion_lexer = CompletionLexer(PythonLexer())
111 66 self._highlighter = FrontendHighlighter(self)
67 self._kernel_manager = None
112 68
113 69 self.document().contentsChange.connect(self._document_contents_change)
114 70
115 71 self.continuation_prompt = '... '
116 72 self.kernel_manager = kernel_manager
117 73
118 74 def focusOutEvent(self, event):
119 75 """ Reimplemented to hide calltips.
120 76 """
121 77 self._call_tip_widget.hide()
122 78 return super(FrontendWidget, self).focusOutEvent(event)
123 79
124 80 def keyPressEvent(self, event):
125 81 """ Reimplemented to hide calltips.
126 82 """
127 83 if event.key() == QtCore.Qt.Key_Escape:
128 84 self._call_tip_widget.hide()
129 85 return super(FrontendWidget, self).keyPressEvent(event)
130 86
131 87 #---------------------------------------------------------------------------
132 88 # 'ConsoleWidget' abstract interface
133 89 #---------------------------------------------------------------------------
134 90
135 91 def _execute(self, interactive):
136 92 """ Called to execute the input buffer. When triggered by an the enter
137 93 key press, 'interactive' is True; otherwise, it is False. Returns
138 94 whether the input buffer was completely processed and a new prompt
139 95 created.
140 96 """
141 97 return self.execute_source(self.input_buffer, interactive=interactive)
142 98
143 99 def _prompt_started_hook(self):
144 100 """ Called immediately after a new prompt is displayed.
145 101 """
146 102 self._highlighter.highlighting_on = True
147 103
148 104 def _prompt_finished_hook(self):
149 105 """ Called immediately after a prompt is finished, i.e. when some input
150 106 will be processed and a new prompt displayed.
151 107 """
152 108 self._highlighter.highlighting_on = False
153 109
154 110 def _tab_pressed(self):
155 111 """ Called when the tab key is pressed. Returns whether to continue
156 112 processing the event.
157 113 """
158 114 self._keep_cursor_in_buffer()
159 115 cursor = self.textCursor()
160 116 if not self._complete():
161 117 cursor.insertText(' ')
162 118 return False
163 119
164 120 #---------------------------------------------------------------------------
165 121 # 'FrontendWidget' interface
166 122 #---------------------------------------------------------------------------
167 123
168 124 def execute_source(self, source, hidden=False, interactive=False):
169 125 """ Execute a string containing Python code. If 'hidden', no output is
170 126 shown. Returns whether the source executed (i.e., returns True only
171 127 if no more input is necessary).
172 128 """
173 129 try:
174 130 code = self._compile(source, symbol='single')
175 131 except (OverflowError, SyntaxError, ValueError):
176 132 # Just let IPython deal with the syntax error.
177 133 code = Exception
178 134
179 135 # Only execute interactive multiline input if it ends with a blank line
180 136 lines = source.splitlines()
181 137 if interactive and len(lines) > 1 and lines[-1].strip() != '':
182 138 code = None
183 139
184 140 executed = code is not None
185 141 if executed:
186 msg = self.session.send(self.request_socket, 'execute_request',
187 dict(code=source))
188 thread = FrontendReplyThread(self)
189 if not hidden:
190 thread.output_received.connect(self._handle_output)
191 thread.reply_received.connect(self._handle_reply)
192 thread.finished.connect(thread.deleteLater)
193 thread.start()
142 self.kernel_manager.xreq_channel.execute(source)
194 143 else:
195 144 space = 0
196 145 for char in lines[-1]:
197 146 if char == '\t':
198 147 space += 4
199 148 elif char == ' ':
200 149 space += 1
201 150 else:
202 151 break
203 152 if source.endswith(':') or source.endswith(':\n'):
204 153 space += 4
205 154 self._show_continuation_prompt()
206 155 self.appendPlainText(' ' * space)
207 156
208 157 return executed
209 158
210 159 def execute_file(self, path, hidden=False):
211 160 """ Attempts to execute file with 'path'. If 'hidden', no output is
212 161 shown.
213 162 """
214 163 self.execute_source('run %s' % path, hidden=hidden)
215 164
216 165 def _get_kernel_manager(self):
217 166 """ Returns the current kernel manager.
218 167 """
219 168 return self._kernel_manager
220 169
221 170 def _set_kernel_manager(self, kernel_manager):
222 171 """ Sets a new kernel manager, configuring its channels as necessary.
223 172 """
173 # Disconnect the old kernel manager.
174 if self._kernel_manager is not None:
175 sub = self._kernel_manager.sub_channel
176 xreq = self._kernel_manager.xreq_channel
177 sub.message_received.disconnect(self._handle_sub)
178 xreq.execute_reply.disconnect(self._handle_execute_reply)
179 xreq.complete_reply.disconnect(self._handle_complete_reply)
180 xreq.object_info_reply.disconnect(self._handle_object_info_reply)
181
182 # Connect the new kernel manager.
224 183 self._kernel_manager = kernel_manager
225 self._sub_channel = kernel_manager.get_sub_channel()
226 self._xreq_channel = kernel_manager.get_xreq_channel()
184 sub = kernel_manager.sub_channel
185 xreq = kernel_manager.xreq_channel
186 sub.message_received.connect(self._handle_sub)
187 xreq.execute_reply.connect(self._handle_execute_reply)
188 #xreq.complete_reply.connect(self._handle_complete_reply)
189 #xreq.object_info_repy.connect(self._handle_object_info_reply)
190
191 self._show_prompt('>>> ')
227 192
228 193 kernel_manager = property(_get_kernel_manager, _set_kernel_manager)
229 194
230 195 #---------------------------------------------------------------------------
231 196 # 'FrontendWidget' protected interface
232 197 #---------------------------------------------------------------------------
233 198
234 199 def _call_tip(self):
235 200 """ Shows a call tip, if appropriate, at the current cursor location.
236 201 """
237 202 # Decide if it makes sense to show a call tip
238 203 cursor = self.textCursor()
239 204 cursor.movePosition(QtGui.QTextCursor.Left)
240 205 document = self.document()
241 206 if document.characterAt(cursor.position()).toAscii() != '(':
242 207 return False
243 208 context = self._get_context(cursor)
244 209 if not context:
245 210 return False
246 211
247 212 # Send the metadata request to the kernel
248 213 text = '.'.join(context)
249 214 msg = self.session.send(self.request_socket, 'metadata_request',
250 215 dict(context=text))
251 216
252 217 # Give the kernel some time to respond
253 218 rep = self._recv_reply_now('metadata_reply')
254 219 doc = rep.content.docstring if rep else ''
255 220
256 221 # Show the call tip
257 222 if doc:
258 223 self._call_tip_widget.show_tip(doc)
259 224 return True
260 225
261 226 def _complete(self):
262 227 """ Performs completion at the current cursor location.
263 228 """
264 229 # Decide if it makes sense to do completion
265 230 context = self._get_context()
266 231 if not context:
267 232 return False
268 233
269 234 # Send the completion request to the kernel
270 235 text = '.'.join(context)
271 236 line = self.input_buffer_cursor_line
272 237 msg = self.session.send(self.request_socket, 'complete_request',
273 238 dict(text=text, line=line))
274 239
275 240 # Give the kernel some time to respond
276 241 rep = self._recv_reply_now('complete_reply')
277 242 matches = rep.content.matches if rep else []
278 243
279 244 # Show the completion at the correct location
280 245 cursor = self.textCursor()
281 246 cursor.movePosition(QtGui.QTextCursor.Left, n=len(text))
282 247 self._complete_with_items(cursor, matches)
283 248 return True
284 249
285 def _kernel_connected(self):
286 """ Called when the frontend is connected to a kernel.
287 """
288 self._show_prompt('>>> ')
289
290 250 def _get_context(self, cursor=None):
291 251 """ Gets the context at the current cursor location.
292 252 """
293 253 if cursor is None:
294 254 cursor = self.textCursor()
295 255 cursor.movePosition(QtGui.QTextCursor.StartOfLine,
296 256 QtGui.QTextCursor.KeepAnchor)
297 257 text = unicode(cursor.selectedText())
298 258 return self._completion_lexer.get_context(text)
299 259
300 260 #------ Signal handlers ----------------------------------------------------
301 261
302 262 def _document_contents_change(self, position, removed, added):
303 263 """ Called whenever the document's content changes. Display a calltip
304 264 if appropriate.
305 265 """
306 266 # Calculate where the cursor should be *after* the change:
307 267 position += added
308 268
309 269 document = self.document()
310 270 if position == self.textCursor().position():
311 271 self._call_tip()
312 272
313 def _handle_output(self, omsg):
314 handler = getattr(self, '_handle_%s' % omsg.msg_type, None)
273 def _handle_sub(self, omsg):
274 handler = getattr(self, '_handle_%s' % omsg['msg_type'], None)
315 275 if handler is not None:
316 276 handler(omsg)
317 277
318 278 def _handle_pyout(self, omsg):
319 if omsg.parent_header.session == self.session.session:
320 self.appendPlainText(omsg.content.data + '\n')
279 session = omsg['parent_header']['session']
280 if session == self.kernel_manager.session.session:
281 self.appendPlainText(omsg['content']['data'] + '\n')
321 282
322 283 def _handle_stream(self, omsg):
323 self.appendPlainText(omsg.content.data)
284 self.appendPlainText(omsg['content']['data'])
324 285
325 def _handle_reply(self, rep):
326 if rep is not None:
327 if rep.msg_type == 'execute_reply':
328 if rep.content.status == 'error':
329 self.appendPlainText(rep.content.traceback[-1])
330 elif rep.content.status == 'aborted':
331 text = "ERROR: ABORTED\n"
332 ab = self.messages[rep.parent_header.msg_id].content
333 if 'code' in ab:
334 text += ab.code
335 else:
336 text += ab
337 self.appendPlainText(text)
338 self._show_prompt('>>> ')
339 self.executed.emit(rep)
286 def _handle_execute_reply(self, rep):
287 content = rep['content']
288 status = content['status']
289 if status == 'error':
290 self.appendPlainText(content['traceback'][-1])
291 elif status == 'aborted':
292 text = "ERROR: ABORTED\n"
293 self.appendPlainText(text)
294 self._show_prompt('>>> ')
295 self.executed.emit(rep)
340 296
341 297 #------ Communication methods ----------------------------------------------
342 298
343 def _recv_output(self):
344 omsgs = []
345 while True:
346 omsg = self.session.recv(self.sub_socket)
347 if omsg is None:
348 break
349 else:
350 omsgs.append(omsg)
351 return omsgs
352
353 299 def _recv_reply(self):
354 300 return self.session.recv(self.request_socket)
355 301
356 302 def _recv_reply_now(self, msg_type):
357 303 for i in xrange(5):
358 304 rep = self._recv_reply()
359 305 if rep is not None and rep.msg_type == msg_type:
360 306 return rep
361 307 time.sleep(0.1)
362 308 return None
363 309
364 310
365 311 if __name__ == '__main__':
366 312 import sys
313 from IPython.frontend.qt.kernelmanager import QtKernelManager
367 314
368 # Defaults
369 ip = '127.0.0.1'
370 port_base = 5555
371 connection = ('tcp://%s' % ip) + ':%i'
372 req_conn = connection % port_base
373 sub_conn = connection % (port_base+1)
374
375 # Create initial sockets
376 c = zmq.Context()
377 request_socket = c.socket(zmq.XREQ)
378 request_socket.connect(req_conn)
379 sub_socket = c.socket(zmq.SUB)
380 sub_socket.connect(sub_conn)
381 sub_socket.setsockopt(zmq.SUBSCRIBE, '')
315 # Create KernelManager
316 xreq_addr = ('127.0.0.1', 5575)
317 sub_addr = ('127.0.0.1', 5576)
318 rep_addr = ('127.0.0.1', 5577)
319 kernel_manager = QtKernelManager(xreq_addr, sub_addr, rep_addr)
320 kernel_manager.sub_channel.start()
321 kernel_manager.xreq_channel.start()
382 322
383 323 # Launch application
384 324 app = QtGui.QApplication(sys.argv)
385 widget = FrontendWidget(request_socket=request_socket,
386 sub_socket=sub_socket)
325 widget = FrontendWidget(kernel_manager)
387 326 widget.setWindowTitle('Python')
388 327 widget.resize(640, 480)
389 328 widget.show()
390 329 sys.exit(app.exec_())
391 330
@@ -1,79 +1,112 b''
1 1 """ A KernelManager that provides channels that use signals and slots.
2 2 """
3 3
4 4 # System library imports.
5 5 from PyQt4 import QtCore
6 6
7 7 # IPython imports.
8 from IPython.zmq.kernel_manager import KernelManager, SubSocketChannel, \
8 from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \
9 9 XReqSocketChannel, RepSocketChannel
10 10
11 11
12 class QtKernelManager(KernelManager):
13 """ A KernelManager that provides channels that use signals and slots.
14 """
15
16 sub_channel_class = QtSubSocketChannel
17 xreq_channel_class = QtXReqSocketChannel
18 rep_channel_class = QtRepSocketChannel
19
20
21 12 class QtSubSocketChannel(SubSocketChannel, QtCore.QObject):
22 13
23 14 # Emitted when any message is received.
24 message_received = QtCore.pyqtSignal(dict)
15 message_received = QtCore.pyqtSignal(object)
25 16
26 17 # Emitted when a message of type 'pyout' or 'stdout' is received.
27 output_received = QtCore.pyqtSignal(dict)
18 output_received = QtCore.pyqtSignal(object)
28 19
29 20 # Emitted when a message of type 'pyerr' or 'stderr' is received.
30 error_received = QtCore.pyqtSignal(dict)
21 error_received = QtCore.pyqtSignal(object)
22
23 #---------------------------------------------------------------------------
24 # 'object' interface
25 #---------------------------------------------------------------------------
26
27 def __init__(self, *args, **kw):
28 """ Reimplemented to ensure that QtCore.QObject is initialized first.
29 """
30 QtCore.QObject.__init__(self)
31 SubSocketChannel.__init__(self, *args, **kw)
31 32
32 33 #---------------------------------------------------------------------------
33 34 # 'SubSocketChannel' interface
34 35 #---------------------------------------------------------------------------
35 36
36 37 def call_handlers(self, msg):
37 38 """ Reimplemented to emit signals instead of making callbacks.
38 39 """
39 40 # Emit the generic signal.
40 41 self.message_received.emit(msg)
41 42
42 43 # Emit signals for specialized message types.
43 44 msg_type = msg['msg_type']
44 45 if msg_type in ('pyout', 'stdout'):
45 46 self.output_received.emit(msg)
46 47 elif msg_type in ('pyerr', 'stderr'):
47 48 self.error_received.emit(msg)
48 49
49 50
50 51 class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):
51 52
52 53 # Emitted when any message is received.
53 message_received = QtCore.pyqtSignal(dict)
54 message_received = QtCore.pyqtSignal(object)
54 55
55 56 # Emitted when a reply has been received for the corresponding request type.
56 execute_reply = QtCore.pyqtSignal(dict)
57 complete_reply = QtCore.pyqtSignal(dict)
58 object_info_reply = QtCore.pyqtSignal(dict)
57 execute_reply = QtCore.pyqtSignal(object)
58 complete_reply = QtCore.pyqtSignal(object)
59 object_info_reply = QtCore.pyqtSignal(object)
60
61 #---------------------------------------------------------------------------
62 # 'object' interface
63 #---------------------------------------------------------------------------
64
65 def __init__(self, *args, **kw):
66 """ Reimplemented to ensure that QtCore.QObject is initialized first.
67 """
68 QtCore.QObject.__init__(self)
69 XReqSocketChannel.__init__(self, *args, **kw)
59 70
60 71 #---------------------------------------------------------------------------
61 72 # 'XReqSocketChannel' interface
62 73 #---------------------------------------------------------------------------
63 74
64 75 def call_handlers(self, msg):
65 76 """ Reimplemented to emit signals instead of making callbacks.
66 77 """
67 78 # Emit the generic signal.
68 79 self.message_received.emit(msg)
69 80
70 81 # Emit signals for specialized message types.
71 82 msg_type = msg['msg_type']
72 83 signal = getattr(self, msg_type, None)
73 84 if signal:
74 85 signal.emit(msg)
75 86
87 def _queue_request(self, msg, callback):
88 """ Reimplemented to skip callback handling.
89 """
90 self.command_queue.put(msg)
91
76 92
77 93 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
78 94
79 pass
95 #---------------------------------------------------------------------------
96 # 'object' interface
97 #---------------------------------------------------------------------------
98
99 def __init__(self, *args, **kw):
100 """ Reimplemented to ensure that QtCore.QObject is initialized first.
101 """
102 QtCore.QObject.__init__(self)
103 RepSocketChannel.__init__(self, *args, **kw)
104
105
106 class QtKernelManager(KernelManager):
107 """ A KernelManager that provides channels that use signals and slots.
108 """
109
110 sub_channel_class = QtSubSocketChannel
111 xreq_channel_class = QtXReqSocketChannel
112 rep_channel_class = QtRepSocketChannel
@@ -1,275 +1,318 b''
1 1 """Kernel frontend classes.
2 2
3 3 TODO: Create logger to handle debugging and console messages.
4 4
5 5 """
6 6
7 # Standard library imports.
7 8 from Queue import Queue, Empty
8 9 from threading import Thread
9 10 import traceback
10 11
12 # System library imports.
11 13 import zmq
12 14 from zmq import POLLIN, POLLOUT, POLLERR
13 15 from zmq.eventloop import ioloop
16
17 # Local imports.
18 from IPython.utils.traitlets import HasTraits, Any, Int, Instance, Str, Type
14 19 from session import Session
15 20
16 21
17 22 class MissingHandlerError(Exception):
18 23 pass
19 24
20 25
21 class KernelManager(object):
22
23 sub_channel_class = SubSocketChannel
24 xreq_channel_class = XReqSocketChannel
25 rep_channel_class = RepSocketChannel
26
27 def __init__(self, xreq_addr, sub_addr, rep_addr,
28 context=None, session=None):
29 self.context = zmq.Context() if context is None else context
30 self.session = Session() if session is None else session
31 self.xreq_addr = xreq_addr
32 self.sub_addr = sub_addr
33 self.rep_addr = rep_addr
34
35 def start_kernel(self):
36 """Start a localhost kernel on ip and port.
37
38 The SUB channel is for the frontend to receive messages published by
39 the kernel.
40
41 The REQ channel is for the frontend to make requests of the kernel.
42
43 The REP channel is for the kernel to request stdin (raw_input) from
44 the frontend.
45 """
46
47 def kill_kernel(self):
48 """Kill the running kernel"""
49
50 def is_alive(self):
51 """Is the kernel alive?"""
52 return True
53
54 def signal_kernel(self, signum):
55 """Send signum to the kernel."""
56
57 def get_sub_channel(self):
58 """Get the SUB socket channel object."""
59 return self.sub_channel_class(self.context, self.session, self.sub_addr)
60
61 def get_xreq_channel(self):
62 """Get the REQ socket channel object to make requests of the kernel."""
63 return self.xreq_channel_class(self.context, self.session,
64 self.xreq_addr)
65
66 def get_rep_channel(self):
67 """Get the REP socket channel object to handle stdin (raw_input)."""
68 return self.rep_channel_class(self.context, self.session, self.rep_addr)
69
70
71 26 class ZmqSocketChannel(Thread):
72 27
73 28 socket = None
74 29
75 30 def __init__(self, context, session, addr):
76 31 self.context = context
77 32 self.session = session
78 33 self.addr = addr
79 34 super(ZmqSocketChannel, self).__init__()
80 35 self.daemon = True
81 36
82 37
83 38 class SubSocketChannel(ZmqSocketChannel):
84 39
85 40 handlers = None
86 41 _overriden_call_handler = None
87 42
88 43 def __init__(self, context, session, addr):
89 44 self.handlers = {}
90 45 super(SubSocketChannel, self).__init__(context, session, addr)
91 46
92 47 def run(self):
93 48 self.socket = self.context.socket(zmq.SUB)
94 49 self.socket.setsockopt(zmq.SUBSCRIBE,'')
95 50 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
96 51 self.socket.connect('tcp://%s:%i' % self.addr)
97 52 self.ioloop = ioloop.IOLoop()
98 53 self.ioloop.add_handler(self.socket, self._handle_events,
99 54 POLLIN|POLLERR)
100 55 self.ioloop.start()
101 56
102 57 def _handle_events(self, socket, events):
103 58 # Turn on and off POLLOUT depending on if we have made a request
104 59 if events & POLLERR:
105 60 self._handle_err()
106 61 if events & POLLIN:
107 62 self._handle_recv()
108 63
109 64 def _handle_err(self):
110 65 raise zmq.ZmqError()
111 66
112 67 def _handle_recv(self):
113 68 msg = self.socket.recv_json()
114 69 self.call_handlers(msg)
115 70
116 71 def override_call_handler(self, func):
117 72 """Permanently override the call_handler.
118 73
119 74 The function func will be called as::
120 75
121 76 func(handler, msg)
122 77
123 78 And must call::
124 79
125 80 handler(msg)
126 81
127 82 in the main thread.
128 83 """
129 84 assert callable(func), "not a callable: %r" % func
130 85 self._overriden_call_handler = func
131 86
132 87 def call_handlers(self, msg):
133 88 handler = self.handlers.get(msg['msg_type'], None)
134 89 if handler is not None:
135 90 try:
136 91 self.call_handler(handler, msg)
137 92 except:
138 93 # XXX: This should be logged at least
139 94 traceback.print_last()
140 95
141 96 def call_handler(self, handler, msg):
142 97 if self._overriden_call_handler is not None:
143 98 self._overriden_call_handler(handler, msg)
144 99 elif hasattr(self, '_call_handler'):
145 100 call_handler = getattr(self, '_call_handler')
146 101 call_handler(handler, msg)
147 102 else:
148 103 raise RuntimeError('no handler!')
149 104
150 105 def add_handler(self, callback, msg_type):
151 106 """Register a callback for msg type."""
152 107 self.handlers[msg_type] = callback
153 108
154 109 def remove_handler(self, msg_type):
155 110 self.handlers.pop(msg_type, None)
156 111
157 112
158 113 class XReqSocketChannel(ZmqSocketChannel):
159 114
160 115 handler_queue = None
161 116 command_queue = None
162 117 handlers = None
163 118 _overriden_call_handler = None
164 119
165 120 def __init__(self, context, session, addr):
166 121 self.handlers = {}
167 122 self.handler_queue = Queue()
168 123 self.command_queue = Queue()
169 124 super(XReqSocketChannel, self).__init__(context, session, addr)
170 125
171 126 def run(self):
172 127 self.socket = self.context.socket(zmq.XREQ)
173 128 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
174 129 self.socket.connect('tcp://%s:%i' % self.addr)
175 130 self.ioloop = ioloop.IOLoop()
176 131 self.ioloop.add_handler(self.socket, self._handle_events,
177 132 POLLIN|POLLOUT|POLLERR)
178 133 self.ioloop.start()
179 134
180 135 def _handle_events(self, socket, events):
181 136 # Turn on and off POLLOUT depending on if we have made a request
182 137 if events & POLLERR:
183 138 self._handle_err()
184 139 if events & POLLOUT:
185 140 self._handle_send()
186 141 if events & POLLIN:
187 142 self._handle_recv()
188 143
189 144 def _handle_recv(self):
190 145 msg = self.socket.recv_json()
191 146 self.call_handlers(msg)
192 147
193 148 def _handle_send(self):
194 149 try:
195 150 msg = self.command_queue.get(False)
196 151 except Empty:
197 152 pass
198 153 else:
199 154 self.socket.send_json(msg)
200 155
201 156 def _handle_err(self):
202 157 raise zmq.ZmqError()
203 158
204 159 def _queue_request(self, msg, callback):
205 160 handler = self._find_handler(msg['msg_type'], callback)
206 161 self.handler_queue.put(handler)
207 162 self.command_queue.put(msg)
208 163
209 164 def execute(self, code, callback=None):
210 165 # Create class for content/msg creation. Related to, but possibly
211 166 # not in Session.
212 167 content = dict(code=code)
213 168 msg = self.session.msg('execute_request', content)
214 169 self._queue_request(msg, callback)
215 170 return msg['header']['msg_id']
216 171
217 172 def complete(self, text, line, block=None, callback=None):
218 173 content = dict(text=text, line=line)
219 174 msg = self.session.msg('complete_request', content)
220 175 self._queue_request(msg, callback)
221 176 return msg['header']['msg_id']
222 177
223 178 def object_info(self, oname, callback=None):
224 179 content = dict(oname=oname)
225 180 msg = self.session.msg('object_info_request', content)
226 181 self._queue_request(msg, callback)
227 182 return msg['header']['msg_id']
228 183
229 184 def _find_handler(self, name, callback):
230 185 if callback is not None:
231 186 return callback
232 187 handler = self.handlers.get(name)
233 188 if handler is None:
234 raise MissingHandlerError('No handler defined for method: %s' % name)
189 raise MissingHandlerError(
190 'No handler defined for method: %s' % name)
235 191 return handler
236 192
237 193 def override_call_handler(self, func):
238 194 """Permanently override the call_handler.
239 195
240 196 The function func will be called as::
241 197
242 198 func(handler, msg)
243 199
244 200 And must call::
245 201
246 202 handler(msg)
247 203
248 204 in the main thread.
249 205 """
250 206 assert callable(func), "not a callable: %r" % func
251 207 self._overriden_call_handler = func
252 208
253 209 def call_handlers(self, msg):
254 210 try:
255 211 handler = self.handler_queue.get(False)
256 212 except Empty:
257 213 print "Message received with no handler!!!"
258 214 print msg
259 215 else:
260 216 self.call_handler(handler, msg)
261 217
262 218 def call_handler(self, handler, msg):
263 219 if self._overriden_call_handler is not None:
264 220 self._overriden_call_handler(handler, msg)
265 221 elif hasattr(self, '_call_handler'):
266 222 call_handler = getattr(self, '_call_handler')
267 223 call_handler(handler, msg)
268 224 else:
269 225 raise RuntimeError('no handler!')
270 226
271 227
272 228 class RepSocketChannel(ZmqSocketChannel):
273 229
274 230 def on_raw_input():
275 231 pass
232
233
234 class KernelManager(HasTraits):
235
236 # The addresses to use for the various channels. Should be tuples of form
237 # (ip_address, port).
238 sub_address = Any
239 xreq_address = Any
240 rep_address = Any
241 # FIXME: Add Tuple to Traitlets.
242 #sub_address = Tuple(Str, Int)
243 #xreq_address = Tuple(Str, Int)
244 #rep_address = Tuple(Str, Int)
245
246 # The PyZMQ Context to use for communication with the kernel.
247 context = Instance(zmq.Context, ())
248
249 # The Session to use for communication with the kernel.
250 session = Instance(Session, ())
251
252 # The classes to use for the various channels.
253 sub_channel_class = Type(SubSocketChannel)
254 xreq_channel_class = Type(XReqSocketChannel)
255 rep_channel_class = Type(RepSocketChannel)
256
257 # Protected traits.
258 _sub_channel = Any
259 _xreq_channel = Any
260 _rep_channel = Any
261
262 def __init__(self, xreq_address, sub_address, rep_address, **traits):
263 super(KernelManager, self).__init__()
264
265 self.xreq_address = xreq_address
266 self.sub_address = sub_address
267 self.rep_address = rep_address
268
269 # FIXME: This should be the business of HasTraits. The convention is:
270 # HasTraits.__init__(self, **traits_to_be_initialized.)
271 for trait in traits:
272 setattr(self, trait, traits[trait])
273
274 def start_kernel(self):
275 """Start a localhost kernel on ip and port.
276
277 The SUB channel is for the frontend to receive messages published by
278 the kernel.
279
280 The REQ channel is for the frontend to make requests of the kernel.
281
282 The REP channel is for the kernel to request stdin (raw_input) from
283 the frontend.
284 """
285
286 def kill_kernel(self):
287 """Kill the running kernel"""
288
289 def is_alive(self):
290 """Is the kernel alive?"""
291 return True
292
293 def signal_kernel(self, signum):
294 """Send signum to the kernel."""
295
296 @property
297 def sub_channel(self):
298 """Get the SUB socket channel object."""
299 if self._sub_channel is None:
300 self._sub_channel = self.sub_channel_class(
301 self.context, self.session, self.sub_address)
302 return self._sub_channel
303
304 @property
305 def xreq_channel(self):
306 """Get the REQ socket channel object to make requests of the kernel."""
307 if self._xreq_channel is None:
308 self._xreq_channel = self.xreq_channel_class(
309 self.context, self.session, self.xreq_address)
310 return self._xreq_channel
311
312 @property
313 def rep_channel(self):
314 """Get the REP socket channel object to handle stdin (raw_input)."""
315 if self._rep_channel is None:
316 self._rep_channel = self.rep_channel_class(
317 self.context, self.session, self.rep_address)
318 return self._rep_channel
@@ -1,52 +1,52 b''
1 1 from Queue import Queue, Empty
2 2 import time
3 3
4 4 from kernelmanager import KernelManager
5 5
6 6 xreq_addr = ('127.0.0.1',5575)
7 7 sub_addr = ('127.0.0.1', 5576)
8 8 rep_addr = ('127.0.0.1', 5577)
9 9
10 10
11 11 km = KernelManager(xreq_addr, sub_addr, rep_addr)
12 # xreq_channel = km.get_xreq_channel()
13 sub_channel = km.get_sub_channel()
12 # xreq_channel = km.xreq_channel
13 sub_channel = km.sub_channel
14 14
15 15 # xreq_channel.start()
16 16 sub_channel.start()
17 17
18 18 print "Channels are started"
19 19
20 20 def printer(msg):
21 21 print
22 22 print msg
23 23
24 24 class CallHandler(object):
25 25
26 26 def __init__(self):
27 27 self.queue = Queue()
28 28
29 29 def __call__(self, handler, msg):
30 30 self.queue.put((handler, msg))
31 31
32 32 def handle(self):
33 33 try:
34 34 handler, msg = self.queue.get(block=False)
35 35 except Empty:
36 36 pass
37 37 else:
38 38 handler(msg)
39 39
40 40 call_handler = CallHandler()
41 41 sub_channel.override_call_handler(call_handler)
42 42 sub_channel.add_handler(printer, 'pyin')
43 43 sub_channel.add_handler(printer, 'pyout')
44 44 sub_channel.add_handler(printer, 'stdout')
45 45 sub_channel.add_handler(printer, 'stderr')
46 46
47 47 for i in range(100):
48 48 call_handler.handle()
49 49 time.sleep(1)
50 50
51 51 # xreq_channel.join()
52 sub_channel.join() No newline at end of file
52 sub_channel.join()
General Comments 0
You need to be logged in to leave comments. Login now