##// END OF EJS Templates
upstream change preventing kernel exit on SIGINT during eventloop integration.
MinRK -
Show More
@@ -1,900 +1,910 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports
19 19 import __builtin__
20 20 import atexit
21 21 import sys
22 22 import time
23 23 import traceback
24 24 import logging
25 25 import uuid
26 26
27 27 from datetime import datetime
28 28 from signal import (
29 29 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
30 30 )
31 31
32 32 # System library imports
33 33 import zmq
34 34 from zmq.eventloop import ioloop
35 35 from zmq.eventloop.zmqstream import ZMQStream
36 36
37 37 # Local imports
38 38 from IPython.core import pylabtools
39 39 from IPython.config.configurable import Configurable
40 40 from IPython.config.application import boolean_flag, catch_config_error
41 41 from IPython.core.application import ProfileDir
42 42 from IPython.core.error import StdinNotImplementedError
43 43 from IPython.core.shellapp import (
44 44 InteractiveShellApp, shell_flags, shell_aliases
45 45 )
46 46 from IPython.utils import io
47 47 from IPython.utils import py3compat
48 48 from IPython.utils.frame import extract_module_locals
49 49 from IPython.utils.jsonutil import json_clean
50 50 from IPython.utils.traitlets import (
51 51 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode
52 52 )
53 53
54 54 from entry_point import base_launch_kernel
55 55 from kernelapp import KernelApp, kernel_flags, kernel_aliases
56 56 from serialize import serialize_object, unpack_apply_message
57 57 from session import Session, Message
58 58 from zmqshell import ZMQInteractiveShell
59 59
60 60
61 61 #-----------------------------------------------------------------------------
62 62 # Main kernel class
63 63 #-----------------------------------------------------------------------------
64 64
65 65 class Kernel(Configurable):
66 66
67 67 #---------------------------------------------------------------------------
68 68 # Kernel interface
69 69 #---------------------------------------------------------------------------
70 70
71 71 # attribute to override with a GUI
72 72 eventloop = Any(None)
73 73 def _eventloop_changed(self, name, old, new):
74 74 """schedule call to eventloop from IOLoop"""
75 75 loop = ioloop.IOLoop.instance()
76 76 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
77 77
78 78 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
79 79 session = Instance(Session)
80 80 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
81 81 shell_streams = List()
82 82 control_stream = Instance(ZMQStream)
83 83 iopub_socket = Instance(zmq.Socket)
84 84 stdin_socket = Instance(zmq.Socket)
85 85 log = Instance(logging.Logger)
86 86
87 87 user_module = Any()
88 88 def _user_module_changed(self, name, old, new):
89 89 if self.shell is not None:
90 90 self.shell.user_module = new
91 91
92 92 user_ns = Dict(default_value=None)
93 93 def _user_ns_changed(self, name, old, new):
94 94 if self.shell is not None:
95 95 self.shell.user_ns = new
96 96 self.shell.init_user_ns()
97 97
98 98 # identities:
99 99 int_id = Integer(-1)
100 100 ident = Unicode()
101 101
102 102 def _ident_default(self):
103 103 return unicode(uuid.uuid4())
104 104
105 105
106 106 # Private interface
107 107
108 108 # Time to sleep after flushing the stdout/err buffers in each execute
109 109 # cycle. While this introduces a hard limit on the minimal latency of the
110 110 # execute cycle, it helps prevent output synchronization problems for
111 111 # clients.
112 112 # Units are in seconds. The minimum zmq latency on local host is probably
113 113 # ~150 microseconds, set this to 500us for now. We may need to increase it
114 114 # a little if it's not enough after more interactive testing.
115 115 _execute_sleep = Float(0.0005, config=True)
116 116
117 117 # Frequency of the kernel's event loop.
118 118 # Units are in seconds, kernel subclasses for GUI toolkits may need to
119 119 # adapt to milliseconds.
120 120 _poll_interval = Float(0.05, config=True)
121 121
122 122 # If the shutdown was requested over the network, we leave here the
123 123 # necessary reply message so it can be sent by our registered atexit
124 124 # handler. This ensures that the reply is only sent to clients truly at
125 125 # the end of our shutdown process (which happens after the underlying
126 126 # IPython shell's own shutdown).
127 127 _shutdown_message = None
128 128
129 129 # This is a dict of port number that the kernel is listening on. It is set
130 130 # by record_ports and used by connect_request.
131 131 _recorded_ports = Dict()
132 132
133 133 # set of aborted msg_ids
134 134 aborted = Set()
135 135
136 136
137 137 def __init__(self, **kwargs):
138 138 super(Kernel, self).__init__(**kwargs)
139 139
140 140 # Initialize the InteractiveShell subclass
141 141 self.shell = ZMQInteractiveShell.instance(config=self.config,
142 142 profile_dir = self.profile_dir,
143 143 user_module = self.user_module,
144 144 user_ns = self.user_ns,
145 145 )
146 146 self.shell.displayhook.session = self.session
147 147 self.shell.displayhook.pub_socket = self.iopub_socket
148 148 self.shell.displayhook.topic = self._topic('pyout')
149 149 self.shell.display_pub.session = self.session
150 150 self.shell.display_pub.pub_socket = self.iopub_socket
151 151
152 152 # TMP - hack while developing
153 153 self.shell._reply_content = None
154 154
155 155 # Build dict of handlers for message types
156 156 msg_types = [ 'execute_request', 'complete_request',
157 157 'object_info_request', 'history_request',
158 158 'connect_request', 'shutdown_request',
159 159 'apply_request',
160 160 ]
161 161 self.shell_handlers = {}
162 162 for msg_type in msg_types:
163 163 self.shell_handlers[msg_type] = getattr(self, msg_type)
164 164
165 165 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
166 166 self.control_handlers = {}
167 167 for msg_type in control_msg_types:
168 168 self.control_handlers[msg_type] = getattr(self, msg_type)
169 169
170 170 def dispatch_control(self, msg):
171 171 """dispatch control requests"""
172 172 idents,msg = self.session.feed_identities(msg, copy=False)
173 173 try:
174 174 msg = self.session.unserialize(msg, content=True, copy=False)
175 175 except:
176 176 self.log.error("Invalid Control Message", exc_info=True)
177 177 return
178 178
179 179 self.log.debug("Control received: %s", msg)
180 180
181 181 header = msg['header']
182 182 msg_id = header['msg_id']
183 183 msg_type = header['msg_type']
184 184
185 185 handler = self.control_handlers.get(msg_type, None)
186 186 if handler is None:
187 187 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
188 188 else:
189 189 try:
190 190 handler(self.control_stream, idents, msg)
191 191 except Exception:
192 192 self.log.error("Exception in control handler:", exc_info=True)
193 193
194 194 def dispatch_shell(self, stream, msg):
195 195 """dispatch shell requests"""
196 196 # flush control requests first
197 197 if self.control_stream:
198 198 self.control_stream.flush()
199 199
200 200 idents,msg = self.session.feed_identities(msg, copy=False)
201 201 try:
202 202 msg = self.session.unserialize(msg, content=True, copy=False)
203 203 except:
204 204 self.log.error("Invalid Message", exc_info=True)
205 205 return
206 206
207 207 header = msg['header']
208 208 msg_id = header['msg_id']
209 209 msg_type = msg['header']['msg_type']
210 210
211 211 # Print some info about this message and leave a '--->' marker, so it's
212 212 # easier to trace visually the message chain when debugging. Each
213 213 # handler prints its message at the end.
214 214 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
215 215 self.log.debug(' Content: %s\n --->\n ', msg['content'])
216 216
217 217 if msg_id in self.aborted:
218 218 self.aborted.remove(msg_id)
219 219 # is it safe to assume a msg_id will not be resubmitted?
220 220 reply_type = msg_type.split('_')[0] + '_reply'
221 221 status = {'status' : 'aborted'}
222 222 sub = {'engine' : self.ident}
223 223 sub.update(status)
224 224 reply_msg = self.session.send(stream, reply_type, subheader=sub,
225 225 content=status, parent=msg, ident=idents)
226 226 return
227 227
228 228 handler = self.shell_handlers.get(msg_type, None)
229 229 if handler is None:
230 230 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
231 231 else:
232 232 # ensure default_int_handler during handler call
233 233 sig = signal(SIGINT, default_int_handler)
234 234 try:
235 235 handler(stream, idents, msg)
236 236 except Exception:
237 237 self.log.error("Exception in message handler:", exc_info=True)
238 238 finally:
239 239 signal(SIGINT, sig)
240 240
241 241 def enter_eventloop(self):
242 242 """enter eventloop"""
243 243 self.log.critical("entering eventloop")
244 244 # restore default_int_handler
245 245 signal(SIGINT, default_int_handler)
246 while self.eventloop is not None:
247 try:
246 248 self.eventloop(self)
249 except KeyboardInterrupt:
250 # Ctrl-C shouldn't crash the kernel
251 self.log.error("KeyboardInterrupt caught in kernel")
252 continue
253 else:
254 # eventloop exited cleanly, this means we should stop (right?)
255 self.eventloop = None
256 break
247 257 self.log.critical("exiting eventloop")
248 258 # if eventloop exits, IOLoop should stop
249 259 ioloop.IOLoop.instance().stop()
250 260
251 261 def start(self):
252 262 """register dispatchers for streams"""
253 263 self.shell.exit_now = False
254 264 if self.control_stream:
255 265 self.control_stream.on_recv(self.dispatch_control, copy=False)
256 266
257 267 def make_dispatcher(stream):
258 268 def dispatcher(msg):
259 269 return self.dispatch_shell(stream, msg)
260 270 return dispatcher
261 271
262 272 for s in self.shell_streams:
263 273 s.on_recv(make_dispatcher(s), copy=False)
264 274
265 275 def do_one_iteration(self):
266 276 """step eventloop just once"""
267 277 if self.control_stream:
268 278 self.control_stream.flush()
269 279 for stream in self.shell_streams:
270 280 # handle at most one request per iteration
271 281 stream.flush(zmq.POLLIN, 1)
272 282 stream.flush(zmq.POLLOUT)
273 283
274 284
275 285 def record_ports(self, ports):
276 286 """Record the ports that this kernel is using.
277 287
278 288 The creator of the Kernel instance must call this methods if they
279 289 want the :meth:`connect_request` method to return the port numbers.
280 290 """
281 291 self._recorded_ports = ports
282 292
283 293 #---------------------------------------------------------------------------
284 294 # Kernel request handlers
285 295 #---------------------------------------------------------------------------
286 296
287 297 def _make_subheader(self):
288 298 """init subheader dict, for execute/apply_reply"""
289 299 return {
290 300 'dependencies_met' : True,
291 301 'engine' : self.ident,
292 302 'started': datetime.now(),
293 303 }
294 304
295 305 def _publish_pyin(self, code, parent, execution_count):
296 306 """Publish the code request on the pyin stream."""
297 307
298 308 self.session.send(self.iopub_socket, u'pyin',
299 309 {u'code':code, u'execution_count': execution_count},
300 310 parent=parent, ident=self._topic('pyin')
301 311 )
302 312
303 313 def execute_request(self, stream, ident, parent):
304 314
305 315 self.session.send(self.iopub_socket,
306 316 u'status',
307 317 {u'execution_state':u'busy'},
308 318 parent=parent,
309 319 ident=self._topic('status'),
310 320 )
311 321
312 322 try:
313 323 content = parent[u'content']
314 324 code = content[u'code']
315 325 silent = content[u'silent']
316 326 except:
317 327 self.log.error("Got bad msg: ")
318 328 self.log.error("%s", parent)
319 329 return
320 330
321 331 sub = self._make_subheader()
322 332
323 333 shell = self.shell # we'll need this a lot here
324 334
325 335 # Replace raw_input. Note that is not sufficient to replace
326 336 # raw_input in the user namespace.
327 337 if content.get('allow_stdin', False):
328 338 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
329 339 else:
330 340 raw_input = lambda prompt='' : self._no_raw_input()
331 341
332 342 if py3compat.PY3:
333 343 __builtin__.input = raw_input
334 344 else:
335 345 __builtin__.raw_input = raw_input
336 346
337 347 # Set the parent message of the display hook and out streams.
338 348 shell.displayhook.set_parent(parent)
339 349 shell.display_pub.set_parent(parent)
340 350 sys.stdout.set_parent(parent)
341 351 sys.stderr.set_parent(parent)
342 352
343 353 # Re-broadcast our input for the benefit of listening clients, and
344 354 # start computing output
345 355 if not silent:
346 356 self._publish_pyin(code, parent, shell.execution_count)
347 357
348 358 reply_content = {}
349 359 try:
350 360 # FIXME: the shell calls the exception handler itself.
351 361 shell.run_cell(code, store_history=not silent, silent=silent)
352 362 except:
353 363 status = u'error'
354 364 # FIXME: this code right now isn't being used yet by default,
355 365 # because the run_cell() call above directly fires off exception
356 366 # reporting. This code, therefore, is only active in the scenario
357 367 # where runlines itself has an unhandled exception. We need to
358 368 # uniformize this, for all exception construction to come from a
359 369 # single location in the codbase.
360 370 etype, evalue, tb = sys.exc_info()
361 371 tb_list = traceback.format_exception(etype, evalue, tb)
362 372 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
363 373 else:
364 374 status = u'ok'
365 375
366 376 reply_content[u'status'] = status
367 377
368 378 # Return the execution counter so clients can display prompts
369 379 reply_content['execution_count'] = shell.execution_count - 1
370 380
371 381 # FIXME - fish exception info out of shell, possibly left there by
372 382 # runlines. We'll need to clean up this logic later.
373 383 if shell._reply_content is not None:
374 384 reply_content.update(shell._reply_content)
375 385 # reset after use
376 386 shell._reply_content = None
377 387
378 388 # At this point, we can tell whether the main code execution succeeded
379 389 # or not. If it did, we proceed to evaluate user_variables/expressions
380 390 if reply_content['status'] == 'ok':
381 391 reply_content[u'user_variables'] = \
382 392 shell.user_variables(content.get(u'user_variables', []))
383 393 reply_content[u'user_expressions'] = \
384 394 shell.user_expressions(content.get(u'user_expressions', {}))
385 395 else:
386 396 # If there was an error, don't even try to compute variables or
387 397 # expressions
388 398 reply_content[u'user_variables'] = {}
389 399 reply_content[u'user_expressions'] = {}
390 400
391 401 # Payloads should be retrieved regardless of outcome, so we can both
392 402 # recover partial output (that could have been generated early in a
393 403 # block, before an error) and clear the payload system always.
394 404 reply_content[u'payload'] = shell.payload_manager.read_payload()
395 405 # Be agressive about clearing the payload because we don't want
396 406 # it to sit in memory until the next execute_request comes in.
397 407 shell.payload_manager.clear_payload()
398 408
399 409 # Flush output before sending the reply.
400 410 sys.stdout.flush()
401 411 sys.stderr.flush()
402 412 # FIXME: on rare occasions, the flush doesn't seem to make it to the
403 413 # clients... This seems to mitigate the problem, but we definitely need
404 414 # to better understand what's going on.
405 415 if self._execute_sleep:
406 416 time.sleep(self._execute_sleep)
407 417
408 418 # Send the reply.
409 419 reply_content = json_clean(reply_content)
410 420
411 421 sub['status'] = reply_content['status']
412 422 if reply_content['status'] == 'error' and \
413 423 reply_content['ename'] == 'UnmetDependency':
414 424 sub['dependencies_met'] = False
415 425
416 426 reply_msg = self.session.send(stream, u'execute_reply',
417 427 reply_content, parent, subheader=sub,
418 428 ident=ident)
419 429
420 430 self.log.debug("%s", reply_msg)
421 431
422 432 if not silent and reply_msg['content']['status'] == u'error':
423 433 self._abort_queues()
424 434
425 435 self.session.send(self.iopub_socket,
426 436 u'status',
427 437 {u'execution_state':u'idle'},
428 438 parent=parent,
429 439 ident=self._topic('status'))
430 440
431 441 def complete_request(self, stream, ident, parent):
432 442 txt, matches = self._complete(parent)
433 443 matches = {'matches' : matches,
434 444 'matched_text' : txt,
435 445 'status' : 'ok'}
436 446 matches = json_clean(matches)
437 447 completion_msg = self.session.send(stream, 'complete_reply',
438 448 matches, parent, ident)
439 449 self.log.debug("%s", completion_msg)
440 450
441 451 def object_info_request(self, stream, ident, parent):
442 452 content = parent['content']
443 453 object_info = self.shell.object_inspect(content['oname'],
444 454 detail_level = content.get('detail_level', 0)
445 455 )
446 456 # Before we send this object over, we scrub it for JSON usage
447 457 oinfo = json_clean(object_info)
448 458 msg = self.session.send(stream, 'object_info_reply',
449 459 oinfo, parent, ident)
450 460 self.log.debug("%s", msg)
451 461
452 462 def history_request(self, stream, ident, parent):
453 463 # We need to pull these out, as passing **kwargs doesn't work with
454 464 # unicode keys before Python 2.6.5.
455 465 hist_access_type = parent['content']['hist_access_type']
456 466 raw = parent['content']['raw']
457 467 output = parent['content']['output']
458 468 if hist_access_type == 'tail':
459 469 n = parent['content']['n']
460 470 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
461 471 include_latest=True)
462 472
463 473 elif hist_access_type == 'range':
464 474 session = parent['content']['session']
465 475 start = parent['content']['start']
466 476 stop = parent['content']['stop']
467 477 hist = self.shell.history_manager.get_range(session, start, stop,
468 478 raw=raw, output=output)
469 479
470 480 elif hist_access_type == 'search':
471 481 pattern = parent['content']['pattern']
472 482 hist = self.shell.history_manager.search(pattern, raw=raw,
473 483 output=output)
474 484
475 485 else:
476 486 hist = []
477 487 hist = list(hist)
478 488 content = {'history' : hist}
479 489 content = json_clean(content)
480 490 msg = self.session.send(stream, 'history_reply',
481 491 content, parent, ident)
482 492 self.log.debug("Sending history reply with %i entries", len(hist))
483 493
484 494 def connect_request(self, stream, ident, parent):
485 495 if self._recorded_ports is not None:
486 496 content = self._recorded_ports.copy()
487 497 else:
488 498 content = {}
489 499 msg = self.session.send(stream, 'connect_reply',
490 500 content, parent, ident)
491 501 self.log.debug("%s", msg)
492 502
493 503 def shutdown_request(self, stream, ident, parent):
494 504 self.shell.exit_now = True
495 505 content = dict(status='ok')
496 506 content.update(parent['content'])
497 507 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
498 508 # same content, but different msg_id for broadcasting on IOPub
499 509 self._shutdown_message = self.session.msg(u'shutdown_reply',
500 510 content, parent
501 511 )
502 512
503 513 self._at_shutdown()
504 514 # call sys.exit after a short delay
505 515 loop = ioloop.IOLoop.instance()
506 516 loop.add_timeout(time.time()+0.1, loop.stop)
507 517
508 518 #---------------------------------------------------------------------------
509 519 # Engine methods
510 520 #---------------------------------------------------------------------------
511 521
512 522 def apply_request(self, stream, ident, parent):
513 523 try:
514 524 content = parent[u'content']
515 525 bufs = parent[u'buffers']
516 526 msg_id = parent['header']['msg_id']
517 527 except:
518 528 self.log.error("Got bad msg: %s", parent, exc_info=True)
519 529 return
520 530 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
521 531 # self.iopub_socket.send(pyin_msg)
522 532 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
523 533 sub = self._make_subheader()
524 534 try:
525 535 # allow for not overriding displayhook
526 536 if hasattr(sys.displayhook, 'set_parent'):
527 537 sys.displayhook.set_parent(parent)
528 538 sys.stdout.set_parent(parent)
529 539 sys.stderr.set_parent(parent)
530 540 working = self.shell.user_ns
531 541
532 542 prefix = "_"+str(msg_id).replace("-","")+"_"
533 543
534 544 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
535 545
536 546 fname = getattr(f, '__name__', 'f')
537 547
538 548 fname = prefix+"f"
539 549 argname = prefix+"args"
540 550 kwargname = prefix+"kwargs"
541 551 resultname = prefix+"result"
542 552
543 553 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
544 554 # print ns
545 555 working.update(ns)
546 556 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
547 557 try:
548 558 exec code in self.shell.user_global_ns, self.shell.user_ns
549 559 result = working.get(resultname)
550 560 finally:
551 561 for key in ns.iterkeys():
552 562 working.pop(key)
553 563
554 564 packed_result,buf = serialize_object(result)
555 565 result_buf = [packed_result]+buf
556 566 except:
557 567 exc_content = self._wrap_exception('apply')
558 568 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
559 569 self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent,
560 570 ident=self._topic('pyerr'))
561 571 reply_content = exc_content
562 572 result_buf = []
563 573
564 574 if exc_content['ename'] == 'UnmetDependency':
565 575 sub['dependencies_met'] = False
566 576 else:
567 577 reply_content = {'status' : 'ok'}
568 578
569 579 # put 'ok'/'error' status in header, for scheduler introspection:
570 580 sub['status'] = reply_content['status']
571 581
572 582 # flush i/o
573 583 sys.stdout.flush()
574 584 sys.stderr.flush()
575 585
576 586 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
577 587 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
578 588
579 589 #---------------------------------------------------------------------------
580 590 # Control messages
581 591 #---------------------------------------------------------------------------
582 592
583 593 def abort_request(self, stream, ident, parent):
584 594 """abort a specifig msg by id"""
585 595 msg_ids = parent['content'].get('msg_ids', None)
586 596 if isinstance(msg_ids, basestring):
587 597 msg_ids = [msg_ids]
588 598 if not msg_ids:
589 599 self.abort_queues()
590 600 for mid in msg_ids:
591 601 self.aborted.add(str(mid))
592 602
593 603 content = dict(status='ok')
594 604 reply_msg = self.session.send(stream, 'abort_reply', content=content,
595 605 parent=parent, ident=ident)
596 606 self.log.debug("%s", reply_msg)
597 607
598 608 def clear_request(self, stream, idents, parent):
599 609 """Clear our namespace."""
600 610 self.shell.reset(False)
601 611 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
602 612 content = dict(status='ok'))
603 613
604 614
605 615 #---------------------------------------------------------------------------
606 616 # Protected interface
607 617 #---------------------------------------------------------------------------
608 618
609 619
610 620 def _wrap_exception(self, method=None):
611 621 # import here, because _wrap_exception is only used in parallel,
612 622 # and parallel has higher min pyzmq version
613 623 from IPython.parallel.error import wrap_exception
614 624 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
615 625 content = wrap_exception(e_info)
616 626 return content
617 627
618 628 def _topic(self, topic):
619 629 """prefixed topic for IOPub messages"""
620 630 if self.int_id >= 0:
621 631 base = "engine.%i" % self.int_id
622 632 else:
623 633 base = "kernel.%s" % self.ident
624 634
625 635 return py3compat.cast_bytes("%s.%s" % (base, topic))
626 636
627 637 def _abort_queues(self):
628 638 for stream in self.shell_streams:
629 639 if stream:
630 640 self._abort_queue(stream)
631 641
632 642 def _abort_queue(self, stream):
633 643 poller = zmq.Poller()
634 644 poller.register(stream.socket, zmq.POLLIN)
635 645 while True:
636 646 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
637 647 if msg is None:
638 648 return
639 649
640 650 self.log.info("Aborting:")
641 651 self.log.info("%s", msg)
642 652 msg_type = msg['header']['msg_type']
643 653 reply_type = msg_type.split('_')[0] + '_reply'
644 654
645 655 status = {'status' : 'aborted'}
646 656 sub = {'engine' : self.ident}
647 657 sub.update(status)
648 658 reply_msg = self.session.send(stream, reply_type, subheader=sub,
649 659 content=status, parent=msg, ident=idents)
650 660 self.log.debug("%s", reply_msg)
651 661 # We need to wait a bit for requests to come in. This can probably
652 662 # be set shorter for true asynchronous clients.
653 663 poller.poll(50)
654 664
655 665
656 666 def _no_raw_input(self):
657 667 """Raise StdinNotImplentedError if active frontend doesn't support
658 668 stdin."""
659 669 raise StdinNotImplementedError("raw_input was called, but this "
660 670 "frontend does not support stdin.")
661 671
662 672 def _raw_input(self, prompt, ident, parent):
663 673 # Flush output before making the request.
664 674 sys.stderr.flush()
665 675 sys.stdout.flush()
666 676
667 677 # Send the input request.
668 678 content = json_clean(dict(prompt=prompt))
669 679 self.session.send(self.stdin_socket, u'input_request', content, parent,
670 680 ident=ident)
671 681
672 682 # Await a response.
673 683 while True:
674 684 try:
675 685 ident, reply = self.session.recv(self.stdin_socket, 0)
676 686 except Exception:
677 687 self.log.warn("Invalid Message:", exc_info=True)
678 688 else:
679 689 break
680 690 try:
681 691 value = reply['content']['value']
682 692 except:
683 693 self.log.error("Got bad raw_input reply: ")
684 694 self.log.error("%s", parent)
685 695 value = ''
686 696 if value == '\x04':
687 697 # EOF
688 698 raise EOFError
689 699 return value
690 700
691 701 def _complete(self, msg):
692 702 c = msg['content']
693 703 try:
694 704 cpos = int(c['cursor_pos'])
695 705 except:
696 706 # If we don't get something that we can convert to an integer, at
697 707 # least attempt the completion guessing the cursor is at the end of
698 708 # the text, if there's any, and otherwise of the line
699 709 cpos = len(c['text'])
700 710 if cpos==0:
701 711 cpos = len(c['line'])
702 712 return self.shell.complete(c['text'], c['line'], cpos)
703 713
704 714 def _object_info(self, context):
705 715 symbol, leftover = self._symbol_from_context(context)
706 716 if symbol is not None and not leftover:
707 717 doc = getattr(symbol, '__doc__', '')
708 718 else:
709 719 doc = ''
710 720 object_info = dict(docstring = doc)
711 721 return object_info
712 722
713 723 def _symbol_from_context(self, context):
714 724 if not context:
715 725 return None, context
716 726
717 727 base_symbol_string = context[0]
718 728 symbol = self.shell.user_ns.get(base_symbol_string, None)
719 729 if symbol is None:
720 730 symbol = __builtin__.__dict__.get(base_symbol_string, None)
721 731 if symbol is None:
722 732 return None, context
723 733
724 734 context = context[1:]
725 735 for i, name in enumerate(context):
726 736 new_symbol = getattr(symbol, name, None)
727 737 if new_symbol is None:
728 738 return symbol, context[i:]
729 739 else:
730 740 symbol = new_symbol
731 741
732 742 return symbol, []
733 743
734 744 def _at_shutdown(self):
735 745 """Actions taken at shutdown by the kernel, called by python's atexit.
736 746 """
737 747 # io.rprint("Kernel at_shutdown") # dbg
738 748 if self._shutdown_message is not None:
739 749 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
740 750 self.log.debug("%s", self._shutdown_message)
741 751 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
742 752
743 753 #-----------------------------------------------------------------------------
744 754 # Aliases and Flags for the IPKernelApp
745 755 #-----------------------------------------------------------------------------
746 756
747 757 flags = dict(kernel_flags)
748 758 flags.update(shell_flags)
749 759
750 760 addflag = lambda *args: flags.update(boolean_flag(*args))
751 761
752 762 flags['pylab'] = (
753 763 {'IPKernelApp' : {'pylab' : 'auto'}},
754 764 """Pre-load matplotlib and numpy for interactive use with
755 765 the default matplotlib backend."""
756 766 )
757 767
758 768 aliases = dict(kernel_aliases)
759 769 aliases.update(shell_aliases)
760 770
761 771 # it's possible we don't want short aliases for *all* of these:
762 772 aliases.update(dict(
763 773 pylab='IPKernelApp.pylab',
764 774 ))
765 775
766 776 #-----------------------------------------------------------------------------
767 777 # The IPKernelApp class
768 778 #-----------------------------------------------------------------------------
769 779
770 780 class IPKernelApp(KernelApp, InteractiveShellApp):
771 781 name = 'ipkernel'
772 782
773 783 aliases = Dict(aliases)
774 784 flags = Dict(flags)
775 785 classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]
776 786
777 787 # configurables
778 788 pylab = CaselessStrEnum(['tk', 'qt', 'wx', 'gtk', 'osx', 'inline', 'auto'],
779 789 config=True,
780 790 help="""Pre-load matplotlib and numpy for interactive use,
781 791 selecting a particular matplotlib backend and loop integration.
782 792 """
783 793 )
784 794
785 795 @catch_config_error
786 796 def initialize(self, argv=None):
787 797 super(IPKernelApp, self).initialize(argv)
788 798 self.init_path()
789 799 self.init_shell()
790 800 self.init_extensions()
791 801 self.init_code()
792 802
793 803 def init_kernel(self):
794 804
795 805 shell_stream = ZMQStream(self.shell_socket)
796 806
797 807 kernel = Kernel(config=self.config, session=self.session,
798 808 shell_streams=[shell_stream],
799 809 iopub_socket=self.iopub_socket,
800 810 stdin_socket=self.stdin_socket,
801 811 log=self.log,
802 812 profile_dir=self.profile_dir,
803 813 )
804 814 self.kernel = kernel
805 815 kernel.record_ports(self.ports)
806 816 shell = kernel.shell
807 817 if self.pylab:
808 818 try:
809 819 gui, backend = pylabtools.find_gui_and_backend(self.pylab)
810 820 shell.enable_pylab(gui, import_all=self.pylab_import_all)
811 821 except Exception:
812 822 self.log.error("Pylab initialization failed", exc_info=True)
813 823 # print exception straight to stdout, because normally
814 824 # _showtraceback associates the reply with an execution,
815 825 # which means frontends will never draw it, as this exception
816 826 # is not associated with any execute request.
817 827
818 828 # replace pyerr-sending traceback with stdout
819 829 _showtraceback = shell._showtraceback
820 830 def print_tb(etype, evalue, stb):
821 831 print ("Error initializing pylab, pylab mode will not "
822 832 "be active", file=io.stderr)
823 833 print (shell.InteractiveTB.stb2text(stb), file=io.stdout)
824 834 shell._showtraceback = print_tb
825 835
826 836 # send the traceback over stdout
827 837 shell.showtraceback(tb_offset=0)
828 838
829 839 # restore proper _showtraceback method
830 840 shell._showtraceback = _showtraceback
831 841
832 842
833 843 def init_shell(self):
834 844 self.shell = self.kernel.shell
835 845 self.shell.configurables.append(self)
836 846
837 847
838 848 #-----------------------------------------------------------------------------
839 849 # Kernel main and launch functions
840 850 #-----------------------------------------------------------------------------
841 851
842 852 def launch_kernel(*args, **kwargs):
843 853 """Launches a localhost IPython kernel, binding to the specified ports.
844 854
845 855 This function simply calls entry_point.base_launch_kernel with the right
846 856 first command to start an ipkernel. See base_launch_kernel for arguments.
847 857
848 858 Returns
849 859 -------
850 860 A tuple of form:
851 861 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
852 862 where kernel_process is a Popen object and the ports are integers.
853 863 """
854 864 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
855 865 *args, **kwargs)
856 866
857 867
858 868 def embed_kernel(module=None, local_ns=None, **kwargs):
859 869 """Embed and start an IPython kernel in a given scope.
860 870
861 871 Parameters
862 872 ----------
863 873 module : ModuleType, optional
864 874 The module to load into IPython globals (default: caller)
865 875 local_ns : dict, optional
866 876 The namespace to load into IPython user namespace (default: caller)
867 877
868 878 kwargs : various, optional
869 879 Further keyword args are relayed to the KernelApp constructor,
870 880 allowing configuration of the Kernel. Will only have an effect
871 881 on the first embed_kernel call for a given process.
872 882
873 883 """
874 884 # get the app if it exists, or set it up if it doesn't
875 885 if IPKernelApp.initialized():
876 886 app = IPKernelApp.instance()
877 887 else:
878 888 app = IPKernelApp.instance(**kwargs)
879 889 app.initialize([])
880 890
881 891 # load the calling scope if not given
882 892 (caller_module, caller_locals) = extract_module_locals(1)
883 893 if module is None:
884 894 module = caller_module
885 895 if local_ns is None:
886 896 local_ns = caller_locals
887 897
888 898 app.kernel.user_module = module
889 899 app.kernel.user_ns = local_ns
890 900 app.start()
891 901
892 902 def main():
893 903 """Run an IPKernel as an application"""
894 904 app = IPKernelApp.instance()
895 905 app.initialize()
896 906 app.start()
897 907
898 908
899 909 if __name__ == '__main__':
900 910 main()
General Comments 0
You need to be logged in to leave comments. Login now