##// END OF EJS Templates
Backport PR #6374: fix _abort_queues typo
Thomas Kluyver -
Show More
@@ -1,800 +1,800 b''
1 1 #!/usr/bin/env python
2 2 """An interactive kernel that talks to frontends over 0MQ."""
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Imports
6 6 #-----------------------------------------------------------------------------
7 7 from __future__ import print_function
8 8
9 9 # Standard library imports
10 10 import sys
11 11 import time
12 12 import traceback
13 13 import logging
14 14 import uuid
15 15
16 16 from datetime import datetime
17 17 from signal import (
18 18 signal, default_int_handler, SIGINT
19 19 )
20 20
21 21 # System library imports
22 22 import zmq
23 23 from zmq.eventloop import ioloop
24 24 from zmq.eventloop.zmqstream import ZMQStream
25 25
26 26 # Local imports
27 27 from IPython.config.configurable import Configurable
28 28 from IPython.core.error import StdinNotImplementedError
29 29 from IPython.core import release
30 30 from IPython.utils import py3compat
31 31 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
32 32 from IPython.utils.jsonutil import json_clean
33 33 from IPython.utils.traitlets import (
34 34 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
35 35 Type, Bool,
36 36 )
37 37
38 38 from .serialize import serialize_object, unpack_apply_message
39 39 from .session import Session
40 40 from .zmqshell import ZMQInteractiveShell
41 41
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Main kernel class
45 45 #-----------------------------------------------------------------------------
46 46
47 47 protocol_version = list(release.kernel_protocol_version_info)
48 48 ipython_version = list(release.version_info)
49 49 language_version = list(sys.version_info[:3])
50 50
51 51
52 52 class Kernel(Configurable):
53 53
54 54 #---------------------------------------------------------------------------
55 55 # Kernel interface
56 56 #---------------------------------------------------------------------------
57 57
58 58 # attribute to override with a GUI
59 59 eventloop = Any(None)
60 60 def _eventloop_changed(self, name, old, new):
61 61 """schedule call to eventloop from IOLoop"""
62 62 loop = ioloop.IOLoop.instance()
63 63 loop.add_callback(self.enter_eventloop)
64 64
65 65 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
66 66 shell_class = Type(ZMQInteractiveShell)
67 67
68 68 session = Instance(Session)
69 69 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
70 70 shell_streams = List()
71 71 control_stream = Instance(ZMQStream)
72 72 iopub_socket = Instance(zmq.Socket)
73 73 stdin_socket = Instance(zmq.Socket)
74 74 log = Instance(logging.Logger)
75 75
76 76 user_module = Any()
77 77 def _user_module_changed(self, name, old, new):
78 78 if self.shell is not None:
79 79 self.shell.user_module = new
80 80
81 81 user_ns = Instance(dict, args=None, allow_none=True)
82 82 def _user_ns_changed(self, name, old, new):
83 83 if self.shell is not None:
84 84 self.shell.user_ns = new
85 85 self.shell.init_user_ns()
86 86
87 87 # identities:
88 88 int_id = Integer(-1)
89 89 ident = Unicode()
90 90
91 91 def _ident_default(self):
92 92 return unicode_type(uuid.uuid4())
93 93
94 94 # Private interface
95 95
96 96 _darwin_app_nap = Bool(True, config=True,
97 97 help="""Whether to use appnope for compatiblity with OS X App Nap.
98 98
99 99 Only affects OS X >= 10.9.
100 100 """
101 101 )
102 102
103 103 # Time to sleep after flushing the stdout/err buffers in each execute
104 104 # cycle. While this introduces a hard limit on the minimal latency of the
105 105 # execute cycle, it helps prevent output synchronization problems for
106 106 # clients.
107 107 # Units are in seconds. The minimum zmq latency on local host is probably
108 108 # ~150 microseconds, set this to 500us for now. We may need to increase it
109 109 # a little if it's not enough after more interactive testing.
110 110 _execute_sleep = Float(0.0005, config=True)
111 111
112 112 # Frequency of the kernel's event loop.
113 113 # Units are in seconds, kernel subclasses for GUI toolkits may need to
114 114 # adapt to milliseconds.
115 115 _poll_interval = Float(0.05, config=True)
116 116
117 117 # If the shutdown was requested over the network, we leave here the
118 118 # necessary reply message so it can be sent by our registered atexit
119 119 # handler. This ensures that the reply is only sent to clients truly at
120 120 # the end of our shutdown process (which happens after the underlying
121 121 # IPython shell's own shutdown).
122 122 _shutdown_message = None
123 123
124 124 # This is a dict of port number that the kernel is listening on. It is set
125 125 # by record_ports and used by connect_request.
126 126 _recorded_ports = Dict()
127 127
128 128 # A reference to the Python builtin 'raw_input' function.
129 129 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
130 130 _sys_raw_input = Any()
131 131 _sys_eval_input = Any()
132 132
133 133 # set of aborted msg_ids
134 134 aborted = Set()
135 135
136 136
137 137 def __init__(self, **kwargs):
138 138 super(Kernel, self).__init__(**kwargs)
139 139
140 140 # Initialize the InteractiveShell subclass
141 141 self.shell = self.shell_class.instance(parent=self,
142 142 profile_dir = self.profile_dir,
143 143 user_module = self.user_module,
144 144 user_ns = self.user_ns,
145 145 kernel = self,
146 146 )
147 147 self.shell.displayhook.session = self.session
148 148 self.shell.displayhook.pub_socket = self.iopub_socket
149 149 self.shell.displayhook.topic = self._topic('pyout')
150 150 self.shell.display_pub.session = self.session
151 151 self.shell.display_pub.pub_socket = self.iopub_socket
152 152 self.shell.data_pub.session = self.session
153 153 self.shell.data_pub.pub_socket = self.iopub_socket
154 154
155 155 # TMP - hack while developing
156 156 self.shell._reply_content = None
157 157
158 158 # Build dict of handlers for message types
159 159 msg_types = [ 'execute_request', 'complete_request',
160 160 'object_info_request', 'history_request',
161 161 'kernel_info_request',
162 162 'connect_request', 'shutdown_request',
163 163 'apply_request',
164 164 ]
165 165 self.shell_handlers = {}
166 166 for msg_type in msg_types:
167 167 self.shell_handlers[msg_type] = getattr(self, msg_type)
168 168
169 169 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
170 170 comm_manager = self.shell.comm_manager
171 171 for msg_type in comm_msg_types:
172 172 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
173 173
174 174 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
175 175 self.control_handlers = {}
176 176 for msg_type in control_msg_types:
177 177 self.control_handlers[msg_type] = getattr(self, msg_type)
178 178
179 179
180 180 def dispatch_control(self, msg):
181 181 """dispatch control requests"""
182 182 idents,msg = self.session.feed_identities(msg, copy=False)
183 183 try:
184 184 msg = self.session.unserialize(msg, content=True, copy=False)
185 185 except:
186 186 self.log.error("Invalid Control Message", exc_info=True)
187 187 return
188 188
189 189 self.log.debug("Control received: %s", msg)
190 190
191 191 header = msg['header']
192 192 msg_id = header['msg_id']
193 193 msg_type = header['msg_type']
194 194
195 195 handler = self.control_handlers.get(msg_type, None)
196 196 if handler is None:
197 197 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
198 198 else:
199 199 try:
200 200 handler(self.control_stream, idents, msg)
201 201 except Exception:
202 202 self.log.error("Exception in control handler:", exc_info=True)
203 203
204 204 def dispatch_shell(self, stream, msg):
205 205 """dispatch shell requests"""
206 206 # flush control requests first
207 207 if self.control_stream:
208 208 self.control_stream.flush()
209 209
210 210 idents,msg = self.session.feed_identities(msg, copy=False)
211 211 try:
212 212 msg = self.session.unserialize(msg, content=True, copy=False)
213 213 except:
214 214 self.log.error("Invalid Message", exc_info=True)
215 215 return
216 216
217 217 header = msg['header']
218 218 msg_id = header['msg_id']
219 219 msg_type = msg['header']['msg_type']
220 220
221 221 # Print some info about this message and leave a '--->' marker, so it's
222 222 # easier to trace visually the message chain when debugging. Each
223 223 # handler prints its message at the end.
224 224 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
225 225 self.log.debug(' Content: %s\n --->\n ', msg['content'])
226 226
227 227 if msg_id in self.aborted:
228 228 self.aborted.remove(msg_id)
229 229 # is it safe to assume a msg_id will not be resubmitted?
230 230 reply_type = msg_type.split('_')[0] + '_reply'
231 231 status = {'status' : 'aborted'}
232 232 md = {'engine' : self.ident}
233 233 md.update(status)
234 234 reply_msg = self.session.send(stream, reply_type, metadata=md,
235 235 content=status, parent=msg, ident=idents)
236 236 return
237 237
238 238 handler = self.shell_handlers.get(msg_type, None)
239 239 if handler is None:
240 240 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
241 241 else:
242 242 # ensure default_int_handler during handler call
243 243 sig = signal(SIGINT, default_int_handler)
244 244 try:
245 245 handler(stream, idents, msg)
246 246 except Exception:
247 247 self.log.error("Exception in message handler:", exc_info=True)
248 248 finally:
249 249 signal(SIGINT, sig)
250 250
251 251 def enter_eventloop(self):
252 252 """enter eventloop"""
253 253 self.log.info("entering eventloop %s", self.eventloop)
254 254 for stream in self.shell_streams:
255 255 # flush any pending replies,
256 256 # which may be skipped by entering the eventloop
257 257 stream.flush(zmq.POLLOUT)
258 258 # restore default_int_handler
259 259 signal(SIGINT, default_int_handler)
260 260 while self.eventloop is not None:
261 261 try:
262 262 self.eventloop(self)
263 263 except KeyboardInterrupt:
264 264 # Ctrl-C shouldn't crash the kernel
265 265 self.log.error("KeyboardInterrupt caught in kernel")
266 266 continue
267 267 else:
268 268 # eventloop exited cleanly, this means we should stop (right?)
269 269 self.eventloop = None
270 270 break
271 271 self.log.info("exiting eventloop")
272 272
273 273 def start(self):
274 274 """register dispatchers for streams"""
275 275 self.shell.exit_now = False
276 276 if self.control_stream:
277 277 self.control_stream.on_recv(self.dispatch_control, copy=False)
278 278
279 279 def make_dispatcher(stream):
280 280 def dispatcher(msg):
281 281 return self.dispatch_shell(stream, msg)
282 282 return dispatcher
283 283
284 284 for s in self.shell_streams:
285 285 s.on_recv(make_dispatcher(s), copy=False)
286 286
287 287 # publish idle status
288 288 self._publish_status('starting')
289 289
290 290 def do_one_iteration(self):
291 291 """step eventloop just once"""
292 292 if self.control_stream:
293 293 self.control_stream.flush()
294 294 for stream in self.shell_streams:
295 295 # handle at most one request per iteration
296 296 stream.flush(zmq.POLLIN, 1)
297 297 stream.flush(zmq.POLLOUT)
298 298
299 299
300 300 def record_ports(self, ports):
301 301 """Record the ports that this kernel is using.
302 302
303 303 The creator of the Kernel instance must call this methods if they
304 304 want the :meth:`connect_request` method to return the port numbers.
305 305 """
306 306 self._recorded_ports = ports
307 307
308 308 #---------------------------------------------------------------------------
309 309 # Kernel request handlers
310 310 #---------------------------------------------------------------------------
311 311
312 312 def _make_metadata(self, other=None):
313 313 """init metadata dict, for execute/apply_reply"""
314 314 new_md = {
315 315 'dependencies_met' : True,
316 316 'engine' : self.ident,
317 317 'started': datetime.now(),
318 318 }
319 319 if other:
320 320 new_md.update(other)
321 321 return new_md
322 322
323 323 def _publish_pyin(self, code, parent, execution_count):
324 324 """Publish the code request on the pyin stream."""
325 325
326 326 self.session.send(self.iopub_socket, u'pyin',
327 327 {u'code':code, u'execution_count': execution_count},
328 328 parent=parent, ident=self._topic('pyin')
329 329 )
330 330
331 331 def _publish_status(self, status, parent=None):
332 332 """send status (busy/idle) on IOPub"""
333 333 self.session.send(self.iopub_socket,
334 334 u'status',
335 335 {u'execution_state': status},
336 336 parent=parent,
337 337 ident=self._topic('status'),
338 338 )
339 339
340 340
341 341 def execute_request(self, stream, ident, parent):
342 342 """handle an execute_request"""
343 343
344 344 self._publish_status(u'busy', parent)
345 345
346 346 try:
347 347 content = parent[u'content']
348 348 code = py3compat.cast_unicode_py2(content[u'code'])
349 349 silent = content[u'silent']
350 350 store_history = content.get(u'store_history', not silent)
351 351 except:
352 352 self.log.error("Got bad msg: ")
353 353 self.log.error("%s", parent)
354 354 return
355 355
356 356 md = self._make_metadata(parent['metadata'])
357 357
358 358 shell = self.shell # we'll need this a lot here
359 359
360 360 # Replace raw_input. Note that is not sufficient to replace
361 361 # raw_input in the user namespace.
362 362 if content.get('allow_stdin', False):
363 363 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
364 364 input = lambda prompt='': eval(raw_input(prompt))
365 365 else:
366 366 raw_input = input = lambda prompt='' : self._no_raw_input()
367 367
368 368 if py3compat.PY3:
369 369 self._sys_raw_input = builtin_mod.input
370 370 builtin_mod.input = raw_input
371 371 else:
372 372 self._sys_raw_input = builtin_mod.raw_input
373 373 self._sys_eval_input = builtin_mod.input
374 374 builtin_mod.raw_input = raw_input
375 375 builtin_mod.input = input
376 376
377 377 # Set the parent message of the display hook and out streams.
378 378 shell.set_parent(parent)
379 379
380 380 # Re-broadcast our input for the benefit of listening clients, and
381 381 # start computing output
382 382 if not silent:
383 383 self._publish_pyin(code, parent, shell.execution_count)
384 384
385 385 reply_content = {}
386 386 # FIXME: the shell calls the exception handler itself.
387 387 shell._reply_content = None
388 388 try:
389 389 shell.run_cell(code, store_history=store_history, silent=silent)
390 390 except:
391 391 status = u'error'
392 392 # FIXME: this code right now isn't being used yet by default,
393 393 # because the run_cell() call above directly fires off exception
394 394 # reporting. This code, therefore, is only active in the scenario
395 395 # where runlines itself has an unhandled exception. We need to
396 396 # uniformize this, for all exception construction to come from a
397 397 # single location in the codbase.
398 398 etype, evalue, tb = sys.exc_info()
399 399 tb_list = traceback.format_exception(etype, evalue, tb)
400 400 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
401 401 else:
402 402 status = u'ok'
403 403 finally:
404 404 # Restore raw_input.
405 405 if py3compat.PY3:
406 406 builtin_mod.input = self._sys_raw_input
407 407 else:
408 408 builtin_mod.raw_input = self._sys_raw_input
409 409 builtin_mod.input = self._sys_eval_input
410 410
411 411 reply_content[u'status'] = status
412 412
413 413 # Return the execution counter so clients can display prompts
414 414 reply_content['execution_count'] = shell.execution_count - 1
415 415
416 416 # FIXME - fish exception info out of shell, possibly left there by
417 417 # runlines. We'll need to clean up this logic later.
418 418 if shell._reply_content is not None:
419 419 reply_content.update(shell._reply_content)
420 420 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
421 421 reply_content['engine_info'] = e_info
422 422 # reset after use
423 423 shell._reply_content = None
424 424
425 425 if 'traceback' in reply_content:
426 426 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
427 427
428 428
429 429 # At this point, we can tell whether the main code execution succeeded
430 430 # or not. If it did, we proceed to evaluate user_variables/expressions
431 431 if reply_content['status'] == 'ok':
432 432 reply_content[u'user_variables'] = \
433 433 shell.user_variables(content.get(u'user_variables', []))
434 434 reply_content[u'user_expressions'] = \
435 435 shell.user_expressions(content.get(u'user_expressions', {}))
436 436 else:
437 437 # If there was an error, don't even try to compute variables or
438 438 # expressions
439 439 reply_content[u'user_variables'] = {}
440 440 reply_content[u'user_expressions'] = {}
441 441
442 442 # Payloads should be retrieved regardless of outcome, so we can both
443 443 # recover partial output (that could have been generated early in a
444 444 # block, before an error) and clear the payload system always.
445 445 reply_content[u'payload'] = shell.payload_manager.read_payload()
446 446 # Be agressive about clearing the payload because we don't want
447 447 # it to sit in memory until the next execute_request comes in.
448 448 shell.payload_manager.clear_payload()
449 449
450 450 # Flush output before sending the reply.
451 451 sys.stdout.flush()
452 452 sys.stderr.flush()
453 453 # FIXME: on rare occasions, the flush doesn't seem to make it to the
454 454 # clients... This seems to mitigate the problem, but we definitely need
455 455 # to better understand what's going on.
456 456 if self._execute_sleep:
457 457 time.sleep(self._execute_sleep)
458 458
459 459 # Send the reply.
460 460 reply_content = json_clean(reply_content)
461 461
462 462 md['status'] = reply_content['status']
463 463 if reply_content['status'] == 'error' and \
464 464 reply_content['ename'] == 'UnmetDependency':
465 465 md['dependencies_met'] = False
466 466
467 467 reply_msg = self.session.send(stream, u'execute_reply',
468 468 reply_content, parent, metadata=md,
469 469 ident=ident)
470 470
471 471 self.log.debug("%s", reply_msg)
472 472
473 473 if not silent and reply_msg['content']['status'] == u'error':
474 474 self._abort_queues()
475 475
476 476 self._publish_status(u'idle', parent)
477 477
478 478 def complete_request(self, stream, ident, parent):
479 479 txt, matches = self._complete(parent)
480 480 matches = {'matches' : matches,
481 481 'matched_text' : txt,
482 482 'status' : 'ok'}
483 483 matches = json_clean(matches)
484 484 completion_msg = self.session.send(stream, 'complete_reply',
485 485 matches, parent, ident)
486 486 self.log.debug("%s", completion_msg)
487 487
488 488 def object_info_request(self, stream, ident, parent):
489 489 content = parent['content']
490 490 object_info = self.shell.object_inspect(content['oname'],
491 491 detail_level = content.get('detail_level', 0)
492 492 )
493 493 # Before we send this object over, we scrub it for JSON usage
494 494 oinfo = json_clean(object_info)
495 495 msg = self.session.send(stream, 'object_info_reply',
496 496 oinfo, parent, ident)
497 497 self.log.debug("%s", msg)
498 498
499 499 def history_request(self, stream, ident, parent):
500 500 # We need to pull these out, as passing **kwargs doesn't work with
501 501 # unicode keys before Python 2.6.5.
502 502 hist_access_type = parent['content']['hist_access_type']
503 503 raw = parent['content']['raw']
504 504 output = parent['content']['output']
505 505 if hist_access_type == 'tail':
506 506 n = parent['content']['n']
507 507 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
508 508 include_latest=True)
509 509
510 510 elif hist_access_type == 'range':
511 511 session = parent['content']['session']
512 512 start = parent['content']['start']
513 513 stop = parent['content']['stop']
514 514 hist = self.shell.history_manager.get_range(session, start, stop,
515 515 raw=raw, output=output)
516 516
517 517 elif hist_access_type == 'search':
518 518 n = parent['content'].get('n')
519 519 unique = parent['content'].get('unique', False)
520 520 pattern = parent['content']['pattern']
521 521 hist = self.shell.history_manager.search(
522 522 pattern, raw=raw, output=output, n=n, unique=unique)
523 523
524 524 else:
525 525 hist = []
526 526 hist = list(hist)
527 527 content = {'history' : hist}
528 528 content = json_clean(content)
529 529 msg = self.session.send(stream, 'history_reply',
530 530 content, parent, ident)
531 531 self.log.debug("Sending history reply with %i entries", len(hist))
532 532
533 533 def connect_request(self, stream, ident, parent):
534 534 if self._recorded_ports is not None:
535 535 content = self._recorded_ports.copy()
536 536 else:
537 537 content = {}
538 538 msg = self.session.send(stream, 'connect_reply',
539 539 content, parent, ident)
540 540 self.log.debug("%s", msg)
541 541
542 542 def kernel_info_request(self, stream, ident, parent):
543 543 vinfo = {
544 544 'protocol_version': protocol_version,
545 545 'ipython_version': ipython_version,
546 546 'language_version': language_version,
547 547 'language': 'python',
548 548 }
549 549 msg = self.session.send(stream, 'kernel_info_reply',
550 550 vinfo, parent, ident)
551 551 self.log.debug("%s", msg)
552 552
553 553 def shutdown_request(self, stream, ident, parent):
554 554 self.shell.exit_now = True
555 555 content = dict(status='ok')
556 556 content.update(parent['content'])
557 557 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
558 558 # same content, but different msg_id for broadcasting on IOPub
559 559 self._shutdown_message = self.session.msg(u'shutdown_reply',
560 560 content, parent
561 561 )
562 562
563 563 self._at_shutdown()
564 564 # call sys.exit after a short delay
565 565 loop = ioloop.IOLoop.instance()
566 566 loop.add_timeout(time.time()+0.1, loop.stop)
567 567
568 568 #---------------------------------------------------------------------------
569 569 # Engine methods
570 570 #---------------------------------------------------------------------------
571 571
572 572 def apply_request(self, stream, ident, parent):
573 573 try:
574 574 content = parent[u'content']
575 575 bufs = parent[u'buffers']
576 576 msg_id = parent['header']['msg_id']
577 577 except:
578 578 self.log.error("Got bad msg: %s", parent, exc_info=True)
579 579 return
580 580
581 581 self._publish_status(u'busy', parent)
582 582
583 583 # Set the parent message of the display hook and out streams.
584 584 shell = self.shell
585 585 shell.set_parent(parent)
586 586
587 587 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
588 588 # self.iopub_socket.send(pyin_msg)
589 589 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
590 590 md = self._make_metadata(parent['metadata'])
591 591 try:
592 592 working = shell.user_ns
593 593
594 594 prefix = "_"+str(msg_id).replace("-","")+"_"
595 595
596 596 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
597 597
598 598 fname = getattr(f, '__name__', 'f')
599 599
600 600 fname = prefix+"f"
601 601 argname = prefix+"args"
602 602 kwargname = prefix+"kwargs"
603 603 resultname = prefix+"result"
604 604
605 605 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
606 606 # print ns
607 607 working.update(ns)
608 608 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
609 609 try:
610 610 exec(code, shell.user_global_ns, shell.user_ns)
611 611 result = working.get(resultname)
612 612 finally:
613 613 for key in ns:
614 614 working.pop(key)
615 615
616 616 result_buf = serialize_object(result,
617 617 buffer_threshold=self.session.buffer_threshold,
618 618 item_threshold=self.session.item_threshold,
619 619 )
620 620
621 621 except:
622 622 # invoke IPython traceback formatting
623 623 shell.showtraceback()
624 624 # FIXME - fish exception info out of shell, possibly left there by
625 625 # run_code. We'll need to clean up this logic later.
626 626 reply_content = {}
627 627 if shell._reply_content is not None:
628 628 reply_content.update(shell._reply_content)
629 629 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
630 630 reply_content['engine_info'] = e_info
631 631 # reset after use
632 632 shell._reply_content = None
633 633
634 634 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
635 635 ident=self._topic('pyerr'))
636 636 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
637 637 result_buf = []
638 638
639 639 if reply_content['ename'] == 'UnmetDependency':
640 640 md['dependencies_met'] = False
641 641 else:
642 642 reply_content = {'status' : 'ok'}
643 643
644 644 # put 'ok'/'error' status in header, for scheduler introspection:
645 645 md['status'] = reply_content['status']
646 646
647 647 # flush i/o
648 648 sys.stdout.flush()
649 649 sys.stderr.flush()
650 650
651 651 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
652 652 parent=parent, ident=ident,buffers=result_buf, metadata=md)
653 653
654 654 self._publish_status(u'idle', parent)
655 655
656 656 #---------------------------------------------------------------------------
657 657 # Control messages
658 658 #---------------------------------------------------------------------------
659 659
660 660 def abort_request(self, stream, ident, parent):
661 """abort a specifig msg by id"""
661 """abort a specific msg by id"""
662 662 msg_ids = parent['content'].get('msg_ids', None)
663 663 if isinstance(msg_ids, string_types):
664 664 msg_ids = [msg_ids]
665 665 if not msg_ids:
666 self.abort_queues()
666 self._abort_queues()
667 667 for mid in msg_ids:
668 668 self.aborted.add(str(mid))
669 669
670 670 content = dict(status='ok')
671 671 reply_msg = self.session.send(stream, 'abort_reply', content=content,
672 672 parent=parent, ident=ident)
673 673 self.log.debug("%s", reply_msg)
674 674
675 675 def clear_request(self, stream, idents, parent):
676 676 """Clear our namespace."""
677 677 self.shell.reset(False)
678 678 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
679 679 content = dict(status='ok'))
680 680
681 681
682 682 #---------------------------------------------------------------------------
683 683 # Protected interface
684 684 #---------------------------------------------------------------------------
685 685
686 686 def _wrap_exception(self, method=None):
687 687 # import here, because _wrap_exception is only used in parallel,
688 688 # and parallel has higher min pyzmq version
689 689 from IPython.parallel.error import wrap_exception
690 690 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
691 691 content = wrap_exception(e_info)
692 692 return content
693 693
694 694 def _topic(self, topic):
695 695 """prefixed topic for IOPub messages"""
696 696 if self.int_id >= 0:
697 697 base = "engine.%i" % self.int_id
698 698 else:
699 699 base = "kernel.%s" % self.ident
700 700
701 701 return py3compat.cast_bytes("%s.%s" % (base, topic))
702 702
703 703 def _abort_queues(self):
704 704 for stream in self.shell_streams:
705 705 if stream:
706 706 self._abort_queue(stream)
707 707
708 708 def _abort_queue(self, stream):
709 709 poller = zmq.Poller()
710 710 poller.register(stream.socket, zmq.POLLIN)
711 711 while True:
712 712 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
713 713 if msg is None:
714 714 return
715 715
716 716 self.log.info("Aborting:")
717 717 self.log.info("%s", msg)
718 718 msg_type = msg['header']['msg_type']
719 719 reply_type = msg_type.split('_')[0] + '_reply'
720 720
721 721 status = {'status' : 'aborted'}
722 722 md = {'engine' : self.ident}
723 723 md.update(status)
724 724 reply_msg = self.session.send(stream, reply_type, metadata=md,
725 725 content=status, parent=msg, ident=idents)
726 726 self.log.debug("%s", reply_msg)
727 727 # We need to wait a bit for requests to come in. This can probably
728 728 # be set shorter for true asynchronous clients.
729 729 poller.poll(50)
730 730
731 731
732 732 def _no_raw_input(self):
733 733 """Raise StdinNotImplentedError if active frontend doesn't support
734 734 stdin."""
735 735 raise StdinNotImplementedError("raw_input was called, but this "
736 736 "frontend does not support stdin.")
737 737
738 738 def _raw_input(self, prompt, ident, parent):
739 739 # Flush output before making the request.
740 740 sys.stderr.flush()
741 741 sys.stdout.flush()
742 742 # flush the stdin socket, to purge stale replies
743 743 while True:
744 744 try:
745 745 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
746 746 except zmq.ZMQError as e:
747 747 if e.errno == zmq.EAGAIN:
748 748 break
749 749 else:
750 750 raise
751 751
752 752 # Send the input request.
753 753 content = json_clean(dict(prompt=prompt))
754 754 self.session.send(self.stdin_socket, u'input_request', content, parent,
755 755 ident=ident)
756 756
757 757 # Await a response.
758 758 while True:
759 759 try:
760 760 ident, reply = self.session.recv(self.stdin_socket, 0)
761 761 except Exception:
762 762 self.log.warn("Invalid Message:", exc_info=True)
763 763 except KeyboardInterrupt:
764 764 # re-raise KeyboardInterrupt, to truncate traceback
765 765 raise KeyboardInterrupt
766 766 else:
767 767 break
768 768 try:
769 769 value = py3compat.unicode_to_str(reply['content']['value'])
770 770 except:
771 771 self.log.error("Got bad raw_input reply: ")
772 772 self.log.error("%s", parent)
773 773 value = ''
774 774 if value == '\x04':
775 775 # EOF
776 776 raise EOFError
777 777 return value
778 778
779 779 def _complete(self, msg):
780 780 c = msg['content']
781 781 try:
782 782 cpos = int(c['cursor_pos'])
783 783 except:
784 784 # If we don't get something that we can convert to an integer, at
785 785 # least attempt the completion guessing the cursor is at the end of
786 786 # the text, if there's any, and otherwise of the line
787 787 cpos = len(c['text'])
788 788 if cpos==0:
789 789 cpos = len(c['line'])
790 790 return self.shell.complete(c['text'], c['line'], cpos)
791 791
792 792 def _at_shutdown(self):
793 793 """Actions taken at shutdown by the kernel, called by python's atexit.
794 794 """
795 795 # io.rprint("Kernel at_shutdown") # dbg
796 796 if self._shutdown_message is not None:
797 797 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
798 798 self.log.debug("%s", self._shutdown_message)
799 799 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
800 800
General Comments 0
You need to be logged in to leave comments. Login now