##// END OF EJS Templates
clear _reply_content cache before using it...
MinRK -
Show More
@@ -1,799 +1,800
1 1 #!/usr/bin/env python
2 2 """An interactive kernel that talks to frontends over 0MQ."""
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Imports
6 6 #-----------------------------------------------------------------------------
7 7 from __future__ import print_function
8 8
9 9 # Standard library imports
10 10 import sys
11 11 import time
12 12 import traceback
13 13 import logging
14 14 import uuid
15 15
16 16 from datetime import datetime
17 17 from signal import (
18 18 signal, default_int_handler, SIGINT
19 19 )
20 20
21 21 # System library imports
22 22 import zmq
23 23 from zmq.eventloop import ioloop
24 24 from zmq.eventloop.zmqstream import ZMQStream
25 25
26 26 # Local imports
27 27 from IPython.config.configurable import Configurable
28 28 from IPython.core.error import StdinNotImplementedError
29 29 from IPython.core import release
30 30 from IPython.utils import py3compat
31 31 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
32 32 from IPython.utils.jsonutil import json_clean
33 33 from IPython.utils.traitlets import (
34 34 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
35 35 Type, Bool,
36 36 )
37 37
38 38 from .serialize import serialize_object, unpack_apply_message
39 39 from .session import Session
40 40 from .zmqshell import ZMQInteractiveShell
41 41
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Main kernel class
45 45 #-----------------------------------------------------------------------------
46 46
47 47 protocol_version = list(release.kernel_protocol_version_info)
48 48 ipython_version = list(release.version_info)
49 49 language_version = list(sys.version_info[:3])
50 50
51 51
52 52 class Kernel(Configurable):
53 53
54 54 #---------------------------------------------------------------------------
55 55 # Kernel interface
56 56 #---------------------------------------------------------------------------
57 57
58 58 # attribute to override with a GUI
59 59 eventloop = Any(None)
60 60 def _eventloop_changed(self, name, old, new):
61 61 """schedule call to eventloop from IOLoop"""
62 62 loop = ioloop.IOLoop.instance()
63 63 loop.add_callback(self.enter_eventloop)
64 64
65 65 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
66 66 shell_class = Type(ZMQInteractiveShell)
67 67
68 68 session = Instance(Session)
69 69 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
70 70 shell_streams = List()
71 71 control_stream = Instance(ZMQStream)
72 72 iopub_socket = Instance(zmq.Socket)
73 73 stdin_socket = Instance(zmq.Socket)
74 74 log = Instance(logging.Logger)
75 75
76 76 user_module = Any()
77 77 def _user_module_changed(self, name, old, new):
78 78 if self.shell is not None:
79 79 self.shell.user_module = new
80 80
81 81 user_ns = Instance(dict, args=None, allow_none=True)
82 82 def _user_ns_changed(self, name, old, new):
83 83 if self.shell is not None:
84 84 self.shell.user_ns = new
85 85 self.shell.init_user_ns()
86 86
87 87 # identities:
88 88 int_id = Integer(-1)
89 89 ident = Unicode()
90 90
91 91 def _ident_default(self):
92 92 return unicode_type(uuid.uuid4())
93 93
94 94 # Private interface
95 95
96 96 _darwin_app_nap = Bool(True, config=True,
97 97 help="""Whether to use appnope for compatiblity with OS X App Nap.
98 98
99 99 Only affects OS X >= 10.9.
100 100 """
101 101 )
102 102
103 103 # Time to sleep after flushing the stdout/err buffers in each execute
104 104 # cycle. While this introduces a hard limit on the minimal latency of the
105 105 # execute cycle, it helps prevent output synchronization problems for
106 106 # clients.
107 107 # Units are in seconds. The minimum zmq latency on local host is probably
108 108 # ~150 microseconds, set this to 500us for now. We may need to increase it
109 109 # a little if it's not enough after more interactive testing.
110 110 _execute_sleep = Float(0.0005, config=True)
111 111
112 112 # Frequency of the kernel's event loop.
113 113 # Units are in seconds, kernel subclasses for GUI toolkits may need to
114 114 # adapt to milliseconds.
115 115 _poll_interval = Float(0.05, config=True)
116 116
117 117 # If the shutdown was requested over the network, we leave here the
118 118 # necessary reply message so it can be sent by our registered atexit
119 119 # handler. This ensures that the reply is only sent to clients truly at
120 120 # the end of our shutdown process (which happens after the underlying
121 121 # IPython shell's own shutdown).
122 122 _shutdown_message = None
123 123
124 124 # This is a dict of port number that the kernel is listening on. It is set
125 125 # by record_ports and used by connect_request.
126 126 _recorded_ports = Dict()
127 127
128 128 # A reference to the Python builtin 'raw_input' function.
129 129 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
130 130 _sys_raw_input = Any()
131 131 _sys_eval_input = Any()
132 132
133 133 # set of aborted msg_ids
134 134 aborted = Set()
135 135
136 136
137 137 def __init__(self, **kwargs):
138 138 super(Kernel, self).__init__(**kwargs)
139 139
140 140 # Initialize the InteractiveShell subclass
141 141 self.shell = self.shell_class.instance(parent=self,
142 142 profile_dir = self.profile_dir,
143 143 user_module = self.user_module,
144 144 user_ns = self.user_ns,
145 145 kernel = self,
146 146 )
147 147 self.shell.displayhook.session = self.session
148 148 self.shell.displayhook.pub_socket = self.iopub_socket
149 149 self.shell.displayhook.topic = self._topic('pyout')
150 150 self.shell.display_pub.session = self.session
151 151 self.shell.display_pub.pub_socket = self.iopub_socket
152 152 self.shell.data_pub.session = self.session
153 153 self.shell.data_pub.pub_socket = self.iopub_socket
154 154
155 155 # TMP - hack while developing
156 156 self.shell._reply_content = None
157 157
158 158 # Build dict of handlers for message types
159 159 msg_types = [ 'execute_request', 'complete_request',
160 160 'object_info_request', 'history_request',
161 161 'kernel_info_request',
162 162 'connect_request', 'shutdown_request',
163 163 'apply_request',
164 164 ]
165 165 self.shell_handlers = {}
166 166 for msg_type in msg_types:
167 167 self.shell_handlers[msg_type] = getattr(self, msg_type)
168 168
169 169 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
170 170 comm_manager = self.shell.comm_manager
171 171 for msg_type in comm_msg_types:
172 172 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
173 173
174 174 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
175 175 self.control_handlers = {}
176 176 for msg_type in control_msg_types:
177 177 self.control_handlers[msg_type] = getattr(self, msg_type)
178 178
179 179
180 180 def dispatch_control(self, msg):
181 181 """dispatch control requests"""
182 182 idents,msg = self.session.feed_identities(msg, copy=False)
183 183 try:
184 184 msg = self.session.unserialize(msg, content=True, copy=False)
185 185 except:
186 186 self.log.error("Invalid Control Message", exc_info=True)
187 187 return
188 188
189 189 self.log.debug("Control received: %s", msg)
190 190
191 191 header = msg['header']
192 192 msg_id = header['msg_id']
193 193 msg_type = header['msg_type']
194 194
195 195 handler = self.control_handlers.get(msg_type, None)
196 196 if handler is None:
197 197 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
198 198 else:
199 199 try:
200 200 handler(self.control_stream, idents, msg)
201 201 except Exception:
202 202 self.log.error("Exception in control handler:", exc_info=True)
203 203
204 204 def dispatch_shell(self, stream, msg):
205 205 """dispatch shell requests"""
206 206 # flush control requests first
207 207 if self.control_stream:
208 208 self.control_stream.flush()
209 209
210 210 idents,msg = self.session.feed_identities(msg, copy=False)
211 211 try:
212 212 msg = self.session.unserialize(msg, content=True, copy=False)
213 213 except:
214 214 self.log.error("Invalid Message", exc_info=True)
215 215 return
216 216
217 217 header = msg['header']
218 218 msg_id = header['msg_id']
219 219 msg_type = msg['header']['msg_type']
220 220
221 221 # Print some info about this message and leave a '--->' marker, so it's
222 222 # easier to trace visually the message chain when debugging. Each
223 223 # handler prints its message at the end.
224 224 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
225 225 self.log.debug(' Content: %s\n --->\n ', msg['content'])
226 226
227 227 if msg_id in self.aborted:
228 228 self.aborted.remove(msg_id)
229 229 # is it safe to assume a msg_id will not be resubmitted?
230 230 reply_type = msg_type.split('_')[0] + '_reply'
231 231 status = {'status' : 'aborted'}
232 232 md = {'engine' : self.ident}
233 233 md.update(status)
234 234 reply_msg = self.session.send(stream, reply_type, metadata=md,
235 235 content=status, parent=msg, ident=idents)
236 236 return
237 237
238 238 handler = self.shell_handlers.get(msg_type, None)
239 239 if handler is None:
240 240 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
241 241 else:
242 242 # ensure default_int_handler during handler call
243 243 sig = signal(SIGINT, default_int_handler)
244 244 try:
245 245 handler(stream, idents, msg)
246 246 except Exception:
247 247 self.log.error("Exception in message handler:", exc_info=True)
248 248 finally:
249 249 signal(SIGINT, sig)
250 250
251 251 def enter_eventloop(self):
252 252 """enter eventloop"""
253 253 self.log.info("entering eventloop %s", self.eventloop)
254 254 for stream in self.shell_streams:
255 255 # flush any pending replies,
256 256 # which may be skipped by entering the eventloop
257 257 stream.flush(zmq.POLLOUT)
258 258 # restore default_int_handler
259 259 signal(SIGINT, default_int_handler)
260 260 while self.eventloop is not None:
261 261 try:
262 262 self.eventloop(self)
263 263 except KeyboardInterrupt:
264 264 # Ctrl-C shouldn't crash the kernel
265 265 self.log.error("KeyboardInterrupt caught in kernel")
266 266 continue
267 267 else:
268 268 # eventloop exited cleanly, this means we should stop (right?)
269 269 self.eventloop = None
270 270 break
271 271 self.log.info("exiting eventloop")
272 272
273 273 def start(self):
274 274 """register dispatchers for streams"""
275 275 self.shell.exit_now = False
276 276 if self.control_stream:
277 277 self.control_stream.on_recv(self.dispatch_control, copy=False)
278 278
279 279 def make_dispatcher(stream):
280 280 def dispatcher(msg):
281 281 return self.dispatch_shell(stream, msg)
282 282 return dispatcher
283 283
284 284 for s in self.shell_streams:
285 285 s.on_recv(make_dispatcher(s), copy=False)
286 286
287 287 # publish idle status
288 288 self._publish_status('starting')
289 289
290 290 def do_one_iteration(self):
291 291 """step eventloop just once"""
292 292 if self.control_stream:
293 293 self.control_stream.flush()
294 294 for stream in self.shell_streams:
295 295 # handle at most one request per iteration
296 296 stream.flush(zmq.POLLIN, 1)
297 297 stream.flush(zmq.POLLOUT)
298 298
299 299
300 300 def record_ports(self, ports):
301 301 """Record the ports that this kernel is using.
302 302
303 303 The creator of the Kernel instance must call this methods if they
304 304 want the :meth:`connect_request` method to return the port numbers.
305 305 """
306 306 self._recorded_ports = ports
307 307
308 308 #---------------------------------------------------------------------------
309 309 # Kernel request handlers
310 310 #---------------------------------------------------------------------------
311 311
312 312 def _make_metadata(self, other=None):
313 313 """init metadata dict, for execute/apply_reply"""
314 314 new_md = {
315 315 'dependencies_met' : True,
316 316 'engine' : self.ident,
317 317 'started': datetime.now(),
318 318 }
319 319 if other:
320 320 new_md.update(other)
321 321 return new_md
322 322
323 323 def _publish_pyin(self, code, parent, execution_count):
324 324 """Publish the code request on the pyin stream."""
325 325
326 326 self.session.send(self.iopub_socket, u'pyin',
327 327 {u'code':code, u'execution_count': execution_count},
328 328 parent=parent, ident=self._topic('pyin')
329 329 )
330 330
331 331 def _publish_status(self, status, parent=None):
332 332 """send status (busy/idle) on IOPub"""
333 333 self.session.send(self.iopub_socket,
334 334 u'status',
335 335 {u'execution_state': status},
336 336 parent=parent,
337 337 ident=self._topic('status'),
338 338 )
339 339
340 340
341 341 def execute_request(self, stream, ident, parent):
342 342 """handle an execute_request"""
343 343
344 344 self._publish_status(u'busy', parent)
345 345
346 346 try:
347 347 content = parent[u'content']
348 348 code = py3compat.cast_unicode_py2(content[u'code'])
349 349 silent = content[u'silent']
350 350 store_history = content.get(u'store_history', not silent)
351 351 except:
352 352 self.log.error("Got bad msg: ")
353 353 self.log.error("%s", parent)
354 354 return
355 355
356 356 md = self._make_metadata(parent['metadata'])
357 357
358 358 shell = self.shell # we'll need this a lot here
359 359
360 360 # Replace raw_input. Note that is not sufficient to replace
361 361 # raw_input in the user namespace.
362 362 if content.get('allow_stdin', False):
363 363 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
364 364 input = lambda prompt='': eval(raw_input(prompt))
365 365 else:
366 366 raw_input = input = lambda prompt='' : self._no_raw_input()
367 367
368 368 if py3compat.PY3:
369 369 self._sys_raw_input = builtin_mod.input
370 370 builtin_mod.input = raw_input
371 371 else:
372 372 self._sys_raw_input = builtin_mod.raw_input
373 373 self._sys_eval_input = builtin_mod.input
374 374 builtin_mod.raw_input = raw_input
375 375 builtin_mod.input = input
376 376
377 377 # Set the parent message of the display hook and out streams.
378 378 shell.set_parent(parent)
379 379
380 380 # Re-broadcast our input for the benefit of listening clients, and
381 381 # start computing output
382 382 if not silent:
383 383 self._publish_pyin(code, parent, shell.execution_count)
384 384
385 385 reply_content = {}
386 try:
387 386 # FIXME: the shell calls the exception handler itself.
387 shell._reply_content = None
388 try:
388 389 shell.run_cell(code, store_history=store_history, silent=silent)
389 390 except:
390 391 status = u'error'
391 392 # FIXME: this code right now isn't being used yet by default,
392 393 # because the run_cell() call above directly fires off exception
393 394 # reporting. This code, therefore, is only active in the scenario
394 395 # where runlines itself has an unhandled exception. We need to
395 396 # uniformize this, for all exception construction to come from a
396 397 # single location in the codbase.
397 398 etype, evalue, tb = sys.exc_info()
398 399 tb_list = traceback.format_exception(etype, evalue, tb)
399 400 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
400 401 else:
401 402 status = u'ok'
402 403 finally:
403 404 # Restore raw_input.
404 405 if py3compat.PY3:
405 406 builtin_mod.input = self._sys_raw_input
406 407 else:
407 408 builtin_mod.raw_input = self._sys_raw_input
408 409 builtin_mod.input = self._sys_eval_input
409 410
410 411 reply_content[u'status'] = status
411 412
412 413 # Return the execution counter so clients can display prompts
413 414 reply_content['execution_count'] = shell.execution_count - 1
414 415
415 416 # FIXME - fish exception info out of shell, possibly left there by
416 417 # runlines. We'll need to clean up this logic later.
417 418 if shell._reply_content is not None:
418 419 reply_content.update(shell._reply_content)
419 420 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
420 421 reply_content['engine_info'] = e_info
421 422 # reset after use
422 423 shell._reply_content = None
423 424
424 425 if 'traceback' in reply_content:
425 426 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
426 427
427 428
428 429 # At this point, we can tell whether the main code execution succeeded
429 430 # or not. If it did, we proceed to evaluate user_variables/expressions
430 431 if reply_content['status'] == 'ok':
431 432 reply_content[u'user_variables'] = \
432 433 shell.user_variables(content.get(u'user_variables', []))
433 434 reply_content[u'user_expressions'] = \
434 435 shell.user_expressions(content.get(u'user_expressions', {}))
435 436 else:
436 437 # If there was an error, don't even try to compute variables or
437 438 # expressions
438 439 reply_content[u'user_variables'] = {}
439 440 reply_content[u'user_expressions'] = {}
440 441
441 442 # Payloads should be retrieved regardless of outcome, so we can both
442 443 # recover partial output (that could have been generated early in a
443 444 # block, before an error) and clear the payload system always.
444 445 reply_content[u'payload'] = shell.payload_manager.read_payload()
445 446 # Be agressive about clearing the payload because we don't want
446 447 # it to sit in memory until the next execute_request comes in.
447 448 shell.payload_manager.clear_payload()
448 449
449 450 # Flush output before sending the reply.
450 451 sys.stdout.flush()
451 452 sys.stderr.flush()
452 453 # FIXME: on rare occasions, the flush doesn't seem to make it to the
453 454 # clients... This seems to mitigate the problem, but we definitely need
454 455 # to better understand what's going on.
455 456 if self._execute_sleep:
456 457 time.sleep(self._execute_sleep)
457 458
458 459 # Send the reply.
459 460 reply_content = json_clean(reply_content)
460 461
461 462 md['status'] = reply_content['status']
462 463 if reply_content['status'] == 'error' and \
463 464 reply_content['ename'] == 'UnmetDependency':
464 465 md['dependencies_met'] = False
465 466
466 467 reply_msg = self.session.send(stream, u'execute_reply',
467 468 reply_content, parent, metadata=md,
468 469 ident=ident)
469 470
470 471 self.log.debug("%s", reply_msg)
471 472
472 473 if not silent and reply_msg['content']['status'] == u'error':
473 474 self._abort_queues()
474 475
475 476 self._publish_status(u'idle', parent)
476 477
477 478 def complete_request(self, stream, ident, parent):
478 479 txt, matches = self._complete(parent)
479 480 matches = {'matches' : matches,
480 481 'matched_text' : txt,
481 482 'status' : 'ok'}
482 483 matches = json_clean(matches)
483 484 completion_msg = self.session.send(stream, 'complete_reply',
484 485 matches, parent, ident)
485 486 self.log.debug("%s", completion_msg)
486 487
487 488 def object_info_request(self, stream, ident, parent):
488 489 content = parent['content']
489 490 object_info = self.shell.object_inspect(content['oname'],
490 491 detail_level = content.get('detail_level', 0)
491 492 )
492 493 # Before we send this object over, we scrub it for JSON usage
493 494 oinfo = json_clean(object_info)
494 495 msg = self.session.send(stream, 'object_info_reply',
495 496 oinfo, parent, ident)
496 497 self.log.debug("%s", msg)
497 498
498 499 def history_request(self, stream, ident, parent):
499 500 # We need to pull these out, as passing **kwargs doesn't work with
500 501 # unicode keys before Python 2.6.5.
501 502 hist_access_type = parent['content']['hist_access_type']
502 503 raw = parent['content']['raw']
503 504 output = parent['content']['output']
504 505 if hist_access_type == 'tail':
505 506 n = parent['content']['n']
506 507 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
507 508 include_latest=True)
508 509
509 510 elif hist_access_type == 'range':
510 511 session = parent['content']['session']
511 512 start = parent['content']['start']
512 513 stop = parent['content']['stop']
513 514 hist = self.shell.history_manager.get_range(session, start, stop,
514 515 raw=raw, output=output)
515 516
516 517 elif hist_access_type == 'search':
517 518 n = parent['content'].get('n')
518 519 unique = parent['content'].get('unique', False)
519 520 pattern = parent['content']['pattern']
520 521 hist = self.shell.history_manager.search(
521 522 pattern, raw=raw, output=output, n=n, unique=unique)
522 523
523 524 else:
524 525 hist = []
525 526 hist = list(hist)
526 527 content = {'history' : hist}
527 528 content = json_clean(content)
528 529 msg = self.session.send(stream, 'history_reply',
529 530 content, parent, ident)
530 531 self.log.debug("Sending history reply with %i entries", len(hist))
531 532
532 533 def connect_request(self, stream, ident, parent):
533 534 if self._recorded_ports is not None:
534 535 content = self._recorded_ports.copy()
535 536 else:
536 537 content = {}
537 538 msg = self.session.send(stream, 'connect_reply',
538 539 content, parent, ident)
539 540 self.log.debug("%s", msg)
540 541
541 542 def kernel_info_request(self, stream, ident, parent):
542 543 vinfo = {
543 544 'protocol_version': protocol_version,
544 545 'ipython_version': ipython_version,
545 546 'language_version': language_version,
546 547 'language': 'python',
547 548 }
548 549 msg = self.session.send(stream, 'kernel_info_reply',
549 550 vinfo, parent, ident)
550 551 self.log.debug("%s", msg)
551 552
552 553 def shutdown_request(self, stream, ident, parent):
553 554 self.shell.exit_now = True
554 555 content = dict(status='ok')
555 556 content.update(parent['content'])
556 557 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
557 558 # same content, but different msg_id for broadcasting on IOPub
558 559 self._shutdown_message = self.session.msg(u'shutdown_reply',
559 560 content, parent
560 561 )
561 562
562 563 self._at_shutdown()
563 564 # call sys.exit after a short delay
564 565 loop = ioloop.IOLoop.instance()
565 566 loop.add_timeout(time.time()+0.1, loop.stop)
566 567
567 568 #---------------------------------------------------------------------------
568 569 # Engine methods
569 570 #---------------------------------------------------------------------------
570 571
571 572 def apply_request(self, stream, ident, parent):
572 573 try:
573 574 content = parent[u'content']
574 575 bufs = parent[u'buffers']
575 576 msg_id = parent['header']['msg_id']
576 577 except:
577 578 self.log.error("Got bad msg: %s", parent, exc_info=True)
578 579 return
579 580
580 581 self._publish_status(u'busy', parent)
581 582
582 583 # Set the parent message of the display hook and out streams.
583 584 shell = self.shell
584 585 shell.set_parent(parent)
585 586
586 587 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
587 588 # self.iopub_socket.send(pyin_msg)
588 589 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
589 590 md = self._make_metadata(parent['metadata'])
590 591 try:
591 592 working = shell.user_ns
592 593
593 594 prefix = "_"+str(msg_id).replace("-","")+"_"
594 595
595 596 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
596 597
597 598 fname = getattr(f, '__name__', 'f')
598 599
599 600 fname = prefix+"f"
600 601 argname = prefix+"args"
601 602 kwargname = prefix+"kwargs"
602 603 resultname = prefix+"result"
603 604
604 605 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
605 606 # print ns
606 607 working.update(ns)
607 608 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
608 609 try:
609 610 exec(code, shell.user_global_ns, shell.user_ns)
610 611 result = working.get(resultname)
611 612 finally:
612 613 for key in ns:
613 614 working.pop(key)
614 615
615 616 result_buf = serialize_object(result,
616 617 buffer_threshold=self.session.buffer_threshold,
617 618 item_threshold=self.session.item_threshold,
618 619 )
619 620
620 621 except:
621 622 # invoke IPython traceback formatting
622 623 shell.showtraceback()
623 624 # FIXME - fish exception info out of shell, possibly left there by
624 625 # run_code. We'll need to clean up this logic later.
625 626 reply_content = {}
626 627 if shell._reply_content is not None:
627 628 reply_content.update(shell._reply_content)
628 629 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
629 630 reply_content['engine_info'] = e_info
630 631 # reset after use
631 632 shell._reply_content = None
632 633
633 634 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
634 635 ident=self._topic('pyerr'))
635 636 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
636 637 result_buf = []
637 638
638 639 if reply_content['ename'] == 'UnmetDependency':
639 640 md['dependencies_met'] = False
640 641 else:
641 642 reply_content = {'status' : 'ok'}
642 643
643 644 # put 'ok'/'error' status in header, for scheduler introspection:
644 645 md['status'] = reply_content['status']
645 646
646 647 # flush i/o
647 648 sys.stdout.flush()
648 649 sys.stderr.flush()
649 650
650 651 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
651 652 parent=parent, ident=ident,buffers=result_buf, metadata=md)
652 653
653 654 self._publish_status(u'idle', parent)
654 655
655 656 #---------------------------------------------------------------------------
656 657 # Control messages
657 658 #---------------------------------------------------------------------------
658 659
659 660 def abort_request(self, stream, ident, parent):
660 661 """abort a specifig msg by id"""
661 662 msg_ids = parent['content'].get('msg_ids', None)
662 663 if isinstance(msg_ids, string_types):
663 664 msg_ids = [msg_ids]
664 665 if not msg_ids:
665 666 self.abort_queues()
666 667 for mid in msg_ids:
667 668 self.aborted.add(str(mid))
668 669
669 670 content = dict(status='ok')
670 671 reply_msg = self.session.send(stream, 'abort_reply', content=content,
671 672 parent=parent, ident=ident)
672 673 self.log.debug("%s", reply_msg)
673 674
674 675 def clear_request(self, stream, idents, parent):
675 676 """Clear our namespace."""
676 677 self.shell.reset(False)
677 678 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
678 679 content = dict(status='ok'))
679 680
680 681
681 682 #---------------------------------------------------------------------------
682 683 # Protected interface
683 684 #---------------------------------------------------------------------------
684 685
685 686 def _wrap_exception(self, method=None):
686 687 # import here, because _wrap_exception is only used in parallel,
687 688 # and parallel has higher min pyzmq version
688 689 from IPython.parallel.error import wrap_exception
689 690 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
690 691 content = wrap_exception(e_info)
691 692 return content
692 693
693 694 def _topic(self, topic):
694 695 """prefixed topic for IOPub messages"""
695 696 if self.int_id >= 0:
696 697 base = "engine.%i" % self.int_id
697 698 else:
698 699 base = "kernel.%s" % self.ident
699 700
700 701 return py3compat.cast_bytes("%s.%s" % (base, topic))
701 702
702 703 def _abort_queues(self):
703 704 for stream in self.shell_streams:
704 705 if stream:
705 706 self._abort_queue(stream)
706 707
707 708 def _abort_queue(self, stream):
708 709 poller = zmq.Poller()
709 710 poller.register(stream.socket, zmq.POLLIN)
710 711 while True:
711 712 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
712 713 if msg is None:
713 714 return
714 715
715 716 self.log.info("Aborting:")
716 717 self.log.info("%s", msg)
717 718 msg_type = msg['header']['msg_type']
718 719 reply_type = msg_type.split('_')[0] + '_reply'
719 720
720 721 status = {'status' : 'aborted'}
721 722 md = {'engine' : self.ident}
722 723 md.update(status)
723 724 reply_msg = self.session.send(stream, reply_type, metadata=md,
724 725 content=status, parent=msg, ident=idents)
725 726 self.log.debug("%s", reply_msg)
726 727 # We need to wait a bit for requests to come in. This can probably
727 728 # be set shorter for true asynchronous clients.
728 729 poller.poll(50)
729 730
730 731
731 732 def _no_raw_input(self):
732 733 """Raise StdinNotImplentedError if active frontend doesn't support
733 734 stdin."""
734 735 raise StdinNotImplementedError("raw_input was called, but this "
735 736 "frontend does not support stdin.")
736 737
737 738 def _raw_input(self, prompt, ident, parent):
738 739 # Flush output before making the request.
739 740 sys.stderr.flush()
740 741 sys.stdout.flush()
741 742 # flush the stdin socket, to purge stale replies
742 743 while True:
743 744 try:
744 745 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
745 746 except zmq.ZMQError as e:
746 747 if e.errno == zmq.EAGAIN:
747 748 break
748 749 else:
749 750 raise
750 751
751 752 # Send the input request.
752 753 content = json_clean(dict(prompt=prompt))
753 754 self.session.send(self.stdin_socket, u'input_request', content, parent,
754 755 ident=ident)
755 756
756 757 # Await a response.
757 758 while True:
758 759 try:
759 760 ident, reply = self.session.recv(self.stdin_socket, 0)
760 761 except Exception:
761 762 self.log.warn("Invalid Message:", exc_info=True)
762 763 except KeyboardInterrupt:
763 764 # re-raise KeyboardInterrupt, to truncate traceback
764 765 raise KeyboardInterrupt
765 766 else:
766 767 break
767 768 try:
768 769 value = py3compat.unicode_to_str(reply['content']['value'])
769 770 except:
770 771 self.log.error("Got bad raw_input reply: ")
771 772 self.log.error("%s", parent)
772 773 value = ''
773 774 if value == '\x04':
774 775 # EOF
775 776 raise EOFError
776 777 return value
777 778
778 779 def _complete(self, msg):
779 780 c = msg['content']
780 781 try:
781 782 cpos = int(c['cursor_pos'])
782 783 except:
783 784 # If we don't get something that we can convert to an integer, at
784 785 # least attempt the completion guessing the cursor is at the end of
785 786 # the text, if there's any, and otherwise of the line
786 787 cpos = len(c['text'])
787 788 if cpos==0:
788 789 cpos = len(c['line'])
789 790 return self.shell.complete(c['text'], c['line'], cpos)
790 791
791 792 def _at_shutdown(self):
792 793 """Actions taken at shutdown by the kernel, called by python's atexit.
793 794 """
794 795 # io.rprint("Kernel at_shutdown") # dbg
795 796 if self._shutdown_message is not None:
796 797 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
797 798 self.log.debug("%s", self._shutdown_message)
798 799 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
799 800
General Comments 0
You need to be logged in to leave comments. Login now