##// END OF EJS Templates
Merge pull request #7108 from takluyver/digging-channels...
Min RK -
r19452:fe5dd275 merge
parent child Browse files
Show More
@@ -2,46 +2,59 b''
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2013 The IPython Development Team
7 #
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING.txt, distributed as part of this software.
10 #-----------------------------------------------------------------------------
11 5
12 #-----------------------------------------------------------------------------
13 # Imports
14 #-----------------------------------------------------------------------------
6 # Copyright (c) IPython Development Team.
7 # Distributed under the terms of the Modified BSD License.
15 8
16 9 try:
17 10 from queue import Queue, Empty # Py 3
18 11 except ImportError:
19 12 from Queue import Queue, Empty # Py 2
20 13
21 from IPython.kernel.channels import IOPubChannel, HBChannel, \
22 ShellChannel, StdInChannel
23 14
24 #-----------------------------------------------------------------------------
25 # Blocking kernel manager
26 #-----------------------------------------------------------------------------
27
28
29 class BlockingChannelMixin(object):
15 class ZMQSocketChannel(object):
16 """A ZMQ socket in a simple blocking API"""
17 session = None
18 socket = None
19 stream = None
20 _exiting = False
21 proxy_methods = []
22
23 def __init__(self, socket, session, loop=None):
24 """Create a channel.
25
26 Parameters
27 ----------
28 socket : :class:`zmq.Socket`
29 The ZMQ socket to use.
30 session : :class:`session.Session`
31 The session to use.
32 loop
33 Unused here, for other implementations
34 """
35 super(ZMQSocketChannel, self).__init__()
30 36
31 def __init__(self, *args, **kwds):
32 super(BlockingChannelMixin, self).__init__(*args, **kwds)
33 self._in_queue = Queue()
37 self.socket = socket
38 self.session = session
34 39
35 def call_handlers(self, msg):
36 self._in_queue.put(msg)
40 def _recv(self, **kwargs):
41 msg = self.socket.recv_multipart(**kwargs)
42 ident,smsg = self.session.feed_identities(msg)
43 return self.session.deserialize(smsg)
37 44
38 45 def get_msg(self, block=True, timeout=None):
39 46 """ Gets a message if there is one that is ready. """
40 if timeout is None:
41 # Queue.get(timeout=None) has stupid uninteruptible
42 # behavior, so wait for a week instead
43 timeout = 604800
44 return self._in_queue.get(block, timeout)
47 if block:
48 if timeout is not None:
49 timeout *= 1000 # seconds to ms
50 ready = self.socket.poll(timeout)
51 else:
52 ready = self.socket.poll(timeout=0)
53
54 if ready:
55 return self._recv()
56 else:
57 raise Empty
45 58
46 59 def get_msgs(self):
47 60 """ Get all messages that are currently ready. """
@@ -55,31 +68,25 b' class BlockingChannelMixin(object):'
55 68
56 69 def msg_ready(self):
57 70 """ Is there a message that has been received? """
58 return not self._in_queue.empty()
71 return bool(self.socket.poll(timeout=0))
59 72
60
61 class BlockingIOPubChannel(BlockingChannelMixin, IOPubChannel):
73 def close(self):
74 if self.socket is not None:
75 try:
76 self.socket.close(linger=0)
77 except Exception:
62 78 pass
79 self.socket = None
80 stop = close
63 81
82 def is_alive(self):
83 return (self.socket is not None)
64 84
65 class BlockingShellChannel(BlockingChannelMixin, ShellChannel):
66 def call_handlers(self, msg):
67 if msg['msg_type'] == 'kernel_info_reply':
68 self._handle_kernel_info_reply(msg)
69 return super(BlockingShellChannel, self).call_handlers(msg)
70
85 def send(self, msg):
86 """Pass a message to the ZMQ socket to send
87 """
88 self.session.send(self.socket, msg)
71 89
72 class BlockingStdInChannel(BlockingChannelMixin, StdInChannel):
90 def start(self):
73 91 pass
74 92
75
76 class BlockingHBChannel(HBChannel):
77
78 # This kernel needs quicker monitoring, shorten to 1 sec.
79 # less than 0.5s is unreliable, and will get occasional
80 # false reports of missed beats.
81 time_to_dead = 1.
82
83 def call_handlers(self, since_last_heartbeat):
84 """ Pause beating on missed heartbeat. """
85 pass
@@ -2,32 +2,38 b''
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2013 The IPython Development Team
7 #
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING.txt, distributed as part of this software.
10 #-----------------------------------------------------------------------------
5 # Copyright (c) IPython Development Team.
6 # Distributed under the terms of the Modified BSD License.
11 7
12 #-----------------------------------------------------------------------------
13 # Imports
14 #-----------------------------------------------------------------------------
8 try:
9 from queue import Empty # Python 3
10 except ImportError:
11 from Queue import Empty # Python 2
15 12
16 13 from IPython.utils.traitlets import Type
14 from IPython.kernel.channels import HBChannel
17 15 from IPython.kernel.client import KernelClient
18 from .channels import (
19 BlockingIOPubChannel, BlockingHBChannel,
20 BlockingShellChannel, BlockingStdInChannel
21 )
22
23 #-----------------------------------------------------------------------------
24 # Blocking kernel manager
25 #-----------------------------------------------------------------------------
16 from .channels import ZMQSocketChannel
26 17
27 18 class BlockingKernelClient(KernelClient):
19 def wait_for_ready(self):
20 # Wait for kernel info reply on shell channel
21 while True:
22 msg = self.shell_channel.get_msg(block=True)
23 if msg['msg_type'] == 'kernel_info_reply':
24 self._handle_kernel_info_reply(msg)
25 break
26
27 # Flush IOPub channel
28 while True:
29 try:
30 msg = self.iopub_channel.get_msg(block=True, timeout=0.2)
31 print(msg['msg_type'])
32 except Empty:
33 break
28 34
29 35 # The classes to use for the various channels
30 shell_channel_class = Type(BlockingShellChannel)
31 iopub_channel_class = Type(BlockingIOPubChannel)
32 stdin_channel_class = Type(BlockingStdInChannel)
33 hb_channel_class = Type(BlockingHBChannel)
36 shell_channel_class = Type(ZMQSocketChannel)
37 iopub_channel_class = Type(ZMQSocketChannel)
38 stdin_channel_class = Type(ZMQSocketChannel)
39 hb_channel_class = Type(HBChannel)
This diff has been collapsed as it changes many lines, (503 lines changed) Show them Hide them
@@ -14,15 +14,10 b' import zmq'
14 14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
15 15 # during garbage collection of threads at exit:
16 16 from zmq import ZMQError
17 from zmq.eventloop import ioloop, zmqstream
18 17
19 18 from IPython.core.release import kernel_protocol_version_info
20 19
21 from .channelsabc import (
22 ShellChannelABC, IOPubChannelABC,
23 HBChannelABC, StdInChannelABC,
24 )
25 from IPython.utils.py3compat import string_types, iteritems
20 from .channelsabc import HBChannelABC
26 21
27 22 #-----------------------------------------------------------------------------
28 23 # Constants and exceptions
@@ -33,52 +28,27 b' major_protocol_version = kernel_protocol_version_info[0]'
33 28 class InvalidPortNumber(Exception):
34 29 pass
35 30
36 #-----------------------------------------------------------------------------
37 # Utility functions
38 #-----------------------------------------------------------------------------
39
40 # some utilities to validate message structure, these might get moved elsewhere
41 # if they prove to have more generic utility
42
43 def validate_string_list(lst):
44 """Validate that the input is a list of strings.
45
46 Raises ValueError if not."""
47 if not isinstance(lst, list):
48 raise ValueError('input %r must be a list' % lst)
49 for x in lst:
50 if not isinstance(x, string_types):
51 raise ValueError('element %r in list must be a string' % x)
52
53
54 def validate_string_dict(dct):
55 """Validate that the input is a dict with string keys and values.
56
57 Raises ValueError if not."""
58 for k,v in iteritems(dct):
59 if not isinstance(k, string_types):
60 raise ValueError('key %r in dict must be a string' % k)
61 if not isinstance(v, string_types):
62 raise ValueError('value %r in dict must be a string' % v)
63
64
65 #-----------------------------------------------------------------------------
66 # ZMQ Socket Channel classes
67 #-----------------------------------------------------------------------------
31 class HBChannel(Thread):
32 """The heartbeat channel which monitors the kernel heartbeat.
68 33
69 class ZMQSocketChannel(Thread):
70 """The base class for the channels that use ZMQ sockets."""
34 Note that the heartbeat channel is paused by default. As long as you start
35 this channel, the kernel manager will ensure that it is paused and un-paused
36 as appropriate.
37 """
71 38 context = None
72 39 session = None
73 40 socket = None
74 ioloop = None
75 stream = None
76 _address = None
41 address = None
77 42 _exiting = False
78 proxy_methods = []
43
44 time_to_dead = 1.
45 poller = None
46 _running = None
47 _pause = None
48 _beating = None
79 49
80 50 def __init__(self, context, session, address):
81 """Create a channel.
51 """Create the heartbeat monitor thread.
82 52
83 53 Parameters
84 54 ----------
@@ -89,7 +59,7 b' class ZMQSocketChannel(Thread):'
89 59 address : zmq url
90 60 Standard (ip, port) tuple that the kernel is listening on.
91 61 """
92 super(ZMQSocketChannel, self).__init__()
62 super(HBChannel, self).__init__()
93 63 self.daemon = True
94 64
95 65 self.context = context
@@ -99,429 +69,16 b' class ZMQSocketChannel(Thread):'
99 69 message = 'The port number for a channel cannot be 0.'
100 70 raise InvalidPortNumber(message)
101 71 address = "tcp://%s:%i" % address
102 self._address = address
72 self.address = address
103 73 atexit.register(self._notice_exit)
104 74
105 def _notice_exit(self):
106 self._exiting = True
107
108 def _run_loop(self):
109 """Run my loop, ignoring EINTR events in the poller"""
110 while True:
111 try:
112 self.ioloop.start()
113 except ZMQError as e:
114 if e.errno == errno.EINTR:
115 continue
116 else:
117 raise
118 except Exception:
119 if self._exiting:
120 break
121 else:
122 raise
123 else:
124 break
125
126 def stop(self):
127 """Stop the channel's event loop and join its thread.
128
129 This calls :meth:`~threading.Thread.join` and returns when the thread
130 terminates. :class:`RuntimeError` will be raised if
131 :meth:`~threading.Thread.start` is called again.
132 """
133 if self.ioloop is not None:
134 self.ioloop.stop()
135 self.join()
136 self.close()
137
138 def close(self):
139 if self.ioloop is not None:
140 try:
141 self.ioloop.close(all_fds=True)
142 except Exception:
143 pass
144 if self.socket is not None:
145 try:
146 self.socket.close(linger=0)
147 except Exception:
148 pass
149 self.socket = None
150
151 @property
152 def address(self):
153 """Get the channel's address as a zmq url string.
154
155 These URLS have the form: 'tcp://127.0.0.1:5555'.
156 """
157 return self._address
158
159 def _queue_send(self, msg):
160 """Queue a message to be sent from the IOLoop's thread.
161
162 Parameters
163 ----------
164 msg : message to send
165
166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
167 thread control of the action.
168 """
169 def thread_send():
170 self.session.send(self.stream, msg)
171 self.ioloop.add_callback(thread_send)
172
173 def _handle_recv(self, msg):
174 """Callback for stream.on_recv.
175
176 Unpacks message, and calls handlers with it.
177 """
178 ident,smsg = self.session.feed_identities(msg)
179 msg = self.session.deserialize(smsg)
180 self.call_handlers(msg)
181
182
183
184 class ShellChannel(ZMQSocketChannel):
185 """The shell channel for issuing request/replies to the kernel."""
186
187 command_queue = None
188 # flag for whether execute requests should be allowed to call raw_input:
189 allow_stdin = True
190 proxy_methods = [
191 'execute',
192 'complete',
193 'inspect',
194 'history',
195 'kernel_info',
196 'shutdown',
197 'is_complete',
198 ]
199
200 def __init__(self, context, session, address):
201 super(ShellChannel, self).__init__(context, session, address)
202 self.ioloop = ioloop.IOLoop()
203
204 def run(self):
205 """The thread's main activity. Call start() instead."""
206 self.socket = self.context.socket(zmq.DEALER)
207 self.socket.linger = 1000
208 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
209 self.socket.connect(self.address)
210 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
211 self.stream.on_recv(self._handle_recv)
212 self._run_loop()
213
214 def call_handlers(self, msg):
215 """This method is called in the ioloop thread when a message arrives.
216
217 Subclasses should override this method to handle incoming messages.
218 It is important to remember that this method is called in the thread
219 so that some logic must be done to ensure that the application level
220 handlers are called in the application thread.
221 """
222 raise NotImplementedError('call_handlers must be defined in a subclass.')
223
224 def execute(self, code, silent=False, store_history=True,
225 user_expressions=None, allow_stdin=None):
226 """Execute code in the kernel.
227
228 Parameters
229 ----------
230 code : str
231 A string of Python code.
232
233 silent : bool, optional (default False)
234 If set, the kernel will execute the code as quietly possible, and
235 will force store_history to be False.
236
237 store_history : bool, optional (default True)
238 If set, the kernel will store command history. This is forced
239 to be False if silent is True.
240
241 user_expressions : dict, optional
242 A dict mapping names to expressions to be evaluated in the user's
243 dict. The expression values are returned as strings formatted using
244 :func:`repr`.
245
246 allow_stdin : bool, optional (default self.allow_stdin)
247 Flag for whether the kernel can send stdin requests to frontends.
248
249 Some frontends (e.g. the Notebook) do not support stdin requests.
250 If raw_input is called from code executed from such a frontend, a
251 StdinNotImplementedError will be raised.
252
253 Returns
254 -------
255 The msg_id of the message sent.
256 """
257 if user_expressions is None:
258 user_expressions = {}
259 if allow_stdin is None:
260 allow_stdin = self.allow_stdin
261
262
263 # Don't waste network traffic if inputs are invalid
264 if not isinstance(code, string_types):
265 raise ValueError('code %r must be a string' % code)
266 validate_string_dict(user_expressions)
267
268 # Create class for content/msg creation. Related to, but possibly
269 # not in Session.
270 content = dict(code=code, silent=silent, store_history=store_history,
271 user_expressions=user_expressions,
272 allow_stdin=allow_stdin,
273 )
274 msg = self.session.msg('execute_request', content)
275 self._queue_send(msg)
276 return msg['header']['msg_id']
277
278 def complete(self, code, cursor_pos=None):
279 """Tab complete text in the kernel's namespace.
280
281 Parameters
282 ----------
283 code : str
284 The context in which completion is requested.
285 Can be anything between a variable name and an entire cell.
286 cursor_pos : int, optional
287 The position of the cursor in the block of code where the completion was requested.
288 Default: ``len(code)``
289
290 Returns
291 -------
292 The msg_id of the message sent.
293 """
294 if cursor_pos is None:
295 cursor_pos = len(code)
296 content = dict(code=code, cursor_pos=cursor_pos)
297 msg = self.session.msg('complete_request', content)
298 self._queue_send(msg)
299 return msg['header']['msg_id']
300
301 def inspect(self, code, cursor_pos=None, detail_level=0):
302 """Get metadata information about an object in the kernel's namespace.
303
304 It is up to the kernel to determine the appropriate object to inspect.
305
306 Parameters
307 ----------
308 code : str
309 The context in which info is requested.
310 Can be anything between a variable name and an entire cell.
311 cursor_pos : int, optional
312 The position of the cursor in the block of code where the info was requested.
313 Default: ``len(code)``
314 detail_level : int, optional
315 The level of detail for the introspection (0-2)
316
317 Returns
318 -------
319 The msg_id of the message sent.
320 """
321 if cursor_pos is None:
322 cursor_pos = len(code)
323 content = dict(code=code, cursor_pos=cursor_pos,
324 detail_level=detail_level,
325 )
326 msg = self.session.msg('inspect_request', content)
327 self._queue_send(msg)
328 return msg['header']['msg_id']
329
330 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
331 """Get entries from the kernel's history list.
332
333 Parameters
334 ----------
335 raw : bool
336 If True, return the raw input.
337 output : bool
338 If True, then return the output as well.
339 hist_access_type : str
340 'range' (fill in session, start and stop params), 'tail' (fill in n)
341 or 'search' (fill in pattern param).
342
343 session : int
344 For a range request, the session from which to get lines. Session
345 numbers are positive integers; negative ones count back from the
346 current session.
347 start : int
348 The first line number of a history range.
349 stop : int
350 The final (excluded) line number of a history range.
351
352 n : int
353 The number of lines of history to get for a tail request.
354
355 pattern : str
356 The glob-syntax pattern for a search request.
357
358 Returns
359 -------
360 The msg_id of the message sent.
361 """
362 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
363 **kwargs)
364 msg = self.session.msg('history_request', content)
365 self._queue_send(msg)
366 return msg['header']['msg_id']
367
368 def kernel_info(self):
369 """Request kernel info."""
370 msg = self.session.msg('kernel_info_request')
371 self._queue_send(msg)
372 return msg['header']['msg_id']
373
374 def _handle_kernel_info_reply(self, msg):
375 """handle kernel info reply
376
377 sets protocol adaptation version
378 """
379 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
380 if adapt_version != major_protocol_version:
381 self.session.adapt_version = adapt_version
382
383 def shutdown(self, restart=False):
384 """Request an immediate kernel shutdown.
385
386 Upon receipt of the (empty) reply, client code can safely assume that
387 the kernel has shut down and it's safe to forcefully terminate it if
388 it's still alive.
389
390 The kernel will send the reply via a function registered with Python's
391 atexit module, ensuring it's truly done as the kernel is done with all
392 normal operation.
393 """
394 # Send quit message to kernel. Once we implement kernel-side setattr,
395 # this should probably be done that way, but for now this will do.
396 msg = self.session.msg('shutdown_request', {'restart':restart})
397 self._queue_send(msg)
398 return msg['header']['msg_id']
399
400 def is_complete(self, code):
401 msg = self.session.msg('is_complete_request', {'code': code})
402 self._queue_send(msg)
403 return msg['header']['msg_id']
404
405
406 class IOPubChannel(ZMQSocketChannel):
407 """The iopub channel which listens for messages that the kernel publishes.
408
409 This channel is where all output is published to frontends.
410 """
411
412 def __init__(self, context, session, address):
413 super(IOPubChannel, self).__init__(context, session, address)
414 self.ioloop = ioloop.IOLoop()
415
416 def run(self):
417 """The thread's main activity. Call start() instead."""
418 self.socket = self.context.socket(zmq.SUB)
419 self.socket.linger = 1000
420 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
421 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
422 self.socket.connect(self.address)
423 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
424 self.stream.on_recv(self._handle_recv)
425 self._run_loop()
426
427 def call_handlers(self, msg):
428 """This method is called in the ioloop thread when a message arrives.
429
430 Subclasses should override this method to handle incoming messages.
431 It is important to remember that this method is called in the thread
432 so that some logic must be done to ensure that the application leve
433 handlers are called in the application thread.
434 """
435 raise NotImplementedError('call_handlers must be defined in a subclass.')
436
437 def flush(self, timeout=1.0):
438 """Immediately processes all pending messages on the iopub channel.
439
440 Callers should use this method to ensure that :meth:`call_handlers`
441 has been called for all messages that have been received on the
442 0MQ SUB socket of this channel.
443
444 This method is thread safe.
445
446 Parameters
447 ----------
448 timeout : float, optional
449 The maximum amount of time to spend flushing, in seconds. The
450 default is one second.
451 """
452 # We do the IOLoop callback process twice to ensure that the IOLoop
453 # gets to perform at least one full poll.
454 stop_time = time.time() + timeout
455 for i in range(2):
456 self._flushed = False
457 self.ioloop.add_callback(self._flush)
458 while not self._flushed and time.time() < stop_time:
459 time.sleep(0.01)
460
461 def _flush(self):
462 """Callback for :method:`self.flush`."""
463 self.stream.flush()
464 self._flushed = True
465
466
467 class StdInChannel(ZMQSocketChannel):
468 """The stdin channel to handle raw_input requests that the kernel makes."""
469
470 msg_queue = None
471 proxy_methods = ['input']
472
473 def __init__(self, context, session, address):
474 super(StdInChannel, self).__init__(context, session, address)
475 self.ioloop = ioloop.IOLoop()
476
477 def run(self):
478 """The thread's main activity. Call start() instead."""
479 self.socket = self.context.socket(zmq.DEALER)
480 self.socket.linger = 1000
481 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
482 self.socket.connect(self.address)
483 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
484 self.stream.on_recv(self._handle_recv)
485 self._run_loop()
486
487 def call_handlers(self, msg):
488 """This method is called in the ioloop thread when a message arrives.
489
490 Subclasses should override this method to handle incoming messages.
491 It is important to remember that this method is called in the thread
492 so that some logic must be done to ensure that the application leve
493 handlers are called in the application thread.
494 """
495 raise NotImplementedError('call_handlers must be defined in a subclass.')
496
497 def input(self, string):
498 """Send a string of raw input to the kernel."""
499 content = dict(value=string)
500 msg = self.session.msg('input_reply', content)
501 self._queue_send(msg)
502
503
504 class HBChannel(ZMQSocketChannel):
505 """The heartbeat channel which monitors the kernel heartbeat.
506
507 Note that the heartbeat channel is paused by default. As long as you start
508 this channel, the kernel manager will ensure that it is paused and un-paused
509 as appropriate.
510 """
511
512 time_to_dead = 3.0
513 socket = None
514 poller = None
515 _running = None
516 _pause = None
517 _beating = None
518
519 def __init__(self, context, session, address):
520 super(HBChannel, self).__init__(context, session, address)
521 75 self._running = False
522 76 self._pause =True
523 77 self.poller = zmq.Poller()
524 78
79 def _notice_exit(self):
80 self._exiting = True
81
525 82 def _create_socket(self):
526 83 if self.socket is not None:
527 84 # close previous socket, before opening a new one
@@ -621,7 +178,16 b' class HBChannel(ZMQSocketChannel):'
621 178 def stop(self):
622 179 """Stop the channel's event loop and join its thread."""
623 180 self._running = False
624 super(HBChannel, self).stop()
181 self.join()
182 self.close()
183
184 def close(self):
185 if self.socket is not None:
186 try:
187 self.socket.close(linger=0)
188 except Exception:
189 pass
190 self.socket = None
625 191
626 192 def call_handlers(self, since_last_heartbeat):
627 193 """This method is called in the ioloop thread when a message arrives.
@@ -631,14 +197,7 b' class HBChannel(ZMQSocketChannel):'
631 197 so that some logic must be done to ensure that the application level
632 198 handlers are called in the application thread.
633 199 """
634 raise NotImplementedError('call_handlers must be defined in a subclass.')
635
200 pass
636 201
637 #---------------------------------------------------------------------#-----------------------------------------------------------------------------
638 # ABC Registration
639 #-----------------------------------------------------------------------------
640 202
641 ShellChannelABC.register(ShellChannel)
642 IOPubChannelABC.register(IOPubChannel)
643 203 HBChannelABC.register(HBChannel)
644 StdInChannelABC.register(StdInChannel)
@@ -24,70 +24,6 b' class ChannelABC(with_metaclass(abc.ABCMeta, object)):'
24 24 pass
25 25
26 26
27 class ShellChannelABC(ChannelABC):
28 """ShellChannel ABC.
29
30 The docstrings for this class can be found in the base implementation:
31
32 `IPython.kernel.channels.ShellChannel`
33 """
34
35 @abc.abstractproperty
36 def allow_stdin(self):
37 pass
38
39 @abc.abstractmethod
40 def execute(self, code, silent=False, store_history=True,
41 user_expressions=None, allow_stdin=None):
42 pass
43
44 @abc.abstractmethod
45 def complete(self, text, line, cursor_pos, block=None):
46 pass
47
48 @abc.abstractmethod
49 def inspect(self, oname, detail_level=0):
50 pass
51
52 @abc.abstractmethod
53 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
54 pass
55
56 @abc.abstractmethod
57 def kernel_info(self):
58 pass
59
60 @abc.abstractmethod
61 def shutdown(self, restart=False):
62 pass
63
64
65 class IOPubChannelABC(ChannelABC):
66 """IOPubChannel ABC.
67
68 The docstrings for this class can be found in the base implementation:
69
70 `IPython.kernel.channels.IOPubChannel`
71 """
72
73 @abc.abstractmethod
74 def flush(self, timeout=1.0):
75 pass
76
77
78 class StdInChannelABC(ChannelABC):
79 """StdInChannel ABC.
80
81 The docstrings for this class can be found in the base implementation:
82
83 `IPython.kernel.channels.StdInChannel`
84 """
85
86 @abc.abstractmethod
87 def input(self, string):
88 pass
89
90
91 27 class HBChannelABC(ChannelABC):
92 28 """HBChannel ABC.
93 29
@@ -4,6 +4,8 b''
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import absolute_import
7 from IPython.kernel.channels import major_protocol_version
8 from IPython.utils.py3compat import string_types, iteritems
7 9
8 10 import zmq
9 11
@@ -11,15 +13,25 b' from IPython.utils.traitlets import ('
11 13 Any, Instance, Type,
12 14 )
13 15
14 from .zmq.session import Session
15 from .channels import (
16 ShellChannel, IOPubChannel,
17 HBChannel, StdInChannel,
18 )
16 from .channelsabc import (ChannelABC, HBChannelABC)
19 17 from .clientabc import KernelClientABC
20 18 from .connect import ConnectionFileMixin
21 19
22 20
21 # some utilities to validate message structure, these might get moved elsewhere
22 # if they prove to have more generic utility
23
24 def validate_string_dict(dct):
25 """Validate that the input is a dict with string keys and values.
26
27 Raises ValueError if not."""
28 for k,v in iteritems(dct):
29 if not isinstance(k, string_types):
30 raise ValueError('key %r in dict must be a string' % k)
31 if not isinstance(v, string_types):
32 raise ValueError('value %r in dict must be a string' % v)
33
34
23 35 class KernelClient(ConnectionFileMixin):
24 36 """Communicates with a single kernel on any host via zmq channels.
25 37
@@ -42,10 +54,10 b' class KernelClient(ConnectionFileMixin):'
42 54 return zmq.Context.instance()
43 55
44 56 # The classes to use for the various channels
45 shell_channel_class = Type(ShellChannel)
46 iopub_channel_class = Type(IOPubChannel)
47 stdin_channel_class = Type(StdInChannel)
48 hb_channel_class = Type(HBChannel)
57 shell_channel_class = Type(ChannelABC)
58 iopub_channel_class = Type(ChannelABC)
59 stdin_channel_class = Type(ChannelABC)
60 hb_channel_class = Type(HBChannelABC)
49 61
50 62 # Protected traits
51 63 _shell_channel = Any
@@ -53,6 +65,9 b' class KernelClient(ConnectionFileMixin):'
53 65 _stdin_channel = Any
54 66 _hb_channel = Any
55 67
68 # flag for whether execute requests should be allowed to call raw_input:
69 allow_stdin = True
70
56 71 #--------------------------------------------------------------------------
57 72 # Channel proxy methods
58 73 #--------------------------------------------------------------------------
@@ -87,19 +102,14 b' class KernelClient(ConnectionFileMixin):'
87 102 """
88 103 if shell:
89 104 self.shell_channel.start()
90 for method in self.shell_channel.proxy_methods:
91 setattr(self, method, getattr(self.shell_channel, method))
105 self.kernel_info()
92 106 if iopub:
93 107 self.iopub_channel.start()
94 for method in self.iopub_channel.proxy_methods:
95 setattr(self, method, getattr(self.iopub_channel, method))
96 108 if stdin:
97 109 self.stdin_channel.start()
98 for method in self.stdin_channel.proxy_methods:
99 setattr(self, method, getattr(self.stdin_channel, method))
100 self.shell_channel.allow_stdin = True
110 self.allow_stdin = True
101 111 else:
102 self.shell_channel.allow_stdin = False
112 self.allow_stdin = False
103 113 if hb:
104 114 self.hb_channel.start()
105 115
@@ -123,14 +133,17 b' class KernelClient(ConnectionFileMixin):'
123 133 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
124 134 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
125 135
136 ioloop = None # Overridden in subclasses that use pyzmq event loop
137
126 138 @property
127 139 def shell_channel(self):
128 140 """Get the shell channel object for this kernel."""
129 141 if self._shell_channel is None:
130 142 url = self._make_url('shell')
131 143 self.log.debug("connecting shell channel to %s", url)
144 socket = self.connect_shell(identity=self.session.bsession)
132 145 self._shell_channel = self.shell_channel_class(
133 self.context, self.session, url
146 socket, self.session, self.ioloop
134 147 )
135 148 return self._shell_channel
136 149
@@ -140,8 +153,9 b' class KernelClient(ConnectionFileMixin):'
140 153 if self._iopub_channel is None:
141 154 url = self._make_url('iopub')
142 155 self.log.debug("connecting iopub channel to %s", url)
156 socket = self.connect_iopub()
143 157 self._iopub_channel = self.iopub_channel_class(
144 self.context, self.session, url
158 socket, self.session, self.ioloop
145 159 )
146 160 return self._iopub_channel
147 161
@@ -151,8 +165,9 b' class KernelClient(ConnectionFileMixin):'
151 165 if self._stdin_channel is None:
152 166 url = self._make_url('stdin')
153 167 self.log.debug("connecting stdin channel to %s", url)
168 socket = self.connect_stdin(identity=self.session.bsession)
154 169 self._stdin_channel = self.stdin_channel_class(
155 self.context, self.session, url
170 socket, self.session, self.ioloop
156 171 )
157 172 return self._stdin_channel
158 173
@@ -179,8 +194,193 b' class KernelClient(ConnectionFileMixin):'
179 194 return True
180 195
181 196
182 #-----------------------------------------------------------------------------
183 # ABC Registration
184 #-----------------------------------------------------------------------------
197 # Methods to send specific messages on channels
198 def execute(self, code, silent=False, store_history=True,
199 user_expressions=None, allow_stdin=None):
200 """Execute code in the kernel.
201
202 Parameters
203 ----------
204 code : str
205 A string of Python code.
206
207 silent : bool, optional (default False)
208 If set, the kernel will execute the code as quietly possible, and
209 will force store_history to be False.
210
211 store_history : bool, optional (default True)
212 If set, the kernel will store command history. This is forced
213 to be False if silent is True.
214
215 user_expressions : dict, optional
216 A dict mapping names to expressions to be evaluated in the user's
217 dict. The expression values are returned as strings formatted using
218 :func:`repr`.
219
220 allow_stdin : bool, optional (default self.allow_stdin)
221 Flag for whether the kernel can send stdin requests to frontends.
222
223 Some frontends (e.g. the Notebook) do not support stdin requests.
224 If raw_input is called from code executed from such a frontend, a
225 StdinNotImplementedError will be raised.
226
227 Returns
228 -------
229 The msg_id of the message sent.
230 """
231 if user_expressions is None:
232 user_expressions = {}
233 if allow_stdin is None:
234 allow_stdin = self.allow_stdin
235
236
237 # Don't waste network traffic if inputs are invalid
238 if not isinstance(code, string_types):
239 raise ValueError('code %r must be a string' % code)
240 validate_string_dict(user_expressions)
241
242 # Create class for content/msg creation. Related to, but possibly
243 # not in Session.
244 content = dict(code=code, silent=silent, store_history=store_history,
245 user_expressions=user_expressions,
246 allow_stdin=allow_stdin,
247 )
248 msg = self.session.msg('execute_request', content)
249 self.shell_channel.send(msg)
250 return msg['header']['msg_id']
251
252 def complete(self, code, cursor_pos=None):
253 """Tab complete text in the kernel's namespace.
254
255 Parameters
256 ----------
257 code : str
258 The context in which completion is requested.
259 Can be anything between a variable name and an entire cell.
260 cursor_pos : int, optional
261 The position of the cursor in the block of code where the completion was requested.
262 Default: ``len(code)``
263
264 Returns
265 -------
266 The msg_id of the message sent.
267 """
268 if cursor_pos is None:
269 cursor_pos = len(code)
270 content = dict(code=code, cursor_pos=cursor_pos)
271 msg = self.session.msg('complete_request', content)
272 self.shell_channel.send(msg)
273 return msg['header']['msg_id']
274
275 def inspect(self, code, cursor_pos=None, detail_level=0):
276 """Get metadata information about an object in the kernel's namespace.
277
278 It is up to the kernel to determine the appropriate object to inspect.
279
280 Parameters
281 ----------
282 code : str
283 The context in which info is requested.
284 Can be anything between a variable name and an entire cell.
285 cursor_pos : int, optional
286 The position of the cursor in the block of code where the info was requested.
287 Default: ``len(code)``
288 detail_level : int, optional
289 The level of detail for the introspection (0-2)
290
291 Returns
292 -------
293 The msg_id of the message sent.
294 """
295 if cursor_pos is None:
296 cursor_pos = len(code)
297 content = dict(code=code, cursor_pos=cursor_pos,
298 detail_level=detail_level,
299 )
300 msg = self.session.msg('inspect_request', content)
301 self.shell_channel.send(msg)
302 return msg['header']['msg_id']
303
304 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
305 """Get entries from the kernel's history list.
306
307 Parameters
308 ----------
309 raw : bool
310 If True, return the raw input.
311 output : bool
312 If True, then return the output as well.
313 hist_access_type : str
314 'range' (fill in session, start and stop params), 'tail' (fill in n)
315 or 'search' (fill in pattern param).
316
317 session : int
318 For a range request, the session from which to get lines. Session
319 numbers are positive integers; negative ones count back from the
320 current session.
321 start : int
322 The first line number of a history range.
323 stop : int
324 The final (excluded) line number of a history range.
325
326 n : int
327 The number of lines of history to get for a tail request.
328
329 pattern : str
330 The glob-syntax pattern for a search request.
331
332 Returns
333 -------
334 The msg_id of the message sent.
335 """
336 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
337 **kwargs)
338 msg = self.session.msg('history_request', content)
339 self.shell_channel.send(msg)
340 return msg['header']['msg_id']
341
342 def kernel_info(self):
343 """Request kernel info."""
344 msg = self.session.msg('kernel_info_request')
345 self.shell_channel.send(msg)
346 return msg['header']['msg_id']
347
348 def _handle_kernel_info_reply(self, msg):
349 """handle kernel info reply
350
351 sets protocol adaptation version
352 """
353 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
354 if adapt_version != major_protocol_version:
355 self.session.adapt_version = adapt_version
356
357 def shutdown(self, restart=False):
358 """Request an immediate kernel shutdown.
359
360 Upon receipt of the (empty) reply, client code can safely assume that
361 the kernel has shut down and it's safe to forcefully terminate it if
362 it's still alive.
363
364 The kernel will send the reply via a function registered with Python's
365 atexit module, ensuring it's truly done as the kernel is done with all
366 normal operation.
367 """
368 # Send quit message to kernel. Once we implement kernel-side setattr,
369 # this should probably be done that way, but for now this will do.
370 msg = self.session.msg('shutdown_request', {'restart':restart})
371 self.shell_channel.send(msg)
372 return msg['header']['msg_id']
373
374 def is_complete(self, code):
375 msg = self.session.msg('is_complete_request', {'code': code})
376 self.shell_channel.send(msg)
377 return msg['header']['msg_id']
378
379 def input(self, string):
380 """Send a string of raw input to the kernel."""
381 content = dict(value=string)
382 msg = self.session.msg('input_reply', content)
383 self.stdin_channel.send(msg)
384
185 385
186 386 KernelClientABC.register(KernelClient)
@@ -1,7 +1,5 b''
1 1 from .channels import (
2 InProcessShellChannel,
3 InProcessIOPubChannel,
4 InProcessStdInChannel,
2 InProcessChannel,
5 3 InProcessHBChannel,
6 4 )
7 5
@@ -9,35 +9,55 b' Useful for test suites and blocking terminal interfaces.'
9 9 # the file COPYING.txt, distributed as part of this software.
10 10 #-----------------------------------------------------------------------------
11 11
12 #-----------------------------------------------------------------------------
13 # Imports
14 #-----------------------------------------------------------------------------
12 try:
13 from queue import Queue, Empty # Py 3
14 except ImportError:
15 from Queue import Queue, Empty # Py 2
15 16
16 17 # IPython imports
17 18 from IPython.utils.io import raw_print
18 19 from IPython.utils.traitlets import Type
19 from IPython.kernel.blocking.channels import BlockingChannelMixin
20 #from IPython.kernel.blocking.channels import BlockingChannelMixin
20 21
21 22 # Local imports
22 23 from .channels import (
23 InProcessShellChannel,
24 InProcessIOPubChannel,
25 InProcessStdInChannel,
24 InProcessChannel,
26 25 )
27 26 from .client import InProcessKernelClient
28 27
29 #-----------------------------------------------------------------------------
30 # Blocking kernel manager
31 #-----------------------------------------------------------------------------
28 class BlockingInProcessChannel(InProcessChannel):
29
30 def __init__(self, *args, **kwds):
31 super(BlockingInProcessChannel, self).__init__(*args, **kwds)
32 self._in_queue = Queue()
32 33
33 class BlockingInProcessShellChannel(BlockingChannelMixin, InProcessShellChannel):
34 pass
34 def call_handlers(self, msg):
35 self._in_queue.put(msg)
36
37 def get_msg(self, block=True, timeout=None):
38 """ Gets a message if there is one that is ready. """
39 if timeout is None:
40 # Queue.get(timeout=None) has stupid uninteruptible
41 # behavior, so wait for a week instead
42 timeout = 604800
43 return self._in_queue.get(block, timeout)
44
45 def get_msgs(self):
46 """ Get all messages that are currently ready. """
47 msgs = []
48 while True:
49 try:
50 msgs.append(self.get_msg(block=False))
51 except Empty:
52 break
53 return msgs
35 54
36 class BlockingInProcessIOPubChannel(BlockingChannelMixin, InProcessIOPubChannel):
37 pass
55 def msg_ready(self):
56 """ Is there a message that has been received? """
57 return not self._in_queue.empty()
38 58
39 class BlockingInProcessStdInChannel(BlockingChannelMixin, InProcessStdInChannel):
40 59
60 class BlockingInProcessStdInChannel(BlockingInProcessChannel):
41 61 def call_handlers(self, msg):
42 62 """ Overridden for the in-process channel.
43 63
@@ -48,11 +68,27 b' class BlockingInProcessStdInChannel(BlockingChannelMixin, InProcessStdInChannel)'
48 68 _raw_input = self.client.kernel._sys_raw_input
49 69 prompt = msg['content']['prompt']
50 70 raw_print(prompt, end='')
51 self.input(_raw_input())
71 self.client.input(_raw_input())
52 72
53 73 class BlockingInProcessKernelClient(InProcessKernelClient):
54 74
55 75 # The classes to use for the various channels.
56 shell_channel_class = Type(BlockingInProcessShellChannel)
57 iopub_channel_class = Type(BlockingInProcessIOPubChannel)
76 shell_channel_class = Type(BlockingInProcessChannel)
77 iopub_channel_class = Type(BlockingInProcessChannel)
58 78 stdin_channel_class = Type(BlockingInProcessStdInChannel)
79
80 def wait_for_ready(self):
81 # Wait for kernel info reply on shell channel
82 while True:
83 msg = self.shell_channel.get_msg(block=True)
84 if msg['msg_type'] == 'kernel_info_reply':
85 self._handle_kernel_info_reply(msg)
86 break
87
88 # Flush IOPub channel
89 while True:
90 try:
91 msg = self.iopub_channel.get_msg(block=True, timeout=0.2)
92 print(msg['msg_type'])
93 except Empty:
94 break
@@ -3,10 +3,7 b''
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 from IPython.kernel.channelsabc import (
7 ShellChannelABC, IOPubChannelABC,
8 HBChannelABC, StdInChannelABC,
9 )
6 from IPython.kernel.channelsabc import HBChannelABC
10 7
11 8 from .socket import DummySocket
12 9
@@ -23,10 +20,6 b' class InProcessChannel(object):'
23 20 self.client = client
24 21 self._is_alive = False
25 22
26 #--------------------------------------------------------------------------
27 # Channel interface
28 #--------------------------------------------------------------------------
29
30 23 def is_alive(self):
31 24 return self._is_alive
32 25
@@ -43,9 +36,9 b' class InProcessChannel(object):'
43 36 """
44 37 raise NotImplementedError('call_handlers must be defined in a subclass.')
45 38
46 #--------------------------------------------------------------------------
47 # InProcessChannel interface
48 #--------------------------------------------------------------------------
39 def flush(self, timeout=1.0):
40 pass
41
49 42
50 43 def call_handlers_later(self, *args, **kwds):
51 44 """ Call the message handlers later.
@@ -65,117 +58,31 b' class InProcessChannel(object):'
65 58 raise NotImplementedError
66 59
67 60
68 class InProcessShellChannel(InProcessChannel):
69 """See `IPython.kernel.channels.ShellChannel` for docstrings."""
70
71 # flag for whether execute requests should be allowed to call raw_input
72 allow_stdin = True
73 proxy_methods = [
74 'execute',
75 'complete',
76 'inspect',
77 'history',
78 'shutdown',
79 'kernel_info',
80 ]
81
82 #--------------------------------------------------------------------------
83 # ShellChannel interface
84 #--------------------------------------------------------------------------
85
86 def execute(self, code, silent=False, store_history=True,
87 user_expressions={}, allow_stdin=None):
88 if allow_stdin is None:
89 allow_stdin = self.allow_stdin
90 content = dict(code=code, silent=silent, store_history=store_history,
91 user_expressions=user_expressions,
92 allow_stdin=allow_stdin)
93 msg = self.client.session.msg('execute_request', content)
94 self._dispatch_to_kernel(msg)
95 return msg['header']['msg_id']
96
97 def complete(self, code, cursor_pos=None):
98 if cursor_pos is None:
99 cursor_pos = len(code)
100 content = dict(code=code, cursor_pos=cursor_pos)
101 msg = self.client.session.msg('complete_request', content)
102 self._dispatch_to_kernel(msg)
103 return msg['header']['msg_id']
104
105 def inspect(self, code, cursor_pos=None, detail_level=0):
106 if cursor_pos is None:
107 cursor_pos = len(code)
108 content = dict(code=code, cursor_pos=cursor_pos,
109 detail_level=detail_level,
110 )
111 msg = self.client.session.msg('inspect_request', content)
112 self._dispatch_to_kernel(msg)
113 return msg['header']['msg_id']
114
115 def history(self, raw=True, output=False, hist_access_type='range', **kwds):
116 content = dict(raw=raw, output=output,
117 hist_access_type=hist_access_type, **kwds)
118 msg = self.client.session.msg('history_request', content)
119 self._dispatch_to_kernel(msg)
120 return msg['header']['msg_id']
121
122 def shutdown(self, restart=False):
123 # FIXME: What to do here?
124 raise NotImplementedError('Cannot shutdown in-process kernel')
125
126 def kernel_info(self):
127 """Request kernel info."""
128 msg = self.client.session.msg('kernel_info_request')
129 self._dispatch_to_kernel(msg)
130 return msg['header']['msg_id']
131
132 #--------------------------------------------------------------------------
133 # Protected interface
134 #--------------------------------------------------------------------------
135
136 def _dispatch_to_kernel(self, msg):
137 """ Send a message to the kernel and handle a reply.
138 """
139 kernel = self.client.kernel
140 if kernel is None:
141 raise RuntimeError('Cannot send request. No kernel exists.')
142
143 stream = DummySocket()
144 self.client.session.send(stream, msg)
145 msg_parts = stream.recv_multipart()
146 kernel.dispatch_shell(stream, msg_parts)
147
148 idents, reply_msg = self.client.session.recv(stream, copy=False)
149 self.call_handlers_later(reply_msg)
150
151 61
152 class InProcessIOPubChannel(InProcessChannel):
153 """See `IPython.kernel.channels.IOPubChannel` for docstrings."""
62 class InProcessHBChannel(object):
63 """A dummy heartbeat channel interface for in-process kernels.
154 64
155 def flush(self, timeout=1.0):
156 pass
157
158
159 class InProcessStdInChannel(InProcessChannel):
160 """See `IPython.kernel.channels.StdInChannel` for docstrings."""
161
162 proxy_methods = ['input']
65 Normally we use the heartbeat to check that the kernel process is alive.
66 When the kernel is in-process, that doesn't make sense, but clients still
67 expect this interface.
68 """
163 69
164 def input(self, string):
165 kernel = self.client.kernel
166 if kernel is None:
167 raise RuntimeError('Cannot send input reply. No kernel exists.')
168 kernel.raw_input_str = string
70 time_to_dead = 3.0
169 71
72 def __init__(self, client=None):
73 super(InProcessHBChannel, self).__init__()
74 self.client = client
75 self._is_alive = False
76 self._pause = True
170 77
171 class InProcessHBChannel(InProcessChannel):
172 """See `IPython.kernel.channels.HBChannel` for docstrings."""
78 def is_alive(self):
79 return self._is_alive
173 80
174 time_to_dead = 3.0
81 def start(self):
82 self._is_alive = True
175 83
176 def __init__(self, *args, **kwds):
177 super(InProcessHBChannel, self).__init__(*args, **kwds)
178 self._pause = True
84 def stop(self):
85 self._is_alive = False
179 86
180 87 def pause(self):
181 88 self._pause = True
@@ -186,11 +93,5 b' class InProcessHBChannel(InProcessChannel):'
186 93 def is_beating(self):
187 94 return not self._pause
188 95
189 #-----------------------------------------------------------------------------
190 # ABC Registration
191 #-----------------------------------------------------------------------------
192 96
193 ShellChannelABC.register(InProcessShellChannel)
194 IOPubChannelABC.register(InProcessIOPubChannel)
195 97 HBChannelABC.register(InProcessHBChannel)
196 StdInChannelABC.register(InProcessStdInChannel)
@@ -12,16 +12,15 b''
12 12 #-----------------------------------------------------------------------------
13 13
14 14 # IPython imports
15 from IPython.kernel.inprocess.socket import DummySocket
15 16 from IPython.utils.traitlets import Type, Instance
16 17 from IPython.kernel.clientabc import KernelClientABC
17 18 from IPython.kernel.client import KernelClient
18 19
19 20 # Local imports
20 21 from .channels import (
21 InProcessShellChannel,
22 InProcessIOPubChannel,
22 InProcessChannel,
23 23 InProcessHBChannel,
24 InProcessStdInChannel,
25 24
26 25 )
27 26
@@ -40,9 +39,9 b' class InProcessKernelClient(KernelClient):'
40 39 """
41 40
42 41 # The classes to use for the various channels.
43 shell_channel_class = Type(InProcessShellChannel)
44 iopub_channel_class = Type(InProcessIOPubChannel)
45 stdin_channel_class = Type(InProcessStdInChannel)
42 shell_channel_class = Type(InProcessChannel)
43 iopub_channel_class = Type(InProcessChannel)
44 stdin_channel_class = Type(InProcessChannel)
46 45 hb_channel_class = Type(InProcessHBChannel)
47 46
48 47 kernel = Instance('IPython.kernel.inprocess.ipkernel.InProcessKernel')
@@ -79,6 +78,76 b' class InProcessKernelClient(KernelClient):'
79 78 self._hb_channel = self.hb_channel_class(self)
80 79 return self._hb_channel
81 80
81 # Methods for sending specific messages
82 # -------------------------------------
83
84 def execute(self, code, silent=False, store_history=True,
85 user_expressions={}, allow_stdin=None):
86 if allow_stdin is None:
87 allow_stdin = self.allow_stdin
88 content = dict(code=code, silent=silent, store_history=store_history,
89 user_expressions=user_expressions,
90 allow_stdin=allow_stdin)
91 msg = self.session.msg('execute_request', content)
92 self._dispatch_to_kernel(msg)
93 return msg['header']['msg_id']
94
95 def complete(self, code, cursor_pos=None):
96 if cursor_pos is None:
97 cursor_pos = len(code)
98 content = dict(code=code, cursor_pos=cursor_pos)
99 msg = self.session.msg('complete_request', content)
100 self._dispatch_to_kernel(msg)
101 return msg['header']['msg_id']
102
103 def inspect(self, code, cursor_pos=None, detail_level=0):
104 if cursor_pos is None:
105 cursor_pos = len(code)
106 content = dict(code=code, cursor_pos=cursor_pos,
107 detail_level=detail_level,
108 )
109 msg = self.session.msg('inspect_request', content)
110 self._dispatch_to_kernel(msg)
111 return msg['header']['msg_id']
112
113 def history(self, raw=True, output=False, hist_access_type='range', **kwds):
114 content = dict(raw=raw, output=output,
115 hist_access_type=hist_access_type, **kwds)
116 msg = self.session.msg('history_request', content)
117 self._dispatch_to_kernel(msg)
118 return msg['header']['msg_id']
119
120 def shutdown(self, restart=False):
121 # FIXME: What to do here?
122 raise NotImplementedError('Cannot shutdown in-process kernel')
123
124 def kernel_info(self):
125 """Request kernel info."""
126 msg = self.session.msg('kernel_info_request')
127 self._dispatch_to_kernel(msg)
128 return msg['header']['msg_id']
129
130 def input(self, string):
131 if self.kernel is None:
132 raise RuntimeError('Cannot send input reply. No kernel exists.')
133 self.kernel.raw_input_str = string
134
135
136 def _dispatch_to_kernel(self, msg):
137 """ Send a message to the kernel and handle a reply.
138 """
139 kernel = self.kernel
140 if kernel is None:
141 raise RuntimeError('Cannot send request. No kernel exists.')
142
143 stream = DummySocket()
144 self.session.send(stream, msg)
145 msg_parts = stream.recv_multipart()
146 kernel.dispatch_shell(stream, msg_parts)
147
148 idents, reply_msg = self.session.recv(stream, copy=False)
149 self.shell_channel.call_handlers_later(reply_msg)
150
82 151
83 152 #-----------------------------------------------------------------------------
84 153 # ABC Registration
@@ -26,6 +26,7 b' class InProcessKernelTestCase(unittest.TestCase):'
26 26 self.km.start_kernel()
27 27 self.kc = BlockingInProcessKernelClient(kernel=self.km.kernel)
28 28 self.kc.start_channels()
29 self.kc.wait_for_ready()
29 30
30 31 @skipif_not_matplotlib
31 32 def test_pylab(self):
@@ -61,7 +62,7 b' class InProcessKernelTestCase(unittest.TestCase):'
61 62
62 63 kc = BlockingInProcessKernelClient(kernel=kernel)
63 64 kernel.frontends.append(kc)
64 kc.shell_channel.execute('print("bar")')
65 kc.execute('print("bar")')
65 66 msg = get_stream_message(kc)
66 67 self.assertEqual(msg['content']['text'], 'bar\n')
67 68
@@ -51,6 +51,7 b' class InProcessKernelManagerTestCase(unittest.TestCase):'
51 51 km.start_kernel()
52 52 kc = BlockingInProcessKernelClient(kernel=km.kernel)
53 53 kc.start_channels()
54 kc.wait_for_ready()
54 55 kc.execute('foo = 1')
55 56 self.assertEquals(km.kernel.shell.user_ns['foo'], 1)
56 57
@@ -61,6 +62,7 b' class InProcessKernelManagerTestCase(unittest.TestCase):'
61 62 km.start_kernel()
62 63 kc = BlockingInProcessKernelClient(kernel=km.kernel)
63 64 kc.start_channels()
65 kc.wait_for_ready()
64 66 km.kernel.shell.push({'my_bar': 0, 'my_baz': 1})
65 67 kc.complete('my_ba', 5)
66 68 msg = kc.get_shell_msg()
@@ -75,6 +77,7 b' class InProcessKernelManagerTestCase(unittest.TestCase):'
75 77 km.start_kernel()
76 78 kc = BlockingInProcessKernelClient(kernel=km.kernel)
77 79 kc.start_channels()
80 kc.wait_for_ready()
78 81 km.kernel.shell.user_ns['foo'] = 1
79 82 kc.inspect('foo')
80 83 msg = kc.get_shell_msg()
@@ -91,6 +94,7 b' class InProcessKernelManagerTestCase(unittest.TestCase):'
91 94 km.start_kernel()
92 95 kc = BlockingInProcessKernelClient(kernel=km.kernel)
93 96 kc.start_channels()
97 kc.wait_for_ready()
94 98 kc.execute('%who')
95 99 kc.history(hist_access_type='tail', n=1)
96 100 msg = kc.shell_channel.get_msgs()[-1]
@@ -420,17 +420,8 b" def start_new_kernel(startup_timeout=60, kernel_name='python', **kwargs):"
420 420 km.start_kernel(**kwargs)
421 421 kc = km.client()
422 422 kc.start_channels()
423 kc.wait_for_ready()
423 424
424 kc.kernel_info()
425 kc.get_shell_msg(block=True, timeout=startup_timeout)
426
427 # Flush channels
428 for channel in (kc.shell_channel, kc.iopub_channel):
429 while True:
430 try:
431 channel.get_msg(block=True, timeout=0.1)
432 except Empty:
433 break
434 425 return km, kc
435 426
436 427 @contextmanager
@@ -92,6 +92,7 b' def setup_kernel(cmd):'
92 92 client = BlockingKernelClient(connection_file=connection_file)
93 93 client.load_connection_file()
94 94 client.start_channels()
95 client.wait_for_ready()
95 96
96 97 try:
97 98 yield client
@@ -46,7 +46,7 b' class ExecutePreprocessor(Preprocessor):'
46 46 if cell.cell_type != 'code':
47 47 return cell, resources
48 48 try:
49 outputs = self.run_cell(self.kc.shell_channel, self.kc.iopub_channel, cell)
49 outputs = self.run_cell(cell)
50 50 except Exception as e:
51 51 self.log.error("failed to run cell: " + repr(e))
52 52 self.log.error(str(cell.source))
@@ -54,13 +54,13 b' class ExecutePreprocessor(Preprocessor):'
54 54 cell.outputs = outputs
55 55 return cell, resources
56 56
57 def run_cell(self, shell, iopub, cell):
58 msg_id = shell.execute(cell.source)
57 def run_cell(self, cell):
58 msg_id = self.kc.execute(cell.source)
59 59 self.log.debug("Executing cell:\n%s", cell.source)
60 60 # wait for finish, with timeout
61 61 while True:
62 62 try:
63 msg = shell.get_msg(timeout=self.timeout)
63 msg = self.kc.shell_channel.get_msg(timeout=self.timeout)
64 64 except Empty:
65 65 self.log.error("Timeout waiting for execute reply")
66 66 raise
@@ -74,7 +74,7 b' class ExecutePreprocessor(Preprocessor):'
74 74
75 75 while True:
76 76 try:
77 msg = iopub.get_msg(timeout=self.timeout)
77 msg = self.kc.iopub_channel.get_msg(timeout=self.timeout)
78 78 except Empty:
79 79 self.log.warn("Timeout waiting for IOPub output")
80 80 break
@@ -1,29 +1,213 b''
1 1 """ Defines a KernelClient that provides signals and slots.
2 2 """
3 import atexit
4 import errno
5 from threading import Thread
6 import time
7
8 import zmq
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 # during garbage collection of threads at exit:
11 from zmq import ZMQError
12 from zmq.eventloop import ioloop, zmqstream
13
14 from IPython.external.qt import QtCore
3 15
4 16 # Local imports
5 from IPython.utils.traitlets import Type
6 from IPython.kernel.channels import (
7 ShellChannel, IOPubChannel, StdInChannel, HBChannel
8 )
17 from IPython.utils.traitlets import Type, Instance
18 from IPython.kernel.channels import HBChannel
9 19 from IPython.kernel import KernelClient
10 20
11 from .kernel_mixins import (
12 QtShellChannelMixin, QtIOPubChannelMixin,
13 QtStdInChannelMixin, QtHBChannelMixin,
14 QtKernelClientMixin
15 )
21 from .kernel_mixins import QtKernelClientMixin
22 from .util import SuperQObject
16 23
17 class QtShellChannel(QtShellChannelMixin, ShellChannel):
18 pass
24 class QtHBChannel(SuperQObject, HBChannel):
25 # A longer timeout than the base class
26 time_to_dead = 3.0
27
28 # Emitted when the kernel has died.
29 kernel_died = QtCore.Signal(object)
30
31 def call_handlers(self, since_last_heartbeat):
32 """ Reimplemented to emit signals instead of making callbacks.
33 """
34 # Emit the generic signal.
35 self.kernel_died.emit(since_last_heartbeat)
36
37 from IPython.core.release import kernel_protocol_version_info
19 38
20 class QtIOPubChannel(QtIOPubChannelMixin, IOPubChannel):
39 major_protocol_version = kernel_protocol_version_info[0]
40
41 class InvalidPortNumber(Exception):
21 42 pass
22 43
23 class QtStdInChannel(QtStdInChannelMixin, StdInChannel):
44
45 class QtZMQSocketChannel(SuperQObject):
46 """A ZMQ socket emitting a Qt signal when a message is received."""
47 session = None
48 socket = None
49 ioloop = None
50 stream = None
51
52 message_received = QtCore.Signal(object)
53
54 def process_events(self):
55 """ Process any pending GUI events.
56 """
57 QtCore.QCoreApplication.instance().processEvents()
58
59 def __init__(self, socket, session, loop):
60 """Create a channel.
61
62 Parameters
63 ----------
64 socket : :class:`zmq.Socket`
65 The ZMQ socket to use.
66 session : :class:`session.Session`
67 The session to use.
68 loop
69 A pyzmq ioloop to connect the socket to using a ZMQStream
70 """
71 super(QtZMQSocketChannel, self).__init__()
72
73 self.socket = socket
74 self.session = session
75 self.ioloop = loop
76
77 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
78 self.stream.on_recv(self._handle_recv)
79
80 _is_alive = False
81 def is_alive(self):
82 return self._is_alive
83
84 def start(self):
85 self._is_alive = True
86
87 def stop(self):
88 self._is_alive = False
89
90 def close(self):
91 if self.socket is not None:
92 try:
93 self.socket.close(linger=0)
94 except Exception:
24 95 pass
96 self.socket = None
25 97
26 class QtHBChannel(QtHBChannelMixin, HBChannel):
98 def send(self, msg):
99 """Queue a message to be sent from the IOLoop's thread.
100
101 Parameters
102 ----------
103 msg : message to send
104
105 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
106 thread control of the action.
107 """
108 def thread_send():
109 self.session.send(self.stream, msg)
110 self.ioloop.add_callback(thread_send)
111
112 def _handle_recv(self, msg):
113 """Callback for stream.on_recv.
114
115 Unpacks message, and calls handlers with it.
116 """
117 ident,smsg = self.session.feed_identities(msg)
118 msg = self.session.deserialize(smsg)
119 self.call_handlers(msg)
120
121 def call_handlers(self, msg):
122 """This method is called in the ioloop thread when a message arrives.
123
124 Subclasses should override this method to handle incoming messages.
125 It is important to remember that this method is called in the thread
126 so that some logic must be done to ensure that the application level
127 handlers are called in the application thread.
128 """
129 # Emit the generic signal.
130 self.message_received.emit(msg)
131
132 def flush(self, timeout=1.0):
133 """Immediately processes all pending messages on this channel.
134
135 This is only used for the IOPub channel.
136
137 Callers should use this method to ensure that :meth:`call_handlers`
138 has been called for all messages that have been received on the
139 0MQ SUB socket of this channel.
140
141 This method is thread safe.
142
143 Parameters
144 ----------
145 timeout : float, optional
146 The maximum amount of time to spend flushing, in seconds. The
147 default is one second.
148 """
149 # We do the IOLoop callback process twice to ensure that the IOLoop
150 # gets to perform at least one full poll.
151 stop_time = time.time() + timeout
152 for i in range(2):
153 self._flushed = False
154 self.ioloop.add_callback(self._flush)
155 while not self._flushed and time.time() < stop_time:
156 time.sleep(0.01)
157
158 def _flush(self):
159 """Callback for :method:`self.flush`."""
160 self.stream.flush()
161 self._flushed = True
162
163
164 class IOLoopThread(Thread):
165 """Run a pyzmq ioloop in a thread to send and receive messages
166 """
167 def __init__(self, loop):
168 super(IOLoopThread, self).__init__()
169 self.daemon = True
170 atexit.register(self._notice_exit)
171 self.ioloop = loop or ioloop.IOLoop()
172
173 def _notice_exit(self):
174 self._exiting = True
175
176 def run(self):
177 """Run my loop, ignoring EINTR events in the poller"""
178 while True:
179 try:
180 self.ioloop.start()
181 except ZMQError as e:
182 if e.errno == errno.EINTR:
183 continue
184 else:
185 raise
186 except Exception:
187 if self._exiting:
188 break
189 else:
190 raise
191 else:
192 break
193
194 def stop(self):
195 """Stop the channel's event loop and join its thread.
196
197 This calls :meth:`~threading.Thread.join` and returns when the thread
198 terminates. :class:`RuntimeError` will be raised if
199 :meth:`~threading.Thread.start` is called again.
200 """
201 if self.ioloop is not None:
202 self.ioloop.stop()
203 self.join()
204 self.close()
205
206 def close(self):
207 if self.ioloop is not None:
208 try:
209 self.ioloop.close(all_fds=True)
210 except Exception:
27 211 pass
28 212
29 213
@@ -31,7 +215,35 b' class QtKernelClient(QtKernelClientMixin, KernelClient):'
31 215 """ A KernelClient that provides signals and slots.
32 216 """
33 217
34 iopub_channel_class = Type(QtIOPubChannel)
35 shell_channel_class = Type(QtShellChannel)
36 stdin_channel_class = Type(QtStdInChannel)
218 _ioloop = None
219 @property
220 def ioloop(self):
221 if self._ioloop is None:
222 self._ioloop = ioloop.IOLoop()
223 return self._ioloop
224
225 ioloop_thread = Instance(IOLoopThread)
226
227 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
228 if shell:
229 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
230
231 self.ioloop_thread = IOLoopThread(self.ioloop)
232 self.ioloop_thread.start()
233
234 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
235
236 def _check_kernel_info_reply(self, msg):
237 if msg['msg_type'] == 'kernel_info_reply':
238 self._handle_kernel_info_reply(msg)
239 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
240
241 def stop_channels(self):
242 super(QtKernelClient, self).stop_channels()
243 if self.ioloop_thread.is_alive():
244 self.ioloop_thread.stop()
245
246 iopub_channel_class = Type(QtZMQSocketChannel)
247 shell_channel_class = Type(QtZMQSocketChannel)
248 stdin_channel_class = Type(QtZMQSocketChannel)
37 249 hb_channel_class = Type(QtHBChannel)
@@ -82,6 +82,8 b' class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin):'
82 82 # The text to show when the kernel is (re)started.
83 83 banner = Unicode(config=True)
84 84 kernel_banner = Unicode()
85 # Whether to show the banner
86 _display_banner = Bool(False)
85 87
86 88 # An option and corresponding signal for overriding the default kernel
87 89 # interrupt behavior.
@@ -464,7 +466,7 b' class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin):'
464 466 self.kernel_client.iopub_channel.flush()
465 467
466 468 def callback(line):
467 self.kernel_client.stdin_channel.input(line)
469 self.kernel_client.input(line)
468 470 if self._reading:
469 471 self.log.debug("Got second input request, assuming first was interrupted.")
470 472 self._reading = False
@@ -225,7 +225,7 b' class HistoryConsoleWidget(ConsoleWidget):'
225 225 return self._history[-n:]
226 226
227 227 def _request_update_session_history_length(self):
228 msg_id = self.kernel_client.shell_channel.execute('',
228 msg_id = self.kernel_client.execute('',
229 229 silent=True,
230 230 user_expressions={
231 231 'hlen':'len(get_ipython().history_manager.input_hist_raw)',
@@ -203,7 +203,7 b' class IPythonWidget(FrontendWidget):'
203 203 self._retrying_history_request = True
204 204 # wait out the kernel's queue flush, which is currently timed at 0.1s
205 205 time.sleep(0.25)
206 self.kernel_client.shell_channel.history(hist_access_type='tail',n=1000)
206 self.kernel_client.history(hist_access_type='tail',n=1000)
207 207 else:
208 208 self._retrying_history_request = False
209 209 return
@@ -296,12 +296,11 b' class IPythonWidget(FrontendWidget):'
296 296 # The reply will trigger %guiref load provided language=='python'
297 297 self.kernel_client.kernel_info()
298 298
299 self.kernel_client.shell_channel.history(hist_access_type='tail',
300 n=1000)
299 self.kernel_client.history(hist_access_type='tail', n=1000)
301 300
302 301 def _load_guiref_magic(self):
303 302 """Load %guiref magic."""
304 self.kernel_client.shell_channel.execute('\n'.join([
303 self.kernel_client.execute('\n'.join([
305 304 "try:",
306 305 " _usage",
307 306 "except:",
@@ -385,7 +384,7 b' class IPythonWidget(FrontendWidget):'
385 384 """
386 385 # If a number was not specified, make a prompt number request.
387 386 if number is None:
388 msg_id = self.kernel_client.shell_channel.execute('', silent=True)
387 msg_id = self.kernel_client.execute('', silent=True)
389 388 info = self._ExecutionRequest(msg_id, 'prompt')
390 389 self._request_info['execute'][msg_id] = info
391 390 return
@@ -2,38 +2,73 b''
2 2 """
3 3
4 4 # Local imports.
5 from IPython.external.qt import QtCore
5 6 from IPython.kernel.inprocess import (
6 InProcessShellChannel, InProcessIOPubChannel, InProcessStdInChannel,
7 7 InProcessHBChannel, InProcessKernelClient, InProcessKernelManager,
8 8 )
9 from IPython.kernel.inprocess.channels import InProcessChannel
9 10
10 11 from IPython.utils.traitlets import Type
12 from .util import SuperQObject
11 13 from .kernel_mixins import (
12 QtShellChannelMixin, QtIOPubChannelMixin,
13 QtStdInChannelMixin, QtHBChannelMixin, QtKernelClientMixin,
14 QtKernelManagerMixin,
14 QtKernelClientMixin, QtKernelManagerMixin,
15 15 )
16 16
17 class QtInProcessChannel(SuperQObject, InProcessChannel):
18 # Emitted when the channel is started.
19 started = QtCore.Signal()
17 20
18 class QtInProcessShellChannel(QtShellChannelMixin, InProcessShellChannel):
19 pass
21 # Emitted when the channel is stopped.
22 stopped = QtCore.Signal()
20 23
21 class QtInProcessIOPubChannel(QtIOPubChannelMixin, InProcessIOPubChannel):
22 pass
24 # Emitted when any message is received.
25 message_received = QtCore.Signal(object)
23 26
24 class QtInProcessStdInChannel(QtStdInChannelMixin, InProcessStdInChannel):
25 pass
27 def start(self):
28 """ Reimplemented to emit signal.
29 """
30 super(QtInProcessChannel, self).start()
31 self.started.emit()
32
33 def stop(self):
34 """ Reimplemented to emit signal.
35 """
36 super(QtInProcessChannel, self).stop()
37 self.stopped.emit()
38
39 def call_handlers_later(self, *args, **kwds):
40 """ Call the message handlers later.
41 """
42 do_later = lambda: self.call_handlers(*args, **kwds)
43 QtCore.QTimer.singleShot(0, do_later)
44
45 def call_handlers(self, msg):
46 self.message_received.emit(msg)
47
48 def process_events(self):
49 """ Process any pending GUI events.
50 """
51 QtCore.QCoreApplication.instance().processEvents()
52
53 def flush(self, timeout=1.0):
54 """ Reimplemented to ensure that signals are dispatched immediately.
55 """
56 super(QtInProcessChannel, self).flush()
57 self.process_events()
58
59
60 class QtInProcessHBChannel(SuperQObject, InProcessHBChannel):
61 # This signal will never be fired, but it needs to exist
62 kernel_died = QtCore.Signal()
26 63
27 class QtInProcessHBChannel(QtHBChannelMixin, InProcessHBChannel):
28 pass
29 64
30 65 class QtInProcessKernelClient(QtKernelClientMixin, InProcessKernelClient):
31 66 """ An in-process KernelManager with signals and slots.
32 67 """
33 68
34 iopub_channel_class = Type(QtInProcessIOPubChannel)
35 shell_channel_class = Type(QtInProcessShellChannel)
36 stdin_channel_class = Type(QtInProcessStdInChannel)
69 iopub_channel_class = Type(QtInProcessChannel)
70 shell_channel_class = Type(QtInProcessChannel)
71 stdin_channel_class = Type(QtInProcessChannel)
37 72 hb_channel_class = Type(QtInProcessHBChannel)
38 73
39 74 class QtInProcessKernelManager(QtKernelManagerMixin, InProcessKernelManager):
@@ -9,146 +9,6 b' from IPython.utils.traitlets import HasTraits, Type'
9 9 from .util import MetaQObjectHasTraits, SuperQObject
10 10
11 11
12 class ChannelQObject(SuperQObject):
13
14 # Emitted when the channel is started.
15 started = QtCore.Signal()
16
17 # Emitted when the channel is stopped.
18 stopped = QtCore.Signal()
19
20 def start(self):
21 """ Reimplemented to emit signal.
22 """
23 super(ChannelQObject, self).start()
24 self.started.emit()
25
26 def stop(self):
27 """ Reimplemented to emit signal.
28 """
29 super(ChannelQObject, self).stop()
30 self.stopped.emit()
31
32 #---------------------------------------------------------------------------
33 # InProcessChannel interface
34 #---------------------------------------------------------------------------
35
36 def call_handlers_later(self, *args, **kwds):
37 """ Call the message handlers later.
38 """
39 do_later = lambda: self.call_handlers(*args, **kwds)
40 QtCore.QTimer.singleShot(0, do_later)
41
42 def process_events(self):
43 """ Process any pending GUI events.
44 """
45 QtCore.QCoreApplication.instance().processEvents()
46
47
48 class QtShellChannelMixin(ChannelQObject):
49
50 # Emitted when any message is received.
51 message_received = QtCore.Signal(object)
52
53 # Emitted when a reply has been received for the corresponding request type.
54 execute_reply = QtCore.Signal(object)
55 complete_reply = QtCore.Signal(object)
56 inspect_reply = QtCore.Signal(object)
57 history_reply = QtCore.Signal(object)
58 kernel_info_reply = QtCore.Signal(object)
59
60 def call_handlers(self, msg):
61 """ Reimplemented to emit signals instead of making callbacks.
62 """
63 # Emit the generic signal.
64 self.message_received.emit(msg)
65
66 # Emit signals for specialized message types.
67 msg_type = msg['header']['msg_type']
68 if msg_type == 'kernel_info_reply':
69 self._handle_kernel_info_reply(msg)
70
71 signal = getattr(self, msg_type, None)
72 if signal:
73 signal.emit(msg)
74
75
76 class QtIOPubChannelMixin(ChannelQObject):
77
78 # Emitted when any message is received.
79 message_received = QtCore.Signal(object)
80
81 # Emitted when a message of type 'stream' is received.
82 stream_received = QtCore.Signal(object)
83
84 # Emitted when a message of type 'execute_input' is received.
85 execute_input_received = QtCore.Signal(object)
86
87 # Emitted when a message of type 'execute_result' is received.
88 execute_result_received = QtCore.Signal(object)
89
90 # Emitted when a message of type 'error' is received.
91 error_received = QtCore.Signal(object)
92
93 # Emitted when a message of type 'display_data' is received
94 display_data_received = QtCore.Signal(object)
95
96 # Emitted when a crash report message is received from the kernel's
97 # last-resort sys.excepthook.
98 crash_received = QtCore.Signal(object)
99
100 # Emitted when a shutdown is noticed.
101 shutdown_reply_received = QtCore.Signal(object)
102
103 def call_handlers(self, msg):
104 """ Reimplemented to emit signals instead of making callbacks.
105 """
106 # Emit the generic signal.
107 self.message_received.emit(msg)
108 # Emit signals for specialized message types.
109 msg_type = msg['header']['msg_type']
110 signal = getattr(self, msg_type + '_received', None)
111 if signal:
112 signal.emit(msg)
113
114 def flush(self):
115 """ Reimplemented to ensure that signals are dispatched immediately.
116 """
117 super(QtIOPubChannelMixin, self).flush()
118 QtCore.QCoreApplication.instance().processEvents()
119
120
121 class QtStdInChannelMixin(ChannelQObject):
122
123 # Emitted when any message is received.
124 message_received = QtCore.Signal(object)
125
126 # Emitted when an input request is received.
127 input_requested = QtCore.Signal(object)
128
129 def call_handlers(self, msg):
130 """ Reimplemented to emit signals instead of making callbacks.
131 """
132 # Emit the generic signal.
133 self.message_received.emit(msg)
134
135 # Emit signals for specialized message types.
136 msg_type = msg['header']['msg_type']
137 if msg_type == 'input_request':
138 self.input_requested.emit(msg)
139
140
141 class QtHBChannelMixin(ChannelQObject):
142
143 # Emitted when the kernel has died.
144 kernel_died = QtCore.Signal(object)
145
146 def call_handlers(self, since_last_heartbeat):
147 """ Reimplemented to emit signals instead of making callbacks.
148 """
149 self.kernel_died.emit(since_last_heartbeat)
150
151
152 12 class QtKernelRestarterMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
153 13
154 14 _timer = None
@@ -171,12 +31,6 b" class QtKernelClientMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObje"
171 31 # Emitted when the kernel client has stopped listening.
172 32 stopped_channels = QtCore.Signal()
173 33
174 # Use Qt-specific channel classes that emit signals.
175 iopub_channel_class = Type(QtIOPubChannelMixin)
176 shell_channel_class = Type(QtShellChannelMixin)
177 stdin_channel_class = Type(QtStdInChannelMixin)
178 hb_channel_class = Type(QtHBChannelMixin)
179
180 34 #---------------------------------------------------------------------------
181 35 # 'KernelClient' interface
182 36 #---------------------------------------------------------------------------
@@ -36,7 +36,7 b' class ZMQCompleter(IPCompleter):'
36 36
37 37 # send completion request to kernel
38 38 # Give the kernel up to 0.5s to respond
39 msg_id = self.client.shell_channel.complete(
39 msg_id = self.client.complete(
40 40 code=line,
41 41 cursor_pos=cursor_pos,
42 42 )
@@ -157,8 +157,8 b' class ZMQTerminalInteractiveShell(TerminalInteractiveShell):'
157 157 # flush stale replies, which could have been ignored, due to missed heartbeats
158 158 while self.client.shell_channel.msg_ready():
159 159 self.client.shell_channel.get_msg()
160 # shell_channel.execute takes 'hidden', which is the inverse of store_hist
161 msg_id = self.client.shell_channel.execute(cell, not store_history)
160 # execute takes 'hidden', which is the inverse of store_hist
161 msg_id = self.client.execute(cell, not store_history)
162 162
163 163 # first thing is wait for any side effects (output, stdin, etc.)
164 164 self._executing = True
@@ -399,7 +399,7 b' class ZMQTerminalInteractiveShell(TerminalInteractiveShell):'
399 399 # only send stdin reply if there *was not* another request
400 400 # or execution finished while we were reading.
401 401 if not (self.client.stdin_channel.msg_ready() or self.client.shell_channel.msg_ready()):
402 self.client.stdin_channel.input(raw_data)
402 self.client.input(raw_data)
403 403
404 404 def mainloop(self, display_banner=False):
405 405 while True:
@@ -414,7 +414,7 b' class ZMQTerminalInteractiveShell(TerminalInteractiveShell):'
414 414 # handling seems rather unpredictable...
415 415 self.write("\nKeyboardInterrupt in interact()\n")
416 416
417 self.client.shell_channel.shutdown()
417 self.client.shutdown()
418 418
419 419 def _banner1_default(self):
420 420 return "IPython Console {version}\n".format(version=release.version)
General Comments 0
You need to be logged in to leave comments. Login now