##// END OF EJS Templates
Don't exit the event loop when a toolkit event loop exits.
Ryan May -
Show More
@@ -1,922 +1,920 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports
19 19 import __builtin__
20 20 import atexit
21 21 import sys
22 22 import time
23 23 import traceback
24 24 import logging
25 25 import uuid
26 26
27 27 from datetime import datetime
28 28 from signal import (
29 29 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
30 30 )
31 31
32 32 # System library imports
33 33 import zmq
34 34 from zmq.eventloop import ioloop
35 35 from zmq.eventloop.zmqstream import ZMQStream
36 36
37 37 # Local imports
38 38 from IPython.config.configurable import Configurable
39 39 from IPython.config.application import boolean_flag, catch_config_error
40 40 from IPython.core.application import ProfileDir
41 41 from IPython.core.error import StdinNotImplementedError
42 42 from IPython.core.shellapp import (
43 43 InteractiveShellApp, shell_flags, shell_aliases
44 44 )
45 45 from IPython.utils import io
46 46 from IPython.utils import py3compat
47 47 from IPython.utils.frame import extract_module_locals
48 48 from IPython.utils.jsonutil import json_clean
49 49 from IPython.utils.traitlets import (
50 50 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode
51 51 )
52 52
53 53 from entry_point import base_launch_kernel
54 54 from kernelapp import KernelApp, kernel_flags, kernel_aliases
55 55 from serialize import serialize_object, unpack_apply_message
56 56 from session import Session, Message
57 57 from zmqshell import ZMQInteractiveShell
58 58
59 59
60 60 #-----------------------------------------------------------------------------
61 61 # Main kernel class
62 62 #-----------------------------------------------------------------------------
63 63
64 64 class Kernel(Configurable):
65 65
66 66 #---------------------------------------------------------------------------
67 67 # Kernel interface
68 68 #---------------------------------------------------------------------------
69 69
70 70 # attribute to override with a GUI
71 71 eventloop = Any(None)
72 72 def _eventloop_changed(self, name, old, new):
73 73 """schedule call to eventloop from IOLoop"""
74 74 loop = ioloop.IOLoop.instance()
75 75 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
76 76
77 77 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
78 78 session = Instance(Session)
79 79 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
80 80 shell_streams = List()
81 81 control_stream = Instance(ZMQStream)
82 82 iopub_socket = Instance(zmq.Socket)
83 83 stdin_socket = Instance(zmq.Socket)
84 84 log = Instance(logging.Logger)
85 85
86 86 user_module = Any()
87 87 def _user_module_changed(self, name, old, new):
88 88 if self.shell is not None:
89 89 self.shell.user_module = new
90 90
91 91 user_ns = Dict(default_value=None)
92 92 def _user_ns_changed(self, name, old, new):
93 93 if self.shell is not None:
94 94 self.shell.user_ns = new
95 95 self.shell.init_user_ns()
96 96
97 97 # identities:
98 98 int_id = Integer(-1)
99 99 ident = Unicode()
100 100
101 101 def _ident_default(self):
102 102 return unicode(uuid.uuid4())
103 103
104 104
105 105 # Private interface
106 106
107 107 # Time to sleep after flushing the stdout/err buffers in each execute
108 108 # cycle. While this introduces a hard limit on the minimal latency of the
109 109 # execute cycle, it helps prevent output synchronization problems for
110 110 # clients.
111 111 # Units are in seconds. The minimum zmq latency on local host is probably
112 112 # ~150 microseconds, set this to 500us for now. We may need to increase it
113 113 # a little if it's not enough after more interactive testing.
114 114 _execute_sleep = Float(0.0005, config=True)
115 115
116 116 # Frequency of the kernel's event loop.
117 117 # Units are in seconds, kernel subclasses for GUI toolkits may need to
118 118 # adapt to milliseconds.
119 119 _poll_interval = Float(0.05, config=True)
120 120
121 121 # If the shutdown was requested over the network, we leave here the
122 122 # necessary reply message so it can be sent by our registered atexit
123 123 # handler. This ensures that the reply is only sent to clients truly at
124 124 # the end of our shutdown process (which happens after the underlying
125 125 # IPython shell's own shutdown).
126 126 _shutdown_message = None
127 127
128 128 # This is a dict of port number that the kernel is listening on. It is set
129 129 # by record_ports and used by connect_request.
130 130 _recorded_ports = Dict()
131 131
132 132 # set of aborted msg_ids
133 133 aborted = Set()
134 134
135 135
136 136 def __init__(self, **kwargs):
137 137 super(Kernel, self).__init__(**kwargs)
138 138
139 139 # Initialize the InteractiveShell subclass
140 140 self.shell = ZMQInteractiveShell.instance(config=self.config,
141 141 profile_dir = self.profile_dir,
142 142 user_module = self.user_module,
143 143 user_ns = self.user_ns,
144 144 )
145 145 self.shell.displayhook.session = self.session
146 146 self.shell.displayhook.pub_socket = self.iopub_socket
147 147 self.shell.displayhook.topic = self._topic('pyout')
148 148 self.shell.display_pub.session = self.session
149 149 self.shell.display_pub.pub_socket = self.iopub_socket
150 150
151 151 # TMP - hack while developing
152 152 self.shell._reply_content = None
153 153
154 154 # Build dict of handlers for message types
155 155 msg_types = [ 'execute_request', 'complete_request',
156 156 'object_info_request', 'history_request',
157 157 'connect_request', 'shutdown_request',
158 158 'apply_request',
159 159 ]
160 160 self.shell_handlers = {}
161 161 for msg_type in msg_types:
162 162 self.shell_handlers[msg_type] = getattr(self, msg_type)
163 163
164 164 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
165 165 self.control_handlers = {}
166 166 for msg_type in control_msg_types:
167 167 self.control_handlers[msg_type] = getattr(self, msg_type)
168 168
169 169 def dispatch_control(self, msg):
170 170 """dispatch control requests"""
171 171 idents,msg = self.session.feed_identities(msg, copy=False)
172 172 try:
173 173 msg = self.session.unserialize(msg, content=True, copy=False)
174 174 except:
175 175 self.log.error("Invalid Control Message", exc_info=True)
176 176 return
177 177
178 178 self.log.debug("Control received: %s", msg)
179 179
180 180 header = msg['header']
181 181 msg_id = header['msg_id']
182 182 msg_type = header['msg_type']
183 183
184 184 handler = self.control_handlers.get(msg_type, None)
185 185 if handler is None:
186 186 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
187 187 else:
188 188 try:
189 189 handler(self.control_stream, idents, msg)
190 190 except Exception:
191 191 self.log.error("Exception in control handler:", exc_info=True)
192 192
193 193 def dispatch_shell(self, stream, msg):
194 194 """dispatch shell requests"""
195 195 # flush control requests first
196 196 if self.control_stream:
197 197 self.control_stream.flush()
198 198
199 199 idents,msg = self.session.feed_identities(msg, copy=False)
200 200 try:
201 201 msg = self.session.unserialize(msg, content=True, copy=False)
202 202 except:
203 203 self.log.error("Invalid Message", exc_info=True)
204 204 return
205 205
206 206 header = msg['header']
207 207 msg_id = header['msg_id']
208 208 msg_type = msg['header']['msg_type']
209 209
210 210 # Print some info about this message and leave a '--->' marker, so it's
211 211 # easier to trace visually the message chain when debugging. Each
212 212 # handler prints its message at the end.
213 213 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
214 214 self.log.debug(' Content: %s\n --->\n ', msg['content'])
215 215
216 216 if msg_id in self.aborted:
217 217 self.aborted.remove(msg_id)
218 218 # is it safe to assume a msg_id will not be resubmitted?
219 219 reply_type = msg_type.split('_')[0] + '_reply'
220 220 status = {'status' : 'aborted'}
221 221 sub = {'engine' : self.ident}
222 222 sub.update(status)
223 223 reply_msg = self.session.send(stream, reply_type, subheader=sub,
224 224 content=status, parent=msg, ident=idents)
225 225 return
226 226
227 227 handler = self.shell_handlers.get(msg_type, None)
228 228 if handler is None:
229 229 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
230 230 else:
231 231 # ensure default_int_handler during handler call
232 232 sig = signal(SIGINT, default_int_handler)
233 233 try:
234 234 handler(stream, idents, msg)
235 235 except Exception:
236 236 self.log.error("Exception in message handler:", exc_info=True)
237 237 finally:
238 238 signal(SIGINT, sig)
239 239
240 240 def enter_eventloop(self):
241 241 """enter eventloop"""
242 242 self.log.info("entering eventloop")
243 243 # restore default_int_handler
244 244 signal(SIGINT, default_int_handler)
245 245 while self.eventloop is not None:
246 246 try:
247 247 self.eventloop(self)
248 248 except KeyboardInterrupt:
249 249 # Ctrl-C shouldn't crash the kernel
250 250 self.log.error("KeyboardInterrupt caught in kernel")
251 251 continue
252 252 else:
253 253 # eventloop exited cleanly, this means we should stop (right?)
254 254 self.eventloop = None
255 255 break
256 256 self.log.info("exiting eventloop")
257 # if eventloop exits, IOLoop should stop
258 ioloop.IOLoop.instance().stop()
259 257
260 258 def start(self):
261 259 """register dispatchers for streams"""
262 260 self.shell.exit_now = False
263 261 if self.control_stream:
264 262 self.control_stream.on_recv(self.dispatch_control, copy=False)
265 263
266 264 def make_dispatcher(stream):
267 265 def dispatcher(msg):
268 266 return self.dispatch_shell(stream, msg)
269 267 return dispatcher
270 268
271 269 for s in self.shell_streams:
272 270 s.on_recv(make_dispatcher(s), copy=False)
273 271
274 272 def do_one_iteration(self):
275 273 """step eventloop just once"""
276 274 if self.control_stream:
277 275 self.control_stream.flush()
278 276 for stream in self.shell_streams:
279 277 # handle at most one request per iteration
280 278 stream.flush(zmq.POLLIN, 1)
281 279 stream.flush(zmq.POLLOUT)
282 280
283 281
284 282 def record_ports(self, ports):
285 283 """Record the ports that this kernel is using.
286 284
287 285 The creator of the Kernel instance must call this methods if they
288 286 want the :meth:`connect_request` method to return the port numbers.
289 287 """
290 288 self._recorded_ports = ports
291 289
292 290 #---------------------------------------------------------------------------
293 291 # Kernel request handlers
294 292 #---------------------------------------------------------------------------
295 293
296 294 def _make_subheader(self):
297 295 """init subheader dict, for execute/apply_reply"""
298 296 return {
299 297 'dependencies_met' : True,
300 298 'engine' : self.ident,
301 299 'started': datetime.now(),
302 300 }
303 301
304 302 def _publish_pyin(self, code, parent, execution_count):
305 303 """Publish the code request on the pyin stream."""
306 304
307 305 self.session.send(self.iopub_socket, u'pyin',
308 306 {u'code':code, u'execution_count': execution_count},
309 307 parent=parent, ident=self._topic('pyin')
310 308 )
311 309
312 310 def _publish_status(self, status, parent=None):
313 311 """send status (busy/idle) on IOPub"""
314 312 self.session.send(self.iopub_socket,
315 313 u'status',
316 314 {u'execution_state': status},
317 315 parent=parent,
318 316 ident=self._topic('status'),
319 317 )
320 318
321 319
322 320 def execute_request(self, stream, ident, parent):
323 321 """handle an execute_request"""
324 322
325 323 self._publish_status(u'busy', parent)
326 324
327 325 try:
328 326 content = parent[u'content']
329 327 code = content[u'code']
330 328 silent = content[u'silent']
331 329 except:
332 330 self.log.error("Got bad msg: ")
333 331 self.log.error("%s", parent)
334 332 return
335 333
336 334 sub = self._make_subheader()
337 335
338 336 shell = self.shell # we'll need this a lot here
339 337
340 338 # Replace raw_input. Note that is not sufficient to replace
341 339 # raw_input in the user namespace.
342 340 if content.get('allow_stdin', False):
343 341 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
344 342 else:
345 343 raw_input = lambda prompt='' : self._no_raw_input()
346 344
347 345 if py3compat.PY3:
348 346 __builtin__.input = raw_input
349 347 else:
350 348 __builtin__.raw_input = raw_input
351 349
352 350 # Set the parent message of the display hook and out streams.
353 351 shell.displayhook.set_parent(parent)
354 352 shell.display_pub.set_parent(parent)
355 353 sys.stdout.set_parent(parent)
356 354 sys.stderr.set_parent(parent)
357 355
358 356 # Re-broadcast our input for the benefit of listening clients, and
359 357 # start computing output
360 358 if not silent:
361 359 self._publish_pyin(code, parent, shell.execution_count)
362 360
363 361 reply_content = {}
364 362 try:
365 363 # FIXME: the shell calls the exception handler itself.
366 364 shell.run_cell(code, store_history=not silent, silent=silent)
367 365 except:
368 366 status = u'error'
369 367 # FIXME: this code right now isn't being used yet by default,
370 368 # because the run_cell() call above directly fires off exception
371 369 # reporting. This code, therefore, is only active in the scenario
372 370 # where runlines itself has an unhandled exception. We need to
373 371 # uniformize this, for all exception construction to come from a
374 372 # single location in the codbase.
375 373 etype, evalue, tb = sys.exc_info()
376 374 tb_list = traceback.format_exception(etype, evalue, tb)
377 375 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
378 376 else:
379 377 status = u'ok'
380 378
381 379 reply_content[u'status'] = status
382 380
383 381 # Return the execution counter so clients can display prompts
384 382 reply_content['execution_count'] = shell.execution_count - 1
385 383
386 384 # FIXME - fish exception info out of shell, possibly left there by
387 385 # runlines. We'll need to clean up this logic later.
388 386 if shell._reply_content is not None:
389 387 reply_content.update(shell._reply_content)
390 388 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
391 389 reply_content['engine_info'] = e_info
392 390 # reset after use
393 391 shell._reply_content = None
394 392
395 393 # At this point, we can tell whether the main code execution succeeded
396 394 # or not. If it did, we proceed to evaluate user_variables/expressions
397 395 if reply_content['status'] == 'ok':
398 396 reply_content[u'user_variables'] = \
399 397 shell.user_variables(content.get(u'user_variables', []))
400 398 reply_content[u'user_expressions'] = \
401 399 shell.user_expressions(content.get(u'user_expressions', {}))
402 400 else:
403 401 # If there was an error, don't even try to compute variables or
404 402 # expressions
405 403 reply_content[u'user_variables'] = {}
406 404 reply_content[u'user_expressions'] = {}
407 405
408 406 # Payloads should be retrieved regardless of outcome, so we can both
409 407 # recover partial output (that could have been generated early in a
410 408 # block, before an error) and clear the payload system always.
411 409 reply_content[u'payload'] = shell.payload_manager.read_payload()
412 410 # Be agressive about clearing the payload because we don't want
413 411 # it to sit in memory until the next execute_request comes in.
414 412 shell.payload_manager.clear_payload()
415 413
416 414 # Flush output before sending the reply.
417 415 sys.stdout.flush()
418 416 sys.stderr.flush()
419 417 # FIXME: on rare occasions, the flush doesn't seem to make it to the
420 418 # clients... This seems to mitigate the problem, but we definitely need
421 419 # to better understand what's going on.
422 420 if self._execute_sleep:
423 421 time.sleep(self._execute_sleep)
424 422
425 423 # Send the reply.
426 424 reply_content = json_clean(reply_content)
427 425
428 426 sub['status'] = reply_content['status']
429 427 if reply_content['status'] == 'error' and \
430 428 reply_content['ename'] == 'UnmetDependency':
431 429 sub['dependencies_met'] = False
432 430
433 431 reply_msg = self.session.send(stream, u'execute_reply',
434 432 reply_content, parent, subheader=sub,
435 433 ident=ident)
436 434
437 435 self.log.debug("%s", reply_msg)
438 436
439 437 if not silent and reply_msg['content']['status'] == u'error':
440 438 self._abort_queues()
441 439
442 440 self._publish_status(u'idle', parent)
443 441
444 442 def complete_request(self, stream, ident, parent):
445 443 txt, matches = self._complete(parent)
446 444 matches = {'matches' : matches,
447 445 'matched_text' : txt,
448 446 'status' : 'ok'}
449 447 matches = json_clean(matches)
450 448 completion_msg = self.session.send(stream, 'complete_reply',
451 449 matches, parent, ident)
452 450 self.log.debug("%s", completion_msg)
453 451
454 452 def object_info_request(self, stream, ident, parent):
455 453 content = parent['content']
456 454 object_info = self.shell.object_inspect(content['oname'],
457 455 detail_level = content.get('detail_level', 0)
458 456 )
459 457 # Before we send this object over, we scrub it for JSON usage
460 458 oinfo = json_clean(object_info)
461 459 msg = self.session.send(stream, 'object_info_reply',
462 460 oinfo, parent, ident)
463 461 self.log.debug("%s", msg)
464 462
465 463 def history_request(self, stream, ident, parent):
466 464 # We need to pull these out, as passing **kwargs doesn't work with
467 465 # unicode keys before Python 2.6.5.
468 466 hist_access_type = parent['content']['hist_access_type']
469 467 raw = parent['content']['raw']
470 468 output = parent['content']['output']
471 469 if hist_access_type == 'tail':
472 470 n = parent['content']['n']
473 471 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
474 472 include_latest=True)
475 473
476 474 elif hist_access_type == 'range':
477 475 session = parent['content']['session']
478 476 start = parent['content']['start']
479 477 stop = parent['content']['stop']
480 478 hist = self.shell.history_manager.get_range(session, start, stop,
481 479 raw=raw, output=output)
482 480
483 481 elif hist_access_type == 'search':
484 482 pattern = parent['content']['pattern']
485 483 hist = self.shell.history_manager.search(pattern, raw=raw,
486 484 output=output)
487 485
488 486 else:
489 487 hist = []
490 488 hist = list(hist)
491 489 content = {'history' : hist}
492 490 content = json_clean(content)
493 491 msg = self.session.send(stream, 'history_reply',
494 492 content, parent, ident)
495 493 self.log.debug("Sending history reply with %i entries", len(hist))
496 494
497 495 def connect_request(self, stream, ident, parent):
498 496 if self._recorded_ports is not None:
499 497 content = self._recorded_ports.copy()
500 498 else:
501 499 content = {}
502 500 msg = self.session.send(stream, 'connect_reply',
503 501 content, parent, ident)
504 502 self.log.debug("%s", msg)
505 503
506 504 def shutdown_request(self, stream, ident, parent):
507 505 self.shell.exit_now = True
508 506 content = dict(status='ok')
509 507 content.update(parent['content'])
510 508 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
511 509 # same content, but different msg_id for broadcasting on IOPub
512 510 self._shutdown_message = self.session.msg(u'shutdown_reply',
513 511 content, parent
514 512 )
515 513
516 514 self._at_shutdown()
517 515 # call sys.exit after a short delay
518 516 loop = ioloop.IOLoop.instance()
519 517 loop.add_timeout(time.time()+0.1, loop.stop)
520 518
521 519 #---------------------------------------------------------------------------
522 520 # Engine methods
523 521 #---------------------------------------------------------------------------
524 522
525 523 def apply_request(self, stream, ident, parent):
526 524 try:
527 525 content = parent[u'content']
528 526 bufs = parent[u'buffers']
529 527 msg_id = parent['header']['msg_id']
530 528 except:
531 529 self.log.error("Got bad msg: %s", parent, exc_info=True)
532 530 return
533 531
534 532 self._publish_status(u'busy', parent)
535 533
536 534 # Set the parent message of the display hook and out streams.
537 535 shell = self.shell
538 536 shell.displayhook.set_parent(parent)
539 537 shell.display_pub.set_parent(parent)
540 538 sys.stdout.set_parent(parent)
541 539 sys.stderr.set_parent(parent)
542 540
543 541 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
544 542 # self.iopub_socket.send(pyin_msg)
545 543 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
546 544 sub = self._make_subheader()
547 545 try:
548 546 working = shell.user_ns
549 547
550 548 prefix = "_"+str(msg_id).replace("-","")+"_"
551 549
552 550 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
553 551
554 552 fname = getattr(f, '__name__', 'f')
555 553
556 554 fname = prefix+"f"
557 555 argname = prefix+"args"
558 556 kwargname = prefix+"kwargs"
559 557 resultname = prefix+"result"
560 558
561 559 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
562 560 # print ns
563 561 working.update(ns)
564 562 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
565 563 try:
566 564 exec code in shell.user_global_ns, shell.user_ns
567 565 result = working.get(resultname)
568 566 finally:
569 567 for key in ns.iterkeys():
570 568 working.pop(key)
571 569
572 570 packed_result,buf = serialize_object(result)
573 571 result_buf = [packed_result]+buf
574 572 except:
575 573 # invoke IPython traceback formatting
576 574 shell.showtraceback()
577 575 # FIXME - fish exception info out of shell, possibly left there by
578 576 # run_code. We'll need to clean up this logic later.
579 577 reply_content = {}
580 578 if shell._reply_content is not None:
581 579 reply_content.update(shell._reply_content)
582 580 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
583 581 reply_content['engine_info'] = e_info
584 582 # reset after use
585 583 shell._reply_content = None
586 584
587 585 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
588 586 ident=self._topic('pyerr'))
589 587 result_buf = []
590 588
591 589 if reply_content['ename'] == 'UnmetDependency':
592 590 sub['dependencies_met'] = False
593 591 else:
594 592 reply_content = {'status' : 'ok'}
595 593
596 594 # put 'ok'/'error' status in header, for scheduler introspection:
597 595 sub['status'] = reply_content['status']
598 596
599 597 # flush i/o
600 598 sys.stdout.flush()
601 599 sys.stderr.flush()
602 600
603 601 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
604 602 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
605 603
606 604 self._publish_status(u'idle', parent)
607 605
608 606 #---------------------------------------------------------------------------
609 607 # Control messages
610 608 #---------------------------------------------------------------------------
611 609
612 610 def abort_request(self, stream, ident, parent):
613 611 """abort a specifig msg by id"""
614 612 msg_ids = parent['content'].get('msg_ids', None)
615 613 if isinstance(msg_ids, basestring):
616 614 msg_ids = [msg_ids]
617 615 if not msg_ids:
618 616 self.abort_queues()
619 617 for mid in msg_ids:
620 618 self.aborted.add(str(mid))
621 619
622 620 content = dict(status='ok')
623 621 reply_msg = self.session.send(stream, 'abort_reply', content=content,
624 622 parent=parent, ident=ident)
625 623 self.log.debug("%s", reply_msg)
626 624
627 625 def clear_request(self, stream, idents, parent):
628 626 """Clear our namespace."""
629 627 self.shell.reset(False)
630 628 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
631 629 content = dict(status='ok'))
632 630
633 631
634 632 #---------------------------------------------------------------------------
635 633 # Protected interface
636 634 #---------------------------------------------------------------------------
637 635
638 636
639 637 def _wrap_exception(self, method=None):
640 638 # import here, because _wrap_exception is only used in parallel,
641 639 # and parallel has higher min pyzmq version
642 640 from IPython.parallel.error import wrap_exception
643 641 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
644 642 content = wrap_exception(e_info)
645 643 return content
646 644
647 645 def _topic(self, topic):
648 646 """prefixed topic for IOPub messages"""
649 647 if self.int_id >= 0:
650 648 base = "engine.%i" % self.int_id
651 649 else:
652 650 base = "kernel.%s" % self.ident
653 651
654 652 return py3compat.cast_bytes("%s.%s" % (base, topic))
655 653
656 654 def _abort_queues(self):
657 655 for stream in self.shell_streams:
658 656 if stream:
659 657 self._abort_queue(stream)
660 658
661 659 def _abort_queue(self, stream):
662 660 poller = zmq.Poller()
663 661 poller.register(stream.socket, zmq.POLLIN)
664 662 while True:
665 663 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
666 664 if msg is None:
667 665 return
668 666
669 667 self.log.info("Aborting:")
670 668 self.log.info("%s", msg)
671 669 msg_type = msg['header']['msg_type']
672 670 reply_type = msg_type.split('_')[0] + '_reply'
673 671
674 672 status = {'status' : 'aborted'}
675 673 sub = {'engine' : self.ident}
676 674 sub.update(status)
677 675 reply_msg = self.session.send(stream, reply_type, subheader=sub,
678 676 content=status, parent=msg, ident=idents)
679 677 self.log.debug("%s", reply_msg)
680 678 # We need to wait a bit for requests to come in. This can probably
681 679 # be set shorter for true asynchronous clients.
682 680 poller.poll(50)
683 681
684 682
685 683 def _no_raw_input(self):
686 684 """Raise StdinNotImplentedError if active frontend doesn't support
687 685 stdin."""
688 686 raise StdinNotImplementedError("raw_input was called, but this "
689 687 "frontend does not support stdin.")
690 688
691 689 def _raw_input(self, prompt, ident, parent):
692 690 # Flush output before making the request.
693 691 sys.stderr.flush()
694 692 sys.stdout.flush()
695 693
696 694 # Send the input request.
697 695 content = json_clean(dict(prompt=prompt))
698 696 self.session.send(self.stdin_socket, u'input_request', content, parent,
699 697 ident=ident)
700 698
701 699 # Await a response.
702 700 while True:
703 701 try:
704 702 ident, reply = self.session.recv(self.stdin_socket, 0)
705 703 except Exception:
706 704 self.log.warn("Invalid Message:", exc_info=True)
707 705 else:
708 706 break
709 707 try:
710 708 value = reply['content']['value']
711 709 except:
712 710 self.log.error("Got bad raw_input reply: ")
713 711 self.log.error("%s", parent)
714 712 value = ''
715 713 if value == '\x04':
716 714 # EOF
717 715 raise EOFError
718 716 return value
719 717
720 718 def _complete(self, msg):
721 719 c = msg['content']
722 720 try:
723 721 cpos = int(c['cursor_pos'])
724 722 except:
725 723 # If we don't get something that we can convert to an integer, at
726 724 # least attempt the completion guessing the cursor is at the end of
727 725 # the text, if there's any, and otherwise of the line
728 726 cpos = len(c['text'])
729 727 if cpos==0:
730 728 cpos = len(c['line'])
731 729 return self.shell.complete(c['text'], c['line'], cpos)
732 730
733 731 def _object_info(self, context):
734 732 symbol, leftover = self._symbol_from_context(context)
735 733 if symbol is not None and not leftover:
736 734 doc = getattr(symbol, '__doc__', '')
737 735 else:
738 736 doc = ''
739 737 object_info = dict(docstring = doc)
740 738 return object_info
741 739
742 740 def _symbol_from_context(self, context):
743 741 if not context:
744 742 return None, context
745 743
746 744 base_symbol_string = context[0]
747 745 symbol = self.shell.user_ns.get(base_symbol_string, None)
748 746 if symbol is None:
749 747 symbol = __builtin__.__dict__.get(base_symbol_string, None)
750 748 if symbol is None:
751 749 return None, context
752 750
753 751 context = context[1:]
754 752 for i, name in enumerate(context):
755 753 new_symbol = getattr(symbol, name, None)
756 754 if new_symbol is None:
757 755 return symbol, context[i:]
758 756 else:
759 757 symbol = new_symbol
760 758
761 759 return symbol, []
762 760
763 761 def _at_shutdown(self):
764 762 """Actions taken at shutdown by the kernel, called by python's atexit.
765 763 """
766 764 # io.rprint("Kernel at_shutdown") # dbg
767 765 if self._shutdown_message is not None:
768 766 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
769 767 self.log.debug("%s", self._shutdown_message)
770 768 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
771 769
772 770 #-----------------------------------------------------------------------------
773 771 # Aliases and Flags for the IPKernelApp
774 772 #-----------------------------------------------------------------------------
775 773
776 774 flags = dict(kernel_flags)
777 775 flags.update(shell_flags)
778 776
779 777 addflag = lambda *args: flags.update(boolean_flag(*args))
780 778
781 779 flags['pylab'] = (
782 780 {'IPKernelApp' : {'pylab' : 'auto'}},
783 781 """Pre-load matplotlib and numpy for interactive use with
784 782 the default matplotlib backend."""
785 783 )
786 784
787 785 aliases = dict(kernel_aliases)
788 786 aliases.update(shell_aliases)
789 787
790 788 #-----------------------------------------------------------------------------
791 789 # The IPKernelApp class
792 790 #-----------------------------------------------------------------------------
793 791
794 792 class IPKernelApp(KernelApp, InteractiveShellApp):
795 793 name = 'ipkernel'
796 794
797 795 aliases = Dict(aliases)
798 796 flags = Dict(flags)
799 797 classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]
800 798
801 799 @catch_config_error
802 800 def initialize(self, argv=None):
803 801 super(IPKernelApp, self).initialize(argv)
804 802 self.init_path()
805 803 self.init_shell()
806 804 self.init_gui_pylab()
807 805 self.init_extensions()
808 806 self.init_code()
809 807
810 808 def init_kernel(self):
811 809
812 810 shell_stream = ZMQStream(self.shell_socket)
813 811
814 812 kernel = Kernel(config=self.config, session=self.session,
815 813 shell_streams=[shell_stream],
816 814 iopub_socket=self.iopub_socket,
817 815 stdin_socket=self.stdin_socket,
818 816 log=self.log,
819 817 profile_dir=self.profile_dir,
820 818 )
821 819 self.kernel = kernel
822 820 kernel.record_ports(self.ports)
823 821 shell = kernel.shell
824 822
825 823 def init_gui_pylab(self):
826 824 """Enable GUI event loop integration, taking pylab into account."""
827 825
828 826 # Provide a wrapper for :meth:`InteractiveShellApp.init_gui_pylab`
829 827 # to ensure that any exception is printed straight to stderr.
830 828 # Normally _showtraceback associates the reply with an execution,
831 829 # which means frontends will never draw it, as this exception
832 830 # is not associated with any execute request.
833 831
834 832 shell = self.shell
835 833 _showtraceback = shell._showtraceback
836 834 try:
837 835 # replace pyerr-sending traceback with stderr
838 836 def print_tb(etype, evalue, stb):
839 837 print ("GUI event loop or pylab initialization failed",
840 838 file=io.stderr)
841 839 print (shell.InteractiveTB.stb2text(stb), file=io.stderr)
842 840 shell._showtraceback = print_tb
843 841 InteractiveShellApp.init_gui_pylab(self)
844 842 finally:
845 843 shell._showtraceback = _showtraceback
846 844
847 845 def init_shell(self):
848 846 self.shell = self.kernel.shell
849 847 self.shell.configurables.append(self)
850 848
851 849
852 850 #-----------------------------------------------------------------------------
853 851 # Kernel main and launch functions
854 852 #-----------------------------------------------------------------------------
855 853
856 854 def launch_kernel(*args, **kwargs):
857 855 """Launches a localhost IPython kernel, binding to the specified ports.
858 856
859 857 This function simply calls entry_point.base_launch_kernel with the right
860 858 first command to start an ipkernel. See base_launch_kernel for arguments.
861 859
862 860 Returns
863 861 -------
864 862 A tuple of form:
865 863 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
866 864 where kernel_process is a Popen object and the ports are integers.
867 865 """
868 866 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
869 867 *args, **kwargs)
870 868
871 869
872 870 def embed_kernel(module=None, local_ns=None, **kwargs):
873 871 """Embed and start an IPython kernel in a given scope.
874 872
875 873 Parameters
876 874 ----------
877 875 module : ModuleType, optional
878 876 The module to load into IPython globals (default: caller)
879 877 local_ns : dict, optional
880 878 The namespace to load into IPython user namespace (default: caller)
881 879
882 880 kwargs : various, optional
883 881 Further keyword args are relayed to the KernelApp constructor,
884 882 allowing configuration of the Kernel. Will only have an effect
885 883 on the first embed_kernel call for a given process.
886 884
887 885 """
888 886 # get the app if it exists, or set it up if it doesn't
889 887 if IPKernelApp.initialized():
890 888 app = IPKernelApp.instance()
891 889 else:
892 890 app = IPKernelApp.instance(**kwargs)
893 891 app.initialize([])
894 892 # Undo unnecessary sys module mangling from init_sys_modules.
895 893 # This would not be necessary if we could prevent it
896 894 # in the first place by using a different InteractiveShell
897 895 # subclass, as in the regular embed case.
898 896 main = app.kernel.shell._orig_sys_modules_main_mod
899 897 if main is not None:
900 898 sys.modules[app.kernel.shell._orig_sys_modules_main_name] = main
901 899
902 900 # load the calling scope if not given
903 901 (caller_module, caller_locals) = extract_module_locals(1)
904 902 if module is None:
905 903 module = caller_module
906 904 if local_ns is None:
907 905 local_ns = caller_locals
908 906
909 907 app.kernel.user_module = module
910 908 app.kernel.user_ns = local_ns
911 909 app.shell.set_completer_frame()
912 910 app.start()
913 911
914 912 def main():
915 913 """Run an IPKernel as an application"""
916 914 app = IPKernelApp.instance()
917 915 app.initialize()
918 916 app.start()
919 917
920 918
921 919 if __name__ == '__main__':
922 920 main()
General Comments 0
You need to be logged in to leave comments. Login now