##// END OF EJS Templates
Collapse Qt out of process channel class hierarchy
Thomas Kluyver -
Show More
This diff has been collapsed as it changes many lines, (573 lines changed) Show them Hide them
@@ -1,31 +1,576 b''
1 """ Defines a KernelClient that provides signals and slots.
1 """ Defines a KernelClient that provides signals and slots.
2 """
2 """
3 import atexit
4 import errno
5 from threading import Thread
6 import time
7
8 import zmq
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 # during garbage collection of threads at exit:
11 from zmq import ZMQError
12 from zmq.eventloop import ioloop, zmqstream
13
14 from IPython.external.qt import QtCore
3
15
4 # Local imports
16 # Local imports
5 from IPython.utils.traitlets import Type
17 from IPython.utils.traitlets import Type
6 from IPython.kernel.channels import (
18 from IPython.kernel.channels import HBChannel
7 ShellChannel, IOPubChannel, StdInChannel, HBChannel
8 )
9 from IPython.kernel import KernelClient
19 from IPython.kernel import KernelClient
10
20
11 from .kernel_mixins import (
21 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
12 QtShellChannelMixin, QtIOPubChannelMixin,
22 from .util import SuperQObject
13 QtStdInChannelMixin, QtHBChannelMixin,
14 QtKernelClientMixin
15 )
16
23
17 class QtShellChannel(QtShellChannelMixin, ShellChannel):
24 class QtHBChannel(QtHBChannelMixin, HBChannel):
18 pass
25 pass
19
26
20 class QtIOPubChannel(QtIOPubChannelMixin, IOPubChannel):
27 from IPython.core.release import kernel_protocol_version_info
21 pass
22
28
23 class QtStdInChannel(QtStdInChannelMixin, StdInChannel):
29 from IPython.kernel.channelsabc import (
24 pass
30 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
31 )
32 from IPython.utils.py3compat import string_types, iteritems
25
33
26 class QtHBChannel(QtHBChannelMixin, HBChannel):
34 major_protocol_version = kernel_protocol_version_info[0]
35
36 class InvalidPortNumber(Exception):
27 pass
37 pass
28
38
39 # some utilities to validate message structure, these might get moved elsewhere
40 # if they prove to have more generic utility
41
42 def validate_string_list(lst):
43 """Validate that the input is a list of strings.
44
45 Raises ValueError if not."""
46 if not isinstance(lst, list):
47 raise ValueError('input %r must be a list' % lst)
48 for x in lst:
49 if not isinstance(x, string_types):
50 raise ValueError('element %r in list must be a string' % x)
51
52
53 def validate_string_dict(dct):
54 """Validate that the input is a dict with string keys and values.
55
56 Raises ValueError if not."""
57 for k,v in iteritems(dct):
58 if not isinstance(k, string_types):
59 raise ValueError('key %r in dict must be a string' % k)
60 if not isinstance(v, string_types):
61 raise ValueError('value %r in dict must be a string' % v)
62
63
64
65 class QtZMQSocketChannel(SuperQObject, Thread):
66 """The base class for the channels that use ZMQ sockets."""
67 context = None
68 session = None
69 socket = None
70 ioloop = None
71 stream = None
72 _address = None
73 _exiting = False
74 proxy_methods = []
75
76 # Emitted when the channel is started.
77 started = QtCore.Signal()
78
79 # Emitted when the channel is stopped.
80 stopped = QtCore.Signal()
81
82 message_received = QtCore.Signal(object)
83
84 #---------------------------------------------------------------------------
85 # InProcessChannel interface
86 #---------------------------------------------------------------------------
87
88 def call_handlers_later(self, *args, **kwds):
89 """ Call the message handlers later.
90 """
91 do_later = lambda: self.call_handlers(*args, **kwds)
92 QtCore.QTimer.singleShot(0, do_later)
93
94 def process_events(self):
95 """ Process any pending GUI events.
96 """
97 QtCore.QCoreApplication.instance().processEvents()
98
99 def __init__(self, context, session, address):
100 """Create a channel.
101
102 Parameters
103 ----------
104 context : :class:`zmq.Context`
105 The ZMQ context to use.
106 session : :class:`session.Session`
107 The session to use.
108 address : zmq url
109 Standard (ip, port) tuple that the kernel is listening on.
110 """
111 super(QtZMQSocketChannel, self).__init__()
112 self.daemon = True
113
114 self.context = context
115 self.session = session
116 if isinstance(address, tuple):
117 if address[1] == 0:
118 message = 'The port number for a channel cannot be 0.'
119 raise InvalidPortNumber(message)
120 address = "tcp://%s:%i" % address
121 self._address = address
122 atexit.register(self._notice_exit)
123
124 def _notice_exit(self):
125 self._exiting = True
126
127 def _run_loop(self):
128 """Run my loop, ignoring EINTR events in the poller"""
129 while True:
130 try:
131 self.ioloop.start()
132 except ZMQError as e:
133 if e.errno == errno.EINTR:
134 continue
135 else:
136 raise
137 except Exception:
138 if self._exiting:
139 break
140 else:
141 raise
142 else:
143 break
144
145 def start(self):
146 """ Reimplemented to emit signal.
147 """
148 super(QtZMQSocketChannel, self).start()
149 self.started.emit()
150
151 def stop(self):
152 """Stop the channel's event loop and join its thread.
153
154 This calls :meth:`~threading.Thread.join` and returns when the thread
155 terminates. :class:`RuntimeError` will be raised if
156 :meth:`~threading.Thread.start` is called again.
157 """
158 if self.ioloop is not None:
159 self.ioloop.stop()
160 self.join()
161 self.close()
162 self.stopped.emit()
163
164 def close(self):
165 if self.ioloop is not None:
166 try:
167 self.ioloop.close(all_fds=True)
168 except Exception:
169 pass
170 if self.socket is not None:
171 try:
172 self.socket.close(linger=0)
173 except Exception:
174 pass
175 self.socket = None
176
177 @property
178 def address(self):
179 """Get the channel's address as a zmq url string.
180
181 These URLS have the form: 'tcp://127.0.0.1:5555'.
182 """
183 return self._address
184
185 def _queue_send(self, msg):
186 """Queue a message to be sent from the IOLoop's thread.
187
188 Parameters
189 ----------
190 msg : message to send
191
192 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
193 thread control of the action.
194 """
195 def thread_send():
196 self.session.send(self.stream, msg)
197 self.ioloop.add_callback(thread_send)
198
199 def _handle_recv(self, msg):
200 """Callback for stream.on_recv.
201
202 Unpacks message, and calls handlers with it.
203 """
204 ident,smsg = self.session.feed_identities(msg)
205 msg = self.session.deserialize(smsg)
206 self.call_handlers(msg)
207
208 def call_handlers(self, msg):
209 """This method is called in the ioloop thread when a message arrives.
210
211 Subclasses should override this method to handle incoming messages.
212 It is important to remember that this method is called in the thread
213 so that some logic must be done to ensure that the application level
214 handlers are called in the application thread.
215 """
216 # Emit the generic signal.
217 self.message_received.emit(msg)
218
219
220 class QtShellChannel(QtZMQSocketChannel):
221 """The shell channel for issuing request/replies to the kernel."""
222
223 command_queue = None
224 # flag for whether execute requests should be allowed to call raw_input:
225 allow_stdin = True
226 proxy_methods = [
227 'execute',
228 'complete',
229 'inspect',
230 'history',
231 'kernel_info',
232 'shutdown',
233 'is_complete',
234 ]
235
236 # Emitted when a reply has been received for the corresponding request type.
237 execute_reply = QtCore.Signal(object)
238 complete_reply = QtCore.Signal(object)
239 inspect_reply = QtCore.Signal(object)
240 history_reply = QtCore.Signal(object)
241 kernel_info_reply = QtCore.Signal(object)
242
243 def __init__(self, context, session, address):
244 super(QtShellChannel, self).__init__(context, session, address)
245 self.ioloop = ioloop.IOLoop()
246
247 def run(self):
248 """The thread's main activity. Call start() instead."""
249 self.socket = self.context.socket(zmq.DEALER)
250 self.socket.linger = 1000
251 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
252 self.socket.connect(self.address)
253 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
254 self.stream.on_recv(self._handle_recv)
255 self._run_loop()
256
257 def call_handlers(self, msg):
258 super(QtShellChannel, self).call_handlers(msg)
259
260 # Catch kernel_info_reply for message spec adaptation
261 msg_type = msg['header']['msg_type']
262 if msg_type == 'kernel_info_reply':
263 self._handle_kernel_info_reply(msg)
264
265 # Emit specific signals
266 signal = getattr(self, msg_type, None)
267 if signal:
268 signal.emit(msg)
269
270 def execute(self, code, silent=False, store_history=True,
271 user_expressions=None, allow_stdin=None):
272 """Execute code in the kernel.
273
274 Parameters
275 ----------
276 code : str
277 A string of Python code.
278
279 silent : bool, optional (default False)
280 If set, the kernel will execute the code as quietly possible, and
281 will force store_history to be False.
282
283 store_history : bool, optional (default True)
284 If set, the kernel will store command history. This is forced
285 to be False if silent is True.
286
287 user_expressions : dict, optional
288 A dict mapping names to expressions to be evaluated in the user's
289 dict. The expression values are returned as strings formatted using
290 :func:`repr`.
291
292 allow_stdin : bool, optional (default self.allow_stdin)
293 Flag for whether the kernel can send stdin requests to frontends.
294
295 Some frontends (e.g. the Notebook) do not support stdin requests.
296 If raw_input is called from code executed from such a frontend, a
297 StdinNotImplementedError will be raised.
298
299 Returns
300 -------
301 The msg_id of the message sent.
302 """
303 if user_expressions is None:
304 user_expressions = {}
305 if allow_stdin is None:
306 allow_stdin = self.allow_stdin
307
308
309 # Don't waste network traffic if inputs are invalid
310 if not isinstance(code, string_types):
311 raise ValueError('code %r must be a string' % code)
312 validate_string_dict(user_expressions)
313
314 # Create class for content/msg creation. Related to, but possibly
315 # not in Session.
316 content = dict(code=code, silent=silent, store_history=store_history,
317 user_expressions=user_expressions,
318 allow_stdin=allow_stdin,
319 )
320 msg = self.session.msg('execute_request', content)
321 self._queue_send(msg)
322 return msg['header']['msg_id']
323
324 def complete(self, code, cursor_pos=None):
325 """Tab complete text in the kernel's namespace.
326
327 Parameters
328 ----------
329 code : str
330 The context in which completion is requested.
331 Can be anything between a variable name and an entire cell.
332 cursor_pos : int, optional
333 The position of the cursor in the block of code where the completion was requested.
334 Default: ``len(code)``
335
336 Returns
337 -------
338 The msg_id of the message sent.
339 """
340 if cursor_pos is None:
341 cursor_pos = len(code)
342 content = dict(code=code, cursor_pos=cursor_pos)
343 msg = self.session.msg('complete_request', content)
344 self._queue_send(msg)
345 return msg['header']['msg_id']
346
347 def inspect(self, code, cursor_pos=None, detail_level=0):
348 """Get metadata information about an object in the kernel's namespace.
349
350 It is up to the kernel to determine the appropriate object to inspect.
351
352 Parameters
353 ----------
354 code : str
355 The context in which info is requested.
356 Can be anything between a variable name and an entire cell.
357 cursor_pos : int, optional
358 The position of the cursor in the block of code where the info was requested.
359 Default: ``len(code)``
360 detail_level : int, optional
361 The level of detail for the introspection (0-2)
362
363 Returns
364 -------
365 The msg_id of the message sent.
366 """
367 if cursor_pos is None:
368 cursor_pos = len(code)
369 content = dict(code=code, cursor_pos=cursor_pos,
370 detail_level=detail_level,
371 )
372 msg = self.session.msg('inspect_request', content)
373 self._queue_send(msg)
374 return msg['header']['msg_id']
375
376 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
377 """Get entries from the kernel's history list.
378
379 Parameters
380 ----------
381 raw : bool
382 If True, return the raw input.
383 output : bool
384 If True, then return the output as well.
385 hist_access_type : str
386 'range' (fill in session, start and stop params), 'tail' (fill in n)
387 or 'search' (fill in pattern param).
388
389 session : int
390 For a range request, the session from which to get lines. Session
391 numbers are positive integers; negative ones count back from the
392 current session.
393 start : int
394 The first line number of a history range.
395 stop : int
396 The final (excluded) line number of a history range.
397
398 n : int
399 The number of lines of history to get for a tail request.
400
401 pattern : str
402 The glob-syntax pattern for a search request.
403
404 Returns
405 -------
406 The msg_id of the message sent.
407 """
408 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
409 **kwargs)
410 msg = self.session.msg('history_request', content)
411 self._queue_send(msg)
412 return msg['header']['msg_id']
413
414 def kernel_info(self):
415 """Request kernel info."""
416 msg = self.session.msg('kernel_info_request')
417 self._queue_send(msg)
418 return msg['header']['msg_id']
419
420 def _handle_kernel_info_reply(self, msg):
421 """handle kernel info reply
422
423 sets protocol adaptation version
424 """
425 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
426 if adapt_version != major_protocol_version:
427 self.session.adapt_version = adapt_version
428
429 def shutdown(self, restart=False):
430 """Request an immediate kernel shutdown.
431
432 Upon receipt of the (empty) reply, client code can safely assume that
433 the kernel has shut down and it's safe to forcefully terminate it if
434 it's still alive.
435
436 The kernel will send the reply via a function registered with Python's
437 atexit module, ensuring it's truly done as the kernel is done with all
438 normal operation.
439 """
440 # Send quit message to kernel. Once we implement kernel-side setattr,
441 # this should probably be done that way, but for now this will do.
442 msg = self.session.msg('shutdown_request', {'restart':restart})
443 self._queue_send(msg)
444 return msg['header']['msg_id']
445
446 def is_complete(self, code):
447 msg = self.session.msg('is_complete_request', {'code': code})
448 self._queue_send(msg)
449 return msg['header']['msg_id']
450
451
452 class QtIOPubChannel(QtZMQSocketChannel):
453 """The iopub channel which listens for messages that the kernel publishes.
454
455 This channel is where all output is published to frontends.
456 """
457 # Emitted when a message of type 'stream' is received.
458 stream_received = QtCore.Signal(object)
459
460 # Emitted when a message of type 'execute_input' is received.
461 execute_input_received = QtCore.Signal(object)
462
463 # Emitted when a message of type 'execute_result' is received.
464 execute_result_received = QtCore.Signal(object)
465
466 # Emitted when a message of type 'error' is received.
467 error_received = QtCore.Signal(object)
468
469 # Emitted when a message of type 'display_data' is received
470 display_data_received = QtCore.Signal(object)
471
472 # Emitted when a crash report message is received from the kernel's
473 # last-resort sys.excepthook.
474 crash_received = QtCore.Signal(object)
475
476 # Emitted when a shutdown is noticed.
477 shutdown_reply_received = QtCore.Signal(object)
478
479 def __init__(self, context, session, address):
480 super(QtIOPubChannel, self).__init__(context, session, address)
481 self.ioloop = ioloop.IOLoop()
482
483 def run(self):
484 """The thread's main activity. Call start() instead."""
485 self.socket = self.context.socket(zmq.SUB)
486 self.socket.linger = 1000
487 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
488 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
489 self.socket.connect(self.address)
490 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
491 self.stream.on_recv(self._handle_recv)
492 self._run_loop()
493
494 def call_handlers(self, msg):
495 super(QtIOPubChannel, self).call_handlers(msg)
496
497 # Emit signals for specialized message types.
498 msg_type = msg['header']['msg_type']
499 signal = getattr(self, msg_type + '_received', None)
500 if signal:
501 signal.emit(msg)
502
503 def flush(self, timeout=1.0):
504 """Immediately processes all pending messages on the iopub channel.
505
506 Callers should use this method to ensure that :meth:`call_handlers`
507 has been called for all messages that have been received on the
508 0MQ SUB socket of this channel.
509
510 This method is thread safe.
511
512 Parameters
513 ----------
514 timeout : float, optional
515 The maximum amount of time to spend flushing, in seconds. The
516 default is one second.
517 """
518 # We do the IOLoop callback process twice to ensure that the IOLoop
519 # gets to perform at least one full poll.
520 stop_time = time.time() + timeout
521 for i in range(2):
522 self._flushed = False
523 self.ioloop.add_callback(self._flush)
524 while not self._flushed and time.time() < stop_time:
525 time.sleep(0.01)
526
527 def _flush(self):
528 """Callback for :method:`self.flush`."""
529 self.stream.flush()
530 self._flushed = True
531
532
533 class QtStdInChannel(QtZMQSocketChannel):
534 """The stdin channel to handle raw_input requests that the kernel makes."""
535
536 msg_queue = None
537 proxy_methods = ['input']
538
539 # Emitted when an input request is received.
540 input_requested = QtCore.Signal(object)
541
542 def __init__(self, context, session, address):
543 super(QtStdInChannel, self).__init__(context, session, address)
544 self.ioloop = ioloop.IOLoop()
545
546 def run(self):
547 """The thread's main activity. Call start() instead."""
548 self.socket = self.context.socket(zmq.DEALER)
549 self.socket.linger = 1000
550 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
551 self.socket.connect(self.address)
552 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
553 self.stream.on_recv(self._handle_recv)
554 self._run_loop()
555
556 def call_handlers(self, msg):
557 super(QtStdInChannel, self).call_handlers(msg)
558
559 # Emit signals for specialized message types.
560 msg_type = msg['header']['msg_type']
561 if msg_type == 'input_request':
562 self.input_requested.emit(msg)
563
564 def input(self, string):
565 """Send a string of raw input to the kernel."""
566 content = dict(value=string)
567 msg = self.session.msg('input_reply', content)
568 self._queue_send(msg)
569
570 ShellChannelABC.register(QtShellChannel)
571 IOPubChannelABC.register(QtIOPubChannel)
572 StdInChannelABC.register(QtStdInChannel)
573
29
574
30 class QtKernelClient(QtKernelClientMixin, KernelClient):
575 class QtKernelClient(QtKernelClientMixin, KernelClient):
31 """ A KernelClient that provides signals and slots.
576 """ A KernelClient that provides signals and slots.
General Comments 0
You need to be logged in to leave comments. Login now