##// END OF EJS Templates
Add version_request/reply messaging protocol
Takafumi Arakaki -
Show More
@@ -1,932 +1,949 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports
19 19 import __builtin__
20 20 import atexit
21 21 import sys
22 22 import time
23 23 import traceback
24 24 import logging
25 25 import uuid
26 26
27 27 from datetime import datetime
28 28 from signal import (
29 29 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
30 30 )
31 31
32 32 # System library imports
33 33 import zmq
34 34 from zmq.eventloop import ioloop
35 35 from zmq.eventloop.zmqstream import ZMQStream
36 36
37 37 # Local imports
38 38 from IPython.config.configurable import Configurable
39 39 from IPython.config.application import boolean_flag, catch_config_error
40 40 from IPython.core.application import ProfileDir
41 41 from IPython.core.error import StdinNotImplementedError
42 42 from IPython.core.shellapp import (
43 43 InteractiveShellApp, shell_flags, shell_aliases
44 44 )
45 45 from IPython.utils import io
46 46 from IPython.utils import py3compat
47 47 from IPython.utils.frame import extract_module_locals
48 48 from IPython.utils.jsonutil import json_clean
49 49 from IPython.utils.traitlets import (
50 50 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode
51 51 )
52 52
53 53 from entry_point import base_launch_kernel
54 54 from kernelapp import KernelApp, kernel_flags, kernel_aliases
55 55 from serialize import serialize_object, unpack_apply_message
56 56 from session import Session, Message
57 57 from zmqshell import ZMQInteractiveShell
58 58
59 59
60 60 #-----------------------------------------------------------------------------
61 61 # Main kernel class
62 62 #-----------------------------------------------------------------------------
63 63
64 # Change this when incrementing the kernel protocol version
65 version_major = 1
66 version_minor = 1
67 version = '{0}.{1}'.format(version_major, version_minor)
68
69
64 70 class Kernel(Configurable):
65 71
66 72 #---------------------------------------------------------------------------
67 73 # Kernel interface
68 74 #---------------------------------------------------------------------------
69 75
70 76 # attribute to override with a GUI
71 77 eventloop = Any(None)
72 78 def _eventloop_changed(self, name, old, new):
73 79 """schedule call to eventloop from IOLoop"""
74 80 loop = ioloop.IOLoop.instance()
75 81 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
76 82
77 83 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
78 84 session = Instance(Session)
79 85 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
80 86 shell_streams = List()
81 87 control_stream = Instance(ZMQStream)
82 88 iopub_socket = Instance(zmq.Socket)
83 89 stdin_socket = Instance(zmq.Socket)
84 90 log = Instance(logging.Logger)
85 91
86 92 user_module = Any()
87 93 def _user_module_changed(self, name, old, new):
88 94 if self.shell is not None:
89 95 self.shell.user_module = new
90 96
91 97 user_ns = Dict(default_value=None)
92 98 def _user_ns_changed(self, name, old, new):
93 99 if self.shell is not None:
94 100 self.shell.user_ns = new
95 101 self.shell.init_user_ns()
96 102
97 103 # identities:
98 104 int_id = Integer(-1)
99 105 ident = Unicode()
100 106
101 107 def _ident_default(self):
102 108 return unicode(uuid.uuid4())
103 109
104 110
105 111 # Private interface
106 112
107 113 # Time to sleep after flushing the stdout/err buffers in each execute
108 114 # cycle. While this introduces a hard limit on the minimal latency of the
109 115 # execute cycle, it helps prevent output synchronization problems for
110 116 # clients.
111 117 # Units are in seconds. The minimum zmq latency on local host is probably
112 118 # ~150 microseconds, set this to 500us for now. We may need to increase it
113 119 # a little if it's not enough after more interactive testing.
114 120 _execute_sleep = Float(0.0005, config=True)
115 121
116 122 # Frequency of the kernel's event loop.
117 123 # Units are in seconds, kernel subclasses for GUI toolkits may need to
118 124 # adapt to milliseconds.
119 125 _poll_interval = Float(0.05, config=True)
120 126
121 127 # If the shutdown was requested over the network, we leave here the
122 128 # necessary reply message so it can be sent by our registered atexit
123 129 # handler. This ensures that the reply is only sent to clients truly at
124 130 # the end of our shutdown process (which happens after the underlying
125 131 # IPython shell's own shutdown).
126 132 _shutdown_message = None
127 133
128 134 # This is a dict of port number that the kernel is listening on. It is set
129 135 # by record_ports and used by connect_request.
130 136 _recorded_ports = Dict()
131 137
132 138 # set of aborted msg_ids
133 139 aborted = Set()
134 140
135 141
136 142 def __init__(self, **kwargs):
137 143 super(Kernel, self).__init__(**kwargs)
138 144
139 145 # Initialize the InteractiveShell subclass
140 146 self.shell = ZMQInteractiveShell.instance(config=self.config,
141 147 profile_dir = self.profile_dir,
142 148 user_module = self.user_module,
143 149 user_ns = self.user_ns,
144 150 )
145 151 self.shell.displayhook.session = self.session
146 152 self.shell.displayhook.pub_socket = self.iopub_socket
147 153 self.shell.displayhook.topic = self._topic('pyout')
148 154 self.shell.display_pub.session = self.session
149 155 self.shell.display_pub.pub_socket = self.iopub_socket
150 156 self.shell.data_pub.session = self.session
151 157 self.shell.data_pub.pub_socket = self.iopub_socket
152 158
153 159 # TMP - hack while developing
154 160 self.shell._reply_content = None
155 161
156 162 # Build dict of handlers for message types
157 163 msg_types = [ 'execute_request', 'complete_request',
158 164 'object_info_request', 'history_request',
165 'version_request',
159 166 'connect_request', 'shutdown_request',
160 167 'apply_request',
161 168 ]
162 169 self.shell_handlers = {}
163 170 for msg_type in msg_types:
164 171 self.shell_handlers[msg_type] = getattr(self, msg_type)
165 172
166 173 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
167 174 self.control_handlers = {}
168 175 for msg_type in control_msg_types:
169 176 self.control_handlers[msg_type] = getattr(self, msg_type)
170 177
171 178 def dispatch_control(self, msg):
172 179 """dispatch control requests"""
173 180 idents,msg = self.session.feed_identities(msg, copy=False)
174 181 try:
175 182 msg = self.session.unserialize(msg, content=True, copy=False)
176 183 except:
177 184 self.log.error("Invalid Control Message", exc_info=True)
178 185 return
179 186
180 187 self.log.debug("Control received: %s", msg)
181 188
182 189 header = msg['header']
183 190 msg_id = header['msg_id']
184 191 msg_type = header['msg_type']
185 192
186 193 handler = self.control_handlers.get(msg_type, None)
187 194 if handler is None:
188 195 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
189 196 else:
190 197 try:
191 198 handler(self.control_stream, idents, msg)
192 199 except Exception:
193 200 self.log.error("Exception in control handler:", exc_info=True)
194 201
195 202 def dispatch_shell(self, stream, msg):
196 203 """dispatch shell requests"""
197 204 # flush control requests first
198 205 if self.control_stream:
199 206 self.control_stream.flush()
200 207
201 208 idents,msg = self.session.feed_identities(msg, copy=False)
202 209 try:
203 210 msg = self.session.unserialize(msg, content=True, copy=False)
204 211 except:
205 212 self.log.error("Invalid Message", exc_info=True)
206 213 return
207 214
208 215 header = msg['header']
209 216 msg_id = header['msg_id']
210 217 msg_type = msg['header']['msg_type']
211 218
212 219 # Print some info about this message and leave a '--->' marker, so it's
213 220 # easier to trace visually the message chain when debugging. Each
214 221 # handler prints its message at the end.
215 222 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
216 223 self.log.debug(' Content: %s\n --->\n ', msg['content'])
217 224
218 225 if msg_id in self.aborted:
219 226 self.aborted.remove(msg_id)
220 227 # is it safe to assume a msg_id will not be resubmitted?
221 228 reply_type = msg_type.split('_')[0] + '_reply'
222 229 status = {'status' : 'aborted'}
223 230 md = {'engine' : self.ident}
224 231 md.update(status)
225 232 reply_msg = self.session.send(stream, reply_type, metadata=md,
226 233 content=status, parent=msg, ident=idents)
227 234 return
228 235
229 236 handler = self.shell_handlers.get(msg_type, None)
230 237 if handler is None:
231 238 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
232 239 else:
233 240 # ensure default_int_handler during handler call
234 241 sig = signal(SIGINT, default_int_handler)
235 242 try:
236 243 handler(stream, idents, msg)
237 244 except Exception:
238 245 self.log.error("Exception in message handler:", exc_info=True)
239 246 finally:
240 247 signal(SIGINT, sig)
241 248
242 249 def enter_eventloop(self):
243 250 """enter eventloop"""
244 251 self.log.info("entering eventloop")
245 252 # restore default_int_handler
246 253 signal(SIGINT, default_int_handler)
247 254 while self.eventloop is not None:
248 255 try:
249 256 self.eventloop(self)
250 257 except KeyboardInterrupt:
251 258 # Ctrl-C shouldn't crash the kernel
252 259 self.log.error("KeyboardInterrupt caught in kernel")
253 260 continue
254 261 else:
255 262 # eventloop exited cleanly, this means we should stop (right?)
256 263 self.eventloop = None
257 264 break
258 265 self.log.info("exiting eventloop")
259 266
260 267 def start(self):
261 268 """register dispatchers for streams"""
262 269 self.shell.exit_now = False
263 270 if self.control_stream:
264 271 self.control_stream.on_recv(self.dispatch_control, copy=False)
265 272
266 273 def make_dispatcher(stream):
267 274 def dispatcher(msg):
268 275 return self.dispatch_shell(stream, msg)
269 276 return dispatcher
270 277
271 278 for s in self.shell_streams:
272 279 s.on_recv(make_dispatcher(s), copy=False)
273 280
274 281 def do_one_iteration(self):
275 282 """step eventloop just once"""
276 283 if self.control_stream:
277 284 self.control_stream.flush()
278 285 for stream in self.shell_streams:
279 286 # handle at most one request per iteration
280 287 stream.flush(zmq.POLLIN, 1)
281 288 stream.flush(zmq.POLLOUT)
282 289
283 290
284 291 def record_ports(self, ports):
285 292 """Record the ports that this kernel is using.
286 293
287 294 The creator of the Kernel instance must call this methods if they
288 295 want the :meth:`connect_request` method to return the port numbers.
289 296 """
290 297 self._recorded_ports = ports
291 298
292 299 #---------------------------------------------------------------------------
293 300 # Kernel request handlers
294 301 #---------------------------------------------------------------------------
295 302
296 303 def _make_metadata(self, other=None):
297 304 """init metadata dict, for execute/apply_reply"""
298 305 new_md = {
299 306 'dependencies_met' : True,
300 307 'engine' : self.ident,
301 308 'started': datetime.now(),
302 309 }
303 310 if other:
304 311 new_md.update(other)
305 312 return new_md
306 313
307 314 def _publish_pyin(self, code, parent, execution_count):
308 315 """Publish the code request on the pyin stream."""
309 316
310 317 self.session.send(self.iopub_socket, u'pyin',
311 318 {u'code':code, u'execution_count': execution_count},
312 319 parent=parent, ident=self._topic('pyin')
313 320 )
314 321
315 322 def _publish_status(self, status, parent=None):
316 323 """send status (busy/idle) on IOPub"""
317 324 self.session.send(self.iopub_socket,
318 325 u'status',
319 326 {u'execution_state': status},
320 327 parent=parent,
321 328 ident=self._topic('status'),
322 329 )
323 330
324 331
325 332 def execute_request(self, stream, ident, parent):
326 333 """handle an execute_request"""
327 334
328 335 self._publish_status(u'busy', parent)
329 336
330 337 try:
331 338 content = parent[u'content']
332 339 code = content[u'code']
333 340 silent = content[u'silent']
334 341 store_history = content.get(u'store_history', not silent)
335 342 except:
336 343 self.log.error("Got bad msg: ")
337 344 self.log.error("%s", parent)
338 345 return
339 346
340 347 md = self._make_metadata(parent['metadata'])
341 348
342 349 shell = self.shell # we'll need this a lot here
343 350
344 351 # Replace raw_input. Note that is not sufficient to replace
345 352 # raw_input in the user namespace.
346 353 if content.get('allow_stdin', False):
347 354 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
348 355 else:
349 356 raw_input = lambda prompt='' : self._no_raw_input()
350 357
351 358 if py3compat.PY3:
352 359 __builtin__.input = raw_input
353 360 else:
354 361 __builtin__.raw_input = raw_input
355 362
356 363 # Set the parent message of the display hook and out streams.
357 364 shell.displayhook.set_parent(parent)
358 365 shell.display_pub.set_parent(parent)
359 366 shell.data_pub.set_parent(parent)
360 367 sys.stdout.set_parent(parent)
361 368 sys.stderr.set_parent(parent)
362 369
363 370 # Re-broadcast our input for the benefit of listening clients, and
364 371 # start computing output
365 372 if not silent:
366 373 self._publish_pyin(code, parent, shell.execution_count)
367 374
368 375 reply_content = {}
369 376 try:
370 377 # FIXME: the shell calls the exception handler itself.
371 378 shell.run_cell(code, store_history=store_history, silent=silent)
372 379 except:
373 380 status = u'error'
374 381 # FIXME: this code right now isn't being used yet by default,
375 382 # because the run_cell() call above directly fires off exception
376 383 # reporting. This code, therefore, is only active in the scenario
377 384 # where runlines itself has an unhandled exception. We need to
378 385 # uniformize this, for all exception construction to come from a
379 386 # single location in the codbase.
380 387 etype, evalue, tb = sys.exc_info()
381 388 tb_list = traceback.format_exception(etype, evalue, tb)
382 389 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
383 390 else:
384 391 status = u'ok'
385 392
386 393 reply_content[u'status'] = status
387 394
388 395 # Return the execution counter so clients can display prompts
389 396 reply_content['execution_count'] = shell.execution_count - 1
390 397
391 398 # FIXME - fish exception info out of shell, possibly left there by
392 399 # runlines. We'll need to clean up this logic later.
393 400 if shell._reply_content is not None:
394 401 reply_content.update(shell._reply_content)
395 402 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
396 403 reply_content['engine_info'] = e_info
397 404 # reset after use
398 405 shell._reply_content = None
399 406
400 407 # At this point, we can tell whether the main code execution succeeded
401 408 # or not. If it did, we proceed to evaluate user_variables/expressions
402 409 if reply_content['status'] == 'ok':
403 410 reply_content[u'user_variables'] = \
404 411 shell.user_variables(content.get(u'user_variables', []))
405 412 reply_content[u'user_expressions'] = \
406 413 shell.user_expressions(content.get(u'user_expressions', {}))
407 414 else:
408 415 # If there was an error, don't even try to compute variables or
409 416 # expressions
410 417 reply_content[u'user_variables'] = {}
411 418 reply_content[u'user_expressions'] = {}
412 419
413 420 # Payloads should be retrieved regardless of outcome, so we can both
414 421 # recover partial output (that could have been generated early in a
415 422 # block, before an error) and clear the payload system always.
416 423 reply_content[u'payload'] = shell.payload_manager.read_payload()
417 424 # Be agressive about clearing the payload because we don't want
418 425 # it to sit in memory until the next execute_request comes in.
419 426 shell.payload_manager.clear_payload()
420 427
421 428 # Flush output before sending the reply.
422 429 sys.stdout.flush()
423 430 sys.stderr.flush()
424 431 # FIXME: on rare occasions, the flush doesn't seem to make it to the
425 432 # clients... This seems to mitigate the problem, but we definitely need
426 433 # to better understand what's going on.
427 434 if self._execute_sleep:
428 435 time.sleep(self._execute_sleep)
429 436
430 437 # Send the reply.
431 438 reply_content = json_clean(reply_content)
432 439
433 440 md['status'] = reply_content['status']
434 441 if reply_content['status'] == 'error' and \
435 442 reply_content['ename'] == 'UnmetDependency':
436 443 md['dependencies_met'] = False
437 444
438 445 reply_msg = self.session.send(stream, u'execute_reply',
439 446 reply_content, parent, metadata=md,
440 447 ident=ident)
441 448
442 449 self.log.debug("%s", reply_msg)
443 450
444 451 if not silent and reply_msg['content']['status'] == u'error':
445 452 self._abort_queues()
446 453
447 454 self._publish_status(u'idle', parent)
448 455
449 456 def complete_request(self, stream, ident, parent):
450 457 txt, matches = self._complete(parent)
451 458 matches = {'matches' : matches,
452 459 'matched_text' : txt,
453 460 'status' : 'ok'}
454 461 matches = json_clean(matches)
455 462 completion_msg = self.session.send(stream, 'complete_reply',
456 463 matches, parent, ident)
457 464 self.log.debug("%s", completion_msg)
458 465
459 466 def object_info_request(self, stream, ident, parent):
460 467 content = parent['content']
461 468 object_info = self.shell.object_inspect(content['oname'],
462 469 detail_level = content.get('detail_level', 0)
463 470 )
464 471 # Before we send this object over, we scrub it for JSON usage
465 472 oinfo = json_clean(object_info)
466 473 msg = self.session.send(stream, 'object_info_reply',
467 474 oinfo, parent, ident)
468 475 self.log.debug("%s", msg)
469 476
470 477 def history_request(self, stream, ident, parent):
471 478 # We need to pull these out, as passing **kwargs doesn't work with
472 479 # unicode keys before Python 2.6.5.
473 480 hist_access_type = parent['content']['hist_access_type']
474 481 raw = parent['content']['raw']
475 482 output = parent['content']['output']
476 483 if hist_access_type == 'tail':
477 484 n = parent['content']['n']
478 485 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
479 486 include_latest=True)
480 487
481 488 elif hist_access_type == 'range':
482 489 session = parent['content']['session']
483 490 start = parent['content']['start']
484 491 stop = parent['content']['stop']
485 492 hist = self.shell.history_manager.get_range(session, start, stop,
486 493 raw=raw, output=output)
487 494
488 495 elif hist_access_type == 'search':
489 496 n = parent['content'].get('n')
490 497 pattern = parent['content']['pattern']
491 498 hist = self.shell.history_manager.search(pattern, raw=raw,
492 499 output=output, n=n)
493 500
494 501 else:
495 502 hist = []
496 503 hist = list(hist)
497 504 content = {'history' : hist}
498 505 content = json_clean(content)
499 506 msg = self.session.send(stream, 'history_reply',
500 507 content, parent, ident)
501 508 self.log.debug("Sending history reply with %i entries", len(hist))
502 509
503 510 def connect_request(self, stream, ident, parent):
504 511 if self._recorded_ports is not None:
505 512 content = self._recorded_ports.copy()
506 513 else:
507 514 content = {}
508 515 msg = self.session.send(stream, 'connect_reply',
509 516 content, parent, ident)
510 517 self.log.debug("%s", msg)
511 518
519 def version_request(self, stream, ident, parent):
520 vinfo = {
521 'version': version,
522 'version_major': version_major,
523 'version_minor': version_minor,
524 }
525 msg = self.session.send(stream, 'version_reply',
526 vinfo, parent, ident)
527 self.log.debug("%s", msg)
528
512 529 def shutdown_request(self, stream, ident, parent):
513 530 self.shell.exit_now = True
514 531 content = dict(status='ok')
515 532 content.update(parent['content'])
516 533 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
517 534 # same content, but different msg_id for broadcasting on IOPub
518 535 self._shutdown_message = self.session.msg(u'shutdown_reply',
519 536 content, parent
520 537 )
521 538
522 539 self._at_shutdown()
523 540 # call sys.exit after a short delay
524 541 loop = ioloop.IOLoop.instance()
525 542 loop.add_timeout(time.time()+0.1, loop.stop)
526 543
527 544 #---------------------------------------------------------------------------
528 545 # Engine methods
529 546 #---------------------------------------------------------------------------
530 547
531 548 def apply_request(self, stream, ident, parent):
532 549 try:
533 550 content = parent[u'content']
534 551 bufs = parent[u'buffers']
535 552 msg_id = parent['header']['msg_id']
536 553 except:
537 554 self.log.error("Got bad msg: %s", parent, exc_info=True)
538 555 return
539 556
540 557 self._publish_status(u'busy', parent)
541 558
542 559 # Set the parent message of the display hook and out streams.
543 560 shell = self.shell
544 561 shell.displayhook.set_parent(parent)
545 562 shell.display_pub.set_parent(parent)
546 563 shell.data_pub.set_parent(parent)
547 564 sys.stdout.set_parent(parent)
548 565 sys.stderr.set_parent(parent)
549 566
550 567 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
551 568 # self.iopub_socket.send(pyin_msg)
552 569 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
553 570 md = self._make_metadata(parent['metadata'])
554 571 try:
555 572 working = shell.user_ns
556 573
557 574 prefix = "_"+str(msg_id).replace("-","")+"_"
558 575
559 576 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
560 577
561 578 fname = getattr(f, '__name__', 'f')
562 579
563 580 fname = prefix+"f"
564 581 argname = prefix+"args"
565 582 kwargname = prefix+"kwargs"
566 583 resultname = prefix+"result"
567 584
568 585 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
569 586 # print ns
570 587 working.update(ns)
571 588 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
572 589 try:
573 590 exec code in shell.user_global_ns, shell.user_ns
574 591 result = working.get(resultname)
575 592 finally:
576 593 for key in ns.iterkeys():
577 594 working.pop(key)
578 595
579 596 result_buf = serialize_object(result,
580 597 buffer_threshold=self.session.buffer_threshold,
581 598 item_threshold=self.session.item_threshold,
582 599 )
583 600
584 601 except:
585 602 # invoke IPython traceback formatting
586 603 shell.showtraceback()
587 604 # FIXME - fish exception info out of shell, possibly left there by
588 605 # run_code. We'll need to clean up this logic later.
589 606 reply_content = {}
590 607 if shell._reply_content is not None:
591 608 reply_content.update(shell._reply_content)
592 609 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
593 610 reply_content['engine_info'] = e_info
594 611 # reset after use
595 612 shell._reply_content = None
596 613
597 614 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
598 615 ident=self._topic('pyerr'))
599 616 result_buf = []
600 617
601 618 if reply_content['ename'] == 'UnmetDependency':
602 619 md['dependencies_met'] = False
603 620 else:
604 621 reply_content = {'status' : 'ok'}
605 622
606 623 # put 'ok'/'error' status in header, for scheduler introspection:
607 624 md['status'] = reply_content['status']
608 625
609 626 # flush i/o
610 627 sys.stdout.flush()
611 628 sys.stderr.flush()
612 629
613 630 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
614 631 parent=parent, ident=ident,buffers=result_buf, metadata=md)
615 632
616 633 self._publish_status(u'idle', parent)
617 634
618 635 #---------------------------------------------------------------------------
619 636 # Control messages
620 637 #---------------------------------------------------------------------------
621 638
622 639 def abort_request(self, stream, ident, parent):
623 640 """abort a specifig msg by id"""
624 641 msg_ids = parent['content'].get('msg_ids', None)
625 642 if isinstance(msg_ids, basestring):
626 643 msg_ids = [msg_ids]
627 644 if not msg_ids:
628 645 self.abort_queues()
629 646 for mid in msg_ids:
630 647 self.aborted.add(str(mid))
631 648
632 649 content = dict(status='ok')
633 650 reply_msg = self.session.send(stream, 'abort_reply', content=content,
634 651 parent=parent, ident=ident)
635 652 self.log.debug("%s", reply_msg)
636 653
637 654 def clear_request(self, stream, idents, parent):
638 655 """Clear our namespace."""
639 656 self.shell.reset(False)
640 657 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
641 658 content = dict(status='ok'))
642 659
643 660
644 661 #---------------------------------------------------------------------------
645 662 # Protected interface
646 663 #---------------------------------------------------------------------------
647 664
648 665
649 666 def _wrap_exception(self, method=None):
650 667 # import here, because _wrap_exception is only used in parallel,
651 668 # and parallel has higher min pyzmq version
652 669 from IPython.parallel.error import wrap_exception
653 670 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
654 671 content = wrap_exception(e_info)
655 672 return content
656 673
657 674 def _topic(self, topic):
658 675 """prefixed topic for IOPub messages"""
659 676 if self.int_id >= 0:
660 677 base = "engine.%i" % self.int_id
661 678 else:
662 679 base = "kernel.%s" % self.ident
663 680
664 681 return py3compat.cast_bytes("%s.%s" % (base, topic))
665 682
666 683 def _abort_queues(self):
667 684 for stream in self.shell_streams:
668 685 if stream:
669 686 self._abort_queue(stream)
670 687
671 688 def _abort_queue(self, stream):
672 689 poller = zmq.Poller()
673 690 poller.register(stream.socket, zmq.POLLIN)
674 691 while True:
675 692 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
676 693 if msg is None:
677 694 return
678 695
679 696 self.log.info("Aborting:")
680 697 self.log.info("%s", msg)
681 698 msg_type = msg['header']['msg_type']
682 699 reply_type = msg_type.split('_')[0] + '_reply'
683 700
684 701 status = {'status' : 'aborted'}
685 702 md = {'engine' : self.ident}
686 703 md.update(status)
687 704 reply_msg = self.session.send(stream, reply_type, metadata=md,
688 705 content=status, parent=msg, ident=idents)
689 706 self.log.debug("%s", reply_msg)
690 707 # We need to wait a bit for requests to come in. This can probably
691 708 # be set shorter for true asynchronous clients.
692 709 poller.poll(50)
693 710
694 711
695 712 def _no_raw_input(self):
696 713 """Raise StdinNotImplentedError if active frontend doesn't support
697 714 stdin."""
698 715 raise StdinNotImplementedError("raw_input was called, but this "
699 716 "frontend does not support stdin.")
700 717
701 718 def _raw_input(self, prompt, ident, parent):
702 719 # Flush output before making the request.
703 720 sys.stderr.flush()
704 721 sys.stdout.flush()
705 722
706 723 # Send the input request.
707 724 content = json_clean(dict(prompt=prompt))
708 725 self.session.send(self.stdin_socket, u'input_request', content, parent,
709 726 ident=ident)
710 727
711 728 # Await a response.
712 729 while True:
713 730 try:
714 731 ident, reply = self.session.recv(self.stdin_socket, 0)
715 732 except Exception:
716 733 self.log.warn("Invalid Message:", exc_info=True)
717 734 else:
718 735 break
719 736 try:
720 737 value = reply['content']['value']
721 738 except:
722 739 self.log.error("Got bad raw_input reply: ")
723 740 self.log.error("%s", parent)
724 741 value = ''
725 742 if value == '\x04':
726 743 # EOF
727 744 raise EOFError
728 745 return value
729 746
730 747 def _complete(self, msg):
731 748 c = msg['content']
732 749 try:
733 750 cpos = int(c['cursor_pos'])
734 751 except:
735 752 # If we don't get something that we can convert to an integer, at
736 753 # least attempt the completion guessing the cursor is at the end of
737 754 # the text, if there's any, and otherwise of the line
738 755 cpos = len(c['text'])
739 756 if cpos==0:
740 757 cpos = len(c['line'])
741 758 return self.shell.complete(c['text'], c['line'], cpos)
742 759
743 760 def _object_info(self, context):
744 761 symbol, leftover = self._symbol_from_context(context)
745 762 if symbol is not None and not leftover:
746 763 doc = getattr(symbol, '__doc__', '')
747 764 else:
748 765 doc = ''
749 766 object_info = dict(docstring = doc)
750 767 return object_info
751 768
752 769 def _symbol_from_context(self, context):
753 770 if not context:
754 771 return None, context
755 772
756 773 base_symbol_string = context[0]
757 774 symbol = self.shell.user_ns.get(base_symbol_string, None)
758 775 if symbol is None:
759 776 symbol = __builtin__.__dict__.get(base_symbol_string, None)
760 777 if symbol is None:
761 778 return None, context
762 779
763 780 context = context[1:]
764 781 for i, name in enumerate(context):
765 782 new_symbol = getattr(symbol, name, None)
766 783 if new_symbol is None:
767 784 return symbol, context[i:]
768 785 else:
769 786 symbol = new_symbol
770 787
771 788 return symbol, []
772 789
773 790 def _at_shutdown(self):
774 791 """Actions taken at shutdown by the kernel, called by python's atexit.
775 792 """
776 793 # io.rprint("Kernel at_shutdown") # dbg
777 794 if self._shutdown_message is not None:
778 795 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
779 796 self.log.debug("%s", self._shutdown_message)
780 797 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
781 798
782 799 #-----------------------------------------------------------------------------
783 800 # Aliases and Flags for the IPKernelApp
784 801 #-----------------------------------------------------------------------------
785 802
786 803 flags = dict(kernel_flags)
787 804 flags.update(shell_flags)
788 805
789 806 addflag = lambda *args: flags.update(boolean_flag(*args))
790 807
791 808 flags['pylab'] = (
792 809 {'IPKernelApp' : {'pylab' : 'auto'}},
793 810 """Pre-load matplotlib and numpy for interactive use with
794 811 the default matplotlib backend."""
795 812 )
796 813
797 814 aliases = dict(kernel_aliases)
798 815 aliases.update(shell_aliases)
799 816
800 817 #-----------------------------------------------------------------------------
801 818 # The IPKernelApp class
802 819 #-----------------------------------------------------------------------------
803 820
804 821 class IPKernelApp(KernelApp, InteractiveShellApp):
805 822 name = 'ipkernel'
806 823
807 824 aliases = Dict(aliases)
808 825 flags = Dict(flags)
809 826 classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]
810 827
811 828 @catch_config_error
812 829 def initialize(self, argv=None):
813 830 super(IPKernelApp, self).initialize(argv)
814 831 self.init_path()
815 832 self.init_shell()
816 833 self.init_gui_pylab()
817 834 self.init_extensions()
818 835 self.init_code()
819 836
820 837 def init_kernel(self):
821 838
822 839 shell_stream = ZMQStream(self.shell_socket)
823 840
824 841 kernel = Kernel(config=self.config, session=self.session,
825 842 shell_streams=[shell_stream],
826 843 iopub_socket=self.iopub_socket,
827 844 stdin_socket=self.stdin_socket,
828 845 log=self.log,
829 846 profile_dir=self.profile_dir,
830 847 )
831 848 self.kernel = kernel
832 849 kernel.record_ports(self.ports)
833 850 shell = kernel.shell
834 851
835 852 def init_gui_pylab(self):
836 853 """Enable GUI event loop integration, taking pylab into account."""
837 854
838 855 # Provide a wrapper for :meth:`InteractiveShellApp.init_gui_pylab`
839 856 # to ensure that any exception is printed straight to stderr.
840 857 # Normally _showtraceback associates the reply with an execution,
841 858 # which means frontends will never draw it, as this exception
842 859 # is not associated with any execute request.
843 860
844 861 shell = self.shell
845 862 _showtraceback = shell._showtraceback
846 863 try:
847 864 # replace pyerr-sending traceback with stderr
848 865 def print_tb(etype, evalue, stb):
849 866 print ("GUI event loop or pylab initialization failed",
850 867 file=io.stderr)
851 868 print (shell.InteractiveTB.stb2text(stb), file=io.stderr)
852 869 shell._showtraceback = print_tb
853 870 InteractiveShellApp.init_gui_pylab(self)
854 871 finally:
855 872 shell._showtraceback = _showtraceback
856 873
857 874 def init_shell(self):
858 875 self.shell = self.kernel.shell
859 876 self.shell.configurables.append(self)
860 877
861 878
862 879 #-----------------------------------------------------------------------------
863 880 # Kernel main and launch functions
864 881 #-----------------------------------------------------------------------------
865 882
866 883 def launch_kernel(*args, **kwargs):
867 884 """Launches a localhost IPython kernel, binding to the specified ports.
868 885
869 886 This function simply calls entry_point.base_launch_kernel with the right
870 887 first command to start an ipkernel. See base_launch_kernel for arguments.
871 888
872 889 Returns
873 890 -------
874 891 A tuple of form:
875 892 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
876 893 where kernel_process is a Popen object and the ports are integers.
877 894 """
878 895 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
879 896 *args, **kwargs)
880 897
881 898
882 899 def embed_kernel(module=None, local_ns=None, **kwargs):
883 900 """Embed and start an IPython kernel in a given scope.
884 901
885 902 Parameters
886 903 ----------
887 904 module : ModuleType, optional
888 905 The module to load into IPython globals (default: caller)
889 906 local_ns : dict, optional
890 907 The namespace to load into IPython user namespace (default: caller)
891 908
892 909 kwargs : various, optional
893 910 Further keyword args are relayed to the KernelApp constructor,
894 911 allowing configuration of the Kernel. Will only have an effect
895 912 on the first embed_kernel call for a given process.
896 913
897 914 """
898 915 # get the app if it exists, or set it up if it doesn't
899 916 if IPKernelApp.initialized():
900 917 app = IPKernelApp.instance()
901 918 else:
902 919 app = IPKernelApp.instance(**kwargs)
903 920 app.initialize([])
904 921 # Undo unnecessary sys module mangling from init_sys_modules.
905 922 # This would not be necessary if we could prevent it
906 923 # in the first place by using a different InteractiveShell
907 924 # subclass, as in the regular embed case.
908 925 main = app.kernel.shell._orig_sys_modules_main_mod
909 926 if main is not None:
910 927 sys.modules[app.kernel.shell._orig_sys_modules_main_name] = main
911 928
912 929 # load the calling scope if not given
913 930 (caller_module, caller_locals) = extract_module_locals(1)
914 931 if module is None:
915 932 module = caller_module
916 933 if local_ns is None:
917 934 local_ns = caller_locals
918 935
919 936 app.kernel.user_module = module
920 937 app.kernel.user_ns = local_ns
921 938 app.shell.set_completer_frame()
922 939 app.start()
923 940
924 941 def main():
925 942 """Run an IPKernel as an application"""
926 943 app = IPKernelApp.instance()
927 944 app.initialize()
928 945 app.start()
929 946
930 947
931 948 if __name__ == '__main__':
932 949 main()
General Comments 0
You need to be logged in to leave comments. Login now