##// END OF EJS Templates
remove 'name' from inspect_reply
MinRK -
Show More
@@ -1,105 +1,104 b''
1 1 # Copyright (c) IPython Development Team.
2 2 # Distributed under the terms of the Modified BSD License.
3 3
4 4 from __future__ import print_function
5 5
6 6 import unittest
7 7
8 8 from IPython.kernel.inprocess.blocking import BlockingInProcessKernelClient
9 9 from IPython.kernel.inprocess.manager import InProcessKernelManager
10 10
11 11 #-----------------------------------------------------------------------------
12 12 # Test case
13 13 #-----------------------------------------------------------------------------
14 14
15 15 class InProcessKernelManagerTestCase(unittest.TestCase):
16 16
17 17 def test_interface(self):
18 18 """ Does the in-process kernel manager implement the basic KM interface?
19 19 """
20 20 km = InProcessKernelManager()
21 21 self.assert_(not km.has_kernel)
22 22
23 23 km.start_kernel()
24 24 self.assert_(km.has_kernel)
25 25 self.assert_(km.kernel is not None)
26 26
27 27 kc = BlockingInProcessKernelClient(kernel=km.kernel)
28 28 self.assert_(not kc.channels_running)
29 29
30 30 kc.start_channels()
31 31 self.assert_(kc.channels_running)
32 32
33 33 old_kernel = km.kernel
34 34 km.restart_kernel()
35 35 self.assert_(km.kernel is not None)
36 36 self.assertNotEquals(km.kernel, old_kernel)
37 37
38 38 km.shutdown_kernel()
39 39 self.assert_(not km.has_kernel)
40 40
41 41 self.assertRaises(NotImplementedError, km.interrupt_kernel)
42 42 self.assertRaises(NotImplementedError, km.signal_kernel, 9)
43 43
44 44 kc.stop_channels()
45 45 self.assert_(not kc.channels_running)
46 46
47 47 def test_execute(self):
48 48 """ Does executing code in an in-process kernel work?
49 49 """
50 50 km = InProcessKernelManager()
51 51 km.start_kernel()
52 52 kc = BlockingInProcessKernelClient(kernel=km.kernel)
53 53 kc.start_channels()
54 54 kc.execute('foo = 1')
55 55 self.assertEquals(km.kernel.shell.user_ns['foo'], 1)
56 56
57 57 def test_complete(self):
58 58 """ Does requesting completion from an in-process kernel work?
59 59 """
60 60 km = InProcessKernelManager()
61 61 km.start_kernel()
62 62 kc = BlockingInProcessKernelClient(kernel=km.kernel)
63 63 kc.start_channels()
64 64 km.kernel.shell.push({'my_bar': 0, 'my_baz': 1})
65 65 kc.complete('my_ba', 5)
66 66 msg = kc.get_shell_msg()
67 67 self.assertEqual(msg['header']['msg_type'], 'complete_reply')
68 68 self.assertEqual(sorted(msg['content']['matches']),
69 69 ['my_bar', 'my_baz'])
70 70
71 71 def test_inspect(self):
72 72 """ Does requesting object information from an in-process kernel work?
73 73 """
74 74 km = InProcessKernelManager()
75 75 km.start_kernel()
76 76 kc = BlockingInProcessKernelClient(kernel=km.kernel)
77 77 kc.start_channels()
78 78 km.kernel.shell.user_ns['foo'] = 1
79 79 kc.inspect('foo')
80 80 msg = kc.get_shell_msg()
81 81 self.assertEqual(msg['header']['msg_type'], 'inspect_reply')
82 82 content = msg['content']
83 83 assert content['found']
84 self.assertEqual(content['name'], 'foo')
85 84 text = content['data']['text/plain']
86 85 self.assertIn('int', text)
87 86
88 87 def test_history(self):
89 88 """ Does requesting history from an in-process kernel work?
90 89 """
91 90 km = InProcessKernelManager()
92 91 km.start_kernel()
93 92 kc = BlockingInProcessKernelClient(kernel=km.kernel)
94 93 kc.start_channels()
95 94 kc.execute('%who')
96 95 kc.history(hist_access_type='tail', n=1)
97 96 msg = kc.shell_channel.get_msgs()[-1]
98 97 self.assertEquals(msg['header']['msg_type'], 'history_reply')
99 98 history = msg['content']['history']
100 99 self.assertEquals(len(history), 1)
101 100 self.assertEquals(history[0][2], '%who')
102 101
103 102
104 103 if __name__ == '__main__':
105 104 unittest.main()
@@ -1,404 +1,400 b''
1 1 """Test suite for our zeromq-based message specification."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import re
7 7 from distutils.version import LooseVersion as V
8 8 from subprocess import PIPE
9 9 try:
10 10 from queue import Empty # Py 3
11 11 except ImportError:
12 12 from Queue import Empty # Py 2
13 13
14 14 import nose.tools as nt
15 15
16 16 from IPython.kernel import KernelManager
17 17
18 18 from IPython.utils.traitlets import (
19 19 HasTraits, TraitError, Bool, Unicode, Dict, Integer, List, Enum, Any,
20 20 )
21 21 from IPython.utils.py3compat import string_types, iteritems
22 22
23 23 from .utils import TIMEOUT, start_global_kernel, flush_channels, execute
24 24
25 25 #-----------------------------------------------------------------------------
26 26 # Globals
27 27 #-----------------------------------------------------------------------------
28 28 KC = None
29 29
30 30 def setup():
31 31 global KC
32 32 KC = start_global_kernel()
33 33
34 34 #-----------------------------------------------------------------------------
35 35 # Message Spec References
36 36 #-----------------------------------------------------------------------------
37 37
38 38 class Reference(HasTraits):
39 39
40 40 """
41 41 Base class for message spec specification testing.
42 42
43 43 This class is the core of the message specification test. The
44 44 idea is that child classes implement trait attributes for each
45 45 message keys, so that message keys can be tested against these
46 46 traits using :meth:`check` method.
47 47
48 48 """
49 49
50 50 def check(self, d):
51 51 """validate a dict against our traits"""
52 52 for key in self.trait_names():
53 53 nt.assert_in(key, d)
54 54 # FIXME: always allow None, probably not a good idea
55 55 if d[key] is None:
56 56 continue
57 57 try:
58 58 setattr(self, key, d[key])
59 59 except TraitError as e:
60 60 assert False, str(e)
61 61
62 62 class Version(Unicode):
63 63 def validate(self, obj, value):
64 64 min_version = self.default_value
65 65 if V(value) < V(min_version):
66 66 raise TraitError("bad version: %s < %s" % (value, min_version))
67 67
68 68 class RMessage(Reference):
69 69 msg_id = Unicode()
70 70 msg_type = Unicode()
71 71 header = Dict()
72 72 parent_header = Dict()
73 73 content = Dict()
74 74
75 75 def check(self, d):
76 76 super(RMessage, self).check(d)
77 77 RHeader().check(self.header)
78 78 if self.parent_header:
79 79 RHeader().check(self.parent_header)
80 80
81 81 class RHeader(Reference):
82 82 msg_id = Unicode()
83 83 msg_type = Unicode()
84 84 session = Unicode()
85 85 username = Unicode()
86 86 version = Version('5.0')
87 87
88 88 mime_pat = re.compile(r'\w+/\w+')
89 89
90 90 class MimeBundle(Reference):
91 91 metadata = Dict()
92 92 data = Dict()
93 93 def _data_changed(self, name, old, new):
94 94 for k,v in iteritems(new):
95 95 assert mime_pat.match(k)
96 96 nt.assert_is_instance(v, string_types)
97 97
98 98 # shell replies
99 99
100 100 class ExecuteReply(Reference):
101 101 execution_count = Integer()
102 102 status = Enum((u'ok', u'error'))
103 103
104 104 def check(self, d):
105 105 Reference.check(self, d)
106 106 if d['status'] == 'ok':
107 107 ExecuteReplyOkay().check(d)
108 108 elif d['status'] == 'error':
109 109 ExecuteReplyError().check(d)
110 110
111 111
112 112 class ExecuteReplyOkay(Reference):
113 113 payload = List(Dict)
114 114 user_expressions = Dict()
115 115
116 116
117 117 class ExecuteReplyError(Reference):
118 118 ename = Unicode()
119 119 evalue = Unicode()
120 120 traceback = List(Unicode)
121 121
122 122
123 123 class InspectReply(MimeBundle):
124 name = Unicode()
125 124 found = Bool()
126 125
127 126
128 127 class ArgSpec(Reference):
129 128 args = List(Unicode)
130 129 varargs = Unicode()
131 130 varkw = Unicode()
132 131 defaults = List()
133 132
134 133
135 134 class Status(Reference):
136 135 execution_state = Enum((u'busy', u'idle', u'starting'))
137 136
138 137
139 138 class CompleteReply(Reference):
140 139 matches = List(Unicode)
141 140 cursor_start = Integer()
142 141 cursor_end = Integer()
143 142 status = Unicode()
144 143
145 144
146 145 class KernelInfoReply(Reference):
147 146 protocol_version = Version('5.0')
148 147 implementation = Unicode('ipython')
149 148 implementation_version = Version('2.1')
150 149 language_version = Version('2.7')
151 150 language = Unicode('python')
152 151 banner = Unicode()
153 152
154 153
155 154 # IOPub messages
156 155
157 156 class ExecuteInput(Reference):
158 157 code = Unicode()
159 158 execution_count = Integer()
160 159
161 160
162 161 Error = ExecuteReplyError
163 162
164 163
165 164 class Stream(Reference):
166 165 name = Enum((u'stdout', u'stderr'))
167 166 data = Unicode()
168 167
169 168
170 169 class DisplayData(MimeBundle):
171 170 pass
172 171
173 172
174 173 class ExecuteResult(MimeBundle):
175 174 execution_count = Integer()
176 175
177 176
178 177 references = {
179 178 'execute_reply' : ExecuteReply(),
180 179 'inspect_reply' : InspectReply(),
181 180 'status' : Status(),
182 181 'complete_reply' : CompleteReply(),
183 182 'kernel_info_reply': KernelInfoReply(),
184 183 'execute_input' : ExecuteInput(),
185 184 'execute_result' : ExecuteResult(),
186 185 'error' : Error(),
187 186 'stream' : Stream(),
188 187 'display_data' : DisplayData(),
189 188 'header' : RHeader(),
190 189 }
191 190 """
192 191 Specifications of `content` part of the reply messages.
193 192 """
194 193
195 194
196 195 def validate_message(msg, msg_type=None, parent=None):
197 196 """validate a message
198 197
199 198 This is a generator, and must be iterated through to actually
200 199 trigger each test.
201 200
202 201 If msg_type and/or parent are given, the msg_type and/or parent msg_id
203 202 are compared with the given values.
204 203 """
205 204 RMessage().check(msg)
206 205 if msg_type:
207 206 nt.assert_equal(msg['msg_type'], msg_type)
208 207 if parent:
209 208 nt.assert_equal(msg['parent_header']['msg_id'], parent)
210 209 content = msg['content']
211 210 ref = references[msg['msg_type']]
212 211 ref.check(content)
213 212
214 213
215 214 #-----------------------------------------------------------------------------
216 215 # Tests
217 216 #-----------------------------------------------------------------------------
218 217
219 218 # Shell channel
220 219
221 220 def test_execute():
222 221 flush_channels()
223 222
224 223 msg_id = KC.execute(code='x=1')
225 224 reply = KC.get_shell_msg(timeout=TIMEOUT)
226 225 validate_message(reply, 'execute_reply', msg_id)
227 226
228 227
229 228 def test_execute_silent():
230 229 flush_channels()
231 230 msg_id, reply = execute(code='x=1', silent=True)
232 231
233 232 # flush status=idle
234 233 status = KC.iopub_channel.get_msg(timeout=TIMEOUT)
235 234 validate_message(status, 'status', msg_id)
236 235 nt.assert_equal(status['content']['execution_state'], 'idle')
237 236
238 237 nt.assert_raises(Empty, KC.iopub_channel.get_msg, timeout=0.1)
239 238 count = reply['execution_count']
240 239
241 240 msg_id, reply = execute(code='x=2', silent=True)
242 241
243 242 # flush status=idle
244 243 status = KC.iopub_channel.get_msg(timeout=TIMEOUT)
245 244 validate_message(status, 'status', msg_id)
246 245 nt.assert_equal(status['content']['execution_state'], 'idle')
247 246
248 247 nt.assert_raises(Empty, KC.iopub_channel.get_msg, timeout=0.1)
249 248 count_2 = reply['execution_count']
250 249 nt.assert_equal(count_2, count)
251 250
252 251
253 252 def test_execute_error():
254 253 flush_channels()
255 254
256 255 msg_id, reply = execute(code='1/0')
257 256 nt.assert_equal(reply['status'], 'error')
258 257 nt.assert_equal(reply['ename'], 'ZeroDivisionError')
259 258
260 259 error = KC.iopub_channel.get_msg(timeout=TIMEOUT)
261 260 validate_message(error, 'error', msg_id)
262 261
263 262
264 263 def test_execute_inc():
265 264 """execute request should increment execution_count"""
266 265 flush_channels()
267 266
268 267 msg_id, reply = execute(code='x=1')
269 268 count = reply['execution_count']
270 269
271 270 flush_channels()
272 271
273 272 msg_id, reply = execute(code='x=2')
274 273 count_2 = reply['execution_count']
275 274 nt.assert_equal(count_2, count+1)
276 275
277 276
278 277 def test_user_expressions():
279 278 flush_channels()
280 279
281 280 msg_id, reply = execute(code='x=1', user_expressions=dict(foo='x+1'))
282 281 user_expressions = reply['user_expressions']
283 282 nt.assert_equal(user_expressions, {u'foo': {
284 283 u'status': u'ok',
285 284 u'data': {u'text/plain': u'2'},
286 285 u'metadata': {},
287 286 }})
288 287
289 288
290 289 def test_user_expressions_fail():
291 290 flush_channels()
292 291
293 292 msg_id, reply = execute(code='x=0', user_expressions=dict(foo='nosuchname'))
294 293 user_expressions = reply['user_expressions']
295 294 foo = user_expressions['foo']
296 295 nt.assert_equal(foo['status'], 'error')
297 296 nt.assert_equal(foo['ename'], 'NameError')
298 297
299 298
300 299 def test_oinfo():
301 300 flush_channels()
302 301
303 302 msg_id = KC.inspect('a')
304 303 reply = KC.get_shell_msg(timeout=TIMEOUT)
305 304 validate_message(reply, 'inspect_reply', msg_id)
306 305
307 306
308 307 def test_oinfo_found():
309 308 flush_channels()
310 309
311 310 msg_id, reply = execute(code='a=5')
312 311
313 312 msg_id = KC.inspect('a')
314 313 reply = KC.get_shell_msg(timeout=TIMEOUT)
315 314 validate_message(reply, 'inspect_reply', msg_id)
316 315 content = reply['content']
317 316 assert content['found']
318 nt.assert_equal(content['name'], 'a')
319 317 text = content['data']['text/plain']
320 318 nt.assert_in('Type:', text)
321 319 nt.assert_in('Docstring:', text)
322 320
323 321
324 322 def test_oinfo_detail():
325 323 flush_channels()
326 324
327 325 msg_id, reply = execute(code='ip=get_ipython()')
328 326
329 327 msg_id = KC.inspect('ip.object_inspect', cursor_pos=10, detail_level=1)
330 328 reply = KC.get_shell_msg(timeout=TIMEOUT)
331 329 validate_message(reply, 'inspect_reply', msg_id)
332 330 content = reply['content']
333 331 assert content['found']
334 nt.assert_equal(content['name'], 'ip.object_inspect')
335 332 text = content['data']['text/plain']
336 333 nt.assert_in('Definition:', text)
337 334 nt.assert_in('Source:', text)
338 335
339 336
340 337 def test_oinfo_not_found():
341 338 flush_channels()
342 339
343 340 msg_id = KC.inspect('dne')
344 341 reply = KC.get_shell_msg(timeout=TIMEOUT)
345 342 validate_message(reply, 'inspect_reply', msg_id)
346 343 content = reply['content']
347 344 nt.assert_false(content['found'])
348 345
349 346
350 347 def test_complete():
351 348 flush_channels()
352 349
353 350 msg_id, reply = execute(code="alpha = albert = 5")
354 351
355 352 msg_id = KC.complete('al', 2)
356 353 reply = KC.get_shell_msg(timeout=TIMEOUT)
357 354 validate_message(reply, 'complete_reply', msg_id)
358 355 matches = reply['content']['matches']
359 356 for name in ('alpha', 'albert'):
360 357 nt.assert_in(name, matches)
361 358
362 359
363 360 def test_kernel_info_request():
364 361 flush_channels()
365 362
366 363 msg_id = KC.kernel_info()
367 364 reply = KC.get_shell_msg(timeout=TIMEOUT)
368 365 validate_message(reply, 'kernel_info_reply', msg_id)
369 366
370 367
371 368 def test_single_payload():
372 369 flush_channels()
373 370 msg_id, reply = execute(code="for i in range(3):\n"+
374 371 " x=range?\n")
375 372 payload = reply['payload']
376 373 next_input_pls = [pl for pl in payload if pl["source"] == "set_next_input"]
377 374 nt.assert_equal(len(next_input_pls), 1)
378 375
379 376
380 377 # IOPub channel
381 378
382 379
383 380 def test_stream():
384 381 flush_channels()
385 382
386 383 msg_id, reply = execute("print('hi')")
387 384
388 385 stdout = KC.iopub_channel.get_msg(timeout=TIMEOUT)
389 386 validate_message(stdout, 'stream', msg_id)
390 387 content = stdout['content']
391 nt.assert_equal(content['name'], u'stdout')
392 388 nt.assert_equal(content['data'], u'hi\n')
393 389
394 390
395 391 def test_display_data():
396 392 flush_channels()
397 393
398 394 msg_id, reply = execute("from IPython.core.display import display; display(1)")
399 395
400 396 display = KC.iopub_channel.get_msg(timeout=TIMEOUT)
401 397 validate_message(display, 'display_data', parent=msg_id)
402 398 data = display['content']['data']
403 399 nt.assert_equal(data['text/plain'], u'1')
404 400
@@ -1,854 +1,853 b''
1 1 """An interactive kernel that talks to frontends over 0MQ."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import print_function
7 7
8 8 import getpass
9 9 import sys
10 10 import time
11 11 import traceback
12 12 import logging
13 13 import uuid
14 14
15 15 from datetime import datetime
16 16 from signal import (
17 17 signal, default_int_handler, SIGINT
18 18 )
19 19
20 20 import zmq
21 21 from zmq.eventloop import ioloop
22 22 from zmq.eventloop.zmqstream import ZMQStream
23 23
24 24 from IPython.config.configurable import Configurable
25 25 from IPython.core.error import StdinNotImplementedError
26 26 from IPython.core import release
27 27 from IPython.utils import py3compat
28 28 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
29 29 from IPython.utils.jsonutil import json_clean
30 30 from IPython.utils.tokenutil import token_at_cursor
31 31 from IPython.utils.traitlets import (
32 32 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
33 33 Type, Bool,
34 34 )
35 35
36 36 from .serialize import serialize_object, unpack_apply_message
37 37 from .session import Session
38 38 from .zmqshell import ZMQInteractiveShell
39 39
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Main kernel class
43 43 #-----------------------------------------------------------------------------
44 44
45 45 protocol_version = release.kernel_protocol_version
46 46 ipython_version = release.version
47 47 language_version = sys.version.split()[0]
48 48
49 49
50 50 class Kernel(Configurable):
51 51
52 52 #---------------------------------------------------------------------------
53 53 # Kernel interface
54 54 #---------------------------------------------------------------------------
55 55
56 56 # attribute to override with a GUI
57 57 eventloop = Any(None)
58 58 def _eventloop_changed(self, name, old, new):
59 59 """schedule call to eventloop from IOLoop"""
60 60 loop = ioloop.IOLoop.instance()
61 61 loop.add_callback(self.enter_eventloop)
62 62
63 63 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
64 64 shell_class = Type(ZMQInteractiveShell)
65 65
66 66 session = Instance(Session)
67 67 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
68 68 shell_streams = List()
69 69 control_stream = Instance(ZMQStream)
70 70 iopub_socket = Instance(zmq.Socket)
71 71 stdin_socket = Instance(zmq.Socket)
72 72 log = Instance(logging.Logger)
73 73
74 74 user_module = Any()
75 75 def _user_module_changed(self, name, old, new):
76 76 if self.shell is not None:
77 77 self.shell.user_module = new
78 78
79 79 user_ns = Instance(dict, args=None, allow_none=True)
80 80 def _user_ns_changed(self, name, old, new):
81 81 if self.shell is not None:
82 82 self.shell.user_ns = new
83 83 self.shell.init_user_ns()
84 84
85 85 # identities:
86 86 int_id = Integer(-1)
87 87 ident = Unicode()
88 88
89 89 def _ident_default(self):
90 90 return unicode_type(uuid.uuid4())
91 91
92 92 # Private interface
93 93
94 94 _darwin_app_nap = Bool(True, config=True,
95 95 help="""Whether to use appnope for compatiblity with OS X App Nap.
96 96
97 97 Only affects OS X >= 10.9.
98 98 """
99 99 )
100 100
101 101 # track associations with current request
102 102 _allow_stdin = Bool(False)
103 103 _parent_header = Dict()
104 104 _parent_ident = Any(b'')
105 105 # Time to sleep after flushing the stdout/err buffers in each execute
106 106 # cycle. While this introduces a hard limit on the minimal latency of the
107 107 # execute cycle, it helps prevent output synchronization problems for
108 108 # clients.
109 109 # Units are in seconds. The minimum zmq latency on local host is probably
110 110 # ~150 microseconds, set this to 500us for now. We may need to increase it
111 111 # a little if it's not enough after more interactive testing.
112 112 _execute_sleep = Float(0.0005, config=True)
113 113
114 114 # Frequency of the kernel's event loop.
115 115 # Units are in seconds, kernel subclasses for GUI toolkits may need to
116 116 # adapt to milliseconds.
117 117 _poll_interval = Float(0.05, config=True)
118 118
119 119 # If the shutdown was requested over the network, we leave here the
120 120 # necessary reply message so it can be sent by our registered atexit
121 121 # handler. This ensures that the reply is only sent to clients truly at
122 122 # the end of our shutdown process (which happens after the underlying
123 123 # IPython shell's own shutdown).
124 124 _shutdown_message = None
125 125
126 126 # This is a dict of port number that the kernel is listening on. It is set
127 127 # by record_ports and used by connect_request.
128 128 _recorded_ports = Dict()
129 129
130 130 # A reference to the Python builtin 'raw_input' function.
131 131 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
132 132 _sys_raw_input = Any()
133 133 _sys_eval_input = Any()
134 134
135 135 # set of aborted msg_ids
136 136 aborted = Set()
137 137
138 138
139 139 def __init__(self, **kwargs):
140 140 super(Kernel, self).__init__(**kwargs)
141 141
142 142 # Initialize the InteractiveShell subclass
143 143 self.shell = self.shell_class.instance(parent=self,
144 144 profile_dir = self.profile_dir,
145 145 user_module = self.user_module,
146 146 user_ns = self.user_ns,
147 147 kernel = self,
148 148 )
149 149 self.shell.displayhook.session = self.session
150 150 self.shell.displayhook.pub_socket = self.iopub_socket
151 151 self.shell.displayhook.topic = self._topic('execute_result')
152 152 self.shell.display_pub.session = self.session
153 153 self.shell.display_pub.pub_socket = self.iopub_socket
154 154 self.shell.data_pub.session = self.session
155 155 self.shell.data_pub.pub_socket = self.iopub_socket
156 156
157 157 # TMP - hack while developing
158 158 self.shell._reply_content = None
159 159
160 160 # Build dict of handlers for message types
161 161 msg_types = [ 'execute_request', 'complete_request',
162 162 'inspect_request', 'history_request',
163 163 'kernel_info_request',
164 164 'connect_request', 'shutdown_request',
165 165 'apply_request',
166 166 ]
167 167 self.shell_handlers = {}
168 168 for msg_type in msg_types:
169 169 self.shell_handlers[msg_type] = getattr(self, msg_type)
170 170
171 171 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
172 172 comm_manager = self.shell.comm_manager
173 173 for msg_type in comm_msg_types:
174 174 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
175 175
176 176 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
177 177 self.control_handlers = {}
178 178 for msg_type in control_msg_types:
179 179 self.control_handlers[msg_type] = getattr(self, msg_type)
180 180
181 181
182 182 def dispatch_control(self, msg):
183 183 """dispatch control requests"""
184 184 idents,msg = self.session.feed_identities(msg, copy=False)
185 185 try:
186 186 msg = self.session.unserialize(msg, content=True, copy=False)
187 187 except:
188 188 self.log.error("Invalid Control Message", exc_info=True)
189 189 return
190 190
191 191 self.log.debug("Control received: %s", msg)
192 192
193 193 header = msg['header']
194 194 msg_id = header['msg_id']
195 195 msg_type = header['msg_type']
196 196
197 197 handler = self.control_handlers.get(msg_type, None)
198 198 if handler is None:
199 199 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
200 200 else:
201 201 try:
202 202 handler(self.control_stream, idents, msg)
203 203 except Exception:
204 204 self.log.error("Exception in control handler:", exc_info=True)
205 205
206 206 def dispatch_shell(self, stream, msg):
207 207 """dispatch shell requests"""
208 208 # flush control requests first
209 209 if self.control_stream:
210 210 self.control_stream.flush()
211 211
212 212 idents,msg = self.session.feed_identities(msg, copy=False)
213 213 try:
214 214 msg = self.session.unserialize(msg, content=True, copy=False)
215 215 except:
216 216 self.log.error("Invalid Message", exc_info=True)
217 217 return
218 218
219 219 header = msg['header']
220 220 msg_id = header['msg_id']
221 221 msg_type = msg['header']['msg_type']
222 222
223 223 # Print some info about this message and leave a '--->' marker, so it's
224 224 # easier to trace visually the message chain when debugging. Each
225 225 # handler prints its message at the end.
226 226 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
227 227 self.log.debug(' Content: %s\n --->\n ', msg['content'])
228 228
229 229 if msg_id in self.aborted:
230 230 self.aborted.remove(msg_id)
231 231 # is it safe to assume a msg_id will not be resubmitted?
232 232 reply_type = msg_type.split('_')[0] + '_reply'
233 233 status = {'status' : 'aborted'}
234 234 md = {'engine' : self.ident}
235 235 md.update(status)
236 236 reply_msg = self.session.send(stream, reply_type, metadata=md,
237 237 content=status, parent=msg, ident=idents)
238 238 return
239 239
240 240 handler = self.shell_handlers.get(msg_type, None)
241 241 if handler is None:
242 242 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
243 243 else:
244 244 # ensure default_int_handler during handler call
245 245 sig = signal(SIGINT, default_int_handler)
246 246 self.log.debug("%s: %s", msg_type, msg)
247 247 try:
248 248 handler(stream, idents, msg)
249 249 except Exception:
250 250 self.log.error("Exception in message handler:", exc_info=True)
251 251 finally:
252 252 signal(SIGINT, sig)
253 253
254 254 def enter_eventloop(self):
255 255 """enter eventloop"""
256 256 self.log.info("entering eventloop %s", self.eventloop)
257 257 for stream in self.shell_streams:
258 258 # flush any pending replies,
259 259 # which may be skipped by entering the eventloop
260 260 stream.flush(zmq.POLLOUT)
261 261 # restore default_int_handler
262 262 signal(SIGINT, default_int_handler)
263 263 while self.eventloop is not None:
264 264 try:
265 265 self.eventloop(self)
266 266 except KeyboardInterrupt:
267 267 # Ctrl-C shouldn't crash the kernel
268 268 self.log.error("KeyboardInterrupt caught in kernel")
269 269 continue
270 270 else:
271 271 # eventloop exited cleanly, this means we should stop (right?)
272 272 self.eventloop = None
273 273 break
274 274 self.log.info("exiting eventloop")
275 275
276 276 def start(self):
277 277 """register dispatchers for streams"""
278 278 self.shell.exit_now = False
279 279 if self.control_stream:
280 280 self.control_stream.on_recv(self.dispatch_control, copy=False)
281 281
282 282 def make_dispatcher(stream):
283 283 def dispatcher(msg):
284 284 return self.dispatch_shell(stream, msg)
285 285 return dispatcher
286 286
287 287 for s in self.shell_streams:
288 288 s.on_recv(make_dispatcher(s), copy=False)
289 289
290 290 # publish idle status
291 291 self._publish_status('starting')
292 292
293 293 def do_one_iteration(self):
294 294 """step eventloop just once"""
295 295 if self.control_stream:
296 296 self.control_stream.flush()
297 297 for stream in self.shell_streams:
298 298 # handle at most one request per iteration
299 299 stream.flush(zmq.POLLIN, 1)
300 300 stream.flush(zmq.POLLOUT)
301 301
302 302
303 303 def record_ports(self, ports):
304 304 """Record the ports that this kernel is using.
305 305
306 306 The creator of the Kernel instance must call this methods if they
307 307 want the :meth:`connect_request` method to return the port numbers.
308 308 """
309 309 self._recorded_ports = ports
310 310
311 311 #---------------------------------------------------------------------------
312 312 # Kernel request handlers
313 313 #---------------------------------------------------------------------------
314 314
315 315 def _make_metadata(self, other=None):
316 316 """init metadata dict, for execute/apply_reply"""
317 317 new_md = {
318 318 'dependencies_met' : True,
319 319 'engine' : self.ident,
320 320 'started': datetime.now(),
321 321 }
322 322 if other:
323 323 new_md.update(other)
324 324 return new_md
325 325
326 326 def _publish_execute_input(self, code, parent, execution_count):
327 327 """Publish the code request on the iopub stream."""
328 328
329 329 self.session.send(self.iopub_socket, u'execute_input',
330 330 {u'code':code, u'execution_count': execution_count},
331 331 parent=parent, ident=self._topic('execute_input')
332 332 )
333 333
334 334 def _publish_status(self, status, parent=None):
335 335 """send status (busy/idle) on IOPub"""
336 336 self.session.send(self.iopub_socket,
337 337 u'status',
338 338 {u'execution_state': status},
339 339 parent=parent,
340 340 ident=self._topic('status'),
341 341 )
342 342
343 343 def _forward_input(self, allow_stdin=False):
344 344 """Forward raw_input and getpass to the current frontend.
345 345
346 346 via input_request
347 347 """
348 348 self._allow_stdin = allow_stdin
349 349
350 350 if py3compat.PY3:
351 351 self._sys_raw_input = builtin_mod.input
352 352 builtin_mod.input = self.raw_input
353 353 else:
354 354 self._sys_raw_input = builtin_mod.raw_input
355 355 self._sys_eval_input = builtin_mod.input
356 356 builtin_mod.raw_input = self.raw_input
357 357 builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt))
358 358 self._save_getpass = getpass.getpass
359 359 getpass.getpass = self.getpass
360 360
361 361 def _restore_input(self):
362 362 """Restore raw_input, getpass"""
363 363 if py3compat.PY3:
364 364 builtin_mod.input = self._sys_raw_input
365 365 else:
366 366 builtin_mod.raw_input = self._sys_raw_input
367 367 builtin_mod.input = self._sys_eval_input
368 368
369 369 getpass.getpass = self._save_getpass
370 370
371 371 def set_parent(self, ident, parent):
372 372 """Record the parent state
373 373
374 374 For associating side effects with their requests.
375 375 """
376 376 self._parent_ident = ident
377 377 self._parent_header = parent
378 378 self.shell.set_parent(parent)
379 379
380 380 def execute_request(self, stream, ident, parent):
381 381 """handle an execute_request"""
382 382
383 383 self._publish_status(u'busy', parent)
384 384
385 385 try:
386 386 content = parent[u'content']
387 387 code = py3compat.cast_unicode_py2(content[u'code'])
388 388 silent = content[u'silent']
389 389 store_history = content.get(u'store_history', not silent)
390 390 except:
391 391 self.log.error("Got bad msg: ")
392 392 self.log.error("%s", parent)
393 393 return
394 394
395 395 md = self._make_metadata(parent['metadata'])
396 396
397 397 shell = self.shell # we'll need this a lot here
398 398
399 399 self._forward_input(content.get('allow_stdin', False))
400 400 # Set the parent message of the display hook and out streams.
401 401 self.set_parent(ident, parent)
402 402
403 403 # Re-broadcast our input for the benefit of listening clients, and
404 404 # start computing output
405 405 if not silent:
406 406 self._publish_execute_input(code, parent, shell.execution_count)
407 407
408 408 reply_content = {}
409 409 # FIXME: the shell calls the exception handler itself.
410 410 shell._reply_content = None
411 411 try:
412 412 shell.run_cell(code, store_history=store_history, silent=silent)
413 413 except:
414 414 status = u'error'
415 415 # FIXME: this code right now isn't being used yet by default,
416 416 # because the run_cell() call above directly fires off exception
417 417 # reporting. This code, therefore, is only active in the scenario
418 418 # where runlines itself has an unhandled exception. We need to
419 419 # uniformize this, for all exception construction to come from a
420 420 # single location in the codbase.
421 421 etype, evalue, tb = sys.exc_info()
422 422 tb_list = traceback.format_exception(etype, evalue, tb)
423 423 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
424 424 else:
425 425 status = u'ok'
426 426 finally:
427 427 self._restore_input()
428 428
429 429 reply_content[u'status'] = status
430 430
431 431 # Return the execution counter so clients can display prompts
432 432 reply_content['execution_count'] = shell.execution_count - 1
433 433
434 434 # FIXME - fish exception info out of shell, possibly left there by
435 435 # runlines. We'll need to clean up this logic later.
436 436 if shell._reply_content is not None:
437 437 reply_content.update(shell._reply_content)
438 438 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
439 439 reply_content['engine_info'] = e_info
440 440 # reset after use
441 441 shell._reply_content = None
442 442
443 443 if 'traceback' in reply_content:
444 444 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
445 445
446 446
447 447 # At this point, we can tell whether the main code execution succeeded
448 448 # or not. If it did, we proceed to evaluate user_expressions
449 449 if reply_content['status'] == 'ok':
450 450 reply_content[u'user_expressions'] = \
451 451 shell.user_expressions(content.get(u'user_expressions', {}))
452 452 else:
453 453 # If there was an error, don't even try to compute expressions
454 454 reply_content[u'user_expressions'] = {}
455 455
456 456 # Payloads should be retrieved regardless of outcome, so we can both
457 457 # recover partial output (that could have been generated early in a
458 458 # block, before an error) and clear the payload system always.
459 459 reply_content[u'payload'] = shell.payload_manager.read_payload()
460 460 # Be agressive about clearing the payload because we don't want
461 461 # it to sit in memory until the next execute_request comes in.
462 462 shell.payload_manager.clear_payload()
463 463
464 464 # Flush output before sending the reply.
465 465 sys.stdout.flush()
466 466 sys.stderr.flush()
467 467 # FIXME: on rare occasions, the flush doesn't seem to make it to the
468 468 # clients... This seems to mitigate the problem, but we definitely need
469 469 # to better understand what's going on.
470 470 if self._execute_sleep:
471 471 time.sleep(self._execute_sleep)
472 472
473 473 # Send the reply.
474 474 reply_content = json_clean(reply_content)
475 475
476 476 md['status'] = reply_content['status']
477 477 if reply_content['status'] == 'error' and \
478 478 reply_content['ename'] == 'UnmetDependency':
479 479 md['dependencies_met'] = False
480 480
481 481 reply_msg = self.session.send(stream, u'execute_reply',
482 482 reply_content, parent, metadata=md,
483 483 ident=ident)
484 484
485 485 self.log.debug("%s", reply_msg)
486 486
487 487 if not silent and reply_msg['content']['status'] == u'error':
488 488 self._abort_queues()
489 489
490 490 self._publish_status(u'idle', parent)
491 491
492 492 def complete_request(self, stream, ident, parent):
493 493 content = parent['content']
494 494 code = content['code']
495 495 cursor_pos = content['cursor_pos']
496 496
497 497 txt, matches = self.shell.complete('', code, cursor_pos)
498 498 matches = {'matches' : matches,
499 499 'cursor_end' : cursor_pos,
500 500 'cursor_start' : cursor_pos - len(txt),
501 501 'metadata' : {},
502 502 'status' : 'ok'}
503 503 matches = json_clean(matches)
504 504 completion_msg = self.session.send(stream, 'complete_reply',
505 505 matches, parent, ident)
506 506 self.log.debug("%s", completion_msg)
507 507
508 508 def inspect_request(self, stream, ident, parent):
509 509 content = parent['content']
510 510
511 511 name = token_at_cursor(content['code'], content['cursor_pos'])
512 512 info = self.shell.object_inspect(name)
513 513
514 514 reply_content = {'status' : 'ok'}
515 515 reply_content['data'] = data = {}
516 516 reply_content['metadata'] = {}
517 reply_content['name'] = name
518 517 reply_content['found'] = info['found']
519 518 if info['found']:
520 519 info_text = self.shell.object_inspect_text(
521 520 name,
522 521 detail_level=content.get('detail_level', 0),
523 522 )
524 523 reply_content['data']['text/plain'] = info_text
525 524 # Before we send this object over, we scrub it for JSON usage
526 525 reply_content = json_clean(reply_content)
527 526 msg = self.session.send(stream, 'inspect_reply',
528 527 reply_content, parent, ident)
529 528 self.log.debug("%s", msg)
530 529
531 530 def history_request(self, stream, ident, parent):
532 531 # We need to pull these out, as passing **kwargs doesn't work with
533 532 # unicode keys before Python 2.6.5.
534 533 hist_access_type = parent['content']['hist_access_type']
535 534 raw = parent['content']['raw']
536 535 output = parent['content']['output']
537 536 if hist_access_type == 'tail':
538 537 n = parent['content']['n']
539 538 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
540 539 include_latest=True)
541 540
542 541 elif hist_access_type == 'range':
543 542 session = parent['content']['session']
544 543 start = parent['content']['start']
545 544 stop = parent['content']['stop']
546 545 hist = self.shell.history_manager.get_range(session, start, stop,
547 546 raw=raw, output=output)
548 547
549 548 elif hist_access_type == 'search':
550 549 n = parent['content'].get('n')
551 550 unique = parent['content'].get('unique', False)
552 551 pattern = parent['content']['pattern']
553 552 hist = self.shell.history_manager.search(
554 553 pattern, raw=raw, output=output, n=n, unique=unique)
555 554
556 555 else:
557 556 hist = []
558 557 hist = list(hist)
559 558 content = {'history' : hist}
560 559 content = json_clean(content)
561 560 msg = self.session.send(stream, 'history_reply',
562 561 content, parent, ident)
563 562 self.log.debug("Sending history reply with %i entries", len(hist))
564 563
565 564 def connect_request(self, stream, ident, parent):
566 565 if self._recorded_ports is not None:
567 566 content = self._recorded_ports.copy()
568 567 else:
569 568 content = {}
570 569 msg = self.session.send(stream, 'connect_reply',
571 570 content, parent, ident)
572 571 self.log.debug("%s", msg)
573 572
574 573 def kernel_info_request(self, stream, ident, parent):
575 574 vinfo = {
576 575 'protocol_version': protocol_version,
577 576 'implementation': 'ipython',
578 577 'implementation_version': ipython_version,
579 578 'language_version': language_version,
580 579 'language': 'python',
581 580 'banner': self.shell.banner,
582 581 }
583 582 msg = self.session.send(stream, 'kernel_info_reply',
584 583 vinfo, parent, ident)
585 584 self.log.debug("%s", msg)
586 585
587 586 def shutdown_request(self, stream, ident, parent):
588 587 self.shell.exit_now = True
589 588 content = dict(status='ok')
590 589 content.update(parent['content'])
591 590 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
592 591 # same content, but different msg_id for broadcasting on IOPub
593 592 self._shutdown_message = self.session.msg(u'shutdown_reply',
594 593 content, parent
595 594 )
596 595
597 596 self._at_shutdown()
598 597 # call sys.exit after a short delay
599 598 loop = ioloop.IOLoop.instance()
600 599 loop.add_timeout(time.time()+0.1, loop.stop)
601 600
602 601 #---------------------------------------------------------------------------
603 602 # Engine methods
604 603 #---------------------------------------------------------------------------
605 604
606 605 def apply_request(self, stream, ident, parent):
607 606 try:
608 607 content = parent[u'content']
609 608 bufs = parent[u'buffers']
610 609 msg_id = parent['header']['msg_id']
611 610 except:
612 611 self.log.error("Got bad msg: %s", parent, exc_info=True)
613 612 return
614 613
615 614 self._publish_status(u'busy', parent)
616 615
617 616 # Set the parent message of the display hook and out streams.
618 617 shell = self.shell
619 618 shell.set_parent(parent)
620 619
621 620 # execute_input_msg = self.session.msg(u'execute_input',{u'code':code}, parent=parent)
622 621 # self.iopub_socket.send(execute_input_msg)
623 622 # self.session.send(self.iopub_socket, u'execute_input', {u'code':code},parent=parent)
624 623 md = self._make_metadata(parent['metadata'])
625 624 try:
626 625 working = shell.user_ns
627 626
628 627 prefix = "_"+str(msg_id).replace("-","")+"_"
629 628
630 629 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
631 630
632 631 fname = getattr(f, '__name__', 'f')
633 632
634 633 fname = prefix+"f"
635 634 argname = prefix+"args"
636 635 kwargname = prefix+"kwargs"
637 636 resultname = prefix+"result"
638 637
639 638 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
640 639 # print ns
641 640 working.update(ns)
642 641 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
643 642 try:
644 643 exec(code, shell.user_global_ns, shell.user_ns)
645 644 result = working.get(resultname)
646 645 finally:
647 646 for key in ns:
648 647 working.pop(key)
649 648
650 649 result_buf = serialize_object(result,
651 650 buffer_threshold=self.session.buffer_threshold,
652 651 item_threshold=self.session.item_threshold,
653 652 )
654 653
655 654 except:
656 655 # invoke IPython traceback formatting
657 656 shell.showtraceback()
658 657 # FIXME - fish exception info out of shell, possibly left there by
659 658 # run_code. We'll need to clean up this logic later.
660 659 reply_content = {}
661 660 if shell._reply_content is not None:
662 661 reply_content.update(shell._reply_content)
663 662 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
664 663 reply_content['engine_info'] = e_info
665 664 # reset after use
666 665 shell._reply_content = None
667 666
668 667 self.session.send(self.iopub_socket, u'error', reply_content, parent=parent,
669 668 ident=self._topic('error'))
670 669 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
671 670 result_buf = []
672 671
673 672 if reply_content['ename'] == 'UnmetDependency':
674 673 md['dependencies_met'] = False
675 674 else:
676 675 reply_content = {'status' : 'ok'}
677 676
678 677 # put 'ok'/'error' status in header, for scheduler introspection:
679 678 md['status'] = reply_content['status']
680 679
681 680 # flush i/o
682 681 sys.stdout.flush()
683 682 sys.stderr.flush()
684 683
685 684 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
686 685 parent=parent, ident=ident,buffers=result_buf, metadata=md)
687 686
688 687 self._publish_status(u'idle', parent)
689 688
690 689 #---------------------------------------------------------------------------
691 690 # Control messages
692 691 #---------------------------------------------------------------------------
693 692
694 693 def abort_request(self, stream, ident, parent):
695 694 """abort a specifig msg by id"""
696 695 msg_ids = parent['content'].get('msg_ids', None)
697 696 if isinstance(msg_ids, string_types):
698 697 msg_ids = [msg_ids]
699 698 if not msg_ids:
700 699 self.abort_queues()
701 700 for mid in msg_ids:
702 701 self.aborted.add(str(mid))
703 702
704 703 content = dict(status='ok')
705 704 reply_msg = self.session.send(stream, 'abort_reply', content=content,
706 705 parent=parent, ident=ident)
707 706 self.log.debug("%s", reply_msg)
708 707
709 708 def clear_request(self, stream, idents, parent):
710 709 """Clear our namespace."""
711 710 self.shell.reset(False)
712 711 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
713 712 content = dict(status='ok'))
714 713
715 714
716 715 #---------------------------------------------------------------------------
717 716 # Protected interface
718 717 #---------------------------------------------------------------------------
719 718
720 719 def _wrap_exception(self, method=None):
721 720 # import here, because _wrap_exception is only used in parallel,
722 721 # and parallel has higher min pyzmq version
723 722 from IPython.parallel.error import wrap_exception
724 723 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
725 724 content = wrap_exception(e_info)
726 725 return content
727 726
728 727 def _topic(self, topic):
729 728 """prefixed topic for IOPub messages"""
730 729 if self.int_id >= 0:
731 730 base = "engine.%i" % self.int_id
732 731 else:
733 732 base = "kernel.%s" % self.ident
734 733
735 734 return py3compat.cast_bytes("%s.%s" % (base, topic))
736 735
737 736 def _abort_queues(self):
738 737 for stream in self.shell_streams:
739 738 if stream:
740 739 self._abort_queue(stream)
741 740
742 741 def _abort_queue(self, stream):
743 742 poller = zmq.Poller()
744 743 poller.register(stream.socket, zmq.POLLIN)
745 744 while True:
746 745 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
747 746 if msg is None:
748 747 return
749 748
750 749 self.log.info("Aborting:")
751 750 self.log.info("%s", msg)
752 751 msg_type = msg['header']['msg_type']
753 752 reply_type = msg_type.split('_')[0] + '_reply'
754 753
755 754 status = {'status' : 'aborted'}
756 755 md = {'engine' : self.ident}
757 756 md.update(status)
758 757 reply_msg = self.session.send(stream, reply_type, metadata=md,
759 758 content=status, parent=msg, ident=idents)
760 759 self.log.debug("%s", reply_msg)
761 760 # We need to wait a bit for requests to come in. This can probably
762 761 # be set shorter for true asynchronous clients.
763 762 poller.poll(50)
764 763
765 764
766 765 def _no_raw_input(self):
767 766 """Raise StdinNotImplentedError if active frontend doesn't support
768 767 stdin."""
769 768 raise StdinNotImplementedError("raw_input was called, but this "
770 769 "frontend does not support stdin.")
771 770
772 771 def getpass(self, prompt=''):
773 772 """Forward getpass to frontends
774 773
775 774 Raises
776 775 ------
777 776 StdinNotImplentedError if active frontend doesn't support stdin.
778 777 """
779 778 if not self._allow_stdin:
780 779 raise StdinNotImplementedError(
781 780 "getpass was called, but this frontend does not support input requests."
782 781 )
783 782 return self._input_request(prompt,
784 783 self._parent_ident,
785 784 self._parent_header,
786 785 password=True,
787 786 )
788 787
789 788 def raw_input(self, prompt=''):
790 789 """Forward raw_input to frontends
791 790
792 791 Raises
793 792 ------
794 793 StdinNotImplentedError if active frontend doesn't support stdin.
795 794 """
796 795 if not self._allow_stdin:
797 796 raise StdinNotImplementedError(
798 797 "raw_input was called, but this frontend does not support input requests."
799 798 )
800 799 return self._input_request(prompt,
801 800 self._parent_ident,
802 801 self._parent_header,
803 802 password=False,
804 803 )
805 804
806 805 def _input_request(self, prompt, ident, parent, password=False):
807 806 # Flush output before making the request.
808 807 sys.stderr.flush()
809 808 sys.stdout.flush()
810 809 # flush the stdin socket, to purge stale replies
811 810 while True:
812 811 try:
813 812 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
814 813 except zmq.ZMQError as e:
815 814 if e.errno == zmq.EAGAIN:
816 815 break
817 816 else:
818 817 raise
819 818
820 819 # Send the input request.
821 820 content = json_clean(dict(prompt=prompt, password=password))
822 821 self.session.send(self.stdin_socket, u'input_request', content, parent,
823 822 ident=ident)
824 823
825 824 # Await a response.
826 825 while True:
827 826 try:
828 827 ident, reply = self.session.recv(self.stdin_socket, 0)
829 828 except Exception:
830 829 self.log.warn("Invalid Message:", exc_info=True)
831 830 except KeyboardInterrupt:
832 831 # re-raise KeyboardInterrupt, to truncate traceback
833 832 raise KeyboardInterrupt
834 833 else:
835 834 break
836 835 try:
837 836 value = py3compat.unicode_to_str(reply['content']['value'])
838 837 except:
839 838 self.log.error("Bad input_reply: %s", parent)
840 839 value = ''
841 840 if value == '\x04':
842 841 # EOF
843 842 raise EOFError
844 843 return value
845 844
846 845 def _at_shutdown(self):
847 846 """Actions taken at shutdown by the kernel, called by python's atexit.
848 847 """
849 848 # io.rprint("Kernel at_shutdown") # dbg
850 849 if self._shutdown_message is not None:
851 850 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
852 851 self.log.debug("%s", self._shutdown_message)
853 852 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
854 853
General Comments 0
You need to be logged in to leave comments. Login now