##// END OF EJS Templates
Add is_complete request and reply to messaging infrastructure
Thomas Kluyver -
Show More
@@ -1,639 +1,644 b''
1 """Base classes to manage a Client's interaction with a running kernel"""
1 """Base classes to manage a Client's interaction with a running kernel"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import absolute_import
6 from __future__ import absolute_import
7
7
8 import atexit
8 import atexit
9 import errno
9 import errno
10 from threading import Thread
10 from threading import Thread
11 import time
11 import time
12
12
13 import zmq
13 import zmq
14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
15 # during garbage collection of threads at exit:
15 # during garbage collection of threads at exit:
16 from zmq import ZMQError
16 from zmq import ZMQError
17 from zmq.eventloop import ioloop, zmqstream
17 from zmq.eventloop import ioloop, zmqstream
18
18
19 from IPython.core.release import kernel_protocol_version_info
19 from IPython.core.release import kernel_protocol_version_info
20
20
21 from .channelsabc import (
21 from .channelsabc import (
22 ShellChannelABC, IOPubChannelABC,
22 ShellChannelABC, IOPubChannelABC,
23 HBChannelABC, StdInChannelABC,
23 HBChannelABC, StdInChannelABC,
24 )
24 )
25 from IPython.utils.py3compat import string_types, iteritems
25 from IPython.utils.py3compat import string_types, iteritems
26
26
27 #-----------------------------------------------------------------------------
27 #-----------------------------------------------------------------------------
28 # Constants and exceptions
28 # Constants and exceptions
29 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
30
30
31 major_protocol_version = kernel_protocol_version_info[0]
31 major_protocol_version = kernel_protocol_version_info[0]
32
32
33 class InvalidPortNumber(Exception):
33 class InvalidPortNumber(Exception):
34 pass
34 pass
35
35
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37 # Utility functions
37 # Utility functions
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39
39
40 # some utilities to validate message structure, these might get moved elsewhere
40 # some utilities to validate message structure, these might get moved elsewhere
41 # if they prove to have more generic utility
41 # if they prove to have more generic utility
42
42
43 def validate_string_list(lst):
43 def validate_string_list(lst):
44 """Validate that the input is a list of strings.
44 """Validate that the input is a list of strings.
45
45
46 Raises ValueError if not."""
46 Raises ValueError if not."""
47 if not isinstance(lst, list):
47 if not isinstance(lst, list):
48 raise ValueError('input %r must be a list' % lst)
48 raise ValueError('input %r must be a list' % lst)
49 for x in lst:
49 for x in lst:
50 if not isinstance(x, string_types):
50 if not isinstance(x, string_types):
51 raise ValueError('element %r in list must be a string' % x)
51 raise ValueError('element %r in list must be a string' % x)
52
52
53
53
54 def validate_string_dict(dct):
54 def validate_string_dict(dct):
55 """Validate that the input is a dict with string keys and values.
55 """Validate that the input is a dict with string keys and values.
56
56
57 Raises ValueError if not."""
57 Raises ValueError if not."""
58 for k,v in iteritems(dct):
58 for k,v in iteritems(dct):
59 if not isinstance(k, string_types):
59 if not isinstance(k, string_types):
60 raise ValueError('key %r in dict must be a string' % k)
60 raise ValueError('key %r in dict must be a string' % k)
61 if not isinstance(v, string_types):
61 if not isinstance(v, string_types):
62 raise ValueError('value %r in dict must be a string' % v)
62 raise ValueError('value %r in dict must be a string' % v)
63
63
64
64
65 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
66 # ZMQ Socket Channel classes
66 # ZMQ Socket Channel classes
67 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
68
68
69 class ZMQSocketChannel(Thread):
69 class ZMQSocketChannel(Thread):
70 """The base class for the channels that use ZMQ sockets."""
70 """The base class for the channels that use ZMQ sockets."""
71 context = None
71 context = None
72 session = None
72 session = None
73 socket = None
73 socket = None
74 ioloop = None
74 ioloop = None
75 stream = None
75 stream = None
76 _address = None
76 _address = None
77 _exiting = False
77 _exiting = False
78 proxy_methods = []
78 proxy_methods = []
79
79
80 def __init__(self, context, session, address):
80 def __init__(self, context, session, address):
81 """Create a channel.
81 """Create a channel.
82
82
83 Parameters
83 Parameters
84 ----------
84 ----------
85 context : :class:`zmq.Context`
85 context : :class:`zmq.Context`
86 The ZMQ context to use.
86 The ZMQ context to use.
87 session : :class:`session.Session`
87 session : :class:`session.Session`
88 The session to use.
88 The session to use.
89 address : zmq url
89 address : zmq url
90 Standard (ip, port) tuple that the kernel is listening on.
90 Standard (ip, port) tuple that the kernel is listening on.
91 """
91 """
92 super(ZMQSocketChannel, self).__init__()
92 super(ZMQSocketChannel, self).__init__()
93 self.daemon = True
93 self.daemon = True
94
94
95 self.context = context
95 self.context = context
96 self.session = session
96 self.session = session
97 if isinstance(address, tuple):
97 if isinstance(address, tuple):
98 if address[1] == 0:
98 if address[1] == 0:
99 message = 'The port number for a channel cannot be 0.'
99 message = 'The port number for a channel cannot be 0.'
100 raise InvalidPortNumber(message)
100 raise InvalidPortNumber(message)
101 address = "tcp://%s:%i" % address
101 address = "tcp://%s:%i" % address
102 self._address = address
102 self._address = address
103 atexit.register(self._notice_exit)
103 atexit.register(self._notice_exit)
104
104
105 def _notice_exit(self):
105 def _notice_exit(self):
106 self._exiting = True
106 self._exiting = True
107
107
108 def _run_loop(self):
108 def _run_loop(self):
109 """Run my loop, ignoring EINTR events in the poller"""
109 """Run my loop, ignoring EINTR events in the poller"""
110 while True:
110 while True:
111 try:
111 try:
112 self.ioloop.start()
112 self.ioloop.start()
113 except ZMQError as e:
113 except ZMQError as e:
114 if e.errno == errno.EINTR:
114 if e.errno == errno.EINTR:
115 continue
115 continue
116 else:
116 else:
117 raise
117 raise
118 except Exception:
118 except Exception:
119 if self._exiting:
119 if self._exiting:
120 break
120 break
121 else:
121 else:
122 raise
122 raise
123 else:
123 else:
124 break
124 break
125
125
126 def stop(self):
126 def stop(self):
127 """Stop the channel's event loop and join its thread.
127 """Stop the channel's event loop and join its thread.
128
128
129 This calls :meth:`~threading.Thread.join` and returns when the thread
129 This calls :meth:`~threading.Thread.join` and returns when the thread
130 terminates. :class:`RuntimeError` will be raised if
130 terminates. :class:`RuntimeError` will be raised if
131 :meth:`~threading.Thread.start` is called again.
131 :meth:`~threading.Thread.start` is called again.
132 """
132 """
133 if self.ioloop is not None:
133 if self.ioloop is not None:
134 self.ioloop.stop()
134 self.ioloop.stop()
135 self.join()
135 self.join()
136 self.close()
136 self.close()
137
137
138 def close(self):
138 def close(self):
139 if self.ioloop is not None:
139 if self.ioloop is not None:
140 try:
140 try:
141 self.ioloop.close(all_fds=True)
141 self.ioloop.close(all_fds=True)
142 except Exception:
142 except Exception:
143 pass
143 pass
144 if self.socket is not None:
144 if self.socket is not None:
145 try:
145 try:
146 self.socket.close(linger=0)
146 self.socket.close(linger=0)
147 except Exception:
147 except Exception:
148 pass
148 pass
149 self.socket = None
149 self.socket = None
150
150
151 @property
151 @property
152 def address(self):
152 def address(self):
153 """Get the channel's address as a zmq url string.
153 """Get the channel's address as a zmq url string.
154
154
155 These URLS have the form: 'tcp://127.0.0.1:5555'.
155 These URLS have the form: 'tcp://127.0.0.1:5555'.
156 """
156 """
157 return self._address
157 return self._address
158
158
159 def _queue_send(self, msg):
159 def _queue_send(self, msg):
160 """Queue a message to be sent from the IOLoop's thread.
160 """Queue a message to be sent from the IOLoop's thread.
161
161
162 Parameters
162 Parameters
163 ----------
163 ----------
164 msg : message to send
164 msg : message to send
165
165
166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
167 thread control of the action.
167 thread control of the action.
168 """
168 """
169 def thread_send():
169 def thread_send():
170 self.session.send(self.stream, msg)
170 self.session.send(self.stream, msg)
171 self.ioloop.add_callback(thread_send)
171 self.ioloop.add_callback(thread_send)
172
172
173 def _handle_recv(self, msg):
173 def _handle_recv(self, msg):
174 """Callback for stream.on_recv.
174 """Callback for stream.on_recv.
175
175
176 Unpacks message, and calls handlers with it.
176 Unpacks message, and calls handlers with it.
177 """
177 """
178 ident,smsg = self.session.feed_identities(msg)
178 ident,smsg = self.session.feed_identities(msg)
179 msg = self.session.unserialize(smsg)
179 msg = self.session.unserialize(smsg)
180 self.call_handlers(msg)
180 self.call_handlers(msg)
181
181
182
182
183
183
184 class ShellChannel(ZMQSocketChannel):
184 class ShellChannel(ZMQSocketChannel):
185 """The shell channel for issuing request/replies to the kernel."""
185 """The shell channel for issuing request/replies to the kernel."""
186
186
187 command_queue = None
187 command_queue = None
188 # flag for whether execute requests should be allowed to call raw_input:
188 # flag for whether execute requests should be allowed to call raw_input:
189 allow_stdin = True
189 allow_stdin = True
190 proxy_methods = [
190 proxy_methods = [
191 'execute',
191 'execute',
192 'complete',
192 'complete',
193 'inspect',
193 'inspect',
194 'history',
194 'history',
195 'kernel_info',
195 'kernel_info',
196 'shutdown',
196 'shutdown',
197 'is_complete',
197 ]
198 ]
198
199
199 def __init__(self, context, session, address):
200 def __init__(self, context, session, address):
200 super(ShellChannel, self).__init__(context, session, address)
201 super(ShellChannel, self).__init__(context, session, address)
201 self.ioloop = ioloop.IOLoop()
202 self.ioloop = ioloop.IOLoop()
202
203
203 def run(self):
204 def run(self):
204 """The thread's main activity. Call start() instead."""
205 """The thread's main activity. Call start() instead."""
205 self.socket = self.context.socket(zmq.DEALER)
206 self.socket = self.context.socket(zmq.DEALER)
206 self.socket.linger = 1000
207 self.socket.linger = 1000
207 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
208 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
208 self.socket.connect(self.address)
209 self.socket.connect(self.address)
209 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
210 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
210 self.stream.on_recv(self._handle_recv)
211 self.stream.on_recv(self._handle_recv)
211 self._run_loop()
212 self._run_loop()
212
213
213 def call_handlers(self, msg):
214 def call_handlers(self, msg):
214 """This method is called in the ioloop thread when a message arrives.
215 """This method is called in the ioloop thread when a message arrives.
215
216
216 Subclasses should override this method to handle incoming messages.
217 Subclasses should override this method to handle incoming messages.
217 It is important to remember that this method is called in the thread
218 It is important to remember that this method is called in the thread
218 so that some logic must be done to ensure that the application level
219 so that some logic must be done to ensure that the application level
219 handlers are called in the application thread.
220 handlers are called in the application thread.
220 """
221 """
221 raise NotImplementedError('call_handlers must be defined in a subclass.')
222 raise NotImplementedError('call_handlers must be defined in a subclass.')
222
223
223 def execute(self, code, silent=False, store_history=True,
224 def execute(self, code, silent=False, store_history=True,
224 user_expressions=None, allow_stdin=None):
225 user_expressions=None, allow_stdin=None):
225 """Execute code in the kernel.
226 """Execute code in the kernel.
226
227
227 Parameters
228 Parameters
228 ----------
229 ----------
229 code : str
230 code : str
230 A string of Python code.
231 A string of Python code.
231
232
232 silent : bool, optional (default False)
233 silent : bool, optional (default False)
233 If set, the kernel will execute the code as quietly possible, and
234 If set, the kernel will execute the code as quietly possible, and
234 will force store_history to be False.
235 will force store_history to be False.
235
236
236 store_history : bool, optional (default True)
237 store_history : bool, optional (default True)
237 If set, the kernel will store command history. This is forced
238 If set, the kernel will store command history. This is forced
238 to be False if silent is True.
239 to be False if silent is True.
239
240
240 user_expressions : dict, optional
241 user_expressions : dict, optional
241 A dict mapping names to expressions to be evaluated in the user's
242 A dict mapping names to expressions to be evaluated in the user's
242 dict. The expression values are returned as strings formatted using
243 dict. The expression values are returned as strings formatted using
243 :func:`repr`.
244 :func:`repr`.
244
245
245 allow_stdin : bool, optional (default self.allow_stdin)
246 allow_stdin : bool, optional (default self.allow_stdin)
246 Flag for whether the kernel can send stdin requests to frontends.
247 Flag for whether the kernel can send stdin requests to frontends.
247
248
248 Some frontends (e.g. the Notebook) do not support stdin requests.
249 Some frontends (e.g. the Notebook) do not support stdin requests.
249 If raw_input is called from code executed from such a frontend, a
250 If raw_input is called from code executed from such a frontend, a
250 StdinNotImplementedError will be raised.
251 StdinNotImplementedError will be raised.
251
252
252 Returns
253 Returns
253 -------
254 -------
254 The msg_id of the message sent.
255 The msg_id of the message sent.
255 """
256 """
256 if user_expressions is None:
257 if user_expressions is None:
257 user_expressions = {}
258 user_expressions = {}
258 if allow_stdin is None:
259 if allow_stdin is None:
259 allow_stdin = self.allow_stdin
260 allow_stdin = self.allow_stdin
260
261
261
262
262 # Don't waste network traffic if inputs are invalid
263 # Don't waste network traffic if inputs are invalid
263 if not isinstance(code, string_types):
264 if not isinstance(code, string_types):
264 raise ValueError('code %r must be a string' % code)
265 raise ValueError('code %r must be a string' % code)
265 validate_string_dict(user_expressions)
266 validate_string_dict(user_expressions)
266
267
267 # Create class for content/msg creation. Related to, but possibly
268 # Create class for content/msg creation. Related to, but possibly
268 # not in Session.
269 # not in Session.
269 content = dict(code=code, silent=silent, store_history=store_history,
270 content = dict(code=code, silent=silent, store_history=store_history,
270 user_expressions=user_expressions,
271 user_expressions=user_expressions,
271 allow_stdin=allow_stdin,
272 allow_stdin=allow_stdin,
272 )
273 )
273 msg = self.session.msg('execute_request', content)
274 msg = self.session.msg('execute_request', content)
274 self._queue_send(msg)
275 self._queue_send(msg)
275 return msg['header']['msg_id']
276 return msg['header']['msg_id']
276
277
277 def complete(self, code, cursor_pos=None):
278 def complete(self, code, cursor_pos=None):
278 """Tab complete text in the kernel's namespace.
279 """Tab complete text in the kernel's namespace.
279
280
280 Parameters
281 Parameters
281 ----------
282 ----------
282 code : str
283 code : str
283 The context in which completion is requested.
284 The context in which completion is requested.
284 Can be anything between a variable name and an entire cell.
285 Can be anything between a variable name and an entire cell.
285 cursor_pos : int, optional
286 cursor_pos : int, optional
286 The position of the cursor in the block of code where the completion was requested.
287 The position of the cursor in the block of code where the completion was requested.
287 Default: ``len(code)``
288 Default: ``len(code)``
288
289
289 Returns
290 Returns
290 -------
291 -------
291 The msg_id of the message sent.
292 The msg_id of the message sent.
292 """
293 """
293 if cursor_pos is None:
294 if cursor_pos is None:
294 cursor_pos = len(code)
295 cursor_pos = len(code)
295 content = dict(code=code, cursor_pos=cursor_pos)
296 content = dict(code=code, cursor_pos=cursor_pos)
296 msg = self.session.msg('complete_request', content)
297 msg = self.session.msg('complete_request', content)
297 self._queue_send(msg)
298 self._queue_send(msg)
298 return msg['header']['msg_id']
299 return msg['header']['msg_id']
299
300
300 def inspect(self, code, cursor_pos=None, detail_level=0):
301 def inspect(self, code, cursor_pos=None, detail_level=0):
301 """Get metadata information about an object in the kernel's namespace.
302 """Get metadata information about an object in the kernel's namespace.
302
303
303 It is up to the kernel to determine the appropriate object to inspect.
304 It is up to the kernel to determine the appropriate object to inspect.
304
305
305 Parameters
306 Parameters
306 ----------
307 ----------
307 code : str
308 code : str
308 The context in which info is requested.
309 The context in which info is requested.
309 Can be anything between a variable name and an entire cell.
310 Can be anything between a variable name and an entire cell.
310 cursor_pos : int, optional
311 cursor_pos : int, optional
311 The position of the cursor in the block of code where the info was requested.
312 The position of the cursor in the block of code where the info was requested.
312 Default: ``len(code)``
313 Default: ``len(code)``
313 detail_level : int, optional
314 detail_level : int, optional
314 The level of detail for the introspection (0-2)
315 The level of detail for the introspection (0-2)
315
316
316 Returns
317 Returns
317 -------
318 -------
318 The msg_id of the message sent.
319 The msg_id of the message sent.
319 """
320 """
320 if cursor_pos is None:
321 if cursor_pos is None:
321 cursor_pos = len(code)
322 cursor_pos = len(code)
322 content = dict(code=code, cursor_pos=cursor_pos,
323 content = dict(code=code, cursor_pos=cursor_pos,
323 detail_level=detail_level,
324 detail_level=detail_level,
324 )
325 )
325 msg = self.session.msg('inspect_request', content)
326 msg = self.session.msg('inspect_request', content)
326 self._queue_send(msg)
327 self._queue_send(msg)
327 return msg['header']['msg_id']
328 return msg['header']['msg_id']
328
329
329 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
330 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
330 """Get entries from the kernel's history list.
331 """Get entries from the kernel's history list.
331
332
332 Parameters
333 Parameters
333 ----------
334 ----------
334 raw : bool
335 raw : bool
335 If True, return the raw input.
336 If True, return the raw input.
336 output : bool
337 output : bool
337 If True, then return the output as well.
338 If True, then return the output as well.
338 hist_access_type : str
339 hist_access_type : str
339 'range' (fill in session, start and stop params), 'tail' (fill in n)
340 'range' (fill in session, start and stop params), 'tail' (fill in n)
340 or 'search' (fill in pattern param).
341 or 'search' (fill in pattern param).
341
342
342 session : int
343 session : int
343 For a range request, the session from which to get lines. Session
344 For a range request, the session from which to get lines. Session
344 numbers are positive integers; negative ones count back from the
345 numbers are positive integers; negative ones count back from the
345 current session.
346 current session.
346 start : int
347 start : int
347 The first line number of a history range.
348 The first line number of a history range.
348 stop : int
349 stop : int
349 The final (excluded) line number of a history range.
350 The final (excluded) line number of a history range.
350
351
351 n : int
352 n : int
352 The number of lines of history to get for a tail request.
353 The number of lines of history to get for a tail request.
353
354
354 pattern : str
355 pattern : str
355 The glob-syntax pattern for a search request.
356 The glob-syntax pattern for a search request.
356
357
357 Returns
358 Returns
358 -------
359 -------
359 The msg_id of the message sent.
360 The msg_id of the message sent.
360 """
361 """
361 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
362 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
362 **kwargs)
363 **kwargs)
363 msg = self.session.msg('history_request', content)
364 msg = self.session.msg('history_request', content)
364 self._queue_send(msg)
365 self._queue_send(msg)
365 return msg['header']['msg_id']
366 return msg['header']['msg_id']
366
367
367 def kernel_info(self):
368 def kernel_info(self):
368 """Request kernel info."""
369 """Request kernel info."""
369 msg = self.session.msg('kernel_info_request')
370 msg = self.session.msg('kernel_info_request')
370 self._queue_send(msg)
371 self._queue_send(msg)
371 return msg['header']['msg_id']
372 return msg['header']['msg_id']
372
373
373 def _handle_kernel_info_reply(self, msg):
374 def _handle_kernel_info_reply(self, msg):
374 """handle kernel info reply
375 """handle kernel info reply
375
376
376 sets protocol adaptation version
377 sets protocol adaptation version
377 """
378 """
378 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
379 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
379 if adapt_version != major_protocol_version:
380 if adapt_version != major_protocol_version:
380 self.session.adapt_version = adapt_version
381 self.session.adapt_version = adapt_version
381
382
382 def shutdown(self, restart=False):
383 def shutdown(self, restart=False):
383 """Request an immediate kernel shutdown.
384 """Request an immediate kernel shutdown.
384
385
385 Upon receipt of the (empty) reply, client code can safely assume that
386 Upon receipt of the (empty) reply, client code can safely assume that
386 the kernel has shut down and it's safe to forcefully terminate it if
387 the kernel has shut down and it's safe to forcefully terminate it if
387 it's still alive.
388 it's still alive.
388
389
389 The kernel will send the reply via a function registered with Python's
390 The kernel will send the reply via a function registered with Python's
390 atexit module, ensuring it's truly done as the kernel is done with all
391 atexit module, ensuring it's truly done as the kernel is done with all
391 normal operation.
392 normal operation.
392 """
393 """
393 # Send quit message to kernel. Once we implement kernel-side setattr,
394 # Send quit message to kernel. Once we implement kernel-side setattr,
394 # this should probably be done that way, but for now this will do.
395 # this should probably be done that way, but for now this will do.
395 msg = self.session.msg('shutdown_request', {'restart':restart})
396 msg = self.session.msg('shutdown_request', {'restart':restart})
396 self._queue_send(msg)
397 self._queue_send(msg)
397 return msg['header']['msg_id']
398 return msg['header']['msg_id']
398
399
400 def is_complete(self, code):
401 msg = self.session.msg('is_complete_request', {'code': code})
402 self._queue_send(msg)
403 return msg['header']['msg_id']
399
404
400
405
401 class IOPubChannel(ZMQSocketChannel):
406 class IOPubChannel(ZMQSocketChannel):
402 """The iopub channel which listens for messages that the kernel publishes.
407 """The iopub channel which listens for messages that the kernel publishes.
403
408
404 This channel is where all output is published to frontends.
409 This channel is where all output is published to frontends.
405 """
410 """
406
411
407 def __init__(self, context, session, address):
412 def __init__(self, context, session, address):
408 super(IOPubChannel, self).__init__(context, session, address)
413 super(IOPubChannel, self).__init__(context, session, address)
409 self.ioloop = ioloop.IOLoop()
414 self.ioloop = ioloop.IOLoop()
410
415
411 def run(self):
416 def run(self):
412 """The thread's main activity. Call start() instead."""
417 """The thread's main activity. Call start() instead."""
413 self.socket = self.context.socket(zmq.SUB)
418 self.socket = self.context.socket(zmq.SUB)
414 self.socket.linger = 1000
419 self.socket.linger = 1000
415 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
420 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
416 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
421 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
417 self.socket.connect(self.address)
422 self.socket.connect(self.address)
418 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
423 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
419 self.stream.on_recv(self._handle_recv)
424 self.stream.on_recv(self._handle_recv)
420 self._run_loop()
425 self._run_loop()
421
426
422 def call_handlers(self, msg):
427 def call_handlers(self, msg):
423 """This method is called in the ioloop thread when a message arrives.
428 """This method is called in the ioloop thread when a message arrives.
424
429
425 Subclasses should override this method to handle incoming messages.
430 Subclasses should override this method to handle incoming messages.
426 It is important to remember that this method is called in the thread
431 It is important to remember that this method is called in the thread
427 so that some logic must be done to ensure that the application leve
432 so that some logic must be done to ensure that the application leve
428 handlers are called in the application thread.
433 handlers are called in the application thread.
429 """
434 """
430 raise NotImplementedError('call_handlers must be defined in a subclass.')
435 raise NotImplementedError('call_handlers must be defined in a subclass.')
431
436
432 def flush(self, timeout=1.0):
437 def flush(self, timeout=1.0):
433 """Immediately processes all pending messages on the iopub channel.
438 """Immediately processes all pending messages on the iopub channel.
434
439
435 Callers should use this method to ensure that :meth:`call_handlers`
440 Callers should use this method to ensure that :meth:`call_handlers`
436 has been called for all messages that have been received on the
441 has been called for all messages that have been received on the
437 0MQ SUB socket of this channel.
442 0MQ SUB socket of this channel.
438
443
439 This method is thread safe.
444 This method is thread safe.
440
445
441 Parameters
446 Parameters
442 ----------
447 ----------
443 timeout : float, optional
448 timeout : float, optional
444 The maximum amount of time to spend flushing, in seconds. The
449 The maximum amount of time to spend flushing, in seconds. The
445 default is one second.
450 default is one second.
446 """
451 """
447 # We do the IOLoop callback process twice to ensure that the IOLoop
452 # We do the IOLoop callback process twice to ensure that the IOLoop
448 # gets to perform at least one full poll.
453 # gets to perform at least one full poll.
449 stop_time = time.time() + timeout
454 stop_time = time.time() + timeout
450 for i in range(2):
455 for i in range(2):
451 self._flushed = False
456 self._flushed = False
452 self.ioloop.add_callback(self._flush)
457 self.ioloop.add_callback(self._flush)
453 while not self._flushed and time.time() < stop_time:
458 while not self._flushed and time.time() < stop_time:
454 time.sleep(0.01)
459 time.sleep(0.01)
455
460
456 def _flush(self):
461 def _flush(self):
457 """Callback for :method:`self.flush`."""
462 """Callback for :method:`self.flush`."""
458 self.stream.flush()
463 self.stream.flush()
459 self._flushed = True
464 self._flushed = True
460
465
461
466
462 class StdInChannel(ZMQSocketChannel):
467 class StdInChannel(ZMQSocketChannel):
463 """The stdin channel to handle raw_input requests that the kernel makes."""
468 """The stdin channel to handle raw_input requests that the kernel makes."""
464
469
465 msg_queue = None
470 msg_queue = None
466 proxy_methods = ['input']
471 proxy_methods = ['input']
467
472
468 def __init__(self, context, session, address):
473 def __init__(self, context, session, address):
469 super(StdInChannel, self).__init__(context, session, address)
474 super(StdInChannel, self).__init__(context, session, address)
470 self.ioloop = ioloop.IOLoop()
475 self.ioloop = ioloop.IOLoop()
471
476
472 def run(self):
477 def run(self):
473 """The thread's main activity. Call start() instead."""
478 """The thread's main activity. Call start() instead."""
474 self.socket = self.context.socket(zmq.DEALER)
479 self.socket = self.context.socket(zmq.DEALER)
475 self.socket.linger = 1000
480 self.socket.linger = 1000
476 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
481 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
477 self.socket.connect(self.address)
482 self.socket.connect(self.address)
478 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
483 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
479 self.stream.on_recv(self._handle_recv)
484 self.stream.on_recv(self._handle_recv)
480 self._run_loop()
485 self._run_loop()
481
486
482 def call_handlers(self, msg):
487 def call_handlers(self, msg):
483 """This method is called in the ioloop thread when a message arrives.
488 """This method is called in the ioloop thread when a message arrives.
484
489
485 Subclasses should override this method to handle incoming messages.
490 Subclasses should override this method to handle incoming messages.
486 It is important to remember that this method is called in the thread
491 It is important to remember that this method is called in the thread
487 so that some logic must be done to ensure that the application leve
492 so that some logic must be done to ensure that the application leve
488 handlers are called in the application thread.
493 handlers are called in the application thread.
489 """
494 """
490 raise NotImplementedError('call_handlers must be defined in a subclass.')
495 raise NotImplementedError('call_handlers must be defined in a subclass.')
491
496
492 def input(self, string):
497 def input(self, string):
493 """Send a string of raw input to the kernel."""
498 """Send a string of raw input to the kernel."""
494 content = dict(value=string)
499 content = dict(value=string)
495 msg = self.session.msg('input_reply', content)
500 msg = self.session.msg('input_reply', content)
496 self._queue_send(msg)
501 self._queue_send(msg)
497
502
498
503
499 class HBChannel(ZMQSocketChannel):
504 class HBChannel(ZMQSocketChannel):
500 """The heartbeat channel which monitors the kernel heartbeat.
505 """The heartbeat channel which monitors the kernel heartbeat.
501
506
502 Note that the heartbeat channel is paused by default. As long as you start
507 Note that the heartbeat channel is paused by default. As long as you start
503 this channel, the kernel manager will ensure that it is paused and un-paused
508 this channel, the kernel manager will ensure that it is paused and un-paused
504 as appropriate.
509 as appropriate.
505 """
510 """
506
511
507 time_to_dead = 3.0
512 time_to_dead = 3.0
508 socket = None
513 socket = None
509 poller = None
514 poller = None
510 _running = None
515 _running = None
511 _pause = None
516 _pause = None
512 _beating = None
517 _beating = None
513
518
514 def __init__(self, context, session, address):
519 def __init__(self, context, session, address):
515 super(HBChannel, self).__init__(context, session, address)
520 super(HBChannel, self).__init__(context, session, address)
516 self._running = False
521 self._running = False
517 self._pause =True
522 self._pause =True
518 self.poller = zmq.Poller()
523 self.poller = zmq.Poller()
519
524
520 def _create_socket(self):
525 def _create_socket(self):
521 if self.socket is not None:
526 if self.socket is not None:
522 # close previous socket, before opening a new one
527 # close previous socket, before opening a new one
523 self.poller.unregister(self.socket)
528 self.poller.unregister(self.socket)
524 self.socket.close()
529 self.socket.close()
525 self.socket = self.context.socket(zmq.REQ)
530 self.socket = self.context.socket(zmq.REQ)
526 self.socket.linger = 1000
531 self.socket.linger = 1000
527 self.socket.connect(self.address)
532 self.socket.connect(self.address)
528
533
529 self.poller.register(self.socket, zmq.POLLIN)
534 self.poller.register(self.socket, zmq.POLLIN)
530
535
531 def _poll(self, start_time):
536 def _poll(self, start_time):
532 """poll for heartbeat replies until we reach self.time_to_dead.
537 """poll for heartbeat replies until we reach self.time_to_dead.
533
538
534 Ignores interrupts, and returns the result of poll(), which
539 Ignores interrupts, and returns the result of poll(), which
535 will be an empty list if no messages arrived before the timeout,
540 will be an empty list if no messages arrived before the timeout,
536 or the event tuple if there is a message to receive.
541 or the event tuple if there is a message to receive.
537 """
542 """
538
543
539 until_dead = self.time_to_dead - (time.time() - start_time)
544 until_dead = self.time_to_dead - (time.time() - start_time)
540 # ensure poll at least once
545 # ensure poll at least once
541 until_dead = max(until_dead, 1e-3)
546 until_dead = max(until_dead, 1e-3)
542 events = []
547 events = []
543 while True:
548 while True:
544 try:
549 try:
545 events = self.poller.poll(1000 * until_dead)
550 events = self.poller.poll(1000 * until_dead)
546 except ZMQError as e:
551 except ZMQError as e:
547 if e.errno == errno.EINTR:
552 if e.errno == errno.EINTR:
548 # ignore interrupts during heartbeat
553 # ignore interrupts during heartbeat
549 # this may never actually happen
554 # this may never actually happen
550 until_dead = self.time_to_dead - (time.time() - start_time)
555 until_dead = self.time_to_dead - (time.time() - start_time)
551 until_dead = max(until_dead, 1e-3)
556 until_dead = max(until_dead, 1e-3)
552 pass
557 pass
553 else:
558 else:
554 raise
559 raise
555 except Exception:
560 except Exception:
556 if self._exiting:
561 if self._exiting:
557 break
562 break
558 else:
563 else:
559 raise
564 raise
560 else:
565 else:
561 break
566 break
562 return events
567 return events
563
568
564 def run(self):
569 def run(self):
565 """The thread's main activity. Call start() instead."""
570 """The thread's main activity. Call start() instead."""
566 self._create_socket()
571 self._create_socket()
567 self._running = True
572 self._running = True
568 self._beating = True
573 self._beating = True
569
574
570 while self._running:
575 while self._running:
571 if self._pause:
576 if self._pause:
572 # just sleep, and skip the rest of the loop
577 # just sleep, and skip the rest of the loop
573 time.sleep(self.time_to_dead)
578 time.sleep(self.time_to_dead)
574 continue
579 continue
575
580
576 since_last_heartbeat = 0.0
581 since_last_heartbeat = 0.0
577 # io.rprint('Ping from HB channel') # dbg
582 # io.rprint('Ping from HB channel') # dbg
578 # no need to catch EFSM here, because the previous event was
583 # no need to catch EFSM here, because the previous event was
579 # either a recv or connect, which cannot be followed by EFSM
584 # either a recv or connect, which cannot be followed by EFSM
580 self.socket.send(b'ping')
585 self.socket.send(b'ping')
581 request_time = time.time()
586 request_time = time.time()
582 ready = self._poll(request_time)
587 ready = self._poll(request_time)
583 if ready:
588 if ready:
584 self._beating = True
589 self._beating = True
585 # the poll above guarantees we have something to recv
590 # the poll above guarantees we have something to recv
586 self.socket.recv()
591 self.socket.recv()
587 # sleep the remainder of the cycle
592 # sleep the remainder of the cycle
588 remainder = self.time_to_dead - (time.time() - request_time)
593 remainder = self.time_to_dead - (time.time() - request_time)
589 if remainder > 0:
594 if remainder > 0:
590 time.sleep(remainder)
595 time.sleep(remainder)
591 continue
596 continue
592 else:
597 else:
593 # nothing was received within the time limit, signal heart failure
598 # nothing was received within the time limit, signal heart failure
594 self._beating = False
599 self._beating = False
595 since_last_heartbeat = time.time() - request_time
600 since_last_heartbeat = time.time() - request_time
596 self.call_handlers(since_last_heartbeat)
601 self.call_handlers(since_last_heartbeat)
597 # and close/reopen the socket, because the REQ/REP cycle has been broken
602 # and close/reopen the socket, because the REQ/REP cycle has been broken
598 self._create_socket()
603 self._create_socket()
599 continue
604 continue
600
605
601 def pause(self):
606 def pause(self):
602 """Pause the heartbeat."""
607 """Pause the heartbeat."""
603 self._pause = True
608 self._pause = True
604
609
605 def unpause(self):
610 def unpause(self):
606 """Unpause the heartbeat."""
611 """Unpause the heartbeat."""
607 self._pause = False
612 self._pause = False
608
613
609 def is_beating(self):
614 def is_beating(self):
610 """Is the heartbeat running and responsive (and not paused)."""
615 """Is the heartbeat running and responsive (and not paused)."""
611 if self.is_alive() and not self._pause and self._beating:
616 if self.is_alive() and not self._pause and self._beating:
612 return True
617 return True
613 else:
618 else:
614 return False
619 return False
615
620
616 def stop(self):
621 def stop(self):
617 """Stop the channel's event loop and join its thread."""
622 """Stop the channel's event loop and join its thread."""
618 self._running = False
623 self._running = False
619 super(HBChannel, self).stop()
624 super(HBChannel, self).stop()
620
625
621 def call_handlers(self, since_last_heartbeat):
626 def call_handlers(self, since_last_heartbeat):
622 """This method is called in the ioloop thread when a message arrives.
627 """This method is called in the ioloop thread when a message arrives.
623
628
624 Subclasses should override this method to handle incoming messages.
629 Subclasses should override this method to handle incoming messages.
625 It is important to remember that this method is called in the thread
630 It is important to remember that this method is called in the thread
626 so that some logic must be done to ensure that the application level
631 so that some logic must be done to ensure that the application level
627 handlers are called in the application thread.
632 handlers are called in the application thread.
628 """
633 """
629 raise NotImplementedError('call_handlers must be defined in a subclass.')
634 raise NotImplementedError('call_handlers must be defined in a subclass.')
630
635
631
636
632 #---------------------------------------------------------------------#-----------------------------------------------------------------------------
637 #---------------------------------------------------------------------#-----------------------------------------------------------------------------
633 # ABC Registration
638 # ABC Registration
634 #-----------------------------------------------------------------------------
639 #-----------------------------------------------------------------------------
635
640
636 ShellChannelABC.register(ShellChannel)
641 ShellChannelABC.register(ShellChannel)
637 IOPubChannelABC.register(IOPubChannel)
642 IOPubChannelABC.register(IOPubChannel)
638 HBChannelABC.register(HBChannel)
643 HBChannelABC.register(HBChannel)
639 StdInChannelABC.register(StdInChannel)
644 StdInChannelABC.register(StdInChannel)
@@ -1,409 +1,420 b''
1 """Test suite for our zeromq-based message specification."""
1 """Test suite for our zeromq-based message specification."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import re
6 import re
7 from distutils.version import LooseVersion as V
7 from distutils.version import LooseVersion as V
8 from subprocess import PIPE
8 from subprocess import PIPE
9 try:
9 try:
10 from queue import Empty # Py 3
10 from queue import Empty # Py 3
11 except ImportError:
11 except ImportError:
12 from Queue import Empty # Py 2
12 from Queue import Empty # Py 2
13
13
14 import nose.tools as nt
14 import nose.tools as nt
15
15
16 from IPython.kernel import KernelManager
16 from IPython.kernel import KernelManager
17
17
18 from IPython.utils.traitlets import (
18 from IPython.utils.traitlets import (
19 HasTraits, TraitError, Bool, Unicode, Dict, Integer, List, Enum, Any,
19 HasTraits, TraitError, Bool, Unicode, Dict, Integer, List, Enum, Any,
20 )
20 )
21 from IPython.utils.py3compat import string_types, iteritems
21 from IPython.utils.py3compat import string_types, iteritems
22
22
23 from .utils import TIMEOUT, start_global_kernel, flush_channels, execute
23 from .utils import TIMEOUT, start_global_kernel, flush_channels, execute
24
24
25 #-----------------------------------------------------------------------------
25 #-----------------------------------------------------------------------------
26 # Globals
26 # Globals
27 #-----------------------------------------------------------------------------
27 #-----------------------------------------------------------------------------
28 KC = None
28 KC = None
29
29
30 def setup():
30 def setup():
31 global KC
31 global KC
32 KC = start_global_kernel()
32 KC = start_global_kernel()
33
33
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35 # Message Spec References
35 # Message Spec References
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37
37
38 class Reference(HasTraits):
38 class Reference(HasTraits):
39
39
40 """
40 """
41 Base class for message spec specification testing.
41 Base class for message spec specification testing.
42
42
43 This class is the core of the message specification test. The
43 This class is the core of the message specification test. The
44 idea is that child classes implement trait attributes for each
44 idea is that child classes implement trait attributes for each
45 message keys, so that message keys can be tested against these
45 message keys, so that message keys can be tested against these
46 traits using :meth:`check` method.
46 traits using :meth:`check` method.
47
47
48 """
48 """
49
49
50 def check(self, d):
50 def check(self, d):
51 """validate a dict against our traits"""
51 """validate a dict against our traits"""
52 for key in self.trait_names():
52 for key in self.trait_names():
53 nt.assert_in(key, d)
53 nt.assert_in(key, d)
54 # FIXME: always allow None, probably not a good idea
54 # FIXME: always allow None, probably not a good idea
55 if d[key] is None:
55 if d[key] is None:
56 continue
56 continue
57 try:
57 try:
58 setattr(self, key, d[key])
58 setattr(self, key, d[key])
59 except TraitError as e:
59 except TraitError as e:
60 assert False, str(e)
60 assert False, str(e)
61
61
62
62
63 class Version(Unicode):
63 class Version(Unicode):
64 def __init__(self, *args, **kwargs):
64 def __init__(self, *args, **kwargs):
65 self.min = kwargs.pop('min', None)
65 self.min = kwargs.pop('min', None)
66 self.max = kwargs.pop('max', None)
66 self.max = kwargs.pop('max', None)
67 kwargs['default_value'] = self.min
67 kwargs['default_value'] = self.min
68 super(Version, self).__init__(*args, **kwargs)
68 super(Version, self).__init__(*args, **kwargs)
69
69
70 def validate(self, obj, value):
70 def validate(self, obj, value):
71 if self.min and V(value) < V(self.min):
71 if self.min and V(value) < V(self.min):
72 raise TraitError("bad version: %s < %s" % (value, self.min))
72 raise TraitError("bad version: %s < %s" % (value, self.min))
73 if self.max and (V(value) > V(self.max)):
73 if self.max and (V(value) > V(self.max)):
74 raise TraitError("bad version: %s > %s" % (value, self.max))
74 raise TraitError("bad version: %s > %s" % (value, self.max))
75
75
76
76
77 class RMessage(Reference):
77 class RMessage(Reference):
78 msg_id = Unicode()
78 msg_id = Unicode()
79 msg_type = Unicode()
79 msg_type = Unicode()
80 header = Dict()
80 header = Dict()
81 parent_header = Dict()
81 parent_header = Dict()
82 content = Dict()
82 content = Dict()
83
83
84 def check(self, d):
84 def check(self, d):
85 super(RMessage, self).check(d)
85 super(RMessage, self).check(d)
86 RHeader().check(self.header)
86 RHeader().check(self.header)
87 if self.parent_header:
87 if self.parent_header:
88 RHeader().check(self.parent_header)
88 RHeader().check(self.parent_header)
89
89
90 class RHeader(Reference):
90 class RHeader(Reference):
91 msg_id = Unicode()
91 msg_id = Unicode()
92 msg_type = Unicode()
92 msg_type = Unicode()
93 session = Unicode()
93 session = Unicode()
94 username = Unicode()
94 username = Unicode()
95 version = Version(min='5.0')
95 version = Version(min='5.0')
96
96
97 mime_pat = re.compile(r'^[\w\-\+\.]+/[\w\-\+\.]+$')
97 mime_pat = re.compile(r'^[\w\-\+\.]+/[\w\-\+\.]+$')
98
98
99 class MimeBundle(Reference):
99 class MimeBundle(Reference):
100 metadata = Dict()
100 metadata = Dict()
101 data = Dict()
101 data = Dict()
102 def _data_changed(self, name, old, new):
102 def _data_changed(self, name, old, new):
103 for k,v in iteritems(new):
103 for k,v in iteritems(new):
104 assert mime_pat.match(k)
104 assert mime_pat.match(k)
105 nt.assert_is_instance(v, string_types)
105 nt.assert_is_instance(v, string_types)
106
106
107 # shell replies
107 # shell replies
108
108
109 class ExecuteReply(Reference):
109 class ExecuteReply(Reference):
110 execution_count = Integer()
110 execution_count = Integer()
111 status = Enum((u'ok', u'error'))
111 status = Enum((u'ok', u'error'))
112
112
113 def check(self, d):
113 def check(self, d):
114 Reference.check(self, d)
114 Reference.check(self, d)
115 if d['status'] == 'ok':
115 if d['status'] == 'ok':
116 ExecuteReplyOkay().check(d)
116 ExecuteReplyOkay().check(d)
117 elif d['status'] == 'error':
117 elif d['status'] == 'error':
118 ExecuteReplyError().check(d)
118 ExecuteReplyError().check(d)
119
119
120
120
121 class ExecuteReplyOkay(Reference):
121 class ExecuteReplyOkay(Reference):
122 payload = List(Dict)
122 payload = List(Dict)
123 user_expressions = Dict()
123 user_expressions = Dict()
124
124
125
125
126 class ExecuteReplyError(Reference):
126 class ExecuteReplyError(Reference):
127 ename = Unicode()
127 ename = Unicode()
128 evalue = Unicode()
128 evalue = Unicode()
129 traceback = List(Unicode)
129 traceback = List(Unicode)
130
130
131
131
132 class InspectReply(MimeBundle):
132 class InspectReply(MimeBundle):
133 found = Bool()
133 found = Bool()
134
134
135
135
136 class ArgSpec(Reference):
136 class ArgSpec(Reference):
137 args = List(Unicode)
137 args = List(Unicode)
138 varargs = Unicode()
138 varargs = Unicode()
139 varkw = Unicode()
139 varkw = Unicode()
140 defaults = List()
140 defaults = List()
141
141
142
142
143 class Status(Reference):
143 class Status(Reference):
144 execution_state = Enum((u'busy', u'idle', u'starting'))
144 execution_state = Enum((u'busy', u'idle', u'starting'))
145
145
146
146
147 class CompleteReply(Reference):
147 class CompleteReply(Reference):
148 matches = List(Unicode)
148 matches = List(Unicode)
149 cursor_start = Integer()
149 cursor_start = Integer()
150 cursor_end = Integer()
150 cursor_end = Integer()
151 status = Unicode()
151 status = Unicode()
152
152
153
153
154 class KernelInfoReply(Reference):
154 class KernelInfoReply(Reference):
155 protocol_version = Version(min='5.0')
155 protocol_version = Version(min='5.0')
156 implementation = Unicode('ipython')
156 implementation = Unicode('ipython')
157 implementation_version = Version(min='2.1')
157 implementation_version = Version(min='2.1')
158 language_version = Version(min='2.7')
158 language_version = Version(min='2.7')
159 language = Unicode('python')
159 language = Unicode('python')
160 banner = Unicode()
160 banner = Unicode()
161
161
162
162
163 class IsCompleteReply(Reference):
164 complete = Bool()
165
166
163 # IOPub messages
167 # IOPub messages
164
168
165 class ExecuteInput(Reference):
169 class ExecuteInput(Reference):
166 code = Unicode()
170 code = Unicode()
167 execution_count = Integer()
171 execution_count = Integer()
168
172
169
173
170 Error = ExecuteReplyError
174 Error = ExecuteReplyError
171
175
172
176
173 class Stream(Reference):
177 class Stream(Reference):
174 name = Enum((u'stdout', u'stderr'))
178 name = Enum((u'stdout', u'stderr'))
175 data = Unicode()
179 data = Unicode()
176
180
177
181
178 class DisplayData(MimeBundle):
182 class DisplayData(MimeBundle):
179 pass
183 pass
180
184
181
185
182 class ExecuteResult(MimeBundle):
186 class ExecuteResult(MimeBundle):
183 execution_count = Integer()
187 execution_count = Integer()
184
188
185
189
186 references = {
190 references = {
187 'execute_reply' : ExecuteReply(),
191 'execute_reply' : ExecuteReply(),
188 'inspect_reply' : InspectReply(),
192 'inspect_reply' : InspectReply(),
189 'status' : Status(),
193 'status' : Status(),
190 'complete_reply' : CompleteReply(),
194 'complete_reply' : CompleteReply(),
191 'kernel_info_reply': KernelInfoReply(),
195 'kernel_info_reply': KernelInfoReply(),
196 'is_complete_reply': IsCompleteReply(),
192 'execute_input' : ExecuteInput(),
197 'execute_input' : ExecuteInput(),
193 'execute_result' : ExecuteResult(),
198 'execute_result' : ExecuteResult(),
194 'error' : Error(),
199 'error' : Error(),
195 'stream' : Stream(),
200 'stream' : Stream(),
196 'display_data' : DisplayData(),
201 'display_data' : DisplayData(),
197 'header' : RHeader(),
202 'header' : RHeader(),
198 }
203 }
199 """
204 """
200 Specifications of `content` part of the reply messages.
205 Specifications of `content` part of the reply messages.
201 """
206 """
202
207
203
208
204 def validate_message(msg, msg_type=None, parent=None):
209 def validate_message(msg, msg_type=None, parent=None):
205 """validate a message
210 """validate a message
206
211
207 This is a generator, and must be iterated through to actually
212 This is a generator, and must be iterated through to actually
208 trigger each test.
213 trigger each test.
209
214
210 If msg_type and/or parent are given, the msg_type and/or parent msg_id
215 If msg_type and/or parent are given, the msg_type and/or parent msg_id
211 are compared with the given values.
216 are compared with the given values.
212 """
217 """
213 RMessage().check(msg)
218 RMessage().check(msg)
214 if msg_type:
219 if msg_type:
215 nt.assert_equal(msg['msg_type'], msg_type)
220 nt.assert_equal(msg['msg_type'], msg_type)
216 if parent:
221 if parent:
217 nt.assert_equal(msg['parent_header']['msg_id'], parent)
222 nt.assert_equal(msg['parent_header']['msg_id'], parent)
218 content = msg['content']
223 content = msg['content']
219 ref = references[msg['msg_type']]
224 ref = references[msg['msg_type']]
220 ref.check(content)
225 ref.check(content)
221
226
222
227
223 #-----------------------------------------------------------------------------
228 #-----------------------------------------------------------------------------
224 # Tests
229 # Tests
225 #-----------------------------------------------------------------------------
230 #-----------------------------------------------------------------------------
226
231
227 # Shell channel
232 # Shell channel
228
233
229 def test_execute():
234 def test_execute():
230 flush_channels()
235 flush_channels()
231
236
232 msg_id = KC.execute(code='x=1')
237 msg_id = KC.execute(code='x=1')
233 reply = KC.get_shell_msg(timeout=TIMEOUT)
238 reply = KC.get_shell_msg(timeout=TIMEOUT)
234 validate_message(reply, 'execute_reply', msg_id)
239 validate_message(reply, 'execute_reply', msg_id)
235
240
236
241
237 def test_execute_silent():
242 def test_execute_silent():
238 flush_channels()
243 flush_channels()
239 msg_id, reply = execute(code='x=1', silent=True)
244 msg_id, reply = execute(code='x=1', silent=True)
240
245
241 # flush status=idle
246 # flush status=idle
242 status = KC.iopub_channel.get_msg(timeout=TIMEOUT)
247 status = KC.iopub_channel.get_msg(timeout=TIMEOUT)
243 validate_message(status, 'status', msg_id)
248 validate_message(status, 'status', msg_id)
244 nt.assert_equal(status['content']['execution_state'], 'idle')
249 nt.assert_equal(status['content']['execution_state'], 'idle')
245
250
246 nt.assert_raises(Empty, KC.iopub_channel.get_msg, timeout=0.1)
251 nt.assert_raises(Empty, KC.iopub_channel.get_msg, timeout=0.1)
247 count = reply['execution_count']
252 count = reply['execution_count']
248
253
249 msg_id, reply = execute(code='x=2', silent=True)
254 msg_id, reply = execute(code='x=2', silent=True)
250
255
251 # flush status=idle
256 # flush status=idle
252 status = KC.iopub_channel.get_msg(timeout=TIMEOUT)
257 status = KC.iopub_channel.get_msg(timeout=TIMEOUT)
253 validate_message(status, 'status', msg_id)
258 validate_message(status, 'status', msg_id)
254 nt.assert_equal(status['content']['execution_state'], 'idle')
259 nt.assert_equal(status['content']['execution_state'], 'idle')
255
260
256 nt.assert_raises(Empty, KC.iopub_channel.get_msg, timeout=0.1)
261 nt.assert_raises(Empty, KC.iopub_channel.get_msg, timeout=0.1)
257 count_2 = reply['execution_count']
262 count_2 = reply['execution_count']
258 nt.assert_equal(count_2, count)
263 nt.assert_equal(count_2, count)
259
264
260
265
261 def test_execute_error():
266 def test_execute_error():
262 flush_channels()
267 flush_channels()
263
268
264 msg_id, reply = execute(code='1/0')
269 msg_id, reply = execute(code='1/0')
265 nt.assert_equal(reply['status'], 'error')
270 nt.assert_equal(reply['status'], 'error')
266 nt.assert_equal(reply['ename'], 'ZeroDivisionError')
271 nt.assert_equal(reply['ename'], 'ZeroDivisionError')
267
272
268 error = KC.iopub_channel.get_msg(timeout=TIMEOUT)
273 error = KC.iopub_channel.get_msg(timeout=TIMEOUT)
269 validate_message(error, 'error', msg_id)
274 validate_message(error, 'error', msg_id)
270
275
271
276
272 def test_execute_inc():
277 def test_execute_inc():
273 """execute request should increment execution_count"""
278 """execute request should increment execution_count"""
274 flush_channels()
279 flush_channels()
275
280
276 msg_id, reply = execute(code='x=1')
281 msg_id, reply = execute(code='x=1')
277 count = reply['execution_count']
282 count = reply['execution_count']
278
283
279 flush_channels()
284 flush_channels()
280
285
281 msg_id, reply = execute(code='x=2')
286 msg_id, reply = execute(code='x=2')
282 count_2 = reply['execution_count']
287 count_2 = reply['execution_count']
283 nt.assert_equal(count_2, count+1)
288 nt.assert_equal(count_2, count+1)
284
289
285
290
286 def test_user_expressions():
291 def test_user_expressions():
287 flush_channels()
292 flush_channels()
288
293
289 msg_id, reply = execute(code='x=1', user_expressions=dict(foo='x+1'))
294 msg_id, reply = execute(code='x=1', user_expressions=dict(foo='x+1'))
290 user_expressions = reply['user_expressions']
295 user_expressions = reply['user_expressions']
291 nt.assert_equal(user_expressions, {u'foo': {
296 nt.assert_equal(user_expressions, {u'foo': {
292 u'status': u'ok',
297 u'status': u'ok',
293 u'data': {u'text/plain': u'2'},
298 u'data': {u'text/plain': u'2'},
294 u'metadata': {},
299 u'metadata': {},
295 }})
300 }})
296
301
297
302
298 def test_user_expressions_fail():
303 def test_user_expressions_fail():
299 flush_channels()
304 flush_channels()
300
305
301 msg_id, reply = execute(code='x=0', user_expressions=dict(foo='nosuchname'))
306 msg_id, reply = execute(code='x=0', user_expressions=dict(foo='nosuchname'))
302 user_expressions = reply['user_expressions']
307 user_expressions = reply['user_expressions']
303 foo = user_expressions['foo']
308 foo = user_expressions['foo']
304 nt.assert_equal(foo['status'], 'error')
309 nt.assert_equal(foo['status'], 'error')
305 nt.assert_equal(foo['ename'], 'NameError')
310 nt.assert_equal(foo['ename'], 'NameError')
306
311
307
312
308 def test_oinfo():
313 def test_oinfo():
309 flush_channels()
314 flush_channels()
310
315
311 msg_id = KC.inspect('a')
316 msg_id = KC.inspect('a')
312 reply = KC.get_shell_msg(timeout=TIMEOUT)
317 reply = KC.get_shell_msg(timeout=TIMEOUT)
313 validate_message(reply, 'inspect_reply', msg_id)
318 validate_message(reply, 'inspect_reply', msg_id)
314
319
315
320
316 def test_oinfo_found():
321 def test_oinfo_found():
317 flush_channels()
322 flush_channels()
318
323
319 msg_id, reply = execute(code='a=5')
324 msg_id, reply = execute(code='a=5')
320
325
321 msg_id = KC.inspect('a')
326 msg_id = KC.inspect('a')
322 reply = KC.get_shell_msg(timeout=TIMEOUT)
327 reply = KC.get_shell_msg(timeout=TIMEOUT)
323 validate_message(reply, 'inspect_reply', msg_id)
328 validate_message(reply, 'inspect_reply', msg_id)
324 content = reply['content']
329 content = reply['content']
325 assert content['found']
330 assert content['found']
326 text = content['data']['text/plain']
331 text = content['data']['text/plain']
327 nt.assert_in('Type:', text)
332 nt.assert_in('Type:', text)
328 nt.assert_in('Docstring:', text)
333 nt.assert_in('Docstring:', text)
329
334
330
335
331 def test_oinfo_detail():
336 def test_oinfo_detail():
332 flush_channels()
337 flush_channels()
333
338
334 msg_id, reply = execute(code='ip=get_ipython()')
339 msg_id, reply = execute(code='ip=get_ipython()')
335
340
336 msg_id = KC.inspect('ip.object_inspect', cursor_pos=10, detail_level=1)
341 msg_id = KC.inspect('ip.object_inspect', cursor_pos=10, detail_level=1)
337 reply = KC.get_shell_msg(timeout=TIMEOUT)
342 reply = KC.get_shell_msg(timeout=TIMEOUT)
338 validate_message(reply, 'inspect_reply', msg_id)
343 validate_message(reply, 'inspect_reply', msg_id)
339 content = reply['content']
344 content = reply['content']
340 assert content['found']
345 assert content['found']
341 text = content['data']['text/plain']
346 text = content['data']['text/plain']
342 nt.assert_in('Definition:', text)
347 nt.assert_in('Definition:', text)
343 nt.assert_in('Source:', text)
348 nt.assert_in('Source:', text)
344
349
345
350
346 def test_oinfo_not_found():
351 def test_oinfo_not_found():
347 flush_channels()
352 flush_channels()
348
353
349 msg_id = KC.inspect('dne')
354 msg_id = KC.inspect('dne')
350 reply = KC.get_shell_msg(timeout=TIMEOUT)
355 reply = KC.get_shell_msg(timeout=TIMEOUT)
351 validate_message(reply, 'inspect_reply', msg_id)
356 validate_message(reply, 'inspect_reply', msg_id)
352 content = reply['content']
357 content = reply['content']
353 nt.assert_false(content['found'])
358 nt.assert_false(content['found'])
354
359
355
360
356 def test_complete():
361 def test_complete():
357 flush_channels()
362 flush_channels()
358
363
359 msg_id, reply = execute(code="alpha = albert = 5")
364 msg_id, reply = execute(code="alpha = albert = 5")
360
365
361 msg_id = KC.complete('al', 2)
366 msg_id = KC.complete('al', 2)
362 reply = KC.get_shell_msg(timeout=TIMEOUT)
367 reply = KC.get_shell_msg(timeout=TIMEOUT)
363 validate_message(reply, 'complete_reply', msg_id)
368 validate_message(reply, 'complete_reply', msg_id)
364 matches = reply['content']['matches']
369 matches = reply['content']['matches']
365 for name in ('alpha', 'albert'):
370 for name in ('alpha', 'albert'):
366 nt.assert_in(name, matches)
371 nt.assert_in(name, matches)
367
372
368
373
369 def test_kernel_info_request():
374 def test_kernel_info_request():
370 flush_channels()
375 flush_channels()
371
376
372 msg_id = KC.kernel_info()
377 msg_id = KC.kernel_info()
373 reply = KC.get_shell_msg(timeout=TIMEOUT)
378 reply = KC.get_shell_msg(timeout=TIMEOUT)
374 validate_message(reply, 'kernel_info_reply', msg_id)
379 validate_message(reply, 'kernel_info_reply', msg_id)
375
380
376
381
377 def test_single_payload():
382 def test_single_payload():
378 flush_channels()
383 flush_channels()
379 msg_id, reply = execute(code="for i in range(3):\n"+
384 msg_id, reply = execute(code="for i in range(3):\n"+
380 " x=range?\n")
385 " x=range?\n")
381 payload = reply['payload']
386 payload = reply['payload']
382 next_input_pls = [pl for pl in payload if pl["source"] == "set_next_input"]
387 next_input_pls = [pl for pl in payload if pl["source"] == "set_next_input"]
383 nt.assert_equal(len(next_input_pls), 1)
388 nt.assert_equal(len(next_input_pls), 1)
384
389
390 def test_is_complete():
391 flush_channels()
392
393 msg_id = KC.is_complete("a = 1")
394 reply = KC.get_shell_msg(timeout=TIMEOUT)
395 validate_message(reply, 'is_complete_reply', msg_id)
385
396
386 # IOPub channel
397 # IOPub channel
387
398
388
399
389 def test_stream():
400 def test_stream():
390 flush_channels()
401 flush_channels()
391
402
392 msg_id, reply = execute("print('hi')")
403 msg_id, reply = execute("print('hi')")
393
404
394 stdout = KC.iopub_channel.get_msg(timeout=TIMEOUT)
405 stdout = KC.iopub_channel.get_msg(timeout=TIMEOUT)
395 validate_message(stdout, 'stream', msg_id)
406 validate_message(stdout, 'stream', msg_id)
396 content = stdout['content']
407 content = stdout['content']
397 nt.assert_equal(content['data'], u'hi\n')
408 nt.assert_equal(content['data'], u'hi\n')
398
409
399
410
400 def test_display_data():
411 def test_display_data():
401 flush_channels()
412 flush_channels()
402
413
403 msg_id, reply = execute("from IPython.core.display import display; display(1)")
414 msg_id, reply = execute("from IPython.core.display import display; display(1)")
404
415
405 display = KC.iopub_channel.get_msg(timeout=TIMEOUT)
416 display = KC.iopub_channel.get_msg(timeout=TIMEOUT)
406 validate_message(display, 'display_data', parent=msg_id)
417 validate_message(display, 'display_data', parent=msg_id)
407 data = display['content']['data']
418 data = display['content']['data']
408 nt.assert_equal(data['text/plain'], u'1')
419 nt.assert_equal(data['text/plain'], u'1')
409
420
@@ -1,677 +1,693 b''
1 """Base class for a kernel that talks to frontends over 0MQ."""
1 """Base class for a kernel that talks to frontends over 0MQ."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import print_function
6 from __future__ import print_function
7
7
8 import sys
8 import sys
9 import time
9 import time
10 import logging
10 import logging
11 import uuid
11 import uuid
12
12
13 from datetime import datetime
13 from datetime import datetime
14 from signal import (
14 from signal import (
15 signal, default_int_handler, SIGINT
15 signal, default_int_handler, SIGINT
16 )
16 )
17
17
18 import zmq
18 import zmq
19 from zmq.eventloop import ioloop
19 from zmq.eventloop import ioloop
20 from zmq.eventloop.zmqstream import ZMQStream
20 from zmq.eventloop.zmqstream import ZMQStream
21
21
22 from IPython.config.configurable import Configurable
22 from IPython.config.configurable import Configurable
23 from IPython.core.error import StdinNotImplementedError
23 from IPython.core.error import StdinNotImplementedError
24 from IPython.core import release
24 from IPython.core import release
25 from IPython.utils import py3compat
25 from IPython.utils import py3compat
26 from IPython.utils.py3compat import unicode_type, string_types
26 from IPython.utils.py3compat import unicode_type, string_types
27 from IPython.utils.jsonutil import json_clean
27 from IPython.utils.jsonutil import json_clean
28 from IPython.utils.traitlets import (
28 from IPython.utils.traitlets import (
29 Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool,
29 Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool,
30 )
30 )
31
31
32 from .session import Session
32 from .session import Session
33
33
34
34
35 class Kernel(Configurable):
35 class Kernel(Configurable):
36
36
37 #---------------------------------------------------------------------------
37 #---------------------------------------------------------------------------
38 # Kernel interface
38 # Kernel interface
39 #---------------------------------------------------------------------------
39 #---------------------------------------------------------------------------
40
40
41 # attribute to override with a GUI
41 # attribute to override with a GUI
42 eventloop = Any(None)
42 eventloop = Any(None)
43 def _eventloop_changed(self, name, old, new):
43 def _eventloop_changed(self, name, old, new):
44 """schedule call to eventloop from IOLoop"""
44 """schedule call to eventloop from IOLoop"""
45 loop = ioloop.IOLoop.instance()
45 loop = ioloop.IOLoop.instance()
46 loop.add_callback(self.enter_eventloop)
46 loop.add_callback(self.enter_eventloop)
47
47
48 session = Instance(Session)
48 session = Instance(Session)
49 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
49 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
50 shell_streams = List()
50 shell_streams = List()
51 control_stream = Instance(ZMQStream)
51 control_stream = Instance(ZMQStream)
52 iopub_socket = Instance(zmq.Socket)
52 iopub_socket = Instance(zmq.Socket)
53 stdin_socket = Instance(zmq.Socket)
53 stdin_socket = Instance(zmq.Socket)
54 log = Instance(logging.Logger)
54 log = Instance(logging.Logger)
55
55
56 # identities:
56 # identities:
57 int_id = Integer(-1)
57 int_id = Integer(-1)
58 ident = Unicode()
58 ident = Unicode()
59
59
60 def _ident_default(self):
60 def _ident_default(self):
61 return unicode_type(uuid.uuid4())
61 return unicode_type(uuid.uuid4())
62
62
63 # Private interface
63 # Private interface
64
64
65 _darwin_app_nap = Bool(True, config=True,
65 _darwin_app_nap = Bool(True, config=True,
66 help="""Whether to use appnope for compatiblity with OS X App Nap.
66 help="""Whether to use appnope for compatiblity with OS X App Nap.
67
67
68 Only affects OS X >= 10.9.
68 Only affects OS X >= 10.9.
69 """
69 """
70 )
70 )
71
71
72 # track associations with current request
72 # track associations with current request
73 _allow_stdin = Bool(False)
73 _allow_stdin = Bool(False)
74 _parent_header = Dict()
74 _parent_header = Dict()
75 _parent_ident = Any(b'')
75 _parent_ident = Any(b'')
76 # Time to sleep after flushing the stdout/err buffers in each execute
76 # Time to sleep after flushing the stdout/err buffers in each execute
77 # cycle. While this introduces a hard limit on the minimal latency of the
77 # cycle. While this introduces a hard limit on the minimal latency of the
78 # execute cycle, it helps prevent output synchronization problems for
78 # execute cycle, it helps prevent output synchronization problems for
79 # clients.
79 # clients.
80 # Units are in seconds. The minimum zmq latency on local host is probably
80 # Units are in seconds. The minimum zmq latency on local host is probably
81 # ~150 microseconds, set this to 500us for now. We may need to increase it
81 # ~150 microseconds, set this to 500us for now. We may need to increase it
82 # a little if it's not enough after more interactive testing.
82 # a little if it's not enough after more interactive testing.
83 _execute_sleep = Float(0.0005, config=True)
83 _execute_sleep = Float(0.0005, config=True)
84
84
85 # Frequency of the kernel's event loop.
85 # Frequency of the kernel's event loop.
86 # Units are in seconds, kernel subclasses for GUI toolkits may need to
86 # Units are in seconds, kernel subclasses for GUI toolkits may need to
87 # adapt to milliseconds.
87 # adapt to milliseconds.
88 _poll_interval = Float(0.05, config=True)
88 _poll_interval = Float(0.05, config=True)
89
89
90 # If the shutdown was requested over the network, we leave here the
90 # If the shutdown was requested over the network, we leave here the
91 # necessary reply message so it can be sent by our registered atexit
91 # necessary reply message so it can be sent by our registered atexit
92 # handler. This ensures that the reply is only sent to clients truly at
92 # handler. This ensures that the reply is only sent to clients truly at
93 # the end of our shutdown process (which happens after the underlying
93 # the end of our shutdown process (which happens after the underlying
94 # IPython shell's own shutdown).
94 # IPython shell's own shutdown).
95 _shutdown_message = None
95 _shutdown_message = None
96
96
97 # This is a dict of port number that the kernel is listening on. It is set
97 # This is a dict of port number that the kernel is listening on. It is set
98 # by record_ports and used by connect_request.
98 # by record_ports and used by connect_request.
99 _recorded_ports = Dict()
99 _recorded_ports = Dict()
100
100
101 # set of aborted msg_ids
101 # set of aborted msg_ids
102 aborted = Set()
102 aborted = Set()
103
103
104 # Track execution count here. For IPython, we override this to use the
104 # Track execution count here. For IPython, we override this to use the
105 # execution count we store in the shell.
105 # execution count we store in the shell.
106 execution_count = 0
106 execution_count = 0
107
107
108
108
109 def __init__(self, **kwargs):
109 def __init__(self, **kwargs):
110 super(Kernel, self).__init__(**kwargs)
110 super(Kernel, self).__init__(**kwargs)
111
111
112 # Build dict of handlers for message types
112 # Build dict of handlers for message types
113 msg_types = [ 'execute_request', 'complete_request',
113 msg_types = [ 'execute_request', 'complete_request',
114 'inspect_request', 'history_request',
114 'inspect_request', 'history_request',
115 'kernel_info_request',
115 'kernel_info_request',
116 'connect_request', 'shutdown_request',
116 'connect_request', 'shutdown_request',
117 'apply_request',
117 'apply_request', 'is_complete_request',
118 ]
118 ]
119 self.shell_handlers = {}
119 self.shell_handlers = {}
120 for msg_type in msg_types:
120 for msg_type in msg_types:
121 self.shell_handlers[msg_type] = getattr(self, msg_type)
121 self.shell_handlers[msg_type] = getattr(self, msg_type)
122
122
123 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
123 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
124 self.control_handlers = {}
124 self.control_handlers = {}
125 for msg_type in control_msg_types:
125 for msg_type in control_msg_types:
126 self.control_handlers[msg_type] = getattr(self, msg_type)
126 self.control_handlers[msg_type] = getattr(self, msg_type)
127
127
128
128
129 def dispatch_control(self, msg):
129 def dispatch_control(self, msg):
130 """dispatch control requests"""
130 """dispatch control requests"""
131 idents,msg = self.session.feed_identities(msg, copy=False)
131 idents,msg = self.session.feed_identities(msg, copy=False)
132 try:
132 try:
133 msg = self.session.unserialize(msg, content=True, copy=False)
133 msg = self.session.unserialize(msg, content=True, copy=False)
134 except:
134 except:
135 self.log.error("Invalid Control Message", exc_info=True)
135 self.log.error("Invalid Control Message", exc_info=True)
136 return
136 return
137
137
138 self.log.debug("Control received: %s", msg)
138 self.log.debug("Control received: %s", msg)
139
139
140 # Set the parent message for side effects.
140 # Set the parent message for side effects.
141 self.set_parent(idents, msg)
141 self.set_parent(idents, msg)
142 self._publish_status(u'busy')
142 self._publish_status(u'busy')
143
143
144 header = msg['header']
144 header = msg['header']
145 msg_type = header['msg_type']
145 msg_type = header['msg_type']
146
146
147 handler = self.control_handlers.get(msg_type, None)
147 handler = self.control_handlers.get(msg_type, None)
148 if handler is None:
148 if handler is None:
149 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
149 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
150 else:
150 else:
151 try:
151 try:
152 handler(self.control_stream, idents, msg)
152 handler(self.control_stream, idents, msg)
153 except Exception:
153 except Exception:
154 self.log.error("Exception in control handler:", exc_info=True)
154 self.log.error("Exception in control handler:", exc_info=True)
155
155
156 sys.stdout.flush()
156 sys.stdout.flush()
157 sys.stderr.flush()
157 sys.stderr.flush()
158 self._publish_status(u'idle')
158 self._publish_status(u'idle')
159
159
160 def dispatch_shell(self, stream, msg):
160 def dispatch_shell(self, stream, msg):
161 """dispatch shell requests"""
161 """dispatch shell requests"""
162 # flush control requests first
162 # flush control requests first
163 if self.control_stream:
163 if self.control_stream:
164 self.control_stream.flush()
164 self.control_stream.flush()
165
165
166 idents,msg = self.session.feed_identities(msg, copy=False)
166 idents,msg = self.session.feed_identities(msg, copy=False)
167 try:
167 try:
168 msg = self.session.unserialize(msg, content=True, copy=False)
168 msg = self.session.unserialize(msg, content=True, copy=False)
169 except:
169 except:
170 self.log.error("Invalid Message", exc_info=True)
170 self.log.error("Invalid Message", exc_info=True)
171 return
171 return
172
172
173 # Set the parent message for side effects.
173 # Set the parent message for side effects.
174 self.set_parent(idents, msg)
174 self.set_parent(idents, msg)
175 self._publish_status(u'busy')
175 self._publish_status(u'busy')
176
176
177 header = msg['header']
177 header = msg['header']
178 msg_id = header['msg_id']
178 msg_id = header['msg_id']
179 msg_type = msg['header']['msg_type']
179 msg_type = msg['header']['msg_type']
180
180
181 # Print some info about this message and leave a '--->' marker, so it's
181 # Print some info about this message and leave a '--->' marker, so it's
182 # easier to trace visually the message chain when debugging. Each
182 # easier to trace visually the message chain when debugging. Each
183 # handler prints its message at the end.
183 # handler prints its message at the end.
184 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
184 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
185 self.log.debug(' Content: %s\n --->\n ', msg['content'])
185 self.log.debug(' Content: %s\n --->\n ', msg['content'])
186
186
187 if msg_id in self.aborted:
187 if msg_id in self.aborted:
188 self.aborted.remove(msg_id)
188 self.aborted.remove(msg_id)
189 # is it safe to assume a msg_id will not be resubmitted?
189 # is it safe to assume a msg_id will not be resubmitted?
190 reply_type = msg_type.split('_')[0] + '_reply'
190 reply_type = msg_type.split('_')[0] + '_reply'
191 status = {'status' : 'aborted'}
191 status = {'status' : 'aborted'}
192 md = {'engine' : self.ident}
192 md = {'engine' : self.ident}
193 md.update(status)
193 md.update(status)
194 self.session.send(stream, reply_type, metadata=md,
194 self.session.send(stream, reply_type, metadata=md,
195 content=status, parent=msg, ident=idents)
195 content=status, parent=msg, ident=idents)
196 return
196 return
197
197
198 handler = self.shell_handlers.get(msg_type, None)
198 handler = self.shell_handlers.get(msg_type, None)
199 if handler is None:
199 if handler is None:
200 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
200 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
201 else:
201 else:
202 # ensure default_int_handler during handler call
202 # ensure default_int_handler during handler call
203 sig = signal(SIGINT, default_int_handler)
203 sig = signal(SIGINT, default_int_handler)
204 self.log.debug("%s: %s", msg_type, msg)
204 self.log.debug("%s: %s", msg_type, msg)
205 try:
205 try:
206 handler(stream, idents, msg)
206 handler(stream, idents, msg)
207 except Exception:
207 except Exception:
208 self.log.error("Exception in message handler:", exc_info=True)
208 self.log.error("Exception in message handler:", exc_info=True)
209 finally:
209 finally:
210 signal(SIGINT, sig)
210 signal(SIGINT, sig)
211
211
212 sys.stdout.flush()
212 sys.stdout.flush()
213 sys.stderr.flush()
213 sys.stderr.flush()
214 self._publish_status(u'idle')
214 self._publish_status(u'idle')
215
215
216 def enter_eventloop(self):
216 def enter_eventloop(self):
217 """enter eventloop"""
217 """enter eventloop"""
218 self.log.info("entering eventloop %s", self.eventloop)
218 self.log.info("entering eventloop %s", self.eventloop)
219 for stream in self.shell_streams:
219 for stream in self.shell_streams:
220 # flush any pending replies,
220 # flush any pending replies,
221 # which may be skipped by entering the eventloop
221 # which may be skipped by entering the eventloop
222 stream.flush(zmq.POLLOUT)
222 stream.flush(zmq.POLLOUT)
223 # restore default_int_handler
223 # restore default_int_handler
224 signal(SIGINT, default_int_handler)
224 signal(SIGINT, default_int_handler)
225 while self.eventloop is not None:
225 while self.eventloop is not None:
226 try:
226 try:
227 self.eventloop(self)
227 self.eventloop(self)
228 except KeyboardInterrupt:
228 except KeyboardInterrupt:
229 # Ctrl-C shouldn't crash the kernel
229 # Ctrl-C shouldn't crash the kernel
230 self.log.error("KeyboardInterrupt caught in kernel")
230 self.log.error("KeyboardInterrupt caught in kernel")
231 continue
231 continue
232 else:
232 else:
233 # eventloop exited cleanly, this means we should stop (right?)
233 # eventloop exited cleanly, this means we should stop (right?)
234 self.eventloop = None
234 self.eventloop = None
235 break
235 break
236 self.log.info("exiting eventloop")
236 self.log.info("exiting eventloop")
237
237
238 def start(self):
238 def start(self):
239 """register dispatchers for streams"""
239 """register dispatchers for streams"""
240 if self.control_stream:
240 if self.control_stream:
241 self.control_stream.on_recv(self.dispatch_control, copy=False)
241 self.control_stream.on_recv(self.dispatch_control, copy=False)
242
242
243 def make_dispatcher(stream):
243 def make_dispatcher(stream):
244 def dispatcher(msg):
244 def dispatcher(msg):
245 return self.dispatch_shell(stream, msg)
245 return self.dispatch_shell(stream, msg)
246 return dispatcher
246 return dispatcher
247
247
248 for s in self.shell_streams:
248 for s in self.shell_streams:
249 s.on_recv(make_dispatcher(s), copy=False)
249 s.on_recv(make_dispatcher(s), copy=False)
250
250
251 # publish idle status
251 # publish idle status
252 self._publish_status('starting')
252 self._publish_status('starting')
253
253
254 def do_one_iteration(self):
254 def do_one_iteration(self):
255 """step eventloop just once"""
255 """step eventloop just once"""
256 if self.control_stream:
256 if self.control_stream:
257 self.control_stream.flush()
257 self.control_stream.flush()
258 for stream in self.shell_streams:
258 for stream in self.shell_streams:
259 # handle at most one request per iteration
259 # handle at most one request per iteration
260 stream.flush(zmq.POLLIN, 1)
260 stream.flush(zmq.POLLIN, 1)
261 stream.flush(zmq.POLLOUT)
261 stream.flush(zmq.POLLOUT)
262
262
263
263
264 def record_ports(self, ports):
264 def record_ports(self, ports):
265 """Record the ports that this kernel is using.
265 """Record the ports that this kernel is using.
266
266
267 The creator of the Kernel instance must call this methods if they
267 The creator of the Kernel instance must call this methods if they
268 want the :meth:`connect_request` method to return the port numbers.
268 want the :meth:`connect_request` method to return the port numbers.
269 """
269 """
270 self._recorded_ports = ports
270 self._recorded_ports = ports
271
271
272 #---------------------------------------------------------------------------
272 #---------------------------------------------------------------------------
273 # Kernel request handlers
273 # Kernel request handlers
274 #---------------------------------------------------------------------------
274 #---------------------------------------------------------------------------
275
275
276 def _make_metadata(self, other=None):
276 def _make_metadata(self, other=None):
277 """init metadata dict, for execute/apply_reply"""
277 """init metadata dict, for execute/apply_reply"""
278 new_md = {
278 new_md = {
279 'dependencies_met' : True,
279 'dependencies_met' : True,
280 'engine' : self.ident,
280 'engine' : self.ident,
281 'started': datetime.now(),
281 'started': datetime.now(),
282 }
282 }
283 if other:
283 if other:
284 new_md.update(other)
284 new_md.update(other)
285 return new_md
285 return new_md
286
286
287 def _publish_execute_input(self, code, parent, execution_count):
287 def _publish_execute_input(self, code, parent, execution_count):
288 """Publish the code request on the iopub stream."""
288 """Publish the code request on the iopub stream."""
289
289
290 self.session.send(self.iopub_socket, u'execute_input',
290 self.session.send(self.iopub_socket, u'execute_input',
291 {u'code':code, u'execution_count': execution_count},
291 {u'code':code, u'execution_count': execution_count},
292 parent=parent, ident=self._topic('execute_input')
292 parent=parent, ident=self._topic('execute_input')
293 )
293 )
294
294
295 def _publish_status(self, status, parent=None):
295 def _publish_status(self, status, parent=None):
296 """send status (busy/idle) on IOPub"""
296 """send status (busy/idle) on IOPub"""
297 self.session.send(self.iopub_socket,
297 self.session.send(self.iopub_socket,
298 u'status',
298 u'status',
299 {u'execution_state': status},
299 {u'execution_state': status},
300 parent=parent or self._parent_header,
300 parent=parent or self._parent_header,
301 ident=self._topic('status'),
301 ident=self._topic('status'),
302 )
302 )
303
303
304 def set_parent(self, ident, parent):
304 def set_parent(self, ident, parent):
305 """Set the current parent_header
305 """Set the current parent_header
306
306
307 Side effects (IOPub messages) and replies are associated with
307 Side effects (IOPub messages) and replies are associated with
308 the request that caused them via the parent_header.
308 the request that caused them via the parent_header.
309
309
310 The parent identity is used to route input_request messages
310 The parent identity is used to route input_request messages
311 on the stdin channel.
311 on the stdin channel.
312 """
312 """
313 self._parent_ident = ident
313 self._parent_ident = ident
314 self._parent_header = parent
314 self._parent_header = parent
315
315
316 def send_response(self, stream, msg_or_type, content=None, ident=None,
316 def send_response(self, stream, msg_or_type, content=None, ident=None,
317 buffers=None, track=False, header=None, metadata=None):
317 buffers=None, track=False, header=None, metadata=None):
318 """Send a response to the message we're currently processing.
318 """Send a response to the message we're currently processing.
319
319
320 This accepts all the parameters of :meth:`IPython.kernel.zmq.session.Session.send`
320 This accepts all the parameters of :meth:`IPython.kernel.zmq.session.Session.send`
321 except ``parent``.
321 except ``parent``.
322
322
323 This relies on :meth:`set_parent` having been called for the current
323 This relies on :meth:`set_parent` having been called for the current
324 message.
324 message.
325 """
325 """
326 return self.session.send(stream, msg_or_type, content, self._parent_header,
326 return self.session.send(stream, msg_or_type, content, self._parent_header,
327 ident, buffers, track, header, metadata)
327 ident, buffers, track, header, metadata)
328
328
329 def execute_request(self, stream, ident, parent):
329 def execute_request(self, stream, ident, parent):
330 """handle an execute_request"""
330 """handle an execute_request"""
331
331
332 try:
332 try:
333 content = parent[u'content']
333 content = parent[u'content']
334 code = py3compat.cast_unicode_py2(content[u'code'])
334 code = py3compat.cast_unicode_py2(content[u'code'])
335 silent = content[u'silent']
335 silent = content[u'silent']
336 store_history = content.get(u'store_history', not silent)
336 store_history = content.get(u'store_history', not silent)
337 user_expressions = content.get('user_expressions', {})
337 user_expressions = content.get('user_expressions', {})
338 allow_stdin = content.get('allow_stdin', False)
338 allow_stdin = content.get('allow_stdin', False)
339 except:
339 except:
340 self.log.error("Got bad msg: ")
340 self.log.error("Got bad msg: ")
341 self.log.error("%s", parent)
341 self.log.error("%s", parent)
342 return
342 return
343
343
344 md = self._make_metadata(parent['metadata'])
344 md = self._make_metadata(parent['metadata'])
345
345
346 # Re-broadcast our input for the benefit of listening clients, and
346 # Re-broadcast our input for the benefit of listening clients, and
347 # start computing output
347 # start computing output
348 if not silent:
348 if not silent:
349 self.execution_count += 1
349 self.execution_count += 1
350 self._publish_execute_input(code, parent, self.execution_count)
350 self._publish_execute_input(code, parent, self.execution_count)
351
351
352 reply_content = self.do_execute(code, silent, store_history,
352 reply_content = self.do_execute(code, silent, store_history,
353 user_expressions, allow_stdin)
353 user_expressions, allow_stdin)
354
354
355 # Flush output before sending the reply.
355 # Flush output before sending the reply.
356 sys.stdout.flush()
356 sys.stdout.flush()
357 sys.stderr.flush()
357 sys.stderr.flush()
358 # FIXME: on rare occasions, the flush doesn't seem to make it to the
358 # FIXME: on rare occasions, the flush doesn't seem to make it to the
359 # clients... This seems to mitigate the problem, but we definitely need
359 # clients... This seems to mitigate the problem, but we definitely need
360 # to better understand what's going on.
360 # to better understand what's going on.
361 if self._execute_sleep:
361 if self._execute_sleep:
362 time.sleep(self._execute_sleep)
362 time.sleep(self._execute_sleep)
363
363
364 # Send the reply.
364 # Send the reply.
365 reply_content = json_clean(reply_content)
365 reply_content = json_clean(reply_content)
366
366
367 md['status'] = reply_content['status']
367 md['status'] = reply_content['status']
368 if reply_content['status'] == 'error' and \
368 if reply_content['status'] == 'error' and \
369 reply_content['ename'] == 'UnmetDependency':
369 reply_content['ename'] == 'UnmetDependency':
370 md['dependencies_met'] = False
370 md['dependencies_met'] = False
371
371
372 reply_msg = self.session.send(stream, u'execute_reply',
372 reply_msg = self.session.send(stream, u'execute_reply',
373 reply_content, parent, metadata=md,
373 reply_content, parent, metadata=md,
374 ident=ident)
374 ident=ident)
375
375
376 self.log.debug("%s", reply_msg)
376 self.log.debug("%s", reply_msg)
377
377
378 if not silent and reply_msg['content']['status'] == u'error':
378 if not silent and reply_msg['content']['status'] == u'error':
379 self._abort_queues()
379 self._abort_queues()
380
380
381 def do_execute(self, code, silent, store_history=True,
381 def do_execute(self, code, silent, store_history=True,
382 user_experssions=None, allow_stdin=False):
382 user_experssions=None, allow_stdin=False):
383 """Execute user code. Must be overridden by subclasses.
383 """Execute user code. Must be overridden by subclasses.
384 """
384 """
385 raise NotImplementedError
385 raise NotImplementedError
386
386
387 def complete_request(self, stream, ident, parent):
387 def complete_request(self, stream, ident, parent):
388 content = parent['content']
388 content = parent['content']
389 code = content['code']
389 code = content['code']
390 cursor_pos = content['cursor_pos']
390 cursor_pos = content['cursor_pos']
391
391
392 matches = self.do_complete(code, cursor_pos)
392 matches = self.do_complete(code, cursor_pos)
393 matches = json_clean(matches)
393 matches = json_clean(matches)
394 completion_msg = self.session.send(stream, 'complete_reply',
394 completion_msg = self.session.send(stream, 'complete_reply',
395 matches, parent, ident)
395 matches, parent, ident)
396 self.log.debug("%s", completion_msg)
396 self.log.debug("%s", completion_msg)
397
397
398 def do_complete(self, code, cursor_pos):
398 def do_complete(self, code, cursor_pos):
399 """Override in subclasses to find completions.
399 """Override in subclasses to find completions.
400 """
400 """
401 return {'matches' : [],
401 return {'matches' : [],
402 'cursor_end' : cursor_pos,
402 'cursor_end' : cursor_pos,
403 'cursor_start' : cursor_pos,
403 'cursor_start' : cursor_pos,
404 'metadata' : {},
404 'metadata' : {},
405 'status' : 'ok'}
405 'status' : 'ok'}
406
406
407 def inspect_request(self, stream, ident, parent):
407 def inspect_request(self, stream, ident, parent):
408 content = parent['content']
408 content = parent['content']
409
409
410 reply_content = self.do_inspect(content['code'], content['cursor_pos'],
410 reply_content = self.do_inspect(content['code'], content['cursor_pos'],
411 content.get('detail_level', 0))
411 content.get('detail_level', 0))
412 # Before we send this object over, we scrub it for JSON usage
412 # Before we send this object over, we scrub it for JSON usage
413 reply_content = json_clean(reply_content)
413 reply_content = json_clean(reply_content)
414 msg = self.session.send(stream, 'inspect_reply',
414 msg = self.session.send(stream, 'inspect_reply',
415 reply_content, parent, ident)
415 reply_content, parent, ident)
416 self.log.debug("%s", msg)
416 self.log.debug("%s", msg)
417
417
418 def do_inspect(self, code, cursor_pos, detail_level=0):
418 def do_inspect(self, code, cursor_pos, detail_level=0):
419 """Override in subclasses to allow introspection.
419 """Override in subclasses to allow introspection.
420 """
420 """
421 return {'status': 'ok', 'data':{}, 'metadata':{}, 'found':False}
421 return {'status': 'ok', 'data':{}, 'metadata':{}, 'found':False}
422
422
423 def history_request(self, stream, ident, parent):
423 def history_request(self, stream, ident, parent):
424 content = parent['content']
424 content = parent['content']
425
425
426 reply_content = self.do_history(**content)
426 reply_content = self.do_history(**content)
427
427
428 reply_content = json_clean(reply_content)
428 reply_content = json_clean(reply_content)
429 msg = self.session.send(stream, 'history_reply',
429 msg = self.session.send(stream, 'history_reply',
430 reply_content, parent, ident)
430 reply_content, parent, ident)
431 self.log.debug("%s", msg)
431 self.log.debug("%s", msg)
432
432
433 def do_history(self, hist_access_type, output, raw, session=None, start=None,
433 def do_history(self, hist_access_type, output, raw, session=None, start=None,
434 stop=None, n=None, pattern=None, unique=False):
434 stop=None, n=None, pattern=None, unique=False):
435 """Override in subclasses to access history.
435 """Override in subclasses to access history.
436 """
436 """
437 return {'history': []}
437 return {'history': []}
438
438
439 def connect_request(self, stream, ident, parent):
439 def connect_request(self, stream, ident, parent):
440 if self._recorded_ports is not None:
440 if self._recorded_ports is not None:
441 content = self._recorded_ports.copy()
441 content = self._recorded_ports.copy()
442 else:
442 else:
443 content = {}
443 content = {}
444 msg = self.session.send(stream, 'connect_reply',
444 msg = self.session.send(stream, 'connect_reply',
445 content, parent, ident)
445 content, parent, ident)
446 self.log.debug("%s", msg)
446 self.log.debug("%s", msg)
447
447
448 @property
448 @property
449 def kernel_info(self):
449 def kernel_info(self):
450 return {
450 return {
451 'protocol_version': release.kernel_protocol_version,
451 'protocol_version': release.kernel_protocol_version,
452 'implementation': self.implementation,
452 'implementation': self.implementation,
453 'implementation_version': self.implementation_version,
453 'implementation_version': self.implementation_version,
454 'language': self.language,
454 'language': self.language,
455 'language_version': self.language_version,
455 'language_version': self.language_version,
456 'banner': self.banner,
456 'banner': self.banner,
457 }
457 }
458
458
459 def kernel_info_request(self, stream, ident, parent):
459 def kernel_info_request(self, stream, ident, parent):
460 msg = self.session.send(stream, 'kernel_info_reply',
460 msg = self.session.send(stream, 'kernel_info_reply',
461 self.kernel_info, parent, ident)
461 self.kernel_info, parent, ident)
462 self.log.debug("%s", msg)
462 self.log.debug("%s", msg)
463
463
464 def shutdown_request(self, stream, ident, parent):
464 def shutdown_request(self, stream, ident, parent):
465 content = self.do_shutdown(parent['content']['restart'])
465 content = self.do_shutdown(parent['content']['restart'])
466 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
466 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
467 # same content, but different msg_id for broadcasting on IOPub
467 # same content, but different msg_id for broadcasting on IOPub
468 self._shutdown_message = self.session.msg(u'shutdown_reply',
468 self._shutdown_message = self.session.msg(u'shutdown_reply',
469 content, parent
469 content, parent
470 )
470 )
471
471
472 self._at_shutdown()
472 self._at_shutdown()
473 # call sys.exit after a short delay
473 # call sys.exit after a short delay
474 loop = ioloop.IOLoop.instance()
474 loop = ioloop.IOLoop.instance()
475 loop.add_timeout(time.time()+0.1, loop.stop)
475 loop.add_timeout(time.time()+0.1, loop.stop)
476
476
477 def do_shutdown(self, restart):
477 def do_shutdown(self, restart):
478 """Override in subclasses to do things when the frontend shuts down the
478 """Override in subclasses to do things when the frontend shuts down the
479 kernel.
479 kernel.
480 """
480 """
481 return {'status': 'ok', 'restart': restart}
481 return {'status': 'ok', 'restart': restart}
482
483 def is_complete_request(self, stream, ident, parent):
484 content = parent['content']
485 code = content['code']
486
487 reply_content = self.do_is_complete(code)
488 reply_content = json_clean(reply_content)
489 reply_msg = self.session.send(stream, 'is_complete_reply',
490 reply_content, parent, ident)
491 self.log.debug("%s", reply_msg)
492
493 def do_is_complete(self, code):
494 """Override in subclasses to find completions.
495 """
496 return {'complete' : True,
497 }
482
498
483 #---------------------------------------------------------------------------
499 #---------------------------------------------------------------------------
484 # Engine methods
500 # Engine methods
485 #---------------------------------------------------------------------------
501 #---------------------------------------------------------------------------
486
502
487 def apply_request(self, stream, ident, parent):
503 def apply_request(self, stream, ident, parent):
488 try:
504 try:
489 content = parent[u'content']
505 content = parent[u'content']
490 bufs = parent[u'buffers']
506 bufs = parent[u'buffers']
491 msg_id = parent['header']['msg_id']
507 msg_id = parent['header']['msg_id']
492 except:
508 except:
493 self.log.error("Got bad msg: %s", parent, exc_info=True)
509 self.log.error("Got bad msg: %s", parent, exc_info=True)
494 return
510 return
495
511
496 md = self._make_metadata(parent['metadata'])
512 md = self._make_metadata(parent['metadata'])
497
513
498 reply_content, result_buf = self.do_apply(content, bufs, msg_id, md)
514 reply_content, result_buf = self.do_apply(content, bufs, msg_id, md)
499
515
500 # put 'ok'/'error' status in header, for scheduler introspection:
516 # put 'ok'/'error' status in header, for scheduler introspection:
501 md['status'] = reply_content['status']
517 md['status'] = reply_content['status']
502
518
503 # flush i/o
519 # flush i/o
504 sys.stdout.flush()
520 sys.stdout.flush()
505 sys.stderr.flush()
521 sys.stderr.flush()
506
522
507 self.session.send(stream, u'apply_reply', reply_content,
523 self.session.send(stream, u'apply_reply', reply_content,
508 parent=parent, ident=ident,buffers=result_buf, metadata=md)
524 parent=parent, ident=ident,buffers=result_buf, metadata=md)
509
525
510 def do_apply(self, content, bufs, msg_id, reply_metadata):
526 def do_apply(self, content, bufs, msg_id, reply_metadata):
511 """Override in subclasses to support the IPython parallel framework.
527 """Override in subclasses to support the IPython parallel framework.
512 """
528 """
513 raise NotImplementedError
529 raise NotImplementedError
514
530
515 #---------------------------------------------------------------------------
531 #---------------------------------------------------------------------------
516 # Control messages
532 # Control messages
517 #---------------------------------------------------------------------------
533 #---------------------------------------------------------------------------
518
534
519 def abort_request(self, stream, ident, parent):
535 def abort_request(self, stream, ident, parent):
520 """abort a specifig msg by id"""
536 """abort a specifig msg by id"""
521 msg_ids = parent['content'].get('msg_ids', None)
537 msg_ids = parent['content'].get('msg_ids', None)
522 if isinstance(msg_ids, string_types):
538 if isinstance(msg_ids, string_types):
523 msg_ids = [msg_ids]
539 msg_ids = [msg_ids]
524 if not msg_ids:
540 if not msg_ids:
525 self.abort_queues()
541 self.abort_queues()
526 for mid in msg_ids:
542 for mid in msg_ids:
527 self.aborted.add(str(mid))
543 self.aborted.add(str(mid))
528
544
529 content = dict(status='ok')
545 content = dict(status='ok')
530 reply_msg = self.session.send(stream, 'abort_reply', content=content,
546 reply_msg = self.session.send(stream, 'abort_reply', content=content,
531 parent=parent, ident=ident)
547 parent=parent, ident=ident)
532 self.log.debug("%s", reply_msg)
548 self.log.debug("%s", reply_msg)
533
549
534 def clear_request(self, stream, idents, parent):
550 def clear_request(self, stream, idents, parent):
535 """Clear our namespace."""
551 """Clear our namespace."""
536 content = self.do_clear()
552 content = self.do_clear()
537 self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
553 self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
538 content = content)
554 content = content)
539
555
540 def do_clear(self):
556 def do_clear(self):
541 """Override in subclasses to clear the namespace
557 """Override in subclasses to clear the namespace
542
558
543 This is only required for IPython.parallel.
559 This is only required for IPython.parallel.
544 """
560 """
545 raise NotImplementedError
561 raise NotImplementedError
546
562
547 #---------------------------------------------------------------------------
563 #---------------------------------------------------------------------------
548 # Protected interface
564 # Protected interface
549 #---------------------------------------------------------------------------
565 #---------------------------------------------------------------------------
550
566
551 def _topic(self, topic):
567 def _topic(self, topic):
552 """prefixed topic for IOPub messages"""
568 """prefixed topic for IOPub messages"""
553 if self.int_id >= 0:
569 if self.int_id >= 0:
554 base = "engine.%i" % self.int_id
570 base = "engine.%i" % self.int_id
555 else:
571 else:
556 base = "kernel.%s" % self.ident
572 base = "kernel.%s" % self.ident
557
573
558 return py3compat.cast_bytes("%s.%s" % (base, topic))
574 return py3compat.cast_bytes("%s.%s" % (base, topic))
559
575
560 def _abort_queues(self):
576 def _abort_queues(self):
561 for stream in self.shell_streams:
577 for stream in self.shell_streams:
562 if stream:
578 if stream:
563 self._abort_queue(stream)
579 self._abort_queue(stream)
564
580
565 def _abort_queue(self, stream):
581 def _abort_queue(self, stream):
566 poller = zmq.Poller()
582 poller = zmq.Poller()
567 poller.register(stream.socket, zmq.POLLIN)
583 poller.register(stream.socket, zmq.POLLIN)
568 while True:
584 while True:
569 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
585 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
570 if msg is None:
586 if msg is None:
571 return
587 return
572
588
573 self.log.info("Aborting:")
589 self.log.info("Aborting:")
574 self.log.info("%s", msg)
590 self.log.info("%s", msg)
575 msg_type = msg['header']['msg_type']
591 msg_type = msg['header']['msg_type']
576 reply_type = msg_type.split('_')[0] + '_reply'
592 reply_type = msg_type.split('_')[0] + '_reply'
577
593
578 status = {'status' : 'aborted'}
594 status = {'status' : 'aborted'}
579 md = {'engine' : self.ident}
595 md = {'engine' : self.ident}
580 md.update(status)
596 md.update(status)
581 reply_msg = self.session.send(stream, reply_type, metadata=md,
597 reply_msg = self.session.send(stream, reply_type, metadata=md,
582 content=status, parent=msg, ident=idents)
598 content=status, parent=msg, ident=idents)
583 self.log.debug("%s", reply_msg)
599 self.log.debug("%s", reply_msg)
584 # We need to wait a bit for requests to come in. This can probably
600 # We need to wait a bit for requests to come in. This can probably
585 # be set shorter for true asynchronous clients.
601 # be set shorter for true asynchronous clients.
586 poller.poll(50)
602 poller.poll(50)
587
603
588
604
589 def _no_raw_input(self):
605 def _no_raw_input(self):
590 """Raise StdinNotImplentedError if active frontend doesn't support
606 """Raise StdinNotImplentedError if active frontend doesn't support
591 stdin."""
607 stdin."""
592 raise StdinNotImplementedError("raw_input was called, but this "
608 raise StdinNotImplementedError("raw_input was called, but this "
593 "frontend does not support stdin.")
609 "frontend does not support stdin.")
594
610
595 def getpass(self, prompt=''):
611 def getpass(self, prompt=''):
596 """Forward getpass to frontends
612 """Forward getpass to frontends
597
613
598 Raises
614 Raises
599 ------
615 ------
600 StdinNotImplentedError if active frontend doesn't support stdin.
616 StdinNotImplentedError if active frontend doesn't support stdin.
601 """
617 """
602 if not self._allow_stdin:
618 if not self._allow_stdin:
603 raise StdinNotImplementedError(
619 raise StdinNotImplementedError(
604 "getpass was called, but this frontend does not support input requests."
620 "getpass was called, but this frontend does not support input requests."
605 )
621 )
606 return self._input_request(prompt,
622 return self._input_request(prompt,
607 self._parent_ident,
623 self._parent_ident,
608 self._parent_header,
624 self._parent_header,
609 password=True,
625 password=True,
610 )
626 )
611
627
612 def raw_input(self, prompt=''):
628 def raw_input(self, prompt=''):
613 """Forward raw_input to frontends
629 """Forward raw_input to frontends
614
630
615 Raises
631 Raises
616 ------
632 ------
617 StdinNotImplentedError if active frontend doesn't support stdin.
633 StdinNotImplentedError if active frontend doesn't support stdin.
618 """
634 """
619 if not self._allow_stdin:
635 if not self._allow_stdin:
620 raise StdinNotImplementedError(
636 raise StdinNotImplementedError(
621 "raw_input was called, but this frontend does not support input requests."
637 "raw_input was called, but this frontend does not support input requests."
622 )
638 )
623 return self._input_request(prompt,
639 return self._input_request(prompt,
624 self._parent_ident,
640 self._parent_ident,
625 self._parent_header,
641 self._parent_header,
626 password=False,
642 password=False,
627 )
643 )
628
644
629 def _input_request(self, prompt, ident, parent, password=False):
645 def _input_request(self, prompt, ident, parent, password=False):
630 # Flush output before making the request.
646 # Flush output before making the request.
631 sys.stderr.flush()
647 sys.stderr.flush()
632 sys.stdout.flush()
648 sys.stdout.flush()
633 # flush the stdin socket, to purge stale replies
649 # flush the stdin socket, to purge stale replies
634 while True:
650 while True:
635 try:
651 try:
636 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
652 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
637 except zmq.ZMQError as e:
653 except zmq.ZMQError as e:
638 if e.errno == zmq.EAGAIN:
654 if e.errno == zmq.EAGAIN:
639 break
655 break
640 else:
656 else:
641 raise
657 raise
642
658
643 # Send the input request.
659 # Send the input request.
644 content = json_clean(dict(prompt=prompt, password=password))
660 content = json_clean(dict(prompt=prompt, password=password))
645 self.session.send(self.stdin_socket, u'input_request', content, parent,
661 self.session.send(self.stdin_socket, u'input_request', content, parent,
646 ident=ident)
662 ident=ident)
647
663
648 # Await a response.
664 # Await a response.
649 while True:
665 while True:
650 try:
666 try:
651 ident, reply = self.session.recv(self.stdin_socket, 0)
667 ident, reply = self.session.recv(self.stdin_socket, 0)
652 except Exception:
668 except Exception:
653 self.log.warn("Invalid Message:", exc_info=True)
669 self.log.warn("Invalid Message:", exc_info=True)
654 except KeyboardInterrupt:
670 except KeyboardInterrupt:
655 # re-raise KeyboardInterrupt, to truncate traceback
671 # re-raise KeyboardInterrupt, to truncate traceback
656 raise KeyboardInterrupt
672 raise KeyboardInterrupt
657 else:
673 else:
658 break
674 break
659 try:
675 try:
660 value = py3compat.unicode_to_str(reply['content']['value'])
676 value = py3compat.unicode_to_str(reply['content']['value'])
661 except:
677 except:
662 self.log.error("Bad input_reply: %s", parent)
678 self.log.error("Bad input_reply: %s", parent)
663 value = ''
679 value = ''
664 if value == '\x04':
680 if value == '\x04':
665 # EOF
681 # EOF
666 raise EOFError
682 raise EOFError
667 return value
683 return value
668
684
669 def _at_shutdown(self):
685 def _at_shutdown(self):
670 """Actions taken at shutdown by the kernel, called by python's atexit.
686 """Actions taken at shutdown by the kernel, called by python's atexit.
671 """
687 """
672 # io.rprint("Kernel at_shutdown") # dbg
688 # io.rprint("Kernel at_shutdown") # dbg
673 if self._shutdown_message is not None:
689 if self._shutdown_message is not None:
674 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
690 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
675 self.log.debug("%s", self._shutdown_message)
691 self.log.debug("%s", self._shutdown_message)
676 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
692 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
677
693
General Comments 0
You need to be logged in to leave comments. Login now