##// END OF EJS Templates
add implementation and implementation_version to kernel_info_reply
MinRK -
Show More
@@ -1,399 +1,401 b''
1 """Test suite for our zeromq-based message specification."""
1 """Test suite for our zeromq-based message specification."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import re
6 import re
7 from distutils.version import LooseVersion as V
7 from distutils.version import LooseVersion as V
8 from subprocess import PIPE
8 from subprocess import PIPE
9 try:
9 try:
10 from queue import Empty # Py 3
10 from queue import Empty # Py 3
11 except ImportError:
11 except ImportError:
12 from Queue import Empty # Py 2
12 from Queue import Empty # Py 2
13
13
14 import nose.tools as nt
14 import nose.tools as nt
15
15
16 from IPython.kernel import KernelManager
16 from IPython.kernel import KernelManager
17
17
18 from IPython.utils.traitlets import (
18 from IPython.utils.traitlets import (
19 HasTraits, TraitError, Bool, Unicode, Dict, Integer, List, Enum, Any,
19 HasTraits, TraitError, Bool, Unicode, Dict, Integer, List, Enum, Any,
20 )
20 )
21 from IPython.utils.py3compat import string_types, iteritems
21 from IPython.utils.py3compat import string_types, iteritems
22
22
23 from .utils import TIMEOUT, start_global_kernel, flush_channels, execute
23 from .utils import TIMEOUT, start_global_kernel, flush_channels, execute
24
24
25 #-----------------------------------------------------------------------------
25 #-----------------------------------------------------------------------------
26 # Globals
26 # Globals
27 #-----------------------------------------------------------------------------
27 #-----------------------------------------------------------------------------
28 KC = None
28 KC = None
29
29
30 def setup():
30 def setup():
31 global KC
31 global KC
32 KC = start_global_kernel()
32 KC = start_global_kernel()
33
33
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35 # Message Spec References
35 # Message Spec References
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37
37
38 class Reference(HasTraits):
38 class Reference(HasTraits):
39
39
40 """
40 """
41 Base class for message spec specification testing.
41 Base class for message spec specification testing.
42
42
43 This class is the core of the message specification test. The
43 This class is the core of the message specification test. The
44 idea is that child classes implement trait attributes for each
44 idea is that child classes implement trait attributes for each
45 message keys, so that message keys can be tested against these
45 message keys, so that message keys can be tested against these
46 traits using :meth:`check` method.
46 traits using :meth:`check` method.
47
47
48 """
48 """
49
49
50 def check(self, d):
50 def check(self, d):
51 """validate a dict against our traits"""
51 """validate a dict against our traits"""
52 for key in self.trait_names():
52 for key in self.trait_names():
53 nt.assert_in(key, d)
53 nt.assert_in(key, d)
54 # FIXME: always allow None, probably not a good idea
54 # FIXME: always allow None, probably not a good idea
55 if d[key] is None:
55 if d[key] is None:
56 continue
56 continue
57 try:
57 try:
58 setattr(self, key, d[key])
58 setattr(self, key, d[key])
59 except TraitError as e:
59 except TraitError as e:
60 assert False, str(e)
60 assert False, str(e)
61
61
62 class Version(Unicode):
62 class Version(Unicode):
63 def validate(self, obj, value):
63 def validate(self, obj, value):
64 min_version = self.default_value
64 min_version = self.default_value
65 if V(value) < V(min_version):
65 if V(value) < V(min_version):
66 raise TraitError("bad version: %s < %s" % (value, min_version))
66 raise TraitError("bad version: %s < %s" % (value, min_version))
67
67
68 class RMessage(Reference):
68 class RMessage(Reference):
69 msg_id = Unicode()
69 msg_id = Unicode()
70 msg_type = Unicode()
70 msg_type = Unicode()
71 header = Dict()
71 header = Dict()
72 parent_header = Dict()
72 parent_header = Dict()
73 content = Dict()
73 content = Dict()
74
74
75 def check(self, d):
75 def check(self, d):
76 super(RMessage, self).check(d)
76 super(RMessage, self).check(d)
77 RHeader().check(self.header)
77 RHeader().check(self.header)
78 if self.parent_header:
78 if self.parent_header:
79 RHeader().check(self.parent_header)
79 RHeader().check(self.parent_header)
80
80
81 class RHeader(Reference):
81 class RHeader(Reference):
82 msg_id = Unicode()
82 msg_id = Unicode()
83 msg_type = Unicode()
83 msg_type = Unicode()
84 session = Unicode()
84 session = Unicode()
85 username = Unicode()
85 username = Unicode()
86 version = Version('5.0')
86 version = Version('5.0')
87
87
88 mime_pat = re.compile(r'\w+/\w+')
88 mime_pat = re.compile(r'\w+/\w+')
89
89
90 class MimeBundle(Reference):
90 class MimeBundle(Reference):
91 metadata = Dict()
91 metadata = Dict()
92 data = Dict()
92 data = Dict()
93 def _data_changed(self, name, old, new):
93 def _data_changed(self, name, old, new):
94 for k,v in iteritems(new):
94 for k,v in iteritems(new):
95 assert mime_pat.match(k)
95 assert mime_pat.match(k)
96 nt.assert_is_instance(v, string_types)
96 nt.assert_is_instance(v, string_types)
97
97
98 # shell replies
98 # shell replies
99
99
100 class ExecuteReply(Reference):
100 class ExecuteReply(Reference):
101 execution_count = Integer()
101 execution_count = Integer()
102 status = Enum((u'ok', u'error'))
102 status = Enum((u'ok', u'error'))
103
103
104 def check(self, d):
104 def check(self, d):
105 Reference.check(self, d)
105 Reference.check(self, d)
106 if d['status'] == 'ok':
106 if d['status'] == 'ok':
107 ExecuteReplyOkay().check(d)
107 ExecuteReplyOkay().check(d)
108 elif d['status'] == 'error':
108 elif d['status'] == 'error':
109 ExecuteReplyError().check(d)
109 ExecuteReplyError().check(d)
110
110
111
111
112 class ExecuteReplyOkay(Reference):
112 class ExecuteReplyOkay(Reference):
113 payload = List(Dict)
113 payload = List(Dict)
114 user_expressions = Dict()
114 user_expressions = Dict()
115
115
116
116
117 class ExecuteReplyError(Reference):
117 class ExecuteReplyError(Reference):
118 ename = Unicode()
118 ename = Unicode()
119 evalue = Unicode()
119 evalue = Unicode()
120 traceback = List(Unicode)
120 traceback = List(Unicode)
121
121
122
122
123 class OInfoReply(MimeBundle):
123 class OInfoReply(MimeBundle):
124 name = Unicode()
124 name = Unicode()
125 found = Bool()
125 found = Bool()
126
126
127
127
128 class ArgSpec(Reference):
128 class ArgSpec(Reference):
129 args = List(Unicode)
129 args = List(Unicode)
130 varargs = Unicode()
130 varargs = Unicode()
131 varkw = Unicode()
131 varkw = Unicode()
132 defaults = List()
132 defaults = List()
133
133
134
134
135 class Status(Reference):
135 class Status(Reference):
136 execution_state = Enum((u'busy', u'idle', u'starting'))
136 execution_state = Enum((u'busy', u'idle', u'starting'))
137
137
138
138
139 class CompleteReply(Reference):
139 class CompleteReply(Reference):
140 matches = List(Unicode)
140 matches = List(Unicode)
141
141
142
142
143 class KernelInfoReply(Reference):
143 class KernelInfoReply(Reference):
144 protocol_version = Version('5.0')
144 protocol_version = Version('5.0')
145 ipython_version = Version('2.0')
145 implementation = Unicode('ipython')
146 implementation_version = Version('2.1')
146 language_version = Version('2.7')
147 language_version = Version('2.7')
147 language = Unicode()
148 language = Unicode('python')
149 banner = Unicode()
148
150
149
151
150 # IOPub messages
152 # IOPub messages
151
153
152 class ExecuteInput(Reference):
154 class ExecuteInput(Reference):
153 code = Unicode()
155 code = Unicode()
154 execution_count = Integer()
156 execution_count = Integer()
155
157
156
158
157 Error = ExecuteReplyError
159 Error = ExecuteReplyError
158
160
159
161
160 class Stream(Reference):
162 class Stream(Reference):
161 name = Enum((u'stdout', u'stderr'))
163 name = Enum((u'stdout', u'stderr'))
162 data = Unicode()
164 data = Unicode()
163
165
164
166
165 class DisplayData(MimeBundle):
167 class DisplayData(MimeBundle):
166 source = Unicode()
168 source = Unicode()
167
169
168
170
169 class ExecuteResult(MimeBundle):
171 class ExecuteResult(MimeBundle):
170 execution_count = Integer()
172 execution_count = Integer()
171
173
172
174
173 references = {
175 references = {
174 'execute_reply' : ExecuteReply(),
176 'execute_reply' : ExecuteReply(),
175 'object_info_reply' : OInfoReply(),
177 'object_info_reply' : OInfoReply(),
176 'status' : Status(),
178 'status' : Status(),
177 'complete_reply' : CompleteReply(),
179 'complete_reply' : CompleteReply(),
178 'kernel_info_reply': KernelInfoReply(),
180 'kernel_info_reply': KernelInfoReply(),
179 'execute_input' : ExecuteInput(),
181 'execute_input' : ExecuteInput(),
180 'execute_result' : ExecuteResult(),
182 'execute_result' : ExecuteResult(),
181 'error' : Error(),
183 'error' : Error(),
182 'stream' : Stream(),
184 'stream' : Stream(),
183 'display_data' : DisplayData(),
185 'display_data' : DisplayData(),
184 'header' : RHeader(),
186 'header' : RHeader(),
185 }
187 }
186 """
188 """
187 Specifications of `content` part of the reply messages.
189 Specifications of `content` part of the reply messages.
188 """
190 """
189
191
190
192
191 def validate_message(msg, msg_type=None, parent=None):
193 def validate_message(msg, msg_type=None, parent=None):
192 """validate a message
194 """validate a message
193
195
194 This is a generator, and must be iterated through to actually
196 This is a generator, and must be iterated through to actually
195 trigger each test.
197 trigger each test.
196
198
197 If msg_type and/or parent are given, the msg_type and/or parent msg_id
199 If msg_type and/or parent are given, the msg_type and/or parent msg_id
198 are compared with the given values.
200 are compared with the given values.
199 """
201 """
200 RMessage().check(msg)
202 RMessage().check(msg)
201 if msg_type:
203 if msg_type:
202 nt.assert_equal(msg['msg_type'], msg_type)
204 nt.assert_equal(msg['msg_type'], msg_type)
203 if parent:
205 if parent:
204 nt.assert_equal(msg['parent_header']['msg_id'], parent)
206 nt.assert_equal(msg['parent_header']['msg_id'], parent)
205 content = msg['content']
207 content = msg['content']
206 ref = references[msg['msg_type']]
208 ref = references[msg['msg_type']]
207 ref.check(content)
209 ref.check(content)
208
210
209
211
210 #-----------------------------------------------------------------------------
212 #-----------------------------------------------------------------------------
211 # Tests
213 # Tests
212 #-----------------------------------------------------------------------------
214 #-----------------------------------------------------------------------------
213
215
214 # Shell channel
216 # Shell channel
215
217
216 def test_execute():
218 def test_execute():
217 flush_channels()
219 flush_channels()
218
220
219 msg_id = KC.execute(code='x=1')
221 msg_id = KC.execute(code='x=1')
220 reply = KC.get_shell_msg(timeout=TIMEOUT)
222 reply = KC.get_shell_msg(timeout=TIMEOUT)
221 validate_message(reply, 'execute_reply', msg_id)
223 validate_message(reply, 'execute_reply', msg_id)
222
224
223
225
224 def test_execute_silent():
226 def test_execute_silent():
225 flush_channels()
227 flush_channels()
226 msg_id, reply = execute(code='x=1', silent=True)
228 msg_id, reply = execute(code='x=1', silent=True)
227
229
228 # flush status=idle
230 # flush status=idle
229 status = KC.iopub_channel.get_msg(timeout=TIMEOUT)
231 status = KC.iopub_channel.get_msg(timeout=TIMEOUT)
230 validate_message(status, 'status', msg_id)
232 validate_message(status, 'status', msg_id)
231 nt.assert_equal(status['content']['execution_state'], 'idle')
233 nt.assert_equal(status['content']['execution_state'], 'idle')
232
234
233 nt.assert_raises(Empty, KC.iopub_channel.get_msg, timeout=0.1)
235 nt.assert_raises(Empty, KC.iopub_channel.get_msg, timeout=0.1)
234 count = reply['execution_count']
236 count = reply['execution_count']
235
237
236 msg_id, reply = execute(code='x=2', silent=True)
238 msg_id, reply = execute(code='x=2', silent=True)
237
239
238 # flush status=idle
240 # flush status=idle
239 status = KC.iopub_channel.get_msg(timeout=TIMEOUT)
241 status = KC.iopub_channel.get_msg(timeout=TIMEOUT)
240 validate_message(status, 'status', msg_id)
242 validate_message(status, 'status', msg_id)
241 nt.assert_equal(status['content']['execution_state'], 'idle')
243 nt.assert_equal(status['content']['execution_state'], 'idle')
242
244
243 nt.assert_raises(Empty, KC.iopub_channel.get_msg, timeout=0.1)
245 nt.assert_raises(Empty, KC.iopub_channel.get_msg, timeout=0.1)
244 count_2 = reply['execution_count']
246 count_2 = reply['execution_count']
245 nt.assert_equal(count_2, count)
247 nt.assert_equal(count_2, count)
246
248
247
249
248 def test_execute_error():
250 def test_execute_error():
249 flush_channels()
251 flush_channels()
250
252
251 msg_id, reply = execute(code='1/0')
253 msg_id, reply = execute(code='1/0')
252 nt.assert_equal(reply['status'], 'error')
254 nt.assert_equal(reply['status'], 'error')
253 nt.assert_equal(reply['ename'], 'ZeroDivisionError')
255 nt.assert_equal(reply['ename'], 'ZeroDivisionError')
254
256
255 error = KC.iopub_channel.get_msg(timeout=TIMEOUT)
257 error = KC.iopub_channel.get_msg(timeout=TIMEOUT)
256 validate_message(error, 'error', msg_id)
258 validate_message(error, 'error', msg_id)
257
259
258
260
259 def test_execute_inc():
261 def test_execute_inc():
260 """execute request should increment execution_count"""
262 """execute request should increment execution_count"""
261 flush_channels()
263 flush_channels()
262
264
263 msg_id, reply = execute(code='x=1')
265 msg_id, reply = execute(code='x=1')
264 count = reply['execution_count']
266 count = reply['execution_count']
265
267
266 flush_channels()
268 flush_channels()
267
269
268 msg_id, reply = execute(code='x=2')
270 msg_id, reply = execute(code='x=2')
269 count_2 = reply['execution_count']
271 count_2 = reply['execution_count']
270 nt.assert_equal(count_2, count+1)
272 nt.assert_equal(count_2, count+1)
271
273
272
274
273 def test_user_expressions():
275 def test_user_expressions():
274 flush_channels()
276 flush_channels()
275
277
276 msg_id, reply = execute(code='x=1', user_expressions=dict(foo='x+1'))
278 msg_id, reply = execute(code='x=1', user_expressions=dict(foo='x+1'))
277 user_expressions = reply['user_expressions']
279 user_expressions = reply['user_expressions']
278 nt.assert_equal(user_expressions, {u'foo': {
280 nt.assert_equal(user_expressions, {u'foo': {
279 u'status': u'ok',
281 u'status': u'ok',
280 u'data': {u'text/plain': u'2'},
282 u'data': {u'text/plain': u'2'},
281 u'metadata': {},
283 u'metadata': {},
282 }})
284 }})
283
285
284
286
285 def test_user_expressions_fail():
287 def test_user_expressions_fail():
286 flush_channels()
288 flush_channels()
287
289
288 msg_id, reply = execute(code='x=0', user_expressions=dict(foo='nosuchname'))
290 msg_id, reply = execute(code='x=0', user_expressions=dict(foo='nosuchname'))
289 user_expressions = reply['user_expressions']
291 user_expressions = reply['user_expressions']
290 foo = user_expressions['foo']
292 foo = user_expressions['foo']
291 nt.assert_equal(foo['status'], 'error')
293 nt.assert_equal(foo['status'], 'error')
292 nt.assert_equal(foo['ename'], 'NameError')
294 nt.assert_equal(foo['ename'], 'NameError')
293
295
294
296
295 def test_oinfo():
297 def test_oinfo():
296 flush_channels()
298 flush_channels()
297
299
298 msg_id = KC.object_info('a')
300 msg_id = KC.object_info('a')
299 reply = KC.get_shell_msg(timeout=TIMEOUT)
301 reply = KC.get_shell_msg(timeout=TIMEOUT)
300 validate_message(reply, 'object_info_reply', msg_id)
302 validate_message(reply, 'object_info_reply', msg_id)
301
303
302
304
303 def test_oinfo_found():
305 def test_oinfo_found():
304 flush_channels()
306 flush_channels()
305
307
306 msg_id, reply = execute(code='a=5')
308 msg_id, reply = execute(code='a=5')
307
309
308 msg_id = KC.object_info('a')
310 msg_id = KC.object_info('a')
309 reply = KC.get_shell_msg(timeout=TIMEOUT)
311 reply = KC.get_shell_msg(timeout=TIMEOUT)
310 validate_message(reply, 'object_info_reply', msg_id)
312 validate_message(reply, 'object_info_reply', msg_id)
311 content = reply['content']
313 content = reply['content']
312 assert content['found']
314 assert content['found']
313 nt.assert_equal(content['name'], 'a')
315 nt.assert_equal(content['name'], 'a')
314 text = content['data']['text/plain']
316 text = content['data']['text/plain']
315 nt.assert_in('Type:', text)
317 nt.assert_in('Type:', text)
316 nt.assert_in('Docstring:', text)
318 nt.assert_in('Docstring:', text)
317
319
318
320
319 def test_oinfo_detail():
321 def test_oinfo_detail():
320 flush_channels()
322 flush_channels()
321
323
322 msg_id, reply = execute(code='ip=get_ipython()')
324 msg_id, reply = execute(code='ip=get_ipython()')
323
325
324 msg_id = KC.object_info('ip.object_inspect', cursor_pos=10, detail_level=1)
326 msg_id = KC.object_info('ip.object_inspect', cursor_pos=10, detail_level=1)
325 reply = KC.get_shell_msg(timeout=TIMEOUT)
327 reply = KC.get_shell_msg(timeout=TIMEOUT)
326 validate_message(reply, 'object_info_reply', msg_id)
328 validate_message(reply, 'object_info_reply', msg_id)
327 content = reply['content']
329 content = reply['content']
328 assert content['found']
330 assert content['found']
329 nt.assert_equal(content['name'], 'ip.object_inspect')
331 nt.assert_equal(content['name'], 'ip.object_inspect')
330 text = content['data']['text/plain']
332 text = content['data']['text/plain']
331 nt.assert_in('Definition:', text)
333 nt.assert_in('Definition:', text)
332 nt.assert_in('Source:', text)
334 nt.assert_in('Source:', text)
333
335
334
336
335 def test_oinfo_not_found():
337 def test_oinfo_not_found():
336 flush_channels()
338 flush_channels()
337
339
338 msg_id = KC.object_info('dne')
340 msg_id = KC.object_info('dne')
339 reply = KC.get_shell_msg(timeout=TIMEOUT)
341 reply = KC.get_shell_msg(timeout=TIMEOUT)
340 validate_message(reply, 'object_info_reply', msg_id)
342 validate_message(reply, 'object_info_reply', msg_id)
341 content = reply['content']
343 content = reply['content']
342 nt.assert_false(content['found'])
344 nt.assert_false(content['found'])
343
345
344
346
345 def test_complete():
347 def test_complete():
346 flush_channels()
348 flush_channels()
347
349
348 msg_id, reply = execute(code="alpha = albert = 5")
350 msg_id, reply = execute(code="alpha = albert = 5")
349
351
350 msg_id = KC.complete('al', 2)
352 msg_id = KC.complete('al', 2)
351 reply = KC.get_shell_msg(timeout=TIMEOUT)
353 reply = KC.get_shell_msg(timeout=TIMEOUT)
352 validate_message(reply, 'complete_reply', msg_id)
354 validate_message(reply, 'complete_reply', msg_id)
353 matches = reply['content']['matches']
355 matches = reply['content']['matches']
354 for name in ('alpha', 'albert'):
356 for name in ('alpha', 'albert'):
355 nt.assert_in(name, matches)
357 nt.assert_in(name, matches)
356
358
357
359
358 def test_kernel_info_request():
360 def test_kernel_info_request():
359 flush_channels()
361 flush_channels()
360
362
361 msg_id = KC.kernel_info()
363 msg_id = KC.kernel_info()
362 reply = KC.get_shell_msg(timeout=TIMEOUT)
364 reply = KC.get_shell_msg(timeout=TIMEOUT)
363 validate_message(reply, 'kernel_info_reply', msg_id)
365 validate_message(reply, 'kernel_info_reply', msg_id)
364
366
365
367
366 def test_single_payload():
368 def test_single_payload():
367 flush_channels()
369 flush_channels()
368 msg_id, reply = execute(code="for i in range(3):\n"+
370 msg_id, reply = execute(code="for i in range(3):\n"+
369 " x=range?\n")
371 " x=range?\n")
370 payload = reply['payload']
372 payload = reply['payload']
371 next_input_pls = [pl for pl in payload if pl["source"] == "set_next_input"]
373 next_input_pls = [pl for pl in payload if pl["source"] == "set_next_input"]
372 nt.assert_equal(len(next_input_pls), 1)
374 nt.assert_equal(len(next_input_pls), 1)
373
375
374
376
375 # IOPub channel
377 # IOPub channel
376
378
377
379
378 def test_stream():
380 def test_stream():
379 flush_channels()
381 flush_channels()
380
382
381 msg_id, reply = execute("print('hi')")
383 msg_id, reply = execute("print('hi')")
382
384
383 stdout = KC.iopub_channel.get_msg(timeout=TIMEOUT)
385 stdout = KC.iopub_channel.get_msg(timeout=TIMEOUT)
384 validate_message(stdout, 'stream', msg_id)
386 validate_message(stdout, 'stream', msg_id)
385 content = stdout['content']
387 content = stdout['content']
386 nt.assert_equal(content['name'], u'stdout')
388 nt.assert_equal(content['name'], u'stdout')
387 nt.assert_equal(content['data'], u'hi\n')
389 nt.assert_equal(content['data'], u'hi\n')
388
390
389
391
390 def test_display_data():
392 def test_display_data():
391 flush_channels()
393 flush_channels()
392
394
393 msg_id, reply = execute("from IPython.core.display import display; display(1)")
395 msg_id, reply = execute("from IPython.core.display import display; display(1)")
394
396
395 display = KC.iopub_channel.get_msg(timeout=TIMEOUT)
397 display = KC.iopub_channel.get_msg(timeout=TIMEOUT)
396 validate_message(display, 'display_data', parent=msg_id)
398 validate_message(display, 'display_data', parent=msg_id)
397 data = display['content']['data']
399 data = display['content']['data']
398 nt.assert_equal(data['text/plain'], u'1')
400 nt.assert_equal(data['text/plain'], u'1')
399
401
@@ -1,851 +1,852 b''
1 """An interactive kernel that talks to frontends over 0MQ."""
1 """An interactive kernel that talks to frontends over 0MQ."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import print_function
6 from __future__ import print_function
7
7
8 import getpass
8 import getpass
9 import sys
9 import sys
10 import time
10 import time
11 import traceback
11 import traceback
12 import logging
12 import logging
13 import uuid
13 import uuid
14
14
15 from datetime import datetime
15 from datetime import datetime
16 from signal import (
16 from signal import (
17 signal, default_int_handler, SIGINT
17 signal, default_int_handler, SIGINT
18 )
18 )
19
19
20 import zmq
20 import zmq
21 from zmq.eventloop import ioloop
21 from zmq.eventloop import ioloop
22 from zmq.eventloop.zmqstream import ZMQStream
22 from zmq.eventloop.zmqstream import ZMQStream
23
23
24 from IPython.config.configurable import Configurable
24 from IPython.config.configurable import Configurable
25 from IPython.core.error import StdinNotImplementedError
25 from IPython.core.error import StdinNotImplementedError
26 from IPython.core import release
26 from IPython.core import release
27 from IPython.utils import py3compat
27 from IPython.utils import py3compat
28 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
28 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
29 from IPython.utils.jsonutil import json_clean
29 from IPython.utils.jsonutil import json_clean
30 from IPython.utils.tokenutil import token_at_cursor
30 from IPython.utils.tokenutil import token_at_cursor
31 from IPython.utils.traitlets import (
31 from IPython.utils.traitlets import (
32 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
32 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
33 Type, Bool,
33 Type, Bool,
34 )
34 )
35
35
36 from .serialize import serialize_object, unpack_apply_message
36 from .serialize import serialize_object, unpack_apply_message
37 from .session import Session
37 from .session import Session
38 from .zmqshell import ZMQInteractiveShell
38 from .zmqshell import ZMQInteractiveShell
39
39
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Main kernel class
42 # Main kernel class
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45 protocol_version = release.kernel_protocol_version
45 protocol_version = release.kernel_protocol_version
46 ipython_version = release.version
46 ipython_version = release.version
47 language_version = sys.version.split()[0]
47 language_version = sys.version.split()[0]
48
48
49
49
50 class Kernel(Configurable):
50 class Kernel(Configurable):
51
51
52 #---------------------------------------------------------------------------
52 #---------------------------------------------------------------------------
53 # Kernel interface
53 # Kernel interface
54 #---------------------------------------------------------------------------
54 #---------------------------------------------------------------------------
55
55
56 # attribute to override with a GUI
56 # attribute to override with a GUI
57 eventloop = Any(None)
57 eventloop = Any(None)
58 def _eventloop_changed(self, name, old, new):
58 def _eventloop_changed(self, name, old, new):
59 """schedule call to eventloop from IOLoop"""
59 """schedule call to eventloop from IOLoop"""
60 loop = ioloop.IOLoop.instance()
60 loop = ioloop.IOLoop.instance()
61 loop.add_callback(self.enter_eventloop)
61 loop.add_callback(self.enter_eventloop)
62
62
63 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
63 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
64 shell_class = Type(ZMQInteractiveShell)
64 shell_class = Type(ZMQInteractiveShell)
65
65
66 session = Instance(Session)
66 session = Instance(Session)
67 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
67 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
68 shell_streams = List()
68 shell_streams = List()
69 control_stream = Instance(ZMQStream)
69 control_stream = Instance(ZMQStream)
70 iopub_socket = Instance(zmq.Socket)
70 iopub_socket = Instance(zmq.Socket)
71 stdin_socket = Instance(zmq.Socket)
71 stdin_socket = Instance(zmq.Socket)
72 log = Instance(logging.Logger)
72 log = Instance(logging.Logger)
73
73
74 user_module = Any()
74 user_module = Any()
75 def _user_module_changed(self, name, old, new):
75 def _user_module_changed(self, name, old, new):
76 if self.shell is not None:
76 if self.shell is not None:
77 self.shell.user_module = new
77 self.shell.user_module = new
78
78
79 user_ns = Instance(dict, args=None, allow_none=True)
79 user_ns = Instance(dict, args=None, allow_none=True)
80 def _user_ns_changed(self, name, old, new):
80 def _user_ns_changed(self, name, old, new):
81 if self.shell is not None:
81 if self.shell is not None:
82 self.shell.user_ns = new
82 self.shell.user_ns = new
83 self.shell.init_user_ns()
83 self.shell.init_user_ns()
84
84
85 # identities:
85 # identities:
86 int_id = Integer(-1)
86 int_id = Integer(-1)
87 ident = Unicode()
87 ident = Unicode()
88
88
89 def _ident_default(self):
89 def _ident_default(self):
90 return unicode_type(uuid.uuid4())
90 return unicode_type(uuid.uuid4())
91
91
92 # Private interface
92 # Private interface
93
93
94 _darwin_app_nap = Bool(True, config=True,
94 _darwin_app_nap = Bool(True, config=True,
95 help="""Whether to use appnope for compatiblity with OS X App Nap.
95 help="""Whether to use appnope for compatiblity with OS X App Nap.
96
96
97 Only affects OS X >= 10.9.
97 Only affects OS X >= 10.9.
98 """
98 """
99 )
99 )
100
100
101 # track associations with current request
101 # track associations with current request
102 _allow_stdin = Bool(False)
102 _allow_stdin = Bool(False)
103 _parent_header = Dict()
103 _parent_header = Dict()
104 _parent_ident = Any(b'')
104 _parent_ident = Any(b'')
105 # Time to sleep after flushing the stdout/err buffers in each execute
105 # Time to sleep after flushing the stdout/err buffers in each execute
106 # cycle. While this introduces a hard limit on the minimal latency of the
106 # cycle. While this introduces a hard limit on the minimal latency of the
107 # execute cycle, it helps prevent output synchronization problems for
107 # execute cycle, it helps prevent output synchronization problems for
108 # clients.
108 # clients.
109 # Units are in seconds. The minimum zmq latency on local host is probably
109 # Units are in seconds. The minimum zmq latency on local host is probably
110 # ~150 microseconds, set this to 500us for now. We may need to increase it
110 # ~150 microseconds, set this to 500us for now. We may need to increase it
111 # a little if it's not enough after more interactive testing.
111 # a little if it's not enough after more interactive testing.
112 _execute_sleep = Float(0.0005, config=True)
112 _execute_sleep = Float(0.0005, config=True)
113
113
114 # Frequency of the kernel's event loop.
114 # Frequency of the kernel's event loop.
115 # Units are in seconds, kernel subclasses for GUI toolkits may need to
115 # Units are in seconds, kernel subclasses for GUI toolkits may need to
116 # adapt to milliseconds.
116 # adapt to milliseconds.
117 _poll_interval = Float(0.05, config=True)
117 _poll_interval = Float(0.05, config=True)
118
118
119 # If the shutdown was requested over the network, we leave here the
119 # If the shutdown was requested over the network, we leave here the
120 # necessary reply message so it can be sent by our registered atexit
120 # necessary reply message so it can be sent by our registered atexit
121 # handler. This ensures that the reply is only sent to clients truly at
121 # handler. This ensures that the reply is only sent to clients truly at
122 # the end of our shutdown process (which happens after the underlying
122 # the end of our shutdown process (which happens after the underlying
123 # IPython shell's own shutdown).
123 # IPython shell's own shutdown).
124 _shutdown_message = None
124 _shutdown_message = None
125
125
126 # This is a dict of port number that the kernel is listening on. It is set
126 # This is a dict of port number that the kernel is listening on. It is set
127 # by record_ports and used by connect_request.
127 # by record_ports and used by connect_request.
128 _recorded_ports = Dict()
128 _recorded_ports = Dict()
129
129
130 # A reference to the Python builtin 'raw_input' function.
130 # A reference to the Python builtin 'raw_input' function.
131 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
131 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
132 _sys_raw_input = Any()
132 _sys_raw_input = Any()
133 _sys_eval_input = Any()
133 _sys_eval_input = Any()
134
134
135 # set of aborted msg_ids
135 # set of aborted msg_ids
136 aborted = Set()
136 aborted = Set()
137
137
138
138
139 def __init__(self, **kwargs):
139 def __init__(self, **kwargs):
140 super(Kernel, self).__init__(**kwargs)
140 super(Kernel, self).__init__(**kwargs)
141
141
142 # Initialize the InteractiveShell subclass
142 # Initialize the InteractiveShell subclass
143 self.shell = self.shell_class.instance(parent=self,
143 self.shell = self.shell_class.instance(parent=self,
144 profile_dir = self.profile_dir,
144 profile_dir = self.profile_dir,
145 user_module = self.user_module,
145 user_module = self.user_module,
146 user_ns = self.user_ns,
146 user_ns = self.user_ns,
147 kernel = self,
147 kernel = self,
148 )
148 )
149 self.shell.displayhook.session = self.session
149 self.shell.displayhook.session = self.session
150 self.shell.displayhook.pub_socket = self.iopub_socket
150 self.shell.displayhook.pub_socket = self.iopub_socket
151 self.shell.displayhook.topic = self._topic('execute_result')
151 self.shell.displayhook.topic = self._topic('execute_result')
152 self.shell.display_pub.session = self.session
152 self.shell.display_pub.session = self.session
153 self.shell.display_pub.pub_socket = self.iopub_socket
153 self.shell.display_pub.pub_socket = self.iopub_socket
154 self.shell.data_pub.session = self.session
154 self.shell.data_pub.session = self.session
155 self.shell.data_pub.pub_socket = self.iopub_socket
155 self.shell.data_pub.pub_socket = self.iopub_socket
156
156
157 # TMP - hack while developing
157 # TMP - hack while developing
158 self.shell._reply_content = None
158 self.shell._reply_content = None
159
159
160 # Build dict of handlers for message types
160 # Build dict of handlers for message types
161 msg_types = [ 'execute_request', 'complete_request',
161 msg_types = [ 'execute_request', 'complete_request',
162 'object_info_request', 'history_request',
162 'object_info_request', 'history_request',
163 'kernel_info_request',
163 'kernel_info_request',
164 'connect_request', 'shutdown_request',
164 'connect_request', 'shutdown_request',
165 'apply_request',
165 'apply_request',
166 ]
166 ]
167 self.shell_handlers = {}
167 self.shell_handlers = {}
168 for msg_type in msg_types:
168 for msg_type in msg_types:
169 self.shell_handlers[msg_type] = getattr(self, msg_type)
169 self.shell_handlers[msg_type] = getattr(self, msg_type)
170
170
171 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
171 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
172 comm_manager = self.shell.comm_manager
172 comm_manager = self.shell.comm_manager
173 for msg_type in comm_msg_types:
173 for msg_type in comm_msg_types:
174 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
174 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
175
175
176 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
176 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
177 self.control_handlers = {}
177 self.control_handlers = {}
178 for msg_type in control_msg_types:
178 for msg_type in control_msg_types:
179 self.control_handlers[msg_type] = getattr(self, msg_type)
179 self.control_handlers[msg_type] = getattr(self, msg_type)
180
180
181
181
182 def dispatch_control(self, msg):
182 def dispatch_control(self, msg):
183 """dispatch control requests"""
183 """dispatch control requests"""
184 idents,msg = self.session.feed_identities(msg, copy=False)
184 idents,msg = self.session.feed_identities(msg, copy=False)
185 try:
185 try:
186 msg = self.session.unserialize(msg, content=True, copy=False)
186 msg = self.session.unserialize(msg, content=True, copy=False)
187 except:
187 except:
188 self.log.error("Invalid Control Message", exc_info=True)
188 self.log.error("Invalid Control Message", exc_info=True)
189 return
189 return
190
190
191 self.log.debug("Control received: %s", msg)
191 self.log.debug("Control received: %s", msg)
192
192
193 header = msg['header']
193 header = msg['header']
194 msg_id = header['msg_id']
194 msg_id = header['msg_id']
195 msg_type = header['msg_type']
195 msg_type = header['msg_type']
196
196
197 handler = self.control_handlers.get(msg_type, None)
197 handler = self.control_handlers.get(msg_type, None)
198 if handler is None:
198 if handler is None:
199 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
199 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
200 else:
200 else:
201 try:
201 try:
202 handler(self.control_stream, idents, msg)
202 handler(self.control_stream, idents, msg)
203 except Exception:
203 except Exception:
204 self.log.error("Exception in control handler:", exc_info=True)
204 self.log.error("Exception in control handler:", exc_info=True)
205
205
206 def dispatch_shell(self, stream, msg):
206 def dispatch_shell(self, stream, msg):
207 """dispatch shell requests"""
207 """dispatch shell requests"""
208 # flush control requests first
208 # flush control requests first
209 if self.control_stream:
209 if self.control_stream:
210 self.control_stream.flush()
210 self.control_stream.flush()
211
211
212 idents,msg = self.session.feed_identities(msg, copy=False)
212 idents,msg = self.session.feed_identities(msg, copy=False)
213 try:
213 try:
214 msg = self.session.unserialize(msg, content=True, copy=False)
214 msg = self.session.unserialize(msg, content=True, copy=False)
215 except:
215 except:
216 self.log.error("Invalid Message", exc_info=True)
216 self.log.error("Invalid Message", exc_info=True)
217 return
217 return
218
218
219 header = msg['header']
219 header = msg['header']
220 msg_id = header['msg_id']
220 msg_id = header['msg_id']
221 msg_type = msg['header']['msg_type']
221 msg_type = msg['header']['msg_type']
222
222
223 # Print some info about this message and leave a '--->' marker, so it's
223 # Print some info about this message and leave a '--->' marker, so it's
224 # easier to trace visually the message chain when debugging. Each
224 # easier to trace visually the message chain when debugging. Each
225 # handler prints its message at the end.
225 # handler prints its message at the end.
226 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
226 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
227 self.log.debug(' Content: %s\n --->\n ', msg['content'])
227 self.log.debug(' Content: %s\n --->\n ', msg['content'])
228
228
229 if msg_id in self.aborted:
229 if msg_id in self.aborted:
230 self.aborted.remove(msg_id)
230 self.aborted.remove(msg_id)
231 # is it safe to assume a msg_id will not be resubmitted?
231 # is it safe to assume a msg_id will not be resubmitted?
232 reply_type = msg_type.split('_')[0] + '_reply'
232 reply_type = msg_type.split('_')[0] + '_reply'
233 status = {'status' : 'aborted'}
233 status = {'status' : 'aborted'}
234 md = {'engine' : self.ident}
234 md = {'engine' : self.ident}
235 md.update(status)
235 md.update(status)
236 reply_msg = self.session.send(stream, reply_type, metadata=md,
236 reply_msg = self.session.send(stream, reply_type, metadata=md,
237 content=status, parent=msg, ident=idents)
237 content=status, parent=msg, ident=idents)
238 return
238 return
239
239
240 handler = self.shell_handlers.get(msg_type, None)
240 handler = self.shell_handlers.get(msg_type, None)
241 if handler is None:
241 if handler is None:
242 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
242 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
243 else:
243 else:
244 # ensure default_int_handler during handler call
244 # ensure default_int_handler during handler call
245 sig = signal(SIGINT, default_int_handler)
245 sig = signal(SIGINT, default_int_handler)
246 self.log.debug("%s: %s", msg_type, msg)
246 self.log.debug("%s: %s", msg_type, msg)
247 try:
247 try:
248 handler(stream, idents, msg)
248 handler(stream, idents, msg)
249 except Exception:
249 except Exception:
250 self.log.error("Exception in message handler:", exc_info=True)
250 self.log.error("Exception in message handler:", exc_info=True)
251 finally:
251 finally:
252 signal(SIGINT, sig)
252 signal(SIGINT, sig)
253
253
254 def enter_eventloop(self):
254 def enter_eventloop(self):
255 """enter eventloop"""
255 """enter eventloop"""
256 self.log.info("entering eventloop %s", self.eventloop)
256 self.log.info("entering eventloop %s", self.eventloop)
257 for stream in self.shell_streams:
257 for stream in self.shell_streams:
258 # flush any pending replies,
258 # flush any pending replies,
259 # which may be skipped by entering the eventloop
259 # which may be skipped by entering the eventloop
260 stream.flush(zmq.POLLOUT)
260 stream.flush(zmq.POLLOUT)
261 # restore default_int_handler
261 # restore default_int_handler
262 signal(SIGINT, default_int_handler)
262 signal(SIGINT, default_int_handler)
263 while self.eventloop is not None:
263 while self.eventloop is not None:
264 try:
264 try:
265 self.eventloop(self)
265 self.eventloop(self)
266 except KeyboardInterrupt:
266 except KeyboardInterrupt:
267 # Ctrl-C shouldn't crash the kernel
267 # Ctrl-C shouldn't crash the kernel
268 self.log.error("KeyboardInterrupt caught in kernel")
268 self.log.error("KeyboardInterrupt caught in kernel")
269 continue
269 continue
270 else:
270 else:
271 # eventloop exited cleanly, this means we should stop (right?)
271 # eventloop exited cleanly, this means we should stop (right?)
272 self.eventloop = None
272 self.eventloop = None
273 break
273 break
274 self.log.info("exiting eventloop")
274 self.log.info("exiting eventloop")
275
275
276 def start(self):
276 def start(self):
277 """register dispatchers for streams"""
277 """register dispatchers for streams"""
278 self.shell.exit_now = False
278 self.shell.exit_now = False
279 if self.control_stream:
279 if self.control_stream:
280 self.control_stream.on_recv(self.dispatch_control, copy=False)
280 self.control_stream.on_recv(self.dispatch_control, copy=False)
281
281
282 def make_dispatcher(stream):
282 def make_dispatcher(stream):
283 def dispatcher(msg):
283 def dispatcher(msg):
284 return self.dispatch_shell(stream, msg)
284 return self.dispatch_shell(stream, msg)
285 return dispatcher
285 return dispatcher
286
286
287 for s in self.shell_streams:
287 for s in self.shell_streams:
288 s.on_recv(make_dispatcher(s), copy=False)
288 s.on_recv(make_dispatcher(s), copy=False)
289
289
290 # publish idle status
290 # publish idle status
291 self._publish_status('starting')
291 self._publish_status('starting')
292
292
293 def do_one_iteration(self):
293 def do_one_iteration(self):
294 """step eventloop just once"""
294 """step eventloop just once"""
295 if self.control_stream:
295 if self.control_stream:
296 self.control_stream.flush()
296 self.control_stream.flush()
297 for stream in self.shell_streams:
297 for stream in self.shell_streams:
298 # handle at most one request per iteration
298 # handle at most one request per iteration
299 stream.flush(zmq.POLLIN, 1)
299 stream.flush(zmq.POLLIN, 1)
300 stream.flush(zmq.POLLOUT)
300 stream.flush(zmq.POLLOUT)
301
301
302
302
303 def record_ports(self, ports):
303 def record_ports(self, ports):
304 """Record the ports that this kernel is using.
304 """Record the ports that this kernel is using.
305
305
306 The creator of the Kernel instance must call this methods if they
306 The creator of the Kernel instance must call this methods if they
307 want the :meth:`connect_request` method to return the port numbers.
307 want the :meth:`connect_request` method to return the port numbers.
308 """
308 """
309 self._recorded_ports = ports
309 self._recorded_ports = ports
310
310
311 #---------------------------------------------------------------------------
311 #---------------------------------------------------------------------------
312 # Kernel request handlers
312 # Kernel request handlers
313 #---------------------------------------------------------------------------
313 #---------------------------------------------------------------------------
314
314
315 def _make_metadata(self, other=None):
315 def _make_metadata(self, other=None):
316 """init metadata dict, for execute/apply_reply"""
316 """init metadata dict, for execute/apply_reply"""
317 new_md = {
317 new_md = {
318 'dependencies_met' : True,
318 'dependencies_met' : True,
319 'engine' : self.ident,
319 'engine' : self.ident,
320 'started': datetime.now(),
320 'started': datetime.now(),
321 }
321 }
322 if other:
322 if other:
323 new_md.update(other)
323 new_md.update(other)
324 return new_md
324 return new_md
325
325
326 def _publish_execute_input(self, code, parent, execution_count):
326 def _publish_execute_input(self, code, parent, execution_count):
327 """Publish the code request on the iopub stream."""
327 """Publish the code request on the iopub stream."""
328
328
329 self.session.send(self.iopub_socket, u'execute_input',
329 self.session.send(self.iopub_socket, u'execute_input',
330 {u'code':code, u'execution_count': execution_count},
330 {u'code':code, u'execution_count': execution_count},
331 parent=parent, ident=self._topic('execute_input')
331 parent=parent, ident=self._topic('execute_input')
332 )
332 )
333
333
334 def _publish_status(self, status, parent=None):
334 def _publish_status(self, status, parent=None):
335 """send status (busy/idle) on IOPub"""
335 """send status (busy/idle) on IOPub"""
336 self.session.send(self.iopub_socket,
336 self.session.send(self.iopub_socket,
337 u'status',
337 u'status',
338 {u'execution_state': status},
338 {u'execution_state': status},
339 parent=parent,
339 parent=parent,
340 ident=self._topic('status'),
340 ident=self._topic('status'),
341 )
341 )
342
342
343 def _forward_input(self, allow_stdin=False):
343 def _forward_input(self, allow_stdin=False):
344 """Forward raw_input and getpass to the current frontend.
344 """Forward raw_input and getpass to the current frontend.
345
345
346 via input_request
346 via input_request
347 """
347 """
348 self._allow_stdin = allow_stdin
348 self._allow_stdin = allow_stdin
349
349
350 if py3compat.PY3:
350 if py3compat.PY3:
351 self._sys_raw_input = builtin_mod.input
351 self._sys_raw_input = builtin_mod.input
352 builtin_mod.input = self.raw_input
352 builtin_mod.input = self.raw_input
353 else:
353 else:
354 self._sys_raw_input = builtin_mod.raw_input
354 self._sys_raw_input = builtin_mod.raw_input
355 self._sys_eval_input = builtin_mod.input
355 self._sys_eval_input = builtin_mod.input
356 builtin_mod.raw_input = self.raw_input
356 builtin_mod.raw_input = self.raw_input
357 builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt))
357 builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt))
358 self._save_getpass = getpass.getpass
358 self._save_getpass = getpass.getpass
359 getpass.getpass = self.getpass
359 getpass.getpass = self.getpass
360
360
361 def _restore_input(self):
361 def _restore_input(self):
362 """Restore raw_input, getpass"""
362 """Restore raw_input, getpass"""
363 if py3compat.PY3:
363 if py3compat.PY3:
364 builtin_mod.input = self._sys_raw_input
364 builtin_mod.input = self._sys_raw_input
365 else:
365 else:
366 builtin_mod.raw_input = self._sys_raw_input
366 builtin_mod.raw_input = self._sys_raw_input
367 builtin_mod.input = self._sys_eval_input
367 builtin_mod.input = self._sys_eval_input
368
368
369 getpass.getpass = self._save_getpass
369 getpass.getpass = self._save_getpass
370
370
371 def set_parent(self, ident, parent):
371 def set_parent(self, ident, parent):
372 """Record the parent state
372 """Record the parent state
373
373
374 For associating side effects with their requests.
374 For associating side effects with their requests.
375 """
375 """
376 self._parent_ident = ident
376 self._parent_ident = ident
377 self._parent_header = parent
377 self._parent_header = parent
378 self.shell.set_parent(parent)
378 self.shell.set_parent(parent)
379
379
380 def execute_request(self, stream, ident, parent):
380 def execute_request(self, stream, ident, parent):
381 """handle an execute_request"""
381 """handle an execute_request"""
382
382
383 self._publish_status(u'busy', parent)
383 self._publish_status(u'busy', parent)
384
384
385 try:
385 try:
386 content = parent[u'content']
386 content = parent[u'content']
387 code = py3compat.cast_unicode_py2(content[u'code'])
387 code = py3compat.cast_unicode_py2(content[u'code'])
388 silent = content[u'silent']
388 silent = content[u'silent']
389 store_history = content.get(u'store_history', not silent)
389 store_history = content.get(u'store_history', not silent)
390 except:
390 except:
391 self.log.error("Got bad msg: ")
391 self.log.error("Got bad msg: ")
392 self.log.error("%s", parent)
392 self.log.error("%s", parent)
393 return
393 return
394
394
395 md = self._make_metadata(parent['metadata'])
395 md = self._make_metadata(parent['metadata'])
396
396
397 shell = self.shell # we'll need this a lot here
397 shell = self.shell # we'll need this a lot here
398
398
399 self._forward_input(content.get('allow_stdin', False))
399 self._forward_input(content.get('allow_stdin', False))
400 # Set the parent message of the display hook and out streams.
400 # Set the parent message of the display hook and out streams.
401 self.set_parent(ident, parent)
401 self.set_parent(ident, parent)
402
402
403 # Re-broadcast our input for the benefit of listening clients, and
403 # Re-broadcast our input for the benefit of listening clients, and
404 # start computing output
404 # start computing output
405 if not silent:
405 if not silent:
406 self._publish_execute_input(code, parent, shell.execution_count)
406 self._publish_execute_input(code, parent, shell.execution_count)
407
407
408 reply_content = {}
408 reply_content = {}
409 # FIXME: the shell calls the exception handler itself.
409 # FIXME: the shell calls the exception handler itself.
410 shell._reply_content = None
410 shell._reply_content = None
411 try:
411 try:
412 shell.run_cell(code, store_history=store_history, silent=silent)
412 shell.run_cell(code, store_history=store_history, silent=silent)
413 except:
413 except:
414 status = u'error'
414 status = u'error'
415 # FIXME: this code right now isn't being used yet by default,
415 # FIXME: this code right now isn't being used yet by default,
416 # because the run_cell() call above directly fires off exception
416 # because the run_cell() call above directly fires off exception
417 # reporting. This code, therefore, is only active in the scenario
417 # reporting. This code, therefore, is only active in the scenario
418 # where runlines itself has an unhandled exception. We need to
418 # where runlines itself has an unhandled exception. We need to
419 # uniformize this, for all exception construction to come from a
419 # uniformize this, for all exception construction to come from a
420 # single location in the codbase.
420 # single location in the codbase.
421 etype, evalue, tb = sys.exc_info()
421 etype, evalue, tb = sys.exc_info()
422 tb_list = traceback.format_exception(etype, evalue, tb)
422 tb_list = traceback.format_exception(etype, evalue, tb)
423 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
423 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
424 else:
424 else:
425 status = u'ok'
425 status = u'ok'
426 finally:
426 finally:
427 self._restore_input()
427 self._restore_input()
428
428
429 reply_content[u'status'] = status
429 reply_content[u'status'] = status
430
430
431 # Return the execution counter so clients can display prompts
431 # Return the execution counter so clients can display prompts
432 reply_content['execution_count'] = shell.execution_count - 1
432 reply_content['execution_count'] = shell.execution_count - 1
433
433
434 # FIXME - fish exception info out of shell, possibly left there by
434 # FIXME - fish exception info out of shell, possibly left there by
435 # runlines. We'll need to clean up this logic later.
435 # runlines. We'll need to clean up this logic later.
436 if shell._reply_content is not None:
436 if shell._reply_content is not None:
437 reply_content.update(shell._reply_content)
437 reply_content.update(shell._reply_content)
438 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
438 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
439 reply_content['engine_info'] = e_info
439 reply_content['engine_info'] = e_info
440 # reset after use
440 # reset after use
441 shell._reply_content = None
441 shell._reply_content = None
442
442
443 if 'traceback' in reply_content:
443 if 'traceback' in reply_content:
444 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
444 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
445
445
446
446
447 # At this point, we can tell whether the main code execution succeeded
447 # At this point, we can tell whether the main code execution succeeded
448 # or not. If it did, we proceed to evaluate user_expressions
448 # or not. If it did, we proceed to evaluate user_expressions
449 if reply_content['status'] == 'ok':
449 if reply_content['status'] == 'ok':
450 reply_content[u'user_expressions'] = \
450 reply_content[u'user_expressions'] = \
451 shell.user_expressions(content.get(u'user_expressions', {}))
451 shell.user_expressions(content.get(u'user_expressions', {}))
452 else:
452 else:
453 # If there was an error, don't even try to compute expressions
453 # If there was an error, don't even try to compute expressions
454 reply_content[u'user_expressions'] = {}
454 reply_content[u'user_expressions'] = {}
455
455
456 # Payloads should be retrieved regardless of outcome, so we can both
456 # Payloads should be retrieved regardless of outcome, so we can both
457 # recover partial output (that could have been generated early in a
457 # recover partial output (that could have been generated early in a
458 # block, before an error) and clear the payload system always.
458 # block, before an error) and clear the payload system always.
459 reply_content[u'payload'] = shell.payload_manager.read_payload()
459 reply_content[u'payload'] = shell.payload_manager.read_payload()
460 # Be agressive about clearing the payload because we don't want
460 # Be agressive about clearing the payload because we don't want
461 # it to sit in memory until the next execute_request comes in.
461 # it to sit in memory until the next execute_request comes in.
462 shell.payload_manager.clear_payload()
462 shell.payload_manager.clear_payload()
463
463
464 # Flush output before sending the reply.
464 # Flush output before sending the reply.
465 sys.stdout.flush()
465 sys.stdout.flush()
466 sys.stderr.flush()
466 sys.stderr.flush()
467 # FIXME: on rare occasions, the flush doesn't seem to make it to the
467 # FIXME: on rare occasions, the flush doesn't seem to make it to the
468 # clients... This seems to mitigate the problem, but we definitely need
468 # clients... This seems to mitigate the problem, but we definitely need
469 # to better understand what's going on.
469 # to better understand what's going on.
470 if self._execute_sleep:
470 if self._execute_sleep:
471 time.sleep(self._execute_sleep)
471 time.sleep(self._execute_sleep)
472
472
473 # Send the reply.
473 # Send the reply.
474 reply_content = json_clean(reply_content)
474 reply_content = json_clean(reply_content)
475
475
476 md['status'] = reply_content['status']
476 md['status'] = reply_content['status']
477 if reply_content['status'] == 'error' and \
477 if reply_content['status'] == 'error' and \
478 reply_content['ename'] == 'UnmetDependency':
478 reply_content['ename'] == 'UnmetDependency':
479 md['dependencies_met'] = False
479 md['dependencies_met'] = False
480
480
481 reply_msg = self.session.send(stream, u'execute_reply',
481 reply_msg = self.session.send(stream, u'execute_reply',
482 reply_content, parent, metadata=md,
482 reply_content, parent, metadata=md,
483 ident=ident)
483 ident=ident)
484
484
485 self.log.debug("%s", reply_msg)
485 self.log.debug("%s", reply_msg)
486
486
487 if not silent and reply_msg['content']['status'] == u'error':
487 if not silent and reply_msg['content']['status'] == u'error':
488 self._abort_queues()
488 self._abort_queues()
489
489
490 self._publish_status(u'idle', parent)
490 self._publish_status(u'idle', parent)
491
491
492 def complete_request(self, stream, ident, parent):
492 def complete_request(self, stream, ident, parent):
493 content = parent['content']
493 content = parent['content']
494 code = content['code']
494 code = content['code']
495 cursor_pos = content['cursor_pos']
495 cursor_pos = content['cursor_pos']
496
496
497 txt, matches = self.shell.complete('', code, cursor_pos)
497 txt, matches = self.shell.complete('', code, cursor_pos)
498 matches = {'matches' : matches,
498 matches = {'matches' : matches,
499 'matched_text' : txt,
499 'matched_text' : txt,
500 'status' : 'ok'}
500 'status' : 'ok'}
501 matches = json_clean(matches)
501 matches = json_clean(matches)
502 completion_msg = self.session.send(stream, 'complete_reply',
502 completion_msg = self.session.send(stream, 'complete_reply',
503 matches, parent, ident)
503 matches, parent, ident)
504 self.log.debug("%s", completion_msg)
504 self.log.debug("%s", completion_msg)
505
505
506 def object_info_request(self, stream, ident, parent):
506 def object_info_request(self, stream, ident, parent):
507 content = parent['content']
507 content = parent['content']
508
508
509 name = token_at_cursor(content['code'], content['cursor_pos'])
509 name = token_at_cursor(content['code'], content['cursor_pos'])
510 info = self.shell.object_inspect(name)
510 info = self.shell.object_inspect(name)
511
511
512 reply_content = {'status' : 'ok'}
512 reply_content = {'status' : 'ok'}
513 reply_content['data'] = data = {}
513 reply_content['data'] = data = {}
514 reply_content['metadata'] = {}
514 reply_content['metadata'] = {}
515 reply_content['name'] = name
515 reply_content['name'] = name
516 reply_content['found'] = info['found']
516 reply_content['found'] = info['found']
517 if info['found']:
517 if info['found']:
518 info_text = self.shell.object_inspect_text(
518 info_text = self.shell.object_inspect_text(
519 name,
519 name,
520 detail_level=content.get('detail_level', 0),
520 detail_level=content.get('detail_level', 0),
521 )
521 )
522 reply_content['data']['text/plain'] = info_text
522 reply_content['data']['text/plain'] = info_text
523 # Before we send this object over, we scrub it for JSON usage
523 # Before we send this object over, we scrub it for JSON usage
524 reply_content = json_clean(reply_content)
524 reply_content = json_clean(reply_content)
525 msg = self.session.send(stream, 'object_info_reply',
525 msg = self.session.send(stream, 'object_info_reply',
526 reply_content, parent, ident)
526 reply_content, parent, ident)
527 self.log.debug("%s", msg)
527 self.log.debug("%s", msg)
528
528
529 def history_request(self, stream, ident, parent):
529 def history_request(self, stream, ident, parent):
530 # We need to pull these out, as passing **kwargs doesn't work with
530 # We need to pull these out, as passing **kwargs doesn't work with
531 # unicode keys before Python 2.6.5.
531 # unicode keys before Python 2.6.5.
532 hist_access_type = parent['content']['hist_access_type']
532 hist_access_type = parent['content']['hist_access_type']
533 raw = parent['content']['raw']
533 raw = parent['content']['raw']
534 output = parent['content']['output']
534 output = parent['content']['output']
535 if hist_access_type == 'tail':
535 if hist_access_type == 'tail':
536 n = parent['content']['n']
536 n = parent['content']['n']
537 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
537 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
538 include_latest=True)
538 include_latest=True)
539
539
540 elif hist_access_type == 'range':
540 elif hist_access_type == 'range':
541 session = parent['content']['session']
541 session = parent['content']['session']
542 start = parent['content']['start']
542 start = parent['content']['start']
543 stop = parent['content']['stop']
543 stop = parent['content']['stop']
544 hist = self.shell.history_manager.get_range(session, start, stop,
544 hist = self.shell.history_manager.get_range(session, start, stop,
545 raw=raw, output=output)
545 raw=raw, output=output)
546
546
547 elif hist_access_type == 'search':
547 elif hist_access_type == 'search':
548 n = parent['content'].get('n')
548 n = parent['content'].get('n')
549 unique = parent['content'].get('unique', False)
549 unique = parent['content'].get('unique', False)
550 pattern = parent['content']['pattern']
550 pattern = parent['content']['pattern']
551 hist = self.shell.history_manager.search(
551 hist = self.shell.history_manager.search(
552 pattern, raw=raw, output=output, n=n, unique=unique)
552 pattern, raw=raw, output=output, n=n, unique=unique)
553
553
554 else:
554 else:
555 hist = []
555 hist = []
556 hist = list(hist)
556 hist = list(hist)
557 content = {'history' : hist}
557 content = {'history' : hist}
558 content = json_clean(content)
558 content = json_clean(content)
559 msg = self.session.send(stream, 'history_reply',
559 msg = self.session.send(stream, 'history_reply',
560 content, parent, ident)
560 content, parent, ident)
561 self.log.debug("Sending history reply with %i entries", len(hist))
561 self.log.debug("Sending history reply with %i entries", len(hist))
562
562
563 def connect_request(self, stream, ident, parent):
563 def connect_request(self, stream, ident, parent):
564 if self._recorded_ports is not None:
564 if self._recorded_ports is not None:
565 content = self._recorded_ports.copy()
565 content = self._recorded_ports.copy()
566 else:
566 else:
567 content = {}
567 content = {}
568 msg = self.session.send(stream, 'connect_reply',
568 msg = self.session.send(stream, 'connect_reply',
569 content, parent, ident)
569 content, parent, ident)
570 self.log.debug("%s", msg)
570 self.log.debug("%s", msg)
571
571
572 def kernel_info_request(self, stream, ident, parent):
572 def kernel_info_request(self, stream, ident, parent):
573 vinfo = {
573 vinfo = {
574 'protocol_version': protocol_version,
574 'protocol_version': protocol_version,
575 'ipython_version': ipython_version,
575 'implementation': 'ipython',
576 'implementation_version': ipython_version,
576 'language_version': language_version,
577 'language_version': language_version,
577 'language': 'python',
578 'language': 'python',
578 'banner': self.shell.banner,
579 'banner': self.shell.banner,
579 }
580 }
580 msg = self.session.send(stream, 'kernel_info_reply',
581 msg = self.session.send(stream, 'kernel_info_reply',
581 vinfo, parent, ident)
582 vinfo, parent, ident)
582 self.log.debug("%s", msg)
583 self.log.debug("%s", msg)
583
584
584 def shutdown_request(self, stream, ident, parent):
585 def shutdown_request(self, stream, ident, parent):
585 self.shell.exit_now = True
586 self.shell.exit_now = True
586 content = dict(status='ok')
587 content = dict(status='ok')
587 content.update(parent['content'])
588 content.update(parent['content'])
588 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
589 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
589 # same content, but different msg_id for broadcasting on IOPub
590 # same content, but different msg_id for broadcasting on IOPub
590 self._shutdown_message = self.session.msg(u'shutdown_reply',
591 self._shutdown_message = self.session.msg(u'shutdown_reply',
591 content, parent
592 content, parent
592 )
593 )
593
594
594 self._at_shutdown()
595 self._at_shutdown()
595 # call sys.exit after a short delay
596 # call sys.exit after a short delay
596 loop = ioloop.IOLoop.instance()
597 loop = ioloop.IOLoop.instance()
597 loop.add_timeout(time.time()+0.1, loop.stop)
598 loop.add_timeout(time.time()+0.1, loop.stop)
598
599
599 #---------------------------------------------------------------------------
600 #---------------------------------------------------------------------------
600 # Engine methods
601 # Engine methods
601 #---------------------------------------------------------------------------
602 #---------------------------------------------------------------------------
602
603
603 def apply_request(self, stream, ident, parent):
604 def apply_request(self, stream, ident, parent):
604 try:
605 try:
605 content = parent[u'content']
606 content = parent[u'content']
606 bufs = parent[u'buffers']
607 bufs = parent[u'buffers']
607 msg_id = parent['header']['msg_id']
608 msg_id = parent['header']['msg_id']
608 except:
609 except:
609 self.log.error("Got bad msg: %s", parent, exc_info=True)
610 self.log.error("Got bad msg: %s", parent, exc_info=True)
610 return
611 return
611
612
612 self._publish_status(u'busy', parent)
613 self._publish_status(u'busy', parent)
613
614
614 # Set the parent message of the display hook and out streams.
615 # Set the parent message of the display hook and out streams.
615 shell = self.shell
616 shell = self.shell
616 shell.set_parent(parent)
617 shell.set_parent(parent)
617
618
618 # execute_input_msg = self.session.msg(u'execute_input',{u'code':code}, parent=parent)
619 # execute_input_msg = self.session.msg(u'execute_input',{u'code':code}, parent=parent)
619 # self.iopub_socket.send(execute_input_msg)
620 # self.iopub_socket.send(execute_input_msg)
620 # self.session.send(self.iopub_socket, u'execute_input', {u'code':code},parent=parent)
621 # self.session.send(self.iopub_socket, u'execute_input', {u'code':code},parent=parent)
621 md = self._make_metadata(parent['metadata'])
622 md = self._make_metadata(parent['metadata'])
622 try:
623 try:
623 working = shell.user_ns
624 working = shell.user_ns
624
625
625 prefix = "_"+str(msg_id).replace("-","")+"_"
626 prefix = "_"+str(msg_id).replace("-","")+"_"
626
627
627 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
628 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
628
629
629 fname = getattr(f, '__name__', 'f')
630 fname = getattr(f, '__name__', 'f')
630
631
631 fname = prefix+"f"
632 fname = prefix+"f"
632 argname = prefix+"args"
633 argname = prefix+"args"
633 kwargname = prefix+"kwargs"
634 kwargname = prefix+"kwargs"
634 resultname = prefix+"result"
635 resultname = prefix+"result"
635
636
636 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
637 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
637 # print ns
638 # print ns
638 working.update(ns)
639 working.update(ns)
639 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
640 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
640 try:
641 try:
641 exec(code, shell.user_global_ns, shell.user_ns)
642 exec(code, shell.user_global_ns, shell.user_ns)
642 result = working.get(resultname)
643 result = working.get(resultname)
643 finally:
644 finally:
644 for key in ns:
645 for key in ns:
645 working.pop(key)
646 working.pop(key)
646
647
647 result_buf = serialize_object(result,
648 result_buf = serialize_object(result,
648 buffer_threshold=self.session.buffer_threshold,
649 buffer_threshold=self.session.buffer_threshold,
649 item_threshold=self.session.item_threshold,
650 item_threshold=self.session.item_threshold,
650 )
651 )
651
652
652 except:
653 except:
653 # invoke IPython traceback formatting
654 # invoke IPython traceback formatting
654 shell.showtraceback()
655 shell.showtraceback()
655 # FIXME - fish exception info out of shell, possibly left there by
656 # FIXME - fish exception info out of shell, possibly left there by
656 # run_code. We'll need to clean up this logic later.
657 # run_code. We'll need to clean up this logic later.
657 reply_content = {}
658 reply_content = {}
658 if shell._reply_content is not None:
659 if shell._reply_content is not None:
659 reply_content.update(shell._reply_content)
660 reply_content.update(shell._reply_content)
660 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
661 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
661 reply_content['engine_info'] = e_info
662 reply_content['engine_info'] = e_info
662 # reset after use
663 # reset after use
663 shell._reply_content = None
664 shell._reply_content = None
664
665
665 self.session.send(self.iopub_socket, u'error', reply_content, parent=parent,
666 self.session.send(self.iopub_socket, u'error', reply_content, parent=parent,
666 ident=self._topic('error'))
667 ident=self._topic('error'))
667 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
668 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
668 result_buf = []
669 result_buf = []
669
670
670 if reply_content['ename'] == 'UnmetDependency':
671 if reply_content['ename'] == 'UnmetDependency':
671 md['dependencies_met'] = False
672 md['dependencies_met'] = False
672 else:
673 else:
673 reply_content = {'status' : 'ok'}
674 reply_content = {'status' : 'ok'}
674
675
675 # put 'ok'/'error' status in header, for scheduler introspection:
676 # put 'ok'/'error' status in header, for scheduler introspection:
676 md['status'] = reply_content['status']
677 md['status'] = reply_content['status']
677
678
678 # flush i/o
679 # flush i/o
679 sys.stdout.flush()
680 sys.stdout.flush()
680 sys.stderr.flush()
681 sys.stderr.flush()
681
682
682 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
683 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
683 parent=parent, ident=ident,buffers=result_buf, metadata=md)
684 parent=parent, ident=ident,buffers=result_buf, metadata=md)
684
685
685 self._publish_status(u'idle', parent)
686 self._publish_status(u'idle', parent)
686
687
687 #---------------------------------------------------------------------------
688 #---------------------------------------------------------------------------
688 # Control messages
689 # Control messages
689 #---------------------------------------------------------------------------
690 #---------------------------------------------------------------------------
690
691
691 def abort_request(self, stream, ident, parent):
692 def abort_request(self, stream, ident, parent):
692 """abort a specifig msg by id"""
693 """abort a specifig msg by id"""
693 msg_ids = parent['content'].get('msg_ids', None)
694 msg_ids = parent['content'].get('msg_ids', None)
694 if isinstance(msg_ids, string_types):
695 if isinstance(msg_ids, string_types):
695 msg_ids = [msg_ids]
696 msg_ids = [msg_ids]
696 if not msg_ids:
697 if not msg_ids:
697 self.abort_queues()
698 self.abort_queues()
698 for mid in msg_ids:
699 for mid in msg_ids:
699 self.aborted.add(str(mid))
700 self.aborted.add(str(mid))
700
701
701 content = dict(status='ok')
702 content = dict(status='ok')
702 reply_msg = self.session.send(stream, 'abort_reply', content=content,
703 reply_msg = self.session.send(stream, 'abort_reply', content=content,
703 parent=parent, ident=ident)
704 parent=parent, ident=ident)
704 self.log.debug("%s", reply_msg)
705 self.log.debug("%s", reply_msg)
705
706
706 def clear_request(self, stream, idents, parent):
707 def clear_request(self, stream, idents, parent):
707 """Clear our namespace."""
708 """Clear our namespace."""
708 self.shell.reset(False)
709 self.shell.reset(False)
709 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
710 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
710 content = dict(status='ok'))
711 content = dict(status='ok'))
711
712
712
713
713 #---------------------------------------------------------------------------
714 #---------------------------------------------------------------------------
714 # Protected interface
715 # Protected interface
715 #---------------------------------------------------------------------------
716 #---------------------------------------------------------------------------
716
717
717 def _wrap_exception(self, method=None):
718 def _wrap_exception(self, method=None):
718 # import here, because _wrap_exception is only used in parallel,
719 # import here, because _wrap_exception is only used in parallel,
719 # and parallel has higher min pyzmq version
720 # and parallel has higher min pyzmq version
720 from IPython.parallel.error import wrap_exception
721 from IPython.parallel.error import wrap_exception
721 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
722 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
722 content = wrap_exception(e_info)
723 content = wrap_exception(e_info)
723 return content
724 return content
724
725
725 def _topic(self, topic):
726 def _topic(self, topic):
726 """prefixed topic for IOPub messages"""
727 """prefixed topic for IOPub messages"""
727 if self.int_id >= 0:
728 if self.int_id >= 0:
728 base = "engine.%i" % self.int_id
729 base = "engine.%i" % self.int_id
729 else:
730 else:
730 base = "kernel.%s" % self.ident
731 base = "kernel.%s" % self.ident
731
732
732 return py3compat.cast_bytes("%s.%s" % (base, topic))
733 return py3compat.cast_bytes("%s.%s" % (base, topic))
733
734
734 def _abort_queues(self):
735 def _abort_queues(self):
735 for stream in self.shell_streams:
736 for stream in self.shell_streams:
736 if stream:
737 if stream:
737 self._abort_queue(stream)
738 self._abort_queue(stream)
738
739
739 def _abort_queue(self, stream):
740 def _abort_queue(self, stream):
740 poller = zmq.Poller()
741 poller = zmq.Poller()
741 poller.register(stream.socket, zmq.POLLIN)
742 poller.register(stream.socket, zmq.POLLIN)
742 while True:
743 while True:
743 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
744 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
744 if msg is None:
745 if msg is None:
745 return
746 return
746
747
747 self.log.info("Aborting:")
748 self.log.info("Aborting:")
748 self.log.info("%s", msg)
749 self.log.info("%s", msg)
749 msg_type = msg['header']['msg_type']
750 msg_type = msg['header']['msg_type']
750 reply_type = msg_type.split('_')[0] + '_reply'
751 reply_type = msg_type.split('_')[0] + '_reply'
751
752
752 status = {'status' : 'aborted'}
753 status = {'status' : 'aborted'}
753 md = {'engine' : self.ident}
754 md = {'engine' : self.ident}
754 md.update(status)
755 md.update(status)
755 reply_msg = self.session.send(stream, reply_type, metadata=md,
756 reply_msg = self.session.send(stream, reply_type, metadata=md,
756 content=status, parent=msg, ident=idents)
757 content=status, parent=msg, ident=idents)
757 self.log.debug("%s", reply_msg)
758 self.log.debug("%s", reply_msg)
758 # We need to wait a bit for requests to come in. This can probably
759 # We need to wait a bit for requests to come in. This can probably
759 # be set shorter for true asynchronous clients.
760 # be set shorter for true asynchronous clients.
760 poller.poll(50)
761 poller.poll(50)
761
762
762
763
763 def _no_raw_input(self):
764 def _no_raw_input(self):
764 """Raise StdinNotImplentedError if active frontend doesn't support
765 """Raise StdinNotImplentedError if active frontend doesn't support
765 stdin."""
766 stdin."""
766 raise StdinNotImplementedError("raw_input was called, but this "
767 raise StdinNotImplementedError("raw_input was called, but this "
767 "frontend does not support stdin.")
768 "frontend does not support stdin.")
768
769
769 def getpass(self, prompt=''):
770 def getpass(self, prompt=''):
770 """Forward getpass to frontends
771 """Forward getpass to frontends
771
772
772 Raises
773 Raises
773 ------
774 ------
774 StdinNotImplentedError if active frontend doesn't support stdin.
775 StdinNotImplentedError if active frontend doesn't support stdin.
775 """
776 """
776 if not self._allow_stdin:
777 if not self._allow_stdin:
777 raise StdinNotImplementedError(
778 raise StdinNotImplementedError(
778 "getpass was called, but this frontend does not support input requests."
779 "getpass was called, but this frontend does not support input requests."
779 )
780 )
780 return self._input_request(prompt,
781 return self._input_request(prompt,
781 self._parent_ident,
782 self._parent_ident,
782 self._parent_header,
783 self._parent_header,
783 password=True,
784 password=True,
784 )
785 )
785
786
786 def raw_input(self, prompt=''):
787 def raw_input(self, prompt=''):
787 """Forward raw_input to frontends
788 """Forward raw_input to frontends
788
789
789 Raises
790 Raises
790 ------
791 ------
791 StdinNotImplentedError if active frontend doesn't support stdin.
792 StdinNotImplentedError if active frontend doesn't support stdin.
792 """
793 """
793 if not self._allow_stdin:
794 if not self._allow_stdin:
794 raise StdinNotImplementedError(
795 raise StdinNotImplementedError(
795 "raw_input was called, but this frontend does not support input requests."
796 "raw_input was called, but this frontend does not support input requests."
796 )
797 )
797 return self._input_request(prompt,
798 return self._input_request(prompt,
798 self._parent_ident,
799 self._parent_ident,
799 self._parent_header,
800 self._parent_header,
800 password=False,
801 password=False,
801 )
802 )
802
803
803 def _input_request(self, prompt, ident, parent, password=False):
804 def _input_request(self, prompt, ident, parent, password=False):
804 # Flush output before making the request.
805 # Flush output before making the request.
805 sys.stderr.flush()
806 sys.stderr.flush()
806 sys.stdout.flush()
807 sys.stdout.flush()
807 # flush the stdin socket, to purge stale replies
808 # flush the stdin socket, to purge stale replies
808 while True:
809 while True:
809 try:
810 try:
810 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
811 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
811 except zmq.ZMQError as e:
812 except zmq.ZMQError as e:
812 if e.errno == zmq.EAGAIN:
813 if e.errno == zmq.EAGAIN:
813 break
814 break
814 else:
815 else:
815 raise
816 raise
816
817
817 # Send the input request.
818 # Send the input request.
818 content = json_clean(dict(prompt=prompt, password=password))
819 content = json_clean(dict(prompt=prompt, password=password))
819 self.session.send(self.stdin_socket, u'input_request', content, parent,
820 self.session.send(self.stdin_socket, u'input_request', content, parent,
820 ident=ident)
821 ident=ident)
821
822
822 # Await a response.
823 # Await a response.
823 while True:
824 while True:
824 try:
825 try:
825 ident, reply = self.session.recv(self.stdin_socket, 0)
826 ident, reply = self.session.recv(self.stdin_socket, 0)
826 except Exception:
827 except Exception:
827 self.log.warn("Invalid Message:", exc_info=True)
828 self.log.warn("Invalid Message:", exc_info=True)
828 except KeyboardInterrupt:
829 except KeyboardInterrupt:
829 # re-raise KeyboardInterrupt, to truncate traceback
830 # re-raise KeyboardInterrupt, to truncate traceback
830 raise KeyboardInterrupt
831 raise KeyboardInterrupt
831 else:
832 else:
832 break
833 break
833 try:
834 try:
834 value = py3compat.unicode_to_str(reply['content']['value'])
835 value = py3compat.unicode_to_str(reply['content']['value'])
835 except:
836 except:
836 self.log.error("Bad input_reply: %s", parent)
837 self.log.error("Bad input_reply: %s", parent)
837 value = ''
838 value = ''
838 if value == '\x04':
839 if value == '\x04':
839 # EOF
840 # EOF
840 raise EOFError
841 raise EOFError
841 return value
842 return value
842
843
843 def _at_shutdown(self):
844 def _at_shutdown(self):
844 """Actions taken at shutdown by the kernel, called by python's atexit.
845 """Actions taken at shutdown by the kernel, called by python's atexit.
845 """
846 """
846 # io.rprint("Kernel at_shutdown") # dbg
847 # io.rprint("Kernel at_shutdown") # dbg
847 if self._shutdown_message is not None:
848 if self._shutdown_message is not None:
848 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
849 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
849 self.log.debug("%s", self._shutdown_message)
850 self.log.debug("%s", self._shutdown_message)
850 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
851 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
851
852
General Comments 0
You need to be logged in to leave comments. Login now