##// END OF EJS Templates
add implementation and implementation_version to kernel_info_reply
MinRK -
Show More
@@ -1,399 +1,401 b''
1 1 """Test suite for our zeromq-based message specification."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import re
7 7 from distutils.version import LooseVersion as V
8 8 from subprocess import PIPE
9 9 try:
10 10 from queue import Empty # Py 3
11 11 except ImportError:
12 12 from Queue import Empty # Py 2
13 13
14 14 import nose.tools as nt
15 15
16 16 from IPython.kernel import KernelManager
17 17
18 18 from IPython.utils.traitlets import (
19 19 HasTraits, TraitError, Bool, Unicode, Dict, Integer, List, Enum, Any,
20 20 )
21 21 from IPython.utils.py3compat import string_types, iteritems
22 22
23 23 from .utils import TIMEOUT, start_global_kernel, flush_channels, execute
24 24
25 25 #-----------------------------------------------------------------------------
26 26 # Globals
27 27 #-----------------------------------------------------------------------------
28 28 KC = None
29 29
30 30 def setup():
31 31 global KC
32 32 KC = start_global_kernel()
33 33
34 34 #-----------------------------------------------------------------------------
35 35 # Message Spec References
36 36 #-----------------------------------------------------------------------------
37 37
38 38 class Reference(HasTraits):
39 39
40 40 """
41 41 Base class for message spec specification testing.
42 42
43 43 This class is the core of the message specification test. The
44 44 idea is that child classes implement trait attributes for each
45 45 message keys, so that message keys can be tested against these
46 46 traits using :meth:`check` method.
47 47
48 48 """
49 49
50 50 def check(self, d):
51 51 """validate a dict against our traits"""
52 52 for key in self.trait_names():
53 53 nt.assert_in(key, d)
54 54 # FIXME: always allow None, probably not a good idea
55 55 if d[key] is None:
56 56 continue
57 57 try:
58 58 setattr(self, key, d[key])
59 59 except TraitError as e:
60 60 assert False, str(e)
61 61
62 62 class Version(Unicode):
63 63 def validate(self, obj, value):
64 64 min_version = self.default_value
65 65 if V(value) < V(min_version):
66 66 raise TraitError("bad version: %s < %s" % (value, min_version))
67 67
68 68 class RMessage(Reference):
69 69 msg_id = Unicode()
70 70 msg_type = Unicode()
71 71 header = Dict()
72 72 parent_header = Dict()
73 73 content = Dict()
74 74
75 75 def check(self, d):
76 76 super(RMessage, self).check(d)
77 77 RHeader().check(self.header)
78 78 if self.parent_header:
79 79 RHeader().check(self.parent_header)
80 80
81 81 class RHeader(Reference):
82 82 msg_id = Unicode()
83 83 msg_type = Unicode()
84 84 session = Unicode()
85 85 username = Unicode()
86 86 version = Version('5.0')
87 87
88 88 mime_pat = re.compile(r'\w+/\w+')
89 89
90 90 class MimeBundle(Reference):
91 91 metadata = Dict()
92 92 data = Dict()
93 93 def _data_changed(self, name, old, new):
94 94 for k,v in iteritems(new):
95 95 assert mime_pat.match(k)
96 96 nt.assert_is_instance(v, string_types)
97 97
98 98 # shell replies
99 99
100 100 class ExecuteReply(Reference):
101 101 execution_count = Integer()
102 102 status = Enum((u'ok', u'error'))
103 103
104 104 def check(self, d):
105 105 Reference.check(self, d)
106 106 if d['status'] == 'ok':
107 107 ExecuteReplyOkay().check(d)
108 108 elif d['status'] == 'error':
109 109 ExecuteReplyError().check(d)
110 110
111 111
112 112 class ExecuteReplyOkay(Reference):
113 113 payload = List(Dict)
114 114 user_expressions = Dict()
115 115
116 116
117 117 class ExecuteReplyError(Reference):
118 118 ename = Unicode()
119 119 evalue = Unicode()
120 120 traceback = List(Unicode)
121 121
122 122
123 123 class OInfoReply(MimeBundle):
124 124 name = Unicode()
125 125 found = Bool()
126 126
127 127
128 128 class ArgSpec(Reference):
129 129 args = List(Unicode)
130 130 varargs = Unicode()
131 131 varkw = Unicode()
132 132 defaults = List()
133 133
134 134
135 135 class Status(Reference):
136 136 execution_state = Enum((u'busy', u'idle', u'starting'))
137 137
138 138
139 139 class CompleteReply(Reference):
140 140 matches = List(Unicode)
141 141
142 142
143 143 class KernelInfoReply(Reference):
144 144 protocol_version = Version('5.0')
145 ipython_version = Version('2.0')
145 implementation = Unicode('ipython')
146 implementation_version = Version('2.1')
146 147 language_version = Version('2.7')
147 language = Unicode()
148 language = Unicode('python')
149 banner = Unicode()
148 150
149 151
150 152 # IOPub messages
151 153
152 154 class ExecuteInput(Reference):
153 155 code = Unicode()
154 156 execution_count = Integer()
155 157
156 158
157 159 Error = ExecuteReplyError
158 160
159 161
160 162 class Stream(Reference):
161 163 name = Enum((u'stdout', u'stderr'))
162 164 data = Unicode()
163 165
164 166
165 167 class DisplayData(MimeBundle):
166 168 source = Unicode()
167 169
168 170
169 171 class ExecuteResult(MimeBundle):
170 172 execution_count = Integer()
171 173
172 174
173 175 references = {
174 176 'execute_reply' : ExecuteReply(),
175 177 'object_info_reply' : OInfoReply(),
176 178 'status' : Status(),
177 179 'complete_reply' : CompleteReply(),
178 180 'kernel_info_reply': KernelInfoReply(),
179 181 'execute_input' : ExecuteInput(),
180 182 'execute_result' : ExecuteResult(),
181 183 'error' : Error(),
182 184 'stream' : Stream(),
183 185 'display_data' : DisplayData(),
184 186 'header' : RHeader(),
185 187 }
186 188 """
187 189 Specifications of `content` part of the reply messages.
188 190 """
189 191
190 192
191 193 def validate_message(msg, msg_type=None, parent=None):
192 194 """validate a message
193 195
194 196 This is a generator, and must be iterated through to actually
195 197 trigger each test.
196 198
197 199 If msg_type and/or parent are given, the msg_type and/or parent msg_id
198 200 are compared with the given values.
199 201 """
200 202 RMessage().check(msg)
201 203 if msg_type:
202 204 nt.assert_equal(msg['msg_type'], msg_type)
203 205 if parent:
204 206 nt.assert_equal(msg['parent_header']['msg_id'], parent)
205 207 content = msg['content']
206 208 ref = references[msg['msg_type']]
207 209 ref.check(content)
208 210
209 211
210 212 #-----------------------------------------------------------------------------
211 213 # Tests
212 214 #-----------------------------------------------------------------------------
213 215
214 216 # Shell channel
215 217
216 218 def test_execute():
217 219 flush_channels()
218 220
219 221 msg_id = KC.execute(code='x=1')
220 222 reply = KC.get_shell_msg(timeout=TIMEOUT)
221 223 validate_message(reply, 'execute_reply', msg_id)
222 224
223 225
224 226 def test_execute_silent():
225 227 flush_channels()
226 228 msg_id, reply = execute(code='x=1', silent=True)
227 229
228 230 # flush status=idle
229 231 status = KC.iopub_channel.get_msg(timeout=TIMEOUT)
230 232 validate_message(status, 'status', msg_id)
231 233 nt.assert_equal(status['content']['execution_state'], 'idle')
232 234
233 235 nt.assert_raises(Empty, KC.iopub_channel.get_msg, timeout=0.1)
234 236 count = reply['execution_count']
235 237
236 238 msg_id, reply = execute(code='x=2', silent=True)
237 239
238 240 # flush status=idle
239 241 status = KC.iopub_channel.get_msg(timeout=TIMEOUT)
240 242 validate_message(status, 'status', msg_id)
241 243 nt.assert_equal(status['content']['execution_state'], 'idle')
242 244
243 245 nt.assert_raises(Empty, KC.iopub_channel.get_msg, timeout=0.1)
244 246 count_2 = reply['execution_count']
245 247 nt.assert_equal(count_2, count)
246 248
247 249
248 250 def test_execute_error():
249 251 flush_channels()
250 252
251 253 msg_id, reply = execute(code='1/0')
252 254 nt.assert_equal(reply['status'], 'error')
253 255 nt.assert_equal(reply['ename'], 'ZeroDivisionError')
254 256
255 257 error = KC.iopub_channel.get_msg(timeout=TIMEOUT)
256 258 validate_message(error, 'error', msg_id)
257 259
258 260
259 261 def test_execute_inc():
260 262 """execute request should increment execution_count"""
261 263 flush_channels()
262 264
263 265 msg_id, reply = execute(code='x=1')
264 266 count = reply['execution_count']
265 267
266 268 flush_channels()
267 269
268 270 msg_id, reply = execute(code='x=2')
269 271 count_2 = reply['execution_count']
270 272 nt.assert_equal(count_2, count+1)
271 273
272 274
273 275 def test_user_expressions():
274 276 flush_channels()
275 277
276 278 msg_id, reply = execute(code='x=1', user_expressions=dict(foo='x+1'))
277 279 user_expressions = reply['user_expressions']
278 280 nt.assert_equal(user_expressions, {u'foo': {
279 281 u'status': u'ok',
280 282 u'data': {u'text/plain': u'2'},
281 283 u'metadata': {},
282 284 }})
283 285
284 286
285 287 def test_user_expressions_fail():
286 288 flush_channels()
287 289
288 290 msg_id, reply = execute(code='x=0', user_expressions=dict(foo='nosuchname'))
289 291 user_expressions = reply['user_expressions']
290 292 foo = user_expressions['foo']
291 293 nt.assert_equal(foo['status'], 'error')
292 294 nt.assert_equal(foo['ename'], 'NameError')
293 295
294 296
295 297 def test_oinfo():
296 298 flush_channels()
297 299
298 300 msg_id = KC.object_info('a')
299 301 reply = KC.get_shell_msg(timeout=TIMEOUT)
300 302 validate_message(reply, 'object_info_reply', msg_id)
301 303
302 304
303 305 def test_oinfo_found():
304 306 flush_channels()
305 307
306 308 msg_id, reply = execute(code='a=5')
307 309
308 310 msg_id = KC.object_info('a')
309 311 reply = KC.get_shell_msg(timeout=TIMEOUT)
310 312 validate_message(reply, 'object_info_reply', msg_id)
311 313 content = reply['content']
312 314 assert content['found']
313 315 nt.assert_equal(content['name'], 'a')
314 316 text = content['data']['text/plain']
315 317 nt.assert_in('Type:', text)
316 318 nt.assert_in('Docstring:', text)
317 319
318 320
319 321 def test_oinfo_detail():
320 322 flush_channels()
321 323
322 324 msg_id, reply = execute(code='ip=get_ipython()')
323 325
324 326 msg_id = KC.object_info('ip.object_inspect', cursor_pos=10, detail_level=1)
325 327 reply = KC.get_shell_msg(timeout=TIMEOUT)
326 328 validate_message(reply, 'object_info_reply', msg_id)
327 329 content = reply['content']
328 330 assert content['found']
329 331 nt.assert_equal(content['name'], 'ip.object_inspect')
330 332 text = content['data']['text/plain']
331 333 nt.assert_in('Definition:', text)
332 334 nt.assert_in('Source:', text)
333 335
334 336
335 337 def test_oinfo_not_found():
336 338 flush_channels()
337 339
338 340 msg_id = KC.object_info('dne')
339 341 reply = KC.get_shell_msg(timeout=TIMEOUT)
340 342 validate_message(reply, 'object_info_reply', msg_id)
341 343 content = reply['content']
342 344 nt.assert_false(content['found'])
343 345
344 346
345 347 def test_complete():
346 348 flush_channels()
347 349
348 350 msg_id, reply = execute(code="alpha = albert = 5")
349 351
350 352 msg_id = KC.complete('al', 2)
351 353 reply = KC.get_shell_msg(timeout=TIMEOUT)
352 354 validate_message(reply, 'complete_reply', msg_id)
353 355 matches = reply['content']['matches']
354 356 for name in ('alpha', 'albert'):
355 357 nt.assert_in(name, matches)
356 358
357 359
358 360 def test_kernel_info_request():
359 361 flush_channels()
360 362
361 363 msg_id = KC.kernel_info()
362 364 reply = KC.get_shell_msg(timeout=TIMEOUT)
363 365 validate_message(reply, 'kernel_info_reply', msg_id)
364 366
365 367
366 368 def test_single_payload():
367 369 flush_channels()
368 370 msg_id, reply = execute(code="for i in range(3):\n"+
369 371 " x=range?\n")
370 372 payload = reply['payload']
371 373 next_input_pls = [pl for pl in payload if pl["source"] == "set_next_input"]
372 374 nt.assert_equal(len(next_input_pls), 1)
373 375
374 376
375 377 # IOPub channel
376 378
377 379
378 380 def test_stream():
379 381 flush_channels()
380 382
381 383 msg_id, reply = execute("print('hi')")
382 384
383 385 stdout = KC.iopub_channel.get_msg(timeout=TIMEOUT)
384 386 validate_message(stdout, 'stream', msg_id)
385 387 content = stdout['content']
386 388 nt.assert_equal(content['name'], u'stdout')
387 389 nt.assert_equal(content['data'], u'hi\n')
388 390
389 391
390 392 def test_display_data():
391 393 flush_channels()
392 394
393 395 msg_id, reply = execute("from IPython.core.display import display; display(1)")
394 396
395 397 display = KC.iopub_channel.get_msg(timeout=TIMEOUT)
396 398 validate_message(display, 'display_data', parent=msg_id)
397 399 data = display['content']['data']
398 400 nt.assert_equal(data['text/plain'], u'1')
399 401
@@ -1,851 +1,852 b''
1 1 """An interactive kernel that talks to frontends over 0MQ."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import print_function
7 7
8 8 import getpass
9 9 import sys
10 10 import time
11 11 import traceback
12 12 import logging
13 13 import uuid
14 14
15 15 from datetime import datetime
16 16 from signal import (
17 17 signal, default_int_handler, SIGINT
18 18 )
19 19
20 20 import zmq
21 21 from zmq.eventloop import ioloop
22 22 from zmq.eventloop.zmqstream import ZMQStream
23 23
24 24 from IPython.config.configurable import Configurable
25 25 from IPython.core.error import StdinNotImplementedError
26 26 from IPython.core import release
27 27 from IPython.utils import py3compat
28 28 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
29 29 from IPython.utils.jsonutil import json_clean
30 30 from IPython.utils.tokenutil import token_at_cursor
31 31 from IPython.utils.traitlets import (
32 32 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
33 33 Type, Bool,
34 34 )
35 35
36 36 from .serialize import serialize_object, unpack_apply_message
37 37 from .session import Session
38 38 from .zmqshell import ZMQInteractiveShell
39 39
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Main kernel class
43 43 #-----------------------------------------------------------------------------
44 44
45 45 protocol_version = release.kernel_protocol_version
46 46 ipython_version = release.version
47 47 language_version = sys.version.split()[0]
48 48
49 49
50 50 class Kernel(Configurable):
51 51
52 52 #---------------------------------------------------------------------------
53 53 # Kernel interface
54 54 #---------------------------------------------------------------------------
55 55
56 56 # attribute to override with a GUI
57 57 eventloop = Any(None)
58 58 def _eventloop_changed(self, name, old, new):
59 59 """schedule call to eventloop from IOLoop"""
60 60 loop = ioloop.IOLoop.instance()
61 61 loop.add_callback(self.enter_eventloop)
62 62
63 63 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
64 64 shell_class = Type(ZMQInteractiveShell)
65 65
66 66 session = Instance(Session)
67 67 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
68 68 shell_streams = List()
69 69 control_stream = Instance(ZMQStream)
70 70 iopub_socket = Instance(zmq.Socket)
71 71 stdin_socket = Instance(zmq.Socket)
72 72 log = Instance(logging.Logger)
73 73
74 74 user_module = Any()
75 75 def _user_module_changed(self, name, old, new):
76 76 if self.shell is not None:
77 77 self.shell.user_module = new
78 78
79 79 user_ns = Instance(dict, args=None, allow_none=True)
80 80 def _user_ns_changed(self, name, old, new):
81 81 if self.shell is not None:
82 82 self.shell.user_ns = new
83 83 self.shell.init_user_ns()
84 84
85 85 # identities:
86 86 int_id = Integer(-1)
87 87 ident = Unicode()
88 88
89 89 def _ident_default(self):
90 90 return unicode_type(uuid.uuid4())
91 91
92 92 # Private interface
93 93
94 94 _darwin_app_nap = Bool(True, config=True,
95 95 help="""Whether to use appnope for compatiblity with OS X App Nap.
96 96
97 97 Only affects OS X >= 10.9.
98 98 """
99 99 )
100 100
101 101 # track associations with current request
102 102 _allow_stdin = Bool(False)
103 103 _parent_header = Dict()
104 104 _parent_ident = Any(b'')
105 105 # Time to sleep after flushing the stdout/err buffers in each execute
106 106 # cycle. While this introduces a hard limit on the minimal latency of the
107 107 # execute cycle, it helps prevent output synchronization problems for
108 108 # clients.
109 109 # Units are in seconds. The minimum zmq latency on local host is probably
110 110 # ~150 microseconds, set this to 500us for now. We may need to increase it
111 111 # a little if it's not enough after more interactive testing.
112 112 _execute_sleep = Float(0.0005, config=True)
113 113
114 114 # Frequency of the kernel's event loop.
115 115 # Units are in seconds, kernel subclasses for GUI toolkits may need to
116 116 # adapt to milliseconds.
117 117 _poll_interval = Float(0.05, config=True)
118 118
119 119 # If the shutdown was requested over the network, we leave here the
120 120 # necessary reply message so it can be sent by our registered atexit
121 121 # handler. This ensures that the reply is only sent to clients truly at
122 122 # the end of our shutdown process (which happens after the underlying
123 123 # IPython shell's own shutdown).
124 124 _shutdown_message = None
125 125
126 126 # This is a dict of port number that the kernel is listening on. It is set
127 127 # by record_ports and used by connect_request.
128 128 _recorded_ports = Dict()
129 129
130 130 # A reference to the Python builtin 'raw_input' function.
131 131 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
132 132 _sys_raw_input = Any()
133 133 _sys_eval_input = Any()
134 134
135 135 # set of aborted msg_ids
136 136 aborted = Set()
137 137
138 138
139 139 def __init__(self, **kwargs):
140 140 super(Kernel, self).__init__(**kwargs)
141 141
142 142 # Initialize the InteractiveShell subclass
143 143 self.shell = self.shell_class.instance(parent=self,
144 144 profile_dir = self.profile_dir,
145 145 user_module = self.user_module,
146 146 user_ns = self.user_ns,
147 147 kernel = self,
148 148 )
149 149 self.shell.displayhook.session = self.session
150 150 self.shell.displayhook.pub_socket = self.iopub_socket
151 151 self.shell.displayhook.topic = self._topic('execute_result')
152 152 self.shell.display_pub.session = self.session
153 153 self.shell.display_pub.pub_socket = self.iopub_socket
154 154 self.shell.data_pub.session = self.session
155 155 self.shell.data_pub.pub_socket = self.iopub_socket
156 156
157 157 # TMP - hack while developing
158 158 self.shell._reply_content = None
159 159
160 160 # Build dict of handlers for message types
161 161 msg_types = [ 'execute_request', 'complete_request',
162 162 'object_info_request', 'history_request',
163 163 'kernel_info_request',
164 164 'connect_request', 'shutdown_request',
165 165 'apply_request',
166 166 ]
167 167 self.shell_handlers = {}
168 168 for msg_type in msg_types:
169 169 self.shell_handlers[msg_type] = getattr(self, msg_type)
170 170
171 171 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
172 172 comm_manager = self.shell.comm_manager
173 173 for msg_type in comm_msg_types:
174 174 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
175 175
176 176 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
177 177 self.control_handlers = {}
178 178 for msg_type in control_msg_types:
179 179 self.control_handlers[msg_type] = getattr(self, msg_type)
180 180
181 181
182 182 def dispatch_control(self, msg):
183 183 """dispatch control requests"""
184 184 idents,msg = self.session.feed_identities(msg, copy=False)
185 185 try:
186 186 msg = self.session.unserialize(msg, content=True, copy=False)
187 187 except:
188 188 self.log.error("Invalid Control Message", exc_info=True)
189 189 return
190 190
191 191 self.log.debug("Control received: %s", msg)
192 192
193 193 header = msg['header']
194 194 msg_id = header['msg_id']
195 195 msg_type = header['msg_type']
196 196
197 197 handler = self.control_handlers.get(msg_type, None)
198 198 if handler is None:
199 199 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
200 200 else:
201 201 try:
202 202 handler(self.control_stream, idents, msg)
203 203 except Exception:
204 204 self.log.error("Exception in control handler:", exc_info=True)
205 205
206 206 def dispatch_shell(self, stream, msg):
207 207 """dispatch shell requests"""
208 208 # flush control requests first
209 209 if self.control_stream:
210 210 self.control_stream.flush()
211 211
212 212 idents,msg = self.session.feed_identities(msg, copy=False)
213 213 try:
214 214 msg = self.session.unserialize(msg, content=True, copy=False)
215 215 except:
216 216 self.log.error("Invalid Message", exc_info=True)
217 217 return
218 218
219 219 header = msg['header']
220 220 msg_id = header['msg_id']
221 221 msg_type = msg['header']['msg_type']
222 222
223 223 # Print some info about this message and leave a '--->' marker, so it's
224 224 # easier to trace visually the message chain when debugging. Each
225 225 # handler prints its message at the end.
226 226 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
227 227 self.log.debug(' Content: %s\n --->\n ', msg['content'])
228 228
229 229 if msg_id in self.aborted:
230 230 self.aborted.remove(msg_id)
231 231 # is it safe to assume a msg_id will not be resubmitted?
232 232 reply_type = msg_type.split('_')[0] + '_reply'
233 233 status = {'status' : 'aborted'}
234 234 md = {'engine' : self.ident}
235 235 md.update(status)
236 236 reply_msg = self.session.send(stream, reply_type, metadata=md,
237 237 content=status, parent=msg, ident=idents)
238 238 return
239 239
240 240 handler = self.shell_handlers.get(msg_type, None)
241 241 if handler is None:
242 242 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
243 243 else:
244 244 # ensure default_int_handler during handler call
245 245 sig = signal(SIGINT, default_int_handler)
246 246 self.log.debug("%s: %s", msg_type, msg)
247 247 try:
248 248 handler(stream, idents, msg)
249 249 except Exception:
250 250 self.log.error("Exception in message handler:", exc_info=True)
251 251 finally:
252 252 signal(SIGINT, sig)
253 253
254 254 def enter_eventloop(self):
255 255 """enter eventloop"""
256 256 self.log.info("entering eventloop %s", self.eventloop)
257 257 for stream in self.shell_streams:
258 258 # flush any pending replies,
259 259 # which may be skipped by entering the eventloop
260 260 stream.flush(zmq.POLLOUT)
261 261 # restore default_int_handler
262 262 signal(SIGINT, default_int_handler)
263 263 while self.eventloop is not None:
264 264 try:
265 265 self.eventloop(self)
266 266 except KeyboardInterrupt:
267 267 # Ctrl-C shouldn't crash the kernel
268 268 self.log.error("KeyboardInterrupt caught in kernel")
269 269 continue
270 270 else:
271 271 # eventloop exited cleanly, this means we should stop (right?)
272 272 self.eventloop = None
273 273 break
274 274 self.log.info("exiting eventloop")
275 275
276 276 def start(self):
277 277 """register dispatchers for streams"""
278 278 self.shell.exit_now = False
279 279 if self.control_stream:
280 280 self.control_stream.on_recv(self.dispatch_control, copy=False)
281 281
282 282 def make_dispatcher(stream):
283 283 def dispatcher(msg):
284 284 return self.dispatch_shell(stream, msg)
285 285 return dispatcher
286 286
287 287 for s in self.shell_streams:
288 288 s.on_recv(make_dispatcher(s), copy=False)
289 289
290 290 # publish idle status
291 291 self._publish_status('starting')
292 292
293 293 def do_one_iteration(self):
294 294 """step eventloop just once"""
295 295 if self.control_stream:
296 296 self.control_stream.flush()
297 297 for stream in self.shell_streams:
298 298 # handle at most one request per iteration
299 299 stream.flush(zmq.POLLIN, 1)
300 300 stream.flush(zmq.POLLOUT)
301 301
302 302
303 303 def record_ports(self, ports):
304 304 """Record the ports that this kernel is using.
305 305
306 306 The creator of the Kernel instance must call this methods if they
307 307 want the :meth:`connect_request` method to return the port numbers.
308 308 """
309 309 self._recorded_ports = ports
310 310
311 311 #---------------------------------------------------------------------------
312 312 # Kernel request handlers
313 313 #---------------------------------------------------------------------------
314 314
315 315 def _make_metadata(self, other=None):
316 316 """init metadata dict, for execute/apply_reply"""
317 317 new_md = {
318 318 'dependencies_met' : True,
319 319 'engine' : self.ident,
320 320 'started': datetime.now(),
321 321 }
322 322 if other:
323 323 new_md.update(other)
324 324 return new_md
325 325
326 326 def _publish_execute_input(self, code, parent, execution_count):
327 327 """Publish the code request on the iopub stream."""
328 328
329 329 self.session.send(self.iopub_socket, u'execute_input',
330 330 {u'code':code, u'execution_count': execution_count},
331 331 parent=parent, ident=self._topic('execute_input')
332 332 )
333 333
334 334 def _publish_status(self, status, parent=None):
335 335 """send status (busy/idle) on IOPub"""
336 336 self.session.send(self.iopub_socket,
337 337 u'status',
338 338 {u'execution_state': status},
339 339 parent=parent,
340 340 ident=self._topic('status'),
341 341 )
342 342
343 343 def _forward_input(self, allow_stdin=False):
344 344 """Forward raw_input and getpass to the current frontend.
345 345
346 346 via input_request
347 347 """
348 348 self._allow_stdin = allow_stdin
349 349
350 350 if py3compat.PY3:
351 351 self._sys_raw_input = builtin_mod.input
352 352 builtin_mod.input = self.raw_input
353 353 else:
354 354 self._sys_raw_input = builtin_mod.raw_input
355 355 self._sys_eval_input = builtin_mod.input
356 356 builtin_mod.raw_input = self.raw_input
357 357 builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt))
358 358 self._save_getpass = getpass.getpass
359 359 getpass.getpass = self.getpass
360 360
361 361 def _restore_input(self):
362 362 """Restore raw_input, getpass"""
363 363 if py3compat.PY3:
364 364 builtin_mod.input = self._sys_raw_input
365 365 else:
366 366 builtin_mod.raw_input = self._sys_raw_input
367 367 builtin_mod.input = self._sys_eval_input
368 368
369 369 getpass.getpass = self._save_getpass
370 370
371 371 def set_parent(self, ident, parent):
372 372 """Record the parent state
373 373
374 374 For associating side effects with their requests.
375 375 """
376 376 self._parent_ident = ident
377 377 self._parent_header = parent
378 378 self.shell.set_parent(parent)
379 379
380 380 def execute_request(self, stream, ident, parent):
381 381 """handle an execute_request"""
382 382
383 383 self._publish_status(u'busy', parent)
384 384
385 385 try:
386 386 content = parent[u'content']
387 387 code = py3compat.cast_unicode_py2(content[u'code'])
388 388 silent = content[u'silent']
389 389 store_history = content.get(u'store_history', not silent)
390 390 except:
391 391 self.log.error("Got bad msg: ")
392 392 self.log.error("%s", parent)
393 393 return
394 394
395 395 md = self._make_metadata(parent['metadata'])
396 396
397 397 shell = self.shell # we'll need this a lot here
398 398
399 399 self._forward_input(content.get('allow_stdin', False))
400 400 # Set the parent message of the display hook and out streams.
401 401 self.set_parent(ident, parent)
402 402
403 403 # Re-broadcast our input for the benefit of listening clients, and
404 404 # start computing output
405 405 if not silent:
406 406 self._publish_execute_input(code, parent, shell.execution_count)
407 407
408 408 reply_content = {}
409 409 # FIXME: the shell calls the exception handler itself.
410 410 shell._reply_content = None
411 411 try:
412 412 shell.run_cell(code, store_history=store_history, silent=silent)
413 413 except:
414 414 status = u'error'
415 415 # FIXME: this code right now isn't being used yet by default,
416 416 # because the run_cell() call above directly fires off exception
417 417 # reporting. This code, therefore, is only active in the scenario
418 418 # where runlines itself has an unhandled exception. We need to
419 419 # uniformize this, for all exception construction to come from a
420 420 # single location in the codbase.
421 421 etype, evalue, tb = sys.exc_info()
422 422 tb_list = traceback.format_exception(etype, evalue, tb)
423 423 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
424 424 else:
425 425 status = u'ok'
426 426 finally:
427 427 self._restore_input()
428 428
429 429 reply_content[u'status'] = status
430 430
431 431 # Return the execution counter so clients can display prompts
432 432 reply_content['execution_count'] = shell.execution_count - 1
433 433
434 434 # FIXME - fish exception info out of shell, possibly left there by
435 435 # runlines. We'll need to clean up this logic later.
436 436 if shell._reply_content is not None:
437 437 reply_content.update(shell._reply_content)
438 438 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
439 439 reply_content['engine_info'] = e_info
440 440 # reset after use
441 441 shell._reply_content = None
442 442
443 443 if 'traceback' in reply_content:
444 444 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
445 445
446 446
447 447 # At this point, we can tell whether the main code execution succeeded
448 448 # or not. If it did, we proceed to evaluate user_expressions
449 449 if reply_content['status'] == 'ok':
450 450 reply_content[u'user_expressions'] = \
451 451 shell.user_expressions(content.get(u'user_expressions', {}))
452 452 else:
453 453 # If there was an error, don't even try to compute expressions
454 454 reply_content[u'user_expressions'] = {}
455 455
456 456 # Payloads should be retrieved regardless of outcome, so we can both
457 457 # recover partial output (that could have been generated early in a
458 458 # block, before an error) and clear the payload system always.
459 459 reply_content[u'payload'] = shell.payload_manager.read_payload()
460 460 # Be agressive about clearing the payload because we don't want
461 461 # it to sit in memory until the next execute_request comes in.
462 462 shell.payload_manager.clear_payload()
463 463
464 464 # Flush output before sending the reply.
465 465 sys.stdout.flush()
466 466 sys.stderr.flush()
467 467 # FIXME: on rare occasions, the flush doesn't seem to make it to the
468 468 # clients... This seems to mitigate the problem, but we definitely need
469 469 # to better understand what's going on.
470 470 if self._execute_sleep:
471 471 time.sleep(self._execute_sleep)
472 472
473 473 # Send the reply.
474 474 reply_content = json_clean(reply_content)
475 475
476 476 md['status'] = reply_content['status']
477 477 if reply_content['status'] == 'error' and \
478 478 reply_content['ename'] == 'UnmetDependency':
479 479 md['dependencies_met'] = False
480 480
481 481 reply_msg = self.session.send(stream, u'execute_reply',
482 482 reply_content, parent, metadata=md,
483 483 ident=ident)
484 484
485 485 self.log.debug("%s", reply_msg)
486 486
487 487 if not silent and reply_msg['content']['status'] == u'error':
488 488 self._abort_queues()
489 489
490 490 self._publish_status(u'idle', parent)
491 491
492 492 def complete_request(self, stream, ident, parent):
493 493 content = parent['content']
494 494 code = content['code']
495 495 cursor_pos = content['cursor_pos']
496 496
497 497 txt, matches = self.shell.complete('', code, cursor_pos)
498 498 matches = {'matches' : matches,
499 499 'matched_text' : txt,
500 500 'status' : 'ok'}
501 501 matches = json_clean(matches)
502 502 completion_msg = self.session.send(stream, 'complete_reply',
503 503 matches, parent, ident)
504 504 self.log.debug("%s", completion_msg)
505 505
506 506 def object_info_request(self, stream, ident, parent):
507 507 content = parent['content']
508 508
509 509 name = token_at_cursor(content['code'], content['cursor_pos'])
510 510 info = self.shell.object_inspect(name)
511 511
512 512 reply_content = {'status' : 'ok'}
513 513 reply_content['data'] = data = {}
514 514 reply_content['metadata'] = {}
515 515 reply_content['name'] = name
516 516 reply_content['found'] = info['found']
517 517 if info['found']:
518 518 info_text = self.shell.object_inspect_text(
519 519 name,
520 520 detail_level=content.get('detail_level', 0),
521 521 )
522 522 reply_content['data']['text/plain'] = info_text
523 523 # Before we send this object over, we scrub it for JSON usage
524 524 reply_content = json_clean(reply_content)
525 525 msg = self.session.send(stream, 'object_info_reply',
526 526 reply_content, parent, ident)
527 527 self.log.debug("%s", msg)
528 528
529 529 def history_request(self, stream, ident, parent):
530 530 # We need to pull these out, as passing **kwargs doesn't work with
531 531 # unicode keys before Python 2.6.5.
532 532 hist_access_type = parent['content']['hist_access_type']
533 533 raw = parent['content']['raw']
534 534 output = parent['content']['output']
535 535 if hist_access_type == 'tail':
536 536 n = parent['content']['n']
537 537 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
538 538 include_latest=True)
539 539
540 540 elif hist_access_type == 'range':
541 541 session = parent['content']['session']
542 542 start = parent['content']['start']
543 543 stop = parent['content']['stop']
544 544 hist = self.shell.history_manager.get_range(session, start, stop,
545 545 raw=raw, output=output)
546 546
547 547 elif hist_access_type == 'search':
548 548 n = parent['content'].get('n')
549 549 unique = parent['content'].get('unique', False)
550 550 pattern = parent['content']['pattern']
551 551 hist = self.shell.history_manager.search(
552 552 pattern, raw=raw, output=output, n=n, unique=unique)
553 553
554 554 else:
555 555 hist = []
556 556 hist = list(hist)
557 557 content = {'history' : hist}
558 558 content = json_clean(content)
559 559 msg = self.session.send(stream, 'history_reply',
560 560 content, parent, ident)
561 561 self.log.debug("Sending history reply with %i entries", len(hist))
562 562
563 563 def connect_request(self, stream, ident, parent):
564 564 if self._recorded_ports is not None:
565 565 content = self._recorded_ports.copy()
566 566 else:
567 567 content = {}
568 568 msg = self.session.send(stream, 'connect_reply',
569 569 content, parent, ident)
570 570 self.log.debug("%s", msg)
571 571
572 572 def kernel_info_request(self, stream, ident, parent):
573 573 vinfo = {
574 574 'protocol_version': protocol_version,
575 'ipython_version': ipython_version,
575 'implementation': 'ipython',
576 'implementation_version': ipython_version,
576 577 'language_version': language_version,
577 578 'language': 'python',
578 579 'banner': self.shell.banner,
579 580 }
580 581 msg = self.session.send(stream, 'kernel_info_reply',
581 582 vinfo, parent, ident)
582 583 self.log.debug("%s", msg)
583 584
584 585 def shutdown_request(self, stream, ident, parent):
585 586 self.shell.exit_now = True
586 587 content = dict(status='ok')
587 588 content.update(parent['content'])
588 589 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
589 590 # same content, but different msg_id for broadcasting on IOPub
590 591 self._shutdown_message = self.session.msg(u'shutdown_reply',
591 592 content, parent
592 593 )
593 594
594 595 self._at_shutdown()
595 596 # call sys.exit after a short delay
596 597 loop = ioloop.IOLoop.instance()
597 598 loop.add_timeout(time.time()+0.1, loop.stop)
598 599
599 600 #---------------------------------------------------------------------------
600 601 # Engine methods
601 602 #---------------------------------------------------------------------------
602 603
603 604 def apply_request(self, stream, ident, parent):
604 605 try:
605 606 content = parent[u'content']
606 607 bufs = parent[u'buffers']
607 608 msg_id = parent['header']['msg_id']
608 609 except:
609 610 self.log.error("Got bad msg: %s", parent, exc_info=True)
610 611 return
611 612
612 613 self._publish_status(u'busy', parent)
613 614
614 615 # Set the parent message of the display hook and out streams.
615 616 shell = self.shell
616 617 shell.set_parent(parent)
617 618
618 619 # execute_input_msg = self.session.msg(u'execute_input',{u'code':code}, parent=parent)
619 620 # self.iopub_socket.send(execute_input_msg)
620 621 # self.session.send(self.iopub_socket, u'execute_input', {u'code':code},parent=parent)
621 622 md = self._make_metadata(parent['metadata'])
622 623 try:
623 624 working = shell.user_ns
624 625
625 626 prefix = "_"+str(msg_id).replace("-","")+"_"
626 627
627 628 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
628 629
629 630 fname = getattr(f, '__name__', 'f')
630 631
631 632 fname = prefix+"f"
632 633 argname = prefix+"args"
633 634 kwargname = prefix+"kwargs"
634 635 resultname = prefix+"result"
635 636
636 637 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
637 638 # print ns
638 639 working.update(ns)
639 640 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
640 641 try:
641 642 exec(code, shell.user_global_ns, shell.user_ns)
642 643 result = working.get(resultname)
643 644 finally:
644 645 for key in ns:
645 646 working.pop(key)
646 647
647 648 result_buf = serialize_object(result,
648 649 buffer_threshold=self.session.buffer_threshold,
649 650 item_threshold=self.session.item_threshold,
650 651 )
651 652
652 653 except:
653 654 # invoke IPython traceback formatting
654 655 shell.showtraceback()
655 656 # FIXME - fish exception info out of shell, possibly left there by
656 657 # run_code. We'll need to clean up this logic later.
657 658 reply_content = {}
658 659 if shell._reply_content is not None:
659 660 reply_content.update(shell._reply_content)
660 661 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
661 662 reply_content['engine_info'] = e_info
662 663 # reset after use
663 664 shell._reply_content = None
664 665
665 666 self.session.send(self.iopub_socket, u'error', reply_content, parent=parent,
666 667 ident=self._topic('error'))
667 668 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
668 669 result_buf = []
669 670
670 671 if reply_content['ename'] == 'UnmetDependency':
671 672 md['dependencies_met'] = False
672 673 else:
673 674 reply_content = {'status' : 'ok'}
674 675
675 676 # put 'ok'/'error' status in header, for scheduler introspection:
676 677 md['status'] = reply_content['status']
677 678
678 679 # flush i/o
679 680 sys.stdout.flush()
680 681 sys.stderr.flush()
681 682
682 683 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
683 684 parent=parent, ident=ident,buffers=result_buf, metadata=md)
684 685
685 686 self._publish_status(u'idle', parent)
686 687
687 688 #---------------------------------------------------------------------------
688 689 # Control messages
689 690 #---------------------------------------------------------------------------
690 691
691 692 def abort_request(self, stream, ident, parent):
692 693 """abort a specifig msg by id"""
693 694 msg_ids = parent['content'].get('msg_ids', None)
694 695 if isinstance(msg_ids, string_types):
695 696 msg_ids = [msg_ids]
696 697 if not msg_ids:
697 698 self.abort_queues()
698 699 for mid in msg_ids:
699 700 self.aborted.add(str(mid))
700 701
701 702 content = dict(status='ok')
702 703 reply_msg = self.session.send(stream, 'abort_reply', content=content,
703 704 parent=parent, ident=ident)
704 705 self.log.debug("%s", reply_msg)
705 706
706 707 def clear_request(self, stream, idents, parent):
707 708 """Clear our namespace."""
708 709 self.shell.reset(False)
709 710 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
710 711 content = dict(status='ok'))
711 712
712 713
713 714 #---------------------------------------------------------------------------
714 715 # Protected interface
715 716 #---------------------------------------------------------------------------
716 717
717 718 def _wrap_exception(self, method=None):
718 719 # import here, because _wrap_exception is only used in parallel,
719 720 # and parallel has higher min pyzmq version
720 721 from IPython.parallel.error import wrap_exception
721 722 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
722 723 content = wrap_exception(e_info)
723 724 return content
724 725
725 726 def _topic(self, topic):
726 727 """prefixed topic for IOPub messages"""
727 728 if self.int_id >= 0:
728 729 base = "engine.%i" % self.int_id
729 730 else:
730 731 base = "kernel.%s" % self.ident
731 732
732 733 return py3compat.cast_bytes("%s.%s" % (base, topic))
733 734
734 735 def _abort_queues(self):
735 736 for stream in self.shell_streams:
736 737 if stream:
737 738 self._abort_queue(stream)
738 739
739 740 def _abort_queue(self, stream):
740 741 poller = zmq.Poller()
741 742 poller.register(stream.socket, zmq.POLLIN)
742 743 while True:
743 744 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
744 745 if msg is None:
745 746 return
746 747
747 748 self.log.info("Aborting:")
748 749 self.log.info("%s", msg)
749 750 msg_type = msg['header']['msg_type']
750 751 reply_type = msg_type.split('_')[0] + '_reply'
751 752
752 753 status = {'status' : 'aborted'}
753 754 md = {'engine' : self.ident}
754 755 md.update(status)
755 756 reply_msg = self.session.send(stream, reply_type, metadata=md,
756 757 content=status, parent=msg, ident=idents)
757 758 self.log.debug("%s", reply_msg)
758 759 # We need to wait a bit for requests to come in. This can probably
759 760 # be set shorter for true asynchronous clients.
760 761 poller.poll(50)
761 762
762 763
763 764 def _no_raw_input(self):
764 765 """Raise StdinNotImplentedError if active frontend doesn't support
765 766 stdin."""
766 767 raise StdinNotImplementedError("raw_input was called, but this "
767 768 "frontend does not support stdin.")
768 769
769 770 def getpass(self, prompt=''):
770 771 """Forward getpass to frontends
771 772
772 773 Raises
773 774 ------
774 775 StdinNotImplentedError if active frontend doesn't support stdin.
775 776 """
776 777 if not self._allow_stdin:
777 778 raise StdinNotImplementedError(
778 779 "getpass was called, but this frontend does not support input requests."
779 780 )
780 781 return self._input_request(prompt,
781 782 self._parent_ident,
782 783 self._parent_header,
783 784 password=True,
784 785 )
785 786
786 787 def raw_input(self, prompt=''):
787 788 """Forward raw_input to frontends
788 789
789 790 Raises
790 791 ------
791 792 StdinNotImplentedError if active frontend doesn't support stdin.
792 793 """
793 794 if not self._allow_stdin:
794 795 raise StdinNotImplementedError(
795 796 "raw_input was called, but this frontend does not support input requests."
796 797 )
797 798 return self._input_request(prompt,
798 799 self._parent_ident,
799 800 self._parent_header,
800 801 password=False,
801 802 )
802 803
803 804 def _input_request(self, prompt, ident, parent, password=False):
804 805 # Flush output before making the request.
805 806 sys.stderr.flush()
806 807 sys.stdout.flush()
807 808 # flush the stdin socket, to purge stale replies
808 809 while True:
809 810 try:
810 811 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
811 812 except zmq.ZMQError as e:
812 813 if e.errno == zmq.EAGAIN:
813 814 break
814 815 else:
815 816 raise
816 817
817 818 # Send the input request.
818 819 content = json_clean(dict(prompt=prompt, password=password))
819 820 self.session.send(self.stdin_socket, u'input_request', content, parent,
820 821 ident=ident)
821 822
822 823 # Await a response.
823 824 while True:
824 825 try:
825 826 ident, reply = self.session.recv(self.stdin_socket, 0)
826 827 except Exception:
827 828 self.log.warn("Invalid Message:", exc_info=True)
828 829 except KeyboardInterrupt:
829 830 # re-raise KeyboardInterrupt, to truncate traceback
830 831 raise KeyboardInterrupt
831 832 else:
832 833 break
833 834 try:
834 835 value = py3compat.unicode_to_str(reply['content']['value'])
835 836 except:
836 837 self.log.error("Bad input_reply: %s", parent)
837 838 value = ''
838 839 if value == '\x04':
839 840 # EOF
840 841 raise EOFError
841 842 return value
842 843
843 844 def _at_shutdown(self):
844 845 """Actions taken at shutdown by the kernel, called by python's atexit.
845 846 """
846 847 # io.rprint("Kernel at_shutdown") # dbg
847 848 if self._shutdown_message is not None:
848 849 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
849 850 self.log.debug("%s", self._shutdown_message)
850 851 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
851 852
General Comments 0
You need to be logged in to leave comments. Login now