##// END OF EJS Templates
Merge pull request #7108 from takluyver/digging-channels...
Min RK -
r19452:fe5dd275 merge
parent child Browse files
Show More
@@ -2,46 +2,59 b''
2
2
3 Useful for test suites and blocking terminal interfaces.
3 Useful for test suites and blocking terminal interfaces.
4 """
4 """
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2013 The IPython Development Team
7 #
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING.txt, distributed as part of this software.
10 #-----------------------------------------------------------------------------
11
5
12 #-----------------------------------------------------------------------------
6 # Copyright (c) IPython Development Team.
13 # Imports
7 # Distributed under the terms of the Modified BSD License.
14 #-----------------------------------------------------------------------------
15
8
16 try:
9 try:
17 from queue import Queue, Empty # Py 3
10 from queue import Queue, Empty # Py 3
18 except ImportError:
11 except ImportError:
19 from Queue import Queue, Empty # Py 2
12 from Queue import Queue, Empty # Py 2
20
13
21 from IPython.kernel.channels import IOPubChannel, HBChannel, \
22 ShellChannel, StdInChannel
23
14
24 #-----------------------------------------------------------------------------
15 class ZMQSocketChannel(object):
25 # Blocking kernel manager
16 """A ZMQ socket in a simple blocking API"""
26 #-----------------------------------------------------------------------------
17 session = None
27
18 socket = None
28
19 stream = None
29 class BlockingChannelMixin(object):
20 _exiting = False
30
21 proxy_methods = []
31 def __init__(self, *args, **kwds):
22
32 super(BlockingChannelMixin, self).__init__(*args, **kwds)
23 def __init__(self, socket, session, loop=None):
33 self._in_queue = Queue()
24 """Create a channel.
34
25
35 def call_handlers(self, msg):
26 Parameters
36 self._in_queue.put(msg)
27 ----------
28 socket : :class:`zmq.Socket`
29 The ZMQ socket to use.
30 session : :class:`session.Session`
31 The session to use.
32 loop
33 Unused here, for other implementations
34 """
35 super(ZMQSocketChannel, self).__init__()
36
37 self.socket = socket
38 self.session = session
39
40 def _recv(self, **kwargs):
41 msg = self.socket.recv_multipart(**kwargs)
42 ident,smsg = self.session.feed_identities(msg)
43 return self.session.deserialize(smsg)
37
44
38 def get_msg(self, block=True, timeout=None):
45 def get_msg(self, block=True, timeout=None):
39 """ Gets a message if there is one that is ready. """
46 """ Gets a message if there is one that is ready. """
40 if timeout is None:
47 if block:
41 # Queue.get(timeout=None) has stupid uninteruptible
48 if timeout is not None:
42 # behavior, so wait for a week instead
49 timeout *= 1000 # seconds to ms
43 timeout = 604800
50 ready = self.socket.poll(timeout)
44 return self._in_queue.get(block, timeout)
51 else:
52 ready = self.socket.poll(timeout=0)
53
54 if ready:
55 return self._recv()
56 else:
57 raise Empty
45
58
46 def get_msgs(self):
59 def get_msgs(self):
47 """ Get all messages that are currently ready. """
60 """ Get all messages that are currently ready. """
@@ -55,31 +68,25 b' class BlockingChannelMixin(object):'
55
68
56 def msg_ready(self):
69 def msg_ready(self):
57 """ Is there a message that has been received? """
70 """ Is there a message that has been received? """
58 return not self._in_queue.empty()
71 return bool(self.socket.poll(timeout=0))
59
60
61 class BlockingIOPubChannel(BlockingChannelMixin, IOPubChannel):
62 pass
63
64
65 class BlockingShellChannel(BlockingChannelMixin, ShellChannel):
66 def call_handlers(self, msg):
67 if msg['msg_type'] == 'kernel_info_reply':
68 self._handle_kernel_info_reply(msg)
69 return super(BlockingShellChannel, self).call_handlers(msg)
70
71
72 class BlockingStdInChannel(BlockingChannelMixin, StdInChannel):
73 pass
74
72
73 def close(self):
74 if self.socket is not None:
75 try:
76 self.socket.close(linger=0)
77 except Exception:
78 pass
79 self.socket = None
80 stop = close
75
81
76 class BlockingHBChannel(HBChannel):
82 def is_alive(self):
83 return (self.socket is not None)
77
84
78 # This kernel needs quicker monitoring, shorten to 1 sec.
85 def send(self, msg):
79 # less than 0.5s is unreliable, and will get occasional
86 """Pass a message to the ZMQ socket to send
80 # false reports of missed beats.
87 """
81 time_to_dead = 1.
88 self.session.send(self.socket, msg)
82
89
83 def call_handlers(self, since_last_heartbeat):
90 def start(self):
84 """ Pause beating on missed heartbeat. """
85 pass
91 pass
92
@@ -2,32 +2,38 b''
2
2
3 Useful for test suites and blocking terminal interfaces.
3 Useful for test suites and blocking terminal interfaces.
4 """
4 """
5 #-----------------------------------------------------------------------------
5 # Copyright (c) IPython Development Team.
6 # Copyright (C) 2013 The IPython Development Team
6 # Distributed under the terms of the Modified BSD License.
7 #
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING.txt, distributed as part of this software.
10 #-----------------------------------------------------------------------------
11
7
12 #-----------------------------------------------------------------------------
8 try:
13 # Imports
9 from queue import Empty # Python 3
14 #-----------------------------------------------------------------------------
10 except ImportError:
11 from Queue import Empty # Python 2
15
12
16 from IPython.utils.traitlets import Type
13 from IPython.utils.traitlets import Type
14 from IPython.kernel.channels import HBChannel
17 from IPython.kernel.client import KernelClient
15 from IPython.kernel.client import KernelClient
18 from .channels import (
16 from .channels import ZMQSocketChannel
19 BlockingIOPubChannel, BlockingHBChannel,
20 BlockingShellChannel, BlockingStdInChannel
21 )
22
23 #-----------------------------------------------------------------------------
24 # Blocking kernel manager
25 #-----------------------------------------------------------------------------
26
17
27 class BlockingKernelClient(KernelClient):
18 class BlockingKernelClient(KernelClient):
19 def wait_for_ready(self):
20 # Wait for kernel info reply on shell channel
21 while True:
22 msg = self.shell_channel.get_msg(block=True)
23 if msg['msg_type'] == 'kernel_info_reply':
24 self._handle_kernel_info_reply(msg)
25 break
26
27 # Flush IOPub channel
28 while True:
29 try:
30 msg = self.iopub_channel.get_msg(block=True, timeout=0.2)
31 print(msg['msg_type'])
32 except Empty:
33 break
28
34
29 # The classes to use for the various channels
35 # The classes to use for the various channels
30 shell_channel_class = Type(BlockingShellChannel)
36 shell_channel_class = Type(ZMQSocketChannel)
31 iopub_channel_class = Type(BlockingIOPubChannel)
37 iopub_channel_class = Type(ZMQSocketChannel)
32 stdin_channel_class = Type(BlockingStdInChannel)
38 stdin_channel_class = Type(ZMQSocketChannel)
33 hb_channel_class = Type(BlockingHBChannel)
39 hb_channel_class = Type(HBChannel)
This diff has been collapsed as it changes many lines, (505 lines changed) Show them Hide them
@@ -14,15 +14,10 b' import zmq'
14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
15 # during garbage collection of threads at exit:
15 # during garbage collection of threads at exit:
16 from zmq import ZMQError
16 from zmq import ZMQError
17 from zmq.eventloop import ioloop, zmqstream
18
17
19 from IPython.core.release import kernel_protocol_version_info
18 from IPython.core.release import kernel_protocol_version_info
20
19
21 from .channelsabc import (
20 from .channelsabc import HBChannelABC
22 ShellChannelABC, IOPubChannelABC,
23 HBChannelABC, StdInChannelABC,
24 )
25 from IPython.utils.py3compat import string_types, iteritems
26
21
27 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
28 # Constants and exceptions
23 # Constants and exceptions
@@ -33,52 +28,27 b' major_protocol_version = kernel_protocol_version_info[0]'
33 class InvalidPortNumber(Exception):
28 class InvalidPortNumber(Exception):
34 pass
29 pass
35
30
36 #-----------------------------------------------------------------------------
31 class HBChannel(Thread):
37 # Utility functions
32 """The heartbeat channel which monitors the kernel heartbeat.
38 #-----------------------------------------------------------------------------
39
40 # some utilities to validate message structure, these might get moved elsewhere
41 # if they prove to have more generic utility
42
43 def validate_string_list(lst):
44 """Validate that the input is a list of strings.
45
46 Raises ValueError if not."""
47 if not isinstance(lst, list):
48 raise ValueError('input %r must be a list' % lst)
49 for x in lst:
50 if not isinstance(x, string_types):
51 raise ValueError('element %r in list must be a string' % x)
52
53
54 def validate_string_dict(dct):
55 """Validate that the input is a dict with string keys and values.
56
57 Raises ValueError if not."""
58 for k,v in iteritems(dct):
59 if not isinstance(k, string_types):
60 raise ValueError('key %r in dict must be a string' % k)
61 if not isinstance(v, string_types):
62 raise ValueError('value %r in dict must be a string' % v)
63
64
65 #-----------------------------------------------------------------------------
66 # ZMQ Socket Channel classes
67 #-----------------------------------------------------------------------------
68
33
69 class ZMQSocketChannel(Thread):
34 Note that the heartbeat channel is paused by default. As long as you start
70 """The base class for the channels that use ZMQ sockets."""
35 this channel, the kernel manager will ensure that it is paused and un-paused
36 as appropriate.
37 """
71 context = None
38 context = None
72 session = None
39 session = None
73 socket = None
40 socket = None
74 ioloop = None
41 address = None
75 stream = None
76 _address = None
77 _exiting = False
42 _exiting = False
78 proxy_methods = []
43
44 time_to_dead = 1.
45 poller = None
46 _running = None
47 _pause = None
48 _beating = None
79
49
80 def __init__(self, context, session, address):
50 def __init__(self, context, session, address):
81 """Create a channel.
51 """Create the heartbeat monitor thread.
82
52
83 Parameters
53 Parameters
84 ----------
54 ----------
@@ -89,7 +59,7 b' class ZMQSocketChannel(Thread):'
89 address : zmq url
59 address : zmq url
90 Standard (ip, port) tuple that the kernel is listening on.
60 Standard (ip, port) tuple that the kernel is listening on.
91 """
61 """
92 super(ZMQSocketChannel, self).__init__()
62 super(HBChannel, self).__init__()
93 self.daemon = True
63 self.daemon = True
94
64
95 self.context = context
65 self.context = context
@@ -99,429 +69,16 b' class ZMQSocketChannel(Thread):'
99 message = 'The port number for a channel cannot be 0.'
69 message = 'The port number for a channel cannot be 0.'
100 raise InvalidPortNumber(message)
70 raise InvalidPortNumber(message)
101 address = "tcp://%s:%i" % address
71 address = "tcp://%s:%i" % address
102 self._address = address
72 self.address = address
103 atexit.register(self._notice_exit)
73 atexit.register(self._notice_exit)
104
74
105 def _notice_exit(self):
106 self._exiting = True
107
108 def _run_loop(self):
109 """Run my loop, ignoring EINTR events in the poller"""
110 while True:
111 try:
112 self.ioloop.start()
113 except ZMQError as e:
114 if e.errno == errno.EINTR:
115 continue
116 else:
117 raise
118 except Exception:
119 if self._exiting:
120 break
121 else:
122 raise
123 else:
124 break
125
126 def stop(self):
127 """Stop the channel's event loop and join its thread.
128
129 This calls :meth:`~threading.Thread.join` and returns when the thread
130 terminates. :class:`RuntimeError` will be raised if
131 :meth:`~threading.Thread.start` is called again.
132 """
133 if self.ioloop is not None:
134 self.ioloop.stop()
135 self.join()
136 self.close()
137
138 def close(self):
139 if self.ioloop is not None:
140 try:
141 self.ioloop.close(all_fds=True)
142 except Exception:
143 pass
144 if self.socket is not None:
145 try:
146 self.socket.close(linger=0)
147 except Exception:
148 pass
149 self.socket = None
150
151 @property
152 def address(self):
153 """Get the channel's address as a zmq url string.
154
155 These URLS have the form: 'tcp://127.0.0.1:5555'.
156 """
157 return self._address
158
159 def _queue_send(self, msg):
160 """Queue a message to be sent from the IOLoop's thread.
161
162 Parameters
163 ----------
164 msg : message to send
165
166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
167 thread control of the action.
168 """
169 def thread_send():
170 self.session.send(self.stream, msg)
171 self.ioloop.add_callback(thread_send)
172
173 def _handle_recv(self, msg):
174 """Callback for stream.on_recv.
175
176 Unpacks message, and calls handlers with it.
177 """
178 ident,smsg = self.session.feed_identities(msg)
179 msg = self.session.deserialize(smsg)
180 self.call_handlers(msg)
181
182
183
184 class ShellChannel(ZMQSocketChannel):
185 """The shell channel for issuing request/replies to the kernel."""
186
187 command_queue = None
188 # flag for whether execute requests should be allowed to call raw_input:
189 allow_stdin = True
190 proxy_methods = [
191 'execute',
192 'complete',
193 'inspect',
194 'history',
195 'kernel_info',
196 'shutdown',
197 'is_complete',
198 ]
199
200 def __init__(self, context, session, address):
201 super(ShellChannel, self).__init__(context, session, address)
202 self.ioloop = ioloop.IOLoop()
203
204 def run(self):
205 """The thread's main activity. Call start() instead."""
206 self.socket = self.context.socket(zmq.DEALER)
207 self.socket.linger = 1000
208 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
209 self.socket.connect(self.address)
210 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
211 self.stream.on_recv(self._handle_recv)
212 self._run_loop()
213
214 def call_handlers(self, msg):
215 """This method is called in the ioloop thread when a message arrives.
216
217 Subclasses should override this method to handle incoming messages.
218 It is important to remember that this method is called in the thread
219 so that some logic must be done to ensure that the application level
220 handlers are called in the application thread.
221 """
222 raise NotImplementedError('call_handlers must be defined in a subclass.')
223
224 def execute(self, code, silent=False, store_history=True,
225 user_expressions=None, allow_stdin=None):
226 """Execute code in the kernel.
227
228 Parameters
229 ----------
230 code : str
231 A string of Python code.
232
233 silent : bool, optional (default False)
234 If set, the kernel will execute the code as quietly possible, and
235 will force store_history to be False.
236
237 store_history : bool, optional (default True)
238 If set, the kernel will store command history. This is forced
239 to be False if silent is True.
240
241 user_expressions : dict, optional
242 A dict mapping names to expressions to be evaluated in the user's
243 dict. The expression values are returned as strings formatted using
244 :func:`repr`.
245
246 allow_stdin : bool, optional (default self.allow_stdin)
247 Flag for whether the kernel can send stdin requests to frontends.
248
249 Some frontends (e.g. the Notebook) do not support stdin requests.
250 If raw_input is called from code executed from such a frontend, a
251 StdinNotImplementedError will be raised.
252
253 Returns
254 -------
255 The msg_id of the message sent.
256 """
257 if user_expressions is None:
258 user_expressions = {}
259 if allow_stdin is None:
260 allow_stdin = self.allow_stdin
261
262
263 # Don't waste network traffic if inputs are invalid
264 if not isinstance(code, string_types):
265 raise ValueError('code %r must be a string' % code)
266 validate_string_dict(user_expressions)
267
268 # Create class for content/msg creation. Related to, but possibly
269 # not in Session.
270 content = dict(code=code, silent=silent, store_history=store_history,
271 user_expressions=user_expressions,
272 allow_stdin=allow_stdin,
273 )
274 msg = self.session.msg('execute_request', content)
275 self._queue_send(msg)
276 return msg['header']['msg_id']
277
278 def complete(self, code, cursor_pos=None):
279 """Tab complete text in the kernel's namespace.
280
281 Parameters
282 ----------
283 code : str
284 The context in which completion is requested.
285 Can be anything between a variable name and an entire cell.
286 cursor_pos : int, optional
287 The position of the cursor in the block of code where the completion was requested.
288 Default: ``len(code)``
289
290 Returns
291 -------
292 The msg_id of the message sent.
293 """
294 if cursor_pos is None:
295 cursor_pos = len(code)
296 content = dict(code=code, cursor_pos=cursor_pos)
297 msg = self.session.msg('complete_request', content)
298 self._queue_send(msg)
299 return msg['header']['msg_id']
300
301 def inspect(self, code, cursor_pos=None, detail_level=0):
302 """Get metadata information about an object in the kernel's namespace.
303
304 It is up to the kernel to determine the appropriate object to inspect.
305
306 Parameters
307 ----------
308 code : str
309 The context in which info is requested.
310 Can be anything between a variable name and an entire cell.
311 cursor_pos : int, optional
312 The position of the cursor in the block of code where the info was requested.
313 Default: ``len(code)``
314 detail_level : int, optional
315 The level of detail for the introspection (0-2)
316
317 Returns
318 -------
319 The msg_id of the message sent.
320 """
321 if cursor_pos is None:
322 cursor_pos = len(code)
323 content = dict(code=code, cursor_pos=cursor_pos,
324 detail_level=detail_level,
325 )
326 msg = self.session.msg('inspect_request', content)
327 self._queue_send(msg)
328 return msg['header']['msg_id']
329
330 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
331 """Get entries from the kernel's history list.
332
333 Parameters
334 ----------
335 raw : bool
336 If True, return the raw input.
337 output : bool
338 If True, then return the output as well.
339 hist_access_type : str
340 'range' (fill in session, start and stop params), 'tail' (fill in n)
341 or 'search' (fill in pattern param).
342
343 session : int
344 For a range request, the session from which to get lines. Session
345 numbers are positive integers; negative ones count back from the
346 current session.
347 start : int
348 The first line number of a history range.
349 stop : int
350 The final (excluded) line number of a history range.
351
352 n : int
353 The number of lines of history to get for a tail request.
354
355 pattern : str
356 The glob-syntax pattern for a search request.
357
358 Returns
359 -------
360 The msg_id of the message sent.
361 """
362 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
363 **kwargs)
364 msg = self.session.msg('history_request', content)
365 self._queue_send(msg)
366 return msg['header']['msg_id']
367
368 def kernel_info(self):
369 """Request kernel info."""
370 msg = self.session.msg('kernel_info_request')
371 self._queue_send(msg)
372 return msg['header']['msg_id']
373
374 def _handle_kernel_info_reply(self, msg):
375 """handle kernel info reply
376
377 sets protocol adaptation version
378 """
379 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
380 if adapt_version != major_protocol_version:
381 self.session.adapt_version = adapt_version
382
383 def shutdown(self, restart=False):
384 """Request an immediate kernel shutdown.
385
386 Upon receipt of the (empty) reply, client code can safely assume that
387 the kernel has shut down and it's safe to forcefully terminate it if
388 it's still alive.
389
390 The kernel will send the reply via a function registered with Python's
391 atexit module, ensuring it's truly done as the kernel is done with all
392 normal operation.
393 """
394 # Send quit message to kernel. Once we implement kernel-side setattr,
395 # this should probably be done that way, but for now this will do.
396 msg = self.session.msg('shutdown_request', {'restart':restart})
397 self._queue_send(msg)
398 return msg['header']['msg_id']
399
400 def is_complete(self, code):
401 msg = self.session.msg('is_complete_request', {'code': code})
402 self._queue_send(msg)
403 return msg['header']['msg_id']
404
405
406 class IOPubChannel(ZMQSocketChannel):
407 """The iopub channel which listens for messages that the kernel publishes.
408
409 This channel is where all output is published to frontends.
410 """
411
412 def __init__(self, context, session, address):
413 super(IOPubChannel, self).__init__(context, session, address)
414 self.ioloop = ioloop.IOLoop()
415
416 def run(self):
417 """The thread's main activity. Call start() instead."""
418 self.socket = self.context.socket(zmq.SUB)
419 self.socket.linger = 1000
420 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
421 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
422 self.socket.connect(self.address)
423 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
424 self.stream.on_recv(self._handle_recv)
425 self._run_loop()
426
427 def call_handlers(self, msg):
428 """This method is called in the ioloop thread when a message arrives.
429
430 Subclasses should override this method to handle incoming messages.
431 It is important to remember that this method is called in the thread
432 so that some logic must be done to ensure that the application leve
433 handlers are called in the application thread.
434 """
435 raise NotImplementedError('call_handlers must be defined in a subclass.')
436
437 def flush(self, timeout=1.0):
438 """Immediately processes all pending messages on the iopub channel.
439
440 Callers should use this method to ensure that :meth:`call_handlers`
441 has been called for all messages that have been received on the
442 0MQ SUB socket of this channel.
443
444 This method is thread safe.
445
446 Parameters
447 ----------
448 timeout : float, optional
449 The maximum amount of time to spend flushing, in seconds. The
450 default is one second.
451 """
452 # We do the IOLoop callback process twice to ensure that the IOLoop
453 # gets to perform at least one full poll.
454 stop_time = time.time() + timeout
455 for i in range(2):
456 self._flushed = False
457 self.ioloop.add_callback(self._flush)
458 while not self._flushed and time.time() < stop_time:
459 time.sleep(0.01)
460
461 def _flush(self):
462 """Callback for :method:`self.flush`."""
463 self.stream.flush()
464 self._flushed = True
465
466
467 class StdInChannel(ZMQSocketChannel):
468 """The stdin channel to handle raw_input requests that the kernel makes."""
469
470 msg_queue = None
471 proxy_methods = ['input']
472
473 def __init__(self, context, session, address):
474 super(StdInChannel, self).__init__(context, session, address)
475 self.ioloop = ioloop.IOLoop()
476
477 def run(self):
478 """The thread's main activity. Call start() instead."""
479 self.socket = self.context.socket(zmq.DEALER)
480 self.socket.linger = 1000
481 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
482 self.socket.connect(self.address)
483 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
484 self.stream.on_recv(self._handle_recv)
485 self._run_loop()
486
487 def call_handlers(self, msg):
488 """This method is called in the ioloop thread when a message arrives.
489
490 Subclasses should override this method to handle incoming messages.
491 It is important to remember that this method is called in the thread
492 so that some logic must be done to ensure that the application leve
493 handlers are called in the application thread.
494 """
495 raise NotImplementedError('call_handlers must be defined in a subclass.')
496
497 def input(self, string):
498 """Send a string of raw input to the kernel."""
499 content = dict(value=string)
500 msg = self.session.msg('input_reply', content)
501 self._queue_send(msg)
502
503
504 class HBChannel(ZMQSocketChannel):
505 """The heartbeat channel which monitors the kernel heartbeat.
506
507 Note that the heartbeat channel is paused by default. As long as you start
508 this channel, the kernel manager will ensure that it is paused and un-paused
509 as appropriate.
510 """
511
512 time_to_dead = 3.0
513 socket = None
514 poller = None
515 _running = None
516 _pause = None
517 _beating = None
518
519 def __init__(self, context, session, address):
520 super(HBChannel, self).__init__(context, session, address)
521 self._running = False
75 self._running = False
522 self._pause =True
76 self._pause = True
523 self.poller = zmq.Poller()
77 self.poller = zmq.Poller()
524
78
79 def _notice_exit(self):
80 self._exiting = True
81
525 def _create_socket(self):
82 def _create_socket(self):
526 if self.socket is not None:
83 if self.socket is not None:
527 # close previous socket, before opening a new one
84 # close previous socket, before opening a new one
@@ -621,7 +178,16 b' class HBChannel(ZMQSocketChannel):'
621 def stop(self):
178 def stop(self):
622 """Stop the channel's event loop and join its thread."""
179 """Stop the channel's event loop and join its thread."""
623 self._running = False
180 self._running = False
624 super(HBChannel, self).stop()
181 self.join()
182 self.close()
183
184 def close(self):
185 if self.socket is not None:
186 try:
187 self.socket.close(linger=0)
188 except Exception:
189 pass
190 self.socket = None
625
191
626 def call_handlers(self, since_last_heartbeat):
192 def call_handlers(self, since_last_heartbeat):
627 """This method is called in the ioloop thread when a message arrives.
193 """This method is called in the ioloop thread when a message arrives.
@@ -631,14 +197,7 b' class HBChannel(ZMQSocketChannel):'
631 so that some logic must be done to ensure that the application level
197 so that some logic must be done to ensure that the application level
632 handlers are called in the application thread.
198 handlers are called in the application thread.
633 """
199 """
634 raise NotImplementedError('call_handlers must be defined in a subclass.')
200 pass
635
201
636
202
637 #---------------------------------------------------------------------#-----------------------------------------------------------------------------
638 # ABC Registration
639 #-----------------------------------------------------------------------------
640
641 ShellChannelABC.register(ShellChannel)
642 IOPubChannelABC.register(IOPubChannel)
643 HBChannelABC.register(HBChannel)
203 HBChannelABC.register(HBChannel)
644 StdInChannelABC.register(StdInChannel)
@@ -24,70 +24,6 b' class ChannelABC(with_metaclass(abc.ABCMeta, object)):'
24 pass
24 pass
25
25
26
26
27 class ShellChannelABC(ChannelABC):
28 """ShellChannel ABC.
29
30 The docstrings for this class can be found in the base implementation:
31
32 `IPython.kernel.channels.ShellChannel`
33 """
34
35 @abc.abstractproperty
36 def allow_stdin(self):
37 pass
38
39 @abc.abstractmethod
40 def execute(self, code, silent=False, store_history=True,
41 user_expressions=None, allow_stdin=None):
42 pass
43
44 @abc.abstractmethod
45 def complete(self, text, line, cursor_pos, block=None):
46 pass
47
48 @abc.abstractmethod
49 def inspect(self, oname, detail_level=0):
50 pass
51
52 @abc.abstractmethod
53 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
54 pass
55
56 @abc.abstractmethod
57 def kernel_info(self):
58 pass
59
60 @abc.abstractmethod
61 def shutdown(self, restart=False):
62 pass
63
64
65 class IOPubChannelABC(ChannelABC):
66 """IOPubChannel ABC.
67
68 The docstrings for this class can be found in the base implementation:
69
70 `IPython.kernel.channels.IOPubChannel`
71 """
72
73 @abc.abstractmethod
74 def flush(self, timeout=1.0):
75 pass
76
77
78 class StdInChannelABC(ChannelABC):
79 """StdInChannel ABC.
80
81 The docstrings for this class can be found in the base implementation:
82
83 `IPython.kernel.channels.StdInChannel`
84 """
85
86 @abc.abstractmethod
87 def input(self, string):
88 pass
89
90
91 class HBChannelABC(ChannelABC):
27 class HBChannelABC(ChannelABC):
92 """HBChannel ABC.
28 """HBChannel ABC.
93
29
@@ -4,6 +4,8 b''
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import absolute_import
6 from __future__ import absolute_import
7 from IPython.kernel.channels import major_protocol_version
8 from IPython.utils.py3compat import string_types, iteritems
7
9
8 import zmq
10 import zmq
9
11
@@ -11,15 +13,25 b' from IPython.utils.traitlets import ('
11 Any, Instance, Type,
13 Any, Instance, Type,
12 )
14 )
13
15
14 from .zmq.session import Session
16 from .channelsabc import (ChannelABC, HBChannelABC)
15 from .channels import (
16 ShellChannel, IOPubChannel,
17 HBChannel, StdInChannel,
18 )
19 from .clientabc import KernelClientABC
17 from .clientabc import KernelClientABC
20 from .connect import ConnectionFileMixin
18 from .connect import ConnectionFileMixin
21
19
22
20
21 # some utilities to validate message structure, these might get moved elsewhere
22 # if they prove to have more generic utility
23
24 def validate_string_dict(dct):
25 """Validate that the input is a dict with string keys and values.
26
27 Raises ValueError if not."""
28 for k,v in iteritems(dct):
29 if not isinstance(k, string_types):
30 raise ValueError('key %r in dict must be a string' % k)
31 if not isinstance(v, string_types):
32 raise ValueError('value %r in dict must be a string' % v)
33
34
23 class KernelClient(ConnectionFileMixin):
35 class KernelClient(ConnectionFileMixin):
24 """Communicates with a single kernel on any host via zmq channels.
36 """Communicates with a single kernel on any host via zmq channels.
25
37
@@ -42,10 +54,10 b' class KernelClient(ConnectionFileMixin):'
42 return zmq.Context.instance()
54 return zmq.Context.instance()
43
55
44 # The classes to use for the various channels
56 # The classes to use for the various channels
45 shell_channel_class = Type(ShellChannel)
57 shell_channel_class = Type(ChannelABC)
46 iopub_channel_class = Type(IOPubChannel)
58 iopub_channel_class = Type(ChannelABC)
47 stdin_channel_class = Type(StdInChannel)
59 stdin_channel_class = Type(ChannelABC)
48 hb_channel_class = Type(HBChannel)
60 hb_channel_class = Type(HBChannelABC)
49
61
50 # Protected traits
62 # Protected traits
51 _shell_channel = Any
63 _shell_channel = Any
@@ -53,6 +65,9 b' class KernelClient(ConnectionFileMixin):'
53 _stdin_channel = Any
65 _stdin_channel = Any
54 _hb_channel = Any
66 _hb_channel = Any
55
67
68 # flag for whether execute requests should be allowed to call raw_input:
69 allow_stdin = True
70
56 #--------------------------------------------------------------------------
71 #--------------------------------------------------------------------------
57 # Channel proxy methods
72 # Channel proxy methods
58 #--------------------------------------------------------------------------
73 #--------------------------------------------------------------------------
@@ -87,19 +102,14 b' class KernelClient(ConnectionFileMixin):'
87 """
102 """
88 if shell:
103 if shell:
89 self.shell_channel.start()
104 self.shell_channel.start()
90 for method in self.shell_channel.proxy_methods:
105 self.kernel_info()
91 setattr(self, method, getattr(self.shell_channel, method))
92 if iopub:
106 if iopub:
93 self.iopub_channel.start()
107 self.iopub_channel.start()
94 for method in self.iopub_channel.proxy_methods:
95 setattr(self, method, getattr(self.iopub_channel, method))
96 if stdin:
108 if stdin:
97 self.stdin_channel.start()
109 self.stdin_channel.start()
98 for method in self.stdin_channel.proxy_methods:
110 self.allow_stdin = True
99 setattr(self, method, getattr(self.stdin_channel, method))
100 self.shell_channel.allow_stdin = True
101 else:
111 else:
102 self.shell_channel.allow_stdin = False
112 self.allow_stdin = False
103 if hb:
113 if hb:
104 self.hb_channel.start()
114 self.hb_channel.start()
105
115
@@ -123,14 +133,17 b' class KernelClient(ConnectionFileMixin):'
123 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
133 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
124 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
134 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
125
135
136 ioloop = None # Overridden in subclasses that use pyzmq event loop
137
126 @property
138 @property
127 def shell_channel(self):
139 def shell_channel(self):
128 """Get the shell channel object for this kernel."""
140 """Get the shell channel object for this kernel."""
129 if self._shell_channel is None:
141 if self._shell_channel is None:
130 url = self._make_url('shell')
142 url = self._make_url('shell')
131 self.log.debug("connecting shell channel to %s", url)
143 self.log.debug("connecting shell channel to %s", url)
144 socket = self.connect_shell(identity=self.session.bsession)
132 self._shell_channel = self.shell_channel_class(
145 self._shell_channel = self.shell_channel_class(
133 self.context, self.session, url
146 socket, self.session, self.ioloop
134 )
147 )
135 return self._shell_channel
148 return self._shell_channel
136
149
@@ -140,8 +153,9 b' class KernelClient(ConnectionFileMixin):'
140 if self._iopub_channel is None:
153 if self._iopub_channel is None:
141 url = self._make_url('iopub')
154 url = self._make_url('iopub')
142 self.log.debug("connecting iopub channel to %s", url)
155 self.log.debug("connecting iopub channel to %s", url)
156 socket = self.connect_iopub()
143 self._iopub_channel = self.iopub_channel_class(
157 self._iopub_channel = self.iopub_channel_class(
144 self.context, self.session, url
158 socket, self.session, self.ioloop
145 )
159 )
146 return self._iopub_channel
160 return self._iopub_channel
147
161
@@ -151,8 +165,9 b' class KernelClient(ConnectionFileMixin):'
151 if self._stdin_channel is None:
165 if self._stdin_channel is None:
152 url = self._make_url('stdin')
166 url = self._make_url('stdin')
153 self.log.debug("connecting stdin channel to %s", url)
167 self.log.debug("connecting stdin channel to %s", url)
168 socket = self.connect_stdin(identity=self.session.bsession)
154 self._stdin_channel = self.stdin_channel_class(
169 self._stdin_channel = self.stdin_channel_class(
155 self.context, self.session, url
170 socket, self.session, self.ioloop
156 )
171 )
157 return self._stdin_channel
172 return self._stdin_channel
158
173
@@ -179,8 +194,193 b' class KernelClient(ConnectionFileMixin):'
179 return True
194 return True
180
195
181
196
182 #-----------------------------------------------------------------------------
197 # Methods to send specific messages on channels
183 # ABC Registration
198 def execute(self, code, silent=False, store_history=True,
184 #-----------------------------------------------------------------------------
199 user_expressions=None, allow_stdin=None):
200 """Execute code in the kernel.
201
202 Parameters
203 ----------
204 code : str
205 A string of Python code.
206
207 silent : bool, optional (default False)
208 If set, the kernel will execute the code as quietly possible, and
209 will force store_history to be False.
210
211 store_history : bool, optional (default True)
212 If set, the kernel will store command history. This is forced
213 to be False if silent is True.
214
215 user_expressions : dict, optional
216 A dict mapping names to expressions to be evaluated in the user's
217 dict. The expression values are returned as strings formatted using
218 :func:`repr`.
219
220 allow_stdin : bool, optional (default self.allow_stdin)
221 Flag for whether the kernel can send stdin requests to frontends.
222
223 Some frontends (e.g. the Notebook) do not support stdin requests.
224 If raw_input is called from code executed from such a frontend, a
225 StdinNotImplementedError will be raised.
226
227 Returns
228 -------
229 The msg_id of the message sent.
230 """
231 if user_expressions is None:
232 user_expressions = {}
233 if allow_stdin is None:
234 allow_stdin = self.allow_stdin
235
236
237 # Don't waste network traffic if inputs are invalid
238 if not isinstance(code, string_types):
239 raise ValueError('code %r must be a string' % code)
240 validate_string_dict(user_expressions)
241
242 # Create class for content/msg creation. Related to, but possibly
243 # not in Session.
244 content = dict(code=code, silent=silent, store_history=store_history,
245 user_expressions=user_expressions,
246 allow_stdin=allow_stdin,
247 )
248 msg = self.session.msg('execute_request', content)
249 self.shell_channel.send(msg)
250 return msg['header']['msg_id']
251
252 def complete(self, code, cursor_pos=None):
253 """Tab complete text in the kernel's namespace.
254
255 Parameters
256 ----------
257 code : str
258 The context in which completion is requested.
259 Can be anything between a variable name and an entire cell.
260 cursor_pos : int, optional
261 The position of the cursor in the block of code where the completion was requested.
262 Default: ``len(code)``
263
264 Returns
265 -------
266 The msg_id of the message sent.
267 """
268 if cursor_pos is None:
269 cursor_pos = len(code)
270 content = dict(code=code, cursor_pos=cursor_pos)
271 msg = self.session.msg('complete_request', content)
272 self.shell_channel.send(msg)
273 return msg['header']['msg_id']
274
275 def inspect(self, code, cursor_pos=None, detail_level=0):
276 """Get metadata information about an object in the kernel's namespace.
277
278 It is up to the kernel to determine the appropriate object to inspect.
279
280 Parameters
281 ----------
282 code : str
283 The context in which info is requested.
284 Can be anything between a variable name and an entire cell.
285 cursor_pos : int, optional
286 The position of the cursor in the block of code where the info was requested.
287 Default: ``len(code)``
288 detail_level : int, optional
289 The level of detail for the introspection (0-2)
290
291 Returns
292 -------
293 The msg_id of the message sent.
294 """
295 if cursor_pos is None:
296 cursor_pos = len(code)
297 content = dict(code=code, cursor_pos=cursor_pos,
298 detail_level=detail_level,
299 )
300 msg = self.session.msg('inspect_request', content)
301 self.shell_channel.send(msg)
302 return msg['header']['msg_id']
303
304 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
305 """Get entries from the kernel's history list.
306
307 Parameters
308 ----------
309 raw : bool
310 If True, return the raw input.
311 output : bool
312 If True, then return the output as well.
313 hist_access_type : str
314 'range' (fill in session, start and stop params), 'tail' (fill in n)
315 or 'search' (fill in pattern param).
316
317 session : int
318 For a range request, the session from which to get lines. Session
319 numbers are positive integers; negative ones count back from the
320 current session.
321 start : int
322 The first line number of a history range.
323 stop : int
324 The final (excluded) line number of a history range.
325
326 n : int
327 The number of lines of history to get for a tail request.
328
329 pattern : str
330 The glob-syntax pattern for a search request.
331
332 Returns
333 -------
334 The msg_id of the message sent.
335 """
336 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
337 **kwargs)
338 msg = self.session.msg('history_request', content)
339 self.shell_channel.send(msg)
340 return msg['header']['msg_id']
341
342 def kernel_info(self):
343 """Request kernel info."""
344 msg = self.session.msg('kernel_info_request')
345 self.shell_channel.send(msg)
346 return msg['header']['msg_id']
347
348 def _handle_kernel_info_reply(self, msg):
349 """handle kernel info reply
350
351 sets protocol adaptation version
352 """
353 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
354 if adapt_version != major_protocol_version:
355 self.session.adapt_version = adapt_version
356
357 def shutdown(self, restart=False):
358 """Request an immediate kernel shutdown.
359
360 Upon receipt of the (empty) reply, client code can safely assume that
361 the kernel has shut down and it's safe to forcefully terminate it if
362 it's still alive.
363
364 The kernel will send the reply via a function registered with Python's
365 atexit module, ensuring it's truly done as the kernel is done with all
366 normal operation.
367 """
368 # Send quit message to kernel. Once we implement kernel-side setattr,
369 # this should probably be done that way, but for now this will do.
370 msg = self.session.msg('shutdown_request', {'restart':restart})
371 self.shell_channel.send(msg)
372 return msg['header']['msg_id']
373
374 def is_complete(self, code):
375 msg = self.session.msg('is_complete_request', {'code': code})
376 self.shell_channel.send(msg)
377 return msg['header']['msg_id']
378
379 def input(self, string):
380 """Send a string of raw input to the kernel."""
381 content = dict(value=string)
382 msg = self.session.msg('input_reply', content)
383 self.stdin_channel.send(msg)
384
185
385
186 KernelClientABC.register(KernelClient)
386 KernelClientABC.register(KernelClient)
@@ -1,7 +1,5 b''
1 from .channels import (
1 from .channels import (
2 InProcessShellChannel,
2 InProcessChannel,
3 InProcessIOPubChannel,
4 InProcessStdInChannel,
5 InProcessHBChannel,
3 InProcessHBChannel,
6 )
4 )
7
5
@@ -9,35 +9,55 b' Useful for test suites and blocking terminal interfaces.'
9 # the file COPYING.txt, distributed as part of this software.
9 # the file COPYING.txt, distributed as part of this software.
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11
11
12 #-----------------------------------------------------------------------------
12 try:
13 # Imports
13 from queue import Queue, Empty # Py 3
14 #-----------------------------------------------------------------------------
14 except ImportError:
15 from Queue import Queue, Empty # Py 2
15
16
16 # IPython imports
17 # IPython imports
17 from IPython.utils.io import raw_print
18 from IPython.utils.io import raw_print
18 from IPython.utils.traitlets import Type
19 from IPython.utils.traitlets import Type
19 from IPython.kernel.blocking.channels import BlockingChannelMixin
20 #from IPython.kernel.blocking.channels import BlockingChannelMixin
20
21
21 # Local imports
22 # Local imports
22 from .channels import (
23 from .channels import (
23 InProcessShellChannel,
24 InProcessChannel,
24 InProcessIOPubChannel,
25 InProcessStdInChannel,
26 )
25 )
27 from .client import InProcessKernelClient
26 from .client import InProcessKernelClient
28
27
29 #-----------------------------------------------------------------------------
28 class BlockingInProcessChannel(InProcessChannel):
30 # Blocking kernel manager
29
31 #-----------------------------------------------------------------------------
30 def __init__(self, *args, **kwds):
31 super(BlockingInProcessChannel, self).__init__(*args, **kwds)
32 self._in_queue = Queue()
32
33
33 class BlockingInProcessShellChannel(BlockingChannelMixin, InProcessShellChannel):
34 def call_handlers(self, msg):
34 pass
35 self._in_queue.put(msg)
36
37 def get_msg(self, block=True, timeout=None):
38 """ Gets a message if there is one that is ready. """
39 if timeout is None:
40 # Queue.get(timeout=None) has stupid uninteruptible
41 # behavior, so wait for a week instead
42 timeout = 604800
43 return self._in_queue.get(block, timeout)
44
45 def get_msgs(self):
46 """ Get all messages that are currently ready. """
47 msgs = []
48 while True:
49 try:
50 msgs.append(self.get_msg(block=False))
51 except Empty:
52 break
53 return msgs
35
54
36 class BlockingInProcessIOPubChannel(BlockingChannelMixin, InProcessIOPubChannel):
55 def msg_ready(self):
37 pass
56 """ Is there a message that has been received? """
57 return not self._in_queue.empty()
38
58
39 class BlockingInProcessStdInChannel(BlockingChannelMixin, InProcessStdInChannel):
40
59
60 class BlockingInProcessStdInChannel(BlockingInProcessChannel):
41 def call_handlers(self, msg):
61 def call_handlers(self, msg):
42 """ Overridden for the in-process channel.
62 """ Overridden for the in-process channel.
43
63
@@ -48,11 +68,27 b' class BlockingInProcessStdInChannel(BlockingChannelMixin, InProcessStdInChannel)'
48 _raw_input = self.client.kernel._sys_raw_input
68 _raw_input = self.client.kernel._sys_raw_input
49 prompt = msg['content']['prompt']
69 prompt = msg['content']['prompt']
50 raw_print(prompt, end='')
70 raw_print(prompt, end='')
51 self.input(_raw_input())
71 self.client.input(_raw_input())
52
72
53 class BlockingInProcessKernelClient(InProcessKernelClient):
73 class BlockingInProcessKernelClient(InProcessKernelClient):
54
74
55 # The classes to use for the various channels.
75 # The classes to use for the various channels.
56 shell_channel_class = Type(BlockingInProcessShellChannel)
76 shell_channel_class = Type(BlockingInProcessChannel)
57 iopub_channel_class = Type(BlockingInProcessIOPubChannel)
77 iopub_channel_class = Type(BlockingInProcessChannel)
58 stdin_channel_class = Type(BlockingInProcessStdInChannel)
78 stdin_channel_class = Type(BlockingInProcessStdInChannel)
79
80 def wait_for_ready(self):
81 # Wait for kernel info reply on shell channel
82 while True:
83 msg = self.shell_channel.get_msg(block=True)
84 if msg['msg_type'] == 'kernel_info_reply':
85 self._handle_kernel_info_reply(msg)
86 break
87
88 # Flush IOPub channel
89 while True:
90 try:
91 msg = self.iopub_channel.get_msg(block=True, timeout=0.2)
92 print(msg['msg_type'])
93 except Empty:
94 break
@@ -3,10 +3,7 b''
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from IPython.kernel.channelsabc import (
6 from IPython.kernel.channelsabc import HBChannelABC
7 ShellChannelABC, IOPubChannelABC,
8 HBChannelABC, StdInChannelABC,
9 )
10
7
11 from .socket import DummySocket
8 from .socket import DummySocket
12
9
@@ -23,10 +20,6 b' class InProcessChannel(object):'
23 self.client = client
20 self.client = client
24 self._is_alive = False
21 self._is_alive = False
25
22
26 #--------------------------------------------------------------------------
27 # Channel interface
28 #--------------------------------------------------------------------------
29
30 def is_alive(self):
23 def is_alive(self):
31 return self._is_alive
24 return self._is_alive
32
25
@@ -43,9 +36,9 b' class InProcessChannel(object):'
43 """
36 """
44 raise NotImplementedError('call_handlers must be defined in a subclass.')
37 raise NotImplementedError('call_handlers must be defined in a subclass.')
45
38
46 #--------------------------------------------------------------------------
39 def flush(self, timeout=1.0):
47 # InProcessChannel interface
40 pass
48 #--------------------------------------------------------------------------
41
49
42
50 def call_handlers_later(self, *args, **kwds):
43 def call_handlers_later(self, *args, **kwds):
51 """ Call the message handlers later.
44 """ Call the message handlers later.
@@ -65,117 +58,31 b' class InProcessChannel(object):'
65 raise NotImplementedError
58 raise NotImplementedError
66
59
67
60
68 class InProcessShellChannel(InProcessChannel):
69 """See `IPython.kernel.channels.ShellChannel` for docstrings."""
70
71 # flag for whether execute requests should be allowed to call raw_input
72 allow_stdin = True
73 proxy_methods = [
74 'execute',
75 'complete',
76 'inspect',
77 'history',
78 'shutdown',
79 'kernel_info',
80 ]
81
82 #--------------------------------------------------------------------------
83 # ShellChannel interface
84 #--------------------------------------------------------------------------
85
86 def execute(self, code, silent=False, store_history=True,
87 user_expressions={}, allow_stdin=None):
88 if allow_stdin is None:
89 allow_stdin = self.allow_stdin
90 content = dict(code=code, silent=silent, store_history=store_history,
91 user_expressions=user_expressions,
92 allow_stdin=allow_stdin)
93 msg = self.client.session.msg('execute_request', content)
94 self._dispatch_to_kernel(msg)
95 return msg['header']['msg_id']
96
97 def complete(self, code, cursor_pos=None):
98 if cursor_pos is None:
99 cursor_pos = len(code)
100 content = dict(code=code, cursor_pos=cursor_pos)
101 msg = self.client.session.msg('complete_request', content)
102 self._dispatch_to_kernel(msg)
103 return msg['header']['msg_id']
104
105 def inspect(self, code, cursor_pos=None, detail_level=0):
106 if cursor_pos is None:
107 cursor_pos = len(code)
108 content = dict(code=code, cursor_pos=cursor_pos,
109 detail_level=detail_level,
110 )
111 msg = self.client.session.msg('inspect_request', content)
112 self._dispatch_to_kernel(msg)
113 return msg['header']['msg_id']
114
115 def history(self, raw=True, output=False, hist_access_type='range', **kwds):
116 content = dict(raw=raw, output=output,
117 hist_access_type=hist_access_type, **kwds)
118 msg = self.client.session.msg('history_request', content)
119 self._dispatch_to_kernel(msg)
120 return msg['header']['msg_id']
121
122 def shutdown(self, restart=False):
123 # FIXME: What to do here?
124 raise NotImplementedError('Cannot shutdown in-process kernel')
125
126 def kernel_info(self):
127 """Request kernel info."""
128 msg = self.client.session.msg('kernel_info_request')
129 self._dispatch_to_kernel(msg)
130 return msg['header']['msg_id']
131
132 #--------------------------------------------------------------------------
133 # Protected interface
134 #--------------------------------------------------------------------------
135
136 def _dispatch_to_kernel(self, msg):
137 """ Send a message to the kernel and handle a reply.
138 """
139 kernel = self.client.kernel
140 if kernel is None:
141 raise RuntimeError('Cannot send request. No kernel exists.')
142
143 stream = DummySocket()
144 self.client.session.send(stream, msg)
145 msg_parts = stream.recv_multipart()
146 kernel.dispatch_shell(stream, msg_parts)
147
148 idents, reply_msg = self.client.session.recv(stream, copy=False)
149 self.call_handlers_later(reply_msg)
150
61
62 class InProcessHBChannel(object):
63 """A dummy heartbeat channel interface for in-process kernels.
151
64
152 class InProcessIOPubChannel(InProcessChannel):
65 Normally we use the heartbeat to check that the kernel process is alive.
153 """See `IPython.kernel.channels.IOPubChannel` for docstrings."""
66 When the kernel is in-process, that doesn't make sense, but clients still
67 expect this interface.
68 """
154
69
155 def flush(self, timeout=1.0):
70 time_to_dead = 3.0
156 pass
157
158
159 class InProcessStdInChannel(InProcessChannel):
160 """See `IPython.kernel.channels.StdInChannel` for docstrings."""
161
162 proxy_methods = ['input']
163
164 def input(self, string):
165 kernel = self.client.kernel
166 if kernel is None:
167 raise RuntimeError('Cannot send input reply. No kernel exists.')
168 kernel.raw_input_str = string
169
71
72 def __init__(self, client=None):
73 super(InProcessHBChannel, self).__init__()
74 self.client = client
75 self._is_alive = False
76 self._pause = True
170
77
171 class InProcessHBChannel(InProcessChannel):
78 def is_alive(self):
172 """See `IPython.kernel.channels.HBChannel` for docstrings."""
79 return self._is_alive
173
80
174 time_to_dead = 3.0
81 def start(self):
82 self._is_alive = True
175
83
176 def __init__(self, *args, **kwds):
84 def stop(self):
177 super(InProcessHBChannel, self).__init__(*args, **kwds)
85 self._is_alive = False
178 self._pause = True
179
86
180 def pause(self):
87 def pause(self):
181 self._pause = True
88 self._pause = True
@@ -186,11 +93,5 b' class InProcessHBChannel(InProcessChannel):'
186 def is_beating(self):
93 def is_beating(self):
187 return not self._pause
94 return not self._pause
188
95
189 #-----------------------------------------------------------------------------
190 # ABC Registration
191 #-----------------------------------------------------------------------------
192
96
193 ShellChannelABC.register(InProcessShellChannel)
194 IOPubChannelABC.register(InProcessIOPubChannel)
195 HBChannelABC.register(InProcessHBChannel)
97 HBChannelABC.register(InProcessHBChannel)
196 StdInChannelABC.register(InProcessStdInChannel)
@@ -12,16 +12,15 b''
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 # IPython imports
14 # IPython imports
15 from IPython.kernel.inprocess.socket import DummySocket
15 from IPython.utils.traitlets import Type, Instance
16 from IPython.utils.traitlets import Type, Instance
16 from IPython.kernel.clientabc import KernelClientABC
17 from IPython.kernel.clientabc import KernelClientABC
17 from IPython.kernel.client import KernelClient
18 from IPython.kernel.client import KernelClient
18
19
19 # Local imports
20 # Local imports
20 from .channels import (
21 from .channels import (
21 InProcessShellChannel,
22 InProcessChannel,
22 InProcessIOPubChannel,
23 InProcessHBChannel,
23 InProcessHBChannel,
24 InProcessStdInChannel,
25
24
26 )
25 )
27
26
@@ -40,9 +39,9 b' class InProcessKernelClient(KernelClient):'
40 """
39 """
41
40
42 # The classes to use for the various channels.
41 # The classes to use for the various channels.
43 shell_channel_class = Type(InProcessShellChannel)
42 shell_channel_class = Type(InProcessChannel)
44 iopub_channel_class = Type(InProcessIOPubChannel)
43 iopub_channel_class = Type(InProcessChannel)
45 stdin_channel_class = Type(InProcessStdInChannel)
44 stdin_channel_class = Type(InProcessChannel)
46 hb_channel_class = Type(InProcessHBChannel)
45 hb_channel_class = Type(InProcessHBChannel)
47
46
48 kernel = Instance('IPython.kernel.inprocess.ipkernel.InProcessKernel')
47 kernel = Instance('IPython.kernel.inprocess.ipkernel.InProcessKernel')
@@ -79,6 +78,76 b' class InProcessKernelClient(KernelClient):'
79 self._hb_channel = self.hb_channel_class(self)
78 self._hb_channel = self.hb_channel_class(self)
80 return self._hb_channel
79 return self._hb_channel
81
80
81 # Methods for sending specific messages
82 # -------------------------------------
83
84 def execute(self, code, silent=False, store_history=True,
85 user_expressions={}, allow_stdin=None):
86 if allow_stdin is None:
87 allow_stdin = self.allow_stdin
88 content = dict(code=code, silent=silent, store_history=store_history,
89 user_expressions=user_expressions,
90 allow_stdin=allow_stdin)
91 msg = self.session.msg('execute_request', content)
92 self._dispatch_to_kernel(msg)
93 return msg['header']['msg_id']
94
95 def complete(self, code, cursor_pos=None):
96 if cursor_pos is None:
97 cursor_pos = len(code)
98 content = dict(code=code, cursor_pos=cursor_pos)
99 msg = self.session.msg('complete_request', content)
100 self._dispatch_to_kernel(msg)
101 return msg['header']['msg_id']
102
103 def inspect(self, code, cursor_pos=None, detail_level=0):
104 if cursor_pos is None:
105 cursor_pos = len(code)
106 content = dict(code=code, cursor_pos=cursor_pos,
107 detail_level=detail_level,
108 )
109 msg = self.session.msg('inspect_request', content)
110 self._dispatch_to_kernel(msg)
111 return msg['header']['msg_id']
112
113 def history(self, raw=True, output=False, hist_access_type='range', **kwds):
114 content = dict(raw=raw, output=output,
115 hist_access_type=hist_access_type, **kwds)
116 msg = self.session.msg('history_request', content)
117 self._dispatch_to_kernel(msg)
118 return msg['header']['msg_id']
119
120 def shutdown(self, restart=False):
121 # FIXME: What to do here?
122 raise NotImplementedError('Cannot shutdown in-process kernel')
123
124 def kernel_info(self):
125 """Request kernel info."""
126 msg = self.session.msg('kernel_info_request')
127 self._dispatch_to_kernel(msg)
128 return msg['header']['msg_id']
129
130 def input(self, string):
131 if self.kernel is None:
132 raise RuntimeError('Cannot send input reply. No kernel exists.')
133 self.kernel.raw_input_str = string
134
135
136 def _dispatch_to_kernel(self, msg):
137 """ Send a message to the kernel and handle a reply.
138 """
139 kernel = self.kernel
140 if kernel is None:
141 raise RuntimeError('Cannot send request. No kernel exists.')
142
143 stream = DummySocket()
144 self.session.send(stream, msg)
145 msg_parts = stream.recv_multipart()
146 kernel.dispatch_shell(stream, msg_parts)
147
148 idents, reply_msg = self.session.recv(stream, copy=False)
149 self.shell_channel.call_handlers_later(reply_msg)
150
82
151
83 #-----------------------------------------------------------------------------
152 #-----------------------------------------------------------------------------
84 # ABC Registration
153 # ABC Registration
@@ -26,6 +26,7 b' class InProcessKernelTestCase(unittest.TestCase):'
26 self.km.start_kernel()
26 self.km.start_kernel()
27 self.kc = BlockingInProcessKernelClient(kernel=self.km.kernel)
27 self.kc = BlockingInProcessKernelClient(kernel=self.km.kernel)
28 self.kc.start_channels()
28 self.kc.start_channels()
29 self.kc.wait_for_ready()
29
30
30 @skipif_not_matplotlib
31 @skipif_not_matplotlib
31 def test_pylab(self):
32 def test_pylab(self):
@@ -61,7 +62,7 b' class InProcessKernelTestCase(unittest.TestCase):'
61
62
62 kc = BlockingInProcessKernelClient(kernel=kernel)
63 kc = BlockingInProcessKernelClient(kernel=kernel)
63 kernel.frontends.append(kc)
64 kernel.frontends.append(kc)
64 kc.shell_channel.execute('print("bar")')
65 kc.execute('print("bar")')
65 msg = get_stream_message(kc)
66 msg = get_stream_message(kc)
66 self.assertEqual(msg['content']['text'], 'bar\n')
67 self.assertEqual(msg['content']['text'], 'bar\n')
67
68
@@ -51,6 +51,7 b' class InProcessKernelManagerTestCase(unittest.TestCase):'
51 km.start_kernel()
51 km.start_kernel()
52 kc = BlockingInProcessKernelClient(kernel=km.kernel)
52 kc = BlockingInProcessKernelClient(kernel=km.kernel)
53 kc.start_channels()
53 kc.start_channels()
54 kc.wait_for_ready()
54 kc.execute('foo = 1')
55 kc.execute('foo = 1')
55 self.assertEquals(km.kernel.shell.user_ns['foo'], 1)
56 self.assertEquals(km.kernel.shell.user_ns['foo'], 1)
56
57
@@ -61,6 +62,7 b' class InProcessKernelManagerTestCase(unittest.TestCase):'
61 km.start_kernel()
62 km.start_kernel()
62 kc = BlockingInProcessKernelClient(kernel=km.kernel)
63 kc = BlockingInProcessKernelClient(kernel=km.kernel)
63 kc.start_channels()
64 kc.start_channels()
65 kc.wait_for_ready()
64 km.kernel.shell.push({'my_bar': 0, 'my_baz': 1})
66 km.kernel.shell.push({'my_bar': 0, 'my_baz': 1})
65 kc.complete('my_ba', 5)
67 kc.complete('my_ba', 5)
66 msg = kc.get_shell_msg()
68 msg = kc.get_shell_msg()
@@ -75,6 +77,7 b' class InProcessKernelManagerTestCase(unittest.TestCase):'
75 km.start_kernel()
77 km.start_kernel()
76 kc = BlockingInProcessKernelClient(kernel=km.kernel)
78 kc = BlockingInProcessKernelClient(kernel=km.kernel)
77 kc.start_channels()
79 kc.start_channels()
80 kc.wait_for_ready()
78 km.kernel.shell.user_ns['foo'] = 1
81 km.kernel.shell.user_ns['foo'] = 1
79 kc.inspect('foo')
82 kc.inspect('foo')
80 msg = kc.get_shell_msg()
83 msg = kc.get_shell_msg()
@@ -91,6 +94,7 b' class InProcessKernelManagerTestCase(unittest.TestCase):'
91 km.start_kernel()
94 km.start_kernel()
92 kc = BlockingInProcessKernelClient(kernel=km.kernel)
95 kc = BlockingInProcessKernelClient(kernel=km.kernel)
93 kc.start_channels()
96 kc.start_channels()
97 kc.wait_for_ready()
94 kc.execute('%who')
98 kc.execute('%who')
95 kc.history(hist_access_type='tail', n=1)
99 kc.history(hist_access_type='tail', n=1)
96 msg = kc.shell_channel.get_msgs()[-1]
100 msg = kc.shell_channel.get_msgs()[-1]
@@ -420,17 +420,8 b" def start_new_kernel(startup_timeout=60, kernel_name='python', **kwargs):"
420 km.start_kernel(**kwargs)
420 km.start_kernel(**kwargs)
421 kc = km.client()
421 kc = km.client()
422 kc.start_channels()
422 kc.start_channels()
423 kc.wait_for_ready()
423
424
424 kc.kernel_info()
425 kc.get_shell_msg(block=True, timeout=startup_timeout)
426
427 # Flush channels
428 for channel in (kc.shell_channel, kc.iopub_channel):
429 while True:
430 try:
431 channel.get_msg(block=True, timeout=0.1)
432 except Empty:
433 break
434 return km, kc
425 return km, kc
435
426
436 @contextmanager
427 @contextmanager
@@ -92,6 +92,7 b' def setup_kernel(cmd):'
92 client = BlockingKernelClient(connection_file=connection_file)
92 client = BlockingKernelClient(connection_file=connection_file)
93 client.load_connection_file()
93 client.load_connection_file()
94 client.start_channels()
94 client.start_channels()
95 client.wait_for_ready()
95
96
96 try:
97 try:
97 yield client
98 yield client
@@ -46,7 +46,7 b' class ExecutePreprocessor(Preprocessor):'
46 if cell.cell_type != 'code':
46 if cell.cell_type != 'code':
47 return cell, resources
47 return cell, resources
48 try:
48 try:
49 outputs = self.run_cell(self.kc.shell_channel, self.kc.iopub_channel, cell)
49 outputs = self.run_cell(cell)
50 except Exception as e:
50 except Exception as e:
51 self.log.error("failed to run cell: " + repr(e))
51 self.log.error("failed to run cell: " + repr(e))
52 self.log.error(str(cell.source))
52 self.log.error(str(cell.source))
@@ -54,13 +54,13 b' class ExecutePreprocessor(Preprocessor):'
54 cell.outputs = outputs
54 cell.outputs = outputs
55 return cell, resources
55 return cell, resources
56
56
57 def run_cell(self, shell, iopub, cell):
57 def run_cell(self, cell):
58 msg_id = shell.execute(cell.source)
58 msg_id = self.kc.execute(cell.source)
59 self.log.debug("Executing cell:\n%s", cell.source)
59 self.log.debug("Executing cell:\n%s", cell.source)
60 # wait for finish, with timeout
60 # wait for finish, with timeout
61 while True:
61 while True:
62 try:
62 try:
63 msg = shell.get_msg(timeout=self.timeout)
63 msg = self.kc.shell_channel.get_msg(timeout=self.timeout)
64 except Empty:
64 except Empty:
65 self.log.error("Timeout waiting for execute reply")
65 self.log.error("Timeout waiting for execute reply")
66 raise
66 raise
@@ -74,7 +74,7 b' class ExecutePreprocessor(Preprocessor):'
74
74
75 while True:
75 while True:
76 try:
76 try:
77 msg = iopub.get_msg(timeout=self.timeout)
77 msg = self.kc.iopub_channel.get_msg(timeout=self.timeout)
78 except Empty:
78 except Empty:
79 self.log.warn("Timeout waiting for IOPub output")
79 self.log.warn("Timeout waiting for IOPub output")
80 break
80 break
@@ -1,37 +1,249 b''
1 """ Defines a KernelClient that provides signals and slots.
1 """ Defines a KernelClient that provides signals and slots.
2 """
2 """
3 import atexit
4 import errno
5 from threading import Thread
6 import time
7
8 import zmq
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 # during garbage collection of threads at exit:
11 from zmq import ZMQError
12 from zmq.eventloop import ioloop, zmqstream
13
14 from IPython.external.qt import QtCore
3
15
4 # Local imports
16 # Local imports
5 from IPython.utils.traitlets import Type
17 from IPython.utils.traitlets import Type, Instance
6 from IPython.kernel.channels import (
18 from IPython.kernel.channels import HBChannel
7 ShellChannel, IOPubChannel, StdInChannel, HBChannel
8 )
9 from IPython.kernel import KernelClient
19 from IPython.kernel import KernelClient
10
20
11 from .kernel_mixins import (
21 from .kernel_mixins import QtKernelClientMixin
12 QtShellChannelMixin, QtIOPubChannelMixin,
22 from .util import SuperQObject
13 QtStdInChannelMixin, QtHBChannelMixin,
14 QtKernelClientMixin
15 )
16
23
17 class QtShellChannel(QtShellChannelMixin, ShellChannel):
24 class QtHBChannel(SuperQObject, HBChannel):
18 pass
25 # A longer timeout than the base class
26 time_to_dead = 3.0
19
27
20 class QtIOPubChannel(QtIOPubChannelMixin, IOPubChannel):
28 # Emitted when the kernel has died.
21 pass
29 kernel_died = QtCore.Signal(object)
22
30
23 class QtStdInChannel(QtStdInChannelMixin, StdInChannel):
31 def call_handlers(self, since_last_heartbeat):
24 pass
32 """ Reimplemented to emit signals instead of making callbacks.
33 """
34 # Emit the generic signal.
35 self.kernel_died.emit(since_last_heartbeat)
36
37 from IPython.core.release import kernel_protocol_version_info
25
38
26 class QtHBChannel(QtHBChannelMixin, HBChannel):
39 major_protocol_version = kernel_protocol_version_info[0]
40
41 class InvalidPortNumber(Exception):
27 pass
42 pass
28
43
29
44
45 class QtZMQSocketChannel(SuperQObject):
46 """A ZMQ socket emitting a Qt signal when a message is received."""
47 session = None
48 socket = None
49 ioloop = None
50 stream = None
51
52 message_received = QtCore.Signal(object)
53
54 def process_events(self):
55 """ Process any pending GUI events.
56 """
57 QtCore.QCoreApplication.instance().processEvents()
58
59 def __init__(self, socket, session, loop):
60 """Create a channel.
61
62 Parameters
63 ----------
64 socket : :class:`zmq.Socket`
65 The ZMQ socket to use.
66 session : :class:`session.Session`
67 The session to use.
68 loop
69 A pyzmq ioloop to connect the socket to using a ZMQStream
70 """
71 super(QtZMQSocketChannel, self).__init__()
72
73 self.socket = socket
74 self.session = session
75 self.ioloop = loop
76
77 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
78 self.stream.on_recv(self._handle_recv)
79
80 _is_alive = False
81 def is_alive(self):
82 return self._is_alive
83
84 def start(self):
85 self._is_alive = True
86
87 def stop(self):
88 self._is_alive = False
89
90 def close(self):
91 if self.socket is not None:
92 try:
93 self.socket.close(linger=0)
94 except Exception:
95 pass
96 self.socket = None
97
98 def send(self, msg):
99 """Queue a message to be sent from the IOLoop's thread.
100
101 Parameters
102 ----------
103 msg : message to send
104
105 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
106 thread control of the action.
107 """
108 def thread_send():
109 self.session.send(self.stream, msg)
110 self.ioloop.add_callback(thread_send)
111
112 def _handle_recv(self, msg):
113 """Callback for stream.on_recv.
114
115 Unpacks message, and calls handlers with it.
116 """
117 ident,smsg = self.session.feed_identities(msg)
118 msg = self.session.deserialize(smsg)
119 self.call_handlers(msg)
120
121 def call_handlers(self, msg):
122 """This method is called in the ioloop thread when a message arrives.
123
124 Subclasses should override this method to handle incoming messages.
125 It is important to remember that this method is called in the thread
126 so that some logic must be done to ensure that the application level
127 handlers are called in the application thread.
128 """
129 # Emit the generic signal.
130 self.message_received.emit(msg)
131
132 def flush(self, timeout=1.0):
133 """Immediately processes all pending messages on this channel.
134
135 This is only used for the IOPub channel.
136
137 Callers should use this method to ensure that :meth:`call_handlers`
138 has been called for all messages that have been received on the
139 0MQ SUB socket of this channel.
140
141 This method is thread safe.
142
143 Parameters
144 ----------
145 timeout : float, optional
146 The maximum amount of time to spend flushing, in seconds. The
147 default is one second.
148 """
149 # We do the IOLoop callback process twice to ensure that the IOLoop
150 # gets to perform at least one full poll.
151 stop_time = time.time() + timeout
152 for i in range(2):
153 self._flushed = False
154 self.ioloop.add_callback(self._flush)
155 while not self._flushed and time.time() < stop_time:
156 time.sleep(0.01)
157
158 def _flush(self):
159 """Callback for :method:`self.flush`."""
160 self.stream.flush()
161 self._flushed = True
162
163
164 class IOLoopThread(Thread):
165 """Run a pyzmq ioloop in a thread to send and receive messages
166 """
167 def __init__(self, loop):
168 super(IOLoopThread, self).__init__()
169 self.daemon = True
170 atexit.register(self._notice_exit)
171 self.ioloop = loop or ioloop.IOLoop()
172
173 def _notice_exit(self):
174 self._exiting = True
175
176 def run(self):
177 """Run my loop, ignoring EINTR events in the poller"""
178 while True:
179 try:
180 self.ioloop.start()
181 except ZMQError as e:
182 if e.errno == errno.EINTR:
183 continue
184 else:
185 raise
186 except Exception:
187 if self._exiting:
188 break
189 else:
190 raise
191 else:
192 break
193
194 def stop(self):
195 """Stop the channel's event loop and join its thread.
196
197 This calls :meth:`~threading.Thread.join` and returns when the thread
198 terminates. :class:`RuntimeError` will be raised if
199 :meth:`~threading.Thread.start` is called again.
200 """
201 if self.ioloop is not None:
202 self.ioloop.stop()
203 self.join()
204 self.close()
205
206 def close(self):
207 if self.ioloop is not None:
208 try:
209 self.ioloop.close(all_fds=True)
210 except Exception:
211 pass
212
213
30 class QtKernelClient(QtKernelClientMixin, KernelClient):
214 class QtKernelClient(QtKernelClientMixin, KernelClient):
31 """ A KernelClient that provides signals and slots.
215 """ A KernelClient that provides signals and slots.
32 """
216 """
33
217
34 iopub_channel_class = Type(QtIOPubChannel)
218 _ioloop = None
35 shell_channel_class = Type(QtShellChannel)
219 @property
36 stdin_channel_class = Type(QtStdInChannel)
220 def ioloop(self):
221 if self._ioloop is None:
222 self._ioloop = ioloop.IOLoop()
223 return self._ioloop
224
225 ioloop_thread = Instance(IOLoopThread)
226
227 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
228 if shell:
229 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
230
231 self.ioloop_thread = IOLoopThread(self.ioloop)
232 self.ioloop_thread.start()
233
234 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
235
236 def _check_kernel_info_reply(self, msg):
237 if msg['msg_type'] == 'kernel_info_reply':
238 self._handle_kernel_info_reply(msg)
239 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
240
241 def stop_channels(self):
242 super(QtKernelClient, self).stop_channels()
243 if self.ioloop_thread.is_alive():
244 self.ioloop_thread.stop()
245
246 iopub_channel_class = Type(QtZMQSocketChannel)
247 shell_channel_class = Type(QtZMQSocketChannel)
248 stdin_channel_class = Type(QtZMQSocketChannel)
37 hb_channel_class = Type(QtHBChannel)
249 hb_channel_class = Type(QtHBChannel)
@@ -82,6 +82,8 b' class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin):'
82 # The text to show when the kernel is (re)started.
82 # The text to show when the kernel is (re)started.
83 banner = Unicode(config=True)
83 banner = Unicode(config=True)
84 kernel_banner = Unicode()
84 kernel_banner = Unicode()
85 # Whether to show the banner
86 _display_banner = Bool(False)
85
87
86 # An option and corresponding signal for overriding the default kernel
88 # An option and corresponding signal for overriding the default kernel
87 # interrupt behavior.
89 # interrupt behavior.
@@ -464,7 +466,7 b' class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin):'
464 self.kernel_client.iopub_channel.flush()
466 self.kernel_client.iopub_channel.flush()
465
467
466 def callback(line):
468 def callback(line):
467 self.kernel_client.stdin_channel.input(line)
469 self.kernel_client.input(line)
468 if self._reading:
470 if self._reading:
469 self.log.debug("Got second input request, assuming first was interrupted.")
471 self.log.debug("Got second input request, assuming first was interrupted.")
470 self._reading = False
472 self._reading = False
@@ -225,7 +225,7 b' class HistoryConsoleWidget(ConsoleWidget):'
225 return self._history[-n:]
225 return self._history[-n:]
226
226
227 def _request_update_session_history_length(self):
227 def _request_update_session_history_length(self):
228 msg_id = self.kernel_client.shell_channel.execute('',
228 msg_id = self.kernel_client.execute('',
229 silent=True,
229 silent=True,
230 user_expressions={
230 user_expressions={
231 'hlen':'len(get_ipython().history_manager.input_hist_raw)',
231 'hlen':'len(get_ipython().history_manager.input_hist_raw)',
@@ -203,7 +203,7 b' class IPythonWidget(FrontendWidget):'
203 self._retrying_history_request = True
203 self._retrying_history_request = True
204 # wait out the kernel's queue flush, which is currently timed at 0.1s
204 # wait out the kernel's queue flush, which is currently timed at 0.1s
205 time.sleep(0.25)
205 time.sleep(0.25)
206 self.kernel_client.shell_channel.history(hist_access_type='tail',n=1000)
206 self.kernel_client.history(hist_access_type='tail',n=1000)
207 else:
207 else:
208 self._retrying_history_request = False
208 self._retrying_history_request = False
209 return
209 return
@@ -296,12 +296,11 b' class IPythonWidget(FrontendWidget):'
296 # The reply will trigger %guiref load provided language=='python'
296 # The reply will trigger %guiref load provided language=='python'
297 self.kernel_client.kernel_info()
297 self.kernel_client.kernel_info()
298
298
299 self.kernel_client.shell_channel.history(hist_access_type='tail',
299 self.kernel_client.history(hist_access_type='tail', n=1000)
300 n=1000)
301
300
302 def _load_guiref_magic(self):
301 def _load_guiref_magic(self):
303 """Load %guiref magic."""
302 """Load %guiref magic."""
304 self.kernel_client.shell_channel.execute('\n'.join([
303 self.kernel_client.execute('\n'.join([
305 "try:",
304 "try:",
306 " _usage",
305 " _usage",
307 "except:",
306 "except:",
@@ -385,7 +384,7 b' class IPythonWidget(FrontendWidget):'
385 """
384 """
386 # If a number was not specified, make a prompt number request.
385 # If a number was not specified, make a prompt number request.
387 if number is None:
386 if number is None:
388 msg_id = self.kernel_client.shell_channel.execute('', silent=True)
387 msg_id = self.kernel_client.execute('', silent=True)
389 info = self._ExecutionRequest(msg_id, 'prompt')
388 info = self._ExecutionRequest(msg_id, 'prompt')
390 self._request_info['execute'][msg_id] = info
389 self._request_info['execute'][msg_id] = info
391 return
390 return
@@ -2,38 +2,73 b''
2 """
2 """
3
3
4 # Local imports.
4 # Local imports.
5 from IPython.external.qt import QtCore
5 from IPython.kernel.inprocess import (
6 from IPython.kernel.inprocess import (
6 InProcessShellChannel, InProcessIOPubChannel, InProcessStdInChannel,
7 InProcessHBChannel, InProcessKernelClient, InProcessKernelManager,
7 InProcessHBChannel, InProcessKernelClient, InProcessKernelManager,
8 )
8 )
9 from IPython.kernel.inprocess.channels import InProcessChannel
9
10
10 from IPython.utils.traitlets import Type
11 from IPython.utils.traitlets import Type
12 from .util import SuperQObject
11 from .kernel_mixins import (
13 from .kernel_mixins import (
12 QtShellChannelMixin, QtIOPubChannelMixin,
14 QtKernelClientMixin, QtKernelManagerMixin,
13 QtStdInChannelMixin, QtHBChannelMixin, QtKernelClientMixin,
14 QtKernelManagerMixin,
15 )
15 )
16
16
17 class QtInProcessChannel(SuperQObject, InProcessChannel):
18 # Emitted when the channel is started.
19 started = QtCore.Signal()
17
20
18 class QtInProcessShellChannel(QtShellChannelMixin, InProcessShellChannel):
21 # Emitted when the channel is stopped.
19 pass
22 stopped = QtCore.Signal()
20
23
21 class QtInProcessIOPubChannel(QtIOPubChannelMixin, InProcessIOPubChannel):
24 # Emitted when any message is received.
22 pass
25 message_received = QtCore.Signal(object)
23
26
24 class QtInProcessStdInChannel(QtStdInChannelMixin, InProcessStdInChannel):
27 def start(self):
25 pass
28 """ Reimplemented to emit signal.
29 """
30 super(QtInProcessChannel, self).start()
31 self.started.emit()
32
33 def stop(self):
34 """ Reimplemented to emit signal.
35 """
36 super(QtInProcessChannel, self).stop()
37 self.stopped.emit()
38
39 def call_handlers_later(self, *args, **kwds):
40 """ Call the message handlers later.
41 """
42 do_later = lambda: self.call_handlers(*args, **kwds)
43 QtCore.QTimer.singleShot(0, do_later)
44
45 def call_handlers(self, msg):
46 self.message_received.emit(msg)
47
48 def process_events(self):
49 """ Process any pending GUI events.
50 """
51 QtCore.QCoreApplication.instance().processEvents()
52
53 def flush(self, timeout=1.0):
54 """ Reimplemented to ensure that signals are dispatched immediately.
55 """
56 super(QtInProcessChannel, self).flush()
57 self.process_events()
58
59
60 class QtInProcessHBChannel(SuperQObject, InProcessHBChannel):
61 # This signal will never be fired, but it needs to exist
62 kernel_died = QtCore.Signal()
26
63
27 class QtInProcessHBChannel(QtHBChannelMixin, InProcessHBChannel):
28 pass
29
64
30 class QtInProcessKernelClient(QtKernelClientMixin, InProcessKernelClient):
65 class QtInProcessKernelClient(QtKernelClientMixin, InProcessKernelClient):
31 """ An in-process KernelManager with signals and slots.
66 """ An in-process KernelManager with signals and slots.
32 """
67 """
33
68
34 iopub_channel_class = Type(QtInProcessIOPubChannel)
69 iopub_channel_class = Type(QtInProcessChannel)
35 shell_channel_class = Type(QtInProcessShellChannel)
70 shell_channel_class = Type(QtInProcessChannel)
36 stdin_channel_class = Type(QtInProcessStdInChannel)
71 stdin_channel_class = Type(QtInProcessChannel)
37 hb_channel_class = Type(QtInProcessHBChannel)
72 hb_channel_class = Type(QtInProcessHBChannel)
38
73
39 class QtInProcessKernelManager(QtKernelManagerMixin, InProcessKernelManager):
74 class QtInProcessKernelManager(QtKernelManagerMixin, InProcessKernelManager):
@@ -9,146 +9,6 b' from IPython.utils.traitlets import HasTraits, Type'
9 from .util import MetaQObjectHasTraits, SuperQObject
9 from .util import MetaQObjectHasTraits, SuperQObject
10
10
11
11
12 class ChannelQObject(SuperQObject):
13
14 # Emitted when the channel is started.
15 started = QtCore.Signal()
16
17 # Emitted when the channel is stopped.
18 stopped = QtCore.Signal()
19
20 def start(self):
21 """ Reimplemented to emit signal.
22 """
23 super(ChannelQObject, self).start()
24 self.started.emit()
25
26 def stop(self):
27 """ Reimplemented to emit signal.
28 """
29 super(ChannelQObject, self).stop()
30 self.stopped.emit()
31
32 #---------------------------------------------------------------------------
33 # InProcessChannel interface
34 #---------------------------------------------------------------------------
35
36 def call_handlers_later(self, *args, **kwds):
37 """ Call the message handlers later.
38 """
39 do_later = lambda: self.call_handlers(*args, **kwds)
40 QtCore.QTimer.singleShot(0, do_later)
41
42 def process_events(self):
43 """ Process any pending GUI events.
44 """
45 QtCore.QCoreApplication.instance().processEvents()
46
47
48 class QtShellChannelMixin(ChannelQObject):
49
50 # Emitted when any message is received.
51 message_received = QtCore.Signal(object)
52
53 # Emitted when a reply has been received for the corresponding request type.
54 execute_reply = QtCore.Signal(object)
55 complete_reply = QtCore.Signal(object)
56 inspect_reply = QtCore.Signal(object)
57 history_reply = QtCore.Signal(object)
58 kernel_info_reply = QtCore.Signal(object)
59
60 def call_handlers(self, msg):
61 """ Reimplemented to emit signals instead of making callbacks.
62 """
63 # Emit the generic signal.
64 self.message_received.emit(msg)
65
66 # Emit signals for specialized message types.
67 msg_type = msg['header']['msg_type']
68 if msg_type == 'kernel_info_reply':
69 self._handle_kernel_info_reply(msg)
70
71 signal = getattr(self, msg_type, None)
72 if signal:
73 signal.emit(msg)
74
75
76 class QtIOPubChannelMixin(ChannelQObject):
77
78 # Emitted when any message is received.
79 message_received = QtCore.Signal(object)
80
81 # Emitted when a message of type 'stream' is received.
82 stream_received = QtCore.Signal(object)
83
84 # Emitted when a message of type 'execute_input' is received.
85 execute_input_received = QtCore.Signal(object)
86
87 # Emitted when a message of type 'execute_result' is received.
88 execute_result_received = QtCore.Signal(object)
89
90 # Emitted when a message of type 'error' is received.
91 error_received = QtCore.Signal(object)
92
93 # Emitted when a message of type 'display_data' is received
94 display_data_received = QtCore.Signal(object)
95
96 # Emitted when a crash report message is received from the kernel's
97 # last-resort sys.excepthook.
98 crash_received = QtCore.Signal(object)
99
100 # Emitted when a shutdown is noticed.
101 shutdown_reply_received = QtCore.Signal(object)
102
103 def call_handlers(self, msg):
104 """ Reimplemented to emit signals instead of making callbacks.
105 """
106 # Emit the generic signal.
107 self.message_received.emit(msg)
108 # Emit signals for specialized message types.
109 msg_type = msg['header']['msg_type']
110 signal = getattr(self, msg_type + '_received', None)
111 if signal:
112 signal.emit(msg)
113
114 def flush(self):
115 """ Reimplemented to ensure that signals are dispatched immediately.
116 """
117 super(QtIOPubChannelMixin, self).flush()
118 QtCore.QCoreApplication.instance().processEvents()
119
120
121 class QtStdInChannelMixin(ChannelQObject):
122
123 # Emitted when any message is received.
124 message_received = QtCore.Signal(object)
125
126 # Emitted when an input request is received.
127 input_requested = QtCore.Signal(object)
128
129 def call_handlers(self, msg):
130 """ Reimplemented to emit signals instead of making callbacks.
131 """
132 # Emit the generic signal.
133 self.message_received.emit(msg)
134
135 # Emit signals for specialized message types.
136 msg_type = msg['header']['msg_type']
137 if msg_type == 'input_request':
138 self.input_requested.emit(msg)
139
140
141 class QtHBChannelMixin(ChannelQObject):
142
143 # Emitted when the kernel has died.
144 kernel_died = QtCore.Signal(object)
145
146 def call_handlers(self, since_last_heartbeat):
147 """ Reimplemented to emit signals instead of making callbacks.
148 """
149 self.kernel_died.emit(since_last_heartbeat)
150
151
152 class QtKernelRestarterMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
12 class QtKernelRestarterMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
153
13
154 _timer = None
14 _timer = None
@@ -171,12 +31,6 b" class QtKernelClientMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObje"
171 # Emitted when the kernel client has stopped listening.
31 # Emitted when the kernel client has stopped listening.
172 stopped_channels = QtCore.Signal()
32 stopped_channels = QtCore.Signal()
173
33
174 # Use Qt-specific channel classes that emit signals.
175 iopub_channel_class = Type(QtIOPubChannelMixin)
176 shell_channel_class = Type(QtShellChannelMixin)
177 stdin_channel_class = Type(QtStdInChannelMixin)
178 hb_channel_class = Type(QtHBChannelMixin)
179
180 #---------------------------------------------------------------------------
34 #---------------------------------------------------------------------------
181 # 'KernelClient' interface
35 # 'KernelClient' interface
182 #---------------------------------------------------------------------------
36 #---------------------------------------------------------------------------
@@ -36,7 +36,7 b' class ZMQCompleter(IPCompleter):'
36
36
37 # send completion request to kernel
37 # send completion request to kernel
38 # Give the kernel up to 0.5s to respond
38 # Give the kernel up to 0.5s to respond
39 msg_id = self.client.shell_channel.complete(
39 msg_id = self.client.complete(
40 code=line,
40 code=line,
41 cursor_pos=cursor_pos,
41 cursor_pos=cursor_pos,
42 )
42 )
@@ -157,8 +157,8 b' class ZMQTerminalInteractiveShell(TerminalInteractiveShell):'
157 # flush stale replies, which could have been ignored, due to missed heartbeats
157 # flush stale replies, which could have been ignored, due to missed heartbeats
158 while self.client.shell_channel.msg_ready():
158 while self.client.shell_channel.msg_ready():
159 self.client.shell_channel.get_msg()
159 self.client.shell_channel.get_msg()
160 # shell_channel.execute takes 'hidden', which is the inverse of store_hist
160 # execute takes 'hidden', which is the inverse of store_hist
161 msg_id = self.client.shell_channel.execute(cell, not store_history)
161 msg_id = self.client.execute(cell, not store_history)
162
162
163 # first thing is wait for any side effects (output, stdin, etc.)
163 # first thing is wait for any side effects (output, stdin, etc.)
164 self._executing = True
164 self._executing = True
@@ -399,7 +399,7 b' class ZMQTerminalInteractiveShell(TerminalInteractiveShell):'
399 # only send stdin reply if there *was not* another request
399 # only send stdin reply if there *was not* another request
400 # or execution finished while we were reading.
400 # or execution finished while we were reading.
401 if not (self.client.stdin_channel.msg_ready() or self.client.shell_channel.msg_ready()):
401 if not (self.client.stdin_channel.msg_ready() or self.client.shell_channel.msg_ready()):
402 self.client.stdin_channel.input(raw_data)
402 self.client.input(raw_data)
403
403
404 def mainloop(self, display_banner=False):
404 def mainloop(self, display_banner=False):
405 while True:
405 while True:
@@ -414,7 +414,7 b' class ZMQTerminalInteractiveShell(TerminalInteractiveShell):'
414 # handling seems rather unpredictable...
414 # handling seems rather unpredictable...
415 self.write("\nKeyboardInterrupt in interact()\n")
415 self.write("\nKeyboardInterrupt in interact()\n")
416
416
417 self.client.shell_channel.shutdown()
417 self.client.shutdown()
418
418
419 def _banner1_default(self):
419 def _banner1_default(self):
420 return "IPython Console {version}\n".format(version=release.version)
420 return "IPython Console {version}\n".format(version=release.version)
General Comments 0
You need to be logged in to leave comments. Login now