##// END OF EJS Templates
Merge pull request #6374 from dhirschfeld/about_queues...
Min RK -
r17746:13e42d67 merge
parent child Browse files
Show More
@@ -1,677 +1,676 b''
1 1 """Base class for a kernel that talks to frontends over 0MQ."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import print_function
7 7
8 8 import sys
9 9 import time
10 10 import logging
11 11 import uuid
12 12
13 13 from datetime import datetime
14 14 from signal import (
15 15 signal, default_int_handler, SIGINT
16 16 )
17 17
18 18 import zmq
19 19 from zmq.eventloop import ioloop
20 20 from zmq.eventloop.zmqstream import ZMQStream
21 21
22 22 from IPython.config.configurable import Configurable
23 23 from IPython.core.error import StdinNotImplementedError
24 24 from IPython.core import release
25 25 from IPython.utils import py3compat
26 26 from IPython.utils.py3compat import unicode_type, string_types
27 27 from IPython.utils.jsonutil import json_clean
28 28 from IPython.utils.traitlets import (
29 29 Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool,
30 30 )
31 31
32 32 from .session import Session
33 33
34 34
35 35 class Kernel(Configurable):
36 36
37 37 #---------------------------------------------------------------------------
38 38 # Kernel interface
39 39 #---------------------------------------------------------------------------
40 40
41 41 # attribute to override with a GUI
42 42 eventloop = Any(None)
43 43 def _eventloop_changed(self, name, old, new):
44 44 """schedule call to eventloop from IOLoop"""
45 45 loop = ioloop.IOLoop.instance()
46 46 loop.add_callback(self.enter_eventloop)
47 47
48 48 session = Instance(Session)
49 49 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
50 50 shell_streams = List()
51 51 control_stream = Instance(ZMQStream)
52 52 iopub_socket = Instance(zmq.Socket)
53 53 stdin_socket = Instance(zmq.Socket)
54 54 log = Instance(logging.Logger)
55 55
56 56 # identities:
57 57 int_id = Integer(-1)
58 58 ident = Unicode()
59 59
60 60 def _ident_default(self):
61 61 return unicode_type(uuid.uuid4())
62 62
63 63 # Private interface
64 64
65 65 _darwin_app_nap = Bool(True, config=True,
66 66 help="""Whether to use appnope for compatiblity with OS X App Nap.
67 67
68 68 Only affects OS X >= 10.9.
69 69 """
70 70 )
71 71
72 72 # track associations with current request
73 73 _allow_stdin = Bool(False)
74 74 _parent_header = Dict()
75 75 _parent_ident = Any(b'')
76 76 # Time to sleep after flushing the stdout/err buffers in each execute
77 77 # cycle. While this introduces a hard limit on the minimal latency of the
78 78 # execute cycle, it helps prevent output synchronization problems for
79 79 # clients.
80 80 # Units are in seconds. The minimum zmq latency on local host is probably
81 81 # ~150 microseconds, set this to 500us for now. We may need to increase it
82 82 # a little if it's not enough after more interactive testing.
83 83 _execute_sleep = Float(0.0005, config=True)
84 84
85 85 # Frequency of the kernel's event loop.
86 86 # Units are in seconds, kernel subclasses for GUI toolkits may need to
87 87 # adapt to milliseconds.
88 88 _poll_interval = Float(0.05, config=True)
89 89
90 90 # If the shutdown was requested over the network, we leave here the
91 91 # necessary reply message so it can be sent by our registered atexit
92 92 # handler. This ensures that the reply is only sent to clients truly at
93 93 # the end of our shutdown process (which happens after the underlying
94 94 # IPython shell's own shutdown).
95 95 _shutdown_message = None
96 96
97 97 # This is a dict of port number that the kernel is listening on. It is set
98 98 # by record_ports and used by connect_request.
99 99 _recorded_ports = Dict()
100 100
101 101 # set of aborted msg_ids
102 102 aborted = Set()
103 103
104 104 # Track execution count here. For IPython, we override this to use the
105 105 # execution count we store in the shell.
106 106 execution_count = 0
107 107
108 108
109 109 def __init__(self, **kwargs):
110 110 super(Kernel, self).__init__(**kwargs)
111 111
112 112 # Build dict of handlers for message types
113 113 msg_types = [ 'execute_request', 'complete_request',
114 114 'inspect_request', 'history_request',
115 115 'kernel_info_request',
116 116 'connect_request', 'shutdown_request',
117 117 'apply_request',
118 118 ]
119 119 self.shell_handlers = {}
120 120 for msg_type in msg_types:
121 121 self.shell_handlers[msg_type] = getattr(self, msg_type)
122 122
123 123 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
124 124 self.control_handlers = {}
125 125 for msg_type in control_msg_types:
126 126 self.control_handlers[msg_type] = getattr(self, msg_type)
127 127
128 128
129 129 def dispatch_control(self, msg):
130 130 """dispatch control requests"""
131 131 idents,msg = self.session.feed_identities(msg, copy=False)
132 132 try:
133 133 msg = self.session.unserialize(msg, content=True, copy=False)
134 134 except:
135 135 self.log.error("Invalid Control Message", exc_info=True)
136 136 return
137 137
138 138 self.log.debug("Control received: %s", msg)
139 139
140 140 # Set the parent message for side effects.
141 141 self.set_parent(idents, msg)
142 142 self._publish_status(u'busy')
143 143
144 144 header = msg['header']
145 145 msg_type = header['msg_type']
146 146
147 147 handler = self.control_handlers.get(msg_type, None)
148 148 if handler is None:
149 149 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
150 150 else:
151 151 try:
152 152 handler(self.control_stream, idents, msg)
153 153 except Exception:
154 154 self.log.error("Exception in control handler:", exc_info=True)
155 155
156 156 sys.stdout.flush()
157 157 sys.stderr.flush()
158 158 self._publish_status(u'idle')
159 159
160 160 def dispatch_shell(self, stream, msg):
161 161 """dispatch shell requests"""
162 162 # flush control requests first
163 163 if self.control_stream:
164 164 self.control_stream.flush()
165 165
166 166 idents,msg = self.session.feed_identities(msg, copy=False)
167 167 try:
168 168 msg = self.session.unserialize(msg, content=True, copy=False)
169 169 except:
170 170 self.log.error("Invalid Message", exc_info=True)
171 171 return
172 172
173 173 # Set the parent message for side effects.
174 174 self.set_parent(idents, msg)
175 175 self._publish_status(u'busy')
176 176
177 177 header = msg['header']
178 178 msg_id = header['msg_id']
179 179 msg_type = msg['header']['msg_type']
180 180
181 181 # Print some info about this message and leave a '--->' marker, so it's
182 182 # easier to trace visually the message chain when debugging. Each
183 183 # handler prints its message at the end.
184 184 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
185 185 self.log.debug(' Content: %s\n --->\n ', msg['content'])
186 186
187 187 if msg_id in self.aborted:
188 188 self.aborted.remove(msg_id)
189 189 # is it safe to assume a msg_id will not be resubmitted?
190 190 reply_type = msg_type.split('_')[0] + '_reply'
191 191 status = {'status' : 'aborted'}
192 192 md = {'engine' : self.ident}
193 193 md.update(status)
194 194 self.session.send(stream, reply_type, metadata=md,
195 195 content=status, parent=msg, ident=idents)
196 196 return
197 197
198 198 handler = self.shell_handlers.get(msg_type, None)
199 199 if handler is None:
200 200 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
201 201 else:
202 202 # ensure default_int_handler during handler call
203 203 sig = signal(SIGINT, default_int_handler)
204 204 self.log.debug("%s: %s", msg_type, msg)
205 205 try:
206 206 handler(stream, idents, msg)
207 207 except Exception:
208 208 self.log.error("Exception in message handler:", exc_info=True)
209 209 finally:
210 210 signal(SIGINT, sig)
211 211
212 212 sys.stdout.flush()
213 213 sys.stderr.flush()
214 214 self._publish_status(u'idle')
215 215
216 216 def enter_eventloop(self):
217 217 """enter eventloop"""
218 218 self.log.info("entering eventloop %s", self.eventloop)
219 219 for stream in self.shell_streams:
220 220 # flush any pending replies,
221 221 # which may be skipped by entering the eventloop
222 222 stream.flush(zmq.POLLOUT)
223 223 # restore default_int_handler
224 224 signal(SIGINT, default_int_handler)
225 225 while self.eventloop is not None:
226 226 try:
227 227 self.eventloop(self)
228 228 except KeyboardInterrupt:
229 229 # Ctrl-C shouldn't crash the kernel
230 230 self.log.error("KeyboardInterrupt caught in kernel")
231 231 continue
232 232 else:
233 233 # eventloop exited cleanly, this means we should stop (right?)
234 234 self.eventloop = None
235 235 break
236 236 self.log.info("exiting eventloop")
237 237
238 238 def start(self):
239 239 """register dispatchers for streams"""
240 240 if self.control_stream:
241 241 self.control_stream.on_recv(self.dispatch_control, copy=False)
242 242
243 243 def make_dispatcher(stream):
244 244 def dispatcher(msg):
245 245 return self.dispatch_shell(stream, msg)
246 246 return dispatcher
247 247
248 248 for s in self.shell_streams:
249 249 s.on_recv(make_dispatcher(s), copy=False)
250 250
251 251 # publish idle status
252 252 self._publish_status('starting')
253 253
254 254 def do_one_iteration(self):
255 255 """step eventloop just once"""
256 256 if self.control_stream:
257 257 self.control_stream.flush()
258 258 for stream in self.shell_streams:
259 259 # handle at most one request per iteration
260 260 stream.flush(zmq.POLLIN, 1)
261 261 stream.flush(zmq.POLLOUT)
262 262
263 263
264 264 def record_ports(self, ports):
265 265 """Record the ports that this kernel is using.
266 266
267 267 The creator of the Kernel instance must call this methods if they
268 268 want the :meth:`connect_request` method to return the port numbers.
269 269 """
270 270 self._recorded_ports = ports
271 271
272 272 #---------------------------------------------------------------------------
273 273 # Kernel request handlers
274 274 #---------------------------------------------------------------------------
275 275
276 276 def _make_metadata(self, other=None):
277 277 """init metadata dict, for execute/apply_reply"""
278 278 new_md = {
279 279 'dependencies_met' : True,
280 280 'engine' : self.ident,
281 281 'started': datetime.now(),
282 282 }
283 283 if other:
284 284 new_md.update(other)
285 285 return new_md
286 286
287 287 def _publish_execute_input(self, code, parent, execution_count):
288 288 """Publish the code request on the iopub stream."""
289 289
290 290 self.session.send(self.iopub_socket, u'execute_input',
291 291 {u'code':code, u'execution_count': execution_count},
292 292 parent=parent, ident=self._topic('execute_input')
293 293 )
294 294
295 295 def _publish_status(self, status, parent=None):
296 296 """send status (busy/idle) on IOPub"""
297 297 self.session.send(self.iopub_socket,
298 298 u'status',
299 299 {u'execution_state': status},
300 300 parent=parent or self._parent_header,
301 301 ident=self._topic('status'),
302 302 )
303 303
304 304 def set_parent(self, ident, parent):
305 305 """Set the current parent_header
306 306
307 307 Side effects (IOPub messages) and replies are associated with
308 308 the request that caused them via the parent_header.
309 309
310 310 The parent identity is used to route input_request messages
311 311 on the stdin channel.
312 312 """
313 313 self._parent_ident = ident
314 314 self._parent_header = parent
315 315
316 316 def send_response(self, stream, msg_or_type, content=None, ident=None,
317 317 buffers=None, track=False, header=None, metadata=None):
318 318 """Send a response to the message we're currently processing.
319 319
320 320 This accepts all the parameters of :meth:`IPython.kernel.zmq.session.Session.send`
321 321 except ``parent``.
322 322
323 323 This relies on :meth:`set_parent` having been called for the current
324 324 message.
325 325 """
326 326 return self.session.send(stream, msg_or_type, content, self._parent_header,
327 327 ident, buffers, track, header, metadata)
328 328
329 329 def execute_request(self, stream, ident, parent):
330 330 """handle an execute_request"""
331 331
332 332 try:
333 333 content = parent[u'content']
334 334 code = py3compat.cast_unicode_py2(content[u'code'])
335 335 silent = content[u'silent']
336 336 store_history = content.get(u'store_history', not silent)
337 337 user_expressions = content.get('user_expressions', {})
338 338 allow_stdin = content.get('allow_stdin', False)
339 339 except:
340 340 self.log.error("Got bad msg: ")
341 341 self.log.error("%s", parent)
342 342 return
343 343
344 344 md = self._make_metadata(parent['metadata'])
345 345
346 346 # Re-broadcast our input for the benefit of listening clients, and
347 347 # start computing output
348 348 if not silent:
349 349 self.execution_count += 1
350 350 self._publish_execute_input(code, parent, self.execution_count)
351 351
352 352 reply_content = self.do_execute(code, silent, store_history,
353 353 user_expressions, allow_stdin)
354 354
355 355 # Flush output before sending the reply.
356 356 sys.stdout.flush()
357 357 sys.stderr.flush()
358 358 # FIXME: on rare occasions, the flush doesn't seem to make it to the
359 359 # clients... This seems to mitigate the problem, but we definitely need
360 360 # to better understand what's going on.
361 361 if self._execute_sleep:
362 362 time.sleep(self._execute_sleep)
363 363
364 364 # Send the reply.
365 365 reply_content = json_clean(reply_content)
366 366
367 367 md['status'] = reply_content['status']
368 368 if reply_content['status'] == 'error' and \
369 369 reply_content['ename'] == 'UnmetDependency':
370 370 md['dependencies_met'] = False
371 371
372 372 reply_msg = self.session.send(stream, u'execute_reply',
373 373 reply_content, parent, metadata=md,
374 374 ident=ident)
375 375
376 376 self.log.debug("%s", reply_msg)
377 377
378 378 if not silent and reply_msg['content']['status'] == u'error':
379 379 self._abort_queues()
380 380
381 381 def do_execute(self, code, silent, store_history=True,
382 382 user_experssions=None, allow_stdin=False):
383 383 """Execute user code. Must be overridden by subclasses.
384 384 """
385 385 raise NotImplementedError
386 386
387 387 def complete_request(self, stream, ident, parent):
388 388 content = parent['content']
389 389 code = content['code']
390 390 cursor_pos = content['cursor_pos']
391 391
392 392 matches = self.do_complete(code, cursor_pos)
393 393 matches = json_clean(matches)
394 394 completion_msg = self.session.send(stream, 'complete_reply',
395 395 matches, parent, ident)
396 396 self.log.debug("%s", completion_msg)
397 397
398 398 def do_complete(self, code, cursor_pos):
399 399 """Override in subclasses to find completions.
400 400 """
401 401 return {'matches' : [],
402 402 'cursor_end' : cursor_pos,
403 403 'cursor_start' : cursor_pos,
404 404 'metadata' : {},
405 405 'status' : 'ok'}
406 406
407 407 def inspect_request(self, stream, ident, parent):
408 408 content = parent['content']
409 409
410 410 reply_content = self.do_inspect(content['code'], content['cursor_pos'],
411 411 content.get('detail_level', 0))
412 412 # Before we send this object over, we scrub it for JSON usage
413 413 reply_content = json_clean(reply_content)
414 414 msg = self.session.send(stream, 'inspect_reply',
415 415 reply_content, parent, ident)
416 416 self.log.debug("%s", msg)
417 417
418 418 def do_inspect(self, code, cursor_pos, detail_level=0):
419 419 """Override in subclasses to allow introspection.
420 420 """
421 421 return {'status': 'ok', 'data':{}, 'metadata':{}, 'found':False}
422 422
423 423 def history_request(self, stream, ident, parent):
424 424 content = parent['content']
425 425
426 426 reply_content = self.do_history(**content)
427 427
428 428 reply_content = json_clean(reply_content)
429 429 msg = self.session.send(stream, 'history_reply',
430 430 reply_content, parent, ident)
431 431 self.log.debug("%s", msg)
432 432
433 433 def do_history(self, hist_access_type, output, raw, session=None, start=None,
434 434 stop=None, n=None, pattern=None, unique=False):
435 435 """Override in subclasses to access history.
436 436 """
437 437 return {'history': []}
438 438
439 439 def connect_request(self, stream, ident, parent):
440 440 if self._recorded_ports is not None:
441 441 content = self._recorded_ports.copy()
442 442 else:
443 443 content = {}
444 444 msg = self.session.send(stream, 'connect_reply',
445 445 content, parent, ident)
446 446 self.log.debug("%s", msg)
447 447
448 448 @property
449 449 def kernel_info(self):
450 450 return {
451 451 'protocol_version': release.kernel_protocol_version,
452 452 'implementation': self.implementation,
453 453 'implementation_version': self.implementation_version,
454 454 'language': self.language,
455 455 'language_version': self.language_version,
456 456 'banner': self.banner,
457 457 }
458 458
459 459 def kernel_info_request(self, stream, ident, parent):
460 460 msg = self.session.send(stream, 'kernel_info_reply',
461 461 self.kernel_info, parent, ident)
462 462 self.log.debug("%s", msg)
463 463
464 464 def shutdown_request(self, stream, ident, parent):
465 465 content = self.do_shutdown(parent['content']['restart'])
466 466 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
467 467 # same content, but different msg_id for broadcasting on IOPub
468 468 self._shutdown_message = self.session.msg(u'shutdown_reply',
469 469 content, parent
470 470 )
471 471
472 472 self._at_shutdown()
473 473 # call sys.exit after a short delay
474 474 loop = ioloop.IOLoop.instance()
475 475 loop.add_timeout(time.time()+0.1, loop.stop)
476 476
477 477 def do_shutdown(self, restart):
478 478 """Override in subclasses to do things when the frontend shuts down the
479 479 kernel.
480 480 """
481 481 return {'status': 'ok', 'restart': restart}
482 482
483 483 #---------------------------------------------------------------------------
484 484 # Engine methods
485 485 #---------------------------------------------------------------------------
486 486
487 487 def apply_request(self, stream, ident, parent):
488 488 try:
489 489 content = parent[u'content']
490 490 bufs = parent[u'buffers']
491 491 msg_id = parent['header']['msg_id']
492 492 except:
493 493 self.log.error("Got bad msg: %s", parent, exc_info=True)
494 494 return
495 495
496 496 md = self._make_metadata(parent['metadata'])
497 497
498 498 reply_content, result_buf = self.do_apply(content, bufs, msg_id, md)
499 499
500 500 # put 'ok'/'error' status in header, for scheduler introspection:
501 501 md['status'] = reply_content['status']
502 502
503 503 # flush i/o
504 504 sys.stdout.flush()
505 505 sys.stderr.flush()
506 506
507 507 self.session.send(stream, u'apply_reply', reply_content,
508 508 parent=parent, ident=ident,buffers=result_buf, metadata=md)
509 509
510 510 def do_apply(self, content, bufs, msg_id, reply_metadata):
511 511 """Override in subclasses to support the IPython parallel framework.
512 512 """
513 513 raise NotImplementedError
514 514
515 515 #---------------------------------------------------------------------------
516 516 # Control messages
517 517 #---------------------------------------------------------------------------
518 518
519 519 def abort_request(self, stream, ident, parent):
520 """abort a specifig msg by id"""
520 """abort a specific msg by id"""
521 521 msg_ids = parent['content'].get('msg_ids', None)
522 522 if isinstance(msg_ids, string_types):
523 523 msg_ids = [msg_ids]
524 524 if not msg_ids:
525 self.abort_queues()
525 self._abort_queues()
526 526 for mid in msg_ids:
527 527 self.aborted.add(str(mid))
528 528
529 529 content = dict(status='ok')
530 530 reply_msg = self.session.send(stream, 'abort_reply', content=content,
531 531 parent=parent, ident=ident)
532 532 self.log.debug("%s", reply_msg)
533 533
534 534 def clear_request(self, stream, idents, parent):
535 535 """Clear our namespace."""
536 536 content = self.do_clear()
537 537 self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
538 538 content = content)
539 539
540 540 def do_clear(self):
541 541 """Override in subclasses to clear the namespace
542 542
543 543 This is only required for IPython.parallel.
544 544 """
545 545 raise NotImplementedError
546 546
547 547 #---------------------------------------------------------------------------
548 548 # Protected interface
549 549 #---------------------------------------------------------------------------
550 550
551 551 def _topic(self, topic):
552 552 """prefixed topic for IOPub messages"""
553 553 if self.int_id >= 0:
554 554 base = "engine.%i" % self.int_id
555 555 else:
556 556 base = "kernel.%s" % self.ident
557 557
558 558 return py3compat.cast_bytes("%s.%s" % (base, topic))
559 559
560 560 def _abort_queues(self):
561 561 for stream in self.shell_streams:
562 562 if stream:
563 563 self._abort_queue(stream)
564 564
565 565 def _abort_queue(self, stream):
566 566 poller = zmq.Poller()
567 567 poller.register(stream.socket, zmq.POLLIN)
568 568 while True:
569 569 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
570 570 if msg is None:
571 571 return
572 572
573 573 self.log.info("Aborting:")
574 574 self.log.info("%s", msg)
575 575 msg_type = msg['header']['msg_type']
576 576 reply_type = msg_type.split('_')[0] + '_reply'
577 577
578 578 status = {'status' : 'aborted'}
579 579 md = {'engine' : self.ident}
580 580 md.update(status)
581 581 reply_msg = self.session.send(stream, reply_type, metadata=md,
582 582 content=status, parent=msg, ident=idents)
583 583 self.log.debug("%s", reply_msg)
584 584 # We need to wait a bit for requests to come in. This can probably
585 585 # be set shorter for true asynchronous clients.
586 586 poller.poll(50)
587 587
588 588
589 589 def _no_raw_input(self):
590 590 """Raise StdinNotImplentedError if active frontend doesn't support
591 591 stdin."""
592 592 raise StdinNotImplementedError("raw_input was called, but this "
593 593 "frontend does not support stdin.")
594 594
595 595 def getpass(self, prompt=''):
596 596 """Forward getpass to frontends
597 597
598 598 Raises
599 599 ------
600 600 StdinNotImplentedError if active frontend doesn't support stdin.
601 601 """
602 602 if not self._allow_stdin:
603 603 raise StdinNotImplementedError(
604 604 "getpass was called, but this frontend does not support input requests."
605 605 )
606 606 return self._input_request(prompt,
607 607 self._parent_ident,
608 608 self._parent_header,
609 609 password=True,
610 610 )
611 611
612 612 def raw_input(self, prompt=''):
613 613 """Forward raw_input to frontends
614 614
615 615 Raises
616 616 ------
617 617 StdinNotImplentedError if active frontend doesn't support stdin.
618 618 """
619 619 if not self._allow_stdin:
620 620 raise StdinNotImplementedError(
621 621 "raw_input was called, but this frontend does not support input requests."
622 622 )
623 623 return self._input_request(prompt,
624 624 self._parent_ident,
625 625 self._parent_header,
626 626 password=False,
627 627 )
628 628
629 629 def _input_request(self, prompt, ident, parent, password=False):
630 630 # Flush output before making the request.
631 631 sys.stderr.flush()
632 632 sys.stdout.flush()
633 633 # flush the stdin socket, to purge stale replies
634 634 while True:
635 635 try:
636 636 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
637 637 except zmq.ZMQError as e:
638 638 if e.errno == zmq.EAGAIN:
639 639 break
640 640 else:
641 641 raise
642 642
643 643 # Send the input request.
644 644 content = json_clean(dict(prompt=prompt, password=password))
645 645 self.session.send(self.stdin_socket, u'input_request', content, parent,
646 646 ident=ident)
647 647
648 648 # Await a response.
649 649 while True:
650 650 try:
651 651 ident, reply = self.session.recv(self.stdin_socket, 0)
652 652 except Exception:
653 653 self.log.warn("Invalid Message:", exc_info=True)
654 654 except KeyboardInterrupt:
655 655 # re-raise KeyboardInterrupt, to truncate traceback
656 656 raise KeyboardInterrupt
657 657 else:
658 658 break
659 659 try:
660 660 value = py3compat.unicode_to_str(reply['content']['value'])
661 661 except:
662 662 self.log.error("Bad input_reply: %s", parent)
663 663 value = ''
664 664 if value == '\x04':
665 665 # EOF
666 666 raise EOFError
667 667 return value
668 668
669 669 def _at_shutdown(self):
670 670 """Actions taken at shutdown by the kernel, called by python's atexit.
671 671 """
672 672 # io.rprint("Kernel at_shutdown") # dbg
673 673 if self._shutdown_message is not None:
674 674 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
675 675 self.log.debug("%s", self._shutdown_message)
676 676 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
677
General Comments 0
You need to be logged in to leave comments. Login now