##// END OF EJS Templates
Initial checkin of Qt kernel manager. Began refactor of FrontendWidget.
epatters -
Show More
@@ -0,0 +1,79 b''
1 """ A KernelManager that provides channels that use signals and slots.
2 """
3
4 # System library imports.
5 from PyQt4 import QtCore
6
7 # IPython imports.
8 from IPython.zmq.kernel_manager import KernelManager, SubSocketChannel, \
9 XReqSocketChannel, RepSocketChannel
10
11
12 class QtKernelManager(KernelManager):
13 """ A KernelManager that provides channels that use signals and slots.
14 """
15
16 sub_channel_class = QtSubSocketChannel
17 xreq_channel_class = QtXReqSocketChannel
18 rep_channel_class = QtRepSocketChannel
19
20
21 class QtSubSocketChannel(SubSocketChannel, QtCore.QObject):
22
23 # Emitted when any message is received.
24 message_received = QtCore.pyqtSignal(dict)
25
26 # Emitted when a message of type 'pyout' or 'stdout' is received.
27 output_received = QtCore.pyqtSignal(dict)
28
29 # Emitted when a message of type 'pyerr' or 'stderr' is received.
30 error_received = QtCore.pyqtSignal(dict)
31
32 #---------------------------------------------------------------------------
33 # 'SubSocketChannel' interface
34 #---------------------------------------------------------------------------
35
36 def call_handlers(self, msg):
37 """ Reimplemented to emit signals instead of making callbacks.
38 """
39 # Emit the generic signal.
40 self.message_received.emit(msg)
41
42 # Emit signals for specialized message types.
43 msg_type = msg['msg_type']
44 if msg_type in ('pyout', 'stdout'):
45 self.output_received.emit(msg)
46 elif msg_type in ('pyerr', 'stderr'):
47 self.error_received.emit(msg)
48
49
50 class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):
51
52 # Emitted when any message is received.
53 message_received = QtCore.pyqtSignal(dict)
54
55 # Emitted when a reply has been received for the corresponding request type.
56 execute_reply = QtCore.pyqtSignal(dict)
57 complete_reply = QtCore.pyqtSignal(dict)
58 object_info_reply = QtCore.pyqtSignal(dict)
59
60 #---------------------------------------------------------------------------
61 # 'XReqSocketChannel' interface
62 #---------------------------------------------------------------------------
63
64 def call_handlers(self, msg):
65 """ Reimplemented to emit signals instead of making callbacks.
66 """
67 # Emit the generic signal.
68 self.message_received.emit(msg)
69
70 # Emit signals for specialized message types.
71 msg_type = msg['msg_type']
72 signal = getattr(self, msg_type, None)
73 if signal:
74 signal.emit(msg)
75
76
77 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
78
79 pass
@@ -1,380 +1,391 b''
1 1 # Standard library imports
2 2 from codeop import CommandCompiler
3 3 from threading import Thread
4 4 import time
5 5 import types
6 6
7 7 # System library imports
8 from IPython.zmq.session import Message, Session
9 8 from pygments.lexers import PythonLexer
10 9 from PyQt4 import QtCore, QtGui
11 10 import zmq
12 11
12 # IPython imports.
13 from IPython.zmq.session import Message, Session
14
13 15 # Local imports
14 16 from call_tip_widget import CallTipWidget
15 17 from completion_lexer import CompletionLexer
16 18 from console_widget import HistoryConsoleWidget
17 19 from pygments_highlighter import PygmentsHighlighter
18 20
19 21
20 22 class FrontendReplyThread(Thread, QtCore.QObject):
21 23 """ A Thread that receives a reply from the kernel for the frontend.
22 24 """
23 25
24 26 finished = QtCore.pyqtSignal()
25 27 output_received = QtCore.pyqtSignal(Message)
26 28 reply_received = QtCore.pyqtSignal(Message)
27 29
28 30 def __init__(self, parent):
29 31 """ Create a FrontendReplyThread for the specified frontend.
30 32 """
31 33 assert isinstance(parent, FrontendWidget)
32 34 QtCore.QObject.__init__(self, parent)
33 35 Thread.__init__(self)
34 36
35 37 self.sleep_time = 0.05
36 38
37 39 def run(self):
38 40 """ The starting point for the thread.
39 41 """
40 42 frontend = self.parent()
41 43 while True:
42 44 rep = frontend._recv_reply()
43 45 if rep is not None:
44 46 self._recv_output()
45 47 self.reply_received.emit(rep)
46 48 break
47 49
48 50 self._recv_output()
49 51 time.sleep(self.sleep_time)
50 52
51 53 self.finished.emit()
52 54
53 55 def _recv_output(self):
54 56 """ Send any output to the frontend.
55 57 """
56 58 frontend = self.parent()
57 59 omsgs = frontend._recv_output()
58 60 for omsg in omsgs:
59 61 self.output_received.emit(omsg)
60 62
61 63
62 64 class FrontendHighlighter(PygmentsHighlighter):
63 65 """ A Python PygmentsHighlighter that can be turned on and off and which
64 66 knows about continuation prompts.
65 67 """
66 68
67 69 def __init__(self, frontend):
68 70 PygmentsHighlighter.__init__(self, frontend.document(), PythonLexer())
69 71 self._current_offset = 0
70 72 self._frontend = frontend
71 73 self.highlighting_on = False
72 74
73 75 def highlightBlock(self, qstring):
74 76 """ Highlight a block of text. Reimplemented to highlight selectively.
75 77 """
76 78 if self.highlighting_on:
77 79 for prompt in (self._frontend._prompt,
78 80 self._frontend.continuation_prompt):
79 81 if qstring.startsWith(prompt):
80 82 qstring.remove(0, len(prompt))
81 83 self._current_offset = len(prompt)
82 84 break
83 85 PygmentsHighlighter.highlightBlock(self, qstring)
84 86
85 87 def setFormat(self, start, count, format):
86 88 """ Reimplemented to avoid highlighting continuation prompts.
87 89 """
88 90 start += self._current_offset
89 91 PygmentsHighlighter.setFormat(self, start, count, format)
90 92
91 93
92 94 class FrontendWidget(HistoryConsoleWidget):
93 95 """ A Qt frontend for an IPython kernel.
94 96 """
95 97
96 98 # Emitted when an 'execute_reply' is received from the kernel.
97 99 executed = QtCore.pyqtSignal(Message)
98 100
99 101 #---------------------------------------------------------------------------
100 102 # 'QWidget' interface
101 103 #---------------------------------------------------------------------------
102 104
103 def __init__(self, parent=None, session=None, request_socket=None,
104 sub_socket=None):
105 def __init__(self, kernel_manager, parent=None):
105 106 super(FrontendWidget, self).__init__(parent)
106 self.continuation_prompt = '... '
107 107
108 108 self._call_tip_widget = CallTipWidget(self)
109 109 self._compile = CommandCompiler()
110 110 self._completion_lexer = CompletionLexer(PythonLexer())
111 111 self._highlighter = FrontendHighlighter(self)
112 112
113 self.session = Session() if session is None else session
114 self.request_socket = request_socket
115 self.sub_socket = sub_socket
116
117 113 self.document().contentsChange.connect(self._document_contents_change)
118 114
119 self._kernel_connected() # XXX
115 self.continuation_prompt = '... '
116 self.kernel_manager = kernel_manager
120 117
121 118 def focusOutEvent(self, event):
122 119 """ Reimplemented to hide calltips.
123 120 """
124 121 self._call_tip_widget.hide()
125 122 return super(FrontendWidget, self).focusOutEvent(event)
126 123
127 124 def keyPressEvent(self, event):
128 125 """ Reimplemented to hide calltips.
129 126 """
130 127 if event.key() == QtCore.Qt.Key_Escape:
131 128 self._call_tip_widget.hide()
132 129 return super(FrontendWidget, self).keyPressEvent(event)
133 130
134 131 #---------------------------------------------------------------------------
135 132 # 'ConsoleWidget' abstract interface
136 133 #---------------------------------------------------------------------------
137 134
138 135 def _execute(self, interactive):
139 136 """ Called to execute the input buffer. When triggered by an the enter
140 137 key press, 'interactive' is True; otherwise, it is False. Returns
141 138 whether the input buffer was completely processed and a new prompt
142 139 created.
143 140 """
144 141 return self.execute_source(self.input_buffer, interactive=interactive)
145 142
146 143 def _prompt_started_hook(self):
147 144 """ Called immediately after a new prompt is displayed.
148 145 """
149 146 self._highlighter.highlighting_on = True
150 147
151 148 def _prompt_finished_hook(self):
152 149 """ Called immediately after a prompt is finished, i.e. when some input
153 150 will be processed and a new prompt displayed.
154 151 """
155 152 self._highlighter.highlighting_on = False
156 153
157 154 def _tab_pressed(self):
158 155 """ Called when the tab key is pressed. Returns whether to continue
159 156 processing the event.
160 157 """
161 158 self._keep_cursor_in_buffer()
162 159 cursor = self.textCursor()
163 160 if not self._complete():
164 161 cursor.insertText(' ')
165 162 return False
166 163
167 164 #---------------------------------------------------------------------------
168 165 # 'FrontendWidget' interface
169 166 #---------------------------------------------------------------------------
170 167
171 168 def execute_source(self, source, hidden=False, interactive=False):
172 169 """ Execute a string containing Python code. If 'hidden', no output is
173 170 shown. Returns whether the source executed (i.e., returns True only
174 171 if no more input is necessary).
175 172 """
176 173 try:
177 174 code = self._compile(source, symbol='single')
178 175 except (OverflowError, SyntaxError, ValueError):
179 176 # Just let IPython deal with the syntax error.
180 177 code = Exception
181 178
182 179 # Only execute interactive multiline input if it ends with a blank line
183 180 lines = source.splitlines()
184 181 if interactive and len(lines) > 1 and lines[-1].strip() != '':
185 182 code = None
186 183
187 184 executed = code is not None
188 185 if executed:
189 186 msg = self.session.send(self.request_socket, 'execute_request',
190 187 dict(code=source))
191 188 thread = FrontendReplyThread(self)
192 189 if not hidden:
193 190 thread.output_received.connect(self._handle_output)
194 191 thread.reply_received.connect(self._handle_reply)
195 192 thread.finished.connect(thread.deleteLater)
196 193 thread.start()
197 194 else:
198 195 space = 0
199 196 for char in lines[-1]:
200 197 if char == '\t':
201 198 space += 4
202 199 elif char == ' ':
203 200 space += 1
204 201 else:
205 202 break
206 203 if source.endswith(':') or source.endswith(':\n'):
207 204 space += 4
208 205 self._show_continuation_prompt()
209 206 self.appendPlainText(' ' * space)
210 207
211 208 return executed
212 209
213 210 def execute_file(self, path, hidden=False):
214 211 """ Attempts to execute file with 'path'. If 'hidden', no output is
215 212 shown.
216 213 """
217 214 self.execute_source('run %s' % path, hidden=hidden)
218 215
216 def _get_kernel_manager(self):
217 """ Returns the current kernel manager.
218 """
219 return self._kernel_manager
220
221 def _set_kernel_manager(self, kernel_manager):
222 """ Sets a new kernel manager, configuring its channels as necessary.
223 """
224 self._kernel_manager = kernel_manager
225 self._sub_channel = kernel_manager.get_sub_channel()
226 self._xreq_channel = kernel_manager.get_xreq_channel()
227
228 kernel_manager = property(_get_kernel_manager, _set_kernel_manager)
229
219 230 #---------------------------------------------------------------------------
220 231 # 'FrontendWidget' protected interface
221 232 #---------------------------------------------------------------------------
222 233
223 234 def _call_tip(self):
224 235 """ Shows a call tip, if appropriate, at the current cursor location.
225 236 """
226 237 # Decide if it makes sense to show a call tip
227 238 cursor = self.textCursor()
228 239 cursor.movePosition(QtGui.QTextCursor.Left)
229 240 document = self.document()
230 241 if document.characterAt(cursor.position()).toAscii() != '(':
231 242 return False
232 243 context = self._get_context(cursor)
233 244 if not context:
234 245 return False
235 246
236 247 # Send the metadata request to the kernel
237 248 text = '.'.join(context)
238 249 msg = self.session.send(self.request_socket, 'metadata_request',
239 250 dict(context=text))
240 251
241 252 # Give the kernel some time to respond
242 253 rep = self._recv_reply_now('metadata_reply')
243 254 doc = rep.content.docstring if rep else ''
244 255
245 256 # Show the call tip
246 257 if doc:
247 258 self._call_tip_widget.show_tip(doc)
248 259 return True
249 260
250 261 def _complete(self):
251 262 """ Performs completion at the current cursor location.
252 263 """
253 264 # Decide if it makes sense to do completion
254 265 context = self._get_context()
255 266 if not context:
256 267 return False
257 268
258 269 # Send the completion request to the kernel
259 270 text = '.'.join(context)
260 271 line = self.input_buffer_cursor_line
261 272 msg = self.session.send(self.request_socket, 'complete_request',
262 273 dict(text=text, line=line))
263 274
264 275 # Give the kernel some time to respond
265 276 rep = self._recv_reply_now('complete_reply')
266 277 matches = rep.content.matches if rep else []
267 278
268 279 # Show the completion at the correct location
269 280 cursor = self.textCursor()
270 281 cursor.movePosition(QtGui.QTextCursor.Left, n=len(text))
271 282 self._complete_with_items(cursor, matches)
272 283 return True
273 284
274 285 def _kernel_connected(self):
275 286 """ Called when the frontend is connected to a kernel.
276 287 """
277 288 self._show_prompt('>>> ')
278 289
279 290 def _get_context(self, cursor=None):
280 291 """ Gets the context at the current cursor location.
281 292 """
282 293 if cursor is None:
283 294 cursor = self.textCursor()
284 295 cursor.movePosition(QtGui.QTextCursor.StartOfLine,
285 296 QtGui.QTextCursor.KeepAnchor)
286 297 text = unicode(cursor.selectedText())
287 298 return self._completion_lexer.get_context(text)
288 299
289 300 #------ Signal handlers ----------------------------------------------------
290 301
291 302 def _document_contents_change(self, position, removed, added):
292 303 """ Called whenever the document's content changes. Display a calltip
293 304 if appropriate.
294 305 """
295 306 # Calculate where the cursor should be *after* the change:
296 307 position += added
297 308
298 309 document = self.document()
299 310 if position == self.textCursor().position():
300 311 self._call_tip()
301 312
302 313 def _handle_output(self, omsg):
303 314 handler = getattr(self, '_handle_%s' % omsg.msg_type, None)
304 315 if handler is not None:
305 316 handler(omsg)
306 317
307 318 def _handle_pyout(self, omsg):
308 319 if omsg.parent_header.session == self.session.session:
309 320 self.appendPlainText(omsg.content.data + '\n')
310 321
311 322 def _handle_stream(self, omsg):
312 323 self.appendPlainText(omsg.content.data)
313 324
314 325 def _handle_reply(self, rep):
315 326 if rep is not None:
316 327 if rep.msg_type == 'execute_reply':
317 328 if rep.content.status == 'error':
318 329 self.appendPlainText(rep.content.traceback[-1])
319 330 elif rep.content.status == 'aborted':
320 331 text = "ERROR: ABORTED\n"
321 332 ab = self.messages[rep.parent_header.msg_id].content
322 333 if 'code' in ab:
323 334 text += ab.code
324 335 else:
325 336 text += ab
326 337 self.appendPlainText(text)
327 338 self._show_prompt('>>> ')
328 339 self.executed.emit(rep)
329 340
330 341 #------ Communication methods ----------------------------------------------
331 342
332 343 def _recv_output(self):
333 344 omsgs = []
334 345 while True:
335 346 omsg = self.session.recv(self.sub_socket)
336 347 if omsg is None:
337 348 break
338 349 else:
339 350 omsgs.append(omsg)
340 351 return omsgs
341 352
342 353 def _recv_reply(self):
343 354 return self.session.recv(self.request_socket)
344 355
345 356 def _recv_reply_now(self, msg_type):
346 357 for i in xrange(5):
347 358 rep = self._recv_reply()
348 359 if rep is not None and rep.msg_type == msg_type:
349 360 return rep
350 361 time.sleep(0.1)
351 362 return None
352 363
353 364
354 365 if __name__ == '__main__':
355 366 import sys
356 367
357 368 # Defaults
358 369 ip = '127.0.0.1'
359 370 port_base = 5555
360 371 connection = ('tcp://%s' % ip) + ':%i'
361 372 req_conn = connection % port_base
362 373 sub_conn = connection % (port_base+1)
363 374
364 375 # Create initial sockets
365 376 c = zmq.Context()
366 377 request_socket = c.socket(zmq.XREQ)
367 378 request_socket.connect(req_conn)
368 379 sub_socket = c.socket(zmq.SUB)
369 380 sub_socket.connect(sub_conn)
370 381 sub_socket.setsockopt(zmq.SUBSCRIBE, '')
371 382
372 383 # Launch application
373 384 app = QtGui.QApplication(sys.argv)
374 385 widget = FrontendWidget(request_socket=request_socket,
375 386 sub_socket=sub_socket)
376 387 widget.setWindowTitle('Python')
377 388 widget.resize(640, 480)
378 389 widget.show()
379 390 sys.exit(app.exec_())
380 391
@@ -1,271 +1,275 b''
1 1 """Kernel frontend classes.
2 2
3 To do:
4
5 1. Create custom channel subclasses for Qt.
6 2. Create logger to handle debugging and console messages.
3 TODO: Create logger to handle debugging and console messages.
7 4
8 5 """
9 6
10 7 from Queue import Queue, Empty
11 8 from threading import Thread
12 9 import traceback
13 10
14 11 import zmq
15 12 from zmq import POLLIN, POLLOUT, POLLERR
16 13 from zmq.eventloop import ioloop
17 14 from session import Session
18 15
19 16
20 17 class MissingHandlerError(Exception):
21 18 pass
22 19
23 20
24 21 class KernelManager(object):
25 22
23 sub_channel_class = SubSocketChannel
24 xreq_channel_class = XReqSocketChannel
25 rep_channel_class = RepSocketChannel
26
26 27 def __init__(self, xreq_addr, sub_addr, rep_addr,
27 28 context=None, session=None):
28 29 self.context = zmq.Context() if context is None else context
29 30 self.session = Session() if session is None else session
30 31 self.xreq_addr = xreq_addr
31 32 self.sub_addr = sub_addr
32 33 self.rep_addr = rep_addr
33 34
34 35 def start_kernel(self):
35 36 """Start a localhost kernel on ip and port.
36 37
37 38 The SUB channel is for the frontend to receive messages published by
38 39 the kernel.
39 40
40 41 The REQ channel is for the frontend to make requests of the kernel.
41 42
42 43 The REP channel is for the kernel to request stdin (raw_input) from
43 44 the frontend.
44 45 """
45 46
46 47 def kill_kernel(self):
47 48 """Kill the running kernel"""
48 49
49 50 def is_alive(self):
50 51 """Is the kernel alive?"""
51 52 return True
52 53
53 54 def signal_kernel(self, signum):
54 55 """Send signum to the kernel."""
55 56
56 57 def get_sub_channel(self):
57 58 """Get the SUB socket channel object."""
58 return SubSocketChannel(self.context, self.session, self.sub_addr)
59 return self.sub_channel_class(self.context, self.session, self.sub_addr)
59 60
60 61 def get_xreq_channel(self):
61 62 """Get the REQ socket channel object to make requests of the kernel."""
62 return XReqSocketChannel(self.context, self.session, self.xreq_addr)
63 return self.xreq_channel_class(self.context, self.session,
64 self.xreq_addr)
63 65
64 66 def get_rep_channel(self):
65 67 """Get the REP socket channel object to handle stdin (raw_input)."""
66 return RepSocketChannel(self.context, self.session, self.rep_addr)
68 return self.rep_channel_class(self.context, self.session, self.rep_addr)
67 69
68 70
69 71 class ZmqSocketChannel(Thread):
70 72
71 73 socket = None
72 74
73 75 def __init__(self, context, session, addr):
74 76 self.context = context
75 77 self.session = session
76 78 self.addr = addr
77 79 super(ZmqSocketChannel, self).__init__()
78 80 self.daemon = True
79 81
80 82
81 83 class SubSocketChannel(ZmqSocketChannel):
82 84
83 85 handlers = None
84 86 _overriden_call_handler = None
85 87
86 88 def __init__(self, context, session, addr):
87 89 self.handlers = {}
88 90 super(SubSocketChannel, self).__init__(context, session, addr)
89 91
90 92 def run(self):
91 93 self.socket = self.context.socket(zmq.SUB)
92 94 self.socket.setsockopt(zmq.SUBSCRIBE,'')
93 95 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
94 96 self.socket.connect('tcp://%s:%i' % self.addr)
95 97 self.ioloop = ioloop.IOLoop()
96 98 self.ioloop.add_handler(self.socket, self._handle_events,
97 99 POLLIN|POLLERR)
98 100 self.ioloop.start()
99 101
100 102 def _handle_events(self, socket, events):
101 103 # Turn on and off POLLOUT depending on if we have made a request
102 104 if events & POLLERR:
103 105 self._handle_err()
104 106 if events & POLLIN:
105 107 self._handle_recv()
106 108
107 109 def _handle_err(self):
108 110 raise zmq.ZmqError()
109 111
110 112 def _handle_recv(self):
111 113 msg = self.socket.recv_json()
112 114 self.call_handlers(msg)
113 115
114 116 def override_call_handler(self, func):
115 117 """Permanently override the call_handler.
116 118
117 119 The function func will be called as::
118 120
119 121 func(handler, msg)
120 122
121 123 And must call::
122 124
123 125 handler(msg)
124 126
125 127 in the main thread.
126 128 """
127 129 assert callable(func), "not a callable: %r" % func
128 130 self._overriden_call_handler = func
129 131
130 132 def call_handlers(self, msg):
131 133 handler = self.handlers.get(msg['msg_type'], None)
132 134 if handler is not None:
133 135 try:
134 136 self.call_handler(handler, msg)
135 137 except:
136 138 # XXX: This should be logged at least
137 139 traceback.print_last()
138 140
139 141 def call_handler(self, handler, msg):
140 142 if self._overriden_call_handler is not None:
141 143 self._overriden_call_handler(handler, msg)
142 144 elif hasattr(self, '_call_handler'):
143 145 call_handler = getattr(self, '_call_handler')
144 146 call_handler(handler, msg)
145 147 else:
146 148 raise RuntimeError('no handler!')
147 149
148 150 def add_handler(self, callback, msg_type):
149 151 """Register a callback for msg type."""
150 152 self.handlers[msg_type] = callback
151 153
152 154 def remove_handler(self, msg_type):
153 155 self.handlers.pop(msg_type, None)
154 156
155 157
156 158 class XReqSocketChannel(ZmqSocketChannel):
157 159
158 160 handler_queue = None
159 161 command_queue = None
160 162 handlers = None
161 163 _overriden_call_handler = None
162 164
163 165 def __init__(self, context, session, addr):
164 166 self.handlers = {}
165 167 self.handler_queue = Queue()
166 168 self.command_queue = Queue()
167 169 super(XReqSocketChannel, self).__init__(context, session, addr)
168 170
169 171 def run(self):
170 172 self.socket = self.context.socket(zmq.XREQ)
171 173 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
172 174 self.socket.connect('tcp://%s:%i' % self.addr)
173 175 self.ioloop = ioloop.IOLoop()
174 176 self.ioloop.add_handler(self.socket, self._handle_events,
175 177 POLLIN|POLLOUT|POLLERR)
176 178 self.ioloop.start()
177 179
178 180 def _handle_events(self, socket, events):
179 181 # Turn on and off POLLOUT depending on if we have made a request
180 182 if events & POLLERR:
181 183 self._handle_err()
182 184 if events & POLLOUT:
183 185 self._handle_send()
184 186 if events & POLLIN:
185 187 self._handle_recv()
186 188
187 189 def _handle_recv(self):
188 190 msg = self.socket.recv_json()
189 print "Got reply:", msg
190 try:
191 handler = self.handler_queue.get(False)
192 except Empty:
193 print "Message received with no handler!!!"
194 print msg
195 else:
196 self.call_handler(handler, msg)
191 self.call_handlers(msg)
197 192
198 193 def _handle_send(self):
199 194 try:
200 195 msg = self.command_queue.get(False)
201 196 except Empty:
202 197 pass
203 198 else:
204 199 self.socket.send_json(msg)
205 200
206 201 def _handle_err(self):
207 202 raise zmq.ZmqError()
208 203
209 204 def _queue_request(self, msg, callback):
210 205 handler = self._find_handler(msg['msg_type'], callback)
211 206 self.handler_queue.put(handler)
212 207 self.command_queue.put(msg)
213 208
214 209 def execute(self, code, callback=None):
215 210 # Create class for content/msg creation. Related to, but possibly
216 211 # not in Session.
217 212 content = dict(code=code)
218 213 msg = self.session.msg('execute_request', content)
219 214 self._queue_request(msg, callback)
220 215 return msg['header']['msg_id']
221 216
222 217 def complete(self, text, line, block=None, callback=None):
223 218 content = dict(text=text, line=line)
224 219 msg = self.session.msg('complete_request', content)
225 return self._queue_request(msg, callback)
220 self._queue_request(msg, callback)
226 221 return msg['header']['msg_id']
227 222
228 223 def object_info(self, oname, callback=None):
229 224 content = dict(oname=oname)
230 225 msg = self.session.msg('object_info_request', content)
231 return self._queue_request(msg, callback)
226 self._queue_request(msg, callback)
232 227 return msg['header']['msg_id']
233 228
234 229 def _find_handler(self, name, callback):
235 230 if callback is not None:
236 231 return callback
237 232 handler = self.handlers.get(name)
238 233 if handler is None:
239 234 raise MissingHandlerError('No handler defined for method: %s' % name)
240 235 return handler
241 236
242 237 def override_call_handler(self, func):
243 238 """Permanently override the call_handler.
244 239
245 240 The function func will be called as::
246 241
247 242 func(handler, msg)
248 243
249 244 And must call::
250 245
251 246 handler(msg)
252 247
253 248 in the main thread.
254 249 """
255 250 assert callable(func), "not a callable: %r" % func
256 251 self._overriden_call_handler = func
257 252
253 def call_handlers(self, msg):
254 try:
255 handler = self.handler_queue.get(False)
256 except Empty:
257 print "Message received with no handler!!!"
258 print msg
259 else:
260 self.call_handler(handler, msg)
261
258 262 def call_handler(self, handler, msg):
259 263 if self._overriden_call_handler is not None:
260 264 self._overriden_call_handler(handler, msg)
261 265 elif hasattr(self, '_call_handler'):
262 266 call_handler = getattr(self, '_call_handler')
263 267 call_handler(handler, msg)
264 268 else:
265 269 raise RuntimeError('no handler!')
266 270
267 271
268 272 class RepSocketChannel(ZmqSocketChannel):
269 273
270 274 def on_raw_input():
271 275 pass
General Comments 0
You need to be logged in to leave comments. Login now