##// END OF EJS Templates
Implemented kernel interrupts for Windows.
epatters -
Show More
@@ -0,0 +1,125 b''
1 # Standard library imports.
2 import ctypes
3 import os
4 import time
5 from thread import interrupt_main
6 from threading import Thread
7
8 # Local imports.
9 from IPython.utils.io import raw_print
10
11
12 class ParentPollerUnix(Thread):
13 """ A Unix-specific daemon thread that terminates the program immediately
14 when the parent process no longer exists.
15 """
16
17 def __init__(self):
18 super(ParentPollerUnix, self).__init__()
19 self.daemon = True
20
21 def run(self):
22 # We cannot use os.waitpid because it works only for child processes.
23 from errno import EINTR
24 while True:
25 try:
26 if os.getppid() == 1:
27 raw_print('Killed by parent poller!')
28 os._exit(1)
29 time.sleep(1.0)
30 except OSError, e:
31 if e.errno == EINTR:
32 continue
33 raise
34
35
36 class ParentPollerWindows(Thread):
37 """ A Windows-specific daemon thread that listens for a special event that
38 signals an interrupt and, optionally, terminates the program immediately
39 when the parent process no longer exists.
40 """
41
42 def __init__(self, interrupt_handle=None, parent_handle=None):
43 """ Create the poller. At least one of the optional parameters must be
44 provided.
45
46 Parameters:
47 -----------
48 interrupt_handle : HANDLE (int), optional
49 If provided, the program will generate a Ctrl+C event when this
50 handle is signaled.
51
52 parent_handle : HANDLE (int), optional
53 If provided, the program will terminate immediately when this
54 handle is signaled.
55 """
56 assert(interrupt_handle or parent_handle)
57 super(ParentPollerWindows, self).__init__()
58 self.daemon = True
59 self.interrupt_handle = interrupt_handle
60 self.parent_handle = parent_handle
61
62 @staticmethod
63 def create_interrupt_event():
64 """ Create an interrupt event handle.
65
66 The parent process should use this static method for creating the
67 interrupt event that is passed to the child process. It should store
68 this handle and use it with ``send_interrupt`` to interrupt the child
69 process.
70 """
71 # Create a security attributes struct that permits inheritance of the
72 # handle by new processes.
73 # FIXME: We can clean up this mess by requiring pywin32 for IPython.
74 class SECURITY_ATTRIBUTES(ctypes.Structure):
75 _fields_ = [ ("nLength", ctypes.c_int),
76 ("lpSecurityDescriptor", ctypes.c_void_p),
77 ("bInheritHandle", ctypes.c_int) ]
78 sa = SECURITY_ATTRIBUTES()
79 sa_p = ctypes.pointer(sa)
80 sa.nLength = ctypes.sizeof(SECURITY_ATTRIBUTES)
81 sa.lpSecurityDescriptor = 0
82 sa.bInheritHandle = 1
83
84 return ctypes.windll.kernel32.CreateEventA(
85 sa_p, # lpEventAttributes
86 False, # bManualReset
87 False, # bInitialState
88 '') # lpName
89
90 @staticmethod
91 def send_interrupt(interrupt_handle):
92 """ Sends an interrupt event using the specified handle.
93 """
94 ctypes.windll.kernel32.SetEvent(interrupt_handle)
95
96 def run(self):
97 """ Run the poll loop. This method never returns.
98 """
99 from _subprocess import WAIT_OBJECT_0, INFINITE
100
101 # Build the list of handle to listen on.
102 handles = []
103 if self.interrupt_handle:
104 handles.append(self.interrupt_handle)
105 if self.parent_handle:
106 handles.append(self.parent_handle)
107
108 # Listen forever.
109 while True:
110 result = ctypes.windll.kernel32.WaitForMultipleObjects(
111 len(handles), # nCount
112 (ctypes.c_int * len(handles))(*handles), # lpHandles
113 False, # bWaitAll
114 INFINITE) # dwMilliseconds
115
116 if WAIT_OBJECT_0 <= result < len(handles):
117 handle = handles[result - WAIT_OBJECT_0]
118
119 if handle == self.interrupt_handle:
120 raw_print('Interrupted by parent poller!')
121 interrupt_main()
122
123 elif handle == self.parent_handle:
124 raw_print('Killed by parent poller!')
125 os._exit(1)
@@ -1,546 +1,545 b''
1 1 from __future__ import print_function
2 2
3 3 # Standard library imports
4 4 from collections import namedtuple
5 import signal
6 5 import sys
7 6
8 7 # System library imports
9 8 from pygments.lexers import PythonLexer
10 9 from PyQt4 import QtCore, QtGui
11 10
12 11 # Local imports
13 12 from IPython.core.inputsplitter import InputSplitter, transform_classic_prompt
14 13 from IPython.frontend.qt.base_frontend_mixin import BaseFrontendMixin
15 14 from IPython.utils.traitlets import Bool
16 15 from bracket_matcher import BracketMatcher
17 16 from call_tip_widget import CallTipWidget
18 17 from completion_lexer import CompletionLexer
19 18 from history_console_widget import HistoryConsoleWidget
20 19 from pygments_highlighter import PygmentsHighlighter
21 20
22 21
23 22 class FrontendHighlighter(PygmentsHighlighter):
24 23 """ A PygmentsHighlighter that can be turned on and off and that ignores
25 24 prompts.
26 25 """
27 26
28 27 def __init__(self, frontend):
29 28 super(FrontendHighlighter, self).__init__(frontend._control.document())
30 29 self._current_offset = 0
31 30 self._frontend = frontend
32 31 self.highlighting_on = False
33 32
34 33 def highlightBlock(self, qstring):
35 34 """ Highlight a block of text. Reimplemented to highlight selectively.
36 35 """
37 36 if not self.highlighting_on:
38 37 return
39 38
40 39 # The input to this function is unicode string that may contain
41 40 # paragraph break characters, non-breaking spaces, etc. Here we acquire
42 41 # the string as plain text so we can compare it.
43 42 current_block = self.currentBlock()
44 43 string = self._frontend._get_block_plain_text(current_block)
45 44
46 45 # Decide whether to check for the regular or continuation prompt.
47 46 if current_block.contains(self._frontend._prompt_pos):
48 47 prompt = self._frontend._prompt
49 48 else:
50 49 prompt = self._frontend._continuation_prompt
51 50
52 51 # Don't highlight the part of the string that contains the prompt.
53 52 if string.startswith(prompt):
54 53 self._current_offset = len(prompt)
55 54 qstring.remove(0, len(prompt))
56 55 else:
57 56 self._current_offset = 0
58 57
59 58 PygmentsHighlighter.highlightBlock(self, qstring)
60 59
61 60 def rehighlightBlock(self, block):
62 61 """ Reimplemented to temporarily enable highlighting if disabled.
63 62 """
64 63 old = self.highlighting_on
65 64 self.highlighting_on = True
66 65 super(FrontendHighlighter, self).rehighlightBlock(block)
67 66 self.highlighting_on = old
68 67
69 68 def setFormat(self, start, count, format):
70 69 """ Reimplemented to highlight selectively.
71 70 """
72 71 start += self._current_offset
73 72 PygmentsHighlighter.setFormat(self, start, count, format)
74 73
75 74
76 75 class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin):
77 76 """ A Qt frontend for a generic Python kernel.
78 77 """
79 78
80 79 # An option and corresponding signal for overriding the default kernel
81 80 # interrupt behavior.
82 81 custom_interrupt = Bool(False)
83 82 custom_interrupt_requested = QtCore.pyqtSignal()
84 83
85 84 # An option and corresponding signals for overriding the default kernel
86 85 # restart behavior.
87 86 custom_restart = Bool(False)
88 87 custom_restart_kernel_died = QtCore.pyqtSignal(float)
89 88 custom_restart_requested = QtCore.pyqtSignal()
90 89
91 90 # Emitted when an 'execute_reply' has been received from the kernel and
92 91 # processed by the FrontendWidget.
93 92 executed = QtCore.pyqtSignal(object)
94 93
95 94 # Emitted when an exit request has been received from the kernel.
96 95 exit_requested = QtCore.pyqtSignal()
97 96
98 97 # Protected class variables.
99 98 _CallTipRequest = namedtuple('_CallTipRequest', ['id', 'pos'])
100 99 _CompletionRequest = namedtuple('_CompletionRequest', ['id', 'pos'])
101 100 _ExecutionRequest = namedtuple('_ExecutionRequest', ['id', 'kind'])
102 101 _input_splitter_class = InputSplitter
103 102
104 103 #---------------------------------------------------------------------------
105 104 # 'object' interface
106 105 #---------------------------------------------------------------------------
107 106
108 107 def __init__(self, *args, **kw):
109 108 super(FrontendWidget, self).__init__(*args, **kw)
110 109
111 110 # FrontendWidget protected variables.
112 111 self._bracket_matcher = BracketMatcher(self._control)
113 112 self._call_tip_widget = CallTipWidget(self._control)
114 113 self._completion_lexer = CompletionLexer(PythonLexer())
115 114 self._copy_raw_action = QtGui.QAction('Copy (Raw Text)', None)
116 115 self._hidden = False
117 116 self._highlighter = FrontendHighlighter(self)
118 117 self._input_splitter = self._input_splitter_class(input_mode='cell')
119 118 self._kernel_manager = None
120 119 self._possible_kernel_restart = False
121 120 self._request_info = {}
122 121
123 122 # Configure the ConsoleWidget.
124 123 self.tab_width = 4
125 124 self._set_continuation_prompt('... ')
126 125
127 126 # Configure actions.
128 127 action = self._copy_raw_action
129 128 key = QtCore.Qt.CTRL | QtCore.Qt.SHIFT | QtCore.Qt.Key_C
130 129 action.setEnabled(False)
131 130 action.setShortcut(QtGui.QKeySequence(key))
132 131 action.setShortcutContext(QtCore.Qt.WidgetWithChildrenShortcut)
133 132 action.triggered.connect(self.copy_raw)
134 133 self.copy_available.connect(action.setEnabled)
135 134 self.addAction(action)
136 135
137 136 # Connect signal handlers.
138 137 document = self._control.document()
139 138 document.contentsChange.connect(self._document_contents_change)
140 139
141 140 #---------------------------------------------------------------------------
142 141 # 'ConsoleWidget' public interface
143 142 #---------------------------------------------------------------------------
144 143
145 144 def copy(self):
146 145 """ Copy the currently selected text to the clipboard, removing prompts.
147 146 """
148 147 text = str(self._control.textCursor().selection().toPlainText())
149 148 if text:
150 149 lines = map(transform_classic_prompt, text.splitlines())
151 150 text = '\n'.join(lines)
152 151 QtGui.QApplication.clipboard().setText(text)
153 152
154 153 #---------------------------------------------------------------------------
155 154 # 'ConsoleWidget' abstract interface
156 155 #---------------------------------------------------------------------------
157 156
158 157 def _is_complete(self, source, interactive):
159 158 """ Returns whether 'source' can be completely processed and a new
160 159 prompt created. When triggered by an Enter/Return key press,
161 160 'interactive' is True; otherwise, it is False.
162 161 """
163 162 complete = self._input_splitter.push(source)
164 163 if interactive:
165 164 complete = not self._input_splitter.push_accepts_more()
166 165 return complete
167 166
168 167 def _execute(self, source, hidden):
169 168 """ Execute 'source'. If 'hidden', do not show any output.
170 169
171 170 See parent class :meth:`execute` docstring for full details.
172 171 """
173 172 msg_id = self.kernel_manager.xreq_channel.execute(source, hidden)
174 173 self._request_info['execute'] = self._ExecutionRequest(msg_id, 'user')
175 174 self._hidden = hidden
176 175
177 176 def _prompt_started_hook(self):
178 177 """ Called immediately after a new prompt is displayed.
179 178 """
180 179 if not self._reading:
181 180 self._highlighter.highlighting_on = True
182 181
183 182 def _prompt_finished_hook(self):
184 183 """ Called immediately after a prompt is finished, i.e. when some input
185 184 will be processed and a new prompt displayed.
186 185 """
187 186 if not self._reading:
188 187 self._highlighter.highlighting_on = False
189 188
190 189 def _tab_pressed(self):
191 190 """ Called when the tab key is pressed. Returns whether to continue
192 191 processing the event.
193 192 """
194 193 # Perform tab completion if:
195 194 # 1) The cursor is in the input buffer.
196 195 # 2) There is a non-whitespace character before the cursor.
197 196 text = self._get_input_buffer_cursor_line()
198 197 if text is None:
199 198 return False
200 199 complete = bool(text[:self._get_input_buffer_cursor_column()].strip())
201 200 if complete:
202 201 self._complete()
203 202 return not complete
204 203
205 204 #---------------------------------------------------------------------------
206 205 # 'ConsoleWidget' protected interface
207 206 #---------------------------------------------------------------------------
208 207
209 208 def _context_menu_make(self, pos):
210 209 """ Reimplemented to add an action for raw copy.
211 210 """
212 211 menu = super(FrontendWidget, self)._context_menu_make(pos)
213 212 for before_action in menu.actions():
214 213 if before_action.shortcut().matches(QtGui.QKeySequence.Paste) == \
215 214 QtGui.QKeySequence.ExactMatch:
216 215 menu.insertAction(before_action, self._copy_raw_action)
217 216 break
218 217 return menu
219 218
220 219 def _event_filter_console_keypress(self, event):
221 220 """ Reimplemented for execution interruption and smart backspace.
222 221 """
223 222 key = event.key()
224 223 if self._control_key_down(event.modifiers(), include_command=False):
225 224
226 225 if key == QtCore.Qt.Key_C and self._executing:
227 226 self.interrupt_kernel()
228 227 return True
229 228
230 229 elif key == QtCore.Qt.Key_Period:
231 230 message = 'Are you sure you want to restart the kernel?'
232 231 self.restart_kernel(message, instant_death=False)
233 232 return True
234 233
235 234 elif not event.modifiers() & QtCore.Qt.AltModifier:
236 235
237 236 # Smart backspace: remove four characters in one backspace if:
238 237 # 1) everything left of the cursor is whitespace
239 238 # 2) the four characters immediately left of the cursor are spaces
240 239 if key == QtCore.Qt.Key_Backspace:
241 240 col = self._get_input_buffer_cursor_column()
242 241 cursor = self._control.textCursor()
243 242 if col > 3 and not cursor.hasSelection():
244 243 text = self._get_input_buffer_cursor_line()[:col]
245 244 if text.endswith(' ') and not text.strip():
246 245 cursor.movePosition(QtGui.QTextCursor.Left,
247 246 QtGui.QTextCursor.KeepAnchor, 4)
248 247 cursor.removeSelectedText()
249 248 return True
250 249
251 250 return super(FrontendWidget, self)._event_filter_console_keypress(event)
252 251
253 252 def _insert_continuation_prompt(self, cursor):
254 253 """ Reimplemented for auto-indentation.
255 254 """
256 255 super(FrontendWidget, self)._insert_continuation_prompt(cursor)
257 256 cursor.insertText(' ' * self._input_splitter.indent_spaces)
258 257
259 258 #---------------------------------------------------------------------------
260 259 # 'BaseFrontendMixin' abstract interface
261 260 #---------------------------------------------------------------------------
262 261
263 262 def _handle_complete_reply(self, rep):
264 263 """ Handle replies for tab completion.
265 264 """
266 265 cursor = self._get_cursor()
267 266 info = self._request_info.get('complete')
268 267 if info and info.id == rep['parent_header']['msg_id'] and \
269 268 info.pos == cursor.position():
270 269 text = '.'.join(self._get_context())
271 270 cursor.movePosition(QtGui.QTextCursor.Left, n=len(text))
272 271 self._complete_with_items(cursor, rep['content']['matches'])
273 272
274 273 def _handle_execute_reply(self, msg):
275 274 """ Handles replies for code execution.
276 275 """
277 276 info = self._request_info.get('execute')
278 277 if info and info.id == msg['parent_header']['msg_id'] and \
279 278 info.kind == 'user' and not self._hidden:
280 279 # Make sure that all output from the SUB channel has been processed
281 280 # before writing a new prompt.
282 281 self.kernel_manager.sub_channel.flush()
283 282
284 283 # Reset the ANSI style information to prevent bad text in stdout
285 284 # from messing up our colors. We're not a true terminal so we're
286 285 # allowed to do this.
287 286 if self.ansi_codes:
288 287 self._ansi_processor.reset_sgr()
289 288
290 289 content = msg['content']
291 290 status = content['status']
292 291 if status == 'ok':
293 292 self._process_execute_ok(msg)
294 293 elif status == 'error':
295 294 self._process_execute_error(msg)
296 295 elif status == 'abort':
297 296 self._process_execute_abort(msg)
298 297
299 298 self._show_interpreter_prompt_for_reply(msg)
300 299 self.executed.emit(msg)
301 300
302 301 def _handle_input_request(self, msg):
303 302 """ Handle requests for raw_input.
304 303 """
305 304 if self._hidden:
306 305 raise RuntimeError('Request for raw input during hidden execution.')
307 306
308 307 # Make sure that all output from the SUB channel has been processed
309 308 # before entering readline mode.
310 309 self.kernel_manager.sub_channel.flush()
311 310
312 311 def callback(line):
313 312 self.kernel_manager.rep_channel.input(line)
314 313 self._readline(msg['content']['prompt'], callback=callback)
315 314
316 315 def _handle_kernel_died(self, since_last_heartbeat):
317 316 """ Handle the kernel's death by asking if the user wants to restart.
318 317 """
319 318 message = 'The kernel heartbeat has been inactive for %.2f ' \
320 319 'seconds. Do you want to restart the kernel? You may ' \
321 320 'first want to check the network connection.' % \
322 321 since_last_heartbeat
323 322 if self.custom_restart:
324 323 self.custom_restart_kernel_died.emit(since_last_heartbeat)
325 324 else:
326 325 self.restart_kernel(message, instant_death=True)
327 326
328 327 def _handle_object_info_reply(self, rep):
329 328 """ Handle replies for call tips.
330 329 """
331 330 cursor = self._get_cursor()
332 331 info = self._request_info.get('call_tip')
333 332 if info and info.id == rep['parent_header']['msg_id'] and \
334 333 info.pos == cursor.position():
335 334 doc = rep['content']['docstring']
336 335 if doc:
337 336 self._call_tip_widget.show_docstring(doc)
338 337
339 338 def _handle_pyout(self, msg):
340 339 """ Handle display hook output.
341 340 """
342 341 if not self._hidden and self._is_from_this_session(msg):
343 342 self._append_plain_text(msg['content']['data'] + '\n')
344 343
345 344 def _handle_stream(self, msg):
346 345 """ Handle stdout, stderr, and stdin.
347 346 """
348 347 if not self._hidden and self._is_from_this_session(msg):
349 348 # Most consoles treat tabs as being 8 space characters. Convert tabs
350 349 # to spaces so that output looks as expected regardless of this
351 350 # widget's tab width.
352 351 text = msg['content']['data'].expandtabs(8)
353 352
354 353 self._append_plain_text(text)
355 354 self._control.moveCursor(QtGui.QTextCursor.End)
356 355
357 356 def _started_channels(self):
358 357 """ Called when the KernelManager channels have started listening or
359 358 when the frontend is assigned an already listening KernelManager.
360 359 """
361 360 self._control.clear()
362 361 self._append_plain_text(self._get_banner())
363 362 self._show_interpreter_prompt()
364 363
365 364 def _stopped_channels(self):
366 365 """ Called when the KernelManager channels have stopped listening or
367 366 when a listening KernelManager is removed from the frontend.
368 367 """
369 368 self._executing = self._reading = False
370 369 self._highlighter.highlighting_on = False
371 370
372 371 #---------------------------------------------------------------------------
373 372 # 'FrontendWidget' public interface
374 373 #---------------------------------------------------------------------------
375 374
376 375 def copy_raw(self):
377 376 """ Copy the currently selected text to the clipboard without attempting
378 377 to remove prompts or otherwise alter the text.
379 378 """
380 379 self._control.copy()
381 380
382 381 def execute_file(self, path, hidden=False):
383 382 """ Attempts to execute file with 'path'. If 'hidden', no output is
384 383 shown.
385 384 """
386 385 self.execute('execfile("%s")' % path, hidden=hidden)
387 386
388 387 def interrupt_kernel(self):
389 388 """ Attempts to interrupt the running kernel.
390 389 """
391 390 if self.custom_interrupt:
392 391 self.custom_interrupt_requested.emit()
393 392 elif self.kernel_manager.has_kernel:
394 self.kernel_manager.signal_kernel(signal.SIGINT)
393 self.kernel_manager.interrupt_kernel()
395 394 else:
396 395 self._append_plain_text('Kernel process is either remote or '
397 396 'unspecified. Cannot interrupt.\n')
398 397
399 398 def restart_kernel(self, message, instant_death=False):
400 399 """ Attempts to restart the running kernel.
401 400 """
402 401 # FIXME: instant_death should be configurable via a checkbox in the
403 402 # dialog. Right now at least the heartbeat path sets it to True and
404 403 # the manual restart to False. But those should just be the
405 404 # pre-selected states of a checkbox that the user could override if so
406 405 # desired. But I don't know enough Qt to go implementing the checkbox
407 406 # now.
408 407
409 408 # We want to make sure that if this dialog is already happening, that
410 409 # other signals don't trigger it again. This can happen when the
411 410 # kernel_died heartbeat signal is emitted and the user is slow to
412 411 # respond to the dialog.
413 412 if not self._possible_kernel_restart:
414 413 if self.custom_restart:
415 414 self.custom_restart_requested.emit()
416 415 elif self.kernel_manager.has_kernel:
417 416 # Setting this to True will prevent this logic from happening
418 417 # again until the current pass is completed.
419 418 self._possible_kernel_restart = True
420 419 buttons = QtGui.QMessageBox.Yes | QtGui.QMessageBox.No
421 420 result = QtGui.QMessageBox.question(self, 'Restart kernel?',
422 421 message, buttons)
423 422 if result == QtGui.QMessageBox.Yes:
424 423 try:
425 424 self.kernel_manager.restart_kernel(
426 425 instant_death=instant_death)
427 426 except RuntimeError:
428 427 message = 'Kernel started externally. Cannot restart.\n'
429 428 self._append_plain_text(message)
430 429 else:
431 430 self._stopped_channels()
432 431 self._append_plain_text('Kernel restarting...\n')
433 432 self._show_interpreter_prompt()
434 433 # This might need to be moved to another location?
435 434 self._possible_kernel_restart = False
436 435 else:
437 436 self._append_plain_text('Kernel process is either remote or '
438 437 'unspecified. Cannot restart.\n')
439 438
440 439 #---------------------------------------------------------------------------
441 440 # 'FrontendWidget' protected interface
442 441 #---------------------------------------------------------------------------
443 442
444 443 def _call_tip(self):
445 444 """ Shows a call tip, if appropriate, at the current cursor location.
446 445 """
447 446 # Decide if it makes sense to show a call tip
448 447 cursor = self._get_cursor()
449 448 cursor.movePosition(QtGui.QTextCursor.Left)
450 449 if cursor.document().characterAt(cursor.position()).toAscii() != '(':
451 450 return False
452 451 context = self._get_context(cursor)
453 452 if not context:
454 453 return False
455 454
456 455 # Send the metadata request to the kernel
457 456 name = '.'.join(context)
458 457 msg_id = self.kernel_manager.xreq_channel.object_info(name)
459 458 pos = self._get_cursor().position()
460 459 self._request_info['call_tip'] = self._CallTipRequest(msg_id, pos)
461 460 return True
462 461
463 462 def _complete(self):
464 463 """ Performs completion at the current cursor location.
465 464 """
466 465 context = self._get_context()
467 466 if context:
468 467 # Send the completion request to the kernel
469 468 msg_id = self.kernel_manager.xreq_channel.complete(
470 469 '.'.join(context), # text
471 470 self._get_input_buffer_cursor_line(), # line
472 471 self._get_input_buffer_cursor_column(), # cursor_pos
473 472 self.input_buffer) # block
474 473 pos = self._get_cursor().position()
475 474 info = self._CompletionRequest(msg_id, pos)
476 475 self._request_info['complete'] = info
477 476
478 477 def _get_banner(self):
479 478 """ Gets a banner to display at the beginning of a session.
480 479 """
481 480 banner = 'Python %s on %s\nType "help", "copyright", "credits" or ' \
482 481 '"license" for more information.'
483 482 return banner % (sys.version, sys.platform)
484 483
485 484 def _get_context(self, cursor=None):
486 485 """ Gets the context for the specified cursor (or the current cursor
487 486 if none is specified).
488 487 """
489 488 if cursor is None:
490 489 cursor = self._get_cursor()
491 490 cursor.movePosition(QtGui.QTextCursor.StartOfBlock,
492 491 QtGui.QTextCursor.KeepAnchor)
493 492 text = str(cursor.selection().toPlainText())
494 493 return self._completion_lexer.get_context(text)
495 494
496 495 def _process_execute_abort(self, msg):
497 496 """ Process a reply for an aborted execution request.
498 497 """
499 498 self._append_plain_text("ERROR: execution aborted\n")
500 499
501 500 def _process_execute_error(self, msg):
502 501 """ Process a reply for an execution request that resulted in an error.
503 502 """
504 503 content = msg['content']
505 504 traceback = ''.join(content['traceback'])
506 505 self._append_plain_text(traceback)
507 506
508 507 def _process_execute_ok(self, msg):
509 508 """ Process a reply for a successful execution equest.
510 509 """
511 510 payload = msg['content']['payload']
512 511 for item in payload:
513 512 if not self._process_execute_payload(item):
514 513 warning = 'Warning: received unknown payload of type %s'
515 514 print(warning % repr(item['source']))
516 515
517 516 def _process_execute_payload(self, item):
518 517 """ Process a single payload item from the list of payload items in an
519 518 execution reply. Returns whether the payload was handled.
520 519 """
521 520 # The basic FrontendWidget doesn't handle payloads, as they are a
522 521 # mechanism for going beyond the standard Python interpreter model.
523 522 return False
524 523
525 524 def _show_interpreter_prompt(self):
526 525 """ Shows a prompt for the interpreter.
527 526 """
528 527 self._show_prompt('>>> ')
529 528
530 529 def _show_interpreter_prompt_for_reply(self, msg):
531 530 """ Shows a prompt for the interpreter given an 'execute_reply' message.
532 531 """
533 532 self._show_interpreter_prompt()
534 533
535 534 #------ Signal handlers ----------------------------------------------------
536 535
537 536 def _document_contents_change(self, position, removed, added):
538 537 """ Called whenever the document's content changes. Display a call tip
539 538 if appropriate.
540 539 """
541 540 # Calculate where the cursor should be *after* the change:
542 541 position += added
543 542
544 543 document = self._control.document()
545 544 if position == self._get_cursor().position():
546 545 self._call_tip()
@@ -1,239 +1,250 b''
1 1 """ Defines helper functions for creating kernel entry points and process
2 2 launchers.
3 3 """
4 4
5 5 # Standard library imports.
6 6 import os
7 7 import socket
8 8 from subprocess import Popen, PIPE
9 9 import sys
10 10
11 11 # System library imports.
12 12 import zmq
13 13
14 14 # Local imports.
15 15 from IPython.core.ultratb import FormattedTB
16 16 from IPython.external.argparse import ArgumentParser
17 17 from IPython.utils import io
18 from exitpoller import ExitPollerUnix, ExitPollerWindows
19 18 from displayhook import DisplayHook
19 from heartbeat import Heartbeat
20 20 from iostream import OutStream
21 from parentpoller import ParentPollerUnix, ParentPollerWindows
21 22 from session import Session
22 from heartbeat import Heartbeat
23 23
24 24 def bind_port(socket, ip, port):
25 25 """ Binds the specified ZMQ socket. If the port is zero, a random port is
26 26 chosen. Returns the port that was bound.
27 27 """
28 28 connection = 'tcp://%s' % ip
29 29 if port <= 0:
30 30 port = socket.bind_to_random_port(connection)
31 31 else:
32 32 connection += ':%i' % port
33 33 socket.bind(connection)
34 34 return port
35 35
36 36
37 37 def make_argument_parser():
38 38 """ Creates an ArgumentParser for the generic arguments supported by all
39 39 kernel entry points.
40 40 """
41 41 parser = ArgumentParser()
42 42 parser.add_argument('--ip', type=str, default='127.0.0.1',
43 43 help='set the kernel\'s IP address [default: local]')
44 44 parser.add_argument('--xrep', type=int, metavar='PORT', default=0,
45 45 help='set the XREP channel port [default: random]')
46 46 parser.add_argument('--pub', type=int, metavar='PORT', default=0,
47 47 help='set the PUB channel port [default: random]')
48 48 parser.add_argument('--req', type=int, metavar='PORT', default=0,
49 49 help='set the REQ channel port [default: random]')
50 50 parser.add_argument('--hb', type=int, metavar='PORT', default=0,
51 51 help='set the heartbeat port [default: random]')
52 52
53 53 if sys.platform == 'win32':
54 parser.add_argument('--interrupt', type=int, metavar='HANDLE',
55 default=0, help='interrupt this process when '
56 'HANDLE is signaled')
54 57 parser.add_argument('--parent', type=int, metavar='HANDLE',
55 58 default=0, help='kill this process if the process '
56 59 'with HANDLE dies')
57 60 else:
58 61 parser.add_argument('--parent', action='store_true',
59 62 help='kill this process if its parent dies')
60 63
61 64 return parser
62 65
63 66
64 67 def make_kernel(namespace, kernel_factory,
65 68 out_stream_factory=None, display_hook_factory=None):
66 69 """ Creates a kernel, redirects stdout/stderr, and installs a display hook
67 70 and exception handler.
68 71 """
69 72 # If running under pythonw.exe, the interpreter will crash if more than 4KB
70 73 # of data is written to stdout or stderr. This is a bug that has been with
71 74 # Python for a very long time; see http://bugs.python.org/issue706263.
72 75 if sys.executable.endswith('pythonw.exe'):
73 76 blackhole = file(os.devnull, 'w')
74 77 sys.stdout = sys.stderr = blackhole
75 78 sys.__stdout__ = sys.__stderr__ = blackhole
76 79
77 80 # Install minimal exception handling
78 81 sys.excepthook = FormattedTB(mode='Verbose', color_scheme='NoColor',
79 82 ostream=sys.__stdout__)
80 83
81 84 # Create a context, a session, and the kernel sockets.
82 85 io.raw_print("Starting the kernel at pid:", os.getpid())
83 86 context = zmq.Context()
84 87 session = Session(username=u'kernel')
85 88
86 89 reply_socket = context.socket(zmq.XREP)
87 90 xrep_port = bind_port(reply_socket, namespace.ip, namespace.xrep)
88 91 io.raw_print("XREP Channel on port", xrep_port)
89 92
90 93 pub_socket = context.socket(zmq.PUB)
91 94 pub_port = bind_port(pub_socket, namespace.ip, namespace.pub)
92 95 io.raw_print("PUB Channel on port", pub_port)
93 96
94 97 req_socket = context.socket(zmq.XREQ)
95 98 req_port = bind_port(req_socket, namespace.ip, namespace.req)
96 99 io.raw_print("REQ Channel on port", req_port)
97 100
98 101 hb = Heartbeat(context, (namespace.ip, namespace.hb))
99 102 hb.start()
100 103 hb_port = hb.port
101 104 io.raw_print("Heartbeat REP Channel on port", hb_port)
102 105
103 106 # Redirect input streams and set a display hook.
104 107 if out_stream_factory:
105 108 sys.stdout = out_stream_factory(session, pub_socket, u'stdout')
106 109 sys.stderr = out_stream_factory(session, pub_socket, u'stderr')
107 110 if display_hook_factory:
108 111 sys.displayhook = display_hook_factory(session, pub_socket)
109 112
110 113 # Create the kernel.
111 114 kernel = kernel_factory(session=session, reply_socket=reply_socket,
112 115 pub_socket=pub_socket, req_socket=req_socket)
113 116 kernel.record_ports(xrep_port=xrep_port, pub_port=pub_port,
114 117 req_port=req_port, hb_port=hb_port)
115 118 return kernel
116 119
117 120
118 121 def start_kernel(namespace, kernel):
119 122 """ Starts a kernel.
120 123 """
121 # Configure this kernel/process to die on parent termination, if necessary.
122 if namespace.parent:
123 if sys.platform == 'win32':
124 poller = ExitPollerWindows(namespace.parent)
125 else:
126 poller = ExitPollerUnix()
124 # Configure this kernel process to poll the parent process, if necessary.
125 if sys.platform == 'win32':
126 if namespace.interrupt or namespace.parent:
127 poller = ParentPollerWindows(namespace.interrupt, namespace.parent)
128 poller.start()
129 elif namespace.parent:
130 poller = ParentPollerUnix()
127 131 poller.start()
128 132
129 133 # Start the kernel mainloop.
130 134 kernel.start()
131 135
132 136
133 137 def make_default_main(kernel_factory):
134 138 """ Creates the simplest possible kernel entry point.
135 139 """
136 140 def main():
137 141 namespace = make_argument_parser().parse_args()
138 142 kernel = make_kernel(namespace, kernel_factory, OutStream, DisplayHook)
139 143 start_kernel(namespace, kernel)
140 144 return main
141 145
142 146
143 147 def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0, hb_port=0,
144 148 independent=False, extra_arguments=[]):
145 149 """ Launches a localhost kernel, binding to the specified ports.
146 150
147 151 Parameters
148 152 ----------
149 153 code : str,
150 154 A string of Python code that imports and executes a kernel entry point.
151 155
152 156 xrep_port : int, optional
153 157 The port to use for XREP channel.
154 158
155 159 pub_port : int, optional
156 160 The port to use for the SUB channel.
157 161
158 162 req_port : int, optional
159 163 The port to use for the REQ (raw input) channel.
160 164
161 165 hb_port : int, optional
162 166 The port to use for the hearbeat REP channel.
163 167
164 168 independent : bool, optional (default False)
165 169 If set, the kernel process is guaranteed to survive if this process
166 170 dies. If not set, an effort is made to ensure that the kernel is killed
167 171 when this process dies. Note that in this case it is still good practice
168 172 to kill kernels manually before exiting.
169 173
170 174 extra_arguments = list, optional
171 175 A list of extra arguments to pass when executing the launch code.
172 176
173 177 Returns
174 178 -------
175 179 A tuple of form:
176 180 (kernel_process, xrep_port, pub_port, req_port)
177 181 where kernel_process is a Popen object and the ports are integers.
178 182 """
179 183 # Find open ports as necessary.
180 184 ports = []
181 185 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + \
182 186 int(req_port <= 0) + int(hb_port <= 0)
183 187 for i in xrange(ports_needed):
184 188 sock = socket.socket()
185 189 sock.bind(('', 0))
186 190 ports.append(sock)
187 191 for i, sock in enumerate(ports):
188 192 port = sock.getsockname()[1]
189 193 sock.close()
190 194 ports[i] = port
191 195 if xrep_port <= 0:
192 196 xrep_port = ports.pop(0)
193 197 if pub_port <= 0:
194 198 pub_port = ports.pop(0)
195 199 if req_port <= 0:
196 200 req_port = ports.pop(0)
197 201 if hb_port <= 0:
198 202 hb_port = ports.pop(0)
199 203
200 204 # Build the kernel launch command.
201 205 arguments = [ sys.executable, '-c', code, '--xrep', str(xrep_port),
202 206 '--pub', str(pub_port), '--req', str(req_port),
203 207 '--hb', str(hb_port) ]
204 208 arguments.extend(extra_arguments)
205 209
206 210 # Spawn a kernel.
207 211 if sys.platform == 'win32':
212 # Create a Win32 event for interrupting the kernel.
213 interrupt_event = ParentPollerWindows.create_interrupt_event()
214 arguments += [ '--interrupt', str(int(interrupt_event)) ]
208 215
209 216 # If using pythonw, stdin, stdout, and stderr are invalid. Popen will
210 217 # fail unless they are suitably redirected. We don't read from the
211 218 # pipes, but they must exist.
212 219 redirect = PIPE if sys.executable.endswith('pythonw.exe') else None
213 220
214 221 if independent:
215 proc = Popen(['start', '/b'] + arguments, shell=True,
222 proc = Popen(arguments,
223 creationflags=512, # CREATE_NEW_PROCESS_GROUP
216 224 stdout=redirect, stderr=redirect, stdin=redirect)
217 225 else:
218 226 from _subprocess import DuplicateHandle, GetCurrentProcess, \
219 227 DUPLICATE_SAME_ACCESS
220 228 pid = GetCurrentProcess()
221 229 handle = DuplicateHandle(pid, pid, pid, 0,
222 230 True, # Inheritable by new processes.
223 231 DUPLICATE_SAME_ACCESS)
224 232 proc = Popen(arguments + ['--parent', str(int(handle))],
225 233 stdout=redirect, stderr=redirect, stdin=redirect)
226 234
235 # Attach the interrupt event to the Popen objet so it can be used later.
236 proc.win32_interrupt_event = interrupt_event
237
227 238 # Clean up pipes created to work around Popen bug.
228 239 if redirect is not None:
229 240 proc.stdout.close()
230 241 proc.stderr.close()
231 242 proc.stdin.close()
232 243
233 244 else:
234 245 if independent:
235 246 proc = Popen(arguments, preexec_fn=lambda: os.setsid())
236 247 else:
237 248 proc = Popen(arguments + ['--parent'])
238 249
239 250 return proc, xrep_port, pub_port, req_port, hb_port
@@ -1,867 +1,881 b''
1 1 """Base classes to manage the interaction with a running kernel.
2 2
3 Todo
4 ====
5
3 TODO
6 4 * Create logger to handle debugging and console messages.
7 5 """
8 6
9 7 #-----------------------------------------------------------------------------
10 8 # Copyright (C) 2008-2010 The IPython Development Team
11 9 #
12 10 # Distributed under the terms of the BSD License. The full license is in
13 11 # the file COPYING, distributed as part of this software.
14 12 #-----------------------------------------------------------------------------
15 13
16 14 #-----------------------------------------------------------------------------
17 15 # Imports
18 16 #-----------------------------------------------------------------------------
19 17
20 18 # Standard library imports.
21 19 from Queue import Queue, Empty
22 20 from subprocess import Popen
21 import signal
23 22 import sys
24 23 from threading import Thread
25 24 import time
26 25
27 26 # System library imports.
28 27 import zmq
29 28 from zmq import POLLIN, POLLOUT, POLLERR
30 29 from zmq.eventloop import ioloop
31 30
32 31 # Local imports.
33 32 from IPython.utils import io
34 33 from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress
35 34 from session import Session
36 35
37 36 #-----------------------------------------------------------------------------
38 37 # Constants and exceptions
39 38 #-----------------------------------------------------------------------------
40 39
41 40 LOCALHOST = '127.0.0.1'
42 41
43 42 class InvalidPortNumber(Exception):
44 43 pass
45 44
46 45 #-----------------------------------------------------------------------------
47 46 # Utility functions
48 47 #-----------------------------------------------------------------------------
49 48
50 49 # some utilities to validate message structure, these might get moved elsewhere
51 50 # if they prove to have more generic utility
52 51
53 52 def validate_string_list(lst):
54 53 """Validate that the input is a list of strings.
55 54
56 55 Raises ValueError if not."""
57 56 if not isinstance(lst, list):
58 57 raise ValueError('input %r must be a list' % lst)
59 58 for x in lst:
60 59 if not isinstance(x, basestring):
61 60 raise ValueError('element %r in list must be a string' % x)
62 61
63 62
64 63 def validate_string_dict(dct):
65 64 """Validate that the input is a dict with string keys and values.
66 65
67 66 Raises ValueError if not."""
68 67 for k,v in dct.iteritems():
69 68 if not isinstance(k, basestring):
70 69 raise ValueError('key %r in dict must be a string' % k)
71 70 if not isinstance(v, basestring):
72 71 raise ValueError('value %r in dict must be a string' % v)
73 72
74 73
75 74 #-----------------------------------------------------------------------------
76 75 # ZMQ Socket Channel classes
77 76 #-----------------------------------------------------------------------------
78 77
79 78 class ZmqSocketChannel(Thread):
80 79 """The base class for the channels that use ZMQ sockets.
81 80 """
82 81 context = None
83 82 session = None
84 83 socket = None
85 84 ioloop = None
86 85 iostate = None
87 86 _address = None
88 87
89 88 def __init__(self, context, session, address):
90 89 """Create a channel
91 90
92 91 Parameters
93 92 ----------
94 93 context : :class:`zmq.Context`
95 94 The ZMQ context to use.
96 95 session : :class:`session.Session`
97 96 The session to use.
98 97 address : tuple
99 98 Standard (ip, port) tuple that the kernel is listening on.
100 99 """
101 100 super(ZmqSocketChannel, self).__init__()
102 101 self.daemon = True
103 102
104 103 self.context = context
105 104 self.session = session
106 105 if address[1] == 0:
107 106 message = 'The port number for a channel cannot be 0.'
108 107 raise InvalidPortNumber(message)
109 108 self._address = address
110 109
111 110 def stop(self):
112 111 """Stop the channel's activity.
113 112
114 113 This calls :method:`Thread.join` and returns when the thread
115 114 terminates. :class:`RuntimeError` will be raised if
116 115 :method:`self.start` is called again.
117 116 """
118 117 self.join()
119 118
120 119 @property
121 120 def address(self):
122 121 """Get the channel's address as an (ip, port) tuple.
123 122
124 123 By the default, the address is (localhost, 0), where 0 means a random
125 124 port.
126 125 """
127 126 return self._address
128 127
129 128 def add_io_state(self, state):
130 129 """Add IO state to the eventloop.
131 130
132 131 Parameters
133 132 ----------
134 133 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
135 134 The IO state flag to set.
136 135
137 136 This is thread safe as it uses the thread safe IOLoop.add_callback.
138 137 """
139 138 def add_io_state_callback():
140 139 if not self.iostate & state:
141 140 self.iostate = self.iostate | state
142 141 self.ioloop.update_handler(self.socket, self.iostate)
143 142 self.ioloop.add_callback(add_io_state_callback)
144 143
145 144 def drop_io_state(self, state):
146 145 """Drop IO state from the eventloop.
147 146
148 147 Parameters
149 148 ----------
150 149 state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
151 150 The IO state flag to set.
152 151
153 152 This is thread safe as it uses the thread safe IOLoop.add_callback.
154 153 """
155 154 def drop_io_state_callback():
156 155 if self.iostate & state:
157 156 self.iostate = self.iostate & (~state)
158 157 self.ioloop.update_handler(self.socket, self.iostate)
159 158 self.ioloop.add_callback(drop_io_state_callback)
160 159
161 160
162 161 class XReqSocketChannel(ZmqSocketChannel):
163 162 """The XREQ channel for issues request/replies to the kernel.
164 163 """
165 164
166 165 command_queue = None
167 166
168 167 def __init__(self, context, session, address):
169 168 super(XReqSocketChannel, self).__init__(context, session, address)
170 169 self.command_queue = Queue()
171 170 self.ioloop = ioloop.IOLoop()
172 171
173 172 def run(self):
174 173 """The thread's main activity. Call start() instead."""
175 174 self.socket = self.context.socket(zmq.XREQ)
176 175 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
177 176 self.socket.connect('tcp://%s:%i' % self.address)
178 177 self.iostate = POLLERR|POLLIN
179 178 self.ioloop.add_handler(self.socket, self._handle_events,
180 179 self.iostate)
181 180 self.ioloop.start()
182 181
183 182 def stop(self):
184 183 self.ioloop.stop()
185 184 super(XReqSocketChannel, self).stop()
186 185
187 186 def call_handlers(self, msg):
188 187 """This method is called in the ioloop thread when a message arrives.
189 188
190 189 Subclasses should override this method to handle incoming messages.
191 190 It is important to remember that this method is called in the thread
192 191 so that some logic must be done to ensure that the application leve
193 192 handlers are called in the application thread.
194 193 """
195 194 raise NotImplementedError('call_handlers must be defined in a subclass.')
196 195
197 196 def execute(self, code, silent=False,
198 197 user_variables=None, user_expressions=None):
199 198 """Execute code in the kernel.
200 199
201 200 Parameters
202 201 ----------
203 202 code : str
204 203 A string of Python code.
205 204
206 205 silent : bool, optional (default False)
207 206 If set, the kernel will execute the code as quietly possible.
208 207
209 208 user_variables : list, optional
210 209 A list of variable names to pull from the user's namespace. They
211 210 will come back as a dict with these names as keys and their
212 211 :func:`repr` as values.
213 212
214 213 user_expressions : dict, optional
215 214 A dict with string keys and to pull from the user's
216 215 namespace. They will come back as a dict with these names as keys
217 216 and their :func:`repr` as values.
218 217
219 218 Returns
220 219 -------
221 220 The msg_id of the message sent.
222 221 """
223 222 if user_variables is None:
224 223 user_variables = []
225 224 if user_expressions is None:
226 225 user_expressions = {}
227 226
228 227 # Don't waste network traffic if inputs are invalid
229 228 if not isinstance(code, basestring):
230 229 raise ValueError('code %r must be a string' % code)
231 230 validate_string_list(user_variables)
232 231 validate_string_dict(user_expressions)
233 232
234 233 # Create class for content/msg creation. Related to, but possibly
235 234 # not in Session.
236 235 content = dict(code=code, silent=silent,
237 236 user_variables=user_variables,
238 237 user_expressions=user_expressions)
239 238 msg = self.session.msg('execute_request', content)
240 239 self._queue_request(msg)
241 240 return msg['header']['msg_id']
242 241
243 242 def complete(self, text, line, cursor_pos, block=None):
244 243 """Tab complete text in the kernel's namespace.
245 244
246 245 Parameters
247 246 ----------
248 247 text : str
249 248 The text to complete.
250 249 line : str
251 250 The full line of text that is the surrounding context for the
252 251 text to complete.
253 252 cursor_pos : int
254 253 The position of the cursor in the line where the completion was
255 254 requested.
256 255 block : str, optional
257 256 The full block of code in which the completion is being requested.
258 257
259 258 Returns
260 259 -------
261 260 The msg_id of the message sent.
262 261 """
263 262 content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
264 263 msg = self.session.msg('complete_request', content)
265 264 self._queue_request(msg)
266 265 return msg['header']['msg_id']
267 266
268 267 def object_info(self, oname):
269 268 """Get metadata information about an object.
270 269
271 270 Parameters
272 271 ----------
273 272 oname : str
274 273 A string specifying the object name.
275 274
276 275 Returns
277 276 -------
278 277 The msg_id of the message sent.
279 278 """
280 279 content = dict(oname=oname)
281 280 msg = self.session.msg('object_info_request', content)
282 281 self._queue_request(msg)
283 282 return msg['header']['msg_id']
284 283
285 284 def history(self, index=None, raw=False, output=True):
286 285 """Get the history list.
287 286
288 287 Parameters
289 288 ----------
290 289 index : n or (n1, n2) or None
291 290 If n, then the last entries. If a tuple, then all in
292 291 range(n1, n2). If None, then all entries. Raises IndexError if
293 292 the format of index is incorrect.
294 293 raw : bool
295 294 If True, return the raw input.
296 295 output : bool
297 296 If True, then return the output as well.
298 297
299 298 Returns
300 299 -------
301 300 The msg_id of the message sent.
302 301 """
303 302 content = dict(index=index, raw=raw, output=output)
304 303 msg = self.session.msg('history_request', content)
305 304 self._queue_request(msg)
306 305 return msg['header']['msg_id']
307 306
308 307 def shutdown(self):
309 308 """Request an immediate kernel shutdown.
310 309
311 310 Upon receipt of the (empty) reply, client code can safely assume that
312 311 the kernel has shut down and it's safe to forcefully terminate it if
313 312 it's still alive.
314 313
315 314 The kernel will send the reply via a function registered with Python's
316 315 atexit module, ensuring it's truly done as the kernel is done with all
317 316 normal operation.
318 317 """
319 318 # Send quit message to kernel. Once we implement kernel-side setattr,
320 319 # this should probably be done that way, but for now this will do.
321 320 msg = self.session.msg('shutdown_request', {})
322 321 self._queue_request(msg)
323 322 return msg['header']['msg_id']
324 323
325 324 def _handle_events(self, socket, events):
326 325 if events & POLLERR:
327 326 self._handle_err()
328 327 if events & POLLOUT:
329 328 self._handle_send()
330 329 if events & POLLIN:
331 330 self._handle_recv()
332 331
333 332 def _handle_recv(self):
334 333 msg = self.socket.recv_json()
335 334 self.call_handlers(msg)
336 335
337 336 def _handle_send(self):
338 337 try:
339 338 msg = self.command_queue.get(False)
340 339 except Empty:
341 340 pass
342 341 else:
343 342 self.socket.send_json(msg)
344 343 if self.command_queue.empty():
345 344 self.drop_io_state(POLLOUT)
346 345
347 346 def _handle_err(self):
348 347 # We don't want to let this go silently, so eventually we should log.
349 348 raise zmq.ZMQError()
350 349
351 350 def _queue_request(self, msg):
352 351 self.command_queue.put(msg)
353 352 self.add_io_state(POLLOUT)
354 353
355 354
356 355 class SubSocketChannel(ZmqSocketChannel):
357 356 """The SUB channel which listens for messages that the kernel publishes.
358 357 """
359 358
360 359 def __init__(self, context, session, address):
361 360 super(SubSocketChannel, self).__init__(context, session, address)
362 361 self.ioloop = ioloop.IOLoop()
363 362
364 363 def run(self):
365 364 """The thread's main activity. Call start() instead."""
366 365 self.socket = self.context.socket(zmq.SUB)
367 366 self.socket.setsockopt(zmq.SUBSCRIBE,'')
368 367 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
369 368 self.socket.connect('tcp://%s:%i' % self.address)
370 369 self.iostate = POLLIN|POLLERR
371 370 self.ioloop.add_handler(self.socket, self._handle_events,
372 371 self.iostate)
373 372 self.ioloop.start()
374 373
375 374 def stop(self):
376 375 self.ioloop.stop()
377 376 super(SubSocketChannel, self).stop()
378 377
379 378 def call_handlers(self, msg):
380 379 """This method is called in the ioloop thread when a message arrives.
381 380
382 381 Subclasses should override this method to handle incoming messages.
383 382 It is important to remember that this method is called in the thread
384 383 so that some logic must be done to ensure that the application leve
385 384 handlers are called in the application thread.
386 385 """
387 386 raise NotImplementedError('call_handlers must be defined in a subclass.')
388 387
389 388 def flush(self, timeout=1.0):
390 389 """Immediately processes all pending messages on the SUB channel.
391 390
392 391 Callers should use this method to ensure that :method:`call_handlers`
393 392 has been called for all messages that have been received on the
394 393 0MQ SUB socket of this channel.
395 394
396 395 This method is thread safe.
397 396
398 397 Parameters
399 398 ----------
400 399 timeout : float, optional
401 400 The maximum amount of time to spend flushing, in seconds. The
402 401 default is one second.
403 402 """
404 403 # We do the IOLoop callback process twice to ensure that the IOLoop
405 404 # gets to perform at least one full poll.
406 405 stop_time = time.time() + timeout
407 406 for i in xrange(2):
408 407 self._flushed = False
409 408 self.ioloop.add_callback(self._flush)
410 409 while not self._flushed and time.time() < stop_time:
411 410 time.sleep(0.01)
412 411
413 412 def _handle_events(self, socket, events):
414 413 # Turn on and off POLLOUT depending on if we have made a request
415 414 if events & POLLERR:
416 415 self._handle_err()
417 416 if events & POLLIN:
418 417 self._handle_recv()
419 418
420 419 def _handle_err(self):
421 420 # We don't want to let this go silently, so eventually we should log.
422 421 raise zmq.ZMQError()
423 422
424 423 def _handle_recv(self):
425 424 # Get all of the messages we can
426 425 while True:
427 426 try:
428 427 msg = self.socket.recv_json(zmq.NOBLOCK)
429 428 except zmq.ZMQError:
430 429 # Check the errno?
431 430 # Will this trigger POLLERR?
432 431 break
433 432 else:
434 433 self.call_handlers(msg)
435 434
436 435 def _flush(self):
437 436 """Callback for :method:`self.flush`."""
438 437 self._flushed = True
439 438
440 439
441 440 class RepSocketChannel(ZmqSocketChannel):
442 441 """A reply channel to handle raw_input requests that the kernel makes."""
443 442
444 443 msg_queue = None
445 444
446 445 def __init__(self, context, session, address):
447 446 super(RepSocketChannel, self).__init__(context, session, address)
448 447 self.ioloop = ioloop.IOLoop()
449 448 self.msg_queue = Queue()
450 449
451 450 def run(self):
452 451 """The thread's main activity. Call start() instead."""
453 452 self.socket = self.context.socket(zmq.XREQ)
454 453 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
455 454 self.socket.connect('tcp://%s:%i' % self.address)
456 455 self.iostate = POLLERR|POLLIN
457 456 self.ioloop.add_handler(self.socket, self._handle_events,
458 457 self.iostate)
459 458 self.ioloop.start()
460 459
461 460 def stop(self):
462 461 self.ioloop.stop()
463 462 super(RepSocketChannel, self).stop()
464 463
465 464 def call_handlers(self, msg):
466 465 """This method is called in the ioloop thread when a message arrives.
467 466
468 467 Subclasses should override this method to handle incoming messages.
469 468 It is important to remember that this method is called in the thread
470 469 so that some logic must be done to ensure that the application leve
471 470 handlers are called in the application thread.
472 471 """
473 472 raise NotImplementedError('call_handlers must be defined in a subclass.')
474 473
475 474 def input(self, string):
476 475 """Send a string of raw input to the kernel."""
477 476 content = dict(value=string)
478 477 msg = self.session.msg('input_reply', content)
479 478 self._queue_reply(msg)
480 479
481 480 def _handle_events(self, socket, events):
482 481 if events & POLLERR:
483 482 self._handle_err()
484 483 if events & POLLOUT:
485 484 self._handle_send()
486 485 if events & POLLIN:
487 486 self._handle_recv()
488 487
489 488 def _handle_recv(self):
490 489 msg = self.socket.recv_json()
491 490 self.call_handlers(msg)
492 491
493 492 def _handle_send(self):
494 493 try:
495 494 msg = self.msg_queue.get(False)
496 495 except Empty:
497 496 pass
498 497 else:
499 498 self.socket.send_json(msg)
500 499 if self.msg_queue.empty():
501 500 self.drop_io_state(POLLOUT)
502 501
503 502 def _handle_err(self):
504 503 # We don't want to let this go silently, so eventually we should log.
505 504 raise zmq.ZMQError()
506 505
507 506 def _queue_reply(self, msg):
508 507 self.msg_queue.put(msg)
509 508 self.add_io_state(POLLOUT)
510 509
511 510
512 511 class HBSocketChannel(ZmqSocketChannel):
513 512 """The heartbeat channel which monitors the kernel heartbeat."""
514 513
515 514 time_to_dead = 3.0
516 515 socket = None
517 516 poller = None
518 517 _running = None
519 518 _pause = None
520 519
521 520 def __init__(self, context, session, address):
522 521 super(HBSocketChannel, self).__init__(context, session, address)
523 522 self._running = False
524 523 self._pause = False
525 524
526 525 def _create_socket(self):
527 526 self.socket = self.context.socket(zmq.REQ)
528 527 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
529 528 self.socket.connect('tcp://%s:%i' % self.address)
530 529 self.poller = zmq.Poller()
531 530 self.poller.register(self.socket, zmq.POLLIN)
532 531
533 532 def run(self):
534 533 """The thread's main activity. Call start() instead."""
535 534 self._create_socket()
536 535 self._running = True
537 536 # Wait 2 seconds for the kernel to come up and the sockets to auto
538 537 # connect. If we don't we will see the kernel as dead. Also, before
539 538 # the sockets are connected, the poller.poll line below is returning
540 539 # too fast. This avoids that because the polling doesn't start until
541 540 # after the sockets are connected.
542 541 time.sleep(2.0)
543 542 while self._running:
544 543 if self._pause:
545 544 time.sleep(self.time_to_dead)
546 545 else:
547 546 since_last_heartbeat = 0.0
548 547 request_time = time.time()
549 548 try:
550 549 #io.rprint('Ping from HB channel') # dbg
551 550 self.socket.send_json('ping')
552 551 except zmq.ZMQError, e:
553 552 #io.rprint('*** HB Error:', e) # dbg
554 553 if e.errno == zmq.EFSM:
555 554 #io.rprint('sleep...', self.time_to_dead) # dbg
556 555 time.sleep(self.time_to_dead)
557 556 self._create_socket()
558 557 else:
559 558 raise
560 559 else:
561 560 while True:
562 561 try:
563 562 self.socket.recv_json(zmq.NOBLOCK)
564 563 except zmq.ZMQError, e:
565 564 #io.rprint('*** HB Error 2:', e) # dbg
566 565 if e.errno == zmq.EAGAIN:
567 566 before_poll = time.time()
568 567 until_dead = self.time_to_dead - (before_poll -
569 568 request_time)
570 569
571 570 # When the return value of poll() is an empty list,
572 571 # that is when things have gone wrong (zeromq bug).
573 572 # As long as it is not an empty list, poll is
574 573 # working correctly even if it returns quickly.
575 574 # Note: poll timeout is in milliseconds.
576 575 self.poller.poll(1000*until_dead)
577 576
578 577 since_last_heartbeat = time.time() - request_time
579 578 if since_last_heartbeat > self.time_to_dead:
580 579 self.call_handlers(since_last_heartbeat)
581 580 break
582 581 else:
583 582 # FIXME: We should probably log this instead.
584 583 raise
585 584 else:
586 585 until_dead = self.time_to_dead - (time.time() -
587 586 request_time)
588 587 if until_dead > 0.0:
589 588 #io.rprint('sleep...', self.time_to_dead) # dbg
590 589 time.sleep(until_dead)
591 590 break
592 591
593 592 def pause(self):
594 593 """Pause the heartbeat."""
595 594 self._pause = True
596 595
597 596 def unpause(self):
598 597 """Unpause the heartbeat."""
599 598 self._pause = False
600 599
601 600 def is_beating(self):
602 601 """Is the heartbeat running and not paused."""
603 602 if self.is_alive() and not self._pause:
604 603 return True
605 604 else:
606 605 return False
607 606
608 607 def stop(self):
609 608 self._running = False
610 609 super(HBSocketChannel, self).stop()
611 610
612 611 def call_handlers(self, since_last_heartbeat):
613 612 """This method is called in the ioloop thread when a message arrives.
614 613
615 614 Subclasses should override this method to handle incoming messages.
616 615 It is important to remember that this method is called in the thread
617 616 so that some logic must be done to ensure that the application leve
618 617 handlers are called in the application thread.
619 618 """
620 619 raise NotImplementedError('call_handlers must be defined in a subclass.')
621 620
622 621
623 622 #-----------------------------------------------------------------------------
624 623 # Main kernel manager class
625 624 #-----------------------------------------------------------------------------
626 625
627 626 class KernelManager(HasTraits):
628 627 """ Manages a kernel for a frontend.
629 628
630 629 The SUB channel is for the frontend to receive messages published by the
631 630 kernel.
632 631
633 632 The REQ channel is for the frontend to make requests of the kernel.
634 633
635 634 The REP channel is for the kernel to request stdin (raw_input) from the
636 635 frontend.
637 636 """
638 637 # The PyZMQ Context to use for communication with the kernel.
639 638 context = Instance(zmq.Context,(),{})
640 639
641 640 # The Session to use for communication with the kernel.
642 641 session = Instance(Session,(),{})
643 642
644 643 # The kernel process with which the KernelManager is communicating.
645 644 kernel = Instance(Popen)
646 645
647 646 # The addresses for the communication channels.
648 647 xreq_address = TCPAddress((LOCALHOST, 0))
649 648 sub_address = TCPAddress((LOCALHOST, 0))
650 649 rep_address = TCPAddress((LOCALHOST, 0))
651 650 hb_address = TCPAddress((LOCALHOST, 0))
652 651
653 652 # The classes to use for the various channels.
654 653 xreq_channel_class = Type(XReqSocketChannel)
655 654 sub_channel_class = Type(SubSocketChannel)
656 655 rep_channel_class = Type(RepSocketChannel)
657 656 hb_channel_class = Type(HBSocketChannel)
658 657
659 658 # Protected traits.
660 659 _launch_args = Any
661 660 _xreq_channel = Any
662 661 _sub_channel = Any
663 662 _rep_channel = Any
664 663 _hb_channel = Any
665 664
666 665 #--------------------------------------------------------------------------
667 666 # Channel management methods:
668 667 #--------------------------------------------------------------------------
669 668
670 669 def start_channels(self, xreq=True, sub=True, rep=True):
671 670 """Starts the channels for this kernel, but not the heartbeat.
672 671
673 672 This will create the channels if they do not exist and then start
674 673 them. If port numbers of 0 are being used (random ports) then you
675 674 must first call :method:`start_kernel`. If the channels have been
676 675 stopped and you call this, :class:`RuntimeError` will be raised.
677 676 """
678 677 if xreq:
679 678 self.xreq_channel.start()
680 679 if sub:
681 680 self.sub_channel.start()
682 681 if rep:
683 682 self.rep_channel.start()
684 683
685 684 def stop_channels(self):
686 685 """Stops all the running channels for this kernel.
687 686 """
688 687 if self.xreq_channel.is_alive():
689 688 self.xreq_channel.stop()
690 689 if self.sub_channel.is_alive():
691 690 self.sub_channel.stop()
692 691 if self.rep_channel.is_alive():
693 692 self.rep_channel.stop()
694 693
695 694 @property
696 695 def channels_running(self):
697 696 """Are any of the channels created and running?"""
698 697 return self.xreq_channel.is_alive() \
699 698 or self.sub_channel.is_alive() \
700 699 or self.rep_channel.is_alive()
701 700
702 701 #--------------------------------------------------------------------------
703 702 # Kernel process management methods:
704 703 #--------------------------------------------------------------------------
705 704
706 705 def start_kernel(self, **kw):
707 706 """Starts a kernel process and configures the manager to use it.
708 707
709 708 If random ports (port=0) are being used, this method must be called
710 709 before the channels are created.
711 710
712 711 Parameters:
713 712 -----------
714 713 ipython : bool, optional (default True)
715 714 Whether to use an IPython kernel instead of a plain Python kernel.
716 715 """
717 716 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
718 717 self.rep_address, self.hb_address
719 718 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or \
720 719 rep[0] != LOCALHOST or hb[0] != LOCALHOST:
721 720 raise RuntimeError("Can only launch a kernel on localhost."
722 721 "Make sure that the '*_address' attributes are "
723 722 "configured properly.")
724 723
725 724 self._launch_args = kw.copy()
726 725 if kw.pop('ipython', True):
727 726 from ipkernel import launch_kernel
728 727 else:
729 728 from pykernel import launch_kernel
730 729 self.kernel, xrep, pub, req, hb = launch_kernel(
731 730 xrep_port=xreq[1], pub_port=sub[1],
732 731 req_port=rep[1], hb_port=hb[1], **kw)
733 732 self.xreq_address = (LOCALHOST, xrep)
734 733 self.sub_address = (LOCALHOST, pub)
735 734 self.rep_address = (LOCALHOST, req)
736 735 self.hb_address = (LOCALHOST, hb)
737 736
738 737 def shutdown_kernel(self):
739 738 """ Attempts to the stop the kernel process cleanly. If the kernel
740 739 cannot be stopped, it is killed, if possible.
741 740 """
742 741 # FIXME: Shutdown does not work on Windows due to ZMQ errors!
743 742 if sys.platform == 'win32':
744 743 self.kill_kernel()
745 744 return
746 745
747 746 self.xreq_channel.shutdown()
748 747 # Don't send any additional kernel kill messages immediately, to give
749 748 # the kernel a chance to properly execute shutdown actions. Wait for at
750 749 # most 1s, checking every 0.1s.
751 750 for i in range(10):
752 751 if self.is_alive:
753 752 time.sleep(0.1)
754 753 else:
755 754 break
756 755 else:
757 756 # OK, we've waited long enough.
758 757 if self.has_kernel:
759 758 self.kill_kernel()
760 759
761 760 def restart_kernel(self, instant_death=False):
762 761 """Restarts a kernel with the same arguments that were used to launch
763 762 it. If the old kernel was launched with random ports, the same ports
764 763 will be used for the new kernel.
765 764
766 765 Parameters
767 766 ----------
768 767 instant_death : bool, optional
769 768 If True, the kernel is forcefully restarted *immediately*, without
770 769 having a chance to do any cleanup action. Otherwise the kernel is
771 770 given 1s to clean up before a forceful restart is issued.
772 771
773 772 In all cases the kernel is restarted, the only difference is whether
774 773 it is given a chance to perform a clean shutdown or not.
775 774 """
776 775 if self._launch_args is None:
777 776 raise RuntimeError("Cannot restart the kernel. "
778 777 "No previous call to 'start_kernel'.")
779 778 else:
780 779 if self.has_kernel:
781 780 if instant_death:
782 781 self.kill_kernel()
783 782 else:
784 783 self.shutdown_kernel()
785 784 self.start_kernel(**self._launch_args)
786 785
787 786 # FIXME: Messages get dropped in Windows due to probable ZMQ bug
788 787 # unless there is some delay here.
789 788 if sys.platform == 'win32':
790 789 time.sleep(0.2)
791 790
792 791 @property
793 792 def has_kernel(self):
794 793 """Returns whether a kernel process has been specified for the kernel
795 794 manager.
796 795 """
797 796 return self.kernel is not None
798 797
799 798 def kill_kernel(self):
800 799 """ Kill the running kernel. """
801 800 if self.has_kernel:
802 801 self.kernel.kill()
803 802 self.kernel = None
804 803 else:
805 804 raise RuntimeError("Cannot kill kernel. No kernel is running!")
806 805
806 def interrupt_kernel(self):
807 """ Interrupts the kernel. Unlike ``signal_kernel``, this operation is
808 well supported on all platforms.
809 """
810 if self.has_kernel:
811 if sys.platform == 'win32':
812 from parentpoller import ParentPollerWindows as Poller
813 Poller.send_interrupt(self.kernel.win32_interrupt_event)
814 else:
815 self.kernel.send_signal(signal.SIGINT)
816 else:
817 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
818
807 819 def signal_kernel(self, signum):
808 """ Sends a signal to the kernel. """
820 """ Sends a signal to the kernel. Note that since only SIGTERM is
821 supported on Windows, this function is only useful on Unix systems.
822 """
809 823 if self.has_kernel:
810 824 self.kernel.send_signal(signum)
811 825 else:
812 826 raise RuntimeError("Cannot signal kernel. No kernel is running!")
813 827
814 828 @property
815 829 def is_alive(self):
816 830 """Is the kernel process still running?"""
817 831 # FIXME: not using a heartbeat means this method is broken for any
818 832 # remote kernel, it's only capable of handling local kernels.
819 833 if self.has_kernel:
820 834 if self.kernel.poll() is None:
821 835 return True
822 836 else:
823 837 return False
824 838 else:
825 839 # We didn't start the kernel with this KernelManager so we don't
826 840 # know if it is running. We should use a heartbeat for this case.
827 841 return True
828 842
829 843 #--------------------------------------------------------------------------
830 844 # Channels used for communication with the kernel:
831 845 #--------------------------------------------------------------------------
832 846
833 847 @property
834 848 def xreq_channel(self):
835 849 """Get the REQ socket channel object to make requests of the kernel."""
836 850 if self._xreq_channel is None:
837 851 self._xreq_channel = self.xreq_channel_class(self.context,
838 852 self.session,
839 853 self.xreq_address)
840 854 return self._xreq_channel
841 855
842 856 @property
843 857 def sub_channel(self):
844 858 """Get the SUB socket channel object."""
845 859 if self._sub_channel is None:
846 860 self._sub_channel = self.sub_channel_class(self.context,
847 861 self.session,
848 862 self.sub_address)
849 863 return self._sub_channel
850 864
851 865 @property
852 866 def rep_channel(self):
853 867 """Get the REP socket channel object to handle stdin (raw_input)."""
854 868 if self._rep_channel is None:
855 869 self._rep_channel = self.rep_channel_class(self.context,
856 870 self.session,
857 871 self.rep_address)
858 872 return self._rep_channel
859 873
860 874 @property
861 875 def hb_channel(self):
862 876 """Get the REP socket channel object to handle stdin (raw_input)."""
863 877 if self._hb_channel is None:
864 878 self._hb_channel = self.hb_channel_class(self.context,
865 879 self.session,
866 880 self.hb_address)
867 881 return self._hb_channel
1 NO CONTENT: file was removed
General Comments 0
You need to be logged in to leave comments. Login now