##// END OF EJS Templates
revert embed_kernel changes that implied bind_kernel in engine
MinRK -
Show More
@@ -1,931 +1,919 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports
19 19 import __builtin__
20 20 import atexit
21 21 import sys
22 22 import time
23 23 import traceback
24 24 import logging
25 25 import uuid
26 26
27 27 from datetime import datetime
28 28 from signal import (
29 29 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
30 30 )
31 31
32 32 # System library imports
33 33 import zmq
34 34 from zmq.eventloop import ioloop
35 35 from zmq.eventloop.zmqstream import ZMQStream
36 36
37 37 # Local imports
38 38 from IPython.core import pylabtools
39 39 from IPython.config.configurable import Configurable
40 from IPython.config.application import boolean_flag, catch_config_error, Application
40 from IPython.config.application import boolean_flag, catch_config_error
41 41 from IPython.core.application import ProfileDir
42 42 from IPython.core.error import StdinNotImplementedError
43 43 from IPython.core.shellapp import (
44 44 InteractiveShellApp, shell_flags, shell_aliases
45 45 )
46 46 from IPython.utils import io
47 47 from IPython.utils import py3compat
48 48 from IPython.utils.frame import extract_module_locals
49 49 from IPython.utils.jsonutil import json_clean
50 50 from IPython.utils.traitlets import (
51 51 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode
52 52 )
53 53
54 54 from entry_point import base_launch_kernel
55 55 from kernelapp import KernelApp, kernel_flags, kernel_aliases
56 56 from serialize import serialize_object, unpack_apply_message
57 57 from session import Session, Message
58 58 from zmqshell import ZMQInteractiveShell
59 59
60 60
61 61 #-----------------------------------------------------------------------------
62 62 # Main kernel class
63 63 #-----------------------------------------------------------------------------
64 64
65 65 class Kernel(Configurable):
66 66
67 67 #---------------------------------------------------------------------------
68 68 # Kernel interface
69 69 #---------------------------------------------------------------------------
70 70
71 71 # attribute to override with a GUI
72 72 eventloop = Any(None)
73 73 def _eventloop_changed(self, name, old, new):
74 74 """schedule call to eventloop from IOLoop"""
75 75 loop = ioloop.IOLoop.instance()
76 76 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
77 77
78 78 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
79 79 session = Instance(Session)
80 80 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
81 81 shell_streams = List()
82 82 control_stream = Instance(ZMQStream)
83 83 iopub_socket = Instance(zmq.Socket)
84 84 stdin_socket = Instance(zmq.Socket)
85 85 log = Instance(logging.Logger)
86 86
87 87 user_module = Any()
88 88 def _user_module_changed(self, name, old, new):
89 89 if self.shell is not None:
90 90 self.shell.user_module = new
91 91
92 92 user_ns = Dict(default_value=None)
93 93 def _user_ns_changed(self, name, old, new):
94 94 if self.shell is not None:
95 95 self.shell.user_ns = new
96 96 self.shell.init_user_ns()
97 97
98 98 # identities:
99 99 int_id = Integer(-1)
100 100 ident = Unicode()
101 101
102 102 def _ident_default(self):
103 103 return unicode(uuid.uuid4())
104 104
105 105
106 106 # Private interface
107 107
108 108 # Time to sleep after flushing the stdout/err buffers in each execute
109 109 # cycle. While this introduces a hard limit on the minimal latency of the
110 110 # execute cycle, it helps prevent output synchronization problems for
111 111 # clients.
112 112 # Units are in seconds. The minimum zmq latency on local host is probably
113 113 # ~150 microseconds, set this to 500us for now. We may need to increase it
114 114 # a little if it's not enough after more interactive testing.
115 115 _execute_sleep = Float(0.0005, config=True)
116 116
117 117 # Frequency of the kernel's event loop.
118 118 # Units are in seconds, kernel subclasses for GUI toolkits may need to
119 119 # adapt to milliseconds.
120 120 _poll_interval = Float(0.05, config=True)
121 121
122 122 # If the shutdown was requested over the network, we leave here the
123 123 # necessary reply message so it can be sent by our registered atexit
124 124 # handler. This ensures that the reply is only sent to clients truly at
125 125 # the end of our shutdown process (which happens after the underlying
126 126 # IPython shell's own shutdown).
127 127 _shutdown_message = None
128 128
129 129 # This is a dict of port number that the kernel is listening on. It is set
130 130 # by record_ports and used by connect_request.
131 131 _recorded_ports = Dict()
132 132
133 133 # set of aborted msg_ids
134 134 aborted = Set()
135 135
136 136
137 137 def __init__(self, **kwargs):
138 138 super(Kernel, self).__init__(**kwargs)
139 139
140 140 # Initialize the InteractiveShell subclass
141 141 self.shell = ZMQInteractiveShell.instance(config=self.config,
142 142 profile_dir = self.profile_dir,
143 143 user_module = self.user_module,
144 144 user_ns = self.user_ns,
145 145 )
146 146 self.shell.displayhook.session = self.session
147 147 self.shell.displayhook.pub_socket = self.iopub_socket
148 148 self.shell.displayhook.topic = self._topic('pyout')
149 149 self.shell.display_pub.session = self.session
150 150 self.shell.display_pub.pub_socket = self.iopub_socket
151 151
152 152 # TMP - hack while developing
153 153 self.shell._reply_content = None
154 154
155 155 # Build dict of handlers for message types
156 156 msg_types = [ 'execute_request', 'complete_request',
157 157 'object_info_request', 'history_request',
158 158 'connect_request', 'shutdown_request',
159 159 'apply_request',
160 160 ]
161 161 self.shell_handlers = {}
162 162 for msg_type in msg_types:
163 163 self.shell_handlers[msg_type] = getattr(self, msg_type)
164 164
165 165 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
166 166 self.control_handlers = {}
167 167 for msg_type in control_msg_types:
168 168 self.control_handlers[msg_type] = getattr(self, msg_type)
169 169
170 170 def dispatch_control(self, msg):
171 171 """dispatch control requests"""
172 172 idents,msg = self.session.feed_identities(msg, copy=False)
173 173 try:
174 174 msg = self.session.unserialize(msg, content=True, copy=False)
175 175 except:
176 176 self.log.error("Invalid Control Message", exc_info=True)
177 177 return
178 178
179 179 self.log.debug("Control received: %s", msg)
180 180
181 181 header = msg['header']
182 182 msg_id = header['msg_id']
183 183 msg_type = header['msg_type']
184 184
185 185 handler = self.control_handlers.get(msg_type, None)
186 186 if handler is None:
187 187 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
188 188 else:
189 189 try:
190 190 handler(self.control_stream, idents, msg)
191 191 except Exception:
192 192 self.log.error("Exception in control handler:", exc_info=True)
193 193
194 194 def dispatch_shell(self, stream, msg):
195 195 """dispatch shell requests"""
196 196 # flush control requests first
197 197 if self.control_stream:
198 198 self.control_stream.flush()
199 199
200 200 idents,msg = self.session.feed_identities(msg, copy=False)
201 201 try:
202 202 msg = self.session.unserialize(msg, content=True, copy=False)
203 203 except:
204 204 self.log.error("Invalid Message", exc_info=True)
205 205 return
206 206
207 207 header = msg['header']
208 208 msg_id = header['msg_id']
209 209 msg_type = msg['header']['msg_type']
210 210
211 211 # Print some info about this message and leave a '--->' marker, so it's
212 212 # easier to trace visually the message chain when debugging. Each
213 213 # handler prints its message at the end.
214 214 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
215 215 self.log.debug(' Content: %s\n --->\n ', msg['content'])
216 216
217 217 if msg_id in self.aborted:
218 218 self.aborted.remove(msg_id)
219 219 # is it safe to assume a msg_id will not be resubmitted?
220 220 reply_type = msg_type.split('_')[0] + '_reply'
221 221 status = {'status' : 'aborted'}
222 222 sub = {'engine' : self.ident}
223 223 sub.update(status)
224 224 reply_msg = self.session.send(stream, reply_type, subheader=sub,
225 225 content=status, parent=msg, ident=idents)
226 226 return
227 227
228 228 handler = self.shell_handlers.get(msg_type, None)
229 229 if handler is None:
230 230 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
231 231 else:
232 232 # ensure default_int_handler during handler call
233 233 sig = signal(SIGINT, default_int_handler)
234 234 try:
235 235 handler(stream, idents, msg)
236 236 except Exception:
237 237 self.log.error("Exception in message handler:", exc_info=True)
238 238 finally:
239 239 signal(SIGINT, sig)
240 240
241 241 def enter_eventloop(self):
242 242 """enter eventloop"""
243 243 self.log.info("entering eventloop")
244 244 # restore default_int_handler
245 245 signal(SIGINT, default_int_handler)
246 246 while self.eventloop is not None:
247 247 try:
248 248 self.eventloop(self)
249 249 except KeyboardInterrupt:
250 250 # Ctrl-C shouldn't crash the kernel
251 251 self.log.error("KeyboardInterrupt caught in kernel")
252 252 continue
253 253 else:
254 254 # eventloop exited cleanly, this means we should stop (right?)
255 255 self.eventloop = None
256 256 break
257 257 self.log.info("exiting eventloop")
258 258 # if eventloop exits, IOLoop should stop
259 259 ioloop.IOLoop.instance().stop()
260 260
261 261 def start(self):
262 262 """register dispatchers for streams"""
263 263 self.shell.exit_now = False
264 264 if self.control_stream:
265 265 self.control_stream.on_recv(self.dispatch_control, copy=False)
266 266
267 267 def make_dispatcher(stream):
268 268 def dispatcher(msg):
269 269 return self.dispatch_shell(stream, msg)
270 270 return dispatcher
271 271
272 272 for s in self.shell_streams:
273 273 s.on_recv(make_dispatcher(s), copy=False)
274 274
275 275 def do_one_iteration(self):
276 276 """step eventloop just once"""
277 277 if self.control_stream:
278 278 self.control_stream.flush()
279 279 for stream in self.shell_streams:
280 280 # handle at most one request per iteration
281 281 stream.flush(zmq.POLLIN, 1)
282 282 stream.flush(zmq.POLLOUT)
283 283
284 284
285 285 def record_ports(self, ports):
286 286 """Record the ports that this kernel is using.
287 287
288 288 The creator of the Kernel instance must call this methods if they
289 289 want the :meth:`connect_request` method to return the port numbers.
290 290 """
291 291 self._recorded_ports = ports
292 292
293 293 #---------------------------------------------------------------------------
294 294 # Kernel request handlers
295 295 #---------------------------------------------------------------------------
296 296
297 297 def _make_subheader(self):
298 298 """init subheader dict, for execute/apply_reply"""
299 299 return {
300 300 'dependencies_met' : True,
301 301 'engine' : self.ident,
302 302 'started': datetime.now(),
303 303 }
304 304
305 305 def _publish_pyin(self, code, parent, execution_count):
306 306 """Publish the code request on the pyin stream."""
307 307
308 308 self.session.send(self.iopub_socket, u'pyin',
309 309 {u'code':code, u'execution_count': execution_count},
310 310 parent=parent, ident=self._topic('pyin')
311 311 )
312 312
313 313 def execute_request(self, stream, ident, parent):
314 314
315 315 self.session.send(self.iopub_socket,
316 316 u'status',
317 317 {u'execution_state':u'busy'},
318 318 parent=parent,
319 319 ident=self._topic('status'),
320 320 )
321 321
322 322 try:
323 323 content = parent[u'content']
324 324 code = content[u'code']
325 325 silent = content[u'silent']
326 326 except:
327 327 self.log.error("Got bad msg: ")
328 328 self.log.error("%s", parent)
329 329 return
330 330
331 331 sub = self._make_subheader()
332 332
333 333 shell = self.shell # we'll need this a lot here
334 334
335 335 # Replace raw_input. Note that is not sufficient to replace
336 336 # raw_input in the user namespace.
337 337 if content.get('allow_stdin', False):
338 338 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
339 339 else:
340 340 raw_input = lambda prompt='' : self._no_raw_input()
341 341
342 342 if py3compat.PY3:
343 343 __builtin__.input = raw_input
344 344 else:
345 345 __builtin__.raw_input = raw_input
346 346
347 347 # Set the parent message of the display hook and out streams.
348 348 shell.displayhook.set_parent(parent)
349 349 shell.display_pub.set_parent(parent)
350 350 sys.stdout.set_parent(parent)
351 351 sys.stderr.set_parent(parent)
352 352
353 353 # Re-broadcast our input for the benefit of listening clients, and
354 354 # start computing output
355 355 if not silent:
356 356 self._publish_pyin(code, parent, shell.execution_count)
357 357
358 358 reply_content = {}
359 359 try:
360 360 # FIXME: the shell calls the exception handler itself.
361 361 shell.run_cell(code, store_history=not silent, silent=silent)
362 362 except:
363 363 status = u'error'
364 364 # FIXME: this code right now isn't being used yet by default,
365 365 # because the run_cell() call above directly fires off exception
366 366 # reporting. This code, therefore, is only active in the scenario
367 367 # where runlines itself has an unhandled exception. We need to
368 368 # uniformize this, for all exception construction to come from a
369 369 # single location in the codbase.
370 370 etype, evalue, tb = sys.exc_info()
371 371 tb_list = traceback.format_exception(etype, evalue, tb)
372 372 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
373 373 else:
374 374 status = u'ok'
375 375
376 376 reply_content[u'status'] = status
377 377
378 378 # Return the execution counter so clients can display prompts
379 379 reply_content['execution_count'] = shell.execution_count - 1
380 380
381 381 # FIXME - fish exception info out of shell, possibly left there by
382 382 # runlines. We'll need to clean up this logic later.
383 383 if shell._reply_content is not None:
384 384 reply_content.update(shell._reply_content)
385 385 # reset after use
386 386 shell._reply_content = None
387 387
388 388 # At this point, we can tell whether the main code execution succeeded
389 389 # or not. If it did, we proceed to evaluate user_variables/expressions
390 390 if reply_content['status'] == 'ok':
391 391 reply_content[u'user_variables'] = \
392 392 shell.user_variables(content.get(u'user_variables', []))
393 393 reply_content[u'user_expressions'] = \
394 394 shell.user_expressions(content.get(u'user_expressions', {}))
395 395 else:
396 396 # If there was an error, don't even try to compute variables or
397 397 # expressions
398 398 reply_content[u'user_variables'] = {}
399 399 reply_content[u'user_expressions'] = {}
400 400
401 401 # Payloads should be retrieved regardless of outcome, so we can both
402 402 # recover partial output (that could have been generated early in a
403 403 # block, before an error) and clear the payload system always.
404 404 reply_content[u'payload'] = shell.payload_manager.read_payload()
405 405 # Be agressive about clearing the payload because we don't want
406 406 # it to sit in memory until the next execute_request comes in.
407 407 shell.payload_manager.clear_payload()
408 408
409 409 # Flush output before sending the reply.
410 410 sys.stdout.flush()
411 411 sys.stderr.flush()
412 412 # FIXME: on rare occasions, the flush doesn't seem to make it to the
413 413 # clients... This seems to mitigate the problem, but we definitely need
414 414 # to better understand what's going on.
415 415 if self._execute_sleep:
416 416 time.sleep(self._execute_sleep)
417 417
418 418 # Send the reply.
419 419 reply_content = json_clean(reply_content)
420 420
421 421 sub['status'] = reply_content['status']
422 422 if reply_content['status'] == 'error' and \
423 423 reply_content['ename'] == 'UnmetDependency':
424 424 sub['dependencies_met'] = False
425 425
426 426 reply_msg = self.session.send(stream, u'execute_reply',
427 427 reply_content, parent, subheader=sub,
428 428 ident=ident)
429 429
430 430 self.log.debug("%s", reply_msg)
431 431
432 432 if not silent and reply_msg['content']['status'] == u'error':
433 433 self._abort_queues()
434 434
435 435 self.session.send(self.iopub_socket,
436 436 u'status',
437 437 {u'execution_state':u'idle'},
438 438 parent=parent,
439 439 ident=self._topic('status'))
440 440
441 441 def complete_request(self, stream, ident, parent):
442 442 txt, matches = self._complete(parent)
443 443 matches = {'matches' : matches,
444 444 'matched_text' : txt,
445 445 'status' : 'ok'}
446 446 matches = json_clean(matches)
447 447 completion_msg = self.session.send(stream, 'complete_reply',
448 448 matches, parent, ident)
449 449 self.log.debug("%s", completion_msg)
450 450
451 451 def object_info_request(self, stream, ident, parent):
452 452 content = parent['content']
453 453 object_info = self.shell.object_inspect(content['oname'],
454 454 detail_level = content.get('detail_level', 0)
455 455 )
456 456 # Before we send this object over, we scrub it for JSON usage
457 457 oinfo = json_clean(object_info)
458 458 msg = self.session.send(stream, 'object_info_reply',
459 459 oinfo, parent, ident)
460 460 self.log.debug("%s", msg)
461 461
462 462 def history_request(self, stream, ident, parent):
463 463 # We need to pull these out, as passing **kwargs doesn't work with
464 464 # unicode keys before Python 2.6.5.
465 465 hist_access_type = parent['content']['hist_access_type']
466 466 raw = parent['content']['raw']
467 467 output = parent['content']['output']
468 468 if hist_access_type == 'tail':
469 469 n = parent['content']['n']
470 470 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
471 471 include_latest=True)
472 472
473 473 elif hist_access_type == 'range':
474 474 session = parent['content']['session']
475 475 start = parent['content']['start']
476 476 stop = parent['content']['stop']
477 477 hist = self.shell.history_manager.get_range(session, start, stop,
478 478 raw=raw, output=output)
479 479
480 480 elif hist_access_type == 'search':
481 481 pattern = parent['content']['pattern']
482 482 hist = self.shell.history_manager.search(pattern, raw=raw,
483 483 output=output)
484 484
485 485 else:
486 486 hist = []
487 487 hist = list(hist)
488 488 content = {'history' : hist}
489 489 content = json_clean(content)
490 490 msg = self.session.send(stream, 'history_reply',
491 491 content, parent, ident)
492 492 self.log.debug("Sending history reply with %i entries", len(hist))
493 493
494 494 def connect_request(self, stream, ident, parent):
495 495 if self._recorded_ports is not None:
496 496 content = self._recorded_ports.copy()
497 497 else:
498 498 content = {}
499 499 msg = self.session.send(stream, 'connect_reply',
500 500 content, parent, ident)
501 501 self.log.debug("%s", msg)
502 502
503 503 def shutdown_request(self, stream, ident, parent):
504 504 self.shell.exit_now = True
505 505 content = dict(status='ok')
506 506 content.update(parent['content'])
507 507 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
508 508 # same content, but different msg_id for broadcasting on IOPub
509 509 self._shutdown_message = self.session.msg(u'shutdown_reply',
510 510 content, parent
511 511 )
512 512
513 513 self._at_shutdown()
514 514 # call sys.exit after a short delay
515 515 loop = ioloop.IOLoop.instance()
516 516 loop.add_timeout(time.time()+0.1, loop.stop)
517 517
518 518 #---------------------------------------------------------------------------
519 519 # Engine methods
520 520 #---------------------------------------------------------------------------
521 521
522 522 def apply_request(self, stream, ident, parent):
523 523 try:
524 524 content = parent[u'content']
525 525 bufs = parent[u'buffers']
526 526 msg_id = parent['header']['msg_id']
527 527 except:
528 528 self.log.error("Got bad msg: %s", parent, exc_info=True)
529 529 return
530 530
531 531 # Set the parent message of the display hook and out streams.
532 532 self.shell.displayhook.set_parent(parent)
533 533 self.shell.display_pub.set_parent(parent)
534 534 sys.stdout.set_parent(parent)
535 535 sys.stderr.set_parent(parent)
536 536
537 537 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
538 538 # self.iopub_socket.send(pyin_msg)
539 539 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
540 540 sub = self._make_subheader()
541 541 try:
542 542 working = self.shell.user_ns
543 543
544 544 prefix = "_"+str(msg_id).replace("-","")+"_"
545 545
546 546 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
547 547
548 548 fname = getattr(f, '__name__', 'f')
549 549
550 550 fname = prefix+"f"
551 551 argname = prefix+"args"
552 552 kwargname = prefix+"kwargs"
553 553 resultname = prefix+"result"
554 554
555 555 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
556 556 # print ns
557 557 working.update(ns)
558 558 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
559 559 try:
560 560 exec code in self.shell.user_global_ns, self.shell.user_ns
561 561 result = working.get(resultname)
562 562 finally:
563 563 for key in ns.iterkeys():
564 564 working.pop(key)
565 565
566 566 packed_result,buf = serialize_object(result)
567 567 result_buf = [packed_result]+buf
568 568 except:
569 569 exc_content = self._wrap_exception('apply')
570 570 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
571 571 self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent,
572 572 ident=self._topic('pyerr'))
573 573 reply_content = exc_content
574 574 result_buf = []
575 575
576 576 if exc_content['ename'] == 'UnmetDependency':
577 577 sub['dependencies_met'] = False
578 578 else:
579 579 reply_content = {'status' : 'ok'}
580 580
581 581 # put 'ok'/'error' status in header, for scheduler introspection:
582 582 sub['status'] = reply_content['status']
583 583
584 584 # flush i/o
585 585 sys.stdout.flush()
586 586 sys.stderr.flush()
587 587
588 588 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
589 589 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
590 590
591 591 #---------------------------------------------------------------------------
592 592 # Control messages
593 593 #---------------------------------------------------------------------------
594 594
595 595 def abort_request(self, stream, ident, parent):
596 596 """abort a specifig msg by id"""
597 597 msg_ids = parent['content'].get('msg_ids', None)
598 598 if isinstance(msg_ids, basestring):
599 599 msg_ids = [msg_ids]
600 600 if not msg_ids:
601 601 self.abort_queues()
602 602 for mid in msg_ids:
603 603 self.aborted.add(str(mid))
604 604
605 605 content = dict(status='ok')
606 606 reply_msg = self.session.send(stream, 'abort_reply', content=content,
607 607 parent=parent, ident=ident)
608 608 self.log.debug("%s", reply_msg)
609 609
610 610 def clear_request(self, stream, idents, parent):
611 611 """Clear our namespace."""
612 612 self.shell.reset(False)
613 613 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
614 614 content = dict(status='ok'))
615 615
616 616
617 617 #---------------------------------------------------------------------------
618 618 # Protected interface
619 619 #---------------------------------------------------------------------------
620 620
621 621
622 622 def _wrap_exception(self, method=None):
623 623 # import here, because _wrap_exception is only used in parallel,
624 624 # and parallel has higher min pyzmq version
625 625 from IPython.parallel.error import wrap_exception
626 626 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
627 627 content = wrap_exception(e_info)
628 628 return content
629 629
630 630 def _topic(self, topic):
631 631 """prefixed topic for IOPub messages"""
632 632 if self.int_id >= 0:
633 633 base = "engine.%i" % self.int_id
634 634 else:
635 635 base = "kernel.%s" % self.ident
636 636
637 637 return py3compat.cast_bytes("%s.%s" % (base, topic))
638 638
639 639 def _abort_queues(self):
640 640 for stream in self.shell_streams:
641 641 if stream:
642 642 self._abort_queue(stream)
643 643
644 644 def _abort_queue(self, stream):
645 645 poller = zmq.Poller()
646 646 poller.register(stream.socket, zmq.POLLIN)
647 647 while True:
648 648 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
649 649 if msg is None:
650 650 return
651 651
652 652 self.log.info("Aborting:")
653 653 self.log.info("%s", msg)
654 654 msg_type = msg['header']['msg_type']
655 655 reply_type = msg_type.split('_')[0] + '_reply'
656 656
657 657 status = {'status' : 'aborted'}
658 658 sub = {'engine' : self.ident}
659 659 sub.update(status)
660 660 reply_msg = self.session.send(stream, reply_type, subheader=sub,
661 661 content=status, parent=msg, ident=idents)
662 662 self.log.debug("%s", reply_msg)
663 663 # We need to wait a bit for requests to come in. This can probably
664 664 # be set shorter for true asynchronous clients.
665 665 poller.poll(50)
666 666
667 667
668 668 def _no_raw_input(self):
669 669 """Raise StdinNotImplentedError if active frontend doesn't support
670 670 stdin."""
671 671 raise StdinNotImplementedError("raw_input was called, but this "
672 672 "frontend does not support stdin.")
673 673
674 674 def _raw_input(self, prompt, ident, parent):
675 675 # Flush output before making the request.
676 676 sys.stderr.flush()
677 677 sys.stdout.flush()
678 678
679 679 # Send the input request.
680 680 content = json_clean(dict(prompt=prompt))
681 681 self.session.send(self.stdin_socket, u'input_request', content, parent,
682 682 ident=ident)
683 683
684 684 # Await a response.
685 685 while True:
686 686 try:
687 687 ident, reply = self.session.recv(self.stdin_socket, 0)
688 688 except Exception:
689 689 self.log.warn("Invalid Message:", exc_info=True)
690 690 else:
691 691 break
692 692 try:
693 693 value = reply['content']['value']
694 694 except:
695 695 self.log.error("Got bad raw_input reply: ")
696 696 self.log.error("%s", parent)
697 697 value = ''
698 698 if value == '\x04':
699 699 # EOF
700 700 raise EOFError
701 701 return value
702 702
703 703 def _complete(self, msg):
704 704 c = msg['content']
705 705 try:
706 706 cpos = int(c['cursor_pos'])
707 707 except:
708 708 # If we don't get something that we can convert to an integer, at
709 709 # least attempt the completion guessing the cursor is at the end of
710 710 # the text, if there's any, and otherwise of the line
711 711 cpos = len(c['text'])
712 712 if cpos==0:
713 713 cpos = len(c['line'])
714 714 return self.shell.complete(c['text'], c['line'], cpos)
715 715
716 716 def _object_info(self, context):
717 717 symbol, leftover = self._symbol_from_context(context)
718 718 if symbol is not None and not leftover:
719 719 doc = getattr(symbol, '__doc__', '')
720 720 else:
721 721 doc = ''
722 722 object_info = dict(docstring = doc)
723 723 return object_info
724 724
725 725 def _symbol_from_context(self, context):
726 726 if not context:
727 727 return None, context
728 728
729 729 base_symbol_string = context[0]
730 730 symbol = self.shell.user_ns.get(base_symbol_string, None)
731 731 if symbol is None:
732 732 symbol = __builtin__.__dict__.get(base_symbol_string, None)
733 733 if symbol is None:
734 734 return None, context
735 735
736 736 context = context[1:]
737 737 for i, name in enumerate(context):
738 738 new_symbol = getattr(symbol, name, None)
739 739 if new_symbol is None:
740 740 return symbol, context[i:]
741 741 else:
742 742 symbol = new_symbol
743 743
744 744 return symbol, []
745 745
746 746 def _at_shutdown(self):
747 747 """Actions taken at shutdown by the kernel, called by python's atexit.
748 748 """
749 749 # io.rprint("Kernel at_shutdown") # dbg
750 750 if self._shutdown_message is not None:
751 751 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
752 752 self.log.debug("%s", self._shutdown_message)
753 753 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
754 754
755 755 #-----------------------------------------------------------------------------
756 756 # Aliases and Flags for the IPKernelApp
757 757 #-----------------------------------------------------------------------------
758 758
759 759 flags = dict(kernel_flags)
760 760 flags.update(shell_flags)
761 761
762 762 addflag = lambda *args: flags.update(boolean_flag(*args))
763 763
764 764 flags['pylab'] = (
765 765 {'IPKernelApp' : {'pylab' : 'auto'}},
766 766 """Pre-load matplotlib and numpy for interactive use with
767 767 the default matplotlib backend."""
768 768 )
769 769
770 770 aliases = dict(kernel_aliases)
771 771 aliases.update(shell_aliases)
772 772
773 773 # it's possible we don't want short aliases for *all* of these:
774 774 aliases.update(dict(
775 775 pylab='IPKernelApp.pylab',
776 776 ))
777 777
778 778 #-----------------------------------------------------------------------------
779 779 # The IPKernelApp class
780 780 #-----------------------------------------------------------------------------
781 781
782 782 class IPKernelApp(KernelApp, InteractiveShellApp):
783 783 name = 'ipkernel'
784 784
785 785 aliases = Dict(aliases)
786 786 flags = Dict(flags)
787 787 classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]
788 788
789 789 # configurables
790 790 pylab = CaselessStrEnum(['tk', 'qt', 'wx', 'gtk', 'osx', 'inline', 'auto'],
791 791 config=True,
792 792 help="""Pre-load matplotlib and numpy for interactive use,
793 793 selecting a particular matplotlib backend and loop integration.
794 794 """
795 795 )
796 796
797 797 @catch_config_error
798 798 def initialize(self, argv=None):
799 799 super(IPKernelApp, self).initialize(argv)
800 800 self.init_path()
801 801 self.init_shell()
802 802 self.init_extensions()
803 803 self.init_code()
804 804
805 805 def init_kernel(self):
806 806
807 807 shell_stream = ZMQStream(self.shell_socket)
808 808
809 809 kernel = Kernel(config=self.config, session=self.session,
810 810 shell_streams=[shell_stream],
811 811 iopub_socket=self.iopub_socket,
812 812 stdin_socket=self.stdin_socket,
813 813 log=self.log,
814 814 profile_dir=self.profile_dir,
815 815 )
816 816 self.kernel = kernel
817 817 kernel.record_ports(self.ports)
818 818 shell = kernel.shell
819 819 if self.pylab:
820 820 try:
821 821 gui, backend = pylabtools.find_gui_and_backend(self.pylab)
822 822 shell.enable_pylab(gui, import_all=self.pylab_import_all)
823 823 except Exception:
824 824 self.log.error("Pylab initialization failed", exc_info=True)
825 825 # print exception straight to stdout, because normally
826 826 # _showtraceback associates the reply with an execution,
827 827 # which means frontends will never draw it, as this exception
828 828 # is not associated with any execute request.
829 829
830 830 # replace pyerr-sending traceback with stdout
831 831 _showtraceback = shell._showtraceback
832 832 def print_tb(etype, evalue, stb):
833 833 print ("Error initializing pylab, pylab mode will not "
834 834 "be active", file=io.stderr)
835 835 print (shell.InteractiveTB.stb2text(stb), file=io.stdout)
836 836 shell._showtraceback = print_tb
837 837
838 838 # send the traceback over stdout
839 839 shell.showtraceback(tb_offset=0)
840 840
841 841 # restore proper _showtraceback method
842 842 shell._showtraceback = _showtraceback
843 843
844 844
845 845 def init_shell(self):
846 846 self.shell = self.kernel.shell
847 847 self.shell.configurables.append(self)
848 848
849 849
850 850 #-----------------------------------------------------------------------------
851 851 # Kernel main and launch functions
852 852 #-----------------------------------------------------------------------------
853 853
854 854 def launch_kernel(*args, **kwargs):
855 855 """Launches a localhost IPython kernel, binding to the specified ports.
856 856
857 857 This function simply calls entry_point.base_launch_kernel with the right
858 858 first command to start an ipkernel. See base_launch_kernel for arguments.
859 859
860 860 Returns
861 861 -------
862 862 A tuple of form:
863 863 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
864 864 where kernel_process is a Popen object and the ports are integers.
865 865 """
866 866 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
867 867 *args, **kwargs)
868 868
869 869
870 870 def embed_kernel(module=None, local_ns=None, **kwargs):
871 871 """Embed and start an IPython kernel in a given scope.
872 872
873 873 Parameters
874 874 ----------
875 875 module : ModuleType, optional
876 876 The module to load into IPython globals (default: caller)
877 877 local_ns : dict, optional
878 878 The namespace to load into IPython user namespace (default: caller)
879 879
880 880 kwargs : various, optional
881 881 Further keyword args are relayed to the KernelApp constructor,
882 882 allowing configuration of the Kernel. Will only have an effect
883 883 on the first embed_kernel call for a given process.
884 884
885 885 """
886 try:
887 from IPython.parallel.apps.ipengineapp import IPEngineApp
888 except ImportError:
889 # IPython.parallel has higher zmq dependency that IPython.zmq
890 class IPEngineApp:
891 pass
892
893 886 # get the app if it exists, or set it up if it doesn't
894 if Application.initialized():
895 app = Application.instance()
896 if isinstance(app, IPEngineApp):
897 app.listen_kernel()
898 # return right away, because we embed in the Engine's
899 # namespace, not the calling one.
900 return
887 if IPKernelApp.initialized():
888 app = IPKernelApp.instance()
901 889 else:
902 890 app = IPKernelApp.instance(**kwargs)
903 891 app.initialize([])
904 892 # Undo unnecessary sys module mangling from init_sys_modules.
905 893 # This would not be necessary if we could prevent it
906 894 # in the first place by using a different InteractiveShell
907 895 # subclass, as in the regular embed case.
908 896 main = app.kernel.shell._orig_sys_modules_main_mod
909 897 if main is not None:
910 898 sys.modules[app.kernel.shell._orig_sys_modules_main_name] = main
911 899
912 900 # load the calling scope if not given
913 901 (caller_module, caller_locals) = extract_module_locals(1)
914 902 if module is None:
915 903 module = caller_module
916 904 if local_ns is None:
917 905 local_ns = caller_locals
918 906
919 907 app.kernel.user_module = module
920 908 app.kernel.user_ns = local_ns
921 909 app.start()
922 910
923 911 def main():
924 912 """Run an IPKernel as an application"""
925 913 app = IPKernelApp.instance()
926 914 app.initialize()
927 915 app.start()
928 916
929 917
930 918 if __name__ == '__main__':
931 919 main()
General Comments 0
You need to be logged in to leave comments. Login now