##// END OF EJS Templates
scrub: ident on iopub
MinRK -
Show More
@@ -1,865 +1,869 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports
19 19 import __builtin__
20 20 import atexit
21 21 import sys
22 22 import time
23 23 import traceback
24 24 import logging
25 25 import uuid
26 26
27 27 from datetime import datetime
28 28 from signal import (
29 29 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
30 30 )
31 31
32 32 # System library imports
33 33 import zmq
34 34 from zmq.eventloop import ioloop
35 35 from zmq.eventloop.zmqstream import ZMQStream
36 36
37 37 # Local imports
38 38 from IPython.core import pylabtools
39 39 from IPython.config.configurable import Configurable
40 40 from IPython.config.application import boolean_flag, catch_config_error
41 41 from IPython.core.application import ProfileDir
42 42 from IPython.core.error import StdinNotImplementedError
43 43 from IPython.core.shellapp import (
44 44 InteractiveShellApp, shell_flags, shell_aliases
45 45 )
46 46 from IPython.utils import io
47 47 from IPython.utils import py3compat
48 48 from IPython.utils.frame import extract_module_locals
49 49 from IPython.utils.jsonutil import json_clean
50 50 from IPython.utils.traitlets import (
51 51 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode
52 52 )
53 53
54 54 from entry_point import base_launch_kernel
55 55 from kernelapp import KernelApp, kernel_flags, kernel_aliases
56 56 from serialize import serialize_object, unpack_apply_message
57 57 from session import Session, Message
58 58 from zmqshell import ZMQInteractiveShell
59 59
60 60
61 61 #-----------------------------------------------------------------------------
62 62 # Main kernel class
63 63 #-----------------------------------------------------------------------------
64 64
65 65 class Kernel(Configurable):
66 66
67 67 #---------------------------------------------------------------------------
68 68 # Kernel interface
69 69 #---------------------------------------------------------------------------
70 70
71 71 # attribute to override with a GUI
72 72 eventloop = Any(None)
73 73 def _eventloop_changed(self, name, old, new):
74 74 """schedule call to eventloop from IOLoop"""
75 75 loop = ioloop.IOLoop.instance()
76 76 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
77 77
78 78 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
79 79 session = Instance(Session)
80 80 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
81 81 shell_streams = List()
82 82 control_stream = Instance(ZMQStream)
83 83 iopub_stream = Instance(ZMQStream)
84 84 stdin_socket = Instance(zmq.Socket)
85 85 log = Instance(logging.Logger)
86 86
87 87 user_module = Instance('types.ModuleType')
88 88 def _user_module_changed(self, name, old, new):
89 89 if self.shell is not None:
90 90 self.shell.user_module = new
91 91
92 92 user_ns = Dict(default_value=None)
93 93 def _user_ns_changed(self, name, old, new):
94 94 if self.shell is not None:
95 95 self.shell.user_ns = new
96 96 self.shell.init_user_ns()
97 97
98 98 # identities:
99 99 int_id = Integer(-1)
100 100 ident = Unicode()
101 101
102 102 def _ident_default(self):
103 103 return unicode(uuid.uuid4())
104 104
105 105
106 106 # Private interface
107 107
108 108 # Time to sleep after flushing the stdout/err buffers in each execute
109 109 # cycle. While this introduces a hard limit on the minimal latency of the
110 110 # execute cycle, it helps prevent output synchronization problems for
111 111 # clients.
112 112 # Units are in seconds. The minimum zmq latency on local host is probably
113 113 # ~150 microseconds, set this to 500us for now. We may need to increase it
114 114 # a little if it's not enough after more interactive testing.
115 115 _execute_sleep = Float(0.0005, config=True)
116 116
117 117 # Frequency of the kernel's event loop.
118 118 # Units are in seconds, kernel subclasses for GUI toolkits may need to
119 119 # adapt to milliseconds.
120 120 _poll_interval = Float(0.05, config=True)
121 121
122 122 # If the shutdown was requested over the network, we leave here the
123 123 # necessary reply message so it can be sent by our registered atexit
124 124 # handler. This ensures that the reply is only sent to clients truly at
125 125 # the end of our shutdown process (which happens after the underlying
126 126 # IPython shell's own shutdown).
127 127 _shutdown_message = None
128 128
129 129 # This is a dict of port number that the kernel is listening on. It is set
130 130 # by record_ports and used by connect_request.
131 131 _recorded_ports = Dict()
132 132
133 133 # set of aborted msg_ids
134 134 aborted = Set()
135 135
136 136
137 137 def __init__(self, **kwargs):
138 138 super(Kernel, self).__init__(**kwargs)
139 139
140 140 # Initialize the InteractiveShell subclass
141 141 self.shell = ZMQInteractiveShell.instance(config=self.config,
142 142 profile_dir = self.profile_dir,
143 143 user_module = self.user_module,
144 144 user_ns = self.user_ns,
145 145 )
146 146 self.shell.displayhook.session = self.session
147 147 self.shell.displayhook.pub_socket = self.iopub_stream.socket
148 148 self.shell.display_pub.session = self.session
149 149 self.shell.display_pub.pub_socket = self.iopub_stream.socket
150 150
151 151 # TMP - hack while developing
152 152 self.shell._reply_content = None
153 153
154 154 # Build dict of handlers for message types
155 155 msg_types = [ 'execute_request', 'complete_request',
156 156 'object_info_request', 'history_request',
157 157 'connect_request', 'shutdown_request',
158 158 'apply_request',
159 159 ]
160 160 self.shell_handlers = {}
161 161 for msg_type in msg_types:
162 162 self.shell_handlers[msg_type] = getattr(self, msg_type)
163 163
164 164 control_msg_types = [ 'clear_request', 'abort_request' ]
165 165 self.control_handlers = {}
166 166 for msg_type in control_msg_types:
167 167 self.control_handlers[msg_type] = getattr(self, msg_type)
168 168
169 169 def dispatch_control(self, msg):
170 170 """dispatch control requests"""
171 171 idents,msg = self.session.feed_identities(msg, copy=False)
172 172 try:
173 173 msg = self.session.unserialize(msg, content=True, copy=False)
174 174 except:
175 175 self.log.error("Invalid Control Message", exc_info=True)
176 176 return
177 177
178 178 self.log.debug("Control received: %s", msg)
179 179
180 180 header = msg['header']
181 181 msg_id = header['msg_id']
182 182 msg_type = header['msg_type']
183 183
184 184 handler = self.control_handlers.get(msg_type, None)
185 185 if handler is None:
186 186 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
187 187 else:
188 188 try:
189 189 handler(self.control_stream, idents, msg)
190 190 except Exception:
191 191 self.log.error("Exception in control handler:", exc_info=True)
192 192
193 193 def dispatch_shell(self, stream, msg):
194 194 """dispatch shell requests"""
195 195 # flush control requests first
196 196 if self.control_stream:
197 197 self.control_stream.flush()
198 198
199 199 idents,msg = self.session.feed_identities(msg, copy=False)
200 200 try:
201 201 msg = self.session.unserialize(msg, content=True, copy=False)
202 202 except:
203 203 self.log.error("Invalid Message", exc_info=True)
204 204 return
205 205
206 206 header = msg['header']
207 207 msg_id = header['msg_id']
208 208 msg_type = msg['header']['msg_type']
209 209
210 210 # Print some info about this message and leave a '--->' marker, so it's
211 211 # easier to trace visually the message chain when debugging. Each
212 212 # handler prints its message at the end.
213 213 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
214 214 self.log.debug(' Content: %s\n --->\n ', msg['content'])
215 215
216 216 if msg_id in self.aborted:
217 217 self.aborted.remove(msg_id)
218 218 # is it safe to assume a msg_id will not be resubmitted?
219 219 reply_type = msg_type.split('_')[0] + '_reply'
220 220 status = {'status' : 'aborted'}
221 221 reply_msg = self.session.send(stream, reply_type, subheader=status,
222 222 content=status, parent=msg, ident=idents)
223 223 return
224 224
225 225 handler = self.shell_handlers.get(msg_type, None)
226 226 if handler is None:
227 227 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
228 228 else:
229 229 # ensure default_int_handler during handler call
230 230 sig = signal(SIGINT, default_int_handler)
231 231 try:
232 232 handler(stream, idents, msg)
233 233 except Exception:
234 234 self.log.error("Exception in message handler:", exc_info=True)
235 235 finally:
236 236 signal(SIGINT, sig)
237 237
238 238 def enter_eventloop(self):
239 239 """enter eventloop"""
240 240 self.log.critical("entering eventloop")
241 241 # restore default_int_handler
242 242 signal(SIGINT, default_int_handler)
243 243 self.eventloop(self)
244 244 self.log.critical("exiting eventloop")
245 245 # if eventloop exits, IOLoop should stop
246 246 ioloop.IOLoop.instance().stop()
247 247
248 248 def start(self):
249 249 """register dispatchers for streams"""
250 250 if self.control_stream:
251 251 self.control_stream.on_recv(self.dispatch_control, copy=False)
252 252
253 253 def make_dispatcher(stream):
254 254 def dispatcher(msg):
255 255 return self.dispatch_shell(stream, msg)
256 256 return dispatcher
257 257
258 258 for s in self.shell_streams:
259 259 s.on_recv(make_dispatcher(s), copy=False)
260 260
261 261 def do_one_iteration(self):
262 262 """step eventloop just once"""
263 263 if self.control_stream:
264 264 self.control_stream.flush()
265 265 for stream in self.shell_streams:
266 266 # handle at most one request per iteration
267 267 stream.flush(zmq.POLLIN, 1)
268 268 stream.flush(zmq.POLLOUT)
269 269 self.iopub_stream.flush()
270 270
271 271
272 272 def record_ports(self, ports):
273 273 """Record the ports that this kernel is using.
274 274
275 275 The creator of the Kernel instance must call this methods if they
276 276 want the :meth:`connect_request` method to return the port numbers.
277 277 """
278 278 self._recorded_ports = ports
279 279
280 280 #---------------------------------------------------------------------------
281 281 # Kernel request handlers
282 282 #---------------------------------------------------------------------------
283 283
284 284 def _publish_pyin(self, code, parent, execution_count):
285 285 """Publish the code request on the pyin stream."""
286 286
287 self.session.send(self.iopub_stream, u'pyin', {u'code':code,
288 u'execution_count': execution_count}, parent=parent)
287 self.session.send(self.iopub_stream, u'pyin',
288 {u'code':code, u'execution_count': execution_count},
289 parent=parent,
290 )
289 291
290 292 def execute_request(self, stream, ident, parent):
291 293
292 294 self.session.send(self.iopub_stream,
293 295 u'status',
294 296 {u'execution_state':u'busy'},
295 parent=parent )
297 parent=parent,
298 ident='status',
299 )
296 300
297 301 try:
298 302 content = parent[u'content']
299 303 code = content[u'code']
300 304 silent = content[u'silent']
301 305 except:
302 306 self.log.error("Got bad msg: ")
303 307 self.log.error("%s", parent)
304 308 return
305 309
306 310 shell = self.shell # we'll need this a lot here
307 311
308 312 # Replace raw_input. Note that is not sufficient to replace
309 313 # raw_input in the user namespace.
310 314 if content.get('allow_stdin', False):
311 315 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
312 316 else:
313 317 raw_input = lambda prompt='' : self._no_raw_input()
314 318
315 319 if py3compat.PY3:
316 320 __builtin__.input = raw_input
317 321 else:
318 322 __builtin__.raw_input = raw_input
319 323
320 324 # Set the parent message of the display hook and out streams.
321 325 shell.displayhook.set_parent(parent)
322 326 shell.display_pub.set_parent(parent)
323 327 sys.stdout.set_parent(parent)
324 328 sys.stderr.set_parent(parent)
325 329
326 330 # Re-broadcast our input for the benefit of listening clients, and
327 331 # start computing output
328 332 if not silent:
329 333 self._publish_pyin(code, parent, shell.execution_count)
330 334
331 335 reply_content = {}
332 336 try:
333 337 if silent:
334 338 # run_code uses 'exec' mode, so no displayhook will fire, and it
335 339 # doesn't call logging or history manipulations. Print
336 340 # statements in that code will obviously still execute.
337 341 shell.run_code(code)
338 342 else:
339 343 # FIXME: the shell calls the exception handler itself.
340 344 shell.run_cell(code, store_history=True)
341 345 except:
342 346 status = u'error'
343 347 # FIXME: this code right now isn't being used yet by default,
344 348 # because the run_cell() call above directly fires off exception
345 349 # reporting. This code, therefore, is only active in the scenario
346 350 # where runlines itself has an unhandled exception. We need to
347 351 # uniformize this, for all exception construction to come from a
348 352 # single location in the codbase.
349 353 etype, evalue, tb = sys.exc_info()
350 354 tb_list = traceback.format_exception(etype, evalue, tb)
351 355 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
352 356 else:
353 357 status = u'ok'
354 358
355 359 reply_content[u'status'] = status
356 360
357 361 # Return the execution counter so clients can display prompts
358 362 reply_content['execution_count'] = shell.execution_count -1
359 363
360 364 # FIXME - fish exception info out of shell, possibly left there by
361 365 # runlines. We'll need to clean up this logic later.
362 366 if shell._reply_content is not None:
363 367 reply_content.update(shell._reply_content)
364 368 # reset after use
365 369 shell._reply_content = None
366 370
367 371 # At this point, we can tell whether the main code execution succeeded
368 372 # or not. If it did, we proceed to evaluate user_variables/expressions
369 373 if reply_content['status'] == 'ok':
370 374 reply_content[u'user_variables'] = \
371 375 shell.user_variables(content.get(u'user_variables', []))
372 376 reply_content[u'user_expressions'] = \
373 377 shell.user_expressions(content.get(u'user_expressions', {}))
374 378 else:
375 379 # If there was an error, don't even try to compute variables or
376 380 # expressions
377 381 reply_content[u'user_variables'] = {}
378 382 reply_content[u'user_expressions'] = {}
379 383
380 384 # Payloads should be retrieved regardless of outcome, so we can both
381 385 # recover partial output (that could have been generated early in a
382 386 # block, before an error) and clear the payload system always.
383 387 reply_content[u'payload'] = shell.payload_manager.read_payload()
384 388 # Be agressive about clearing the payload because we don't want
385 389 # it to sit in memory until the next execute_request comes in.
386 390 shell.payload_manager.clear_payload()
387 391
388 392 # Flush output before sending the reply.
389 393 sys.stdout.flush()
390 394 sys.stderr.flush()
391 395 # FIXME: on rare occasions, the flush doesn't seem to make it to the
392 396 # clients... This seems to mitigate the problem, but we definitely need
393 397 # to better understand what's going on.
394 398 if self._execute_sleep:
395 399 time.sleep(self._execute_sleep)
396 400
397 401 # Send the reply.
398 402 reply_content = json_clean(reply_content)
399 403 reply_msg = self.session.send(stream, u'execute_reply',
400 404 reply_content, parent, ident=ident)
401 405 self.log.debug("%s", reply_msg)
402 406
403 407 if reply_msg['content']['status'] == u'error':
404 408 self._abort_queues()
405 409
406 410 self.session.send(self.iopub_stream,
407 411 u'status',
408 412 {u'execution_state':u'idle'},
409 413 parent=parent )
410 414
411 415 def complete_request(self, stream, ident, parent):
412 416 txt, matches = self._complete(parent)
413 417 matches = {'matches' : matches,
414 418 'matched_text' : txt,
415 419 'status' : 'ok'}
416 420 matches = json_clean(matches)
417 421 completion_msg = self.session.send(stream, 'complete_reply',
418 422 matches, parent, ident)
419 423 self.log.debug("%s", completion_msg)
420 424
421 425 def object_info_request(self, stream, ident, parent):
422 426 content = parent['content']
423 427 object_info = self.shell.object_inspect(content['oname'],
424 428 detail_level = content.get('detail_level', 0)
425 429 )
426 430 # Before we send this object over, we scrub it for JSON usage
427 431 oinfo = json_clean(object_info)
428 432 msg = self.session.send(stream, 'object_info_reply',
429 433 oinfo, parent, ident)
430 434 self.log.debug("%s", msg)
431 435
432 436 def history_request(self, stream, ident, parent):
433 437 # We need to pull these out, as passing **kwargs doesn't work with
434 438 # unicode keys before Python 2.6.5.
435 439 hist_access_type = parent['content']['hist_access_type']
436 440 raw = parent['content']['raw']
437 441 output = parent['content']['output']
438 442 if hist_access_type == 'tail':
439 443 n = parent['content']['n']
440 444 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
441 445 include_latest=True)
442 446
443 447 elif hist_access_type == 'range':
444 448 session = parent['content']['session']
445 449 start = parent['content']['start']
446 450 stop = parent['content']['stop']
447 451 hist = self.shell.history_manager.get_range(session, start, stop,
448 452 raw=raw, output=output)
449 453
450 454 elif hist_access_type == 'search':
451 455 pattern = parent['content']['pattern']
452 456 hist = self.shell.history_manager.search(pattern, raw=raw,
453 457 output=output)
454 458
455 459 else:
456 460 hist = []
457 461 hist = list(hist)
458 462 content = {'history' : hist}
459 463 content = json_clean(content)
460 464 msg = self.session.send(stream, 'history_reply',
461 465 content, parent, ident)
462 466 self.log.debug("Sending history reply with %i entries", len(hist))
463 467
464 468 def connect_request(self, stream, ident, parent):
465 469 if self._recorded_ports is not None:
466 470 content = self._recorded_ports.copy()
467 471 else:
468 472 content = {}
469 473 msg = self.session.send(stream, 'connect_reply',
470 474 content, parent, ident)
471 475 self.log.debug("%s", msg)
472 476
473 477 def shutdown_request(self, stream, ident, parent):
474 478 self.shell.exit_now = True
475 479 self._shutdown_message = self.session.msg(u'shutdown_reply',
476 480 parent['content'], parent
477 481 )
478 482 # self.session.send(stream, self._shutdown_message, ident=ident)
479 483
480 484 self._at_shutdown()
481 485 # call sys.exit after a short delay
482 486 ioloop.IOLoop.instance().add_timeout(time.time()+0.05, lambda : sys.exit(0))
483 487
484 488 #---------------------------------------------------------------------------
485 489 # Engine methods
486 490 #---------------------------------------------------------------------------
487 491
488 492 def apply_request(self, stream, ident, parent):
489 493 try:
490 494 content = parent[u'content']
491 495 bufs = parent[u'buffers']
492 496 msg_id = parent['header']['msg_id']
493 497 # bound = parent['header'].get('bound', False)
494 498 except:
495 499 self.log.error("Got bad msg: %s", parent, exc_info=True)
496 500 return
497 501 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
498 502 # self.iopub_stream.send(pyin_msg)
499 503 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
500 504 sub = {'dependencies_met' : True, 'engine' : self.ident,
501 505 'started': datetime.now()}
502 506 try:
503 507 # allow for not overriding displayhook
504 508 if hasattr(sys.displayhook, 'set_parent'):
505 509 sys.displayhook.set_parent(parent)
506 510 sys.stdout.set_parent(parent)
507 511 sys.stderr.set_parent(parent)
508 512 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
509 513 working = self.user_ns
510 514 # suffix =
511 515 prefix = "_"+str(msg_id).replace("-","")+"_"
512 516
513 517 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
514 518 # if bound:
515 519 # bound_ns = Namespace(working)
516 520 # args = [bound_ns]+list(args)
517 521
518 522 fname = getattr(f, '__name__', 'f')
519 523
520 524 fname = prefix+"f"
521 525 argname = prefix+"args"
522 526 kwargname = prefix+"kwargs"
523 527 resultname = prefix+"result"
524 528
525 529 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
526 530 # print ns
527 531 working.update(ns)
528 532 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
529 533 try:
530 534 exec code in working,working
531 535 result = working.get(resultname)
532 536 finally:
533 537 for key in ns.iterkeys():
534 538 working.pop(key)
535 539 # if bound:
536 540 # working.update(bound_ns)
537 541
538 542 packed_result,buf = serialize_object(result)
539 543 result_buf = [packed_result]+buf
540 544 except:
541 545 exc_content = self._wrap_exception('apply')
542 546 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
543 547 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
544 548 ident=py3compat.str_to_bytes('%s.pyerr'%self.prefix))
545 549 reply_content = exc_content
546 550 result_buf = []
547 551
548 552 if exc_content['ename'] == 'UnmetDependency':
549 553 sub['dependencies_met'] = False
550 554 else:
551 555 reply_content = {'status' : 'ok'}
552 556
553 557 # put 'ok'/'error' status in header, for scheduler introspection:
554 558 sub['status'] = reply_content['status']
555 559
556 560 # flush i/o
557 561 sys.stdout.flush()
558 562 sys.stderr.flush()
559 563
560 564 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
561 565 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
562 566
563 567 #---------------------------------------------------------------------------
564 568 # Control messages
565 569 #---------------------------------------------------------------------------
566 570
567 571 def abort_request(self, stream, ident, parent):
568 572 """abort a specifig msg by id"""
569 573 msg_ids = parent['content'].get('msg_ids', None)
570 574 if isinstance(msg_ids, basestring):
571 575 msg_ids = [msg_ids]
572 576 if not msg_ids:
573 577 self.abort_queues()
574 578 for mid in msg_ids:
575 579 self.aborted.add(str(mid))
576 580
577 581 content = dict(status='ok')
578 582 reply_msg = self.session.send(stream, 'abort_reply', content=content,
579 583 parent=parent, ident=ident)
580 584 self.log.debug("%s", reply_msg)
581 585
582 586 def clear_request(self, stream, idents, parent):
583 587 """Clear our namespace."""
584 588 self.user_ns = {}
585 589 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
586 590 content = dict(status='ok'))
587 591 self._initial_exec_lines()
588 592
589 593
590 594 #---------------------------------------------------------------------------
591 595 # Protected interface
592 596 #---------------------------------------------------------------------------
593 597
594 598 def _abort_queues(self):
595 599 for stream in self.shell_streams:
596 600 if stream:
597 601 self._abort_queue(stream)
598 602
599 603 def _abort_queue(self, stream):
600 604 while True:
601 605 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
602 606 if msg is None:
603 607 return
604 608
605 609 self.log.info("Aborting:")
606 610 self.log.info("%s", msg)
607 611 msg_type = msg['header']['msg_type']
608 612 reply_type = msg_type.split('_')[0] + '_reply'
609 613 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
610 614 # self.reply_stream.send(ident,zmq.SNDMORE)
611 615 # self.reply_stream.send_json(reply_msg)
612 616 reply_msg = self.session.send(stream, reply_type,
613 617 content={'status' : 'aborted'}, parent=msg, ident=idents)
614 618 self.log.debug("%s", reply_msg)
615 619 # We need to wait a bit for requests to come in. This can probably
616 620 # be set shorter for true asynchronous clients.
617 621 time.sleep(0.05)
618 622
619 623
620 624 def _no_raw_input(self):
621 625 """Raise StdinNotImplentedError if active frontend doesn't support
622 626 stdin."""
623 627 raise StdinNotImplementedError("raw_input was called, but this "
624 628 "frontend does not support stdin.")
625 629
626 630 def _raw_input(self, prompt, ident, parent):
627 631 # Flush output before making the request.
628 632 sys.stderr.flush()
629 633 sys.stdout.flush()
630 634
631 635 # Send the input request.
632 636 content = json_clean(dict(prompt=prompt))
633 637 self.session.send(self.stdin_socket, u'input_request', content, parent,
634 638 ident=ident)
635 639
636 640 # Await a response.
637 641 while True:
638 642 try:
639 643 ident, reply = self.session.recv(self.stdin_socket, 0)
640 644 except Exception:
641 645 self.log.warn("Invalid Message:", exc_info=True)
642 646 else:
643 647 break
644 648 try:
645 649 value = reply['content']['value']
646 650 except:
647 651 self.log.error("Got bad raw_input reply: ")
648 652 self.log.error("%s", parent)
649 653 value = ''
650 654 if value == '\x04':
651 655 # EOF
652 656 raise EOFError
653 657 return value
654 658
655 659 def _complete(self, msg):
656 660 c = msg['content']
657 661 try:
658 662 cpos = int(c['cursor_pos'])
659 663 except:
660 664 # If we don't get something that we can convert to an integer, at
661 665 # least attempt the completion guessing the cursor is at the end of
662 666 # the text, if there's any, and otherwise of the line
663 667 cpos = len(c['text'])
664 668 if cpos==0:
665 669 cpos = len(c['line'])
666 670 return self.shell.complete(c['text'], c['line'], cpos)
667 671
668 672 def _object_info(self, context):
669 673 symbol, leftover = self._symbol_from_context(context)
670 674 if symbol is not None and not leftover:
671 675 doc = getattr(symbol, '__doc__', '')
672 676 else:
673 677 doc = ''
674 678 object_info = dict(docstring = doc)
675 679 return object_info
676 680
677 681 def _symbol_from_context(self, context):
678 682 if not context:
679 683 return None, context
680 684
681 685 base_symbol_string = context[0]
682 686 symbol = self.shell.user_ns.get(base_symbol_string, None)
683 687 if symbol is None:
684 688 symbol = __builtin__.__dict__.get(base_symbol_string, None)
685 689 if symbol is None:
686 690 return None, context
687 691
688 692 context = context[1:]
689 693 for i, name in enumerate(context):
690 694 new_symbol = getattr(symbol, name, None)
691 695 if new_symbol is None:
692 696 return symbol, context[i:]
693 697 else:
694 698 symbol = new_symbol
695 699
696 700 return symbol, []
697 701
698 702 def _at_shutdown(self):
699 703 """Actions taken at shutdown by the kernel, called by python's atexit.
700 704 """
701 705 # io.rprint("Kernel at_shutdown") # dbg
702 706 if self._shutdown_message is not None:
703 707 self.session.send(self.iopub_stream, self._shutdown_message)
704 708 self.log.debug("%s", self._shutdown_message)
705 709 [ s.flush(zmq.POLLOUT) for s in self.shell_streams + [self.iopub_stream] ]
706 710
707 711 #-----------------------------------------------------------------------------
708 712 # Aliases and Flags for the IPKernelApp
709 713 #-----------------------------------------------------------------------------
710 714
711 715 flags = dict(kernel_flags)
712 716 flags.update(shell_flags)
713 717
714 718 addflag = lambda *args: flags.update(boolean_flag(*args))
715 719
716 720 flags['pylab'] = (
717 721 {'IPKernelApp' : {'pylab' : 'auto'}},
718 722 """Pre-load matplotlib and numpy for interactive use with
719 723 the default matplotlib backend."""
720 724 )
721 725
722 726 aliases = dict(kernel_aliases)
723 727 aliases.update(shell_aliases)
724 728
725 729 # it's possible we don't want short aliases for *all* of these:
726 730 aliases.update(dict(
727 731 pylab='IPKernelApp.pylab',
728 732 ))
729 733
730 734 #-----------------------------------------------------------------------------
731 735 # The IPKernelApp class
732 736 #-----------------------------------------------------------------------------
733 737
734 738 class IPKernelApp(KernelApp, InteractiveShellApp):
735 739 name = 'ipkernel'
736 740
737 741 aliases = Dict(aliases)
738 742 flags = Dict(flags)
739 743 classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]
740 744
741 745 # configurables
742 746 pylab = CaselessStrEnum(['tk', 'qt', 'wx', 'gtk', 'osx', 'inline', 'auto'],
743 747 config=True,
744 748 help="""Pre-load matplotlib and numpy for interactive use,
745 749 selecting a particular matplotlib backend and loop integration.
746 750 """
747 751 )
748 752
749 753 @catch_config_error
750 754 def initialize(self, argv=None):
751 755 super(IPKernelApp, self).initialize(argv)
752 756 self.init_path()
753 757 self.init_shell()
754 758 self.init_extensions()
755 759 self.init_code()
756 760
757 761 def init_kernel(self):
758 762
759 763 shell_stream = ZMQStream(self.shell_socket)
760 764 iopub_stream = ZMQStream(self.iopub_socket)
761 765
762 766 kernel = Kernel(config=self.config, session=self.session,
763 767 shell_streams=[shell_stream],
764 768 iopub_stream=iopub_stream,
765 769 stdin_socket=self.stdin_socket,
766 770 log=self.log,
767 771 profile_dir=self.profile_dir,
768 772 )
769 773 self.kernel = kernel
770 774 kernel.record_ports(self.ports)
771 775 shell = kernel.shell
772 776 if self.pylab:
773 777 try:
774 778 gui, backend = pylabtools.find_gui_and_backend(self.pylab)
775 779 shell.enable_pylab(gui, import_all=self.pylab_import_all)
776 780 except Exception:
777 781 self.log.error("Pylab initialization failed", exc_info=True)
778 782 # print exception straight to stdout, because normally
779 783 # _showtraceback associates the reply with an execution,
780 784 # which means frontends will never draw it, as this exception
781 785 # is not associated with any execute request.
782 786
783 787 # replace pyerr-sending traceback with stdout
784 788 _showtraceback = shell._showtraceback
785 789 def print_tb(etype, evalue, stb):
786 790 print ("Error initializing pylab, pylab mode will not "
787 791 "be active", file=io.stderr)
788 792 print (shell.InteractiveTB.stb2text(stb), file=io.stdout)
789 793 shell._showtraceback = print_tb
790 794
791 795 # send the traceback over stdout
792 796 shell.showtraceback(tb_offset=0)
793 797
794 798 # restore proper _showtraceback method
795 799 shell._showtraceback = _showtraceback
796 800
797 801
798 802 def init_shell(self):
799 803 self.shell = self.kernel.shell
800 804 self.shell.configurables.append(self)
801 805
802 806
803 807 #-----------------------------------------------------------------------------
804 808 # Kernel main and launch functions
805 809 #-----------------------------------------------------------------------------
806 810
807 811 def launch_kernel(*args, **kwargs):
808 812 """Launches a localhost IPython kernel, binding to the specified ports.
809 813
810 814 This function simply calls entry_point.base_launch_kernel with the right
811 815 first command to start an ipkernel. See base_launch_kernel for arguments.
812 816
813 817 Returns
814 818 -------
815 819 A tuple of form:
816 820 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
817 821 where kernel_process is a Popen object and the ports are integers.
818 822 """
819 823 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
820 824 *args, **kwargs)
821 825
822 826
823 827 def embed_kernel(module=None, local_ns=None, **kwargs):
824 828 """Embed and start an IPython kernel in a given scope.
825 829
826 830 Parameters
827 831 ----------
828 832 module : ModuleType, optional
829 833 The module to load into IPython globals (default: caller)
830 834 local_ns : dict, optional
831 835 The namespace to load into IPython user namespace (default: caller)
832 836
833 837 kwargs : various, optional
834 838 Further keyword args are relayed to the KernelApp constructor,
835 839 allowing configuration of the Kernel. Will only have an effect
836 840 on the first embed_kernel call for a given process.
837 841
838 842 """
839 843 # get the app if it exists, or set it up if it doesn't
840 844 if IPKernelApp.initialized():
841 845 app = IPKernelApp.instance()
842 846 else:
843 847 app = IPKernelApp.instance(**kwargs)
844 848 app.initialize([])
845 849
846 850 # load the calling scope if not given
847 851 (caller_module, caller_locals) = extract_module_locals(1)
848 852 if module is None:
849 853 module = caller_module
850 854 if local_ns is None:
851 855 local_ns = caller_locals
852 856
853 857 app.kernel.user_module = module
854 858 app.kernel.user_ns = local_ns
855 859 app.start()
856 860
857 861 def main():
858 862 """Run an IPKernel as an application"""
859 863 app = IPKernelApp.instance()
860 864 app.initialize()
861 865 app.start()
862 866
863 867
864 868 if __name__ == '__main__':
865 869 main()
General Comments 0
You need to be logged in to leave comments. Login now