##// END OF EJS Templates
add banner to kernel_info_reply
MinRK -
Show More
@@ -1,850 +1,851
1 1 """An interactive kernel that talks to frontends over 0MQ."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import print_function
7 7
8 8 import getpass
9 9 import sys
10 10 import time
11 11 import traceback
12 12 import logging
13 13 import uuid
14 14
15 15 from datetime import datetime
16 16 from signal import (
17 17 signal, default_int_handler, SIGINT
18 18 )
19 19
20 20 import zmq
21 21 from zmq.eventloop import ioloop
22 22 from zmq.eventloop.zmqstream import ZMQStream
23 23
24 24 from IPython.config.configurable import Configurable
25 25 from IPython.core.error import StdinNotImplementedError
26 26 from IPython.core import release
27 27 from IPython.utils import py3compat
28 28 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
29 29 from IPython.utils.jsonutil import json_clean
30 30 from IPython.utils.tokenutil import token_at_cursor
31 31 from IPython.utils.traitlets import (
32 32 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
33 33 Type, Bool,
34 34 )
35 35
36 36 from .serialize import serialize_object, unpack_apply_message
37 37 from .session import Session
38 38 from .zmqshell import ZMQInteractiveShell
39 39
40 40
41 41 #-----------------------------------------------------------------------------
42 42 # Main kernel class
43 43 #-----------------------------------------------------------------------------
44 44
45 45 protocol_version = release.kernel_protocol_version
46 46 ipython_version = release.version
47 47 language_version = sys.version.split()[0]
48 48
49 49
50 50 class Kernel(Configurable):
51 51
52 52 #---------------------------------------------------------------------------
53 53 # Kernel interface
54 54 #---------------------------------------------------------------------------
55 55
56 56 # attribute to override with a GUI
57 57 eventloop = Any(None)
58 58 def _eventloop_changed(self, name, old, new):
59 59 """schedule call to eventloop from IOLoop"""
60 60 loop = ioloop.IOLoop.instance()
61 61 loop.add_callback(self.enter_eventloop)
62 62
63 63 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
64 64 shell_class = Type(ZMQInteractiveShell)
65 65
66 66 session = Instance(Session)
67 67 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
68 68 shell_streams = List()
69 69 control_stream = Instance(ZMQStream)
70 70 iopub_socket = Instance(zmq.Socket)
71 71 stdin_socket = Instance(zmq.Socket)
72 72 log = Instance(logging.Logger)
73 73
74 74 user_module = Any()
75 75 def _user_module_changed(self, name, old, new):
76 76 if self.shell is not None:
77 77 self.shell.user_module = new
78 78
79 79 user_ns = Instance(dict, args=None, allow_none=True)
80 80 def _user_ns_changed(self, name, old, new):
81 81 if self.shell is not None:
82 82 self.shell.user_ns = new
83 83 self.shell.init_user_ns()
84 84
85 85 # identities:
86 86 int_id = Integer(-1)
87 87 ident = Unicode()
88 88
89 89 def _ident_default(self):
90 90 return unicode_type(uuid.uuid4())
91 91
92 92 # Private interface
93 93
94 94 _darwin_app_nap = Bool(True, config=True,
95 95 help="""Whether to use appnope for compatiblity with OS X App Nap.
96 96
97 97 Only affects OS X >= 10.9.
98 98 """
99 99 )
100 100
101 101 # track associations with current request
102 102 _allow_stdin = Bool(False)
103 103 _parent_header = Dict()
104 104 _parent_ident = Any(b'')
105 105 # Time to sleep after flushing the stdout/err buffers in each execute
106 106 # cycle. While this introduces a hard limit on the minimal latency of the
107 107 # execute cycle, it helps prevent output synchronization problems for
108 108 # clients.
109 109 # Units are in seconds. The minimum zmq latency on local host is probably
110 110 # ~150 microseconds, set this to 500us for now. We may need to increase it
111 111 # a little if it's not enough after more interactive testing.
112 112 _execute_sleep = Float(0.0005, config=True)
113 113
114 114 # Frequency of the kernel's event loop.
115 115 # Units are in seconds, kernel subclasses for GUI toolkits may need to
116 116 # adapt to milliseconds.
117 117 _poll_interval = Float(0.05, config=True)
118 118
119 119 # If the shutdown was requested over the network, we leave here the
120 120 # necessary reply message so it can be sent by our registered atexit
121 121 # handler. This ensures that the reply is only sent to clients truly at
122 122 # the end of our shutdown process (which happens after the underlying
123 123 # IPython shell's own shutdown).
124 124 _shutdown_message = None
125 125
126 126 # This is a dict of port number that the kernel is listening on. It is set
127 127 # by record_ports and used by connect_request.
128 128 _recorded_ports = Dict()
129 129
130 130 # A reference to the Python builtin 'raw_input' function.
131 131 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
132 132 _sys_raw_input = Any()
133 133 _sys_eval_input = Any()
134 134
135 135 # set of aborted msg_ids
136 136 aborted = Set()
137 137
138 138
139 139 def __init__(self, **kwargs):
140 140 super(Kernel, self).__init__(**kwargs)
141 141
142 142 # Initialize the InteractiveShell subclass
143 143 self.shell = self.shell_class.instance(parent=self,
144 144 profile_dir = self.profile_dir,
145 145 user_module = self.user_module,
146 146 user_ns = self.user_ns,
147 147 kernel = self,
148 148 )
149 149 self.shell.displayhook.session = self.session
150 150 self.shell.displayhook.pub_socket = self.iopub_socket
151 151 self.shell.displayhook.topic = self._topic('execute_result')
152 152 self.shell.display_pub.session = self.session
153 153 self.shell.display_pub.pub_socket = self.iopub_socket
154 154 self.shell.data_pub.session = self.session
155 155 self.shell.data_pub.pub_socket = self.iopub_socket
156 156
157 157 # TMP - hack while developing
158 158 self.shell._reply_content = None
159 159
160 160 # Build dict of handlers for message types
161 161 msg_types = [ 'execute_request', 'complete_request',
162 162 'object_info_request', 'history_request',
163 163 'kernel_info_request',
164 164 'connect_request', 'shutdown_request',
165 165 'apply_request',
166 166 ]
167 167 self.shell_handlers = {}
168 168 for msg_type in msg_types:
169 169 self.shell_handlers[msg_type] = getattr(self, msg_type)
170 170
171 171 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
172 172 comm_manager = self.shell.comm_manager
173 173 for msg_type in comm_msg_types:
174 174 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
175 175
176 176 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
177 177 self.control_handlers = {}
178 178 for msg_type in control_msg_types:
179 179 self.control_handlers[msg_type] = getattr(self, msg_type)
180 180
181 181
182 182 def dispatch_control(self, msg):
183 183 """dispatch control requests"""
184 184 idents,msg = self.session.feed_identities(msg, copy=False)
185 185 try:
186 186 msg = self.session.unserialize(msg, content=True, copy=False)
187 187 except:
188 188 self.log.error("Invalid Control Message", exc_info=True)
189 189 return
190 190
191 191 self.log.debug("Control received: %s", msg)
192 192
193 193 header = msg['header']
194 194 msg_id = header['msg_id']
195 195 msg_type = header['msg_type']
196 196
197 197 handler = self.control_handlers.get(msg_type, None)
198 198 if handler is None:
199 199 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
200 200 else:
201 201 try:
202 202 handler(self.control_stream, idents, msg)
203 203 except Exception:
204 204 self.log.error("Exception in control handler:", exc_info=True)
205 205
206 206 def dispatch_shell(self, stream, msg):
207 207 """dispatch shell requests"""
208 208 # flush control requests first
209 209 if self.control_stream:
210 210 self.control_stream.flush()
211 211
212 212 idents,msg = self.session.feed_identities(msg, copy=False)
213 213 try:
214 214 msg = self.session.unserialize(msg, content=True, copy=False)
215 215 except:
216 216 self.log.error("Invalid Message", exc_info=True)
217 217 return
218 218
219 219 header = msg['header']
220 220 msg_id = header['msg_id']
221 221 msg_type = msg['header']['msg_type']
222 222
223 223 # Print some info about this message and leave a '--->' marker, so it's
224 224 # easier to trace visually the message chain when debugging. Each
225 225 # handler prints its message at the end.
226 226 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
227 227 self.log.debug(' Content: %s\n --->\n ', msg['content'])
228 228
229 229 if msg_id in self.aborted:
230 230 self.aborted.remove(msg_id)
231 231 # is it safe to assume a msg_id will not be resubmitted?
232 232 reply_type = msg_type.split('_')[0] + '_reply'
233 233 status = {'status' : 'aborted'}
234 234 md = {'engine' : self.ident}
235 235 md.update(status)
236 236 reply_msg = self.session.send(stream, reply_type, metadata=md,
237 237 content=status, parent=msg, ident=idents)
238 238 return
239 239
240 240 handler = self.shell_handlers.get(msg_type, None)
241 241 if handler is None:
242 242 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
243 243 else:
244 244 # ensure default_int_handler during handler call
245 245 sig = signal(SIGINT, default_int_handler)
246 246 self.log.debug("%s: %s", msg_type, msg)
247 247 try:
248 248 handler(stream, idents, msg)
249 249 except Exception:
250 250 self.log.error("Exception in message handler:", exc_info=True)
251 251 finally:
252 252 signal(SIGINT, sig)
253 253
254 254 def enter_eventloop(self):
255 255 """enter eventloop"""
256 256 self.log.info("entering eventloop %s", self.eventloop)
257 257 for stream in self.shell_streams:
258 258 # flush any pending replies,
259 259 # which may be skipped by entering the eventloop
260 260 stream.flush(zmq.POLLOUT)
261 261 # restore default_int_handler
262 262 signal(SIGINT, default_int_handler)
263 263 while self.eventloop is not None:
264 264 try:
265 265 self.eventloop(self)
266 266 except KeyboardInterrupt:
267 267 # Ctrl-C shouldn't crash the kernel
268 268 self.log.error("KeyboardInterrupt caught in kernel")
269 269 continue
270 270 else:
271 271 # eventloop exited cleanly, this means we should stop (right?)
272 272 self.eventloop = None
273 273 break
274 274 self.log.info("exiting eventloop")
275 275
276 276 def start(self):
277 277 """register dispatchers for streams"""
278 278 self.shell.exit_now = False
279 279 if self.control_stream:
280 280 self.control_stream.on_recv(self.dispatch_control, copy=False)
281 281
282 282 def make_dispatcher(stream):
283 283 def dispatcher(msg):
284 284 return self.dispatch_shell(stream, msg)
285 285 return dispatcher
286 286
287 287 for s in self.shell_streams:
288 288 s.on_recv(make_dispatcher(s), copy=False)
289 289
290 290 # publish idle status
291 291 self._publish_status('starting')
292 292
293 293 def do_one_iteration(self):
294 294 """step eventloop just once"""
295 295 if self.control_stream:
296 296 self.control_stream.flush()
297 297 for stream in self.shell_streams:
298 298 # handle at most one request per iteration
299 299 stream.flush(zmq.POLLIN, 1)
300 300 stream.flush(zmq.POLLOUT)
301 301
302 302
303 303 def record_ports(self, ports):
304 304 """Record the ports that this kernel is using.
305 305
306 306 The creator of the Kernel instance must call this methods if they
307 307 want the :meth:`connect_request` method to return the port numbers.
308 308 """
309 309 self._recorded_ports = ports
310 310
311 311 #---------------------------------------------------------------------------
312 312 # Kernel request handlers
313 313 #---------------------------------------------------------------------------
314 314
315 315 def _make_metadata(self, other=None):
316 316 """init metadata dict, for execute/apply_reply"""
317 317 new_md = {
318 318 'dependencies_met' : True,
319 319 'engine' : self.ident,
320 320 'started': datetime.now(),
321 321 }
322 322 if other:
323 323 new_md.update(other)
324 324 return new_md
325 325
326 326 def _publish_execute_input(self, code, parent, execution_count):
327 327 """Publish the code request on the iopub stream."""
328 328
329 329 self.session.send(self.iopub_socket, u'execute_input',
330 330 {u'code':code, u'execution_count': execution_count},
331 331 parent=parent, ident=self._topic('execute_input')
332 332 )
333 333
334 334 def _publish_status(self, status, parent=None):
335 335 """send status (busy/idle) on IOPub"""
336 336 self.session.send(self.iopub_socket,
337 337 u'status',
338 338 {u'execution_state': status},
339 339 parent=parent,
340 340 ident=self._topic('status'),
341 341 )
342 342
343 343 def _forward_input(self, allow_stdin=False):
344 344 """Forward raw_input and getpass to the current frontend.
345 345
346 346 via input_request
347 347 """
348 348 self._allow_stdin = allow_stdin
349 349
350 350 if py3compat.PY3:
351 351 self._sys_raw_input = builtin_mod.input
352 352 builtin_mod.input = self.raw_input
353 353 else:
354 354 self._sys_raw_input = builtin_mod.raw_input
355 355 self._sys_eval_input = builtin_mod.input
356 356 builtin_mod.raw_input = self.raw_input
357 357 builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt))
358 358 self._save_getpass = getpass.getpass
359 359 getpass.getpass = self.getpass
360 360
361 361 def _restore_input(self):
362 362 """Restore raw_input, getpass"""
363 363 if py3compat.PY3:
364 364 builtin_mod.input = self._sys_raw_input
365 365 else:
366 366 builtin_mod.raw_input = self._sys_raw_input
367 367 builtin_mod.input = self._sys_eval_input
368 368
369 369 getpass.getpass = self._save_getpass
370 370
371 371 def set_parent(self, ident, parent):
372 372 """Record the parent state
373 373
374 374 For associating side effects with their requests.
375 375 """
376 376 self._parent_ident = ident
377 377 self._parent_header = parent
378 378 self.shell.set_parent(parent)
379 379
380 380 def execute_request(self, stream, ident, parent):
381 381 """handle an execute_request"""
382 382
383 383 self._publish_status(u'busy', parent)
384 384
385 385 try:
386 386 content = parent[u'content']
387 387 code = py3compat.cast_unicode_py2(content[u'code'])
388 388 silent = content[u'silent']
389 389 store_history = content.get(u'store_history', not silent)
390 390 except:
391 391 self.log.error("Got bad msg: ")
392 392 self.log.error("%s", parent)
393 393 return
394 394
395 395 md = self._make_metadata(parent['metadata'])
396 396
397 397 shell = self.shell # we'll need this a lot here
398 398
399 399 self._forward_input(content.get('allow_stdin', False))
400 400 # Set the parent message of the display hook and out streams.
401 401 self.set_parent(ident, parent)
402 402
403 403 # Re-broadcast our input for the benefit of listening clients, and
404 404 # start computing output
405 405 if not silent:
406 406 self._publish_execute_input(code, parent, shell.execution_count)
407 407
408 408 reply_content = {}
409 409 # FIXME: the shell calls the exception handler itself.
410 410 shell._reply_content = None
411 411 try:
412 412 shell.run_cell(code, store_history=store_history, silent=silent)
413 413 except:
414 414 status = u'error'
415 415 # FIXME: this code right now isn't being used yet by default,
416 416 # because the run_cell() call above directly fires off exception
417 417 # reporting. This code, therefore, is only active in the scenario
418 418 # where runlines itself has an unhandled exception. We need to
419 419 # uniformize this, for all exception construction to come from a
420 420 # single location in the codbase.
421 421 etype, evalue, tb = sys.exc_info()
422 422 tb_list = traceback.format_exception(etype, evalue, tb)
423 423 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
424 424 else:
425 425 status = u'ok'
426 426 finally:
427 427 self._restore_input()
428 428
429 429 reply_content[u'status'] = status
430 430
431 431 # Return the execution counter so clients can display prompts
432 432 reply_content['execution_count'] = shell.execution_count - 1
433 433
434 434 # FIXME - fish exception info out of shell, possibly left there by
435 435 # runlines. We'll need to clean up this logic later.
436 436 if shell._reply_content is not None:
437 437 reply_content.update(shell._reply_content)
438 438 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
439 439 reply_content['engine_info'] = e_info
440 440 # reset after use
441 441 shell._reply_content = None
442 442
443 443 if 'traceback' in reply_content:
444 444 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
445 445
446 446
447 447 # At this point, we can tell whether the main code execution succeeded
448 448 # or not. If it did, we proceed to evaluate user_expressions
449 449 if reply_content['status'] == 'ok':
450 450 reply_content[u'user_expressions'] = \
451 451 shell.user_expressions(content.get(u'user_expressions', {}))
452 452 else:
453 453 # If there was an error, don't even try to compute expressions
454 454 reply_content[u'user_expressions'] = {}
455 455
456 456 # Payloads should be retrieved regardless of outcome, so we can both
457 457 # recover partial output (that could have been generated early in a
458 458 # block, before an error) and clear the payload system always.
459 459 reply_content[u'payload'] = shell.payload_manager.read_payload()
460 460 # Be agressive about clearing the payload because we don't want
461 461 # it to sit in memory until the next execute_request comes in.
462 462 shell.payload_manager.clear_payload()
463 463
464 464 # Flush output before sending the reply.
465 465 sys.stdout.flush()
466 466 sys.stderr.flush()
467 467 # FIXME: on rare occasions, the flush doesn't seem to make it to the
468 468 # clients... This seems to mitigate the problem, but we definitely need
469 469 # to better understand what's going on.
470 470 if self._execute_sleep:
471 471 time.sleep(self._execute_sleep)
472 472
473 473 # Send the reply.
474 474 reply_content = json_clean(reply_content)
475 475
476 476 md['status'] = reply_content['status']
477 477 if reply_content['status'] == 'error' and \
478 478 reply_content['ename'] == 'UnmetDependency':
479 479 md['dependencies_met'] = False
480 480
481 481 reply_msg = self.session.send(stream, u'execute_reply',
482 482 reply_content, parent, metadata=md,
483 483 ident=ident)
484 484
485 485 self.log.debug("%s", reply_msg)
486 486
487 487 if not silent and reply_msg['content']['status'] == u'error':
488 488 self._abort_queues()
489 489
490 490 self._publish_status(u'idle', parent)
491 491
492 492 def complete_request(self, stream, ident, parent):
493 493 content = parent['content']
494 494 code = content['code']
495 495 cursor_pos = content['cursor_pos']
496 496
497 497 txt, matches = self.shell.complete('', code, cursor_pos)
498 498 matches = {'matches' : matches,
499 499 'matched_text' : txt,
500 500 'status' : 'ok'}
501 501 matches = json_clean(matches)
502 502 completion_msg = self.session.send(stream, 'complete_reply',
503 503 matches, parent, ident)
504 504 self.log.debug("%s", completion_msg)
505 505
506 506 def object_info_request(self, stream, ident, parent):
507 507 content = parent['content']
508 508
509 509 name = token_at_cursor(content['code'], content['cursor_pos'])
510 510 info = self.shell.object_inspect(name)
511 511
512 512 reply_content = {'status' : 'ok'}
513 513 reply_content['data'] = data = {}
514 514 reply_content['metadata'] = {}
515 515 reply_content['name'] = name
516 516 reply_content['found'] = info['found']
517 517 if info['found']:
518 518 info_text = self.shell.object_inspect_text(
519 519 name,
520 520 detail_level=content.get('detail_level', 0),
521 521 )
522 522 reply_content['data']['text/plain'] = info_text
523 523 # Before we send this object over, we scrub it for JSON usage
524 524 reply_content = json_clean(reply_content)
525 525 msg = self.session.send(stream, 'object_info_reply',
526 526 reply_content, parent, ident)
527 527 self.log.debug("%s", msg)
528 528
529 529 def history_request(self, stream, ident, parent):
530 530 # We need to pull these out, as passing **kwargs doesn't work with
531 531 # unicode keys before Python 2.6.5.
532 532 hist_access_type = parent['content']['hist_access_type']
533 533 raw = parent['content']['raw']
534 534 output = parent['content']['output']
535 535 if hist_access_type == 'tail':
536 536 n = parent['content']['n']
537 537 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
538 538 include_latest=True)
539 539
540 540 elif hist_access_type == 'range':
541 541 session = parent['content']['session']
542 542 start = parent['content']['start']
543 543 stop = parent['content']['stop']
544 544 hist = self.shell.history_manager.get_range(session, start, stop,
545 545 raw=raw, output=output)
546 546
547 547 elif hist_access_type == 'search':
548 548 n = parent['content'].get('n')
549 549 unique = parent['content'].get('unique', False)
550 550 pattern = parent['content']['pattern']
551 551 hist = self.shell.history_manager.search(
552 552 pattern, raw=raw, output=output, n=n, unique=unique)
553 553
554 554 else:
555 555 hist = []
556 556 hist = list(hist)
557 557 content = {'history' : hist}
558 558 content = json_clean(content)
559 559 msg = self.session.send(stream, 'history_reply',
560 560 content, parent, ident)
561 561 self.log.debug("Sending history reply with %i entries", len(hist))
562 562
563 563 def connect_request(self, stream, ident, parent):
564 564 if self._recorded_ports is not None:
565 565 content = self._recorded_ports.copy()
566 566 else:
567 567 content = {}
568 568 msg = self.session.send(stream, 'connect_reply',
569 569 content, parent, ident)
570 570 self.log.debug("%s", msg)
571 571
572 572 def kernel_info_request(self, stream, ident, parent):
573 573 vinfo = {
574 574 'protocol_version': protocol_version,
575 575 'ipython_version': ipython_version,
576 576 'language_version': language_version,
577 577 'language': 'python',
578 'banner': self.shell.banner,
578 579 }
579 580 msg = self.session.send(stream, 'kernel_info_reply',
580 581 vinfo, parent, ident)
581 582 self.log.debug("%s", msg)
582 583
583 584 def shutdown_request(self, stream, ident, parent):
584 585 self.shell.exit_now = True
585 586 content = dict(status='ok')
586 587 content.update(parent['content'])
587 588 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
588 589 # same content, but different msg_id for broadcasting on IOPub
589 590 self._shutdown_message = self.session.msg(u'shutdown_reply',
590 591 content, parent
591 592 )
592 593
593 594 self._at_shutdown()
594 595 # call sys.exit after a short delay
595 596 loop = ioloop.IOLoop.instance()
596 597 loop.add_timeout(time.time()+0.1, loop.stop)
597 598
598 599 #---------------------------------------------------------------------------
599 600 # Engine methods
600 601 #---------------------------------------------------------------------------
601 602
602 603 def apply_request(self, stream, ident, parent):
603 604 try:
604 605 content = parent[u'content']
605 606 bufs = parent[u'buffers']
606 607 msg_id = parent['header']['msg_id']
607 608 except:
608 609 self.log.error("Got bad msg: %s", parent, exc_info=True)
609 610 return
610 611
611 612 self._publish_status(u'busy', parent)
612 613
613 614 # Set the parent message of the display hook and out streams.
614 615 shell = self.shell
615 616 shell.set_parent(parent)
616 617
617 618 # execute_input_msg = self.session.msg(u'execute_input',{u'code':code}, parent=parent)
618 619 # self.iopub_socket.send(execute_input_msg)
619 620 # self.session.send(self.iopub_socket, u'execute_input', {u'code':code},parent=parent)
620 621 md = self._make_metadata(parent['metadata'])
621 622 try:
622 623 working = shell.user_ns
623 624
624 625 prefix = "_"+str(msg_id).replace("-","")+"_"
625 626
626 627 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
627 628
628 629 fname = getattr(f, '__name__', 'f')
629 630
630 631 fname = prefix+"f"
631 632 argname = prefix+"args"
632 633 kwargname = prefix+"kwargs"
633 634 resultname = prefix+"result"
634 635
635 636 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
636 637 # print ns
637 638 working.update(ns)
638 639 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
639 640 try:
640 641 exec(code, shell.user_global_ns, shell.user_ns)
641 642 result = working.get(resultname)
642 643 finally:
643 644 for key in ns:
644 645 working.pop(key)
645 646
646 647 result_buf = serialize_object(result,
647 648 buffer_threshold=self.session.buffer_threshold,
648 649 item_threshold=self.session.item_threshold,
649 650 )
650 651
651 652 except:
652 653 # invoke IPython traceback formatting
653 654 shell.showtraceback()
654 655 # FIXME - fish exception info out of shell, possibly left there by
655 656 # run_code. We'll need to clean up this logic later.
656 657 reply_content = {}
657 658 if shell._reply_content is not None:
658 659 reply_content.update(shell._reply_content)
659 660 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
660 661 reply_content['engine_info'] = e_info
661 662 # reset after use
662 663 shell._reply_content = None
663 664
664 665 self.session.send(self.iopub_socket, u'error', reply_content, parent=parent,
665 666 ident=self._topic('error'))
666 667 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
667 668 result_buf = []
668 669
669 670 if reply_content['ename'] == 'UnmetDependency':
670 671 md['dependencies_met'] = False
671 672 else:
672 673 reply_content = {'status' : 'ok'}
673 674
674 675 # put 'ok'/'error' status in header, for scheduler introspection:
675 676 md['status'] = reply_content['status']
676 677
677 678 # flush i/o
678 679 sys.stdout.flush()
679 680 sys.stderr.flush()
680 681
681 682 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
682 683 parent=parent, ident=ident,buffers=result_buf, metadata=md)
683 684
684 685 self._publish_status(u'idle', parent)
685 686
686 687 #---------------------------------------------------------------------------
687 688 # Control messages
688 689 #---------------------------------------------------------------------------
689 690
690 691 def abort_request(self, stream, ident, parent):
691 692 """abort a specifig msg by id"""
692 693 msg_ids = parent['content'].get('msg_ids', None)
693 694 if isinstance(msg_ids, string_types):
694 695 msg_ids = [msg_ids]
695 696 if not msg_ids:
696 697 self.abort_queues()
697 698 for mid in msg_ids:
698 699 self.aborted.add(str(mid))
699 700
700 701 content = dict(status='ok')
701 702 reply_msg = self.session.send(stream, 'abort_reply', content=content,
702 703 parent=parent, ident=ident)
703 704 self.log.debug("%s", reply_msg)
704 705
705 706 def clear_request(self, stream, idents, parent):
706 707 """Clear our namespace."""
707 708 self.shell.reset(False)
708 709 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
709 710 content = dict(status='ok'))
710 711
711 712
712 713 #---------------------------------------------------------------------------
713 714 # Protected interface
714 715 #---------------------------------------------------------------------------
715 716
716 717 def _wrap_exception(self, method=None):
717 718 # import here, because _wrap_exception is only used in parallel,
718 719 # and parallel has higher min pyzmq version
719 720 from IPython.parallel.error import wrap_exception
720 721 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
721 722 content = wrap_exception(e_info)
722 723 return content
723 724
724 725 def _topic(self, topic):
725 726 """prefixed topic for IOPub messages"""
726 727 if self.int_id >= 0:
727 728 base = "engine.%i" % self.int_id
728 729 else:
729 730 base = "kernel.%s" % self.ident
730 731
731 732 return py3compat.cast_bytes("%s.%s" % (base, topic))
732 733
733 734 def _abort_queues(self):
734 735 for stream in self.shell_streams:
735 736 if stream:
736 737 self._abort_queue(stream)
737 738
738 739 def _abort_queue(self, stream):
739 740 poller = zmq.Poller()
740 741 poller.register(stream.socket, zmq.POLLIN)
741 742 while True:
742 743 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
743 744 if msg is None:
744 745 return
745 746
746 747 self.log.info("Aborting:")
747 748 self.log.info("%s", msg)
748 749 msg_type = msg['header']['msg_type']
749 750 reply_type = msg_type.split('_')[0] + '_reply'
750 751
751 752 status = {'status' : 'aborted'}
752 753 md = {'engine' : self.ident}
753 754 md.update(status)
754 755 reply_msg = self.session.send(stream, reply_type, metadata=md,
755 756 content=status, parent=msg, ident=idents)
756 757 self.log.debug("%s", reply_msg)
757 758 # We need to wait a bit for requests to come in. This can probably
758 759 # be set shorter for true asynchronous clients.
759 760 poller.poll(50)
760 761
761 762
762 763 def _no_raw_input(self):
763 764 """Raise StdinNotImplentedError if active frontend doesn't support
764 765 stdin."""
765 766 raise StdinNotImplementedError("raw_input was called, but this "
766 767 "frontend does not support stdin.")
767 768
768 769 def getpass(self, prompt=''):
769 770 """Forward getpass to frontends
770 771
771 772 Raises
772 773 ------
773 774 StdinNotImplentedError if active frontend doesn't support stdin.
774 775 """
775 776 if not self._allow_stdin:
776 777 raise StdinNotImplementedError(
777 778 "getpass was called, but this frontend does not support input requests."
778 779 )
779 780 return self._input_request(prompt,
780 781 self._parent_ident,
781 782 self._parent_header,
782 783 password=True,
783 784 )
784 785
785 786 def raw_input(self, prompt=''):
786 787 """Forward raw_input to frontends
787 788
788 789 Raises
789 790 ------
790 791 StdinNotImplentedError if active frontend doesn't support stdin.
791 792 """
792 793 if not self._allow_stdin:
793 794 raise StdinNotImplementedError(
794 795 "raw_input was called, but this frontend does not support input requests."
795 796 )
796 797 return self._input_request(prompt,
797 798 self._parent_ident,
798 799 self._parent_header,
799 800 password=False,
800 801 )
801 802
802 803 def _input_request(self, prompt, ident, parent, password=False):
803 804 # Flush output before making the request.
804 805 sys.stderr.flush()
805 806 sys.stdout.flush()
806 807 # flush the stdin socket, to purge stale replies
807 808 while True:
808 809 try:
809 810 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
810 811 except zmq.ZMQError as e:
811 812 if e.errno == zmq.EAGAIN:
812 813 break
813 814 else:
814 815 raise
815 816
816 817 # Send the input request.
817 818 content = json_clean(dict(prompt=prompt, password=password))
818 819 self.session.send(self.stdin_socket, u'input_request', content, parent,
819 820 ident=ident)
820 821
821 822 # Await a response.
822 823 while True:
823 824 try:
824 825 ident, reply = self.session.recv(self.stdin_socket, 0)
825 826 except Exception:
826 827 self.log.warn("Invalid Message:", exc_info=True)
827 828 except KeyboardInterrupt:
828 829 # re-raise KeyboardInterrupt, to truncate traceback
829 830 raise KeyboardInterrupt
830 831 else:
831 832 break
832 833 try:
833 834 value = py3compat.unicode_to_str(reply['content']['value'])
834 835 except:
835 836 self.log.error("Bad input_reply: %s", parent)
836 837 value = ''
837 838 if value == '\x04':
838 839 # EOF
839 840 raise EOFError
840 841 return value
841 842
842 843 def _at_shutdown(self):
843 844 """Actions taken at shutdown by the kernel, called by python's atexit.
844 845 """
845 846 # io.rprint("Kernel at_shutdown") # dbg
846 847 if self._shutdown_message is not None:
847 848 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
848 849 self.log.debug("%s", self._shutdown_message)
849 850 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
850 851
General Comments 0
You need to be logged in to leave comments. Login now