##// END OF EJS Templates
Backport PR #2926: Don't die if stderr/stdout do not support set_parent() #2925...
MinRK -
Show More
@@ -1,922 +1,934 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports
19 19 import __builtin__
20 20 import atexit
21 21 import sys
22 22 import time
23 23 import traceback
24 24 import logging
25 25 import uuid
26 26
27 27 from datetime import datetime
28 28 from signal import (
29 29 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
30 30 )
31 31
32 32 # System library imports
33 33 import zmq
34 34 from zmq.eventloop import ioloop
35 35 from zmq.eventloop.zmqstream import ZMQStream
36 36
37 37 # Local imports
38 38 from IPython.config.configurable import Configurable
39 39 from IPython.config.application import boolean_flag, catch_config_error
40 40 from IPython.core.application import ProfileDir
41 41 from IPython.core.error import StdinNotImplementedError
42 42 from IPython.core.shellapp import (
43 43 InteractiveShellApp, shell_flags, shell_aliases
44 44 )
45 45 from IPython.utils import io
46 46 from IPython.utils import py3compat
47 47 from IPython.utils.frame import extract_module_locals
48 48 from IPython.utils.jsonutil import json_clean
49 49 from IPython.utils.traitlets import (
50 50 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode
51 51 )
52 52
53 53 from entry_point import base_launch_kernel
54 54 from kernelapp import KernelApp, kernel_flags, kernel_aliases
55 55 from serialize import serialize_object, unpack_apply_message
56 56 from session import Session, Message
57 57 from zmqshell import ZMQInteractiveShell
58 58
59 59
60 60 #-----------------------------------------------------------------------------
61 61 # Main kernel class
62 62 #-----------------------------------------------------------------------------
63 63
64 64 class Kernel(Configurable):
65 65
66 66 #---------------------------------------------------------------------------
67 67 # Kernel interface
68 68 #---------------------------------------------------------------------------
69 69
70 70 # attribute to override with a GUI
71 71 eventloop = Any(None)
72 72 def _eventloop_changed(self, name, old, new):
73 73 """schedule call to eventloop from IOLoop"""
74 74 loop = ioloop.IOLoop.instance()
75 75 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
76 76
77 77 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
78 78 session = Instance(Session)
79 79 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
80 80 shell_streams = List()
81 81 control_stream = Instance(ZMQStream)
82 82 iopub_socket = Instance(zmq.Socket)
83 83 stdin_socket = Instance(zmq.Socket)
84 84 log = Instance(logging.Logger)
85 85
86 86 user_module = Any()
87 87 def _user_module_changed(self, name, old, new):
88 88 if self.shell is not None:
89 89 self.shell.user_module = new
90 90
91 91 user_ns = Dict(default_value=None)
92 92 def _user_ns_changed(self, name, old, new):
93 93 if self.shell is not None:
94 94 self.shell.user_ns = new
95 95 self.shell.init_user_ns()
96 96
97 97 # identities:
98 98 int_id = Integer(-1)
99 99 ident = Unicode()
100 100
101 101 def _ident_default(self):
102 102 return unicode(uuid.uuid4())
103 103
104 104
105 105 # Private interface
106 106
107 107 # Time to sleep after flushing the stdout/err buffers in each execute
108 108 # cycle. While this introduces a hard limit on the minimal latency of the
109 109 # execute cycle, it helps prevent output synchronization problems for
110 110 # clients.
111 111 # Units are in seconds. The minimum zmq latency on local host is probably
112 112 # ~150 microseconds, set this to 500us for now. We may need to increase it
113 113 # a little if it's not enough after more interactive testing.
114 114 _execute_sleep = Float(0.0005, config=True)
115 115
116 116 # Frequency of the kernel's event loop.
117 117 # Units are in seconds, kernel subclasses for GUI toolkits may need to
118 118 # adapt to milliseconds.
119 119 _poll_interval = Float(0.05, config=True)
120 120
121 121 # If the shutdown was requested over the network, we leave here the
122 122 # necessary reply message so it can be sent by our registered atexit
123 123 # handler. This ensures that the reply is only sent to clients truly at
124 124 # the end of our shutdown process (which happens after the underlying
125 125 # IPython shell's own shutdown).
126 126 _shutdown_message = None
127 127
128 128 # This is a dict of port number that the kernel is listening on. It is set
129 129 # by record_ports and used by connect_request.
130 130 _recorded_ports = Dict()
131 131
132 132 # set of aborted msg_ids
133 133 aborted = Set()
134 134
135 135
136 136 def __init__(self, **kwargs):
137 137 super(Kernel, self).__init__(**kwargs)
138 138
139 139 # Initialize the InteractiveShell subclass
140 140 self.shell = ZMQInteractiveShell.instance(config=self.config,
141 141 profile_dir = self.profile_dir,
142 142 user_module = self.user_module,
143 143 user_ns = self.user_ns,
144 144 )
145 145 self.shell.displayhook.session = self.session
146 146 self.shell.displayhook.pub_socket = self.iopub_socket
147 147 self.shell.displayhook.topic = self._topic('pyout')
148 148 self.shell.display_pub.session = self.session
149 149 self.shell.display_pub.pub_socket = self.iopub_socket
150 150
151 151 # TMP - hack while developing
152 152 self.shell._reply_content = None
153 153
154 154 # Build dict of handlers for message types
155 155 msg_types = [ 'execute_request', 'complete_request',
156 156 'object_info_request', 'history_request',
157 157 'connect_request', 'shutdown_request',
158 158 'apply_request',
159 159 ]
160 160 self.shell_handlers = {}
161 161 for msg_type in msg_types:
162 162 self.shell_handlers[msg_type] = getattr(self, msg_type)
163 163
164 164 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
165 165 self.control_handlers = {}
166 166 for msg_type in control_msg_types:
167 167 self.control_handlers[msg_type] = getattr(self, msg_type)
168 168
169 169 def dispatch_control(self, msg):
170 170 """dispatch control requests"""
171 171 idents,msg = self.session.feed_identities(msg, copy=False)
172 172 try:
173 173 msg = self.session.unserialize(msg, content=True, copy=False)
174 174 except:
175 175 self.log.error("Invalid Control Message", exc_info=True)
176 176 return
177 177
178 178 self.log.debug("Control received: %s", msg)
179 179
180 180 header = msg['header']
181 181 msg_id = header['msg_id']
182 182 msg_type = header['msg_type']
183 183
184 184 handler = self.control_handlers.get(msg_type, None)
185 185 if handler is None:
186 186 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
187 187 else:
188 188 try:
189 189 handler(self.control_stream, idents, msg)
190 190 except Exception:
191 191 self.log.error("Exception in control handler:", exc_info=True)
192 192
193 193 def dispatch_shell(self, stream, msg):
194 194 """dispatch shell requests"""
195 195 # flush control requests first
196 196 if self.control_stream:
197 197 self.control_stream.flush()
198 198
199 199 idents,msg = self.session.feed_identities(msg, copy=False)
200 200 try:
201 201 msg = self.session.unserialize(msg, content=True, copy=False)
202 202 except:
203 203 self.log.error("Invalid Message", exc_info=True)
204 204 return
205 205
206 206 header = msg['header']
207 207 msg_id = header['msg_id']
208 208 msg_type = msg['header']['msg_type']
209 209
210 210 # Print some info about this message and leave a '--->' marker, so it's
211 211 # easier to trace visually the message chain when debugging. Each
212 212 # handler prints its message at the end.
213 213 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
214 214 self.log.debug(' Content: %s\n --->\n ', msg['content'])
215 215
216 216 if msg_id in self.aborted:
217 217 self.aborted.remove(msg_id)
218 218 # is it safe to assume a msg_id will not be resubmitted?
219 219 reply_type = msg_type.split('_')[0] + '_reply'
220 220 status = {'status' : 'aborted'}
221 221 sub = {'engine' : self.ident}
222 222 sub.update(status)
223 223 reply_msg = self.session.send(stream, reply_type, subheader=sub,
224 224 content=status, parent=msg, ident=idents)
225 225 return
226 226
227 227 handler = self.shell_handlers.get(msg_type, None)
228 228 if handler is None:
229 229 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
230 230 else:
231 231 # ensure default_int_handler during handler call
232 232 sig = signal(SIGINT, default_int_handler)
233 233 try:
234 234 handler(stream, idents, msg)
235 235 except Exception:
236 236 self.log.error("Exception in message handler:", exc_info=True)
237 237 finally:
238 238 signal(SIGINT, sig)
239 239
240 240 def enter_eventloop(self):
241 241 """enter eventloop"""
242 242 self.log.info("entering eventloop")
243 243 # restore default_int_handler
244 244 signal(SIGINT, default_int_handler)
245 245 while self.eventloop is not None:
246 246 try:
247 247 self.eventloop(self)
248 248 except KeyboardInterrupt:
249 249 # Ctrl-C shouldn't crash the kernel
250 250 self.log.error("KeyboardInterrupt caught in kernel")
251 251 continue
252 252 else:
253 253 # eventloop exited cleanly, this means we should stop (right?)
254 254 self.eventloop = None
255 255 break
256 256 self.log.info("exiting eventloop")
257 257 # if eventloop exits, IOLoop should stop
258 258 ioloop.IOLoop.instance().stop()
259 259
260 260 def start(self):
261 261 """register dispatchers for streams"""
262 262 self.shell.exit_now = False
263 263 if self.control_stream:
264 264 self.control_stream.on_recv(self.dispatch_control, copy=False)
265 265
266 266 def make_dispatcher(stream):
267 267 def dispatcher(msg):
268 268 return self.dispatch_shell(stream, msg)
269 269 return dispatcher
270 270
271 271 for s in self.shell_streams:
272 272 s.on_recv(make_dispatcher(s), copy=False)
273 273
274 274 def do_one_iteration(self):
275 275 """step eventloop just once"""
276 276 if self.control_stream:
277 277 self.control_stream.flush()
278 278 for stream in self.shell_streams:
279 279 # handle at most one request per iteration
280 280 stream.flush(zmq.POLLIN, 1)
281 281 stream.flush(zmq.POLLOUT)
282 282
283 283
284 284 def record_ports(self, ports):
285 285 """Record the ports that this kernel is using.
286 286
287 287 The creator of the Kernel instance must call this methods if they
288 288 want the :meth:`connect_request` method to return the port numbers.
289 289 """
290 290 self._recorded_ports = ports
291 291
292 292 #---------------------------------------------------------------------------
293 293 # Kernel request handlers
294 294 #---------------------------------------------------------------------------
295 295
296 296 def _make_subheader(self):
297 297 """init subheader dict, for execute/apply_reply"""
298 298 return {
299 299 'dependencies_met' : True,
300 300 'engine' : self.ident,
301 301 'started': datetime.now(),
302 302 }
303 303
304 304 def _publish_pyin(self, code, parent, execution_count):
305 305 """Publish the code request on the pyin stream."""
306 306
307 307 self.session.send(self.iopub_socket, u'pyin',
308 308 {u'code':code, u'execution_count': execution_count},
309 309 parent=parent, ident=self._topic('pyin')
310 310 )
311 311
312 312 def _publish_status(self, status, parent=None):
313 313 """send status (busy/idle) on IOPub"""
314 314 self.session.send(self.iopub_socket,
315 315 u'status',
316 316 {u'execution_state': status},
317 317 parent=parent,
318 318 ident=self._topic('status'),
319 319 )
320 320
321 321
322 322 def execute_request(self, stream, ident, parent):
323 323 """handle an execute_request"""
324 324
325 325 self._publish_status(u'busy', parent)
326 326
327 327 try:
328 328 content = parent[u'content']
329 329 code = content[u'code']
330 330 silent = content[u'silent']
331 331 except:
332 332 self.log.error("Got bad msg: ")
333 333 self.log.error("%s", parent)
334 334 return
335 335
336 336 sub = self._make_subheader()
337 337
338 338 shell = self.shell # we'll need this a lot here
339 339
340 340 # Replace raw_input. Note that is not sufficient to replace
341 341 # raw_input in the user namespace.
342 342 if content.get('allow_stdin', False):
343 343 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
344 344 else:
345 345 raw_input = lambda prompt='' : self._no_raw_input()
346 346
347 347 if py3compat.PY3:
348 348 __builtin__.input = raw_input
349 349 else:
350 350 __builtin__.raw_input = raw_input
351 351
352 352 # Set the parent message of the display hook and out streams.
353 353 shell.displayhook.set_parent(parent)
354 354 shell.display_pub.set_parent(parent)
355 sys.stdout.set_parent(parent)
356 sys.stderr.set_parent(parent)
355 try:
356 sys.stdout.set_parent(parent)
357 except AttributeError:
358 pass
359 try:
360 sys.stderr.set_parent(parent)
361 except AttributeError:
362 pass
357 363
358 364 # Re-broadcast our input for the benefit of listening clients, and
359 365 # start computing output
360 366 if not silent:
361 367 self._publish_pyin(code, parent, shell.execution_count)
362 368
363 369 reply_content = {}
364 370 try:
365 371 # FIXME: the shell calls the exception handler itself.
366 372 shell.run_cell(code, store_history=not silent, silent=silent)
367 373 except:
368 374 status = u'error'
369 375 # FIXME: this code right now isn't being used yet by default,
370 376 # because the run_cell() call above directly fires off exception
371 377 # reporting. This code, therefore, is only active in the scenario
372 378 # where runlines itself has an unhandled exception. We need to
373 379 # uniformize this, for all exception construction to come from a
374 380 # single location in the codbase.
375 381 etype, evalue, tb = sys.exc_info()
376 382 tb_list = traceback.format_exception(etype, evalue, tb)
377 383 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
378 384 else:
379 385 status = u'ok'
380 386
381 387 reply_content[u'status'] = status
382 388
383 389 # Return the execution counter so clients can display prompts
384 390 reply_content['execution_count'] = shell.execution_count - 1
385 391
386 392 # FIXME - fish exception info out of shell, possibly left there by
387 393 # runlines. We'll need to clean up this logic later.
388 394 if shell._reply_content is not None:
389 395 reply_content.update(shell._reply_content)
390 396 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
391 397 reply_content['engine_info'] = e_info
392 398 # reset after use
393 399 shell._reply_content = None
394 400
395 401 # At this point, we can tell whether the main code execution succeeded
396 402 # or not. If it did, we proceed to evaluate user_variables/expressions
397 403 if reply_content['status'] == 'ok':
398 404 reply_content[u'user_variables'] = \
399 405 shell.user_variables(content.get(u'user_variables', []))
400 406 reply_content[u'user_expressions'] = \
401 407 shell.user_expressions(content.get(u'user_expressions', {}))
402 408 else:
403 409 # If there was an error, don't even try to compute variables or
404 410 # expressions
405 411 reply_content[u'user_variables'] = {}
406 412 reply_content[u'user_expressions'] = {}
407 413
408 414 # Payloads should be retrieved regardless of outcome, so we can both
409 415 # recover partial output (that could have been generated early in a
410 416 # block, before an error) and clear the payload system always.
411 417 reply_content[u'payload'] = shell.payload_manager.read_payload()
412 418 # Be agressive about clearing the payload because we don't want
413 419 # it to sit in memory until the next execute_request comes in.
414 420 shell.payload_manager.clear_payload()
415 421
416 422 # Flush output before sending the reply.
417 423 sys.stdout.flush()
418 424 sys.stderr.flush()
419 425 # FIXME: on rare occasions, the flush doesn't seem to make it to the
420 426 # clients... This seems to mitigate the problem, but we definitely need
421 427 # to better understand what's going on.
422 428 if self._execute_sleep:
423 429 time.sleep(self._execute_sleep)
424 430
425 431 # Send the reply.
426 432 reply_content = json_clean(reply_content)
427 433
428 434 sub['status'] = reply_content['status']
429 435 if reply_content['status'] == 'error' and \
430 436 reply_content['ename'] == 'UnmetDependency':
431 437 sub['dependencies_met'] = False
432 438
433 439 reply_msg = self.session.send(stream, u'execute_reply',
434 440 reply_content, parent, subheader=sub,
435 441 ident=ident)
436 442
437 443 self.log.debug("%s", reply_msg)
438 444
439 445 if not silent and reply_msg['content']['status'] == u'error':
440 446 self._abort_queues()
441 447
442 448 self._publish_status(u'idle', parent)
443 449
444 450 def complete_request(self, stream, ident, parent):
445 451 txt, matches = self._complete(parent)
446 452 matches = {'matches' : matches,
447 453 'matched_text' : txt,
448 454 'status' : 'ok'}
449 455 matches = json_clean(matches)
450 456 completion_msg = self.session.send(stream, 'complete_reply',
451 457 matches, parent, ident)
452 458 self.log.debug("%s", completion_msg)
453 459
454 460 def object_info_request(self, stream, ident, parent):
455 461 content = parent['content']
456 462 object_info = self.shell.object_inspect(content['oname'],
457 463 detail_level = content.get('detail_level', 0)
458 464 )
459 465 # Before we send this object over, we scrub it for JSON usage
460 466 oinfo = json_clean(object_info)
461 467 msg = self.session.send(stream, 'object_info_reply',
462 468 oinfo, parent, ident)
463 469 self.log.debug("%s", msg)
464 470
465 471 def history_request(self, stream, ident, parent):
466 472 # We need to pull these out, as passing **kwargs doesn't work with
467 473 # unicode keys before Python 2.6.5.
468 474 hist_access_type = parent['content']['hist_access_type']
469 475 raw = parent['content']['raw']
470 476 output = parent['content']['output']
471 477 if hist_access_type == 'tail':
472 478 n = parent['content']['n']
473 479 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
474 480 include_latest=True)
475 481
476 482 elif hist_access_type == 'range':
477 483 session = parent['content']['session']
478 484 start = parent['content']['start']
479 485 stop = parent['content']['stop']
480 486 hist = self.shell.history_manager.get_range(session, start, stop,
481 487 raw=raw, output=output)
482 488
483 489 elif hist_access_type == 'search':
484 490 pattern = parent['content']['pattern']
485 491 hist = self.shell.history_manager.search(pattern, raw=raw,
486 492 output=output)
487 493
488 494 else:
489 495 hist = []
490 496 hist = list(hist)
491 497 content = {'history' : hist}
492 498 content = json_clean(content)
493 499 msg = self.session.send(stream, 'history_reply',
494 500 content, parent, ident)
495 501 self.log.debug("Sending history reply with %i entries", len(hist))
496 502
497 503 def connect_request(self, stream, ident, parent):
498 504 if self._recorded_ports is not None:
499 505 content = self._recorded_ports.copy()
500 506 else:
501 507 content = {}
502 508 msg = self.session.send(stream, 'connect_reply',
503 509 content, parent, ident)
504 510 self.log.debug("%s", msg)
505 511
506 512 def shutdown_request(self, stream, ident, parent):
507 513 self.shell.exit_now = True
508 514 content = dict(status='ok')
509 515 content.update(parent['content'])
510 516 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
511 517 # same content, but different msg_id for broadcasting on IOPub
512 518 self._shutdown_message = self.session.msg(u'shutdown_reply',
513 519 content, parent
514 520 )
515 521
516 522 self._at_shutdown()
517 523 # call sys.exit after a short delay
518 524 loop = ioloop.IOLoop.instance()
519 525 loop.add_timeout(time.time()+0.1, loop.stop)
520 526
521 527 #---------------------------------------------------------------------------
522 528 # Engine methods
523 529 #---------------------------------------------------------------------------
524 530
525 531 def apply_request(self, stream, ident, parent):
526 532 try:
527 533 content = parent[u'content']
528 534 bufs = parent[u'buffers']
529 535 msg_id = parent['header']['msg_id']
530 536 except:
531 537 self.log.error("Got bad msg: %s", parent, exc_info=True)
532 538 return
533 539
534 540 self._publish_status(u'busy', parent)
535
541
536 542 # Set the parent message of the display hook and out streams.
537 543 shell = self.shell
538 544 shell.displayhook.set_parent(parent)
539 545 shell.display_pub.set_parent(parent)
540 sys.stdout.set_parent(parent)
541 sys.stderr.set_parent(parent)
546 try:
547 sys.stdout.set_parent(parent)
548 except AttributeError:
549 pass
550 try:
551 sys.stderr.set_parent(parent)
552 except AttributeError:
553 pass
542 554
543 555 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
544 556 # self.iopub_socket.send(pyin_msg)
545 557 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
546 558 sub = self._make_subheader()
547 559 try:
548 560 working = shell.user_ns
549 561
550 562 prefix = "_"+str(msg_id).replace("-","")+"_"
551 563
552 564 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
553 565
554 566 fname = getattr(f, '__name__', 'f')
555 567
556 568 fname = prefix+"f"
557 569 argname = prefix+"args"
558 570 kwargname = prefix+"kwargs"
559 571 resultname = prefix+"result"
560 572
561 573 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
562 574 # print ns
563 575 working.update(ns)
564 576 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
565 577 try:
566 578 exec code in shell.user_global_ns, shell.user_ns
567 579 result = working.get(resultname)
568 580 finally:
569 581 for key in ns.iterkeys():
570 582 working.pop(key)
571 583
572 584 packed_result,buf = serialize_object(result)
573 585 result_buf = [packed_result]+buf
574 586 except:
575 587 # invoke IPython traceback formatting
576 588 shell.showtraceback()
577 589 # FIXME - fish exception info out of shell, possibly left there by
578 590 # run_code. We'll need to clean up this logic later.
579 591 reply_content = {}
580 592 if shell._reply_content is not None:
581 593 reply_content.update(shell._reply_content)
582 594 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
583 595 reply_content['engine_info'] = e_info
584 596 # reset after use
585 597 shell._reply_content = None
586 598
587 599 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
588 600 ident=self._topic('pyerr'))
589 601 result_buf = []
590 602
591 603 if reply_content['ename'] == 'UnmetDependency':
592 604 sub['dependencies_met'] = False
593 605 else:
594 606 reply_content = {'status' : 'ok'}
595 607
596 608 # put 'ok'/'error' status in header, for scheduler introspection:
597 609 sub['status'] = reply_content['status']
598 610
599 611 # flush i/o
600 612 sys.stdout.flush()
601 613 sys.stderr.flush()
602 614
603 615 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
604 616 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
605 617
606 618 self._publish_status(u'idle', parent)
607 619
608 620 #---------------------------------------------------------------------------
609 621 # Control messages
610 622 #---------------------------------------------------------------------------
611 623
612 624 def abort_request(self, stream, ident, parent):
613 625 """abort a specifig msg by id"""
614 626 msg_ids = parent['content'].get('msg_ids', None)
615 627 if isinstance(msg_ids, basestring):
616 628 msg_ids = [msg_ids]
617 629 if not msg_ids:
618 630 self.abort_queues()
619 631 for mid in msg_ids:
620 632 self.aborted.add(str(mid))
621 633
622 634 content = dict(status='ok')
623 635 reply_msg = self.session.send(stream, 'abort_reply', content=content,
624 636 parent=parent, ident=ident)
625 637 self.log.debug("%s", reply_msg)
626 638
627 639 def clear_request(self, stream, idents, parent):
628 640 """Clear our namespace."""
629 641 self.shell.reset(False)
630 642 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
631 643 content = dict(status='ok'))
632 644
633 645
634 646 #---------------------------------------------------------------------------
635 647 # Protected interface
636 648 #---------------------------------------------------------------------------
637 649
638 650
639 651 def _wrap_exception(self, method=None):
640 652 # import here, because _wrap_exception is only used in parallel,
641 653 # and parallel has higher min pyzmq version
642 654 from IPython.parallel.error import wrap_exception
643 655 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
644 656 content = wrap_exception(e_info)
645 657 return content
646 658
647 659 def _topic(self, topic):
648 660 """prefixed topic for IOPub messages"""
649 661 if self.int_id >= 0:
650 662 base = "engine.%i" % self.int_id
651 663 else:
652 664 base = "kernel.%s" % self.ident
653 665
654 666 return py3compat.cast_bytes("%s.%s" % (base, topic))
655 667
656 668 def _abort_queues(self):
657 669 for stream in self.shell_streams:
658 670 if stream:
659 671 self._abort_queue(stream)
660 672
661 673 def _abort_queue(self, stream):
662 674 poller = zmq.Poller()
663 675 poller.register(stream.socket, zmq.POLLIN)
664 676 while True:
665 677 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
666 678 if msg is None:
667 679 return
668 680
669 681 self.log.info("Aborting:")
670 682 self.log.info("%s", msg)
671 683 msg_type = msg['header']['msg_type']
672 684 reply_type = msg_type.split('_')[0] + '_reply'
673 685
674 686 status = {'status' : 'aborted'}
675 687 sub = {'engine' : self.ident}
676 688 sub.update(status)
677 689 reply_msg = self.session.send(stream, reply_type, subheader=sub,
678 690 content=status, parent=msg, ident=idents)
679 691 self.log.debug("%s", reply_msg)
680 692 # We need to wait a bit for requests to come in. This can probably
681 693 # be set shorter for true asynchronous clients.
682 694 poller.poll(50)
683 695
684 696
685 697 def _no_raw_input(self):
686 698 """Raise StdinNotImplentedError if active frontend doesn't support
687 699 stdin."""
688 700 raise StdinNotImplementedError("raw_input was called, but this "
689 701 "frontend does not support stdin.")
690 702
691 703 def _raw_input(self, prompt, ident, parent):
692 704 # Flush output before making the request.
693 705 sys.stderr.flush()
694 706 sys.stdout.flush()
695 707
696 708 # Send the input request.
697 709 content = json_clean(dict(prompt=prompt))
698 710 self.session.send(self.stdin_socket, u'input_request', content, parent,
699 711 ident=ident)
700 712
701 713 # Await a response.
702 714 while True:
703 715 try:
704 716 ident, reply = self.session.recv(self.stdin_socket, 0)
705 717 except Exception:
706 718 self.log.warn("Invalid Message:", exc_info=True)
707 719 else:
708 720 break
709 721 try:
710 722 value = reply['content']['value']
711 723 except:
712 724 self.log.error("Got bad raw_input reply: ")
713 725 self.log.error("%s", parent)
714 726 value = ''
715 727 if value == '\x04':
716 728 # EOF
717 729 raise EOFError
718 730 return value
719 731
720 732 def _complete(self, msg):
721 733 c = msg['content']
722 734 try:
723 735 cpos = int(c['cursor_pos'])
724 736 except:
725 737 # If we don't get something that we can convert to an integer, at
726 738 # least attempt the completion guessing the cursor is at the end of
727 739 # the text, if there's any, and otherwise of the line
728 740 cpos = len(c['text'])
729 741 if cpos==0:
730 742 cpos = len(c['line'])
731 743 return self.shell.complete(c['text'], c['line'], cpos)
732 744
733 745 def _object_info(self, context):
734 746 symbol, leftover = self._symbol_from_context(context)
735 747 if symbol is not None and not leftover:
736 748 doc = getattr(symbol, '__doc__', '')
737 749 else:
738 750 doc = ''
739 751 object_info = dict(docstring = doc)
740 752 return object_info
741 753
742 754 def _symbol_from_context(self, context):
743 755 if not context:
744 756 return None, context
745 757
746 758 base_symbol_string = context[0]
747 759 symbol = self.shell.user_ns.get(base_symbol_string, None)
748 760 if symbol is None:
749 761 symbol = __builtin__.__dict__.get(base_symbol_string, None)
750 762 if symbol is None:
751 763 return None, context
752 764
753 765 context = context[1:]
754 766 for i, name in enumerate(context):
755 767 new_symbol = getattr(symbol, name, None)
756 768 if new_symbol is None:
757 769 return symbol, context[i:]
758 770 else:
759 771 symbol = new_symbol
760 772
761 773 return symbol, []
762 774
763 775 def _at_shutdown(self):
764 776 """Actions taken at shutdown by the kernel, called by python's atexit.
765 777 """
766 778 # io.rprint("Kernel at_shutdown") # dbg
767 779 if self._shutdown_message is not None:
768 780 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
769 781 self.log.debug("%s", self._shutdown_message)
770 782 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
771 783
772 784 #-----------------------------------------------------------------------------
773 785 # Aliases and Flags for the IPKernelApp
774 786 #-----------------------------------------------------------------------------
775 787
776 788 flags = dict(kernel_flags)
777 789 flags.update(shell_flags)
778 790
779 791 addflag = lambda *args: flags.update(boolean_flag(*args))
780 792
781 793 flags['pylab'] = (
782 794 {'IPKernelApp' : {'pylab' : 'auto'}},
783 795 """Pre-load matplotlib and numpy for interactive use with
784 796 the default matplotlib backend."""
785 797 )
786 798
787 799 aliases = dict(kernel_aliases)
788 800 aliases.update(shell_aliases)
789 801
790 802 #-----------------------------------------------------------------------------
791 803 # The IPKernelApp class
792 804 #-----------------------------------------------------------------------------
793 805
794 806 class IPKernelApp(KernelApp, InteractiveShellApp):
795 807 name = 'ipkernel'
796 808
797 809 aliases = Dict(aliases)
798 810 flags = Dict(flags)
799 811 classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]
800 812
801 813 @catch_config_error
802 814 def initialize(self, argv=None):
803 815 super(IPKernelApp, self).initialize(argv)
804 816 self.init_path()
805 817 self.init_shell()
806 818 self.init_gui_pylab()
807 819 self.init_extensions()
808 820 self.init_code()
809 821
810 822 def init_kernel(self):
811 823
812 824 shell_stream = ZMQStream(self.shell_socket)
813 825
814 826 kernel = Kernel(config=self.config, session=self.session,
815 827 shell_streams=[shell_stream],
816 828 iopub_socket=self.iopub_socket,
817 829 stdin_socket=self.stdin_socket,
818 830 log=self.log,
819 831 profile_dir=self.profile_dir,
820 832 )
821 833 self.kernel = kernel
822 834 kernel.record_ports(self.ports)
823 835 shell = kernel.shell
824 836
825 837 def init_gui_pylab(self):
826 838 """Enable GUI event loop integration, taking pylab into account."""
827 839
828 840 # Provide a wrapper for :meth:`InteractiveShellApp.init_gui_pylab`
829 841 # to ensure that any exception is printed straight to stderr.
830 842 # Normally _showtraceback associates the reply with an execution,
831 843 # which means frontends will never draw it, as this exception
832 844 # is not associated with any execute request.
833 845
834 846 shell = self.shell
835 847 _showtraceback = shell._showtraceback
836 848 try:
837 849 # replace pyerr-sending traceback with stderr
838 850 def print_tb(etype, evalue, stb):
839 851 print ("GUI event loop or pylab initialization failed",
840 852 file=io.stderr)
841 853 print (shell.InteractiveTB.stb2text(stb), file=io.stderr)
842 854 shell._showtraceback = print_tb
843 855 InteractiveShellApp.init_gui_pylab(self)
844 856 finally:
845 857 shell._showtraceback = _showtraceback
846 858
847 859 def init_shell(self):
848 860 self.shell = self.kernel.shell
849 861 self.shell.configurables.append(self)
850 862
851 863
852 864 #-----------------------------------------------------------------------------
853 865 # Kernel main and launch functions
854 866 #-----------------------------------------------------------------------------
855 867
856 868 def launch_kernel(*args, **kwargs):
857 869 """Launches a localhost IPython kernel, binding to the specified ports.
858 870
859 871 This function simply calls entry_point.base_launch_kernel with the right
860 872 first command to start an ipkernel. See base_launch_kernel for arguments.
861 873
862 874 Returns
863 875 -------
864 876 A tuple of form:
865 877 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
866 878 where kernel_process is a Popen object and the ports are integers.
867 879 """
868 880 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
869 881 *args, **kwargs)
870 882
871 883
872 884 def embed_kernel(module=None, local_ns=None, **kwargs):
873 885 """Embed and start an IPython kernel in a given scope.
874 886
875 887 Parameters
876 888 ----------
877 889 module : ModuleType, optional
878 890 The module to load into IPython globals (default: caller)
879 891 local_ns : dict, optional
880 892 The namespace to load into IPython user namespace (default: caller)
881 893
882 894 kwargs : various, optional
883 895 Further keyword args are relayed to the KernelApp constructor,
884 896 allowing configuration of the Kernel. Will only have an effect
885 897 on the first embed_kernel call for a given process.
886 898
887 899 """
888 900 # get the app if it exists, or set it up if it doesn't
889 901 if IPKernelApp.initialized():
890 902 app = IPKernelApp.instance()
891 903 else:
892 904 app = IPKernelApp.instance(**kwargs)
893 905 app.initialize([])
894 906 # Undo unnecessary sys module mangling from init_sys_modules.
895 907 # This would not be necessary if we could prevent it
896 908 # in the first place by using a different InteractiveShell
897 909 # subclass, as in the regular embed case.
898 910 main = app.kernel.shell._orig_sys_modules_main_mod
899 911 if main is not None:
900 912 sys.modules[app.kernel.shell._orig_sys_modules_main_name] = main
901 913
902 914 # load the calling scope if not given
903 915 (caller_module, caller_locals) = extract_module_locals(1)
904 916 if module is None:
905 917 module = caller_module
906 918 if local_ns is None:
907 919 local_ns = caller_locals
908 920
909 921 app.kernel.user_module = module
910 922 app.kernel.user_ns = local_ns
911 923 app.shell.set_completer_frame()
912 924 app.start()
913 925
914 926 def main():
915 927 """Run an IPKernel as an application"""
916 928 app = IPKernelApp.instance()
917 929 app.initialize()
918 930 app.start()
919 931
920 932
921 933 if __name__ == '__main__':
922 934 main()
General Comments 0
You need to be logged in to leave comments. Login now