##// END OF EJS Templates
Move kernel_info for adaptation onto KernelClient
Thomas Kluyver -
Show More
@@ -1,192 +1,174 b''
1 1 """Blocking channels
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 5
6 6 # Copyright (c) IPython Development Team.
7 7 # Distributed under the terms of the Modified BSD License.
8 8
9 9 try:
10 10 from queue import Queue, Empty # Py 3
11 11 except ImportError:
12 12 from Queue import Queue, Empty # Py 2
13 13
14 14 from IPython.kernel.channelsabc import ShellChannelABC, IOPubChannelABC, \
15 15 StdInChannelABC
16 16 from IPython.kernel.channels import HBChannel,\
17 17 make_iopub_socket, make_shell_socket, make_stdin_socket,\
18 18 InvalidPortNumber, major_protocol_version
19 19 from IPython.utils.py3compat import string_types, iteritems
20 20
21 21 # some utilities to validate message structure, these might get moved elsewhere
22 22 # if they prove to have more generic utility
23 23
24 24 def validate_string_list(lst):
25 25 """Validate that the input is a list of strings.
26 26
27 27 Raises ValueError if not."""
28 28 if not isinstance(lst, list):
29 29 raise ValueError('input %r must be a list' % lst)
30 30 for x in lst:
31 31 if not isinstance(x, string_types):
32 32 raise ValueError('element %r in list must be a string' % x)
33 33
34 34
35 35 def validate_string_dict(dct):
36 36 """Validate that the input is a dict with string keys and values.
37 37
38 38 Raises ValueError if not."""
39 39 for k,v in iteritems(dct):
40 40 if not isinstance(k, string_types):
41 41 raise ValueError('key %r in dict must be a string' % k)
42 42 if not isinstance(v, string_types):
43 43 raise ValueError('value %r in dict must be a string' % v)
44 44
45 45
46 46 class ZMQSocketChannel(object):
47 47 """The base class for the channels that use ZMQ sockets."""
48 48 context = None
49 49 session = None
50 50 socket = None
51 51 ioloop = None
52 52 stream = None
53 53 _address = None
54 54 _exiting = False
55 55 proxy_methods = []
56 56
57 57 def __init__(self, context, session, address):
58 58 """Create a channel.
59 59
60 60 Parameters
61 61 ----------
62 62 context : :class:`zmq.Context`
63 63 The ZMQ context to use.
64 64 session : :class:`session.Session`
65 65 The session to use.
66 66 address : zmq url
67 67 Standard (ip, port) tuple that the kernel is listening on.
68 68 """
69 69 super(ZMQSocketChannel, self).__init__()
70 70 self.daemon = True
71 71
72 72 self.context = context
73 73 self.session = session
74 74 if isinstance(address, tuple):
75 75 if address[1] == 0:
76 76 message = 'The port number for a channel cannot be 0.'
77 77 raise InvalidPortNumber(message)
78 78 address = "tcp://%s:%i" % address
79 79 self._address = address
80 80
81 81 def _recv(self, **kwargs):
82 82 msg = self.socket.recv_multipart(**kwargs)
83 83 ident,smsg = self.session.feed_identities(msg)
84 84 return self.session.deserialize(smsg)
85 85
86 86 def get_msg(self, block=True, timeout=None):
87 87 """ Gets a message if there is one that is ready. """
88 88 if block:
89 89 if timeout is not None:
90 90 timeout *= 1000 # seconds to ms
91 91 ready = self.socket.poll(timeout)
92 92 else:
93 93 ready = self.socket.poll(timeout=0)
94 94
95 95 if ready:
96 96 return self._recv()
97 97 else:
98 98 raise Empty
99 99
100 100 def get_msgs(self):
101 101 """ Get all messages that are currently ready. """
102 102 msgs = []
103 103 while True:
104 104 try:
105 105 msgs.append(self.get_msg(block=False))
106 106 except Empty:
107 107 break
108 108 return msgs
109 109
110 110 def msg_ready(self):
111 111 """ Is there a message that has been received? """
112 112 return bool(self.socket.poll(timeout=0))
113 113
114 114 def close(self):
115 115 if self.socket is not None:
116 116 try:
117 117 self.socket.close(linger=0)
118 118 except Exception:
119 119 pass
120 120 self.socket = None
121 121 stop = close
122 122
123 123 def is_alive(self):
124 124 return (self.socket is not None)
125 125
126 126 @property
127 127 def address(self):
128 128 """Get the channel's address as a zmq url string.
129 129
130 130 These URLS have the form: 'tcp://127.0.0.1:5555'.
131 131 """
132 132 return self._address
133 133
134 134 def _queue_send(self, msg):
135 135 """Pass a message to the ZMQ socket to send
136 136 """
137 137 self.session.send(self.socket, msg)
138 138
139 139
140 140 class BlockingShellChannel(ZMQSocketChannel):
141 141 """The shell channel for issuing request/replies to the kernel."""
142 142
143 143 def start(self):
144 144 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
145 145
146 def _handle_kernel_info_reply(self, msg):
147 """handle kernel info reply
148
149 sets protocol adaptation version
150 """
151 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
152 if adapt_version != major_protocol_version:
153 self.session.adapt_version = adapt_version
154
155 def _recv(self, **kwargs):
156 # Listen for kernel_info_reply message to do protocol adaptation
157 msg = ZMQSocketChannel._recv(self, **kwargs)
158 if msg['msg_type'] == 'kernel_info_reply':
159 self._handle_kernel_info_reply(msg)
160 return msg
161 146
162 147 class BlockingIOPubChannel(ZMQSocketChannel):
163 148 """The iopub channel which listens for messages that the kernel publishes.
164 149
165 150 This channel is where all output is published to frontends.
166 151 """
167 152 def start(self):
168 153 self.socket = make_iopub_socket(self.context, self.session.bsession, self.address)
169 154
170 155 class BlockingStdInChannel(ZMQSocketChannel):
171 156 """The stdin channel to handle raw_input requests that the kernel makes."""
172 msg_queue = None
173 proxy_methods = ['input']
174
175 157 def start(self):
176 158 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
177 159
178 160 ShellChannelABC.register(BlockingShellChannel)
179 161 IOPubChannelABC.register(BlockingIOPubChannel)
180 162 StdInChannelABC.register(BlockingStdInChannel)
181 163
182 164
183 165 class BlockingHBChannel(HBChannel):
184 166
185 167 # This kernel needs quicker monitoring, shorten to 1 sec.
186 168 # less than 0.5s is unreliable, and will get occasional
187 169 # false reports of missed beats.
188 170 time_to_dead = 1.
189 171
190 172 def call_handlers(self, since_last_heartbeat):
191 173 """ Pause beating on missed heartbeat. """
192 174 pass
@@ -1,33 +1,41 b''
1 1 """Implements a fully blocking kernel client.
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2013 The IPython Development Team
7 #
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING.txt, distributed as part of this software.
10 #-----------------------------------------------------------------------------
5 # Copyright (c) IPython Development Team.
6 # Distributed under the terms of the Modified BSD License.
11 7
12 #-----------------------------------------------------------------------------
13 # Imports
14 #-----------------------------------------------------------------------------
8 try:
9 from queue import Empty # Python 3
10 except ImportError:
11 from Queue import Empty # Python 2
15 12
16 13 from IPython.utils.traitlets import Type
17 14 from IPython.kernel.client import KernelClient
18 15 from .channels import (
19 16 BlockingIOPubChannel, BlockingHBChannel,
20 17 BlockingShellChannel, BlockingStdInChannel
21 18 )
22 19
23 #-----------------------------------------------------------------------------
24 # Blocking kernel manager
25 #-----------------------------------------------------------------------------
26
27 20 class BlockingKernelClient(KernelClient):
21 def wait_for_ready(self):
22 # Wait for kernel info reply on shell channel
23 while True:
24 msg = self.shell_channel.get_msg(block=True)
25 if msg['msg_type'] == 'kernel_info_reply':
26 self._handle_kernel_info_reply(msg)
27 break
28
29 # Flush IOPub channel
30 while True:
31 try:
32 msg = self.iopub_channel.get_msg(block=True, timeout=0.2)
33 print(msg['msg_type'])
34 except Empty:
35 break
28 36
29 37 # The classes to use for the various channels
30 38 shell_channel_class = Type(BlockingShellChannel)
31 39 iopub_channel_class = Type(BlockingIOPubChannel)
32 40 stdin_channel_class = Type(BlockingStdInChannel)
33 41 hb_channel_class = Type(BlockingHBChannel)
@@ -1,359 +1,369 b''
1 1 """Base class to manage the interaction with a running kernel"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import absolute_import
7 from IPython.kernel.channels import validate_string_dict
7 from IPython.kernel.channels import validate_string_dict, major_protocol_version
8 8 from IPython.utils.py3compat import string_types
9 9
10 10 import zmq
11 11
12 12 from IPython.utils.traitlets import (
13 13 Any, Instance, Type,
14 14 )
15 15
16 16 from .channelsabc import (
17 17 ShellChannelABC, IOPubChannelABC, HBChannelABC, StdInChannelABC
18 18 )
19 19 from .clientabc import KernelClientABC
20 20 from .connect import ConnectionFileMixin
21 21
22 22
23 23 class KernelClient(ConnectionFileMixin):
24 24 """Communicates with a single kernel on any host via zmq channels.
25 25
26 26 There are four channels associated with each kernel:
27 27
28 28 * shell: for request/reply calls to the kernel.
29 29 * iopub: for the kernel to publish results to frontends.
30 30 * hb: for monitoring the kernel's heartbeat.
31 31 * stdin: for frontends to reply to raw_input calls in the kernel.
32 32
33 33 The methods of the channels are exposed as methods of the client itself
34 34 (KernelClient.execute, complete, history, etc.).
35 35 See the channels themselves for documentation of these methods.
36 36
37 37 """
38 38
39 39 # The PyZMQ Context to use for communication with the kernel.
40 40 context = Instance(zmq.Context)
41 41 def _context_default(self):
42 42 return zmq.Context.instance()
43 43
44 44 # The classes to use for the various channels
45 45 shell_channel_class = Type(ShellChannelABC)
46 46 iopub_channel_class = Type(IOPubChannelABC)
47 47 stdin_channel_class = Type(StdInChannelABC)
48 48 hb_channel_class = Type(HBChannelABC)
49 49
50 50 # Protected traits
51 51 _shell_channel = Any
52 52 _iopub_channel = Any
53 53 _stdin_channel = Any
54 54 _hb_channel = Any
55 55
56 56 # flag for whether execute requests should be allowed to call raw_input:
57 57 allow_stdin = True
58 58
59 59 #--------------------------------------------------------------------------
60 60 # Channel proxy methods
61 61 #--------------------------------------------------------------------------
62 62
63 63 def _get_msg(channel, *args, **kwargs):
64 64 return channel.get_msg(*args, **kwargs)
65 65
66 66 def get_shell_msg(self, *args, **kwargs):
67 67 """Get a message from the shell channel"""
68 68 return self.shell_channel.get_msg(*args, **kwargs)
69 69
70 70 def get_iopub_msg(self, *args, **kwargs):
71 71 """Get a message from the iopub channel"""
72 72 return self.iopub_channel.get_msg(*args, **kwargs)
73 73
74 74 def get_stdin_msg(self, *args, **kwargs):
75 75 """Get a message from the stdin channel"""
76 76 return self.stdin_channel.get_msg(*args, **kwargs)
77 77
78 78 #--------------------------------------------------------------------------
79 79 # Channel management methods
80 80 #--------------------------------------------------------------------------
81 81
82 82 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
83 83 """Starts the channels for this kernel.
84 84
85 85 This will create the channels if they do not exist and then start
86 86 them (their activity runs in a thread). If port numbers of 0 are
87 87 being used (random ports) then you must first call
88 88 :meth:`start_kernel`. If the channels have been stopped and you
89 89 call this, :class:`RuntimeError` will be raised.
90 90 """
91 91 if shell:
92 92 self.shell_channel.start()
93 self.kernel_info()
93 94 if iopub:
94 95 self.iopub_channel.start()
95 96 if stdin:
96 97 self.stdin_channel.start()
97 98 self.allow_stdin = True
98 99 else:
99 100 self.allow_stdin = False
100 101 if hb:
101 102 self.hb_channel.start()
102 103
103 104 def stop_channels(self):
104 105 """Stops all the running channels for this kernel.
105 106
106 107 This stops their event loops and joins their threads.
107 108 """
108 109 if self.shell_channel.is_alive():
109 110 self.shell_channel.stop()
110 111 if self.iopub_channel.is_alive():
111 112 self.iopub_channel.stop()
112 113 if self.stdin_channel.is_alive():
113 114 self.stdin_channel.stop()
114 115 if self.hb_channel.is_alive():
115 116 self.hb_channel.stop()
116 117
117 118 @property
118 119 def channels_running(self):
119 120 """Are any of the channels created and running?"""
120 121 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
121 122 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
122 123
123 124 @property
124 125 def shell_channel(self):
125 126 """Get the shell channel object for this kernel."""
126 127 if self._shell_channel is None:
127 128 url = self._make_url('shell')
128 129 self.log.debug("connecting shell channel to %s", url)
129 130 self._shell_channel = self.shell_channel_class(
130 131 self.context, self.session, url
131 132 )
132 133 return self._shell_channel
133 134
134 135 @property
135 136 def iopub_channel(self):
136 137 """Get the iopub channel object for this kernel."""
137 138 if self._iopub_channel is None:
138 139 url = self._make_url('iopub')
139 140 self.log.debug("connecting iopub channel to %s", url)
140 141 self._iopub_channel = self.iopub_channel_class(
141 142 self.context, self.session, url
142 143 )
143 144 return self._iopub_channel
144 145
145 146 @property
146 147 def stdin_channel(self):
147 148 """Get the stdin channel object for this kernel."""
148 149 if self._stdin_channel is None:
149 150 url = self._make_url('stdin')
150 151 self.log.debug("connecting stdin channel to %s", url)
151 152 self._stdin_channel = self.stdin_channel_class(
152 153 self.context, self.session, url
153 154 )
154 155 return self._stdin_channel
155 156
156 157 @property
157 158 def hb_channel(self):
158 159 """Get the hb channel object for this kernel."""
159 160 if self._hb_channel is None:
160 161 url = self._make_url('hb')
161 162 self.log.debug("connecting heartbeat channel to %s", url)
162 163 self._hb_channel = self.hb_channel_class(
163 164 self.context, self.session, url
164 165 )
165 166 return self._hb_channel
166 167
167 168 def is_alive(self):
168 169 """Is the kernel process still running?"""
169 170 if self._hb_channel is not None:
170 171 # We didn't start the kernel with this KernelManager so we
171 172 # use the heartbeat.
172 173 return self._hb_channel.is_beating()
173 174 else:
174 175 # no heartbeat and not local, we can't tell if it's running,
175 176 # so naively return True
176 177 return True
177 178
178 179
179 180 # Methods to send specific messages on channels
180 181 def execute(self, code, silent=False, store_history=True,
181 182 user_expressions=None, allow_stdin=None):
182 183 """Execute code in the kernel.
183 184
184 185 Parameters
185 186 ----------
186 187 code : str
187 188 A string of Python code.
188 189
189 190 silent : bool, optional (default False)
190 191 If set, the kernel will execute the code as quietly possible, and
191 192 will force store_history to be False.
192 193
193 194 store_history : bool, optional (default True)
194 195 If set, the kernel will store command history. This is forced
195 196 to be False if silent is True.
196 197
197 198 user_expressions : dict, optional
198 199 A dict mapping names to expressions to be evaluated in the user's
199 200 dict. The expression values are returned as strings formatted using
200 201 :func:`repr`.
201 202
202 203 allow_stdin : bool, optional (default self.allow_stdin)
203 204 Flag for whether the kernel can send stdin requests to frontends.
204 205
205 206 Some frontends (e.g. the Notebook) do not support stdin requests.
206 207 If raw_input is called from code executed from such a frontend, a
207 208 StdinNotImplementedError will be raised.
208 209
209 210 Returns
210 211 -------
211 212 The msg_id of the message sent.
212 213 """
213 214 if user_expressions is None:
214 215 user_expressions = {}
215 216 if allow_stdin is None:
216 217 allow_stdin = self.allow_stdin
217 218
218 219
219 220 # Don't waste network traffic if inputs are invalid
220 221 if not isinstance(code, string_types):
221 222 raise ValueError('code %r must be a string' % code)
222 223 validate_string_dict(user_expressions)
223 224
224 225 # Create class for content/msg creation. Related to, but possibly
225 226 # not in Session.
226 227 content = dict(code=code, silent=silent, store_history=store_history,
227 228 user_expressions=user_expressions,
228 229 allow_stdin=allow_stdin,
229 230 )
230 231 msg = self.session.msg('execute_request', content)
231 232 self.shell_channel._queue_send(msg)
232 233 return msg['header']['msg_id']
233 234
234 235 def complete(self, code, cursor_pos=None):
235 236 """Tab complete text in the kernel's namespace.
236 237
237 238 Parameters
238 239 ----------
239 240 code : str
240 241 The context in which completion is requested.
241 242 Can be anything between a variable name and an entire cell.
242 243 cursor_pos : int, optional
243 244 The position of the cursor in the block of code where the completion was requested.
244 245 Default: ``len(code)``
245 246
246 247 Returns
247 248 -------
248 249 The msg_id of the message sent.
249 250 """
250 251 if cursor_pos is None:
251 252 cursor_pos = len(code)
252 253 content = dict(code=code, cursor_pos=cursor_pos)
253 254 msg = self.session.msg('complete_request', content)
254 255 self.shell_channel._queue_send(msg)
255 256 return msg['header']['msg_id']
256 257
257 258 def inspect(self, code, cursor_pos=None, detail_level=0):
258 259 """Get metadata information about an object in the kernel's namespace.
259 260
260 261 It is up to the kernel to determine the appropriate object to inspect.
261 262
262 263 Parameters
263 264 ----------
264 265 code : str
265 266 The context in which info is requested.
266 267 Can be anything between a variable name and an entire cell.
267 268 cursor_pos : int, optional
268 269 The position of the cursor in the block of code where the info was requested.
269 270 Default: ``len(code)``
270 271 detail_level : int, optional
271 272 The level of detail for the introspection (0-2)
272 273
273 274 Returns
274 275 -------
275 276 The msg_id of the message sent.
276 277 """
277 278 if cursor_pos is None:
278 279 cursor_pos = len(code)
279 280 content = dict(code=code, cursor_pos=cursor_pos,
280 281 detail_level=detail_level,
281 282 )
282 283 msg = self.session.msg('inspect_request', content)
283 284 self.shell_channel._queue_send(msg)
284 285 return msg['header']['msg_id']
285 286
286 287 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
287 288 """Get entries from the kernel's history list.
288 289
289 290 Parameters
290 291 ----------
291 292 raw : bool
292 293 If True, return the raw input.
293 294 output : bool
294 295 If True, then return the output as well.
295 296 hist_access_type : str
296 297 'range' (fill in session, start and stop params), 'tail' (fill in n)
297 298 or 'search' (fill in pattern param).
298 299
299 300 session : int
300 301 For a range request, the session from which to get lines. Session
301 302 numbers are positive integers; negative ones count back from the
302 303 current session.
303 304 start : int
304 305 The first line number of a history range.
305 306 stop : int
306 307 The final (excluded) line number of a history range.
307 308
308 309 n : int
309 310 The number of lines of history to get for a tail request.
310 311
311 312 pattern : str
312 313 The glob-syntax pattern for a search request.
313 314
314 315 Returns
315 316 -------
316 317 The msg_id of the message sent.
317 318 """
318 319 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
319 320 **kwargs)
320 321 msg = self.session.msg('history_request', content)
321 322 self.shell_channel._queue_send(msg)
322 323 return msg['header']['msg_id']
323 324
324 325 def kernel_info(self):
325 326 """Request kernel info."""
326 327 msg = self.session.msg('kernel_info_request')
327 328 self.shell_channel._queue_send(msg)
328 329 return msg['header']['msg_id']
329 330
331 def _handle_kernel_info_reply(self, msg):
332 """handle kernel info reply
333
334 sets protocol adaptation version
335 """
336 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
337 if adapt_version != major_protocol_version:
338 self.session.adapt_version = adapt_version
339
330 340 def shutdown(self, restart=False):
331 341 """Request an immediate kernel shutdown.
332 342
333 343 Upon receipt of the (empty) reply, client code can safely assume that
334 344 the kernel has shut down and it's safe to forcefully terminate it if
335 345 it's still alive.
336 346
337 347 The kernel will send the reply via a function registered with Python's
338 348 atexit module, ensuring it's truly done as the kernel is done with all
339 349 normal operation.
340 350 """
341 351 # Send quit message to kernel. Once we implement kernel-side setattr,
342 352 # this should probably be done that way, but for now this will do.
343 353 msg = self.session.msg('shutdown_request', {'restart':restart})
344 354 self.shell_channel._queue_send(msg)
345 355 return msg['header']['msg_id']
346 356
347 357 def is_complete(self, code):
348 358 msg = self.session.msg('is_complete_request', {'code': code})
349 359 self.shell_channel._queue_send(msg)
350 360 return msg['header']['msg_id']
351 361
352 362 def input(self, string):
353 363 """Send a string of raw input to the kernel."""
354 364 content = dict(value=string)
355 365 msg = self.session.msg('input_reply', content)
356 366 self.stdin_channel._queue_send(msg)
357 367
358 368
359 369 KernelClientABC.register(KernelClient)
@@ -1,452 +1,443 b''
1 1 """Base class to manage a running kernel"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import absolute_import
7 7
8 8 from contextlib import contextmanager
9 9 import os
10 10 import re
11 11 import signal
12 12 import sys
13 13 import time
14 14 import warnings
15 15 try:
16 16 from queue import Empty # Py 3
17 17 except ImportError:
18 18 from Queue import Empty # Py 2
19 19
20 20 import zmq
21 21
22 22 from IPython.utils.importstring import import_item
23 23 from IPython.utils.localinterfaces import is_local_ip, local_ips
24 24 from IPython.utils.path import get_ipython_dir
25 25 from IPython.utils.traitlets import (
26 26 Any, Instance, Unicode, List, Bool, Type, DottedObjectName
27 27 )
28 28 from IPython.kernel import (
29 29 launch_kernel,
30 30 kernelspec,
31 31 )
32 32 from .connect import ConnectionFileMixin
33 33 from .zmq.session import Session
34 34 from .managerabc import (
35 35 KernelManagerABC
36 36 )
37 37
38 38
39 39 class KernelManager(ConnectionFileMixin):
40 40 """Manages a single kernel in a subprocess on this host.
41 41
42 42 This version starts kernels with Popen.
43 43 """
44 44
45 45 # The PyZMQ Context to use for communication with the kernel.
46 46 context = Instance(zmq.Context)
47 47 def _context_default(self):
48 48 return zmq.Context.instance()
49 49
50 50 # the class to create with our `client` method
51 51 client_class = DottedObjectName('IPython.kernel.blocking.BlockingKernelClient')
52 52 client_factory = Type()
53 53 def _client_class_changed(self, name, old, new):
54 54 self.client_factory = import_item(str(new))
55 55
56 56 # The kernel process with which the KernelManager is communicating.
57 57 # generally a Popen instance
58 58 kernel = Any()
59 59
60 60 kernel_spec_manager = Instance(kernelspec.KernelSpecManager)
61 61
62 62 def _kernel_spec_manager_default(self):
63 63 return kernelspec.KernelSpecManager(ipython_dir=self.ipython_dir)
64 64
65 65 kernel_name = Unicode(kernelspec.NATIVE_KERNEL_NAME)
66 66
67 67 kernel_spec = Instance(kernelspec.KernelSpec)
68 68
69 69 def _kernel_spec_default(self):
70 70 return self.kernel_spec_manager.get_kernel_spec(self.kernel_name)
71 71
72 72 def _kernel_name_changed(self, name, old, new):
73 73 if new == 'python':
74 74 self.kernel_name = kernelspec.NATIVE_KERNEL_NAME
75 75 # This triggered another run of this function, so we can exit now
76 76 return
77 77 self.kernel_spec = self.kernel_spec_manager.get_kernel_spec(new)
78 78 self.ipython_kernel = new in {'python', 'python2', 'python3'}
79 79
80 80 kernel_cmd = List(Unicode, config=True,
81 81 help="""DEPRECATED: Use kernel_name instead.
82 82
83 83 The Popen Command to launch the kernel.
84 84 Override this if you have a custom kernel.
85 85 If kernel_cmd is specified in a configuration file,
86 86 IPython does not pass any arguments to the kernel,
87 87 because it cannot make any assumptions about the
88 88 arguments that the kernel understands. In particular,
89 89 this means that the kernel does not receive the
90 90 option --debug if it given on the IPython command line.
91 91 """
92 92 )
93 93
94 94 def _kernel_cmd_changed(self, name, old, new):
95 95 warnings.warn("Setting kernel_cmd is deprecated, use kernel_spec to "
96 96 "start different kernels.")
97 97 self.ipython_kernel = False
98 98
99 99 ipython_kernel = Bool(True)
100 100
101 101 ipython_dir = Unicode()
102 102 def _ipython_dir_default(self):
103 103 return get_ipython_dir()
104 104
105 105 # Protected traits
106 106 _launch_args = Any()
107 107 _control_socket = Any()
108 108
109 109 _restarter = Any()
110 110
111 111 autorestart = Bool(False, config=True,
112 112 help="""Should we autorestart the kernel if it dies."""
113 113 )
114 114
115 115 def __del__(self):
116 116 self._close_control_socket()
117 117 self.cleanup_connection_file()
118 118
119 119 #--------------------------------------------------------------------------
120 120 # Kernel restarter
121 121 #--------------------------------------------------------------------------
122 122
123 123 def start_restarter(self):
124 124 pass
125 125
126 126 def stop_restarter(self):
127 127 pass
128 128
129 129 def add_restart_callback(self, callback, event='restart'):
130 130 """register a callback to be called when a kernel is restarted"""
131 131 if self._restarter is None:
132 132 return
133 133 self._restarter.add_callback(callback, event)
134 134
135 135 def remove_restart_callback(self, callback, event='restart'):
136 136 """unregister a callback to be called when a kernel is restarted"""
137 137 if self._restarter is None:
138 138 return
139 139 self._restarter.remove_callback(callback, event)
140 140
141 141 #--------------------------------------------------------------------------
142 142 # create a Client connected to our Kernel
143 143 #--------------------------------------------------------------------------
144 144
145 145 def client(self, **kwargs):
146 146 """Create a client configured to connect to our kernel"""
147 147 if self.client_factory is None:
148 148 self.client_factory = import_item(self.client_class)
149 149
150 150 kw = {}
151 151 kw.update(self.get_connection_info())
152 152 kw.update(dict(
153 153 connection_file=self.connection_file,
154 154 session=self.session,
155 155 parent=self,
156 156 ))
157 157
158 158 # add kwargs last, for manual overrides
159 159 kw.update(kwargs)
160 160 return self.client_factory(**kw)
161 161
162 162 #--------------------------------------------------------------------------
163 163 # Kernel management
164 164 #--------------------------------------------------------------------------
165 165
166 166 def format_kernel_cmd(self, extra_arguments=None):
167 167 """replace templated args (e.g. {connection_file})"""
168 168 extra_arguments = extra_arguments or []
169 169 if self.kernel_cmd:
170 170 cmd = self.kernel_cmd + extra_arguments
171 171 else:
172 172 cmd = self.kernel_spec.argv + extra_arguments
173 173
174 174 ns = dict(connection_file=self.connection_file)
175 175 ns.update(self._launch_args)
176 176
177 177 pat = re.compile(r'\{([A-Za-z0-9_]+)\}')
178 178 def from_ns(match):
179 179 """Get the key out of ns if it's there, otherwise no change."""
180 180 return ns.get(match.group(1), match.group())
181 181
182 182 return [ pat.sub(from_ns, arg) for arg in cmd ]
183 183
184 184 def _launch_kernel(self, kernel_cmd, **kw):
185 185 """actually launch the kernel
186 186
187 187 override in a subclass to launch kernel subprocesses differently
188 188 """
189 189 return launch_kernel(kernel_cmd, **kw)
190 190
191 191 # Control socket used for polite kernel shutdown
192 192
193 193 def _connect_control_socket(self):
194 194 if self._control_socket is None:
195 195 self._control_socket = self.connect_control()
196 196 self._control_socket.linger = 100
197 197
198 198 def _close_control_socket(self):
199 199 if self._control_socket is None:
200 200 return
201 201 self._control_socket.close()
202 202 self._control_socket = None
203 203
204 204 def start_kernel(self, **kw):
205 205 """Starts a kernel on this host in a separate process.
206 206
207 207 If random ports (port=0) are being used, this method must be called
208 208 before the channels are created.
209 209
210 210 Parameters
211 211 ----------
212 212 **kw : optional
213 213 keyword arguments that are passed down to build the kernel_cmd
214 214 and launching the kernel (e.g. Popen kwargs).
215 215 """
216 216 if self.transport == 'tcp' and not is_local_ip(self.ip):
217 217 raise RuntimeError("Can only launch a kernel on a local interface. "
218 218 "Make sure that the '*_address' attributes are "
219 219 "configured properly. "
220 220 "Currently valid addresses are: %s" % local_ips()
221 221 )
222 222
223 223 # write connection file / get default ports
224 224 self.write_connection_file()
225 225
226 226 # save kwargs for use in restart
227 227 self._launch_args = kw.copy()
228 228 # build the Popen cmd
229 229 extra_arguments = kw.pop('extra_arguments', [])
230 230 kernel_cmd = self.format_kernel_cmd(extra_arguments=extra_arguments)
231 231 if self.kernel_cmd:
232 232 # If kernel_cmd has been set manually, don't refer to a kernel spec
233 233 env = os.environ
234 234 else:
235 235 # Environment variables from kernel spec are added to os.environ
236 236 env = os.environ.copy()
237 237 env.update(self.kernel_spec.env or {})
238 238 # launch the kernel subprocess
239 239 self.kernel = self._launch_kernel(kernel_cmd, env=env,
240 240 ipython_kernel=self.ipython_kernel,
241 241 **kw)
242 242 self.start_restarter()
243 243 self._connect_control_socket()
244 244
245 245 def request_shutdown(self, restart=False):
246 246 """Send a shutdown request via control channel
247 247
248 248 On Windows, this just kills kernels instead, because the shutdown
249 249 messages don't work.
250 250 """
251 251 content = dict(restart=restart)
252 252 msg = self.session.msg("shutdown_request", content=content)
253 253 self.session.send(self._control_socket, msg)
254 254
255 255 def finish_shutdown(self, waittime=1, pollinterval=0.1):
256 256 """Wait for kernel shutdown, then kill process if it doesn't shutdown.
257 257
258 258 This does not send shutdown requests - use :meth:`request_shutdown`
259 259 first.
260 260 """
261 261 for i in range(int(waittime/pollinterval)):
262 262 if self.is_alive():
263 263 time.sleep(pollinterval)
264 264 else:
265 265 break
266 266 else:
267 267 # OK, we've waited long enough.
268 268 if self.has_kernel:
269 269 self._kill_kernel()
270 270
271 271 def cleanup(self, connection_file=True):
272 272 """Clean up resources when the kernel is shut down"""
273 273 if connection_file:
274 274 self.cleanup_connection_file()
275 275
276 276 self.cleanup_ipc_files()
277 277 self._close_control_socket()
278 278
279 279 def shutdown_kernel(self, now=False, restart=False):
280 280 """Attempts to the stop the kernel process cleanly.
281 281
282 282 This attempts to shutdown the kernels cleanly by:
283 283
284 284 1. Sending it a shutdown message over the shell channel.
285 285 2. If that fails, the kernel is shutdown forcibly by sending it
286 286 a signal.
287 287
288 288 Parameters
289 289 ----------
290 290 now : bool
291 291 Should the kernel be forcible killed *now*. This skips the
292 292 first, nice shutdown attempt.
293 293 restart: bool
294 294 Will this kernel be restarted after it is shutdown. When this
295 295 is True, connection files will not be cleaned up.
296 296 """
297 297 # Stop monitoring for restarting while we shutdown.
298 298 self.stop_restarter()
299 299
300 300 if now:
301 301 self._kill_kernel()
302 302 else:
303 303 self.request_shutdown(restart=restart)
304 304 # Don't send any additional kernel kill messages immediately, to give
305 305 # the kernel a chance to properly execute shutdown actions. Wait for at
306 306 # most 1s, checking every 0.1s.
307 307 self.finish_shutdown()
308 308
309 309 self.cleanup(connection_file=not restart)
310 310
311 311 def restart_kernel(self, now=False, **kw):
312 312 """Restarts a kernel with the arguments that were used to launch it.
313 313
314 314 If the old kernel was launched with random ports, the same ports will be
315 315 used for the new kernel. The same connection file is used again.
316 316
317 317 Parameters
318 318 ----------
319 319 now : bool, optional
320 320 If True, the kernel is forcefully restarted *immediately*, without
321 321 having a chance to do any cleanup action. Otherwise the kernel is
322 322 given 1s to clean up before a forceful restart is issued.
323 323
324 324 In all cases the kernel is restarted, the only difference is whether
325 325 it is given a chance to perform a clean shutdown or not.
326 326
327 327 **kw : optional
328 328 Any options specified here will overwrite those used to launch the
329 329 kernel.
330 330 """
331 331 if self._launch_args is None:
332 332 raise RuntimeError("Cannot restart the kernel. "
333 333 "No previous call to 'start_kernel'.")
334 334 else:
335 335 # Stop currently running kernel.
336 336 self.shutdown_kernel(now=now, restart=True)
337 337
338 338 # Start new kernel.
339 339 self._launch_args.update(kw)
340 340 self.start_kernel(**self._launch_args)
341 341
342 342 @property
343 343 def has_kernel(self):
344 344 """Has a kernel been started that we are managing."""
345 345 return self.kernel is not None
346 346
347 347 def _kill_kernel(self):
348 348 """Kill the running kernel.
349 349
350 350 This is a private method, callers should use shutdown_kernel(now=True).
351 351 """
352 352 if self.has_kernel:
353 353
354 354 # Signal the kernel to terminate (sends SIGKILL on Unix and calls
355 355 # TerminateProcess() on Win32).
356 356 try:
357 357 self.kernel.kill()
358 358 except OSError as e:
359 359 # In Windows, we will get an Access Denied error if the process
360 360 # has already terminated. Ignore it.
361 361 if sys.platform == 'win32':
362 362 if e.winerror != 5:
363 363 raise
364 364 # On Unix, we may get an ESRCH error if the process has already
365 365 # terminated. Ignore it.
366 366 else:
367 367 from errno import ESRCH
368 368 if e.errno != ESRCH:
369 369 raise
370 370
371 371 # Block until the kernel terminates.
372 372 self.kernel.wait()
373 373 self.kernel = None
374 374 else:
375 375 raise RuntimeError("Cannot kill kernel. No kernel is running!")
376 376
377 377 def interrupt_kernel(self):
378 378 """Interrupts the kernel by sending it a signal.
379 379
380 380 Unlike ``signal_kernel``, this operation is well supported on all
381 381 platforms.
382 382 """
383 383 if self.has_kernel:
384 384 if sys.platform == 'win32':
385 385 from .zmq.parentpoller import ParentPollerWindows as Poller
386 386 Poller.send_interrupt(self.kernel.win32_interrupt_event)
387 387 else:
388 388 self.kernel.send_signal(signal.SIGINT)
389 389 else:
390 390 raise RuntimeError("Cannot interrupt kernel. No kernel is running!")
391 391
392 392 def signal_kernel(self, signum):
393 393 """Sends a signal to the kernel.
394 394
395 395 Note that since only SIGTERM is supported on Windows, this function is
396 396 only useful on Unix systems.
397 397 """
398 398 if self.has_kernel:
399 399 self.kernel.send_signal(signum)
400 400 else:
401 401 raise RuntimeError("Cannot signal kernel. No kernel is running!")
402 402
403 403 def is_alive(self):
404 404 """Is the kernel process still running?"""
405 405 if self.has_kernel:
406 406 if self.kernel.poll() is None:
407 407 return True
408 408 else:
409 409 return False
410 410 else:
411 411 # we don't have a kernel
412 412 return False
413 413
414 414
415 415 KernelManagerABC.register(KernelManager)
416 416
417 417
418 418 def start_new_kernel(startup_timeout=60, kernel_name='python', **kwargs):
419 419 """Start a new kernel, and return its Manager and Client"""
420 420 km = KernelManager(kernel_name=kernel_name)
421 421 km.start_kernel(**kwargs)
422 422 kc = km.client()
423 423 kc.start_channels()
424 kc.wait_for_ready()
424 425
425 kc.kernel_info()
426 kc.get_shell_msg(block=True, timeout=startup_timeout)
427
428 # Flush channels
429 for channel in (kc.shell_channel, kc.iopub_channel):
430 while True:
431 try:
432 channel.get_msg(block=True, timeout=0.1)
433 except Empty:
434 break
435 426 return km, kc
436 427
437 428 @contextmanager
438 429 def run_kernel(**kwargs):
439 430 """Context manager to create a kernel in a subprocess.
440 431
441 432 The kernel is shut down when the context exits.
442 433
443 434 Returns
444 435 -------
445 436 kernel_client: connected KernelClient instance
446 437 """
447 438 km, kc = start_new_kernel(**kwargs)
448 439 try:
449 440 yield kc
450 441 finally:
451 442 kc.stop_channels()
452 443 km.shutdown_kernel(now=True)
@@ -1,198 +1,199 b''
1 1 """test IPython.embed_kernel()"""
2 2
3 3 #-------------------------------------------------------------------------------
4 4 # Copyright (C) 2012 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-------------------------------------------------------------------------------
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Imports
12 12 #-------------------------------------------------------------------------------
13 13
14 14 import os
15 15 import shutil
16 16 import sys
17 17 import tempfile
18 18 import time
19 19
20 20 from contextlib import contextmanager
21 21 from subprocess import Popen, PIPE
22 22
23 23 import nose.tools as nt
24 24
25 25 from IPython.kernel import BlockingKernelClient
26 26 from IPython.utils import path, py3compat
27 27 from IPython.utils.py3compat import unicode_type
28 28
29 29 #-------------------------------------------------------------------------------
30 30 # Tests
31 31 #-------------------------------------------------------------------------------
32 32
33 33 SETUP_TIMEOUT = 60
34 34 TIMEOUT = 15
35 35
36 36 def setup():
37 37 """setup temporary IPYTHONDIR for tests"""
38 38 global IPYTHONDIR
39 39 global env
40 40 global save_get_ipython_dir
41 41
42 42 IPYTHONDIR = tempfile.mkdtemp()
43 43
44 44 env = os.environ.copy()
45 45 env["IPYTHONDIR"] = IPYTHONDIR
46 46
47 47 save_get_ipython_dir = path.get_ipython_dir
48 48 path.get_ipython_dir = lambda : IPYTHONDIR
49 49
50 50
51 51 def teardown():
52 52 path.get_ipython_dir = save_get_ipython_dir
53 53
54 54 try:
55 55 shutil.rmtree(IPYTHONDIR)
56 56 except (OSError, IOError):
57 57 # no such file
58 58 pass
59 59
60 60
61 61 @contextmanager
62 62 def setup_kernel(cmd):
63 63 """start an embedded kernel in a subprocess, and wait for it to be ready
64 64
65 65 Returns
66 66 -------
67 67 kernel_manager: connected KernelManager instance
68 68 """
69 69 kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
70 70 connection_file = os.path.join(IPYTHONDIR,
71 71 'profile_default',
72 72 'security',
73 73 'kernel-%i.json' % kernel.pid
74 74 )
75 75 # wait for connection file to exist, timeout after 5s
76 76 tic = time.time()
77 77 while not os.path.exists(connection_file) \
78 78 and kernel.poll() is None \
79 79 and time.time() < tic + SETUP_TIMEOUT:
80 80 time.sleep(0.1)
81 81
82 82 if kernel.poll() is not None:
83 83 o,e = kernel.communicate()
84 84 e = py3compat.cast_unicode(e)
85 85 raise IOError("Kernel failed to start:\n%s" % e)
86 86
87 87 if not os.path.exists(connection_file):
88 88 if kernel.poll() is None:
89 89 kernel.terminate()
90 90 raise IOError("Connection file %r never arrived" % connection_file)
91 91
92 92 client = BlockingKernelClient(connection_file=connection_file)
93 93 client.load_connection_file()
94 94 client.start_channels()
95 client.wait_for_ready()
95 96
96 97 try:
97 98 yield client
98 99 finally:
99 100 client.stop_channels()
100 101 kernel.terminate()
101 102
102 103 def test_embed_kernel_basic():
103 104 """IPython.embed_kernel() is basically functional"""
104 105 cmd = '\n'.join([
105 106 'from IPython import embed_kernel',
106 107 'def go():',
107 108 ' a=5',
108 109 ' b="hi there"',
109 110 ' embed_kernel()',
110 111 'go()',
111 112 '',
112 113 ])
113 114
114 115 with setup_kernel(cmd) as client:
115 116 # oinfo a (int)
116 117 msg_id = client.inspect('a')
117 118 msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
118 119 content = msg['content']
119 120 nt.assert_true(content['found'])
120 121
121 122 msg_id = client.execute("c=a*2")
122 123 msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
123 124 content = msg['content']
124 125 nt.assert_equal(content['status'], u'ok')
125 126
126 127 # oinfo c (should be 10)
127 128 msg_id = client.inspect('c')
128 129 msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
129 130 content = msg['content']
130 131 nt.assert_true(content['found'])
131 132 text = content['data']['text/plain']
132 133 nt.assert_in('10', text)
133 134
134 135 def test_embed_kernel_namespace():
135 136 """IPython.embed_kernel() inherits calling namespace"""
136 137 cmd = '\n'.join([
137 138 'from IPython import embed_kernel',
138 139 'def go():',
139 140 ' a=5',
140 141 ' b="hi there"',
141 142 ' embed_kernel()',
142 143 'go()',
143 144 '',
144 145 ])
145 146
146 147 with setup_kernel(cmd) as client:
147 148 # oinfo a (int)
148 149 msg_id = client.inspect('a')
149 150 msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
150 151 content = msg['content']
151 152 nt.assert_true(content['found'])
152 153 text = content['data']['text/plain']
153 154 nt.assert_in(u'5', text)
154 155
155 156 # oinfo b (str)
156 157 msg_id = client.inspect('b')
157 158 msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
158 159 content = msg['content']
159 160 nt.assert_true(content['found'])
160 161 text = content['data']['text/plain']
161 162 nt.assert_in(u'hi there', text)
162 163
163 164 # oinfo c (undefined)
164 165 msg_id = client.inspect('c')
165 166 msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
166 167 content = msg['content']
167 168 nt.assert_false(content['found'])
168 169
169 170 def test_embed_kernel_reentrant():
170 171 """IPython.embed_kernel() can be called multiple times"""
171 172 cmd = '\n'.join([
172 173 'from IPython import embed_kernel',
173 174 'count = 0',
174 175 'def go():',
175 176 ' global count',
176 177 ' embed_kernel()',
177 178 ' count = count + 1',
178 179 '',
179 180 'while True:'
180 181 ' go()',
181 182 '',
182 183 ])
183 184
184 185 with setup_kernel(cmd) as client:
185 186 for i in range(5):
186 187 msg_id = client.inspect('count')
187 188 msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
188 189 content = msg['content']
189 190 nt.assert_true(content['found'])
190 191 text = content['data']['text/plain']
191 192 nt.assert_in(unicode_type(i), text)
192 193
193 194 # exit from embed_kernel
194 195 client.execute("get_ipython().exit_now = True")
195 196 msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
196 197 time.sleep(0.2)
197 198
198 199
@@ -1,373 +1,381 b''
1 1 """ Defines a KernelClient that provides signals and slots.
2 2 """
3 3 import atexit
4 4 import errno
5 5 from threading import Thread
6 6 import time
7 7
8 8 import zmq
9 9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 10 # during garbage collection of threads at exit:
11 11 from zmq import ZMQError
12 12 from zmq.eventloop import ioloop, zmqstream
13 13
14 14 from IPython.external.qt import QtCore
15 15
16 16 # Local imports
17 17 from IPython.utils.traitlets import Type
18 18 from IPython.kernel.channels import HBChannel,\
19 19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 20 from IPython.kernel import KernelClient
21 21
22 22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
23 23 from .util import SuperQObject
24 24
25 25 class QtHBChannel(QtHBChannelMixin, HBChannel):
26 26 pass
27 27
28 28 from IPython.core.release import kernel_protocol_version_info
29 29
30 30 from IPython.kernel.channelsabc import (
31 31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
32 32 )
33 33 from IPython.utils.py3compat import string_types, iteritems
34 34
35 35 major_protocol_version = kernel_protocol_version_info[0]
36 36
37 37 class InvalidPortNumber(Exception):
38 38 pass
39 39
40 40 # some utilities to validate message structure, these might get moved elsewhere
41 41 # if they prove to have more generic utility
42 42
43 43
44 44 def validate_string_dict(dct):
45 45 """Validate that the input is a dict with string keys and values.
46 46
47 47 Raises ValueError if not."""
48 48 for k,v in iteritems(dct):
49 49 if not isinstance(k, string_types):
50 50 raise ValueError('key %r in dict must be a string' % k)
51 51 if not isinstance(v, string_types):
52 52 raise ValueError('value %r in dict must be a string' % v)
53 53
54 54
55 55
56 56 class QtZMQSocketChannel(SuperQObject, Thread):
57 57 """The base class for the channels that use ZMQ sockets."""
58 58 context = None
59 59 session = None
60 60 socket = None
61 61 ioloop = None
62 62 stream = None
63 63 _address = None
64 64 _exiting = False
65 65 proxy_methods = []
66 66
67 67 # Emitted when the channel is started.
68 68 started = QtCore.Signal()
69 69
70 70 # Emitted when the channel is stopped.
71 71 stopped = QtCore.Signal()
72 72
73 73 message_received = QtCore.Signal(object)
74 74
75 75 #---------------------------------------------------------------------------
76 76 # InProcessChannel interface
77 77 #---------------------------------------------------------------------------
78 78
79 79 def call_handlers_later(self, *args, **kwds):
80 80 """ Call the message handlers later.
81 81 """
82 82 do_later = lambda: self.call_handlers(*args, **kwds)
83 83 QtCore.QTimer.singleShot(0, do_later)
84 84
85 85 def process_events(self):
86 86 """ Process any pending GUI events.
87 87 """
88 88 QtCore.QCoreApplication.instance().processEvents()
89 89
90 90 def __init__(self, context, session, address):
91 91 """Create a channel.
92 92
93 93 Parameters
94 94 ----------
95 95 context : :class:`zmq.Context`
96 96 The ZMQ context to use.
97 97 session : :class:`session.Session`
98 98 The session to use.
99 99 address : zmq url
100 100 Standard (ip, port) tuple that the kernel is listening on.
101 101 """
102 102 super(QtZMQSocketChannel, self).__init__()
103 103 self.daemon = True
104 104
105 105 self.context = context
106 106 self.session = session
107 107 if isinstance(address, tuple):
108 108 if address[1] == 0:
109 109 message = 'The port number for a channel cannot be 0.'
110 110 raise InvalidPortNumber(message)
111 111 address = "tcp://%s:%i" % address
112 112 self._address = address
113 113 atexit.register(self._notice_exit)
114 114
115 115 def _notice_exit(self):
116 116 self._exiting = True
117 117
118 118 def _run_loop(self):
119 119 """Run my loop, ignoring EINTR events in the poller"""
120 120 while True:
121 121 try:
122 122 self.ioloop.start()
123 123 except ZMQError as e:
124 124 if e.errno == errno.EINTR:
125 125 continue
126 126 else:
127 127 raise
128 128 except Exception:
129 129 if self._exiting:
130 130 break
131 131 else:
132 132 raise
133 133 else:
134 134 break
135 135
136 136 def start(self):
137 137 """ Reimplemented to emit signal.
138 138 """
139 139 super(QtZMQSocketChannel, self).start()
140 140 self.started.emit()
141 141
142 142 def stop(self):
143 143 """Stop the channel's event loop and join its thread.
144 144
145 145 This calls :meth:`~threading.Thread.join` and returns when the thread
146 146 terminates. :class:`RuntimeError` will be raised if
147 147 :meth:`~threading.Thread.start` is called again.
148 148 """
149 149 if self.ioloop is not None:
150 150 self.ioloop.stop()
151 151 self.join()
152 152 self.close()
153 153 self.stopped.emit()
154 154
155 155 def close(self):
156 156 if self.ioloop is not None:
157 157 try:
158 158 self.ioloop.close(all_fds=True)
159 159 except Exception:
160 160 pass
161 161 if self.socket is not None:
162 162 try:
163 163 self.socket.close(linger=0)
164 164 except Exception:
165 165 pass
166 166 self.socket = None
167 167
168 168 @property
169 169 def address(self):
170 170 """Get the channel's address as a zmq url string.
171 171
172 172 These URLS have the form: 'tcp://127.0.0.1:5555'.
173 173 """
174 174 return self._address
175 175
176 176 def _queue_send(self, msg):
177 177 """Queue a message to be sent from the IOLoop's thread.
178 178
179 179 Parameters
180 180 ----------
181 181 msg : message to send
182 182
183 183 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
184 184 thread control of the action.
185 185 """
186 186 def thread_send():
187 187 self.session.send(self.stream, msg)
188 188 self.ioloop.add_callback(thread_send)
189 189
190 190 def _handle_recv(self, msg):
191 191 """Callback for stream.on_recv.
192 192
193 193 Unpacks message, and calls handlers with it.
194 194 """
195 195 ident,smsg = self.session.feed_identities(msg)
196 196 msg = self.session.deserialize(smsg)
197 197 self.call_handlers(msg)
198 198
199 199 def call_handlers(self, msg):
200 200 """This method is called in the ioloop thread when a message arrives.
201 201
202 202 Subclasses should override this method to handle incoming messages.
203 203 It is important to remember that this method is called in the thread
204 204 so that some logic must be done to ensure that the application level
205 205 handlers are called in the application thread.
206 206 """
207 207 # Emit the generic signal.
208 208 self.message_received.emit(msg)
209 209
210 210
211 211 class QtShellChannel(QtZMQSocketChannel):
212 212 """The shell channel for issuing request/replies to the kernel."""
213 213
214 214 # Emitted when a reply has been received for the corresponding request type.
215 215 execute_reply = QtCore.Signal(object)
216 216 complete_reply = QtCore.Signal(object)
217 217 inspect_reply = QtCore.Signal(object)
218 218 history_reply = QtCore.Signal(object)
219 219 kernel_info_reply = QtCore.Signal(object)
220 220
221 221 def __init__(self, context, session, address):
222 222 super(QtShellChannel, self).__init__(context, session, address)
223 223 self.ioloop = ioloop.IOLoop()
224 224
225 225 def run(self):
226 226 """The thread's main activity. Call start() instead."""
227 227 self.socket = make_shell_socket(self.context, self.session.bsession, self.address)
228 228 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
229 229 self.stream.on_recv(self._handle_recv)
230 230 self._run_loop()
231 231
232 232 def call_handlers(self, msg):
233 233 super(QtShellChannel, self).call_handlers(msg)
234 234
235 235 # Catch kernel_info_reply for message spec adaptation
236 236 msg_type = msg['header']['msg_type']
237 237 if msg_type == 'kernel_info_reply':
238 238 self._handle_kernel_info_reply(msg)
239 239
240 240 # Emit specific signals
241 241 signal = getattr(self, msg_type, None)
242 242 if signal:
243 243 signal.emit(msg)
244 244
245 245 def _handle_kernel_info_reply(self, msg):
246 246 """handle kernel info reply
247 247
248 248 sets protocol adaptation version
249 249 """
250 250 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
251 251 if adapt_version != major_protocol_version:
252 252 self.session.adapt_version = adapt_version
253 253
254 254
255 255 class QtIOPubChannel(QtZMQSocketChannel):
256 256 """The iopub channel which listens for messages that the kernel publishes.
257 257
258 258 This channel is where all output is published to frontends.
259 259 """
260 260 # Emitted when a message of type 'stream' is received.
261 261 stream_received = QtCore.Signal(object)
262 262
263 263 # Emitted when a message of type 'execute_input' is received.
264 264 execute_input_received = QtCore.Signal(object)
265 265
266 266 # Emitted when a message of type 'execute_result' is received.
267 267 execute_result_received = QtCore.Signal(object)
268 268
269 269 # Emitted when a message of type 'error' is received.
270 270 error_received = QtCore.Signal(object)
271 271
272 272 # Emitted when a message of type 'display_data' is received
273 273 display_data_received = QtCore.Signal(object)
274 274
275 275 # Emitted when a crash report message is received from the kernel's
276 276 # last-resort sys.excepthook.
277 277 crash_received = QtCore.Signal(object)
278 278
279 279 # Emitted when a shutdown is noticed.
280 280 shutdown_reply_received = QtCore.Signal(object)
281 281
282 282 def __init__(self, context, session, address):
283 283 super(QtIOPubChannel, self).__init__(context, session, address)
284 284 self.ioloop = ioloop.IOLoop()
285 285
286 286 def run(self):
287 287 """The thread's main activity. Call start() instead."""
288 288 self.socket = make_iopub_socket(self.context, self.session.bsession, self.address)
289 289 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
290 290 self.stream.on_recv(self._handle_recv)
291 291 self._run_loop()
292 292
293 293 def call_handlers(self, msg):
294 294 super(QtIOPubChannel, self).call_handlers(msg)
295 295
296 296 # Emit signals for specialized message types.
297 297 msg_type = msg['header']['msg_type']
298 298 signal = getattr(self, msg_type + '_received', None)
299 299 if signal:
300 300 signal.emit(msg)
301 301
302 302 def flush(self, timeout=1.0):
303 303 """Immediately processes all pending messages on the iopub channel.
304 304
305 305 Callers should use this method to ensure that :meth:`call_handlers`
306 306 has been called for all messages that have been received on the
307 307 0MQ SUB socket of this channel.
308 308
309 309 This method is thread safe.
310 310
311 311 Parameters
312 312 ----------
313 313 timeout : float, optional
314 314 The maximum amount of time to spend flushing, in seconds. The
315 315 default is one second.
316 316 """
317 317 # We do the IOLoop callback process twice to ensure that the IOLoop
318 318 # gets to perform at least one full poll.
319 319 stop_time = time.time() + timeout
320 320 for i in range(2):
321 321 self._flushed = False
322 322 self.ioloop.add_callback(self._flush)
323 323 while not self._flushed and time.time() < stop_time:
324 324 time.sleep(0.01)
325 325
326 326 def _flush(self):
327 327 """Callback for :method:`self.flush`."""
328 328 self.stream.flush()
329 329 self._flushed = True
330 330
331 331
332 332 class QtStdInChannel(QtZMQSocketChannel):
333 333 """The stdin channel to handle raw_input requests that the kernel makes."""
334 334
335 335 msg_queue = None
336 336 proxy_methods = ['input']
337 337
338 338 # Emitted when an input request is received.
339 339 input_requested = QtCore.Signal(object)
340 340
341 341 def __init__(self, context, session, address):
342 342 super(QtStdInChannel, self).__init__(context, session, address)
343 343 self.ioloop = ioloop.IOLoop()
344 344
345 345 def run(self):
346 346 """The thread's main activity. Call start() instead."""
347 347 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
348 348 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
349 349 self.stream.on_recv(self._handle_recv)
350 350 self._run_loop()
351 351
352 352 def call_handlers(self, msg):
353 353 super(QtStdInChannel, self).call_handlers(msg)
354 354
355 355 # Emit signals for specialized message types.
356 356 msg_type = msg['header']['msg_type']
357 357 if msg_type == 'input_request':
358 358 self.input_requested.emit(msg)
359 359
360 360
361 361 ShellChannelABC.register(QtShellChannel)
362 362 IOPubChannelABC.register(QtIOPubChannel)
363 363 StdInChannelABC.register(QtStdInChannel)
364 364
365 365
366 366 class QtKernelClient(QtKernelClientMixin, KernelClient):
367 367 """ A KernelClient that provides signals and slots.
368 368 """
369 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
370 if shell:
371 self.shell_channel.kernel_info_reply.connect(self._handle_kernel_info_reply)
372 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
373
374 def _handle_kernel_info_reply(self, msg):
375 super(QtKernelClient, self)._handle_kernel_info_reply(msg)
376 self.shell_channel.kernel_info_reply.disconnect(self._handle_kernel_info_reply)
369 377
370 378 iopub_channel_class = Type(QtIOPubChannel)
371 379 shell_channel_class = Type(QtShellChannel)
372 380 stdin_channel_class = Type(QtStdInChannel)
373 381 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now