##// END OF EJS Templates
use IPython traceback formatting in apply requests
MinRK -
Show More
@@ -1,905 +1,915 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports
19 19 import __builtin__
20 20 import atexit
21 21 import sys
22 22 import time
23 23 import traceback
24 24 import logging
25 25 import uuid
26 26
27 27 from datetime import datetime
28 28 from signal import (
29 29 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
30 30 )
31 31
32 32 # System library imports
33 33 import zmq
34 34 from zmq.eventloop import ioloop
35 35 from zmq.eventloop.zmqstream import ZMQStream
36 36
37 37 # Local imports
38 38 from IPython.config.configurable import Configurable
39 39 from IPython.config.application import boolean_flag, catch_config_error
40 40 from IPython.core.application import ProfileDir
41 41 from IPython.core.error import StdinNotImplementedError
42 42 from IPython.core.shellapp import (
43 43 InteractiveShellApp, shell_flags, shell_aliases
44 44 )
45 45 from IPython.utils import io
46 46 from IPython.utils import py3compat
47 47 from IPython.utils.frame import extract_module_locals
48 48 from IPython.utils.jsonutil import json_clean
49 49 from IPython.utils.traitlets import (
50 50 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode
51 51 )
52 52
53 53 from entry_point import base_launch_kernel
54 54 from kernelapp import KernelApp, kernel_flags, kernel_aliases
55 55 from serialize import serialize_object, unpack_apply_message
56 56 from session import Session, Message
57 57 from zmqshell import ZMQInteractiveShell
58 58
59 59
60 60 #-----------------------------------------------------------------------------
61 61 # Main kernel class
62 62 #-----------------------------------------------------------------------------
63 63
64 64 class Kernel(Configurable):
65 65
66 66 #---------------------------------------------------------------------------
67 67 # Kernel interface
68 68 #---------------------------------------------------------------------------
69 69
70 70 # attribute to override with a GUI
71 71 eventloop = Any(None)
72 72 def _eventloop_changed(self, name, old, new):
73 73 """schedule call to eventloop from IOLoop"""
74 74 loop = ioloop.IOLoop.instance()
75 75 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
76 76
77 77 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
78 78 session = Instance(Session)
79 79 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
80 80 shell_streams = List()
81 81 control_stream = Instance(ZMQStream)
82 82 iopub_socket = Instance(zmq.Socket)
83 83 stdin_socket = Instance(zmq.Socket)
84 84 log = Instance(logging.Logger)
85 85
86 86 user_module = Any()
87 87 def _user_module_changed(self, name, old, new):
88 88 if self.shell is not None:
89 89 self.shell.user_module = new
90 90
91 91 user_ns = Dict(default_value=None)
92 92 def _user_ns_changed(self, name, old, new):
93 93 if self.shell is not None:
94 94 self.shell.user_ns = new
95 95 self.shell.init_user_ns()
96 96
97 97 # identities:
98 98 int_id = Integer(-1)
99 99 ident = Unicode()
100 100
101 101 def _ident_default(self):
102 102 return unicode(uuid.uuid4())
103 103
104 104
105 105 # Private interface
106 106
107 107 # Time to sleep after flushing the stdout/err buffers in each execute
108 108 # cycle. While this introduces a hard limit on the minimal latency of the
109 109 # execute cycle, it helps prevent output synchronization problems for
110 110 # clients.
111 111 # Units are in seconds. The minimum zmq latency on local host is probably
112 112 # ~150 microseconds, set this to 500us for now. We may need to increase it
113 113 # a little if it's not enough after more interactive testing.
114 114 _execute_sleep = Float(0.0005, config=True)
115 115
116 116 # Frequency of the kernel's event loop.
117 117 # Units are in seconds, kernel subclasses for GUI toolkits may need to
118 118 # adapt to milliseconds.
119 119 _poll_interval = Float(0.05, config=True)
120 120
121 121 # If the shutdown was requested over the network, we leave here the
122 122 # necessary reply message so it can be sent by our registered atexit
123 123 # handler. This ensures that the reply is only sent to clients truly at
124 124 # the end of our shutdown process (which happens after the underlying
125 125 # IPython shell's own shutdown).
126 126 _shutdown_message = None
127 127
128 128 # This is a dict of port number that the kernel is listening on. It is set
129 129 # by record_ports and used by connect_request.
130 130 _recorded_ports = Dict()
131 131
132 132 # set of aborted msg_ids
133 133 aborted = Set()
134 134
135 135
136 136 def __init__(self, **kwargs):
137 137 super(Kernel, self).__init__(**kwargs)
138 138
139 139 # Initialize the InteractiveShell subclass
140 140 self.shell = ZMQInteractiveShell.instance(config=self.config,
141 141 profile_dir = self.profile_dir,
142 142 user_module = self.user_module,
143 143 user_ns = self.user_ns,
144 144 )
145 145 self.shell.displayhook.session = self.session
146 146 self.shell.displayhook.pub_socket = self.iopub_socket
147 147 self.shell.displayhook.topic = self._topic('pyout')
148 148 self.shell.display_pub.session = self.session
149 149 self.shell.display_pub.pub_socket = self.iopub_socket
150 150
151 151 # TMP - hack while developing
152 152 self.shell._reply_content = None
153 153
154 154 # Build dict of handlers for message types
155 155 msg_types = [ 'execute_request', 'complete_request',
156 156 'object_info_request', 'history_request',
157 157 'connect_request', 'shutdown_request',
158 158 'apply_request',
159 159 ]
160 160 self.shell_handlers = {}
161 161 for msg_type in msg_types:
162 162 self.shell_handlers[msg_type] = getattr(self, msg_type)
163 163
164 164 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
165 165 self.control_handlers = {}
166 166 for msg_type in control_msg_types:
167 167 self.control_handlers[msg_type] = getattr(self, msg_type)
168 168
169 169 def dispatch_control(self, msg):
170 170 """dispatch control requests"""
171 171 idents,msg = self.session.feed_identities(msg, copy=False)
172 172 try:
173 173 msg = self.session.unserialize(msg, content=True, copy=False)
174 174 except:
175 175 self.log.error("Invalid Control Message", exc_info=True)
176 176 return
177 177
178 178 self.log.debug("Control received: %s", msg)
179 179
180 180 header = msg['header']
181 181 msg_id = header['msg_id']
182 182 msg_type = header['msg_type']
183 183
184 184 handler = self.control_handlers.get(msg_type, None)
185 185 if handler is None:
186 186 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
187 187 else:
188 188 try:
189 189 handler(self.control_stream, idents, msg)
190 190 except Exception:
191 191 self.log.error("Exception in control handler:", exc_info=True)
192 192
193 193 def dispatch_shell(self, stream, msg):
194 194 """dispatch shell requests"""
195 195 # flush control requests first
196 196 if self.control_stream:
197 197 self.control_stream.flush()
198 198
199 199 idents,msg = self.session.feed_identities(msg, copy=False)
200 200 try:
201 201 msg = self.session.unserialize(msg, content=True, copy=False)
202 202 except:
203 203 self.log.error("Invalid Message", exc_info=True)
204 204 return
205 205
206 206 header = msg['header']
207 207 msg_id = header['msg_id']
208 208 msg_type = msg['header']['msg_type']
209 209
210 210 # Print some info about this message and leave a '--->' marker, so it's
211 211 # easier to trace visually the message chain when debugging. Each
212 212 # handler prints its message at the end.
213 213 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
214 214 self.log.debug(' Content: %s\n --->\n ', msg['content'])
215 215
216 216 if msg_id in self.aborted:
217 217 self.aborted.remove(msg_id)
218 218 # is it safe to assume a msg_id will not be resubmitted?
219 219 reply_type = msg_type.split('_')[0] + '_reply'
220 220 status = {'status' : 'aborted'}
221 221 sub = {'engine' : self.ident}
222 222 sub.update(status)
223 223 reply_msg = self.session.send(stream, reply_type, subheader=sub,
224 224 content=status, parent=msg, ident=idents)
225 225 return
226 226
227 227 handler = self.shell_handlers.get(msg_type, None)
228 228 if handler is None:
229 229 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
230 230 else:
231 231 # ensure default_int_handler during handler call
232 232 sig = signal(SIGINT, default_int_handler)
233 233 try:
234 234 handler(stream, idents, msg)
235 235 except Exception:
236 236 self.log.error("Exception in message handler:", exc_info=True)
237 237 finally:
238 238 signal(SIGINT, sig)
239 239
240 240 def enter_eventloop(self):
241 241 """enter eventloop"""
242 242 self.log.info("entering eventloop")
243 243 # restore default_int_handler
244 244 signal(SIGINT, default_int_handler)
245 245 while self.eventloop is not None:
246 246 try:
247 247 self.eventloop(self)
248 248 except KeyboardInterrupt:
249 249 # Ctrl-C shouldn't crash the kernel
250 250 self.log.error("KeyboardInterrupt caught in kernel")
251 251 continue
252 252 else:
253 253 # eventloop exited cleanly, this means we should stop (right?)
254 254 self.eventloop = None
255 255 break
256 256 self.log.info("exiting eventloop")
257 257 # if eventloop exits, IOLoop should stop
258 258 ioloop.IOLoop.instance().stop()
259 259
260 260 def start(self):
261 261 """register dispatchers for streams"""
262 262 self.shell.exit_now = False
263 263 if self.control_stream:
264 264 self.control_stream.on_recv(self.dispatch_control, copy=False)
265 265
266 266 def make_dispatcher(stream):
267 267 def dispatcher(msg):
268 268 return self.dispatch_shell(stream, msg)
269 269 return dispatcher
270 270
271 271 for s in self.shell_streams:
272 272 s.on_recv(make_dispatcher(s), copy=False)
273 273
274 274 def do_one_iteration(self):
275 275 """step eventloop just once"""
276 276 if self.control_stream:
277 277 self.control_stream.flush()
278 278 for stream in self.shell_streams:
279 279 # handle at most one request per iteration
280 280 stream.flush(zmq.POLLIN, 1)
281 281 stream.flush(zmq.POLLOUT)
282 282
283 283
284 284 def record_ports(self, ports):
285 285 """Record the ports that this kernel is using.
286 286
287 287 The creator of the Kernel instance must call this methods if they
288 288 want the :meth:`connect_request` method to return the port numbers.
289 289 """
290 290 self._recorded_ports = ports
291 291
292 292 #---------------------------------------------------------------------------
293 293 # Kernel request handlers
294 294 #---------------------------------------------------------------------------
295 295
296 296 def _make_subheader(self):
297 297 """init subheader dict, for execute/apply_reply"""
298 298 return {
299 299 'dependencies_met' : True,
300 300 'engine' : self.ident,
301 301 'started': datetime.now(),
302 302 }
303 303
304 304 def _publish_pyin(self, code, parent, execution_count):
305 305 """Publish the code request on the pyin stream."""
306 306
307 307 self.session.send(self.iopub_socket, u'pyin',
308 308 {u'code':code, u'execution_count': execution_count},
309 309 parent=parent, ident=self._topic('pyin')
310 310 )
311 311
312 312 def execute_request(self, stream, ident, parent):
313 313
314 314 self.session.send(self.iopub_socket,
315 315 u'status',
316 316 {u'execution_state':u'busy'},
317 317 parent=parent,
318 318 ident=self._topic('status'),
319 319 )
320 320
321 321 try:
322 322 content = parent[u'content']
323 323 code = content[u'code']
324 324 silent = content[u'silent']
325 325 except:
326 326 self.log.error("Got bad msg: ")
327 327 self.log.error("%s", parent)
328 328 return
329 329
330 330 sub = self._make_subheader()
331 331
332 332 shell = self.shell # we'll need this a lot here
333 333
334 334 # Replace raw_input. Note that is not sufficient to replace
335 335 # raw_input in the user namespace.
336 336 if content.get('allow_stdin', False):
337 337 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
338 338 else:
339 339 raw_input = lambda prompt='' : self._no_raw_input()
340 340
341 341 if py3compat.PY3:
342 342 __builtin__.input = raw_input
343 343 else:
344 344 __builtin__.raw_input = raw_input
345 345
346 346 # Set the parent message of the display hook and out streams.
347 347 shell.displayhook.set_parent(parent)
348 348 shell.display_pub.set_parent(parent)
349 349 sys.stdout.set_parent(parent)
350 350 sys.stderr.set_parent(parent)
351 351
352 352 # Re-broadcast our input for the benefit of listening clients, and
353 353 # start computing output
354 354 if not silent:
355 355 self._publish_pyin(code, parent, shell.execution_count)
356 356
357 357 reply_content = {}
358 358 try:
359 359 # FIXME: the shell calls the exception handler itself.
360 360 shell.run_cell(code, store_history=not silent, silent=silent)
361 361 except:
362 362 status = u'error'
363 363 # FIXME: this code right now isn't being used yet by default,
364 364 # because the run_cell() call above directly fires off exception
365 365 # reporting. This code, therefore, is only active in the scenario
366 366 # where runlines itself has an unhandled exception. We need to
367 367 # uniformize this, for all exception construction to come from a
368 368 # single location in the codbase.
369 369 etype, evalue, tb = sys.exc_info()
370 370 tb_list = traceback.format_exception(etype, evalue, tb)
371 371 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
372 372 else:
373 373 status = u'ok'
374 374
375 375 reply_content[u'status'] = status
376 376
377 377 # Return the execution counter so clients can display prompts
378 378 reply_content['execution_count'] = shell.execution_count - 1
379 379
380 380 # FIXME - fish exception info out of shell, possibly left there by
381 381 # runlines. We'll need to clean up this logic later.
382 382 if shell._reply_content is not None:
383 383 reply_content.update(shell._reply_content)
384 384 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
385 385 reply_content['engine_info'] = e_info
386 386 # reset after use
387 387 shell._reply_content = None
388 388
389 389 # At this point, we can tell whether the main code execution succeeded
390 390 # or not. If it did, we proceed to evaluate user_variables/expressions
391 391 if reply_content['status'] == 'ok':
392 392 reply_content[u'user_variables'] = \
393 393 shell.user_variables(content.get(u'user_variables', []))
394 394 reply_content[u'user_expressions'] = \
395 395 shell.user_expressions(content.get(u'user_expressions', {}))
396 396 else:
397 397 # If there was an error, don't even try to compute variables or
398 398 # expressions
399 399 reply_content[u'user_variables'] = {}
400 400 reply_content[u'user_expressions'] = {}
401 401
402 402 # Payloads should be retrieved regardless of outcome, so we can both
403 403 # recover partial output (that could have been generated early in a
404 404 # block, before an error) and clear the payload system always.
405 405 reply_content[u'payload'] = shell.payload_manager.read_payload()
406 406 # Be agressive about clearing the payload because we don't want
407 407 # it to sit in memory until the next execute_request comes in.
408 408 shell.payload_manager.clear_payload()
409 409
410 410 # Flush output before sending the reply.
411 411 sys.stdout.flush()
412 412 sys.stderr.flush()
413 413 # FIXME: on rare occasions, the flush doesn't seem to make it to the
414 414 # clients... This seems to mitigate the problem, but we definitely need
415 415 # to better understand what's going on.
416 416 if self._execute_sleep:
417 417 time.sleep(self._execute_sleep)
418 418
419 419 # Send the reply.
420 420 reply_content = json_clean(reply_content)
421 421
422 422 sub['status'] = reply_content['status']
423 423 if reply_content['status'] == 'error' and \
424 424 reply_content['ename'] == 'UnmetDependency':
425 425 sub['dependencies_met'] = False
426 426
427 427 reply_msg = self.session.send(stream, u'execute_reply',
428 428 reply_content, parent, subheader=sub,
429 429 ident=ident)
430 430
431 431 self.log.debug("%s", reply_msg)
432 432
433 433 if not silent and reply_msg['content']['status'] == u'error':
434 434 self._abort_queues()
435 435
436 436 self.session.send(self.iopub_socket,
437 437 u'status',
438 438 {u'execution_state':u'idle'},
439 439 parent=parent,
440 440 ident=self._topic('status'))
441 441
442 442 def complete_request(self, stream, ident, parent):
443 443 txt, matches = self._complete(parent)
444 444 matches = {'matches' : matches,
445 445 'matched_text' : txt,
446 446 'status' : 'ok'}
447 447 matches = json_clean(matches)
448 448 completion_msg = self.session.send(stream, 'complete_reply',
449 449 matches, parent, ident)
450 450 self.log.debug("%s", completion_msg)
451 451
452 452 def object_info_request(self, stream, ident, parent):
453 453 content = parent['content']
454 454 object_info = self.shell.object_inspect(content['oname'],
455 455 detail_level = content.get('detail_level', 0)
456 456 )
457 457 # Before we send this object over, we scrub it for JSON usage
458 458 oinfo = json_clean(object_info)
459 459 msg = self.session.send(stream, 'object_info_reply',
460 460 oinfo, parent, ident)
461 461 self.log.debug("%s", msg)
462 462
463 463 def history_request(self, stream, ident, parent):
464 464 # We need to pull these out, as passing **kwargs doesn't work with
465 465 # unicode keys before Python 2.6.5.
466 466 hist_access_type = parent['content']['hist_access_type']
467 467 raw = parent['content']['raw']
468 468 output = parent['content']['output']
469 469 if hist_access_type == 'tail':
470 470 n = parent['content']['n']
471 471 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
472 472 include_latest=True)
473 473
474 474 elif hist_access_type == 'range':
475 475 session = parent['content']['session']
476 476 start = parent['content']['start']
477 477 stop = parent['content']['stop']
478 478 hist = self.shell.history_manager.get_range(session, start, stop,
479 479 raw=raw, output=output)
480 480
481 481 elif hist_access_type == 'search':
482 482 pattern = parent['content']['pattern']
483 483 hist = self.shell.history_manager.search(pattern, raw=raw,
484 484 output=output)
485 485
486 486 else:
487 487 hist = []
488 488 hist = list(hist)
489 489 content = {'history' : hist}
490 490 content = json_clean(content)
491 491 msg = self.session.send(stream, 'history_reply',
492 492 content, parent, ident)
493 493 self.log.debug("Sending history reply with %i entries", len(hist))
494 494
495 495 def connect_request(self, stream, ident, parent):
496 496 if self._recorded_ports is not None:
497 497 content = self._recorded_ports.copy()
498 498 else:
499 499 content = {}
500 500 msg = self.session.send(stream, 'connect_reply',
501 501 content, parent, ident)
502 502 self.log.debug("%s", msg)
503 503
504 504 def shutdown_request(self, stream, ident, parent):
505 505 self.shell.exit_now = True
506 506 content = dict(status='ok')
507 507 content.update(parent['content'])
508 508 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
509 509 # same content, but different msg_id for broadcasting on IOPub
510 510 self._shutdown_message = self.session.msg(u'shutdown_reply',
511 511 content, parent
512 512 )
513 513
514 514 self._at_shutdown()
515 515 # call sys.exit after a short delay
516 516 loop = ioloop.IOLoop.instance()
517 517 loop.add_timeout(time.time()+0.1, loop.stop)
518 518
519 519 #---------------------------------------------------------------------------
520 520 # Engine methods
521 521 #---------------------------------------------------------------------------
522 522
523 523 def apply_request(self, stream, ident, parent):
524 524 try:
525 525 content = parent[u'content']
526 526 bufs = parent[u'buffers']
527 527 msg_id = parent['header']['msg_id']
528 528 except:
529 529 self.log.error("Got bad msg: %s", parent, exc_info=True)
530 530 return
531 531
532 532 # Set the parent message of the display hook and out streams.
533 self.shell.displayhook.set_parent(parent)
534 self.shell.display_pub.set_parent(parent)
533 shell = self.shell
534 shell.displayhook.set_parent(parent)
535 shell.display_pub.set_parent(parent)
535 536 sys.stdout.set_parent(parent)
536 537 sys.stderr.set_parent(parent)
537 538
538 539 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
539 540 # self.iopub_socket.send(pyin_msg)
540 541 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
541 542 sub = self._make_subheader()
542 543 try:
543 working = self.shell.user_ns
544 working = shell.user_ns
544 545
545 546 prefix = "_"+str(msg_id).replace("-","")+"_"
546 547
547 548 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
548 549
549 550 fname = getattr(f, '__name__', 'f')
550 551
551 552 fname = prefix+"f"
552 553 argname = prefix+"args"
553 554 kwargname = prefix+"kwargs"
554 555 resultname = prefix+"result"
555 556
556 557 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
557 558 # print ns
558 559 working.update(ns)
559 560 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
560 561 try:
561 exec code in self.shell.user_global_ns, self.shell.user_ns
562 exec code in shell.user_global_ns, shell.user_ns
562 563 result = working.get(resultname)
563 564 finally:
564 565 for key in ns.iterkeys():
565 566 working.pop(key)
566 567
567 568 packed_result,buf = serialize_object(result)
568 569 result_buf = [packed_result]+buf
569 570 except:
570 exc_content = self._wrap_exception('apply')
571 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
572 self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent,
571 # invoke IPython traceback formatting
572 shell.showtraceback()
573 # FIXME - fish exception info out of shell, possibly left there by
574 # run_code. We'll need to clean up this logic later.
575 reply_content = {}
576 if shell._reply_content is not None:
577 reply_content.update(shell._reply_content)
578 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
579 reply_content['engine_info'] = e_info
580 # reset after use
581 shell._reply_content = None
582
583 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
573 584 ident=self._topic('pyerr'))
574 reply_content = exc_content
575 585 result_buf = []
576 586
577 if exc_content['ename'] == 'UnmetDependency':
587 if reply_content['ename'] == 'UnmetDependency':
578 588 sub['dependencies_met'] = False
579 589 else:
580 590 reply_content = {'status' : 'ok'}
581 591
582 592 # put 'ok'/'error' status in header, for scheduler introspection:
583 593 sub['status'] = reply_content['status']
584 594
585 595 # flush i/o
586 596 sys.stdout.flush()
587 597 sys.stderr.flush()
588 598
589 599 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
590 600 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
591 601
592 602 #---------------------------------------------------------------------------
593 603 # Control messages
594 604 #---------------------------------------------------------------------------
595 605
596 606 def abort_request(self, stream, ident, parent):
597 607 """abort a specifig msg by id"""
598 608 msg_ids = parent['content'].get('msg_ids', None)
599 609 if isinstance(msg_ids, basestring):
600 610 msg_ids = [msg_ids]
601 611 if not msg_ids:
602 612 self.abort_queues()
603 613 for mid in msg_ids:
604 614 self.aborted.add(str(mid))
605 615
606 616 content = dict(status='ok')
607 617 reply_msg = self.session.send(stream, 'abort_reply', content=content,
608 618 parent=parent, ident=ident)
609 619 self.log.debug("%s", reply_msg)
610 620
611 621 def clear_request(self, stream, idents, parent):
612 622 """Clear our namespace."""
613 623 self.shell.reset(False)
614 624 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
615 625 content = dict(status='ok'))
616 626
617 627
618 628 #---------------------------------------------------------------------------
619 629 # Protected interface
620 630 #---------------------------------------------------------------------------
621 631
622 632
623 633 def _wrap_exception(self, method=None):
624 634 # import here, because _wrap_exception is only used in parallel,
625 635 # and parallel has higher min pyzmq version
626 636 from IPython.parallel.error import wrap_exception
627 637 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
628 638 content = wrap_exception(e_info)
629 639 return content
630 640
631 641 def _topic(self, topic):
632 642 """prefixed topic for IOPub messages"""
633 643 if self.int_id >= 0:
634 644 base = "engine.%i" % self.int_id
635 645 else:
636 646 base = "kernel.%s" % self.ident
637 647
638 648 return py3compat.cast_bytes("%s.%s" % (base, topic))
639 649
640 650 def _abort_queues(self):
641 651 for stream in self.shell_streams:
642 652 if stream:
643 653 self._abort_queue(stream)
644 654
645 655 def _abort_queue(self, stream):
646 656 poller = zmq.Poller()
647 657 poller.register(stream.socket, zmq.POLLIN)
648 658 while True:
649 659 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
650 660 if msg is None:
651 661 return
652 662
653 663 self.log.info("Aborting:")
654 664 self.log.info("%s", msg)
655 665 msg_type = msg['header']['msg_type']
656 666 reply_type = msg_type.split('_')[0] + '_reply'
657 667
658 668 status = {'status' : 'aborted'}
659 669 sub = {'engine' : self.ident}
660 670 sub.update(status)
661 671 reply_msg = self.session.send(stream, reply_type, subheader=sub,
662 672 content=status, parent=msg, ident=idents)
663 673 self.log.debug("%s", reply_msg)
664 674 # We need to wait a bit for requests to come in. This can probably
665 675 # be set shorter for true asynchronous clients.
666 676 poller.poll(50)
667 677
668 678
669 679 def _no_raw_input(self):
670 680 """Raise StdinNotImplentedError if active frontend doesn't support
671 681 stdin."""
672 682 raise StdinNotImplementedError("raw_input was called, but this "
673 683 "frontend does not support stdin.")
674 684
675 685 def _raw_input(self, prompt, ident, parent):
676 686 # Flush output before making the request.
677 687 sys.stderr.flush()
678 688 sys.stdout.flush()
679 689
680 690 # Send the input request.
681 691 content = json_clean(dict(prompt=prompt))
682 692 self.session.send(self.stdin_socket, u'input_request', content, parent,
683 693 ident=ident)
684 694
685 695 # Await a response.
686 696 while True:
687 697 try:
688 698 ident, reply = self.session.recv(self.stdin_socket, 0)
689 699 except Exception:
690 700 self.log.warn("Invalid Message:", exc_info=True)
691 701 else:
692 702 break
693 703 try:
694 704 value = reply['content']['value']
695 705 except:
696 706 self.log.error("Got bad raw_input reply: ")
697 707 self.log.error("%s", parent)
698 708 value = ''
699 709 if value == '\x04':
700 710 # EOF
701 711 raise EOFError
702 712 return value
703 713
704 714 def _complete(self, msg):
705 715 c = msg['content']
706 716 try:
707 717 cpos = int(c['cursor_pos'])
708 718 except:
709 719 # If we don't get something that we can convert to an integer, at
710 720 # least attempt the completion guessing the cursor is at the end of
711 721 # the text, if there's any, and otherwise of the line
712 722 cpos = len(c['text'])
713 723 if cpos==0:
714 724 cpos = len(c['line'])
715 725 return self.shell.complete(c['text'], c['line'], cpos)
716 726
717 727 def _object_info(self, context):
718 728 symbol, leftover = self._symbol_from_context(context)
719 729 if symbol is not None and not leftover:
720 730 doc = getattr(symbol, '__doc__', '')
721 731 else:
722 732 doc = ''
723 733 object_info = dict(docstring = doc)
724 734 return object_info
725 735
726 736 def _symbol_from_context(self, context):
727 737 if not context:
728 738 return None, context
729 739
730 740 base_symbol_string = context[0]
731 741 symbol = self.shell.user_ns.get(base_symbol_string, None)
732 742 if symbol is None:
733 743 symbol = __builtin__.__dict__.get(base_symbol_string, None)
734 744 if symbol is None:
735 745 return None, context
736 746
737 747 context = context[1:]
738 748 for i, name in enumerate(context):
739 749 new_symbol = getattr(symbol, name, None)
740 750 if new_symbol is None:
741 751 return symbol, context[i:]
742 752 else:
743 753 symbol = new_symbol
744 754
745 755 return symbol, []
746 756
747 757 def _at_shutdown(self):
748 758 """Actions taken at shutdown by the kernel, called by python's atexit.
749 759 """
750 760 # io.rprint("Kernel at_shutdown") # dbg
751 761 if self._shutdown_message is not None:
752 762 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
753 763 self.log.debug("%s", self._shutdown_message)
754 764 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
755 765
756 766 #-----------------------------------------------------------------------------
757 767 # Aliases and Flags for the IPKernelApp
758 768 #-----------------------------------------------------------------------------
759 769
760 770 flags = dict(kernel_flags)
761 771 flags.update(shell_flags)
762 772
763 773 addflag = lambda *args: flags.update(boolean_flag(*args))
764 774
765 775 flags['pylab'] = (
766 776 {'IPKernelApp' : {'pylab' : 'auto'}},
767 777 """Pre-load matplotlib and numpy for interactive use with
768 778 the default matplotlib backend."""
769 779 )
770 780
771 781 aliases = dict(kernel_aliases)
772 782 aliases.update(shell_aliases)
773 783
774 784 #-----------------------------------------------------------------------------
775 785 # The IPKernelApp class
776 786 #-----------------------------------------------------------------------------
777 787
778 788 class IPKernelApp(KernelApp, InteractiveShellApp):
779 789 name = 'ipkernel'
780 790
781 791 aliases = Dict(aliases)
782 792 flags = Dict(flags)
783 793 classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]
784 794
785 795 @catch_config_error
786 796 def initialize(self, argv=None):
787 797 super(IPKernelApp, self).initialize(argv)
788 798 self.init_path()
789 799 self.init_shell()
790 800 self.init_gui_pylab()
791 801 self.init_extensions()
792 802 self.init_code()
793 803
794 804 def init_kernel(self):
795 805
796 806 shell_stream = ZMQStream(self.shell_socket)
797 807
798 808 kernel = Kernel(config=self.config, session=self.session,
799 809 shell_streams=[shell_stream],
800 810 iopub_socket=self.iopub_socket,
801 811 stdin_socket=self.stdin_socket,
802 812 log=self.log,
803 813 profile_dir=self.profile_dir,
804 814 )
805 815 self.kernel = kernel
806 816 kernel.record_ports(self.ports)
807 817 shell = kernel.shell
808 818
809 819 def init_gui_pylab(self):
810 820 """Enable GUI event loop integration, taking pylab into account."""
811 821
812 822 # Provide a wrapper for :meth:`InteractiveShellApp.init_gui_pylab`
813 823 # to ensure that any exception is printed straight to stderr.
814 824 # Normally _showtraceback associates the reply with an execution,
815 825 # which means frontends will never draw it, as this exception
816 826 # is not associated with any execute request.
817 827
818 828 shell = self.shell
819 829 _showtraceback = shell._showtraceback
820 830 try:
821 831 # replace pyerr-sending traceback with stderr
822 832 def print_tb(etype, evalue, stb):
823 833 print ("GUI event loop or pylab initialization failed",
824 834 file=io.stderr)
825 835 print (shell.InteractiveTB.stb2text(stb), file=io.stderr)
826 836 shell._showtraceback = print_tb
827 837 InteractiveShellApp.init_gui_pylab(self)
828 838 finally:
829 839 shell._showtraceback = _showtraceback
830 840
831 841 def init_shell(self):
832 842 self.shell = self.kernel.shell
833 843 self.shell.configurables.append(self)
834 844
835 845
836 846 #-----------------------------------------------------------------------------
837 847 # Kernel main and launch functions
838 848 #-----------------------------------------------------------------------------
839 849
840 850 def launch_kernel(*args, **kwargs):
841 851 """Launches a localhost IPython kernel, binding to the specified ports.
842 852
843 853 This function simply calls entry_point.base_launch_kernel with the right
844 854 first command to start an ipkernel. See base_launch_kernel for arguments.
845 855
846 856 Returns
847 857 -------
848 858 A tuple of form:
849 859 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
850 860 where kernel_process is a Popen object and the ports are integers.
851 861 """
852 862 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
853 863 *args, **kwargs)
854 864
855 865
856 866 def embed_kernel(module=None, local_ns=None, **kwargs):
857 867 """Embed and start an IPython kernel in a given scope.
858 868
859 869 Parameters
860 870 ----------
861 871 module : ModuleType, optional
862 872 The module to load into IPython globals (default: caller)
863 873 local_ns : dict, optional
864 874 The namespace to load into IPython user namespace (default: caller)
865 875
866 876 kwargs : various, optional
867 877 Further keyword args are relayed to the KernelApp constructor,
868 878 allowing configuration of the Kernel. Will only have an effect
869 879 on the first embed_kernel call for a given process.
870 880
871 881 """
872 882 # get the app if it exists, or set it up if it doesn't
873 883 if IPKernelApp.initialized():
874 884 app = IPKernelApp.instance()
875 885 else:
876 886 app = IPKernelApp.instance(**kwargs)
877 887 app.initialize([])
878 888 # Undo unnecessary sys module mangling from init_sys_modules.
879 889 # This would not be necessary if we could prevent it
880 890 # in the first place by using a different InteractiveShell
881 891 # subclass, as in the regular embed case.
882 892 main = app.kernel.shell._orig_sys_modules_main_mod
883 893 if main is not None:
884 894 sys.modules[app.kernel.shell._orig_sys_modules_main_name] = main
885 895
886 896 # load the calling scope if not given
887 897 (caller_module, caller_locals) = extract_module_locals(1)
888 898 if module is None:
889 899 module = caller_module
890 900 if local_ns is None:
891 901 local_ns = caller_locals
892 902
893 903 app.kernel.user_module = module
894 904 app.kernel.user_ns = local_ns
895 905 app.start()
896 906
897 907 def main():
898 908 """Run an IPKernel as an application"""
899 909 app = IPKernelApp.instance()
900 910 app.initialize()
901 911 app.start()
902 912
903 913
904 914 if __name__ == '__main__':
905 915 main()
General Comments 0
You need to be logged in to leave comments. Login now