##// END OF EJS Templates
Factor out code to set up ZMQ sockets
Thomas Kluyver -
Show More
@@ -1,391 +1,381 b''
1 1 """Blocking channels
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 5
6 6 # Copyright (c) IPython Development Team.
7 7 # Distributed under the terms of the Modified BSD License.
8 8
9 9 import atexit
10 10 import zmq
11 11
12 12 try:
13 13 from queue import Queue, Empty # Py 3
14 14 except ImportError:
15 15 from Queue import Queue, Empty # Py 2
16 16
17 from IPython.kernel.channels import IOPubChannel, HBChannel, \
18 ShellChannel, StdInChannel, InvalidPortNumber, major_protocol_version
17 from IPython.kernel.channels import HBChannel,\
18 make_iopub_socket, make_shell_socket, make_stdin_socket,\
19 InvalidPortNumber, major_protocol_version
19 20 from IPython.utils.py3compat import string_types, iteritems
20 21
21 22 # some utilities to validate message structure, these might get moved elsewhere
22 23 # if they prove to have more generic utility
23 24
24 25 def validate_string_list(lst):
25 26 """Validate that the input is a list of strings.
26 27
27 28 Raises ValueError if not."""
28 29 if not isinstance(lst, list):
29 30 raise ValueError('input %r must be a list' % lst)
30 31 for x in lst:
31 32 if not isinstance(x, string_types):
32 33 raise ValueError('element %r in list must be a string' % x)
33 34
34 35
35 36 def validate_string_dict(dct):
36 37 """Validate that the input is a dict with string keys and values.
37 38
38 39 Raises ValueError if not."""
39 40 for k,v in iteritems(dct):
40 41 if not isinstance(k, string_types):
41 42 raise ValueError('key %r in dict must be a string' % k)
42 43 if not isinstance(v, string_types):
43 44 raise ValueError('value %r in dict must be a string' % v)
44 45
45 46
46 47 class ZMQSocketChannel(object):
47 48 """The base class for the channels that use ZMQ sockets."""
48 49 context = None
49 50 session = None
50 51 socket = None
51 52 ioloop = None
52 53 stream = None
53 54 _address = None
54 55 _exiting = False
55 56 proxy_methods = []
56 57
57 58 def __init__(self, context, session, address):
58 59 """Create a channel.
59 60
60 61 Parameters
61 62 ----------
62 63 context : :class:`zmq.Context`
63 64 The ZMQ context to use.
64 65 session : :class:`session.Session`
65 66 The session to use.
66 67 address : zmq url
67 68 Standard (ip, port) tuple that the kernel is listening on.
68 69 """
69 70 super(ZMQSocketChannel, self).__init__()
70 71 self.daemon = True
71 72
72 73 self.context = context
73 74 self.session = session
74 75 if isinstance(address, tuple):
75 76 if address[1] == 0:
76 77 message = 'The port number for a channel cannot be 0.'
77 78 raise InvalidPortNumber(message)
78 79 address = "tcp://%s:%i" % address
79 80 self._address = address
80 81
81 82 def _recv(self, **kwargs):
82 83 msg = self.socket.recv_multipart(**kwargs)
83 84 ident,smsg = self.session.feed_identities(msg)
84 85 return self.session.deserialize(smsg)
85 86
86 87 def get_msg(self, block=True, timeout=None):
87 88 """ Gets a message if there is one that is ready. """
88 89 if block:
89 90 if timeout is not None:
90 91 timeout *= 1000 # seconds to ms
91 92 ready = self.socket.poll(timeout)
92 93 else:
93 94 ready = self.socket.poll(timeout=0)
94 95
95 96 if ready:
96 97 return self._recv()
97 98 else:
98 99 raise Empty
99 100
100 101 def get_msgs(self):
101 102 """ Get all messages that are currently ready. """
102 103 msgs = []
103 104 while True:
104 105 try:
105 106 msgs.append(self.get_msg(block=False))
106 107 except Empty:
107 108 break
108 109 return msgs
109 110
110 111 def msg_ready(self):
111 112 """ Is there a message that has been received? """
112 113 return bool(self.socket.poll(timeout=0))
113 114
114 115 def close(self):
115 116 if self.socket is not None:
116 117 try:
117 118 self.socket.close(linger=0)
118 119 except Exception:
119 120 pass
120 121 self.socket = None
121 122 stop = close
122 123
123 124 def is_alive(self):
124 125 return (self.socket is not None)
125 126
126 127 @property
127 128 def address(self):
128 129 """Get the channel's address as a zmq url string.
129 130
130 131 These URLS have the form: 'tcp://127.0.0.1:5555'.
131 132 """
132 133 return self._address
133 134
134 135 def _queue_send(self, msg):
135 136 """Pass a message to the ZMQ socket to send
136 137 """
137 138 self.session.send(self.socket, msg)
138 139
139 140
140 141 class BlockingShellChannel(ZMQSocketChannel):
141 142 """The shell channel for issuing request/replies to the kernel."""
142 143
143 144 command_queue = None
144 145 # flag for whether execute requests should be allowed to call raw_input:
145 146 allow_stdin = True
146 147 proxy_methods = [
147 148 'execute',
148 149 'complete',
149 150 'inspect',
150 151 'history',
151 152 'kernel_info',
152 153 'shutdown',
153 154 'is_complete',
154 155 ]
155 156
156 157 def start(self):
157 self.socket = self.context.socket(zmq.DEALER)
158 self.socket.linger = 1000
159 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
160 self.socket.connect(self.address)
158 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
161 159
162 160 def execute(self, code, silent=False, store_history=True,
163 161 user_expressions=None, allow_stdin=None):
164 162 """Execute code in the kernel.
165 163
166 164 Parameters
167 165 ----------
168 166 code : str
169 167 A string of Python code.
170 168
171 169 silent : bool, optional (default False)
172 170 If set, the kernel will execute the code as quietly possible, and
173 171 will force store_history to be False.
174 172
175 173 store_history : bool, optional (default True)
176 174 If set, the kernel will store command history. This is forced
177 175 to be False if silent is True.
178 176
179 177 user_expressions : dict, optional
180 178 A dict mapping names to expressions to be evaluated in the user's
181 179 dict. The expression values are returned as strings formatted using
182 180 :func:`repr`.
183 181
184 182 allow_stdin : bool, optional (default self.allow_stdin)
185 183 Flag for whether the kernel can send stdin requests to frontends.
186 184
187 185 Some frontends (e.g. the Notebook) do not support stdin requests.
188 186 If raw_input is called from code executed from such a frontend, a
189 187 StdinNotImplementedError will be raised.
190 188
191 189 Returns
192 190 -------
193 191 The msg_id of the message sent.
194 192 """
195 193 if user_expressions is None:
196 194 user_expressions = {}
197 195 if allow_stdin is None:
198 196 allow_stdin = self.allow_stdin
199 197
200 198
201 199 # Don't waste network traffic if inputs are invalid
202 200 if not isinstance(code, string_types):
203 201 raise ValueError('code %r must be a string' % code)
204 202 validate_string_dict(user_expressions)
205 203
206 204 # Create class for content/msg creation. Related to, but possibly
207 205 # not in Session.
208 206 content = dict(code=code, silent=silent, store_history=store_history,
209 207 user_expressions=user_expressions,
210 208 allow_stdin=allow_stdin,
211 209 )
212 210 msg = self.session.msg('execute_request', content)
213 211 self._queue_send(msg)
214 212 return msg['header']['msg_id']
215 213
216 214 def complete(self, code, cursor_pos=None):
217 215 """Tab complete text in the kernel's namespace.
218 216
219 217 Parameters
220 218 ----------
221 219 code : str
222 220 The context in which completion is requested.
223 221 Can be anything between a variable name and an entire cell.
224 222 cursor_pos : int, optional
225 223 The position of the cursor in the block of code where the completion was requested.
226 224 Default: ``len(code)``
227 225
228 226 Returns
229 227 -------
230 228 The msg_id of the message sent.
231 229 """
232 230 if cursor_pos is None:
233 231 cursor_pos = len(code)
234 232 content = dict(code=code, cursor_pos=cursor_pos)
235 233 msg = self.session.msg('complete_request', content)
236 234 self._queue_send(msg)
237 235 return msg['header']['msg_id']
238 236
239 237 def inspect(self, code, cursor_pos=None, detail_level=0):
240 238 """Get metadata information about an object in the kernel's namespace.
241 239
242 240 It is up to the kernel to determine the appropriate object to inspect.
243 241
244 242 Parameters
245 243 ----------
246 244 code : str
247 245 The context in which info is requested.
248 246 Can be anything between a variable name and an entire cell.
249 247 cursor_pos : int, optional
250 248 The position of the cursor in the block of code where the info was requested.
251 249 Default: ``len(code)``
252 250 detail_level : int, optional
253 251 The level of detail for the introspection (0-2)
254 252
255 253 Returns
256 254 -------
257 255 The msg_id of the message sent.
258 256 """
259 257 if cursor_pos is None:
260 258 cursor_pos = len(code)
261 259 content = dict(code=code, cursor_pos=cursor_pos,
262 260 detail_level=detail_level,
263 261 )
264 262 msg = self.session.msg('inspect_request', content)
265 263 self._queue_send(msg)
266 264 return msg['header']['msg_id']
267 265
268 266 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
269 267 """Get entries from the kernel's history list.
270 268
271 269 Parameters
272 270 ----------
273 271 raw : bool
274 272 If True, return the raw input.
275 273 output : bool
276 274 If True, then return the output as well.
277 275 hist_access_type : str
278 276 'range' (fill in session, start and stop params), 'tail' (fill in n)
279 277 or 'search' (fill in pattern param).
280 278
281 279 session : int
282 280 For a range request, the session from which to get lines. Session
283 281 numbers are positive integers; negative ones count back from the
284 282 current session.
285 283 start : int
286 284 The first line number of a history range.
287 285 stop : int
288 286 The final (excluded) line number of a history range.
289 287
290 288 n : int
291 289 The number of lines of history to get for a tail request.
292 290
293 291 pattern : str
294 292 The glob-syntax pattern for a search request.
295 293
296 294 Returns
297 295 -------
298 296 The msg_id of the message sent.
299 297 """
300 298 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
301 299 **kwargs)
302 300 msg = self.session.msg('history_request', content)
303 301 self._queue_send(msg)
304 302 return msg['header']['msg_id']
305 303
306 304 def kernel_info(self):
307 305 """Request kernel info."""
308 306 msg = self.session.msg('kernel_info_request')
309 307 self._queue_send(msg)
310 308 return msg['header']['msg_id']
311 309
312 310 def _handle_kernel_info_reply(self, msg):
313 311 """handle kernel info reply
314 312
315 313 sets protocol adaptation version
316 314 """
317 315 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
318 316 if adapt_version != major_protocol_version:
319 317 self.session.adapt_version = adapt_version
320 318
321 319 def shutdown(self, restart=False):
322 320 """Request an immediate kernel shutdown.
323 321
324 322 Upon receipt of the (empty) reply, client code can safely assume that
325 323 the kernel has shut down and it's safe to forcefully terminate it if
326 324 it's still alive.
327 325
328 326 The kernel will send the reply via a function registered with Python's
329 327 atexit module, ensuring it's truly done as the kernel is done with all
330 328 normal operation.
331 329 """
332 330 # Send quit message to kernel. Once we implement kernel-side setattr,
333 331 # this should probably be done that way, but for now this will do.
334 332 msg = self.session.msg('shutdown_request', {'restart':restart})
335 333 self._queue_send(msg)
336 334 return msg['header']['msg_id']
337 335
338 336 def is_complete(self, code):
339 337 msg = self.session.msg('is_complete_request', {'code': code})
340 338 self._queue_send(msg)
341 339 return msg['header']['msg_id']
342 340
343 341 def _recv(self, **kwargs):
344 342 # Listen for kernel_info_reply message to do protocol adaptation
345 343 msg = ZMQSocketChannel._recv(self, **kwargs)
346 344 if msg['msg_type'] == 'kernel_info_reply':
347 345 self._handle_kernel_info_reply(msg)
348 346 return msg
349 347
350 348
351 349 class BlockingIOPubChannel(ZMQSocketChannel):
352 350 """The iopub channel which listens for messages that the kernel publishes.
353 351
354 352 This channel is where all output is published to frontends.
355 353 """
356 354 def start(self):
357 self.socket = self.context.socket(zmq.SUB)
358 self.socket.linger = 1000
359 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
360 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
361 self.socket.connect(self.address)
362
355 self.socket = make_iopub_socket(self.context, self.session.bsession, self.address)
363 356
364 357 class BlockingStdInChannel(ZMQSocketChannel):
365 358 """The stdin channel to handle raw_input requests that the kernel makes."""
366 359 msg_queue = None
367 360 proxy_methods = ['input']
368 361
369 362 def start(self):
370 self.socket = self.context.socket(zmq.DEALER)
371 self.socket.linger = 1000
372 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
373 self.socket.connect(self.address)
363 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
374 364
375 365 def input(self, string):
376 366 """Send a string of raw input to the kernel."""
377 367 content = dict(value=string)
378 368 msg = self.session.msg('input_reply', content)
379 369 self._queue_send(msg)
380 370
381 371
382 372 class BlockingHBChannel(HBChannel):
383 373
384 374 # This kernel needs quicker monitoring, shorten to 1 sec.
385 375 # less than 0.5s is unreliable, and will get occasional
386 376 # false reports of missed beats.
387 377 time_to_dead = 1.
388 378
389 379 def call_handlers(self, since_last_heartbeat):
390 380 """ Pause beating on missed heartbeat. """
391 381 pass
@@ -1,644 +1,665 b''
1 1 """Base classes to manage a Client's interaction with a running kernel"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import absolute_import
7 7
8 8 import atexit
9 9 import errno
10 10 from threading import Thread
11 11 import time
12 12
13 13 import zmq
14 14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
15 15 # during garbage collection of threads at exit:
16 16 from zmq import ZMQError
17 17 from zmq.eventloop import ioloop, zmqstream
18 18
19 19 from IPython.core.release import kernel_protocol_version_info
20 20
21 21 from .channelsabc import (
22 22 ShellChannelABC, IOPubChannelABC,
23 23 HBChannelABC, StdInChannelABC,
24 24 )
25 25 from IPython.utils.py3compat import string_types, iteritems
26 26
27 27 #-----------------------------------------------------------------------------
28 28 # Constants and exceptions
29 29 #-----------------------------------------------------------------------------
30 30
31 31 major_protocol_version = kernel_protocol_version_info[0]
32 32
33 33 class InvalidPortNumber(Exception):
34 34 pass
35 35
36 36 #-----------------------------------------------------------------------------
37 37 # Utility functions
38 38 #-----------------------------------------------------------------------------
39 39
40 40 # some utilities to validate message structure, these might get moved elsewhere
41 41 # if they prove to have more generic utility
42 42
43 43 def validate_string_list(lst):
44 44 """Validate that the input is a list of strings.
45 45
46 46 Raises ValueError if not."""
47 47 if not isinstance(lst, list):
48 48 raise ValueError('input %r must be a list' % lst)
49 49 for x in lst:
50 50 if not isinstance(x, string_types):
51 51 raise ValueError('element %r in list must be a string' % x)
52 52
53 53
54 54 def validate_string_dict(dct):
55 55 """Validate that the input is a dict with string keys and values.
56 56
57 57 Raises ValueError if not."""
58 58 for k,v in iteritems(dct):
59 59 if not isinstance(k, string_types):
60 60 raise ValueError('key %r in dict must be a string' % k)
61 61 if not isinstance(v, string_types):
62 62 raise ValueError('value %r in dict must be a string' % v)
63 63
64 64
65 65 #-----------------------------------------------------------------------------
66 66 # ZMQ Socket Channel classes
67 67 #-----------------------------------------------------------------------------
68 68
69 69 class ZMQSocketChannel(Thread):
70 70 """The base class for the channels that use ZMQ sockets."""
71 71 context = None
72 72 session = None
73 73 socket = None
74 74 ioloop = None
75 75 stream = None
76 76 _address = None
77 77 _exiting = False
78 78 proxy_methods = []
79 79
80 80 def __init__(self, context, session, address):
81 81 """Create a channel.
82 82
83 83 Parameters
84 84 ----------
85 85 context : :class:`zmq.Context`
86 86 The ZMQ context to use.
87 87 session : :class:`session.Session`
88 88 The session to use.
89 89 address : zmq url
90 90 Standard (ip, port) tuple that the kernel is listening on.
91 91 """
92 92 super(ZMQSocketChannel, self).__init__()
93 93 self.daemon = True
94 94
95 95 self.context = context
96 96 self.session = session
97 97 if isinstance(address, tuple):
98 98 if address[1] == 0:
99 99 message = 'The port number for a channel cannot be 0.'
100 100 raise InvalidPortNumber(message)
101 101 address = "tcp://%s:%i" % address
102 102 self._address = address
103 103 atexit.register(self._notice_exit)
104 104
105 105 def _notice_exit(self):
106 106 self._exiting = True
107 107
108 108 def _run_loop(self):
109 109 """Run my loop, ignoring EINTR events in the poller"""
110 110 while True:
111 111 try:
112 112 self.ioloop.start()
113 113 except ZMQError as e:
114 114 if e.errno == errno.EINTR:
115 115 continue
116 116 else:
117 117 raise
118 118 except Exception:
119 119 if self._exiting:
120 120 break
121 121 else:
122 122 raise
123 123 else:
124 124 break
125 125
126 126 def stop(self):
127 127 """Stop the channel's event loop and join its thread.
128 128
129 129 This calls :meth:`~threading.Thread.join` and returns when the thread
130 130 terminates. :class:`RuntimeError` will be raised if
131 131 :meth:`~threading.Thread.start` is called again.
132 132 """
133 133 if self.ioloop is not None:
134 134 self.ioloop.stop()
135 135 self.join()
136 136 self.close()
137 137
138 138 def close(self):
139 139 if self.ioloop is not None:
140 140 try:
141 141 self.ioloop.close(all_fds=True)
142 142 except Exception:
143 143 pass
144 144 if self.socket is not None:
145 145 try:
146 146 self.socket.close(linger=0)
147 147 except Exception:
148 148 pass
149 149 self.socket = None
150 150
151 151 @property
152 152 def address(self):
153 153 """Get the channel's address as a zmq url string.
154 154
155 155 These URLS have the form: 'tcp://127.0.0.1:5555'.
156 156 """
157 157 return self._address
158 158
159 159 def _queue_send(self, msg):
160 160 """Queue a message to be sent from the IOLoop's thread.
161 161
162 162 Parameters
163 163 ----------
164 164 msg : message to send
165 165
166 166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
167 167 thread control of the action.
168 168 """
169 169 def thread_send():
170 170 self.session.send(self.stream, msg)
171 171 self.ioloop.add_callback(thread_send)
172 172
173 173 def _handle_recv(self, msg):
174 174 """Callback for stream.on_recv.
175 175
176 176 Unpacks message, and calls handlers with it.
177 177 """
178 178 ident,smsg = self.session.feed_identities(msg)
179 179 msg = self.session.deserialize(smsg)
180 180 self.call_handlers(msg)
181 181
182 182
183 183
184 184 class ShellChannel(ZMQSocketChannel):
185 185 """The shell channel for issuing request/replies to the kernel."""
186 186
187 187 command_queue = None
188 188 # flag for whether execute requests should be allowed to call raw_input:
189 189 allow_stdin = True
190 190 proxy_methods = [
191 191 'execute',
192 192 'complete',
193 193 'inspect',
194 194 'history',
195 195 'kernel_info',
196 196 'shutdown',
197 197 'is_complete',
198 198 ]
199 199
200 200 def __init__(self, context, session, address):
201 201 super(ShellChannel, self).__init__(context, session, address)
202 202 self.ioloop = ioloop.IOLoop()
203 203
204 204 def run(self):
205 205 """The thread's main activity. Call start() instead."""
206 206 self.socket = self.context.socket(zmq.DEALER)
207 207 self.socket.linger = 1000
208 208 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
209 209 self.socket.connect(self.address)
210 210 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
211 211 self.stream.on_recv(self._handle_recv)
212 212 self._run_loop()
213 213
214 214 def call_handlers(self, msg):
215 215 """This method is called in the ioloop thread when a message arrives.
216 216
217 217 Subclasses should override this method to handle incoming messages.
218 218 It is important to remember that this method is called in the thread
219 219 so that some logic must be done to ensure that the application level
220 220 handlers are called in the application thread.
221 221 """
222 222 raise NotImplementedError('call_handlers must be defined in a subclass.')
223 223
224 224 def execute(self, code, silent=False, store_history=True,
225 225 user_expressions=None, allow_stdin=None):
226 226 """Execute code in the kernel.
227 227
228 228 Parameters
229 229 ----------
230 230 code : str
231 231 A string of Python code.
232 232
233 233 silent : bool, optional (default False)
234 234 If set, the kernel will execute the code as quietly possible, and
235 235 will force store_history to be False.
236 236
237 237 store_history : bool, optional (default True)
238 238 If set, the kernel will store command history. This is forced
239 239 to be False if silent is True.
240 240
241 241 user_expressions : dict, optional
242 242 A dict mapping names to expressions to be evaluated in the user's
243 243 dict. The expression values are returned as strings formatted using
244 244 :func:`repr`.
245 245
246 246 allow_stdin : bool, optional (default self.allow_stdin)
247 247 Flag for whether the kernel can send stdin requests to frontends.
248 248
249 249 Some frontends (e.g. the Notebook) do not support stdin requests.
250 250 If raw_input is called from code executed from such a frontend, a
251 251 StdinNotImplementedError will be raised.
252 252
253 253 Returns
254 254 -------
255 255 The msg_id of the message sent.
256 256 """
257 257 if user_expressions is None:
258 258 user_expressions = {}
259 259 if allow_stdin is None:
260 260 allow_stdin = self.allow_stdin
261 261
262 262
263 263 # Don't waste network traffic if inputs are invalid
264 264 if not isinstance(code, string_types):
265 265 raise ValueError('code %r must be a string' % code)
266 266 validate_string_dict(user_expressions)
267 267
268 268 # Create class for content/msg creation. Related to, but possibly
269 269 # not in Session.
270 270 content = dict(code=code, silent=silent, store_history=store_history,
271 271 user_expressions=user_expressions,
272 272 allow_stdin=allow_stdin,
273 273 )
274 274 msg = self.session.msg('execute_request', content)
275 275 self._queue_send(msg)
276 276 return msg['header']['msg_id']
277 277
278 278 def complete(self, code, cursor_pos=None):
279 279 """Tab complete text in the kernel's namespace.
280 280
281 281 Parameters
282 282 ----------
283 283 code : str
284 284 The context in which completion is requested.
285 285 Can be anything between a variable name and an entire cell.
286 286 cursor_pos : int, optional
287 287 The position of the cursor in the block of code where the completion was requested.
288 288 Default: ``len(code)``
289 289
290 290 Returns
291 291 -------
292 292 The msg_id of the message sent.
293 293 """
294 294 if cursor_pos is None:
295 295 cursor_pos = len(code)
296 296 content = dict(code=code, cursor_pos=cursor_pos)
297 297 msg = self.session.msg('complete_request', content)
298 298 self._queue_send(msg)
299 299 return msg['header']['msg_id']
300 300
301 301 def inspect(self, code, cursor_pos=None, detail_level=0):
302 302 """Get metadata information about an object in the kernel's namespace.
303 303
304 304 It is up to the kernel to determine the appropriate object to inspect.
305 305
306 306 Parameters
307 307 ----------
308 308 code : str
309 309 The context in which info is requested.
310 310 Can be anything between a variable name and an entire cell.
311 311 cursor_pos : int, optional
312 312 The position of the cursor in the block of code where the info was requested.
313 313 Default: ``len(code)``
314 314 detail_level : int, optional
315 315 The level of detail for the introspection (0-2)
316 316
317 317 Returns
318 318 -------
319 319 The msg_id of the message sent.
320 320 """
321 321 if cursor_pos is None:
322 322 cursor_pos = len(code)
323 323 content = dict(code=code, cursor_pos=cursor_pos,
324 324 detail_level=detail_level,
325 325 )
326 326 msg = self.session.msg('inspect_request', content)
327 327 self._queue_send(msg)
328 328 return msg['header']['msg_id']
329 329
330 330 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
331 331 """Get entries from the kernel's history list.
332 332
333 333 Parameters
334 334 ----------
335 335 raw : bool
336 336 If True, return the raw input.
337 337 output : bool
338 338 If True, then return the output as well.
339 339 hist_access_type : str
340 340 'range' (fill in session, start and stop params), 'tail' (fill in n)
341 341 or 'search' (fill in pattern param).
342 342
343 343 session : int
344 344 For a range request, the session from which to get lines. Session
345 345 numbers are positive integers; negative ones count back from the
346 346 current session.
347 347 start : int
348 348 The first line number of a history range.
349 349 stop : int
350 350 The final (excluded) line number of a history range.
351 351
352 352 n : int
353 353 The number of lines of history to get for a tail request.
354 354
355 355 pattern : str
356 356 The glob-syntax pattern for a search request.
357 357
358 358 Returns
359 359 -------
360 360 The msg_id of the message sent.
361 361 """
362 362 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
363 363 **kwargs)
364 364 msg = self.session.msg('history_request', content)
365 365 self._queue_send(msg)
366 366 return msg['header']['msg_id']
367 367
368 368 def kernel_info(self):
369 369 """Request kernel info."""
370 370 msg = self.session.msg('kernel_info_request')
371 371 self._queue_send(msg)
372 372 return msg['header']['msg_id']
373 373
374 374 def _handle_kernel_info_reply(self, msg):
375 375 """handle kernel info reply
376 376
377 377 sets protocol adaptation version
378 378 """
379 379 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
380 380 if adapt_version != major_protocol_version:
381 381 self.session.adapt_version = adapt_version
382 382
383 383 def shutdown(self, restart=False):
384 384 """Request an immediate kernel shutdown.
385 385
386 386 Upon receipt of the (empty) reply, client code can safely assume that
387 387 the kernel has shut down and it's safe to forcefully terminate it if
388 388 it's still alive.
389 389
390 390 The kernel will send the reply via a function registered with Python's
391 391 atexit module, ensuring it's truly done as the kernel is done with all
392 392 normal operation.
393 393 """
394 394 # Send quit message to kernel. Once we implement kernel-side setattr,
395 395 # this should probably be done that way, but for now this will do.
396 396 msg = self.session.msg('shutdown_request', {'restart':restart})
397 397 self._queue_send(msg)
398 398 return msg['header']['msg_id']
399 399
400 400 def is_complete(self, code):
401 401 msg = self.session.msg('is_complete_request', {'code': code})
402 402 self._queue_send(msg)
403 403 return msg['header']['msg_id']
404 404
405 405
406 406 class IOPubChannel(ZMQSocketChannel):
407 407 """The iopub channel which listens for messages that the kernel publishes.
408 408
409 409 This channel is where all output is published to frontends.
410 410 """
411 411
412 412 def __init__(self, context, session, address):
413 413 super(IOPubChannel, self).__init__(context, session, address)
414 414 self.ioloop = ioloop.IOLoop()
415 415
416 416 def run(self):
417 417 """The thread's main activity. Call start() instead."""
418 418 self.socket = self.context.socket(zmq.SUB)
419 419 self.socket.linger = 1000
420 420 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
421 421 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
422 422 self.socket.connect(self.address)
423 423 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
424 424 self.stream.on_recv(self._handle_recv)
425 425 self._run_loop()
426 426
427 427 def call_handlers(self, msg):
428 428 """This method is called in the ioloop thread when a message arrives.
429 429
430 430 Subclasses should override this method to handle incoming messages.
431 431 It is important to remember that this method is called in the thread
432 432 so that some logic must be done to ensure that the application leve
433 433 handlers are called in the application thread.
434 434 """
435 435 raise NotImplementedError('call_handlers must be defined in a subclass.')
436 436
437 437 def flush(self, timeout=1.0):
438 438 """Immediately processes all pending messages on the iopub channel.
439 439
440 440 Callers should use this method to ensure that :meth:`call_handlers`
441 441 has been called for all messages that have been received on the
442 442 0MQ SUB socket of this channel.
443 443
444 444 This method is thread safe.
445 445
446 446 Parameters
447 447 ----------
448 448 timeout : float, optional
449 449 The maximum amount of time to spend flushing, in seconds. The
450 450 default is one second.
451 451 """
452 452 # We do the IOLoop callback process twice to ensure that the IOLoop
453 453 # gets to perform at least one full poll.
454 454 stop_time = time.time() + timeout
455 455 for i in range(2):
456 456 self._flushed = False
457 457 self.ioloop.add_callback(self._flush)
458 458 while not self._flushed and time.time() < stop_time:
459 459 time.sleep(0.01)
460 460
461 461 def _flush(self):
462 462 """Callback for :method:`self.flush`."""
463 463 self.stream.flush()
464 464 self._flushed = True
465 465
466 466
467 467 class StdInChannel(ZMQSocketChannel):
468 468 """The stdin channel to handle raw_input requests that the kernel makes."""
469 469
470 470 msg_queue = None
471 471 proxy_methods = ['input']
472 472
473 473 def __init__(self, context, session, address):
474 474 super(StdInChannel, self).__init__(context, session, address)
475 475 self.ioloop = ioloop.IOLoop()
476 476
477 477 def run(self):
478 478 """The thread's main activity. Call start() instead."""
479 479 self.socket = self.context.socket(zmq.DEALER)
480 480 self.socket.linger = 1000
481 481 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
482 482 self.socket.connect(self.address)
483 483 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
484 484 self.stream.on_recv(self._handle_recv)
485 485 self._run_loop()
486 486
487 487 def call_handlers(self, msg):
488 488 """This method is called in the ioloop thread when a message arrives.
489 489
490 490 Subclasses should override this method to handle incoming messages.
491 491 It is important to remember that this method is called in the thread
492 492 so that some logic must be done to ensure that the application leve
493 493 handlers are called in the application thread.
494 494 """
495 495 raise NotImplementedError('call_handlers must be defined in a subclass.')
496 496
497 497 def input(self, string):
498 498 """Send a string of raw input to the kernel."""
499 499 content = dict(value=string)
500 500 msg = self.session.msg('input_reply', content)
501 501 self._queue_send(msg)
502 502
503 def make_shell_socket(context, identity, address):
504 socket = context.socket(zmq.DEALER)
505 socket.linger = 1000
506 socket.setsockopt(zmq.IDENTITY, identity)
507 socket.connect(address)
508 return socket
509
510 def make_iopub_socket(context, identity, address):
511 socket = context.socket(zmq.SUB)
512 socket.linger = 1000
513 socket.setsockopt(zmq.SUBSCRIBE,b'')
514 socket.setsockopt(zmq.IDENTITY, identity)
515 socket.connect(address)
516 return socket
517
518 def make_stdin_socket(context, identity, address):
519 socket = context.socket(zmq.DEALER)
520 socket.linger = 1000
521 socket.setsockopt(zmq.IDENTITY, identity)
522 socket.connect(address)
523 return socket
503 524
504 525 class HBChannel(ZMQSocketChannel):
505 526 """The heartbeat channel which monitors the kernel heartbeat.
506 527
507 528 Note that the heartbeat channel is paused by default. As long as you start
508 529 this channel, the kernel manager will ensure that it is paused and un-paused
509 530 as appropriate.
510 531 """
511 532
512 533 time_to_dead = 3.0
513 534 socket = None
514 535 poller = None
515 536 _running = None
516 537 _pause = None
517 538 _beating = None
518 539
519 540 def __init__(self, context, session, address):
520 541 super(HBChannel, self).__init__(context, session, address)
521 542 self._running = False
522 543 self._pause =True
523 544 self.poller = zmq.Poller()
524 545
525 546 def _create_socket(self):
526 547 if self.socket is not None:
527 548 # close previous socket, before opening a new one
528 549 self.poller.unregister(self.socket)
529 550 self.socket.close()
530 551 self.socket = self.context.socket(zmq.REQ)
531 552 self.socket.linger = 1000
532 553 self.socket.connect(self.address)
533 554
534 555 self.poller.register(self.socket, zmq.POLLIN)
535 556
536 557 def _poll(self, start_time):
537 558 """poll for heartbeat replies until we reach self.time_to_dead.
538 559
539 560 Ignores interrupts, and returns the result of poll(), which
540 561 will be an empty list if no messages arrived before the timeout,
541 562 or the event tuple if there is a message to receive.
542 563 """
543 564
544 565 until_dead = self.time_to_dead - (time.time() - start_time)
545 566 # ensure poll at least once
546 567 until_dead = max(until_dead, 1e-3)
547 568 events = []
548 569 while True:
549 570 try:
550 571 events = self.poller.poll(1000 * until_dead)
551 572 except ZMQError as e:
552 573 if e.errno == errno.EINTR:
553 574 # ignore interrupts during heartbeat
554 575 # this may never actually happen
555 576 until_dead = self.time_to_dead - (time.time() - start_time)
556 577 until_dead = max(until_dead, 1e-3)
557 578 pass
558 579 else:
559 580 raise
560 581 except Exception:
561 582 if self._exiting:
562 583 break
563 584 else:
564 585 raise
565 586 else:
566 587 break
567 588 return events
568 589
569 590 def run(self):
570 591 """The thread's main activity. Call start() instead."""
571 592 self._create_socket()
572 593 self._running = True
573 594 self._beating = True
574 595
575 596 while self._running:
576 597 if self._pause:
577 598 # just sleep, and skip the rest of the loop
578 599 time.sleep(self.time_to_dead)
579 600 continue
580 601
581 602 since_last_heartbeat = 0.0
582 603 # io.rprint('Ping from HB channel') # dbg
583 604 # no need to catch EFSM here, because the previous event was
584 605 # either a recv or connect, which cannot be followed by EFSM
585 606 self.socket.send(b'ping')
586 607 request_time = time.time()
587 608 ready = self._poll(request_time)
588 609 if ready:
589 610 self._beating = True
590 611 # the poll above guarantees we have something to recv
591 612 self.socket.recv()
592 613 # sleep the remainder of the cycle
593 614 remainder = self.time_to_dead - (time.time() - request_time)
594 615 if remainder > 0:
595 616 time.sleep(remainder)
596 617 continue
597 618 else:
598 619 # nothing was received within the time limit, signal heart failure
599 620 self._beating = False
600 621 since_last_heartbeat = time.time() - request_time
601 622 self.call_handlers(since_last_heartbeat)
602 623 # and close/reopen the socket, because the REQ/REP cycle has been broken
603 624 self._create_socket()
604 625 continue
605 626
606 627 def pause(self):
607 628 """Pause the heartbeat."""
608 629 self._pause = True
609 630
610 631 def unpause(self):
611 632 """Unpause the heartbeat."""
612 633 self._pause = False
613 634
614 635 def is_beating(self):
615 636 """Is the heartbeat running and responsive (and not paused)."""
616 637 if self.is_alive() and not self._pause and self._beating:
617 638 return True
618 639 else:
619 640 return False
620 641
621 642 def stop(self):
622 643 """Stop the channel's event loop and join its thread."""
623 644 self._running = False
624 645 super(HBChannel, self).stop()
625 646
626 647 def call_handlers(self, since_last_heartbeat):
627 648 """This method is called in the ioloop thread when a message arrives.
628 649
629 650 Subclasses should override this method to handle incoming messages.
630 651 It is important to remember that this method is called in the thread
631 652 so that some logic must be done to ensure that the application level
632 653 handlers are called in the application thread.
633 654 """
634 655 raise NotImplementedError('call_handlers must be defined in a subclass.')
635 656
636 657
637 658 #---------------------------------------------------------------------#-----------------------------------------------------------------------------
638 659 # ABC Registration
639 660 #-----------------------------------------------------------------------------
640 661
641 662 ShellChannelABC.register(ShellChannel)
642 663 IOPubChannelABC.register(IOPubChannel)
643 664 HBChannelABC.register(HBChannel)
644 665 StdInChannelABC.register(StdInChannel)
@@ -1,582 +1,563 b''
1 1 """ Defines a KernelClient that provides signals and slots.
2 2 """
3 3 import atexit
4 4 import errno
5 5 from threading import Thread
6 6 import time
7 7
8 8 import zmq
9 9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 10 # during garbage collection of threads at exit:
11 11 from zmq import ZMQError
12 12 from zmq.eventloop import ioloop, zmqstream
13 13
14 14 from IPython.external.qt import QtCore
15 15
16 16 # Local imports
17 17 from IPython.utils.traitlets import Type
18 from IPython.kernel.channels import HBChannel
18 from IPython.kernel.channels import HBChannel,\
19 make_shell_socket, make_iopub_socket, make_stdin_socket
19 20 from IPython.kernel import KernelClient
20 21
21 22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
22 23 from .util import SuperQObject
23 24
24 25 class QtHBChannel(QtHBChannelMixin, HBChannel):
25 26 pass
26 27
27 28 from IPython.core.release import kernel_protocol_version_info
28 29
29 30 from IPython.kernel.channelsabc import (
30 31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
31 32 )
32 33 from IPython.utils.py3compat import string_types, iteritems
33 34
34 35 major_protocol_version = kernel_protocol_version_info[0]
35 36
36 37 class InvalidPortNumber(Exception):
37 38 pass
38 39
39 40 # some utilities to validate message structure, these might get moved elsewhere
40 41 # if they prove to have more generic utility
41 42
42 def validate_string_list(lst):
43 """Validate that the input is a list of strings.
44
45 Raises ValueError if not."""
46 if not isinstance(lst, list):
47 raise ValueError('input %r must be a list' % lst)
48 for x in lst:
49 if not isinstance(x, string_types):
50 raise ValueError('element %r in list must be a string' % x)
51
52 43
53 44 def validate_string_dict(dct):
54 45 """Validate that the input is a dict with string keys and values.
55 46
56 47 Raises ValueError if not."""
57 48 for k,v in iteritems(dct):
58 49 if not isinstance(k, string_types):
59 50 raise ValueError('key %r in dict must be a string' % k)
60 51 if not isinstance(v, string_types):
61 52 raise ValueError('value %r in dict must be a string' % v)
62 53
63 54
64 55
65 56 class QtZMQSocketChannel(SuperQObject, Thread):
66 57 """The base class for the channels that use ZMQ sockets."""
67 58 context = None
68 59 session = None
69 60 socket = None
70 61 ioloop = None
71 62 stream = None
72 63 _address = None
73 64 _exiting = False
74 65 proxy_methods = []
75 66
76 67 # Emitted when the channel is started.
77 68 started = QtCore.Signal()
78 69
79 70 # Emitted when the channel is stopped.
80 71 stopped = QtCore.Signal()
81 72
82 73 message_received = QtCore.Signal(object)
83 74
84 75 #---------------------------------------------------------------------------
85 76 # InProcessChannel interface
86 77 #---------------------------------------------------------------------------
87 78
88 79 def call_handlers_later(self, *args, **kwds):
89 80 """ Call the message handlers later.
90 81 """
91 82 do_later = lambda: self.call_handlers(*args, **kwds)
92 83 QtCore.QTimer.singleShot(0, do_later)
93 84
94 85 def process_events(self):
95 86 """ Process any pending GUI events.
96 87 """
97 88 QtCore.QCoreApplication.instance().processEvents()
98 89
99 90 def __init__(self, context, session, address):
100 91 """Create a channel.
101 92
102 93 Parameters
103 94 ----------
104 95 context : :class:`zmq.Context`
105 96 The ZMQ context to use.
106 97 session : :class:`session.Session`
107 98 The session to use.
108 99 address : zmq url
109 100 Standard (ip, port) tuple that the kernel is listening on.
110 101 """
111 102 super(QtZMQSocketChannel, self).__init__()
112 103 self.daemon = True
113 104
114 105 self.context = context
115 106 self.session = session
116 107 if isinstance(address, tuple):
117 108 if address[1] == 0:
118 109 message = 'The port number for a channel cannot be 0.'
119 110 raise InvalidPortNumber(message)
120 111 address = "tcp://%s:%i" % address
121 112 self._address = address
122 113 atexit.register(self._notice_exit)
123 114
124 115 def _notice_exit(self):
125 116 self._exiting = True
126 117
127 118 def _run_loop(self):
128 119 """Run my loop, ignoring EINTR events in the poller"""
129 120 while True:
130 121 try:
131 122 self.ioloop.start()
132 123 except ZMQError as e:
133 124 if e.errno == errno.EINTR:
134 125 continue
135 126 else:
136 127 raise
137 128 except Exception:
138 129 if self._exiting:
139 130 break
140 131 else:
141 132 raise
142 133 else:
143 134 break
144 135
145 136 def start(self):
146 137 """ Reimplemented to emit signal.
147 138 """
148 139 super(QtZMQSocketChannel, self).start()
149 140 self.started.emit()
150 141
151 142 def stop(self):
152 143 """Stop the channel's event loop and join its thread.
153 144
154 145 This calls :meth:`~threading.Thread.join` and returns when the thread
155 146 terminates. :class:`RuntimeError` will be raised if
156 147 :meth:`~threading.Thread.start` is called again.
157 148 """
158 149 if self.ioloop is not None:
159 150 self.ioloop.stop()
160 151 self.join()
161 152 self.close()
162 153 self.stopped.emit()
163 154
164 155 def close(self):
165 156 if self.ioloop is not None:
166 157 try:
167 158 self.ioloop.close(all_fds=True)
168 159 except Exception:
169 160 pass
170 161 if self.socket is not None:
171 162 try:
172 163 self.socket.close(linger=0)
173 164 except Exception:
174 165 pass
175 166 self.socket = None
176 167
177 168 @property
178 169 def address(self):
179 170 """Get the channel's address as a zmq url string.
180 171
181 172 These URLS have the form: 'tcp://127.0.0.1:5555'.
182 173 """
183 174 return self._address
184 175
185 176 def _queue_send(self, msg):
186 177 """Queue a message to be sent from the IOLoop's thread.
187 178
188 179 Parameters
189 180 ----------
190 181 msg : message to send
191 182
192 183 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
193 184 thread control of the action.
194 185 """
195 186 def thread_send():
196 187 self.session.send(self.stream, msg)
197 188 self.ioloop.add_callback(thread_send)
198 189
199 190 def _handle_recv(self, msg):
200 191 """Callback for stream.on_recv.
201 192
202 193 Unpacks message, and calls handlers with it.
203 194 """
204 195 ident,smsg = self.session.feed_identities(msg)
205 196 msg = self.session.deserialize(smsg)
206 197 self.call_handlers(msg)
207 198
208 199 def call_handlers(self, msg):
209 200 """This method is called in the ioloop thread when a message arrives.
210 201
211 202 Subclasses should override this method to handle incoming messages.
212 203 It is important to remember that this method is called in the thread
213 204 so that some logic must be done to ensure that the application level
214 205 handlers are called in the application thread.
215 206 """
216 207 # Emit the generic signal.
217 208 self.message_received.emit(msg)
218 209
219 210
220 211 class QtShellChannel(QtZMQSocketChannel):
221 212 """The shell channel for issuing request/replies to the kernel."""
222 213
223 214 command_queue = None
224 215 # flag for whether execute requests should be allowed to call raw_input:
225 216 allow_stdin = True
226 217 proxy_methods = [
227 218 'execute',
228 219 'complete',
229 220 'inspect',
230 221 'history',
231 222 'kernel_info',
232 223 'shutdown',
233 224 'is_complete',
234 225 ]
235 226
236 227 # Emitted when a reply has been received for the corresponding request type.
237 228 execute_reply = QtCore.Signal(object)
238 229 complete_reply = QtCore.Signal(object)
239 230 inspect_reply = QtCore.Signal(object)
240 231 history_reply = QtCore.Signal(object)
241 232 kernel_info_reply = QtCore.Signal(object)
242 233
243 234 def __init__(self, context, session, address):
244 235 super(QtShellChannel, self).__init__(context, session, address)
245 236 self.ioloop = ioloop.IOLoop()
246 237
247 238 def run(self):
248 239 """The thread's main activity. Call start() instead."""
249 self.socket = self.context.socket(zmq.DEALER)
250 self.socket.linger = 1000
251 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
252 self.socket.connect(self.address)
240 self.socket = make_shell_socket(self.context, self.session.bsession, self.address)
253 241 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
254 242 self.stream.on_recv(self._handle_recv)
255 243 self._run_loop()
256 244
257 245 def call_handlers(self, msg):
258 246 super(QtShellChannel, self).call_handlers(msg)
259 247
260 248 # Catch kernel_info_reply for message spec adaptation
261 249 msg_type = msg['header']['msg_type']
262 250 if msg_type == 'kernel_info_reply':
263 251 self._handle_kernel_info_reply(msg)
264 252
265 253 # Emit specific signals
266 254 signal = getattr(self, msg_type, None)
267 255 if signal:
268 256 signal.emit(msg)
269 257
270 258 def execute(self, code, silent=False, store_history=True,
271 259 user_expressions=None, allow_stdin=None):
272 260 """Execute code in the kernel.
273 261
274 262 Parameters
275 263 ----------
276 264 code : str
277 265 A string of Python code.
278 266
279 267 silent : bool, optional (default False)
280 268 If set, the kernel will execute the code as quietly possible, and
281 269 will force store_history to be False.
282 270
283 271 store_history : bool, optional (default True)
284 272 If set, the kernel will store command history. This is forced
285 273 to be False if silent is True.
286 274
287 275 user_expressions : dict, optional
288 276 A dict mapping names to expressions to be evaluated in the user's
289 277 dict. The expression values are returned as strings formatted using
290 278 :func:`repr`.
291 279
292 280 allow_stdin : bool, optional (default self.allow_stdin)
293 281 Flag for whether the kernel can send stdin requests to frontends.
294 282
295 283 Some frontends (e.g. the Notebook) do not support stdin requests.
296 284 If raw_input is called from code executed from such a frontend, a
297 285 StdinNotImplementedError will be raised.
298 286
299 287 Returns
300 288 -------
301 289 The msg_id of the message sent.
302 290 """
303 291 if user_expressions is None:
304 292 user_expressions = {}
305 293 if allow_stdin is None:
306 294 allow_stdin = self.allow_stdin
307 295
308 296
309 297 # Don't waste network traffic if inputs are invalid
310 298 if not isinstance(code, string_types):
311 299 raise ValueError('code %r must be a string' % code)
312 300 validate_string_dict(user_expressions)
313 301
314 302 # Create class for content/msg creation. Related to, but possibly
315 303 # not in Session.
316 304 content = dict(code=code, silent=silent, store_history=store_history,
317 305 user_expressions=user_expressions,
318 306 allow_stdin=allow_stdin,
319 307 )
320 308 msg = self.session.msg('execute_request', content)
321 309 self._queue_send(msg)
322 310 return msg['header']['msg_id']
323 311
324 312 def complete(self, code, cursor_pos=None):
325 313 """Tab complete text in the kernel's namespace.
326 314
327 315 Parameters
328 316 ----------
329 317 code : str
330 318 The context in which completion is requested.
331 319 Can be anything between a variable name and an entire cell.
332 320 cursor_pos : int, optional
333 321 The position of the cursor in the block of code where the completion was requested.
334 322 Default: ``len(code)``
335 323
336 324 Returns
337 325 -------
338 326 The msg_id of the message sent.
339 327 """
340 328 if cursor_pos is None:
341 329 cursor_pos = len(code)
342 330 content = dict(code=code, cursor_pos=cursor_pos)
343 331 msg = self.session.msg('complete_request', content)
344 332 self._queue_send(msg)
345 333 return msg['header']['msg_id']
346 334
347 335 def inspect(self, code, cursor_pos=None, detail_level=0):
348 336 """Get metadata information about an object in the kernel's namespace.
349 337
350 338 It is up to the kernel to determine the appropriate object to inspect.
351 339
352 340 Parameters
353 341 ----------
354 342 code : str
355 343 The context in which info is requested.
356 344 Can be anything between a variable name and an entire cell.
357 345 cursor_pos : int, optional
358 346 The position of the cursor in the block of code where the info was requested.
359 347 Default: ``len(code)``
360 348 detail_level : int, optional
361 349 The level of detail for the introspection (0-2)
362 350
363 351 Returns
364 352 -------
365 353 The msg_id of the message sent.
366 354 """
367 355 if cursor_pos is None:
368 356 cursor_pos = len(code)
369 357 content = dict(code=code, cursor_pos=cursor_pos,
370 358 detail_level=detail_level,
371 359 )
372 360 msg = self.session.msg('inspect_request', content)
373 361 self._queue_send(msg)
374 362 return msg['header']['msg_id']
375 363
376 364 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
377 365 """Get entries from the kernel's history list.
378 366
379 367 Parameters
380 368 ----------
381 369 raw : bool
382 370 If True, return the raw input.
383 371 output : bool
384 372 If True, then return the output as well.
385 373 hist_access_type : str
386 374 'range' (fill in session, start and stop params), 'tail' (fill in n)
387 375 or 'search' (fill in pattern param).
388 376
389 377 session : int
390 378 For a range request, the session from which to get lines. Session
391 379 numbers are positive integers; negative ones count back from the
392 380 current session.
393 381 start : int
394 382 The first line number of a history range.
395 383 stop : int
396 384 The final (excluded) line number of a history range.
397 385
398 386 n : int
399 387 The number of lines of history to get for a tail request.
400 388
401 389 pattern : str
402 390 The glob-syntax pattern for a search request.
403 391
404 392 Returns
405 393 -------
406 394 The msg_id of the message sent.
407 395 """
408 396 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
409 397 **kwargs)
410 398 msg = self.session.msg('history_request', content)
411 399 self._queue_send(msg)
412 400 return msg['header']['msg_id']
413 401
414 402 def kernel_info(self):
415 403 """Request kernel info."""
416 404 msg = self.session.msg('kernel_info_request')
417 405 self._queue_send(msg)
418 406 return msg['header']['msg_id']
419 407
420 408 def _handle_kernel_info_reply(self, msg):
421 409 """handle kernel info reply
422 410
423 411 sets protocol adaptation version
424 412 """
425 413 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
426 414 if adapt_version != major_protocol_version:
427 415 self.session.adapt_version = adapt_version
428 416
429 417 def shutdown(self, restart=False):
430 418 """Request an immediate kernel shutdown.
431 419
432 420 Upon receipt of the (empty) reply, client code can safely assume that
433 421 the kernel has shut down and it's safe to forcefully terminate it if
434 422 it's still alive.
435 423
436 424 The kernel will send the reply via a function registered with Python's
437 425 atexit module, ensuring it's truly done as the kernel is done with all
438 426 normal operation.
439 427 """
440 428 # Send quit message to kernel. Once we implement kernel-side setattr,
441 429 # this should probably be done that way, but for now this will do.
442 430 msg = self.session.msg('shutdown_request', {'restart':restart})
443 431 self._queue_send(msg)
444 432 return msg['header']['msg_id']
445 433
446 434 def is_complete(self, code):
447 435 msg = self.session.msg('is_complete_request', {'code': code})
448 436 self._queue_send(msg)
449 437 return msg['header']['msg_id']
450 438
451 439
452 440 class QtIOPubChannel(QtZMQSocketChannel):
453 441 """The iopub channel which listens for messages that the kernel publishes.
454 442
455 443 This channel is where all output is published to frontends.
456 444 """
457 445 # Emitted when a message of type 'stream' is received.
458 446 stream_received = QtCore.Signal(object)
459 447
460 448 # Emitted when a message of type 'execute_input' is received.
461 449 execute_input_received = QtCore.Signal(object)
462 450
463 451 # Emitted when a message of type 'execute_result' is received.
464 452 execute_result_received = QtCore.Signal(object)
465 453
466 454 # Emitted when a message of type 'error' is received.
467 455 error_received = QtCore.Signal(object)
468 456
469 457 # Emitted when a message of type 'display_data' is received
470 458 display_data_received = QtCore.Signal(object)
471 459
472 460 # Emitted when a crash report message is received from the kernel's
473 461 # last-resort sys.excepthook.
474 462 crash_received = QtCore.Signal(object)
475 463
476 464 # Emitted when a shutdown is noticed.
477 465 shutdown_reply_received = QtCore.Signal(object)
478 466
479 467 def __init__(self, context, session, address):
480 468 super(QtIOPubChannel, self).__init__(context, session, address)
481 469 self.ioloop = ioloop.IOLoop()
482 470
483 471 def run(self):
484 472 """The thread's main activity. Call start() instead."""
485 self.socket = self.context.socket(zmq.SUB)
486 self.socket.linger = 1000
487 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
488 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
489 self.socket.connect(self.address)
473 self.socket = make_iopub_socket(self.context, self.session.bsession, self.address)
490 474 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
491 475 self.stream.on_recv(self._handle_recv)
492 476 self._run_loop()
493 477
494 478 def call_handlers(self, msg):
495 479 super(QtIOPubChannel, self).call_handlers(msg)
496 480
497 481 # Emit signals for specialized message types.
498 482 msg_type = msg['header']['msg_type']
499 483 signal = getattr(self, msg_type + '_received', None)
500 484 if signal:
501 485 signal.emit(msg)
502 486
503 487 def flush(self, timeout=1.0):
504 488 """Immediately processes all pending messages on the iopub channel.
505 489
506 490 Callers should use this method to ensure that :meth:`call_handlers`
507 491 has been called for all messages that have been received on the
508 492 0MQ SUB socket of this channel.
509 493
510 494 This method is thread safe.
511 495
512 496 Parameters
513 497 ----------
514 498 timeout : float, optional
515 499 The maximum amount of time to spend flushing, in seconds. The
516 500 default is one second.
517 501 """
518 502 # We do the IOLoop callback process twice to ensure that the IOLoop
519 503 # gets to perform at least one full poll.
520 504 stop_time = time.time() + timeout
521 505 for i in range(2):
522 506 self._flushed = False
523 507 self.ioloop.add_callback(self._flush)
524 508 while not self._flushed and time.time() < stop_time:
525 509 time.sleep(0.01)
526 510
527 511 def _flush(self):
528 512 """Callback for :method:`self.flush`."""
529 513 self.stream.flush()
530 514 self._flushed = True
531 515
532 516
533 517 class QtStdInChannel(QtZMQSocketChannel):
534 518 """The stdin channel to handle raw_input requests that the kernel makes."""
535 519
536 520 msg_queue = None
537 521 proxy_methods = ['input']
538 522
539 523 # Emitted when an input request is received.
540 524 input_requested = QtCore.Signal(object)
541 525
542 526 def __init__(self, context, session, address):
543 527 super(QtStdInChannel, self).__init__(context, session, address)
544 528 self.ioloop = ioloop.IOLoop()
545 529
546 530 def run(self):
547 531 """The thread's main activity. Call start() instead."""
548 self.socket = self.context.socket(zmq.DEALER)
549 self.socket.linger = 1000
550 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
551 self.socket.connect(self.address)
532 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
552 533 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
553 534 self.stream.on_recv(self._handle_recv)
554 535 self._run_loop()
555 536
556 537 def call_handlers(self, msg):
557 538 super(QtStdInChannel, self).call_handlers(msg)
558 539
559 540 # Emit signals for specialized message types.
560 541 msg_type = msg['header']['msg_type']
561 542 if msg_type == 'input_request':
562 543 self.input_requested.emit(msg)
563 544
564 545 def input(self, string):
565 546 """Send a string of raw input to the kernel."""
566 547 content = dict(value=string)
567 548 msg = self.session.msg('input_reply', content)
568 549 self._queue_send(msg)
569 550
570 551 ShellChannelABC.register(QtShellChannel)
571 552 IOPubChannelABC.register(QtIOPubChannel)
572 553 StdInChannelABC.register(QtStdInChannel)
573 554
574 555
575 556 class QtKernelClient(QtKernelClientMixin, KernelClient):
576 557 """ A KernelClient that provides signals and slots.
577 558 """
578 559
579 560 iopub_channel_class = Type(QtIOPubChannel)
580 561 shell_channel_class = Type(QtShellChannel)
581 562 stdin_channel_class = Type(QtStdInChannel)
582 563 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now