##// END OF EJS Templates
* Change input mechanism: replace raw_input instead of stdin....
epatters -
Show More
@@ -1,382 +1,382
1 1 # Standard library imports
2 2 import signal
3 3 import sys
4 4
5 5 # System library imports
6 6 from pygments.lexers import PythonLexer
7 7 from PyQt4 import QtCore, QtGui
8 8 import zmq
9 9
10 10 # Local imports
11 11 from IPython.core.inputsplitter import InputSplitter
12 12 from call_tip_widget import CallTipWidget
13 13 from completion_lexer import CompletionLexer
14 14 from console_widget import HistoryConsoleWidget
15 15 from pygments_highlighter import PygmentsHighlighter
16 16
17 17
18 18 class FrontendHighlighter(PygmentsHighlighter):
19 19 """ A PygmentsHighlighter that can be turned on and off and that ignores
20 20 prompts.
21 21 """
22 22
23 23 def __init__(self, frontend):
24 24 super(FrontendHighlighter, self).__init__(frontend.document())
25 25 self._current_offset = 0
26 26 self._frontend = frontend
27 27 self.highlighting_on = False
28 28
29 29 def highlightBlock(self, qstring):
30 30 """ Highlight a block of text. Reimplemented to highlight selectively.
31 31 """
32 32 if not self.highlighting_on:
33 33 return
34 34
35 35 # The input to this function is unicode string that may contain
36 36 # paragraph break characters, non-breaking spaces, etc. Here we acquire
37 37 # the string as plain text so we can compare it.
38 38 current_block = self.currentBlock()
39 39 string = self._frontend._get_block_plain_text(current_block)
40 40
41 41 # Decide whether to check for the regular or continuation prompt.
42 42 if current_block.contains(self._frontend._prompt_pos):
43 43 prompt = self._frontend._prompt
44 44 else:
45 45 prompt = self._frontend._continuation_prompt
46 46
47 47 # Don't highlight the part of the string that contains the prompt.
48 48 if string.startswith(prompt):
49 49 self._current_offset = len(prompt)
50 50 qstring.remove(0, len(prompt))
51 51 else:
52 52 self._current_offset = 0
53 53
54 54 PygmentsHighlighter.highlightBlock(self, qstring)
55 55
56 56 def setFormat(self, start, count, format):
57 57 """ Reimplemented to highlight selectively.
58 58 """
59 59 start += self._current_offset
60 60 PygmentsHighlighter.setFormat(self, start, count, format)
61 61
62 62
63 63 class FrontendWidget(HistoryConsoleWidget):
64 64 """ A Qt frontend for a generic Python kernel.
65 65 """
66 66
67 67 # Emitted when an 'execute_reply' is received from the kernel.
68 68 executed = QtCore.pyqtSignal(object)
69 69
70 70 #---------------------------------------------------------------------------
71 71 # 'QObject' interface
72 72 #---------------------------------------------------------------------------
73 73
74 74 def __init__(self, parent=None):
75 75 super(FrontendWidget, self).__init__(parent)
76 76
77 77 # FrontendWidget protected variables.
78 78 self._call_tip_widget = CallTipWidget(self)
79 79 self._completion_lexer = CompletionLexer(PythonLexer())
80 80 self._hidden = True
81 81 self._highlighter = FrontendHighlighter(self)
82 82 self._input_splitter = InputSplitter(input_mode='replace')
83 83 self._kernel_manager = None
84 84
85 85 # Configure the ConsoleWidget.
86 86 self.tab_width = 4
87 87 self._set_continuation_prompt('... ')
88 88
89 89 self.document().contentsChange.connect(self._document_contents_change)
90 90
91 91 #---------------------------------------------------------------------------
92 92 # 'QWidget' interface
93 93 #---------------------------------------------------------------------------
94 94
95 95 def focusOutEvent(self, event):
96 96 """ Reimplemented to hide calltips.
97 97 """
98 98 self._call_tip_widget.hide()
99 99 super(FrontendWidget, self).focusOutEvent(event)
100 100
101 101 def keyPressEvent(self, event):
102 102 """ Reimplemented to allow calltips to process events and to send
103 103 signals to the kernel.
104 104 """
105 105 if self._executing and event.key() == QtCore.Qt.Key_C and \
106 106 self._control_down(event.modifiers()):
107 107 self._interrupt_kernel()
108 108 else:
109 109 if self._call_tip_widget.isVisible():
110 110 self._call_tip_widget.keyPressEvent(event)
111 111 super(FrontendWidget, self).keyPressEvent(event)
112 112
113 113 #---------------------------------------------------------------------------
114 114 # 'ConsoleWidget' abstract interface
115 115 #---------------------------------------------------------------------------
116 116
117 117 def _is_complete(self, source, interactive):
118 118 """ Returns whether 'source' can be completely processed and a new
119 119 prompt created. When triggered by an Enter/Return key press,
120 120 'interactive' is True; otherwise, it is False.
121 121 """
122 122 complete = self._input_splitter.push(source.expandtabs(4))
123 123 if interactive:
124 124 complete = not self._input_splitter.push_accepts_more()
125 125 return complete
126 126
127 127 def _execute(self, source, hidden):
128 128 """ Execute 'source'. If 'hidden', do not show any output.
129 129 """
130 130 self.kernel_manager.xreq_channel.execute(source)
131 131 self._hidden = hidden
132 132
133 133 def _prompt_started_hook(self):
134 134 """ Called immediately after a new prompt is displayed.
135 135 """
136 136 if not self._reading:
137 137 self._highlighter.highlighting_on = True
138 138
139 139 # Auto-indent if this is a continuation prompt.
140 140 if self._get_prompt_cursor().blockNumber() != \
141 141 self._get_end_cursor().blockNumber():
142 142 spaces = self._input_splitter.indent_spaces
143 143 self.appendPlainText('\t' * (spaces / self.tab_width))
144 144 self.appendPlainText(' ' * (spaces % self.tab_width))
145 145
146 146 def _prompt_finished_hook(self):
147 147 """ Called immediately after a prompt is finished, i.e. when some input
148 148 will be processed and a new prompt displayed.
149 149 """
150 150 if not self._reading:
151 151 self._highlighter.highlighting_on = False
152 152
153 153 def _tab_pressed(self):
154 154 """ Called when the tab key is pressed. Returns whether to continue
155 155 processing the event.
156 156 """
157 157 self._keep_cursor_in_buffer()
158 158 cursor = self.textCursor()
159 159 return not self._complete()
160 160
161 161 #---------------------------------------------------------------------------
162 162 # 'FrontendWidget' interface
163 163 #---------------------------------------------------------------------------
164 164
165 165 def execute_file(self, path, hidden=False):
166 166 """ Attempts to execute file with 'path'. If 'hidden', no output is
167 167 shown.
168 168 """
169 169 self.execute('execfile("%s")' % path, hidden=hidden)
170 170
171 171 def _get_kernel_manager(self):
172 172 """ Returns the current kernel manager.
173 173 """
174 174 return self._kernel_manager
175 175
176 176 def _set_kernel_manager(self, kernel_manager):
177 177 """ Disconnect from the current kernel manager (if any) and set a new
178 178 kernel manager.
179 179 """
180 180 # Disconnect the old kernel manager, if necessary.
181 181 if self._kernel_manager is not None:
182 182 self._kernel_manager.started_channels.disconnect(
183 183 self._started_channels)
184 184 self._kernel_manager.stopped_channels.disconnect(
185 185 self._stopped_channels)
186 186
187 187 # Disconnect the old kernel manager's channels.
188 188 sub = self._kernel_manager.sub_channel
189 189 xreq = self._kernel_manager.xreq_channel
190 190 rep = self._kernel_manager.rep_channel
191 191 sub.message_received.disconnect(self._handle_sub)
192 192 xreq.execute_reply.disconnect(self._handle_execute_reply)
193 193 xreq.complete_reply.disconnect(self._handle_complete_reply)
194 194 xreq.object_info_reply.disconnect(self._handle_object_info_reply)
195 rep.readline_requested.disconnect(self._handle_req)
195 rep.input_requested.disconnect(self._handle_req)
196 196
197 197 # Handle the case where the old kernel manager is still listening.
198 198 if self._kernel_manager.channels_running:
199 199 self._stopped_channels()
200 200
201 201 # Set the new kernel manager.
202 202 self._kernel_manager = kernel_manager
203 203 if kernel_manager is None:
204 204 return
205 205
206 206 # Connect the new kernel manager.
207 207 kernel_manager.started_channels.connect(self._started_channels)
208 208 kernel_manager.stopped_channels.connect(self._stopped_channels)
209 209
210 210 # Connect the new kernel manager's channels.
211 211 sub = kernel_manager.sub_channel
212 212 xreq = kernel_manager.xreq_channel
213 213 rep = kernel_manager.rep_channel
214 214 sub.message_received.connect(self._handle_sub)
215 215 xreq.execute_reply.connect(self._handle_execute_reply)
216 216 xreq.complete_reply.connect(self._handle_complete_reply)
217 217 xreq.object_info_reply.connect(self._handle_object_info_reply)
218 rep.readline_requested.connect(self._handle_req)
218 rep.input_requested.connect(self._handle_req)
219 219
220 220 # Handle the case where the kernel manager started channels before
221 221 # we connected.
222 222 if kernel_manager.channels_running:
223 223 self._started_channels()
224 224
225 225 kernel_manager = property(_get_kernel_manager, _set_kernel_manager)
226 226
227 227 #---------------------------------------------------------------------------
228 228 # 'FrontendWidget' protected interface
229 229 #---------------------------------------------------------------------------
230 230
231 231 def _call_tip(self):
232 232 """ Shows a call tip, if appropriate, at the current cursor location.
233 233 """
234 234 # Decide if it makes sense to show a call tip
235 235 cursor = self.textCursor()
236 236 cursor.movePosition(QtGui.QTextCursor.Left)
237 237 document = self.document()
238 238 if document.characterAt(cursor.position()).toAscii() != '(':
239 239 return False
240 240 context = self._get_context(cursor)
241 241 if not context:
242 242 return False
243 243
244 244 # Send the metadata request to the kernel
245 245 name = '.'.join(context)
246 246 self._calltip_id = self.kernel_manager.xreq_channel.object_info(name)
247 247 self._calltip_pos = self.textCursor().position()
248 248 return True
249 249
250 250 def _complete(self):
251 251 """ Performs completion at the current cursor location.
252 252 """
253 253 # Decide if it makes sense to do completion
254 254 context = self._get_context()
255 255 if not context:
256 256 return False
257 257
258 258 # Send the completion request to the kernel
259 259 text = '.'.join(context)
260 260 self._complete_id = self.kernel_manager.xreq_channel.complete(
261 261 text, self.input_buffer_cursor_line, self.input_buffer)
262 262 self._complete_pos = self.textCursor().position()
263 263 return True
264 264
265 265 def _get_banner(self):
266 266 """ Gets a banner to display at the beginning of a session.
267 267 """
268 268 banner = 'Python %s on %s\nType "help", "copyright", "credits" or ' \
269 269 '"license" for more information.'
270 270 return banner % (sys.version, sys.platform)
271 271
272 272 def _get_context(self, cursor=None):
273 273 """ Gets the context at the current cursor location.
274 274 """
275 275 if cursor is None:
276 276 cursor = self.textCursor()
277 277 cursor.movePosition(QtGui.QTextCursor.StartOfLine,
278 278 QtGui.QTextCursor.KeepAnchor)
279 279 text = str(cursor.selection().toPlainText())
280 280 return self._completion_lexer.get_context(text)
281 281
282 282 def _interrupt_kernel(self):
283 283 """ Attempts to the interrupt the kernel.
284 284 """
285 285 if self.kernel_manager.has_kernel:
286 286 self.kernel_manager.signal_kernel(signal.SIGINT)
287 287 else:
288 288 self.appendPlainText('Kernel process is either remote or '
289 289 'unspecified. Cannot interrupt.\n')
290 290
291 291 def _show_interpreter_prompt(self):
292 292 """ Shows a prompt for the interpreter.
293 293 """
294 294 self._show_prompt('>>> ')
295 295
296 296 #------ Signal handlers ----------------------------------------------------
297 297
298 298 def _started_channels(self):
299 299 """ Called when the kernel manager has started listening.
300 300 """
301 301 self._reset()
302 302 self.appendPlainText(self._get_banner())
303 303 self._show_interpreter_prompt()
304 304
305 305 def _stopped_channels(self):
306 306 """ Called when the kernel manager has stopped listening.
307 307 """
308 308 # FIXME: Print a message here?
309 309 pass
310 310
311 311 def _document_contents_change(self, position, removed, added):
312 312 """ Called whenever the document's content changes. Display a calltip
313 313 if appropriate.
314 314 """
315 315 # Calculate where the cursor should be *after* the change:
316 316 position += added
317 317
318 318 document = self.document()
319 319 if position == self.textCursor().position():
320 320 self._call_tip()
321 321
322 322 def _handle_req(self, req):
323 323 # Make sure that all output from the SUB channel has been processed
324 324 # before entering readline mode.
325 325 self.kernel_manager.sub_channel.flush()
326 326
327 327 def callback(line):
328 self.kernel_manager.rep_channel.readline(line)
329 self._readline(callback=callback)
328 self.kernel_manager.rep_channel.input(line)
329 self._readline(req['content']['prompt'], callback=callback)
330 330
331 331 def _handle_sub(self, omsg):
332 332 if self._hidden:
333 333 return
334 334 handler = getattr(self, '_handle_%s' % omsg['msg_type'], None)
335 335 if handler is not None:
336 336 handler(omsg)
337 337
338 338 def _handle_pyout(self, omsg):
339 339 self.appendPlainText(omsg['content']['data'] + '\n')
340 340
341 341 def _handle_stream(self, omsg):
342 342 self.appendPlainText(omsg['content']['data'])
343 343 self.moveCursor(QtGui.QTextCursor.End)
344 344
345 345 def _handle_execute_reply(self, reply):
346 346 if self._hidden:
347 347 return
348 348
349 349 # Make sure that all output from the SUB channel has been processed
350 350 # before writing a new prompt.
351 351 self.kernel_manager.sub_channel.flush()
352 352
353 353 status = reply['content']['status']
354 354 if status == 'error':
355 355 self._handle_execute_error(reply)
356 356 elif status == 'aborted':
357 357 text = "ERROR: ABORTED\n"
358 358 self.appendPlainText(text)
359 359 self._hidden = True
360 360 self._show_interpreter_prompt()
361 361 self.executed.emit(reply)
362 362
363 363 def _handle_execute_error(self, reply):
364 364 content = reply['content']
365 365 traceback = ''.join(content['traceback'])
366 366 self.appendPlainText(traceback)
367 367
368 368 def _handle_complete_reply(self, rep):
369 369 cursor = self.textCursor()
370 370 if rep['parent_header']['msg_id'] == self._complete_id and \
371 371 cursor.position() == self._complete_pos:
372 372 text = '.'.join(self._get_context())
373 373 cursor.movePosition(QtGui.QTextCursor.Left, n=len(text))
374 374 self._complete_with_items(cursor, rep['content']['matches'])
375 375
376 376 def _handle_object_info_reply(self, rep):
377 377 cursor = self.textCursor()
378 378 if rep['parent_header']['msg_id'] == self._calltip_id and \
379 379 cursor.position() == self._calltip_pos:
380 380 doc = rep['content']['docstring']
381 381 if doc:
382 382 self._call_tip_widget.show_docstring(doc)
@@ -1,171 +1,171
1 1 """ Defines a KernelManager that provides signals and slots.
2 2 """
3 3
4 4 # System library imports.
5 5 from PyQt4 import QtCore
6 6 import zmq
7 7
8 8 # IPython imports.
9 9 from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \
10 10 XReqSocketChannel, RepSocketChannel
11 11 from util import MetaQObjectHasTraits
12 12
13 13
14 14 class QtSubSocketChannel(SubSocketChannel, QtCore.QObject):
15 15
16 16 # Emitted when any message is received.
17 17 message_received = QtCore.pyqtSignal(object)
18 18
19 19 # Emitted when a message of type 'pyout' or 'stdout' is received.
20 20 output_received = QtCore.pyqtSignal(object)
21 21
22 22 # Emitted when a message of type 'pyerr' or 'stderr' is received.
23 23 error_received = QtCore.pyqtSignal(object)
24 24
25 25 #---------------------------------------------------------------------------
26 26 # 'object' interface
27 27 #---------------------------------------------------------------------------
28 28
29 29 def __init__(self, *args, **kw):
30 30 """ Reimplemented to ensure that QtCore.QObject is initialized first.
31 31 """
32 32 QtCore.QObject.__init__(self)
33 33 SubSocketChannel.__init__(self, *args, **kw)
34 34
35 35 #---------------------------------------------------------------------------
36 36 # 'SubSocketChannel' interface
37 37 #---------------------------------------------------------------------------
38 38
39 39 def call_handlers(self, msg):
40 40 """ Reimplemented to emit signals instead of making callbacks.
41 41 """
42 42 # Emit the generic signal.
43 43 self.message_received.emit(msg)
44 44
45 45 # Emit signals for specialized message types.
46 46 msg_type = msg['msg_type']
47 47 if msg_type in ('pyout', 'stdout'):
48 48 self.output_received.emit(msg)
49 49 elif msg_type in ('pyerr', 'stderr'):
50 50 self.error_received.emit(msg)
51 51
52 52 def flush(self):
53 53 """ Reimplemented to ensure that signals are dispatched immediately.
54 54 """
55 55 super(QtSubSocketChannel, self).flush()
56 56 QtCore.QCoreApplication.instance().processEvents()
57 57
58 58
59 59 class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):
60 60
61 61 # Emitted when any message is received.
62 62 message_received = QtCore.pyqtSignal(object)
63 63
64 64 # Emitted when a reply has been received for the corresponding request type.
65 65 execute_reply = QtCore.pyqtSignal(object)
66 66 complete_reply = QtCore.pyqtSignal(object)
67 67 object_info_reply = QtCore.pyqtSignal(object)
68 68
69 69 #---------------------------------------------------------------------------
70 70 # 'object' interface
71 71 #---------------------------------------------------------------------------
72 72
73 73 def __init__(self, *args, **kw):
74 74 """ Reimplemented to ensure that QtCore.QObject is initialized first.
75 75 """
76 76 QtCore.QObject.__init__(self)
77 77 XReqSocketChannel.__init__(self, *args, **kw)
78 78
79 79 #---------------------------------------------------------------------------
80 80 # 'XReqSocketChannel' interface
81 81 #---------------------------------------------------------------------------
82 82
83 83 def call_handlers(self, msg):
84 84 """ Reimplemented to emit signals instead of making callbacks.
85 85 """
86 86 # Emit the generic signal.
87 87 self.message_received.emit(msg)
88 88
89 89 # Emit signals for specialized message types.
90 90 msg_type = msg['msg_type']
91 91 signal = getattr(self, msg_type, None)
92 92 if signal:
93 93 signal.emit(msg)
94 94
95 95
96 96 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
97 97
98 98 # Emitted when any message is received.
99 99 message_received = QtCore.pyqtSignal(object)
100 100
101 # Emitted when a readline request is received.
102 readline_requested = QtCore.pyqtSignal(object)
101 # Emitted when an input request is received.
102 input_requested = QtCore.pyqtSignal(object)
103 103
104 104 #---------------------------------------------------------------------------
105 105 # 'object' interface
106 106 #---------------------------------------------------------------------------
107 107
108 108 def __init__(self, *args, **kw):
109 109 """ Reimplemented to ensure that QtCore.QObject is initialized first.
110 110 """
111 111 QtCore.QObject.__init__(self)
112 112 RepSocketChannel.__init__(self, *args, **kw)
113 113
114 114 #---------------------------------------------------------------------------
115 115 # 'RepSocketChannel' interface
116 116 #---------------------------------------------------------------------------
117 117
118 118 def call_handlers(self, msg):
119 119 """ Reimplemented to emit signals instead of making callbacks.
120 120 """
121 121 # Emit the generic signal.
122 122 self.message_received.emit(msg)
123 123
124 124 # Emit signals for specialized message types.
125 125 msg_type = msg['msg_type']
126 if msg_type == 'readline_request':
127 self.readline_requested.emit(msg)
126 if msg_type == 'input_request':
127 self.input_requested.emit(msg)
128 128
129 129
130 130 class QtKernelManager(KernelManager, QtCore.QObject):
131 131 """ A KernelManager that provides signals and slots.
132 132 """
133 133
134 134 __metaclass__ = MetaQObjectHasTraits
135 135
136 136 # Emitted when the kernel manager has started listening.
137 137 started_channels = QtCore.pyqtSignal()
138 138
139 139 # Emitted when the kernel manager has stopped listening.
140 140 stopped_channels = QtCore.pyqtSignal()
141 141
142 142 # Use Qt-specific channel classes that emit signals.
143 143 sub_channel_class = QtSubSocketChannel
144 144 xreq_channel_class = QtXReqSocketChannel
145 145 rep_channel_class = QtRepSocketChannel
146 146
147 147 #---------------------------------------------------------------------------
148 148 # 'object' interface
149 149 #---------------------------------------------------------------------------
150 150
151 151 def __init__(self, *args, **kw):
152 152 """ Reimplemented to ensure that QtCore.QObject is initialized first.
153 153 """
154 154 QtCore.QObject.__init__(self)
155 155 KernelManager.__init__(self, *args, **kw)
156 156
157 157 #---------------------------------------------------------------------------
158 158 # 'KernelManager' interface
159 159 #---------------------------------------------------------------------------
160 160
161 161 def start_channels(self):
162 162 """ Reimplemented to emit signal.
163 163 """
164 164 super(QtKernelManager, self).start_channels()
165 165 self.started_channels.emit()
166 166
167 167 def stop_channels(self):
168 168 """ Reimplemented to emit signal.
169 169 """
170 170 super(QtKernelManager, self).stop_channels()
171 171 self.stopped_channels.emit()
@@ -1,547 +1,488
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Finish implementing `raw_input`.
7 7 * Implement `set_parent` logic. Right before doing exec, the Kernel should
8 8 call set_parent on all the PUB objects with the message about to be executed.
9 9 * Implement random port and security key logic.
10 10 * Implement control messages.
11 11 * Implement event loop and poll version.
12 12 """
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 # Standard library imports.
19 19 import __builtin__
20 20 from code import CommandCompiler
21 21 from cStringIO import StringIO
22 22 import os
23 23 import sys
24 24 from threading import Thread
25 25 import time
26 26 import traceback
27 27
28 28 # System library imports.
29 29 import zmq
30 30
31 31 # Local imports.
32 32 from IPython.external.argparse import ArgumentParser
33 33 from session import Session, Message, extract_header
34 34 from completer import KernelCompleter
35 35
36 36 #-----------------------------------------------------------------------------
37 37 # Kernel and stream classes
38 38 #-----------------------------------------------------------------------------
39 39
40 class InStream(object):
41 """ A file like object that reads from a 0MQ XREQ socket."""
42
43 def __init__(self, session, socket):
44 self.session = session
45 self.socket = socket
46
47 def close(self):
48 self.socket = None
49
50 def flush(self):
51 if self.socket is None:
52 raise ValueError('I/O operation on closed file')
53
54 def isatty(self):
55 return False
56
57 def next(self):
58 raise IOError('Seek not supported.')
59
60 def read(self, size=-1):
61 # FIXME: Do we want another request for this?
62 string = '\n'.join(self.readlines())
63 return self._truncate(string, size)
64
65 def readline(self, size=-1):
66 if self.socket is None:
67 raise ValueError('I/O operation on closed file')
68 else:
69 content = dict(size=size)
70 msg = self.session.msg('readline_request', content=content)
71 reply = self._request(msg)
72 line = reply['content']['line']
73 return self._truncate(line, size)
74
75 def readlines(self, sizehint=-1):
76 # Sizehint is ignored, as is permitted.
77 if self.socket is None:
78 raise ValueError('I/O operation on closed file')
79 else:
80 lines = []
81 while True:
82 line = self.readline()
83 if line:
84 lines.append(line)
85 else:
86 break
87 return lines
88
89 def seek(self, offset, whence=None):
90 raise IOError('Seek not supported.')
91
92 def write(self, string):
93 raise IOError('Write not supported on a read only stream.')
94
95 def writelines(self, sequence):
96 raise IOError('Write not supported on a read only stream.')
97
98 def _request(self, msg):
99 # Flush output before making the request. This ensures, for example,
100 # that raw_input(prompt) actually gets a prompt written.
101 sys.stderr.flush()
102 sys.stdout.flush()
103
104 self.socket.send_json(msg)
105 while True:
106 try:
107 reply = self.socket.recv_json(zmq.NOBLOCK)
108 except zmq.ZMQError, e:
109 if e.errno == zmq.EAGAIN:
110 pass
111 else:
112 raise
113 else:
114 break
115 return reply
116
117 def _truncate(self, string, size):
118 if size >= 0:
119 if isinstance(string, str):
120 return string[:size]
121 elif isinstance(string, unicode):
122 encoded = string.encode('utf-8')[:size]
123 return encoded.decode('utf-8', 'ignore')
124 return string
125
126
127 40 class OutStream(object):
128 41 """A file like object that publishes the stream to a 0MQ PUB socket."""
129 42
130 43 # The time interval between automatic flushes, in seconds.
131 44 flush_interval = 0.05
132 45
133 46 def __init__(self, session, pub_socket, name):
134 47 self.session = session
135 48 self.pub_socket = pub_socket
136 49 self.name = name
137 50 self.parent_header = {}
138 51 self._new_buffer()
139 52
140 53 def set_parent(self, parent):
141 54 self.parent_header = extract_header(parent)
142 55
143 56 def close(self):
144 57 self.pub_socket = None
145 58
146 59 def flush(self):
147 60 if self.pub_socket is None:
148 61 raise ValueError(u'I/O operation on closed file')
149 62 else:
150 63 data = self._buffer.getvalue()
151 64 if data:
152 65 content = {u'name':self.name, u'data':data}
153 66 msg = self.session.msg(u'stream', content=content,
154 67 parent=self.parent_header)
155 68 print>>sys.__stdout__, Message(msg)
156 69 self.pub_socket.send_json(msg)
157 70
158 71 self._buffer.close()
159 72 self._new_buffer()
160 73
161 74 def isatty(self):
162 75 return False
163 76
164 77 def next(self):
165 78 raise IOError('Read not supported on a write only stream.')
166 79
167 80 def read(self, size=-1):
168 81 raise IOError('Read not supported on a write only stream.')
169 82
170 83 def readline(self, size=-1):
171 84 raise IOError('Read not supported on a write only stream.')
172 85
173 86 def write(self, string):
174 87 if self.pub_socket is None:
175 88 raise ValueError('I/O operation on closed file')
176 89 else:
177 90 self._buffer.write(string)
178 91 current_time = time.time()
179 92 if self._start <= 0:
180 93 self._start = current_time
181 94 elif current_time - self._start > self.flush_interval:
182 95 self.flush()
183 96
184 97 def writelines(self, sequence):
185 98 if self.pub_socket is None:
186 99 raise ValueError('I/O operation on closed file')
187 100 else:
188 101 for string in sequence:
189 102 self.write(string)
190 103
191 104 def _new_buffer(self):
192 105 self._buffer = StringIO()
193 106 self._start = -1
194 107
195 108
196 109 class DisplayHook(object):
197 110
198 111 def __init__(self, session, pub_socket):
199 112 self.session = session
200 113 self.pub_socket = pub_socket
201 114 self.parent_header = {}
202 115
203 116 def __call__(self, obj):
204 117 if obj is None:
205 118 return
206 119
207 120 __builtin__._ = obj
208 121 msg = self.session.msg(u'pyout', {u'data':repr(obj)},
209 122 parent=self.parent_header)
210 123 self.pub_socket.send_json(msg)
211 124
212 125 def set_parent(self, parent):
213 126 self.parent_header = extract_header(parent)
214 127
215 128
216 129 class Kernel(object):
217 130
218 def __init__(self, session, reply_socket, pub_socket):
131 def __init__(self, session, reply_socket, pub_socket, req_socket):
219 132 self.session = session
220 133 self.reply_socket = reply_socket
221 134 self.pub_socket = pub_socket
135 self.req_socket = req_socket
222 136 self.user_ns = {}
223 137 self.history = []
224 138 self.compiler = CommandCompiler()
225 139 self.completer = KernelCompleter(self.user_ns)
226 140
227 141 # Build dict of handlers for message types
228 142 msg_types = [ 'execute_request', 'complete_request',
229 143 'object_info_request' ]
230 144 self.handlers = {}
231 145 for msg_type in msg_types:
232 146 self.handlers[msg_type] = getattr(self, msg_type)
233 147
234 148 def abort_queue(self):
235 149 while True:
236 150 try:
237 151 ident = self.reply_socket.recv(zmq.NOBLOCK)
238 152 except zmq.ZMQError, e:
239 153 if e.errno == zmq.EAGAIN:
240 154 break
241 155 else:
242 156 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
243 157 msg = self.reply_socket.recv_json()
244 158 print>>sys.__stdout__, "Aborting:"
245 159 print>>sys.__stdout__, Message(msg)
246 160 msg_type = msg['msg_type']
247 161 reply_type = msg_type.split('_')[0] + '_reply'
248 162 reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
249 163 print>>sys.__stdout__, Message(reply_msg)
250 164 self.reply_socket.send(ident,zmq.SNDMORE)
251 165 self.reply_socket.send_json(reply_msg)
252 166 # We need to wait a bit for requests to come in. This can probably
253 167 # be set shorter for true asynchronous clients.
254 168 time.sleep(0.1)
255 169
256 170 def execute_request(self, ident, parent):
257 171 try:
258 172 code = parent[u'content'][u'code']
259 173 except:
260 174 print>>sys.__stderr__, "Got bad msg: "
261 175 print>>sys.__stderr__, Message(parent)
262 176 return
263 177 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
264 178 self.pub_socket.send_json(pyin_msg)
265 179
266 180 try:
267 181 comp_code = self.compiler(code, '<zmq-kernel>')
182
183 # Replace raw_input. Note that is not sufficient to replace
184 # raw_input in the user namespace.
185 raw_input = lambda prompt='': self.raw_input(prompt, ident, parent)
186 __builtin__.raw_input = raw_input
187
188 # Configure the display hook.
268 189 sys.displayhook.set_parent(parent)
190
269 191 exec comp_code in self.user_ns, self.user_ns
270 192 except:
271 193 result = u'error'
272 194 etype, evalue, tb = sys.exc_info()
273 195 tb = traceback.format_exception(etype, evalue, tb)
274 196 exc_content = {
275 197 u'status' : u'error',
276 198 u'traceback' : tb,
277 199 u'ename' : unicode(etype.__name__),
278 200 u'evalue' : unicode(evalue)
279 201 }
280 202 exc_msg = self.session.msg(u'pyerr', exc_content, parent)
281 203 self.pub_socket.send_json(exc_msg)
282 204 reply_content = exc_content
283 205 else:
284 206 reply_content = {'status' : 'ok'}
285 207
286 208 # Flush output before sending the reply.
287 209 sys.stderr.flush()
288 210 sys.stdout.flush()
289 211
290 212 # Send the reply.
291 213 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
292 214 print>>sys.__stdout__, Message(reply_msg)
293 215 self.reply_socket.send(ident, zmq.SNDMORE)
294 216 self.reply_socket.send_json(reply_msg)
295 217 if reply_msg['content']['status'] == u'error':
296 218 self.abort_queue()
297 219
220 def raw_input(self, prompt, ident, parent):
221 # Flush output before making the request.
222 sys.stderr.flush()
223 sys.stdout.flush()
224
225 # Send the input request.
226 content = dict(prompt=prompt)
227 msg = self.session.msg(u'input_request', content, parent)
228 self.req_socket.send_json(msg)
229
230 # Await a response.
231 reply = self.req_socket.recv_json()
232 try:
233 value = reply['content']['value']
234 except:
235 print>>sys.__stderr__, "Got bad raw_input reply: "
236 print>>sys.__stderr__, Message(parent)
237 value = ''
238 return value
239
298 240 def complete_request(self, ident, parent):
299 241 matches = {'matches' : self.complete(parent),
300 242 'status' : 'ok'}
301 243 completion_msg = self.session.send(self.reply_socket, 'complete_reply',
302 244 matches, parent, ident)
303 245 print >> sys.__stdout__, completion_msg
304 246
305 247 def complete(self, msg):
306 248 return self.completer.complete(msg.content.line, msg.content.text)
307 249
308 250 def object_info_request(self, ident, parent):
309 251 context = parent['content']['oname'].split('.')
310 252 object_info = self.object_info(context)
311 253 msg = self.session.send(self.reply_socket, 'object_info_reply',
312 254 object_info, parent, ident)
313 255 print >> sys.__stdout__, msg
314 256
315 257 def object_info(self, context):
316 258 symbol, leftover = self.symbol_from_context(context)
317 259 if symbol is not None and not leftover:
318 260 doc = getattr(symbol, '__doc__', '')
319 261 else:
320 262 doc = ''
321 263 object_info = dict(docstring = doc)
322 264 return object_info
323 265
324 266 def symbol_from_context(self, context):
325 267 if not context:
326 268 return None, context
327 269
328 270 base_symbol_string = context[0]
329 271 symbol = self.user_ns.get(base_symbol_string, None)
330 272 if symbol is None:
331 273 symbol = __builtin__.__dict__.get(base_symbol_string, None)
332 274 if symbol is None:
333 275 return None, context
334 276
335 277 context = context[1:]
336 278 for i, name in enumerate(context):
337 279 new_symbol = getattr(symbol, name, None)
338 280 if new_symbol is None:
339 281 return symbol, context[i:]
340 282 else:
341 283 symbol = new_symbol
342 284
343 285 return symbol, []
344 286
345 287 def start(self):
346 288 while True:
347 289 ident = self.reply_socket.recv()
348 assert self.reply_socket.rcvmore(), "Unexpected missing message part."
290 assert self.reply_socket.rcvmore(), "Missing message part."
349 291 msg = self.reply_socket.recv_json()
350 292 omsg = Message(msg)
351 293 print>>sys.__stdout__
352 294 print>>sys.__stdout__, omsg
353 295 handler = self.handlers.get(omsg.msg_type, None)
354 296 if handler is None:
355 297 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", omsg
356 298 else:
357 299 handler(ident, omsg)
358 300
359 301 #-----------------------------------------------------------------------------
360 302 # Kernel main and launch functions
361 303 #-----------------------------------------------------------------------------
362 304
363 305 class ExitPollerUnix(Thread):
364 306 """ A Unix-specific daemon thread that terminates the program immediately
365 when this process' parent process no longer exists.
307 when the parent process no longer exists.
366 308 """
367 309
368 310 def __init__(self):
369 311 super(ExitPollerUnix, self).__init__()
370 312 self.daemon = True
371 313
372 314 def run(self):
373 315 # We cannot use os.waitpid because it works only for child processes.
374 316 from errno import EINTR
375 317 while True:
376 318 try:
377 319 if os.getppid() == 1:
378 320 os._exit(1)
379 321 time.sleep(1.0)
380 322 except OSError, e:
381 323 if e.errno == EINTR:
382 324 continue
383 325 raise
384 326
385 327 class ExitPollerWindows(Thread):
386 328 """ A Windows-specific daemon thread that terminates the program immediately
387 329 when a Win32 handle is signaled.
388 330 """
389 331
390 332 def __init__(self, handle):
391 333 super(ExitPollerWindows, self).__init__()
392 334 self.daemon = True
393 335 self.handle = handle
394 336
395 337 def run(self):
396 338 from _subprocess import WaitForSingleObject, WAIT_OBJECT_0, INFINITE
397 339 result = WaitForSingleObject(self.handle, INFINITE)
398 340 if result == WAIT_OBJECT_0:
399 341 os._exit(1)
400 342
401 343
402 344 def bind_port(socket, ip, port):
403 345 """ Binds the specified ZMQ socket. If the port is less than zero, a random
404 346 port is chosen. Returns the port that was bound.
405 347 """
406 348 connection = 'tcp://%s' % ip
407 349 if port <= 0:
408 350 port = socket.bind_to_random_port(connection)
409 351 else:
410 352 connection += ':%i' % port
411 353 socket.bind(connection)
412 354 return port
413 355
414 356
415 357 def main():
416 358 """ Main entry point for launching a kernel.
417 359 """
418 360 # Parse command line arguments.
419 361 parser = ArgumentParser()
420 362 parser.add_argument('--ip', type=str, default='127.0.0.1',
421 363 help='set the kernel\'s IP address [default: local]')
422 364 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
423 365 help='set the XREP channel port [default: random]')
424 366 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
425 367 help='set the PUB channel port [default: random]')
426 368 parser.add_argument('--req', type=int, metavar='PORT', default=0,
427 369 help='set the REQ channel port [default: random]')
428 370 if sys.platform == 'win32':
429 371 parser.add_argument('--parent', type=int, metavar='HANDLE',
430 372 default=0, help='kill this process if the process '
431 373 'with HANDLE dies')
432 374 else:
433 375 parser.add_argument('--parent', action='store_true',
434 376 help='kill this process if its parent dies')
435 377 namespace = parser.parse_args()
436 378
437 379 # Create a context, a session, and the kernel sockets.
438 380 print >>sys.__stdout__, "Starting the kernel..."
439 381 context = zmq.Context()
440 382 session = Session(username=u'kernel')
441 383
442 384 reply_socket = context.socket(zmq.XREP)
443 385 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
444 386 print >>sys.__stdout__, "XREP Channel on port", xrep_port
445 387
446 388 pub_socket = context.socket(zmq.PUB)
447 389 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
448 390 print >>sys.__stdout__, "PUB Channel on port", pub_port
449 391
450 392 req_socket = context.socket(zmq.XREQ)
451 393 req_port = bind_port(req_socket, namespace.ip, namespace.req)
452 394 print >>sys.__stdout__, "REQ Channel on port", req_port
453 395
454 396 # Redirect input streams and set a display hook.
455 sys.stdin = InStream(session, req_socket)
456 397 sys.stdout = OutStream(session, pub_socket, u'stdout')
457 398 sys.stderr = OutStream(session, pub_socket, u'stderr')
458 399 sys.displayhook = DisplayHook(session, pub_socket)
459 400
460 401 # Create the kernel.
461 kernel = Kernel(session, reply_socket, pub_socket)
402 kernel = Kernel(session, reply_socket, pub_socket, req_socket)
462 403
463 404 # Configure this kernel/process to die on parent termination, if necessary.
464 405 if namespace.parent:
465 406 if sys.platform == 'win32':
466 407 poller = ExitPollerWindows(namespace.parent)
467 408 else:
468 409 poller = ExitPollerUnix()
469 410 poller.start()
470 411
471 412 # Start the kernel mainloop.
472 413 kernel.start()
473 414
474 415
475 416 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False):
476 417 """ Launches a localhost kernel, binding to the specified ports.
477 418
478 419 Parameters
479 420 ----------
480 421 xrep_port : int, optional
481 422 The port to use for XREP channel.
482 423
483 424 pub_port : int, optional
484 425 The port to use for the SUB channel.
485 426
486 427 req_port : int, optional
487 428 The port to use for the REQ (raw input) channel.
488 429
489 430 independent : bool, optional (default False)
490 431 If set, the kernel process is guaranteed to survive if this process
491 432 dies. If not set, an effort is made to ensure that the kernel is killed
492 433 when this process dies. Note that in this case it is still good practice
493 434 to kill kernels manually before exiting.
494 435
495 436 Returns
496 437 -------
497 438 A tuple of form:
498 439 (kernel_process, xrep_port, pub_port, req_port)
499 440 where kernel_process is a Popen object and the ports are integers.
500 441 """
501 442 import socket
502 443 from subprocess import Popen
503 444
504 445 # Find open ports as necessary.
505 446 ports = []
506 447 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0)
507 448 for i in xrange(ports_needed):
508 449 sock = socket.socket()
509 450 sock.bind(('', 0))
510 451 ports.append(sock)
511 452 for i, sock in enumerate(ports):
512 453 port = sock.getsockname()[1]
513 454 sock.close()
514 455 ports[i] = port
515 456 if xrep_port <= 0:
516 457 xrep_port = ports.pop(0)
517 458 if pub_port <= 0:
518 459 pub_port = ports.pop(0)
519 460 if req_port <= 0:
520 461 req_port = ports.pop(0)
521 462
522 463 # Spawn a kernel.
523 464 command = 'from IPython.zmq.kernel import main; main()'
524 465 arguments = [ sys.executable, '-c', command, '--xrep', str(xrep_port),
525 466 '--pub', str(pub_port), '--req', str(req_port) ]
526 467 if independent:
527 468 if sys.platform == 'win32':
528 469 proc = Popen(['start', '/b'] + arguments, shell=True)
529 470 else:
530 471 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
531 472 else:
532 473 if sys.platform == 'win32':
533 474 from _subprocess import DuplicateHandle, GetCurrentProcess, \
534 475 DUPLICATE_SAME_ACCESS
535 476 pid = GetCurrentProcess()
536 477 handle = DuplicateHandle(pid, pid, pid, 0,
537 478 True, # Inheritable by new processes.
538 479 DUPLICATE_SAME_ACCESS)
539 480 proc = Popen(arguments + ['--parent', str(int(handle))])
540 481 else:
541 482 proc = Popen(arguments + ['--parent'])
542 483
543 484 return proc, xrep_port, pub_port, req_port
544 485
545 486
546 487 if __name__ == '__main__':
547 488 main()
@@ -1,594 +1,587
1 1 """Classes to manage the interaction with a running kernel.
2 2
3 3 Todo
4 4 ====
5 5
6 6 * Create logger to handle debugging and console messages.
7 7 """
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2008-2010 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Imports
18 18 #-----------------------------------------------------------------------------
19 19
20 20 # Standard library imports.
21 21 from Queue import Queue, Empty
22 22 from subprocess import Popen
23 23 from threading import Thread
24 24 import time
25 25
26 26 # System library imports.
27 27 import zmq
28 28 from zmq import POLLIN, POLLOUT, POLLERR
29 29 from zmq.eventloop import ioloop
30 30
31 31 # Local imports.
32 32 from IPython.utils.traitlets import HasTraits, Any, Instance, Type
33 33 from kernel import launch_kernel
34 34 from session import Session
35 35
36 36 #-----------------------------------------------------------------------------
37 37 # Constants and exceptions
38 38 #-----------------------------------------------------------------------------
39 39
40 40 LOCALHOST = '127.0.0.1'
41 41
42 42 class InvalidPortNumber(Exception):
43 43 pass
44 44
45 45 #-----------------------------------------------------------------------------
46 46 # ZMQ Socket Channel classes
47 47 #-----------------------------------------------------------------------------
48 48
49 49 class ZmqSocketChannel(Thread):
50 50 """The base class for the channels that use ZMQ sockets.
51 51 """
52 52 context = None
53 53 session = None
54 54 socket = None
55 55 ioloop = None
56 56 iostate = None
57 57 _address = None
58 58
59 59 def __init__(self, context, session, address):
60 60 """Create a channel
61 61
62 62 Parameters
63 63 ----------
64 64 context : zmq.Context
65 65 The ZMQ context to use.
66 66 session : session.Session
67 67 The session to use.
68 68 address : tuple
69 69 Standard (ip, port) tuple that the kernel is listening on.
70 70 """
71 71 super(ZmqSocketChannel, self).__init__()
72 72 self.daemon = True
73 73
74 74 self.context = context
75 75 self.session = session
76 76 if address[1] == 0:
77 77 message = 'The port number for a channel cannot be 0.'
78 78 raise InvalidPortNumber(message)
79 79 self._address = address
80 80
81 81 def stop(self):
82 82 """Stop the channel's activity.
83 83
84 84 This calls :method:`Thread.join` and returns when the thread
85 85 terminates. :class:`RuntimeError` will be raised if
86 86 :method:`self.start` is called again.
87 87 """
88 88 self.join()
89 89
90 90 @property
91 91 def address(self):
92 92 """Get the channel's address as an (ip, port) tuple.
93 93
94 94 By the default, the address is (localhost, 0), where 0 means a random
95 95 port.
96 96 """
97 97 return self._address
98 98
99 99 def add_io_state(self, state):
100 100 """Add IO state to the eventloop.
101 101
102 102 Parameters
103 103 ----------
104 104 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
105 105 The IO state flag to set.
106 106
107 107 This is thread safe as it uses the thread safe IOLoop.add_callback.
108 108 """
109 109 def add_io_state_callback():
110 110 if not self.iostate & state:
111 111 self.iostate = self.iostate | state
112 112 self.ioloop.update_handler(self.socket, self.iostate)
113 113 self.ioloop.add_callback(add_io_state_callback)
114 114
115 115 def drop_io_state(self, state):
116 116 """Drop IO state from the eventloop.
117 117
118 118 Parameters
119 119 ----------
120 120 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
121 121 The IO state flag to set.
122 122
123 123 This is thread safe as it uses the thread safe IOLoop.add_callback.
124 124 """
125 125 def drop_io_state_callback():
126 126 if self.iostate & state:
127 127 self.iostate = self.iostate & (~state)
128 128 self.ioloop.update_handler(self.socket, self.iostate)
129 129 self.ioloop.add_callback(drop_io_state_callback)
130 130
131 131
132 132 class XReqSocketChannel(ZmqSocketChannel):
133 133 """The XREQ channel for issues request/replies to the kernel.
134 134 """
135 135
136 136 command_queue = None
137 137
138 138 def __init__(self, context, session, address):
139 139 self.command_queue = Queue()
140 140 super(XReqSocketChannel, self).__init__(context, session, address)
141 141
142 142 def run(self):
143 143 """The thread's main activity. Call start() instead."""
144 144 self.socket = self.context.socket(zmq.XREQ)
145 145 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
146 146 self.socket.connect('tcp://%s:%i' % self.address)
147 147 self.ioloop = ioloop.IOLoop()
148 148 self.iostate = POLLERR|POLLIN
149 149 self.ioloop.add_handler(self.socket, self._handle_events,
150 150 self.iostate)
151 151 self.ioloop.start()
152 152
153 153 def stop(self):
154 154 self.ioloop.stop()
155 155 super(XReqSocketChannel, self).stop()
156 156
157 157 def call_handlers(self, msg):
158 158 """This method is called in the ioloop thread when a message arrives.
159 159
160 160 Subclasses should override this method to handle incoming messages.
161 161 It is important to remember that this method is called in the thread
162 162 so that some logic must be done to ensure that the application leve
163 163 handlers are called in the application thread.
164 164 """
165 165 raise NotImplementedError('call_handlers must be defined in a subclass.')
166 166
167 167 def execute(self, code):
168 168 """Execute code in the kernel.
169 169
170 170 Parameters
171 171 ----------
172 172 code : str
173 173 A string of Python code.
174 174
175 175 Returns
176 176 -------
177 177 The msg_id of the message sent.
178 178 """
179 179 # Create class for content/msg creation. Related to, but possibly
180 180 # not in Session.
181 181 content = dict(code=code)
182 182 msg = self.session.msg('execute_request', content)
183 183 self._queue_request(msg)
184 184 return msg['header']['msg_id']
185 185
186 186 def complete(self, text, line, block=None):
187 187 """Tab complete text, line, block in the kernel's namespace.
188 188
189 189 Parameters
190 190 ----------
191 191 text : str
192 192 The text to complete.
193 193 line : str
194 194 The full line of text that is the surrounding context for the
195 195 text to complete.
196 196 block : str
197 197 The full block of code in which the completion is being requested.
198 198
199 199 Returns
200 200 -------
201 201 The msg_id of the message sent.
202 202 """
203 203 content = dict(text=text, line=line)
204 204 msg = self.session.msg('complete_request', content)
205 205 self._queue_request(msg)
206 206 return msg['header']['msg_id']
207 207
208 208 def object_info(self, oname):
209 209 """Get metadata information about an object.
210 210
211 211 Parameters
212 212 ----------
213 213 oname : str
214 214 A string specifying the object name.
215 215
216 216 Returns
217 217 -------
218 218 The msg_id of the message sent.
219 219 """
220 220 content = dict(oname=oname)
221 221 msg = self.session.msg('object_info_request', content)
222 222 self._queue_request(msg)
223 223 return msg['header']['msg_id']
224 224
225 225 def _handle_events(self, socket, events):
226 226 if events & POLLERR:
227 227 self._handle_err()
228 228 if events & POLLOUT:
229 229 self._handle_send()
230 230 if events & POLLIN:
231 231 self._handle_recv()
232 232
233 233 def _handle_recv(self):
234 234 msg = self.socket.recv_json()
235 235 self.call_handlers(msg)
236 236
237 237 def _handle_send(self):
238 238 try:
239 239 msg = self.command_queue.get(False)
240 240 except Empty:
241 241 pass
242 242 else:
243 243 self.socket.send_json(msg)
244 244 if self.command_queue.empty():
245 245 self.drop_io_state(POLLOUT)
246 246
247 247 def _handle_err(self):
248 248 # We don't want to let this go silently, so eventually we should log.
249 249 raise zmq.ZMQError()
250 250
251 251 def _queue_request(self, msg):
252 252 self.command_queue.put(msg)
253 253 self.add_io_state(POLLOUT)
254 254
255 255
256 256 class SubSocketChannel(ZmqSocketChannel):
257 257 """The SUB channel which listens for messages that the kernel publishes.
258 258 """
259 259
260 260 def __init__(self, context, session, address):
261 261 super(SubSocketChannel, self).__init__(context, session, address)
262 262
263 263 def run(self):
264 264 """The thread's main activity. Call start() instead."""
265 265 self.socket = self.context.socket(zmq.SUB)
266 266 self.socket.setsockopt(zmq.SUBSCRIBE,'')
267 267 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
268 268 self.socket.connect('tcp://%s:%i' % self.address)
269 269 self.ioloop = ioloop.IOLoop()
270 270 self.iostate = POLLIN|POLLERR
271 271 self.ioloop.add_handler(self.socket, self._handle_events,
272 272 self.iostate)
273 273 self.ioloop.start()
274 274
275 275 def stop(self):
276 276 self.ioloop.stop()
277 277 super(SubSocketChannel, self).stop()
278 278
279 279 def call_handlers(self, msg):
280 280 """This method is called in the ioloop thread when a message arrives.
281 281
282 282 Subclasses should override this method to handle incoming messages.
283 283 It is important to remember that this method is called in the thread
284 284 so that some logic must be done to ensure that the application leve
285 285 handlers are called in the application thread.
286 286 """
287 287 raise NotImplementedError('call_handlers must be defined in a subclass.')
288 288
289 289 def flush(self, timeout=1.0):
290 290 """Immediately processes all pending messages on the SUB channel.
291 291
292 292 This method is thread safe.
293 293
294 294 Parameters
295 295 ----------
296 296 timeout : float, optional
297 297 The maximum amount of time to spend flushing, in seconds. The
298 298 default is one second.
299 299 """
300 300 # We do the IOLoop callback process twice to ensure that the IOLoop
301 301 # gets to perform at least one full poll.
302 302 stop_time = time.time() + timeout
303 303 for i in xrange(2):
304 304 self._flushed = False
305 305 self.ioloop.add_callback(self._flush)
306 306 while not self._flushed and time.time() < stop_time:
307 307 time.sleep(0.01)
308 308
309 309 def _handle_events(self, socket, events):
310 310 # Turn on and off POLLOUT depending on if we have made a request
311 311 if events & POLLERR:
312 312 self._handle_err()
313 313 if events & POLLIN:
314 314 self._handle_recv()
315 315
316 316 def _handle_err(self):
317 317 # We don't want to let this go silently, so eventually we should log.
318 318 raise zmq.ZMQError()
319 319
320 320 def _handle_recv(self):
321 321 # Get all of the messages we can
322 322 while True:
323 323 try:
324 324 msg = self.socket.recv_json(zmq.NOBLOCK)
325 325 except zmq.ZMQError:
326 326 # Check the errno?
327 327 # Will this tigger POLLERR?
328 328 break
329 329 else:
330 330 self.call_handlers(msg)
331 331
332 332 def _flush(self):
333 333 """Callback for :method:`self.flush`."""
334 334 self._flushed = True
335 335
336 336
337 337 class RepSocketChannel(ZmqSocketChannel):
338 338 """A reply channel to handle raw_input requests that the kernel makes."""
339 339
340 340 msg_queue = None
341 341
342 342 def __init__(self, context, session, address):
343 343 self.msg_queue = Queue()
344 344 super(RepSocketChannel, self).__init__(context, session, address)
345 345
346 346 def run(self):
347 347 """The thread's main activity. Call start() instead."""
348 348 self.socket = self.context.socket(zmq.XREQ)
349 349 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
350 350 self.socket.connect('tcp://%s:%i' % self.address)
351 351 self.ioloop = ioloop.IOLoop()
352 352 self.iostate = POLLERR|POLLIN
353 353 self.ioloop.add_handler(self.socket, self._handle_events,
354 354 self.iostate)
355 355 self.ioloop.start()
356 356
357 357 def stop(self):
358 358 self.ioloop.stop()
359 359 super(RepSocketChannel, self).stop()
360 360
361 361 def call_handlers(self, msg):
362 362 """This method is called in the ioloop thread when a message arrives.
363 363
364 364 Subclasses should override this method to handle incoming messages.
365 365 It is important to remember that this method is called in the thread
366 366 so that some logic must be done to ensure that the application leve
367 367 handlers are called in the application thread.
368 368 """
369 369 raise NotImplementedError('call_handlers must be defined in a subclass.')
370 370
371 def readline(self, line):
372 """A send a line of raw input to the kernel.
373
374 Parameters
375 ----------
376 line : str
377 The line of the input.
378 """
379 content = dict(line=line)
380 msg = self.session.msg('readline_reply', content)
371 def input(self, string):
372 """Send a string of raw input to the kernel."""
373 content = dict(value=string)
374 msg = self.session.msg('input_reply', content)
381 375 self._queue_reply(msg)
382 376
383 377 def _handle_events(self, socket, events):
384 378 if events & POLLERR:
385 379 self._handle_err()
386 380 if events & POLLOUT:
387 381 self._handle_send()
388 382 if events & POLLIN:
389 383 self._handle_recv()
390 384
391 385 def _handle_recv(self):
392 386 msg = self.socket.recv_json()
393 387 self.call_handlers(msg)
394 388
395 389 def _handle_send(self):
396 390 try:
397 391 msg = self.msg_queue.get(False)
398 392 except Empty:
399 393 pass
400 394 else:
401 395 self.socket.send_json(msg)
402 396 if self.msg_queue.empty():
403 397 self.drop_io_state(POLLOUT)
404 398
405 399 def _handle_err(self):
406 400 # We don't want to let this go silently, so eventually we should log.
407 401 raise zmq.ZMQError()
408 402
409 403 def _queue_reply(self, msg):
410 404 self.msg_queue.put(msg)
411 405 self.add_io_state(POLLOUT)
412 406
413 407
414 408 #-----------------------------------------------------------------------------
415 409 # Main kernel manager class
416 410 #-----------------------------------------------------------------------------
417 411
418 412 class KernelManager(HasTraits):
419 413 """ Manages a kernel for a frontend.
420 414
421 415 The SUB channel is for the frontend to receive messages published by the
422 416 kernel.
423 417
424 418 The REQ channel is for the frontend to make requests of the kernel.
425 419
426 420 The REP channel is for the kernel to request stdin (raw_input) from the
427 421 frontend.
428 422 """
429 423 # The PyZMQ Context to use for communication with the kernel.
430 424 context = Instance(zmq.Context)
431 425
432 426 # The Session to use for communication with the kernel.
433 427 session = Instance(Session)
434 428
429 # The kernel process with which the KernelManager is communicating.
430 kernel = Instance(Popen)
431
435 432 # The classes to use for the various channels.
436 433 xreq_channel_class = Type(XReqSocketChannel)
437 434 sub_channel_class = Type(SubSocketChannel)
438 435 rep_channel_class = Type(RepSocketChannel)
439 436
440 437 # Protected traits.
441 _kernel = Instance(Popen)
442 438 _xreq_address = Any
443 439 _sub_address = Any
444 440 _rep_address = Any
445 441 _xreq_channel = Any
446 442 _sub_channel = Any
447 443 _rep_channel = Any
448 444
449 445 def __init__(self, xreq_address=None, sub_address=None, rep_address=None,
450 446 context=None, session=None):
451 447 super(KernelManager, self).__init__()
452 448 self._xreq_address = (LOCALHOST, 0) if xreq_address is None else xreq_address
453 449 self._sub_address = (LOCALHOST, 0) if sub_address is None else sub_address
454 450 self._rep_address = (LOCALHOST, 0) if rep_address is None else rep_address
455 451 self.context = zmq.Context() if context is None else context
456 452 self.session = Session() if session is None else session
457 453
458 454 #--------------------------------------------------------------------------
459 455 # Channel management methods:
460 456 #--------------------------------------------------------------------------
461 457
462 458 def start_channels(self):
463 459 """Starts the channels for this kernel.
464 460
465 461 This will create the channels if they do not exist and then start
466 462 them. If port numbers of 0 are being used (random ports) then you
467 463 must first call :method:`start_kernel`. If the channels have been
468 464 stopped and you call this, :class:`RuntimeError` will be raised.
469 465 """
470 466 self.xreq_channel.start()
471 467 self.sub_channel.start()
472 468 self.rep_channel.start()
473 469
474 470 def stop_channels(self):
475 471 """Stops the channels for this kernel.
476 472
477 473 This stops the channels by joining their threads. If the channels
478 474 were not started, :class:`RuntimeError` will be raised.
479 475 """
480 476 self.xreq_channel.stop()
481 477 self.sub_channel.stop()
482 478 self.rep_channel.stop()
483 479
484 480 @property
485 481 def channels_running(self):
486 482 """Are all of the channels created and running?"""
487 483 return self.xreq_channel.is_alive() \
488 484 and self.sub_channel.is_alive() \
489 485 and self.rep_channel.is_alive()
490 486
491 487 #--------------------------------------------------------------------------
492 488 # Kernel process management methods:
493 489 #--------------------------------------------------------------------------
494 490
495 491 def start_kernel(self):
496 492 """Starts a kernel process and configures the manager to use it.
497 493
498 494 If random ports (port=0) are being used, this method must be called
499 495 before the channels are created.
500 496 """
501 497 xreq, sub, rep = self.xreq_address, self.sub_address, self.rep_address
502 498 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or rep[0] != LOCALHOST:
503 499 raise RuntimeError("Can only launch a kernel on localhost."
504 500 "Make sure that the '*_address' attributes are "
505 501 "configured properly.")
506 502
507 kernel, xrep, pub, req = launch_kernel(
503 self.kernel, xrep, pub, req = launch_kernel(
508 504 xrep_port=xreq[1], pub_port=sub[1], req_port=rep[1])
509 self._kernel = kernel
510 505 self._xreq_address = (LOCALHOST, xrep)
511 506 self._sub_address = (LOCALHOST, pub)
512 507 self._rep_address = (LOCALHOST, req)
513 508
514 509 @property
515 510 def has_kernel(self):
516 511 """Returns whether a kernel process has been specified for the kernel
517 512 manager.
518
519 A kernel process can be set via 'start_kernel' or 'set_kernel'.
520 513 """
521 return self._kernel is not None
514 return self.kernel is not None
522 515
523 516 def kill_kernel(self):
524 517 """ Kill the running kernel. """
525 if self._kernel is not None:
526 self._kernel.kill()
527 self._kernel = None
518 if self.kernel is not None:
519 self.kernel.kill()
520 self.kernel = None
528 521 else:
529 522 raise RuntimeError("Cannot kill kernel. No kernel is running!")
530 523
531 524 def signal_kernel(self, signum):
532 525 """ Sends a signal to the kernel. """
533 if self._kernel is not None:
534 self._kernel.send_signal(signum)
526 if self.kernel is not None:
527 self.kernel.send_signal(signum)
535 528 else:
536 529 raise RuntimeError("Cannot signal kernel. No kernel is running!")
537 530
538 531 @property
539 532 def is_alive(self):
540 533 """Is the kernel process still running?"""
541 if self._kernel is not None:
542 if self._kernel.poll() is None:
534 if self.kernel is not None:
535 if self.kernel.poll() is None:
543 536 return True
544 537 else:
545 538 return False
546 539 else:
547 540 # We didn't start the kernel with this KernelManager so we don't
548 541 # know if it is running. We should use a heartbeat for this case.
549 542 return True
550 543
551 544 #--------------------------------------------------------------------------
552 545 # Channels used for communication with the kernel:
553 546 #--------------------------------------------------------------------------
554 547
555 548 @property
556 549 def xreq_channel(self):
557 550 """Get the REQ socket channel object to make requests of the kernel."""
558 551 if self._xreq_channel is None:
559 552 self._xreq_channel = self.xreq_channel_class(self.context,
560 553 self.session,
561 554 self.xreq_address)
562 555 return self._xreq_channel
563 556
564 557 @property
565 558 def sub_channel(self):
566 559 """Get the SUB socket channel object."""
567 560 if self._sub_channel is None:
568 561 self._sub_channel = self.sub_channel_class(self.context,
569 562 self.session,
570 563 self.sub_address)
571 564 return self._sub_channel
572 565
573 566 @property
574 567 def rep_channel(self):
575 568 """Get the REP socket channel object to handle stdin (raw_input)."""
576 569 if self._rep_channel is None:
577 570 self._rep_channel = self.rep_channel_class(self.context,
578 571 self.session,
579 572 self.rep_address)
580 573 return self._rep_channel
581 574
582 575 @property
583 576 def xreq_address(self):
584 577 return self._xreq_address
585 578
586 579 @property
587 580 def sub_address(self):
588 581 return self._sub_address
589 582
590 583 @property
591 584 def rep_address(self):
592 585 return self._rep_address
593 586
594 587
General Comments 0
You need to be logged in to leave comments. Login now