##// END OF EJS Templates
Update error messages.
Bradley M. Froehle -
Show More
@@ -1,933 +1,934 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports
19 19 import __builtin__
20 20 import atexit
21 21 import sys
22 22 import time
23 23 import traceback
24 24 import logging
25 25 import uuid
26 26
27 27 from datetime import datetime
28 28 from signal import (
29 29 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
30 30 )
31 31
32 32 # System library imports
33 33 import zmq
34 34 from zmq.eventloop import ioloop
35 35 from zmq.eventloop.zmqstream import ZMQStream
36 36
37 37 # Local imports
38 38 from IPython.core import pylabtools
39 39 from IPython.config.configurable import Configurable
40 40 from IPython.config.application import boolean_flag, catch_config_error
41 41 from IPython.core.application import ProfileDir
42 42 from IPython.core.error import StdinNotImplementedError
43 43 from IPython.core.shellapp import (
44 44 InteractiveShellApp, shell_flags, shell_aliases
45 45 )
46 46 from IPython.utils import io
47 47 from IPython.utils import py3compat
48 48 from IPython.utils.frame import extract_module_locals
49 49 from IPython.utils.jsonutil import json_clean
50 50 from IPython.utils.traitlets import (
51 51 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode
52 52 )
53 53
54 54 from entry_point import base_launch_kernel
55 55 from kernelapp import KernelApp, kernel_flags, kernel_aliases
56 56 from serialize import serialize_object, unpack_apply_message
57 57 from session import Session, Message
58 58 from zmqshell import ZMQInteractiveShell
59 59
60 60
61 61 #-----------------------------------------------------------------------------
62 62 # Main kernel class
63 63 #-----------------------------------------------------------------------------
64 64
65 65 class Kernel(Configurable):
66 66
67 67 #---------------------------------------------------------------------------
68 68 # Kernel interface
69 69 #---------------------------------------------------------------------------
70 70
71 71 # attribute to override with a GUI
72 72 eventloop = Any(None)
73 73 def _eventloop_changed(self, name, old, new):
74 74 """schedule call to eventloop from IOLoop"""
75 75 loop = ioloop.IOLoop.instance()
76 76 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
77 77
78 78 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
79 79 session = Instance(Session)
80 80 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
81 81 shell_streams = List()
82 82 control_stream = Instance(ZMQStream)
83 83 iopub_socket = Instance(zmq.Socket)
84 84 stdin_socket = Instance(zmq.Socket)
85 85 log = Instance(logging.Logger)
86 86
87 87 user_module = Any()
88 88 def _user_module_changed(self, name, old, new):
89 89 if self.shell is not None:
90 90 self.shell.user_module = new
91 91
92 92 user_ns = Dict(default_value=None)
93 93 def _user_ns_changed(self, name, old, new):
94 94 if self.shell is not None:
95 95 self.shell.user_ns = new
96 96 self.shell.init_user_ns()
97 97
98 98 # identities:
99 99 int_id = Integer(-1)
100 100 ident = Unicode()
101 101
102 102 def _ident_default(self):
103 103 return unicode(uuid.uuid4())
104 104
105 105
106 106 # Private interface
107 107
108 108 # Time to sleep after flushing the stdout/err buffers in each execute
109 109 # cycle. While this introduces a hard limit on the minimal latency of the
110 110 # execute cycle, it helps prevent output synchronization problems for
111 111 # clients.
112 112 # Units are in seconds. The minimum zmq latency on local host is probably
113 113 # ~150 microseconds, set this to 500us for now. We may need to increase it
114 114 # a little if it's not enough after more interactive testing.
115 115 _execute_sleep = Float(0.0005, config=True)
116 116
117 117 # Frequency of the kernel's event loop.
118 118 # Units are in seconds, kernel subclasses for GUI toolkits may need to
119 119 # adapt to milliseconds.
120 120 _poll_interval = Float(0.05, config=True)
121 121
122 122 # If the shutdown was requested over the network, we leave here the
123 123 # necessary reply message so it can be sent by our registered atexit
124 124 # handler. This ensures that the reply is only sent to clients truly at
125 125 # the end of our shutdown process (which happens after the underlying
126 126 # IPython shell's own shutdown).
127 127 _shutdown_message = None
128 128
129 129 # This is a dict of port number that the kernel is listening on. It is set
130 130 # by record_ports and used by connect_request.
131 131 _recorded_ports = Dict()
132 132
133 133 # set of aborted msg_ids
134 134 aborted = Set()
135 135
136 136
137 137 def __init__(self, **kwargs):
138 138 super(Kernel, self).__init__(**kwargs)
139 139
140 140 # Initialize the InteractiveShell subclass
141 141 self.shell = ZMQInteractiveShell.instance(config=self.config,
142 142 profile_dir = self.profile_dir,
143 143 user_module = self.user_module,
144 144 user_ns = self.user_ns,
145 145 )
146 146 self.shell.displayhook.session = self.session
147 147 self.shell.displayhook.pub_socket = self.iopub_socket
148 148 self.shell.displayhook.topic = self._topic('pyout')
149 149 self.shell.display_pub.session = self.session
150 150 self.shell.display_pub.pub_socket = self.iopub_socket
151 151
152 152 # TMP - hack while developing
153 153 self.shell._reply_content = None
154 154
155 155 # Build dict of handlers for message types
156 156 msg_types = [ 'execute_request', 'complete_request',
157 157 'object_info_request', 'history_request',
158 158 'connect_request', 'shutdown_request',
159 159 'apply_request',
160 160 ]
161 161 self.shell_handlers = {}
162 162 for msg_type in msg_types:
163 163 self.shell_handlers[msg_type] = getattr(self, msg_type)
164 164
165 165 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
166 166 self.control_handlers = {}
167 167 for msg_type in control_msg_types:
168 168 self.control_handlers[msg_type] = getattr(self, msg_type)
169 169
170 170 def dispatch_control(self, msg):
171 171 """dispatch control requests"""
172 172 idents,msg = self.session.feed_identities(msg, copy=False)
173 173 try:
174 174 msg = self.session.unserialize(msg, content=True, copy=False)
175 175 except:
176 176 self.log.error("Invalid Control Message", exc_info=True)
177 177 return
178 178
179 179 self.log.debug("Control received: %s", msg)
180 180
181 181 header = msg['header']
182 182 msg_id = header['msg_id']
183 183 msg_type = header['msg_type']
184 184
185 185 handler = self.control_handlers.get(msg_type, None)
186 186 if handler is None:
187 187 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
188 188 else:
189 189 try:
190 190 handler(self.control_stream, idents, msg)
191 191 except Exception:
192 192 self.log.error("Exception in control handler:", exc_info=True)
193 193
194 194 def dispatch_shell(self, stream, msg):
195 195 """dispatch shell requests"""
196 196 # flush control requests first
197 197 if self.control_stream:
198 198 self.control_stream.flush()
199 199
200 200 idents,msg = self.session.feed_identities(msg, copy=False)
201 201 try:
202 202 msg = self.session.unserialize(msg, content=True, copy=False)
203 203 except:
204 204 self.log.error("Invalid Message", exc_info=True)
205 205 return
206 206
207 207 header = msg['header']
208 208 msg_id = header['msg_id']
209 209 msg_type = msg['header']['msg_type']
210 210
211 211 # Print some info about this message and leave a '--->' marker, so it's
212 212 # easier to trace visually the message chain when debugging. Each
213 213 # handler prints its message at the end.
214 214 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
215 215 self.log.debug(' Content: %s\n --->\n ', msg['content'])
216 216
217 217 if msg_id in self.aborted:
218 218 self.aborted.remove(msg_id)
219 219 # is it safe to assume a msg_id will not be resubmitted?
220 220 reply_type = msg_type.split('_')[0] + '_reply'
221 221 status = {'status' : 'aborted'}
222 222 sub = {'engine' : self.ident}
223 223 sub.update(status)
224 224 reply_msg = self.session.send(stream, reply_type, subheader=sub,
225 225 content=status, parent=msg, ident=idents)
226 226 return
227 227
228 228 handler = self.shell_handlers.get(msg_type, None)
229 229 if handler is None:
230 230 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
231 231 else:
232 232 # ensure default_int_handler during handler call
233 233 sig = signal(SIGINT, default_int_handler)
234 234 try:
235 235 handler(stream, idents, msg)
236 236 except Exception:
237 237 self.log.error("Exception in message handler:", exc_info=True)
238 238 finally:
239 239 signal(SIGINT, sig)
240 240
241 241 def enter_eventloop(self):
242 242 """enter eventloop"""
243 243 self.log.info("entering eventloop")
244 244 # restore default_int_handler
245 245 signal(SIGINT, default_int_handler)
246 246 while self.eventloop is not None:
247 247 try:
248 248 self.eventloop(self)
249 249 except KeyboardInterrupt:
250 250 # Ctrl-C shouldn't crash the kernel
251 251 self.log.error("KeyboardInterrupt caught in kernel")
252 252 continue
253 253 else:
254 254 # eventloop exited cleanly, this means we should stop (right?)
255 255 self.eventloop = None
256 256 break
257 257 self.log.info("exiting eventloop")
258 258 # if eventloop exits, IOLoop should stop
259 259 ioloop.IOLoop.instance().stop()
260 260
261 261 def start(self):
262 262 """register dispatchers for streams"""
263 263 self.shell.exit_now = False
264 264 if self.control_stream:
265 265 self.control_stream.on_recv(self.dispatch_control, copy=False)
266 266
267 267 def make_dispatcher(stream):
268 268 def dispatcher(msg):
269 269 return self.dispatch_shell(stream, msg)
270 270 return dispatcher
271 271
272 272 for s in self.shell_streams:
273 273 s.on_recv(make_dispatcher(s), copy=False)
274 274
275 275 def do_one_iteration(self):
276 276 """step eventloop just once"""
277 277 if self.control_stream:
278 278 self.control_stream.flush()
279 279 for stream in self.shell_streams:
280 280 # handle at most one request per iteration
281 281 stream.flush(zmq.POLLIN, 1)
282 282 stream.flush(zmq.POLLOUT)
283 283
284 284
285 285 def record_ports(self, ports):
286 286 """Record the ports that this kernel is using.
287 287
288 288 The creator of the Kernel instance must call this methods if they
289 289 want the :meth:`connect_request` method to return the port numbers.
290 290 """
291 291 self._recorded_ports = ports
292 292
293 293 #---------------------------------------------------------------------------
294 294 # Kernel request handlers
295 295 #---------------------------------------------------------------------------
296 296
297 297 def _make_subheader(self):
298 298 """init subheader dict, for execute/apply_reply"""
299 299 return {
300 300 'dependencies_met' : True,
301 301 'engine' : self.ident,
302 302 'started': datetime.now(),
303 303 }
304 304
305 305 def _publish_pyin(self, code, parent, execution_count):
306 306 """Publish the code request on the pyin stream."""
307 307
308 308 self.session.send(self.iopub_socket, u'pyin',
309 309 {u'code':code, u'execution_count': execution_count},
310 310 parent=parent, ident=self._topic('pyin')
311 311 )
312 312
313 313 def execute_request(self, stream, ident, parent):
314 314
315 315 self.session.send(self.iopub_socket,
316 316 u'status',
317 317 {u'execution_state':u'busy'},
318 318 parent=parent,
319 319 ident=self._topic('status'),
320 320 )
321 321
322 322 try:
323 323 content = parent[u'content']
324 324 code = content[u'code']
325 325 silent = content[u'silent']
326 326 except:
327 327 self.log.error("Got bad msg: ")
328 328 self.log.error("%s", parent)
329 329 return
330 330
331 331 sub = self._make_subheader()
332 332
333 333 shell = self.shell # we'll need this a lot here
334 334
335 335 # Replace raw_input. Note that is not sufficient to replace
336 336 # raw_input in the user namespace.
337 337 if content.get('allow_stdin', False):
338 338 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
339 339 else:
340 340 raw_input = lambda prompt='' : self._no_raw_input()
341 341
342 342 if py3compat.PY3:
343 343 __builtin__.input = raw_input
344 344 else:
345 345 __builtin__.raw_input = raw_input
346 346
347 347 # Set the parent message of the display hook and out streams.
348 348 shell.displayhook.set_parent(parent)
349 349 shell.display_pub.set_parent(parent)
350 350 sys.stdout.set_parent(parent)
351 351 sys.stderr.set_parent(parent)
352 352
353 353 # Re-broadcast our input for the benefit of listening clients, and
354 354 # start computing output
355 355 if not silent:
356 356 self._publish_pyin(code, parent, shell.execution_count)
357 357
358 358 reply_content = {}
359 359 try:
360 360 # FIXME: the shell calls the exception handler itself.
361 361 shell.run_cell(code, store_history=not silent, silent=silent)
362 362 except:
363 363 status = u'error'
364 364 # FIXME: this code right now isn't being used yet by default,
365 365 # because the run_cell() call above directly fires off exception
366 366 # reporting. This code, therefore, is only active in the scenario
367 367 # where runlines itself has an unhandled exception. We need to
368 368 # uniformize this, for all exception construction to come from a
369 369 # single location in the codbase.
370 370 etype, evalue, tb = sys.exc_info()
371 371 tb_list = traceback.format_exception(etype, evalue, tb)
372 372 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
373 373 else:
374 374 status = u'ok'
375 375
376 376 reply_content[u'status'] = status
377 377
378 378 # Return the execution counter so clients can display prompts
379 379 reply_content['execution_count'] = shell.execution_count - 1
380 380
381 381 # FIXME - fish exception info out of shell, possibly left there by
382 382 # runlines. We'll need to clean up this logic later.
383 383 if shell._reply_content is not None:
384 384 reply_content.update(shell._reply_content)
385 385 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
386 386 reply_content['engine_info'] = e_info
387 387 # reset after use
388 388 shell._reply_content = None
389 389
390 390 # At this point, we can tell whether the main code execution succeeded
391 391 # or not. If it did, we proceed to evaluate user_variables/expressions
392 392 if reply_content['status'] == 'ok':
393 393 reply_content[u'user_variables'] = \
394 394 shell.user_variables(content.get(u'user_variables', []))
395 395 reply_content[u'user_expressions'] = \
396 396 shell.user_expressions(content.get(u'user_expressions', {}))
397 397 else:
398 398 # If there was an error, don't even try to compute variables or
399 399 # expressions
400 400 reply_content[u'user_variables'] = {}
401 401 reply_content[u'user_expressions'] = {}
402 402
403 403 # Payloads should be retrieved regardless of outcome, so we can both
404 404 # recover partial output (that could have been generated early in a
405 405 # block, before an error) and clear the payload system always.
406 406 reply_content[u'payload'] = shell.payload_manager.read_payload()
407 407 # Be agressive about clearing the payload because we don't want
408 408 # it to sit in memory until the next execute_request comes in.
409 409 shell.payload_manager.clear_payload()
410 410
411 411 # Flush output before sending the reply.
412 412 sys.stdout.flush()
413 413 sys.stderr.flush()
414 414 # FIXME: on rare occasions, the flush doesn't seem to make it to the
415 415 # clients... This seems to mitigate the problem, but we definitely need
416 416 # to better understand what's going on.
417 417 if self._execute_sleep:
418 418 time.sleep(self._execute_sleep)
419 419
420 420 # Send the reply.
421 421 reply_content = json_clean(reply_content)
422 422
423 423 sub['status'] = reply_content['status']
424 424 if reply_content['status'] == 'error' and \
425 425 reply_content['ename'] == 'UnmetDependency':
426 426 sub['dependencies_met'] = False
427 427
428 428 reply_msg = self.session.send(stream, u'execute_reply',
429 429 reply_content, parent, subheader=sub,
430 430 ident=ident)
431 431
432 432 self.log.debug("%s", reply_msg)
433 433
434 434 if not silent and reply_msg['content']['status'] == u'error':
435 435 self._abort_queues()
436 436
437 437 self.session.send(self.iopub_socket,
438 438 u'status',
439 439 {u'execution_state':u'idle'},
440 440 parent=parent,
441 441 ident=self._topic('status'))
442 442
443 443 def complete_request(self, stream, ident, parent):
444 444 txt, matches = self._complete(parent)
445 445 matches = {'matches' : matches,
446 446 'matched_text' : txt,
447 447 'status' : 'ok'}
448 448 matches = json_clean(matches)
449 449 completion_msg = self.session.send(stream, 'complete_reply',
450 450 matches, parent, ident)
451 451 self.log.debug("%s", completion_msg)
452 452
453 453 def object_info_request(self, stream, ident, parent):
454 454 content = parent['content']
455 455 object_info = self.shell.object_inspect(content['oname'],
456 456 detail_level = content.get('detail_level', 0)
457 457 )
458 458 # Before we send this object over, we scrub it for JSON usage
459 459 oinfo = json_clean(object_info)
460 460 msg = self.session.send(stream, 'object_info_reply',
461 461 oinfo, parent, ident)
462 462 self.log.debug("%s", msg)
463 463
464 464 def history_request(self, stream, ident, parent):
465 465 # We need to pull these out, as passing **kwargs doesn't work with
466 466 # unicode keys before Python 2.6.5.
467 467 hist_access_type = parent['content']['hist_access_type']
468 468 raw = parent['content']['raw']
469 469 output = parent['content']['output']
470 470 if hist_access_type == 'tail':
471 471 n = parent['content']['n']
472 472 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
473 473 include_latest=True)
474 474
475 475 elif hist_access_type == 'range':
476 476 session = parent['content']['session']
477 477 start = parent['content']['start']
478 478 stop = parent['content']['stop']
479 479 hist = self.shell.history_manager.get_range(session, start, stop,
480 480 raw=raw, output=output)
481 481
482 482 elif hist_access_type == 'search':
483 483 pattern = parent['content']['pattern']
484 484 hist = self.shell.history_manager.search(pattern, raw=raw,
485 485 output=output)
486 486
487 487 else:
488 488 hist = []
489 489 hist = list(hist)
490 490 content = {'history' : hist}
491 491 content = json_clean(content)
492 492 msg = self.session.send(stream, 'history_reply',
493 493 content, parent, ident)
494 494 self.log.debug("Sending history reply with %i entries", len(hist))
495 495
496 496 def connect_request(self, stream, ident, parent):
497 497 if self._recorded_ports is not None:
498 498 content = self._recorded_ports.copy()
499 499 else:
500 500 content = {}
501 501 msg = self.session.send(stream, 'connect_reply',
502 502 content, parent, ident)
503 503 self.log.debug("%s", msg)
504 504
505 505 def shutdown_request(self, stream, ident, parent):
506 506 self.shell.exit_now = True
507 507 content = dict(status='ok')
508 508 content.update(parent['content'])
509 509 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
510 510 # same content, but different msg_id for broadcasting on IOPub
511 511 self._shutdown_message = self.session.msg(u'shutdown_reply',
512 512 content, parent
513 513 )
514 514
515 515 self._at_shutdown()
516 516 # call sys.exit after a short delay
517 517 loop = ioloop.IOLoop.instance()
518 518 loop.add_timeout(time.time()+0.1, loop.stop)
519 519
520 520 #---------------------------------------------------------------------------
521 521 # Engine methods
522 522 #---------------------------------------------------------------------------
523 523
524 524 def apply_request(self, stream, ident, parent):
525 525 try:
526 526 content = parent[u'content']
527 527 bufs = parent[u'buffers']
528 528 msg_id = parent['header']['msg_id']
529 529 except:
530 530 self.log.error("Got bad msg: %s", parent, exc_info=True)
531 531 return
532 532
533 533 # Set the parent message of the display hook and out streams.
534 534 self.shell.displayhook.set_parent(parent)
535 535 self.shell.display_pub.set_parent(parent)
536 536 sys.stdout.set_parent(parent)
537 537 sys.stderr.set_parent(parent)
538 538
539 539 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
540 540 # self.iopub_socket.send(pyin_msg)
541 541 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
542 542 sub = self._make_subheader()
543 543 try:
544 544 working = self.shell.user_ns
545 545
546 546 prefix = "_"+str(msg_id).replace("-","")+"_"
547 547
548 548 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
549 549
550 550 fname = getattr(f, '__name__', 'f')
551 551
552 552 fname = prefix+"f"
553 553 argname = prefix+"args"
554 554 kwargname = prefix+"kwargs"
555 555 resultname = prefix+"result"
556 556
557 557 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
558 558 # print ns
559 559 working.update(ns)
560 560 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
561 561 try:
562 562 exec code in self.shell.user_global_ns, self.shell.user_ns
563 563 result = working.get(resultname)
564 564 finally:
565 565 for key in ns.iterkeys():
566 566 working.pop(key)
567 567
568 568 packed_result,buf = serialize_object(result)
569 569 result_buf = [packed_result]+buf
570 570 except:
571 571 exc_content = self._wrap_exception('apply')
572 572 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
573 573 self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent,
574 574 ident=self._topic('pyerr'))
575 575 reply_content = exc_content
576 576 result_buf = []
577 577
578 578 if exc_content['ename'] == 'UnmetDependency':
579 579 sub['dependencies_met'] = False
580 580 else:
581 581 reply_content = {'status' : 'ok'}
582 582
583 583 # put 'ok'/'error' status in header, for scheduler introspection:
584 584 sub['status'] = reply_content['status']
585 585
586 586 # flush i/o
587 587 sys.stdout.flush()
588 588 sys.stderr.flush()
589 589
590 590 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
591 591 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
592 592
593 593 #---------------------------------------------------------------------------
594 594 # Control messages
595 595 #---------------------------------------------------------------------------
596 596
597 597 def abort_request(self, stream, ident, parent):
598 598 """abort a specifig msg by id"""
599 599 msg_ids = parent['content'].get('msg_ids', None)
600 600 if isinstance(msg_ids, basestring):
601 601 msg_ids = [msg_ids]
602 602 if not msg_ids:
603 603 self.abort_queues()
604 604 for mid in msg_ids:
605 605 self.aborted.add(str(mid))
606 606
607 607 content = dict(status='ok')
608 608 reply_msg = self.session.send(stream, 'abort_reply', content=content,
609 609 parent=parent, ident=ident)
610 610 self.log.debug("%s", reply_msg)
611 611
612 612 def clear_request(self, stream, idents, parent):
613 613 """Clear our namespace."""
614 614 self.shell.reset(False)
615 615 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
616 616 content = dict(status='ok'))
617 617
618 618
619 619 #---------------------------------------------------------------------------
620 620 # Protected interface
621 621 #---------------------------------------------------------------------------
622 622
623 623
624 624 def _wrap_exception(self, method=None):
625 625 # import here, because _wrap_exception is only used in parallel,
626 626 # and parallel has higher min pyzmq version
627 627 from IPython.parallel.error import wrap_exception
628 628 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
629 629 content = wrap_exception(e_info)
630 630 return content
631 631
632 632 def _topic(self, topic):
633 633 """prefixed topic for IOPub messages"""
634 634 if self.int_id >= 0:
635 635 base = "engine.%i" % self.int_id
636 636 else:
637 637 base = "kernel.%s" % self.ident
638 638
639 639 return py3compat.cast_bytes("%s.%s" % (base, topic))
640 640
641 641 def _abort_queues(self):
642 642 for stream in self.shell_streams:
643 643 if stream:
644 644 self._abort_queue(stream)
645 645
646 646 def _abort_queue(self, stream):
647 647 poller = zmq.Poller()
648 648 poller.register(stream.socket, zmq.POLLIN)
649 649 while True:
650 650 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
651 651 if msg is None:
652 652 return
653 653
654 654 self.log.info("Aborting:")
655 655 self.log.info("%s", msg)
656 656 msg_type = msg['header']['msg_type']
657 657 reply_type = msg_type.split('_')[0] + '_reply'
658 658
659 659 status = {'status' : 'aborted'}
660 660 sub = {'engine' : self.ident}
661 661 sub.update(status)
662 662 reply_msg = self.session.send(stream, reply_type, subheader=sub,
663 663 content=status, parent=msg, ident=idents)
664 664 self.log.debug("%s", reply_msg)
665 665 # We need to wait a bit for requests to come in. This can probably
666 666 # be set shorter for true asynchronous clients.
667 667 poller.poll(50)
668 668
669 669
670 670 def _no_raw_input(self):
671 671 """Raise StdinNotImplentedError if active frontend doesn't support
672 672 stdin."""
673 673 raise StdinNotImplementedError("raw_input was called, but this "
674 674 "frontend does not support stdin.")
675 675
676 676 def _raw_input(self, prompt, ident, parent):
677 677 # Flush output before making the request.
678 678 sys.stderr.flush()
679 679 sys.stdout.flush()
680 680
681 681 # Send the input request.
682 682 content = json_clean(dict(prompt=prompt))
683 683 self.session.send(self.stdin_socket, u'input_request', content, parent,
684 684 ident=ident)
685 685
686 686 # Await a response.
687 687 while True:
688 688 try:
689 689 ident, reply = self.session.recv(self.stdin_socket, 0)
690 690 except Exception:
691 691 self.log.warn("Invalid Message:", exc_info=True)
692 692 else:
693 693 break
694 694 try:
695 695 value = reply['content']['value']
696 696 except:
697 697 self.log.error("Got bad raw_input reply: ")
698 698 self.log.error("%s", parent)
699 699 value = ''
700 700 if value == '\x04':
701 701 # EOF
702 702 raise EOFError
703 703 return value
704 704
705 705 def _complete(self, msg):
706 706 c = msg['content']
707 707 try:
708 708 cpos = int(c['cursor_pos'])
709 709 except:
710 710 # If we don't get something that we can convert to an integer, at
711 711 # least attempt the completion guessing the cursor is at the end of
712 712 # the text, if there's any, and otherwise of the line
713 713 cpos = len(c['text'])
714 714 if cpos==0:
715 715 cpos = len(c['line'])
716 716 return self.shell.complete(c['text'], c['line'], cpos)
717 717
718 718 def _object_info(self, context):
719 719 symbol, leftover = self._symbol_from_context(context)
720 720 if symbol is not None and not leftover:
721 721 doc = getattr(symbol, '__doc__', '')
722 722 else:
723 723 doc = ''
724 724 object_info = dict(docstring = doc)
725 725 return object_info
726 726
727 727 def _symbol_from_context(self, context):
728 728 if not context:
729 729 return None, context
730 730
731 731 base_symbol_string = context[0]
732 732 symbol = self.shell.user_ns.get(base_symbol_string, None)
733 733 if symbol is None:
734 734 symbol = __builtin__.__dict__.get(base_symbol_string, None)
735 735 if symbol is None:
736 736 return None, context
737 737
738 738 context = context[1:]
739 739 for i, name in enumerate(context):
740 740 new_symbol = getattr(symbol, name, None)
741 741 if new_symbol is None:
742 742 return symbol, context[i:]
743 743 else:
744 744 symbol = new_symbol
745 745
746 746 return symbol, []
747 747
748 748 def _at_shutdown(self):
749 749 """Actions taken at shutdown by the kernel, called by python's atexit.
750 750 """
751 751 # io.rprint("Kernel at_shutdown") # dbg
752 752 if self._shutdown_message is not None:
753 753 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
754 754 self.log.debug("%s", self._shutdown_message)
755 755 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
756 756
757 757 #-----------------------------------------------------------------------------
758 758 # Aliases and Flags for the IPKernelApp
759 759 #-----------------------------------------------------------------------------
760 760
761 761 flags = dict(kernel_flags)
762 762 flags.update(shell_flags)
763 763
764 764 addflag = lambda *args: flags.update(boolean_flag(*args))
765 765
766 766 flags['pylab'] = (
767 767 {'IPKernelApp' : {'pylab' : 'auto'}},
768 768 """Pre-load matplotlib and numpy for interactive use with
769 769 the default matplotlib backend."""
770 770 )
771 771
772 772 aliases = dict(kernel_aliases)
773 773 aliases.update(shell_aliases)
774 774
775 775 # it's possible we don't want short aliases for *all* of these:
776 776 aliases.update(dict(
777 777 gui='IPKernelApp.gui',
778 778 pylab='IPKernelApp.pylab',
779 779 ))
780 780
781 781 #-----------------------------------------------------------------------------
782 782 # The IPKernelApp class
783 783 #-----------------------------------------------------------------------------
784 784
785 785 class IPKernelApp(KernelApp, InteractiveShellApp):
786 786 name = 'ipkernel'
787 787
788 788 aliases = Dict(aliases)
789 789 flags = Dict(flags)
790 790 classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]
791 791
792 792 # configurables
793 793 gui = CaselessStrEnum(('qt', 'wx', 'gtk', 'glut', 'pyglet'), config=True,
794 794 help="Enable GUI event loop integration ('qt', 'wx', 'gtk', 'glut', 'pyglet')."
795 795 )
796 796 pylab = CaselessStrEnum(['tk', 'qt', 'wx', 'gtk', 'osx', 'inline', 'auto'],
797 797 config=True,
798 798 help="""Pre-load matplotlib and numpy for interactive use,
799 799 selecting a particular matplotlib backend and loop integration.
800 800 """
801 801 )
802 802
803 803 @catch_config_error
804 804 def initialize(self, argv=None):
805 805 super(IPKernelApp, self).initialize(argv)
806 806 self.init_path()
807 807 self.init_shell()
808 808 self.init_gui_pylab()
809 809 self.init_extensions()
810 810 self.init_code()
811 811
812 812 def init_kernel(self):
813 813
814 814 shell_stream = ZMQStream(self.shell_socket)
815 815
816 816 kernel = Kernel(config=self.config, session=self.session,
817 817 shell_streams=[shell_stream],
818 818 iopub_socket=self.iopub_socket,
819 819 stdin_socket=self.stdin_socket,
820 820 log=self.log,
821 821 profile_dir=self.profile_dir,
822 822 )
823 823 self.kernel = kernel
824 824 kernel.record_ports(self.ports)
825 825 shell = kernel.shell
826 826
827 827 def init_gui_pylab(self):
828 828 """Enable GUI event loop integration, taking pylab into account."""
829 829 if self.gui or self.pylab:
830 830 shell = self.shell
831 831 try:
832 832 if self.pylab:
833 833 gui, backend = pylabtools.find_gui_and_backend(self.pylab)
834 834 shell.enable_pylab(gui, import_all=self.pylab_import_all)
835 835 else:
836 836 shell.enable_gui(self.gui)
837 837 except Exception:
838 self.log.error("Pylab initialization failed", exc_info=True)
838 self.log.error("GUI event loop or pylab initialization failed",
839 exc_info=True)
839 840 # print exception straight to stdout, because normally
840 841 # _showtraceback associates the reply with an execution,
841 842 # which means frontends will never draw it, as this exception
842 843 # is not associated with any execute request.
843 844
844 845 # replace pyerr-sending traceback with stdout
845 846 _showtraceback = shell._showtraceback
846 847 def print_tb(etype, evalue, stb):
847 print ("Error initializing pylab, pylab mode will not "
848 "be active", file=io.stderr)
848 print ("GUI event loop or pylab initialization failed",
849 file=io.stderr)
849 850 print (shell.InteractiveTB.stb2text(stb), file=io.stdout)
850 851 shell._showtraceback = print_tb
851 852
852 853 # send the traceback over stdout
853 854 shell.showtraceback(tb_offset=0)
854 855
855 856 # restore proper _showtraceback method
856 857 shell._showtraceback = _showtraceback
857 858
858 859
859 860 def init_shell(self):
860 861 self.shell = self.kernel.shell
861 862 self.shell.configurables.append(self)
862 863
863 864
864 865 #-----------------------------------------------------------------------------
865 866 # Kernel main and launch functions
866 867 #-----------------------------------------------------------------------------
867 868
868 869 def launch_kernel(*args, **kwargs):
869 870 """Launches a localhost IPython kernel, binding to the specified ports.
870 871
871 872 This function simply calls entry_point.base_launch_kernel with the right
872 873 first command to start an ipkernel. See base_launch_kernel for arguments.
873 874
874 875 Returns
875 876 -------
876 877 A tuple of form:
877 878 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
878 879 where kernel_process is a Popen object and the ports are integers.
879 880 """
880 881 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
881 882 *args, **kwargs)
882 883
883 884
884 885 def embed_kernel(module=None, local_ns=None, **kwargs):
885 886 """Embed and start an IPython kernel in a given scope.
886 887
887 888 Parameters
888 889 ----------
889 890 module : ModuleType, optional
890 891 The module to load into IPython globals (default: caller)
891 892 local_ns : dict, optional
892 893 The namespace to load into IPython user namespace (default: caller)
893 894
894 895 kwargs : various, optional
895 896 Further keyword args are relayed to the KernelApp constructor,
896 897 allowing configuration of the Kernel. Will only have an effect
897 898 on the first embed_kernel call for a given process.
898 899
899 900 """
900 901 # get the app if it exists, or set it up if it doesn't
901 902 if IPKernelApp.initialized():
902 903 app = IPKernelApp.instance()
903 904 else:
904 905 app = IPKernelApp.instance(**kwargs)
905 906 app.initialize([])
906 907 # Undo unnecessary sys module mangling from init_sys_modules.
907 908 # This would not be necessary if we could prevent it
908 909 # in the first place by using a different InteractiveShell
909 910 # subclass, as in the regular embed case.
910 911 main = app.kernel.shell._orig_sys_modules_main_mod
911 912 if main is not None:
912 913 sys.modules[app.kernel.shell._orig_sys_modules_main_name] = main
913 914
914 915 # load the calling scope if not given
915 916 (caller_module, caller_locals) = extract_module_locals(1)
916 917 if module is None:
917 918 module = caller_module
918 919 if local_ns is None:
919 920 local_ns = caller_locals
920 921
921 922 app.kernel.user_module = module
922 923 app.kernel.user_ns = local_ns
923 924 app.start()
924 925
925 926 def main():
926 927 """Run an IPKernel as an application"""
927 928 app = IPKernelApp.instance()
928 929 app.initialize()
929 930 app.start()
930 931
931 932
932 933 if __name__ == '__main__':
933 934 main()
General Comments 0
You need to be logged in to leave comments. Login now