##// END OF EJS Templates
Don't die if stderr/stdout do not support set_parent() #2925...
y-p -
Show More
@@ -1,780 +1,792 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports
19 19 import __builtin__
20 20 import sys
21 21 import time
22 22 import traceback
23 23 import logging
24 24 import uuid
25 25
26 26 from datetime import datetime
27 27 from signal import (
28 28 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
29 29 )
30 30
31 31 # System library imports
32 32 import zmq
33 33 from zmq.eventloop import ioloop
34 34 from zmq.eventloop.zmqstream import ZMQStream
35 35
36 36 # Local imports
37 37 from IPython.config.configurable import Configurable
38 38 from IPython.core.error import StdinNotImplementedError
39 39 from IPython.core import release
40 40 from IPython.utils import io
41 41 from IPython.utils import py3compat
42 42 from IPython.utils.jsonutil import json_clean
43 43 from IPython.utils.traitlets import (
44 44 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode,
45 45 Type
46 46 )
47 47
48 48 from serialize import serialize_object, unpack_apply_message
49 49 from session import Session
50 50 from zmqshell import ZMQInteractiveShell
51 51
52 52
53 53 #-----------------------------------------------------------------------------
54 54 # Main kernel class
55 55 #-----------------------------------------------------------------------------
56 56
57 57 protocol_version = list(release.kernel_protocol_version_info)
58 58 ipython_version = list(release.version_info)
59 59 language_version = list(sys.version_info[:3])
60 60
61 61
62 62 class Kernel(Configurable):
63 63
64 64 #---------------------------------------------------------------------------
65 65 # Kernel interface
66 66 #---------------------------------------------------------------------------
67 67
68 68 # attribute to override with a GUI
69 69 eventloop = Any(None)
70 70 def _eventloop_changed(self, name, old, new):
71 71 """schedule call to eventloop from IOLoop"""
72 72 loop = ioloop.IOLoop.instance()
73 73 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
74 74
75 75 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
76 76 shell_class = Type(ZMQInteractiveShell)
77 77
78 78 session = Instance(Session)
79 79 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
80 80 shell_streams = List()
81 81 control_stream = Instance(ZMQStream)
82 82 iopub_socket = Instance(zmq.Socket)
83 83 stdin_socket = Instance(zmq.Socket)
84 84 log = Instance(logging.Logger)
85 85
86 86 user_module = Any()
87 87 def _user_module_changed(self, name, old, new):
88 88 if self.shell is not None:
89 89 self.shell.user_module = new
90 90
91 91 user_ns = Dict(default_value=None)
92 92 def _user_ns_changed(self, name, old, new):
93 93 if self.shell is not None:
94 94 self.shell.user_ns = new
95 95 self.shell.init_user_ns()
96 96
97 97 # identities:
98 98 int_id = Integer(-1)
99 99 ident = Unicode()
100 100
101 101 def _ident_default(self):
102 102 return unicode(uuid.uuid4())
103 103
104 104
105 105 # Private interface
106 106
107 107 # Time to sleep after flushing the stdout/err buffers in each execute
108 108 # cycle. While this introduces a hard limit on the minimal latency of the
109 109 # execute cycle, it helps prevent output synchronization problems for
110 110 # clients.
111 111 # Units are in seconds. The minimum zmq latency on local host is probably
112 112 # ~150 microseconds, set this to 500us for now. We may need to increase it
113 113 # a little if it's not enough after more interactive testing.
114 114 _execute_sleep = Float(0.0005, config=True)
115 115
116 116 # Frequency of the kernel's event loop.
117 117 # Units are in seconds, kernel subclasses for GUI toolkits may need to
118 118 # adapt to milliseconds.
119 119 _poll_interval = Float(0.05, config=True)
120 120
121 121 # If the shutdown was requested over the network, we leave here the
122 122 # necessary reply message so it can be sent by our registered atexit
123 123 # handler. This ensures that the reply is only sent to clients truly at
124 124 # the end of our shutdown process (which happens after the underlying
125 125 # IPython shell's own shutdown).
126 126 _shutdown_message = None
127 127
128 128 # This is a dict of port number that the kernel is listening on. It is set
129 129 # by record_ports and used by connect_request.
130 130 _recorded_ports = Dict()
131 131
132 132 # A reference to the Python builtin 'raw_input' function.
133 133 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
134 134 _sys_raw_input = Any()
135 135
136 136 # set of aborted msg_ids
137 137 aborted = Set()
138 138
139 139
140 140 def __init__(self, **kwargs):
141 141 super(Kernel, self).__init__(**kwargs)
142 142
143 143 # Initialize the InteractiveShell subclass
144 144 self.shell = self.shell_class.instance(config=self.config,
145 145 profile_dir = self.profile_dir,
146 146 user_module = self.user_module,
147 147 user_ns = self.user_ns,
148 148 )
149 149 self.shell.displayhook.session = self.session
150 150 self.shell.displayhook.pub_socket = self.iopub_socket
151 151 self.shell.displayhook.topic = self._topic('pyout')
152 152 self.shell.display_pub.session = self.session
153 153 self.shell.display_pub.pub_socket = self.iopub_socket
154 154 self.shell.data_pub.session = self.session
155 155 self.shell.data_pub.pub_socket = self.iopub_socket
156 156
157 157 # TMP - hack while developing
158 158 self.shell._reply_content = None
159 159
160 160 # Build dict of handlers for message types
161 161 msg_types = [ 'execute_request', 'complete_request',
162 162 'object_info_request', 'history_request',
163 163 'kernel_info_request',
164 164 'connect_request', 'shutdown_request',
165 165 'apply_request',
166 166 ]
167 167 self.shell_handlers = {}
168 168 for msg_type in msg_types:
169 169 self.shell_handlers[msg_type] = getattr(self, msg_type)
170 170
171 171 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
172 172 self.control_handlers = {}
173 173 for msg_type in control_msg_types:
174 174 self.control_handlers[msg_type] = getattr(self, msg_type)
175 175
176 176 def dispatch_control(self, msg):
177 177 """dispatch control requests"""
178 178 idents,msg = self.session.feed_identities(msg, copy=False)
179 179 try:
180 180 msg = self.session.unserialize(msg, content=True, copy=False)
181 181 except:
182 182 self.log.error("Invalid Control Message", exc_info=True)
183 183 return
184 184
185 185 self.log.debug("Control received: %s", msg)
186 186
187 187 header = msg['header']
188 188 msg_id = header['msg_id']
189 189 msg_type = header['msg_type']
190 190
191 191 handler = self.control_handlers.get(msg_type, None)
192 192 if handler is None:
193 193 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
194 194 else:
195 195 try:
196 196 handler(self.control_stream, idents, msg)
197 197 except Exception:
198 198 self.log.error("Exception in control handler:", exc_info=True)
199 199
200 200 def dispatch_shell(self, stream, msg):
201 201 """dispatch shell requests"""
202 202 # flush control requests first
203 203 if self.control_stream:
204 204 self.control_stream.flush()
205 205
206 206 idents,msg = self.session.feed_identities(msg, copy=False)
207 207 try:
208 208 msg = self.session.unserialize(msg, content=True, copy=False)
209 209 except:
210 210 self.log.error("Invalid Message", exc_info=True)
211 211 return
212 212
213 213 header = msg['header']
214 214 msg_id = header['msg_id']
215 215 msg_type = msg['header']['msg_type']
216 216
217 217 # Print some info about this message and leave a '--->' marker, so it's
218 218 # easier to trace visually the message chain when debugging. Each
219 219 # handler prints its message at the end.
220 220 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
221 221 self.log.debug(' Content: %s\n --->\n ', msg['content'])
222 222
223 223 if msg_id in self.aborted:
224 224 self.aborted.remove(msg_id)
225 225 # is it safe to assume a msg_id will not be resubmitted?
226 226 reply_type = msg_type.split('_')[0] + '_reply'
227 227 status = {'status' : 'aborted'}
228 228 md = {'engine' : self.ident}
229 229 md.update(status)
230 230 reply_msg = self.session.send(stream, reply_type, metadata=md,
231 231 content=status, parent=msg, ident=idents)
232 232 return
233 233
234 234 handler = self.shell_handlers.get(msg_type, None)
235 235 if handler is None:
236 236 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
237 237 else:
238 238 # ensure default_int_handler during handler call
239 239 sig = signal(SIGINT, default_int_handler)
240 240 try:
241 241 handler(stream, idents, msg)
242 242 except Exception:
243 243 self.log.error("Exception in message handler:", exc_info=True)
244 244 finally:
245 245 signal(SIGINT, sig)
246 246
247 247 def enter_eventloop(self):
248 248 """enter eventloop"""
249 249 self.log.info("entering eventloop")
250 250 # restore default_int_handler
251 251 signal(SIGINT, default_int_handler)
252 252 while self.eventloop is not None:
253 253 try:
254 254 self.eventloop(self)
255 255 except KeyboardInterrupt:
256 256 # Ctrl-C shouldn't crash the kernel
257 257 self.log.error("KeyboardInterrupt caught in kernel")
258 258 continue
259 259 else:
260 260 # eventloop exited cleanly, this means we should stop (right?)
261 261 self.eventloop = None
262 262 break
263 263 self.log.info("exiting eventloop")
264 264
265 265 def start(self):
266 266 """register dispatchers for streams"""
267 267 self.shell.exit_now = False
268 268 if self.control_stream:
269 269 self.control_stream.on_recv(self.dispatch_control, copy=False)
270 270
271 271 def make_dispatcher(stream):
272 272 def dispatcher(msg):
273 273 return self.dispatch_shell(stream, msg)
274 274 return dispatcher
275 275
276 276 for s in self.shell_streams:
277 277 s.on_recv(make_dispatcher(s), copy=False)
278 278
279 279 def do_one_iteration(self):
280 280 """step eventloop just once"""
281 281 if self.control_stream:
282 282 self.control_stream.flush()
283 283 for stream in self.shell_streams:
284 284 # handle at most one request per iteration
285 285 stream.flush(zmq.POLLIN, 1)
286 286 stream.flush(zmq.POLLOUT)
287 287
288 288
289 289 def record_ports(self, ports):
290 290 """Record the ports that this kernel is using.
291 291
292 292 The creator of the Kernel instance must call this methods if they
293 293 want the :meth:`connect_request` method to return the port numbers.
294 294 """
295 295 self._recorded_ports = ports
296 296
297 297 #---------------------------------------------------------------------------
298 298 # Kernel request handlers
299 299 #---------------------------------------------------------------------------
300 300
301 301 def _make_metadata(self, other=None):
302 302 """init metadata dict, for execute/apply_reply"""
303 303 new_md = {
304 304 'dependencies_met' : True,
305 305 'engine' : self.ident,
306 306 'started': datetime.now(),
307 307 }
308 308 if other:
309 309 new_md.update(other)
310 310 return new_md
311 311
312 312 def _publish_pyin(self, code, parent, execution_count):
313 313 """Publish the code request on the pyin stream."""
314 314
315 315 self.session.send(self.iopub_socket, u'pyin',
316 316 {u'code':code, u'execution_count': execution_count},
317 317 parent=parent, ident=self._topic('pyin')
318 318 )
319 319
320 320 def _publish_status(self, status, parent=None):
321 321 """send status (busy/idle) on IOPub"""
322 322 self.session.send(self.iopub_socket,
323 323 u'status',
324 324 {u'execution_state': status},
325 325 parent=parent,
326 326 ident=self._topic('status'),
327 327 )
328 328
329 329
330 330 def execute_request(self, stream, ident, parent):
331 331 """handle an execute_request"""
332 332
333 333 self._publish_status(u'busy', parent)
334 334
335 335 try:
336 336 content = parent[u'content']
337 337 code = content[u'code']
338 338 silent = content[u'silent']
339 339 store_history = content.get(u'store_history', not silent)
340 340 except:
341 341 self.log.error("Got bad msg: ")
342 342 self.log.error("%s", parent)
343 343 return
344 344
345 345 md = self._make_metadata(parent['metadata'])
346 346
347 347 shell = self.shell # we'll need this a lot here
348 348
349 349 # Replace raw_input. Note that is not sufficient to replace
350 350 # raw_input in the user namespace.
351 351 if content.get('allow_stdin', False):
352 352 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
353 353 else:
354 354 raw_input = lambda prompt='' : self._no_raw_input()
355 355
356 356 if py3compat.PY3:
357 357 self._sys_raw_input = __builtin__.input
358 358 __builtin__.input = raw_input
359 359 else:
360 360 self._sys_raw_input = __builtin__.raw_input
361 361 __builtin__.raw_input = raw_input
362 362
363 363 # Set the parent message of the display hook and out streams.
364 364 shell.displayhook.set_parent(parent)
365 365 shell.display_pub.set_parent(parent)
366 366 shell.data_pub.set_parent(parent)
367 sys.stdout.set_parent(parent)
368 sys.stderr.set_parent(parent)
367 try:
368 sys.stdout.set_parent(parent)
369 except AttributeError:
370 pass
371 try:
372 sys.stderr.set_parent(parent)
373 except AttributeError:
374 pass
369 375
370 376 # Re-broadcast our input for the benefit of listening clients, and
371 377 # start computing output
372 378 if not silent:
373 379 self._publish_pyin(code, parent, shell.execution_count)
374 380
375 381 reply_content = {}
376 382 try:
377 383 # FIXME: the shell calls the exception handler itself.
378 384 shell.run_cell(code, store_history=store_history, silent=silent)
379 385 except:
380 386 status = u'error'
381 387 # FIXME: this code right now isn't being used yet by default,
382 388 # because the run_cell() call above directly fires off exception
383 389 # reporting. This code, therefore, is only active in the scenario
384 390 # where runlines itself has an unhandled exception. We need to
385 391 # uniformize this, for all exception construction to come from a
386 392 # single location in the codbase.
387 393 etype, evalue, tb = sys.exc_info()
388 394 tb_list = traceback.format_exception(etype, evalue, tb)
389 395 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
390 396 else:
391 397 status = u'ok'
392 398 finally:
393 399 # Restore raw_input.
394 400 if py3compat.PY3:
395 401 __builtin__.input = self._sys_raw_input
396 402 else:
397 403 __builtin__.raw_input = self._sys_raw_input
398 404
399 405 reply_content[u'status'] = status
400 406
401 407 # Return the execution counter so clients can display prompts
402 408 reply_content['execution_count'] = shell.execution_count - 1
403 409
404 410 # FIXME - fish exception info out of shell, possibly left there by
405 411 # runlines. We'll need to clean up this logic later.
406 412 if shell._reply_content is not None:
407 413 reply_content.update(shell._reply_content)
408 414 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
409 415 reply_content['engine_info'] = e_info
410 416 # reset after use
411 417 shell._reply_content = None
412 418
413 419 if 'traceback' in reply_content:
414 420 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
415 421
416 422
417 423 # At this point, we can tell whether the main code execution succeeded
418 424 # or not. If it did, we proceed to evaluate user_variables/expressions
419 425 if reply_content['status'] == 'ok':
420 426 reply_content[u'user_variables'] = \
421 427 shell.user_variables(content.get(u'user_variables', []))
422 428 reply_content[u'user_expressions'] = \
423 429 shell.user_expressions(content.get(u'user_expressions', {}))
424 430 else:
425 431 # If there was an error, don't even try to compute variables or
426 432 # expressions
427 433 reply_content[u'user_variables'] = {}
428 434 reply_content[u'user_expressions'] = {}
429 435
430 436 # Payloads should be retrieved regardless of outcome, so we can both
431 437 # recover partial output (that could have been generated early in a
432 438 # block, before an error) and clear the payload system always.
433 439 reply_content[u'payload'] = shell.payload_manager.read_payload()
434 440 # Be agressive about clearing the payload because we don't want
435 441 # it to sit in memory until the next execute_request comes in.
436 442 shell.payload_manager.clear_payload()
437 443
438 444 # Flush output before sending the reply.
439 445 sys.stdout.flush()
440 446 sys.stderr.flush()
441 447 # FIXME: on rare occasions, the flush doesn't seem to make it to the
442 448 # clients... This seems to mitigate the problem, but we definitely need
443 449 # to better understand what's going on.
444 450 if self._execute_sleep:
445 451 time.sleep(self._execute_sleep)
446 452
447 453 # Send the reply.
448 454 reply_content = json_clean(reply_content)
449 455
450 456 md['status'] = reply_content['status']
451 457 if reply_content['status'] == 'error' and \
452 458 reply_content['ename'] == 'UnmetDependency':
453 459 md['dependencies_met'] = False
454 460
455 461 reply_msg = self.session.send(stream, u'execute_reply',
456 462 reply_content, parent, metadata=md,
457 463 ident=ident)
458 464
459 465 self.log.debug("%s", reply_msg)
460 466
461 467 if not silent and reply_msg['content']['status'] == u'error':
462 468 self._abort_queues()
463 469
464 470 self._publish_status(u'idle', parent)
465 471
466 472 def complete_request(self, stream, ident, parent):
467 473 txt, matches = self._complete(parent)
468 474 matches = {'matches' : matches,
469 475 'matched_text' : txt,
470 476 'status' : 'ok'}
471 477 matches = json_clean(matches)
472 478 completion_msg = self.session.send(stream, 'complete_reply',
473 479 matches, parent, ident)
474 480 self.log.debug("%s", completion_msg)
475 481
476 482 def object_info_request(self, stream, ident, parent):
477 483 content = parent['content']
478 484 object_info = self.shell.object_inspect(content['oname'],
479 485 detail_level = content.get('detail_level', 0)
480 486 )
481 487 # Before we send this object over, we scrub it for JSON usage
482 488 oinfo = json_clean(object_info)
483 489 msg = self.session.send(stream, 'object_info_reply',
484 490 oinfo, parent, ident)
485 491 self.log.debug("%s", msg)
486 492
487 493 def history_request(self, stream, ident, parent):
488 494 # We need to pull these out, as passing **kwargs doesn't work with
489 495 # unicode keys before Python 2.6.5.
490 496 hist_access_type = parent['content']['hist_access_type']
491 497 raw = parent['content']['raw']
492 498 output = parent['content']['output']
493 499 if hist_access_type == 'tail':
494 500 n = parent['content']['n']
495 501 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
496 502 include_latest=True)
497 503
498 504 elif hist_access_type == 'range':
499 505 session = parent['content']['session']
500 506 start = parent['content']['start']
501 507 stop = parent['content']['stop']
502 508 hist = self.shell.history_manager.get_range(session, start, stop,
503 509 raw=raw, output=output)
504 510
505 511 elif hist_access_type == 'search':
506 512 n = parent['content'].get('n')
507 513 unique = parent['content'].get('unique', False)
508 514 pattern = parent['content']['pattern']
509 515 hist = self.shell.history_manager.search(
510 516 pattern, raw=raw, output=output, n=n, unique=unique)
511 517
512 518 else:
513 519 hist = []
514 520 hist = list(hist)
515 521 content = {'history' : hist}
516 522 content = json_clean(content)
517 523 msg = self.session.send(stream, 'history_reply',
518 524 content, parent, ident)
519 525 self.log.debug("Sending history reply with %i entries", len(hist))
520 526
521 527 def connect_request(self, stream, ident, parent):
522 528 if self._recorded_ports is not None:
523 529 content = self._recorded_ports.copy()
524 530 else:
525 531 content = {}
526 532 msg = self.session.send(stream, 'connect_reply',
527 533 content, parent, ident)
528 534 self.log.debug("%s", msg)
529 535
530 536 def kernel_info_request(self, stream, ident, parent):
531 537 vinfo = {
532 538 'protocol_version': protocol_version,
533 539 'ipython_version': ipython_version,
534 540 'language_version': language_version,
535 541 'language': 'python',
536 542 }
537 543 msg = self.session.send(stream, 'kernel_info_reply',
538 544 vinfo, parent, ident)
539 545 self.log.debug("%s", msg)
540 546
541 547 def shutdown_request(self, stream, ident, parent):
542 548 self.shell.exit_now = True
543 549 content = dict(status='ok')
544 550 content.update(parent['content'])
545 551 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
546 552 # same content, but different msg_id for broadcasting on IOPub
547 553 self._shutdown_message = self.session.msg(u'shutdown_reply',
548 554 content, parent
549 555 )
550 556
551 557 self._at_shutdown()
552 558 # call sys.exit after a short delay
553 559 loop = ioloop.IOLoop.instance()
554 560 loop.add_timeout(time.time()+0.1, loop.stop)
555 561
556 562 #---------------------------------------------------------------------------
557 563 # Engine methods
558 564 #---------------------------------------------------------------------------
559 565
560 566 def apply_request(self, stream, ident, parent):
561 567 try:
562 568 content = parent[u'content']
563 569 bufs = parent[u'buffers']
564 570 msg_id = parent['header']['msg_id']
565 571 except:
566 572 self.log.error("Got bad msg: %s", parent, exc_info=True)
567 573 return
568 574
569 575 self._publish_status(u'busy', parent)
570
576
571 577 # Set the parent message of the display hook and out streams.
572 578 shell = self.shell
573 579 shell.displayhook.set_parent(parent)
574 580 shell.display_pub.set_parent(parent)
575 581 shell.data_pub.set_parent(parent)
576 sys.stdout.set_parent(parent)
577 sys.stderr.set_parent(parent)
582 try:
583 sys.stdout.set_parent(parent)
584 except AttributeError:
585 pass
586 try:
587 sys.stderr.set_parent(parent)
588 except AttributeError:
589 pass
578 590
579 591 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
580 592 # self.iopub_socket.send(pyin_msg)
581 593 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
582 594 md = self._make_metadata(parent['metadata'])
583 595 try:
584 596 working = shell.user_ns
585 597
586 598 prefix = "_"+str(msg_id).replace("-","")+"_"
587 599
588 600 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
589 601
590 602 fname = getattr(f, '__name__', 'f')
591 603
592 604 fname = prefix+"f"
593 605 argname = prefix+"args"
594 606 kwargname = prefix+"kwargs"
595 607 resultname = prefix+"result"
596 608
597 609 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
598 610 # print ns
599 611 working.update(ns)
600 612 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
601 613 try:
602 614 exec code in shell.user_global_ns, shell.user_ns
603 615 result = working.get(resultname)
604 616 finally:
605 617 for key in ns.iterkeys():
606 618 working.pop(key)
607 619
608 620 result_buf = serialize_object(result,
609 621 buffer_threshold=self.session.buffer_threshold,
610 622 item_threshold=self.session.item_threshold,
611 623 )
612 624
613 625 except:
614 626 # invoke IPython traceback formatting
615 627 shell.showtraceback()
616 628 # FIXME - fish exception info out of shell, possibly left there by
617 629 # run_code. We'll need to clean up this logic later.
618 630 reply_content = {}
619 631 if shell._reply_content is not None:
620 632 reply_content.update(shell._reply_content)
621 633 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
622 634 reply_content['engine_info'] = e_info
623 635 # reset after use
624 636 shell._reply_content = None
625 637
626 638 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
627 639 ident=self._topic('pyerr'))
628 640 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
629 641 result_buf = []
630 642
631 643 if reply_content['ename'] == 'UnmetDependency':
632 644 md['dependencies_met'] = False
633 645 else:
634 646 reply_content = {'status' : 'ok'}
635 647
636 648 # put 'ok'/'error' status in header, for scheduler introspection:
637 649 md['status'] = reply_content['status']
638 650
639 651 # flush i/o
640 652 sys.stdout.flush()
641 653 sys.stderr.flush()
642 654
643 655 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
644 656 parent=parent, ident=ident,buffers=result_buf, metadata=md)
645 657
646 658 self._publish_status(u'idle', parent)
647 659
648 660 #---------------------------------------------------------------------------
649 661 # Control messages
650 662 #---------------------------------------------------------------------------
651 663
652 664 def abort_request(self, stream, ident, parent):
653 665 """abort a specifig msg by id"""
654 666 msg_ids = parent['content'].get('msg_ids', None)
655 667 if isinstance(msg_ids, basestring):
656 668 msg_ids = [msg_ids]
657 669 if not msg_ids:
658 670 self.abort_queues()
659 671 for mid in msg_ids:
660 672 self.aborted.add(str(mid))
661 673
662 674 content = dict(status='ok')
663 675 reply_msg = self.session.send(stream, 'abort_reply', content=content,
664 676 parent=parent, ident=ident)
665 677 self.log.debug("%s", reply_msg)
666 678
667 679 def clear_request(self, stream, idents, parent):
668 680 """Clear our namespace."""
669 681 self.shell.reset(False)
670 682 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
671 683 content = dict(status='ok'))
672 684
673 685
674 686 #---------------------------------------------------------------------------
675 687 # Protected interface
676 688 #---------------------------------------------------------------------------
677 689
678 690 def _wrap_exception(self, method=None):
679 691 # import here, because _wrap_exception is only used in parallel,
680 692 # and parallel has higher min pyzmq version
681 693 from IPython.parallel.error import wrap_exception
682 694 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
683 695 content = wrap_exception(e_info)
684 696 return content
685 697
686 698 def _topic(self, topic):
687 699 """prefixed topic for IOPub messages"""
688 700 if self.int_id >= 0:
689 701 base = "engine.%i" % self.int_id
690 702 else:
691 703 base = "kernel.%s" % self.ident
692 704
693 705 return py3compat.cast_bytes("%s.%s" % (base, topic))
694 706
695 707 def _abort_queues(self):
696 708 for stream in self.shell_streams:
697 709 if stream:
698 710 self._abort_queue(stream)
699 711
700 712 def _abort_queue(self, stream):
701 713 poller = zmq.Poller()
702 714 poller.register(stream.socket, zmq.POLLIN)
703 715 while True:
704 716 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
705 717 if msg is None:
706 718 return
707 719
708 720 self.log.info("Aborting:")
709 721 self.log.info("%s", msg)
710 722 msg_type = msg['header']['msg_type']
711 723 reply_type = msg_type.split('_')[0] + '_reply'
712 724
713 725 status = {'status' : 'aborted'}
714 726 md = {'engine' : self.ident}
715 727 md.update(status)
716 728 reply_msg = self.session.send(stream, reply_type, metadata=md,
717 729 content=status, parent=msg, ident=idents)
718 730 self.log.debug("%s", reply_msg)
719 731 # We need to wait a bit for requests to come in. This can probably
720 732 # be set shorter for true asynchronous clients.
721 733 poller.poll(50)
722 734
723 735
724 736 def _no_raw_input(self):
725 737 """Raise StdinNotImplentedError if active frontend doesn't support
726 738 stdin."""
727 739 raise StdinNotImplementedError("raw_input was called, but this "
728 740 "frontend does not support stdin.")
729 741
730 742 def _raw_input(self, prompt, ident, parent):
731 743 # Flush output before making the request.
732 744 sys.stderr.flush()
733 745 sys.stdout.flush()
734 746
735 747 # Send the input request.
736 748 content = json_clean(dict(prompt=prompt))
737 749 self.session.send(self.stdin_socket, u'input_request', content, parent,
738 750 ident=ident)
739 751
740 752 # Await a response.
741 753 while True:
742 754 try:
743 755 ident, reply = self.session.recv(self.stdin_socket, 0)
744 756 except Exception:
745 757 self.log.warn("Invalid Message:", exc_info=True)
746 758 else:
747 759 break
748 760 try:
749 761 value = reply['content']['value']
750 762 except:
751 763 self.log.error("Got bad raw_input reply: ")
752 764 self.log.error("%s", parent)
753 765 value = ''
754 766 if value == '\x04':
755 767 # EOF
756 768 raise EOFError
757 769 return value
758 770
759 771 def _complete(self, msg):
760 772 c = msg['content']
761 773 try:
762 774 cpos = int(c['cursor_pos'])
763 775 except:
764 776 # If we don't get something that we can convert to an integer, at
765 777 # least attempt the completion guessing the cursor is at the end of
766 778 # the text, if there's any, and otherwise of the line
767 779 cpos = len(c['text'])
768 780 if cpos==0:
769 781 cpos = len(c['line'])
770 782 return self.shell.complete(c['text'], c['line'], cpos)
771 783
772 784 def _at_shutdown(self):
773 785 """Actions taken at shutdown by the kernel, called by python's atexit.
774 786 """
775 787 # io.rprint("Kernel at_shutdown") # dbg
776 788 if self._shutdown_message is not None:
777 789 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
778 790 self.log.debug("%s", self._shutdown_message)
779 791 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
780 792
General Comments 0
You need to be logged in to leave comments. Login now