##// END OF EJS Templates
re-raise KeyboardInterrupt in raw_input...
MinRK -
Show More
@@ -1,803 +1,806 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports
19 19 import __builtin__
20 20 import sys
21 21 import time
22 22 import traceback
23 23 import logging
24 24 import uuid
25 25
26 26 from datetime import datetime
27 27 from signal import (
28 28 signal, default_int_handler, SIGINT
29 29 )
30 30
31 31 # System library imports
32 32 import zmq
33 33 from zmq.eventloop import ioloop
34 34 from zmq.eventloop.zmqstream import ZMQStream
35 35
36 36 # Local imports
37 37 from IPython.config.configurable import Configurable
38 38 from IPython.core.error import StdinNotImplementedError
39 39 from IPython.core import release
40 40 from IPython.utils import py3compat
41 41 from IPython.utils.jsonutil import json_clean
42 42 from IPython.utils.traitlets import (
43 43 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
44 44 Type
45 45 )
46 46
47 47 from serialize import serialize_object, unpack_apply_message
48 48 from session import Session
49 49 from zmqshell import ZMQInteractiveShell
50 50
51 51
52 52 #-----------------------------------------------------------------------------
53 53 # Main kernel class
54 54 #-----------------------------------------------------------------------------
55 55
56 56 protocol_version = list(release.kernel_protocol_version_info)
57 57 ipython_version = list(release.version_info)
58 58 language_version = list(sys.version_info[:3])
59 59
60 60
61 61 class Kernel(Configurable):
62 62
63 63 #---------------------------------------------------------------------------
64 64 # Kernel interface
65 65 #---------------------------------------------------------------------------
66 66
67 67 # attribute to override with a GUI
68 68 eventloop = Any(None)
69 69 def _eventloop_changed(self, name, old, new):
70 70 """schedule call to eventloop from IOLoop"""
71 71 loop = ioloop.IOLoop.instance()
72 72 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
73 73
74 74 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
75 75 shell_class = Type(ZMQInteractiveShell)
76 76
77 77 session = Instance(Session)
78 78 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
79 79 shell_streams = List()
80 80 control_stream = Instance(ZMQStream)
81 81 iopub_socket = Instance(zmq.Socket)
82 82 stdin_socket = Instance(zmq.Socket)
83 83 log = Instance(logging.Logger)
84 84
85 85 user_module = Any()
86 86 def _user_module_changed(self, name, old, new):
87 87 if self.shell is not None:
88 88 self.shell.user_module = new
89 89
90 90 user_ns = Dict(default_value=None)
91 91 def _user_ns_changed(self, name, old, new):
92 92 if self.shell is not None:
93 93 self.shell.user_ns = new
94 94 self.shell.init_user_ns()
95 95
96 96 # identities:
97 97 int_id = Integer(-1)
98 98 ident = Unicode()
99 99
100 100 def _ident_default(self):
101 101 return unicode(uuid.uuid4())
102 102
103 103
104 104 # Private interface
105 105
106 106 # Time to sleep after flushing the stdout/err buffers in each execute
107 107 # cycle. While this introduces a hard limit on the minimal latency of the
108 108 # execute cycle, it helps prevent output synchronization problems for
109 109 # clients.
110 110 # Units are in seconds. The minimum zmq latency on local host is probably
111 111 # ~150 microseconds, set this to 500us for now. We may need to increase it
112 112 # a little if it's not enough after more interactive testing.
113 113 _execute_sleep = Float(0.0005, config=True)
114 114
115 115 # Frequency of the kernel's event loop.
116 116 # Units are in seconds, kernel subclasses for GUI toolkits may need to
117 117 # adapt to milliseconds.
118 118 _poll_interval = Float(0.05, config=True)
119 119
120 120 # If the shutdown was requested over the network, we leave here the
121 121 # necessary reply message so it can be sent by our registered atexit
122 122 # handler. This ensures that the reply is only sent to clients truly at
123 123 # the end of our shutdown process (which happens after the underlying
124 124 # IPython shell's own shutdown).
125 125 _shutdown_message = None
126 126
127 127 # This is a dict of port number that the kernel is listening on. It is set
128 128 # by record_ports and used by connect_request.
129 129 _recorded_ports = Dict()
130 130
131 131 # A reference to the Python builtin 'raw_input' function.
132 132 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
133 133 _sys_raw_input = Any()
134 134
135 135 # set of aborted msg_ids
136 136 aborted = Set()
137 137
138 138
139 139 def __init__(self, **kwargs):
140 140 super(Kernel, self).__init__(**kwargs)
141 141
142 142 # Initialize the InteractiveShell subclass
143 143 self.shell = self.shell_class.instance(parent=self,
144 144 profile_dir = self.profile_dir,
145 145 user_module = self.user_module,
146 146 user_ns = self.user_ns,
147 147 )
148 148 self.shell.displayhook.session = self.session
149 149 self.shell.displayhook.pub_socket = self.iopub_socket
150 150 self.shell.displayhook.topic = self._topic('pyout')
151 151 self.shell.display_pub.session = self.session
152 152 self.shell.display_pub.pub_socket = self.iopub_socket
153 153 self.shell.data_pub.session = self.session
154 154 self.shell.data_pub.pub_socket = self.iopub_socket
155 155
156 156 # TMP - hack while developing
157 157 self.shell._reply_content = None
158 158
159 159 # Build dict of handlers for message types
160 160 msg_types = [ 'execute_request', 'complete_request',
161 161 'object_info_request', 'history_request',
162 162 'kernel_info_request',
163 163 'connect_request', 'shutdown_request',
164 164 'apply_request',
165 165 ]
166 166 self.shell_handlers = {}
167 167 for msg_type in msg_types:
168 168 self.shell_handlers[msg_type] = getattr(self, msg_type)
169 169
170 170 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
171 171 self.control_handlers = {}
172 172 for msg_type in control_msg_types:
173 173 self.control_handlers[msg_type] = getattr(self, msg_type)
174 174
175 175 def dispatch_control(self, msg):
176 176 """dispatch control requests"""
177 177 idents,msg = self.session.feed_identities(msg, copy=False)
178 178 try:
179 179 msg = self.session.unserialize(msg, content=True, copy=False)
180 180 except:
181 181 self.log.error("Invalid Control Message", exc_info=True)
182 182 return
183 183
184 184 self.log.debug("Control received: %s", msg)
185 185
186 186 header = msg['header']
187 187 msg_id = header['msg_id']
188 188 msg_type = header['msg_type']
189 189
190 190 handler = self.control_handlers.get(msg_type, None)
191 191 if handler is None:
192 192 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
193 193 else:
194 194 try:
195 195 handler(self.control_stream, idents, msg)
196 196 except Exception:
197 197 self.log.error("Exception in control handler:", exc_info=True)
198 198
199 199 def dispatch_shell(self, stream, msg):
200 200 """dispatch shell requests"""
201 201 # flush control requests first
202 202 if self.control_stream:
203 203 self.control_stream.flush()
204 204
205 205 idents,msg = self.session.feed_identities(msg, copy=False)
206 206 try:
207 207 msg = self.session.unserialize(msg, content=True, copy=False)
208 208 except:
209 209 self.log.error("Invalid Message", exc_info=True)
210 210 return
211 211
212 212 header = msg['header']
213 213 msg_id = header['msg_id']
214 214 msg_type = msg['header']['msg_type']
215 215
216 216 # Print some info about this message and leave a '--->' marker, so it's
217 217 # easier to trace visually the message chain when debugging. Each
218 218 # handler prints its message at the end.
219 219 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
220 220 self.log.debug(' Content: %s\n --->\n ', msg['content'])
221 221
222 222 if msg_id in self.aborted:
223 223 self.aborted.remove(msg_id)
224 224 # is it safe to assume a msg_id will not be resubmitted?
225 225 reply_type = msg_type.split('_')[0] + '_reply'
226 226 status = {'status' : 'aborted'}
227 227 md = {'engine' : self.ident}
228 228 md.update(status)
229 229 reply_msg = self.session.send(stream, reply_type, metadata=md,
230 230 content=status, parent=msg, ident=idents)
231 231 return
232 232
233 233 handler = self.shell_handlers.get(msg_type, None)
234 234 if handler is None:
235 235 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
236 236 else:
237 237 # ensure default_int_handler during handler call
238 238 sig = signal(SIGINT, default_int_handler)
239 239 try:
240 240 handler(stream, idents, msg)
241 241 except Exception:
242 242 self.log.error("Exception in message handler:", exc_info=True)
243 243 finally:
244 244 signal(SIGINT, sig)
245 245
246 246 def enter_eventloop(self):
247 247 """enter eventloop"""
248 248 self.log.info("entering eventloop")
249 249 # restore default_int_handler
250 250 signal(SIGINT, default_int_handler)
251 251 while self.eventloop is not None:
252 252 try:
253 253 self.eventloop(self)
254 254 except KeyboardInterrupt:
255 255 # Ctrl-C shouldn't crash the kernel
256 256 self.log.error("KeyboardInterrupt caught in kernel")
257 257 continue
258 258 else:
259 259 # eventloop exited cleanly, this means we should stop (right?)
260 260 self.eventloop = None
261 261 break
262 262 self.log.info("exiting eventloop")
263 263
264 264 def start(self):
265 265 """register dispatchers for streams"""
266 266 self.shell.exit_now = False
267 267 if self.control_stream:
268 268 self.control_stream.on_recv(self.dispatch_control, copy=False)
269 269
270 270 def make_dispatcher(stream):
271 271 def dispatcher(msg):
272 272 return self.dispatch_shell(stream, msg)
273 273 return dispatcher
274 274
275 275 for s in self.shell_streams:
276 276 s.on_recv(make_dispatcher(s), copy=False)
277 277
278 278 # publish idle status
279 279 self._publish_status('starting')
280 280
281 281 def do_one_iteration(self):
282 282 """step eventloop just once"""
283 283 if self.control_stream:
284 284 self.control_stream.flush()
285 285 for stream in self.shell_streams:
286 286 # handle at most one request per iteration
287 287 stream.flush(zmq.POLLIN, 1)
288 288 stream.flush(zmq.POLLOUT)
289 289
290 290
291 291 def record_ports(self, ports):
292 292 """Record the ports that this kernel is using.
293 293
294 294 The creator of the Kernel instance must call this methods if they
295 295 want the :meth:`connect_request` method to return the port numbers.
296 296 """
297 297 self._recorded_ports = ports
298 298
299 299 #---------------------------------------------------------------------------
300 300 # Kernel request handlers
301 301 #---------------------------------------------------------------------------
302 302
303 303 def _make_metadata(self, other=None):
304 304 """init metadata dict, for execute/apply_reply"""
305 305 new_md = {
306 306 'dependencies_met' : True,
307 307 'engine' : self.ident,
308 308 'started': datetime.now(),
309 309 }
310 310 if other:
311 311 new_md.update(other)
312 312 return new_md
313 313
314 314 def _publish_pyin(self, code, parent, execution_count):
315 315 """Publish the code request on the pyin stream."""
316 316
317 317 self.session.send(self.iopub_socket, u'pyin',
318 318 {u'code':code, u'execution_count': execution_count},
319 319 parent=parent, ident=self._topic('pyin')
320 320 )
321 321
322 322 def _publish_status(self, status, parent=None):
323 323 """send status (busy/idle) on IOPub"""
324 324 self.session.send(self.iopub_socket,
325 325 u'status',
326 326 {u'execution_state': status},
327 327 parent=parent,
328 328 ident=self._topic('status'),
329 329 )
330 330
331 331
332 332 def execute_request(self, stream, ident, parent):
333 333 """handle an execute_request"""
334 334
335 335 self._publish_status(u'busy', parent)
336 336
337 337 try:
338 338 content = parent[u'content']
339 339 code = content[u'code']
340 340 silent = content[u'silent']
341 341 store_history = content.get(u'store_history', not silent)
342 342 except:
343 343 self.log.error("Got bad msg: ")
344 344 self.log.error("%s", parent)
345 345 return
346 346
347 347 md = self._make_metadata(parent['metadata'])
348 348
349 349 shell = self.shell # we'll need this a lot here
350 350
351 351 # Replace raw_input. Note that is not sufficient to replace
352 352 # raw_input in the user namespace.
353 353 if content.get('allow_stdin', False):
354 354 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
355 355 else:
356 356 raw_input = lambda prompt='' : self._no_raw_input()
357 357
358 358 if py3compat.PY3:
359 359 self._sys_raw_input = __builtin__.input
360 360 __builtin__.input = raw_input
361 361 else:
362 362 self._sys_raw_input = __builtin__.raw_input
363 363 __builtin__.raw_input = raw_input
364 364
365 365 # Set the parent message of the display hook and out streams.
366 366 shell.displayhook.set_parent(parent)
367 367 shell.display_pub.set_parent(parent)
368 368 shell.data_pub.set_parent(parent)
369 369 try:
370 370 sys.stdout.set_parent(parent)
371 371 except AttributeError:
372 372 pass
373 373 try:
374 374 sys.stderr.set_parent(parent)
375 375 except AttributeError:
376 376 pass
377 377
378 378 # Re-broadcast our input for the benefit of listening clients, and
379 379 # start computing output
380 380 if not silent:
381 381 self._publish_pyin(code, parent, shell.execution_count)
382 382
383 383 reply_content = {}
384 384 try:
385 385 # FIXME: the shell calls the exception handler itself.
386 386 shell.run_cell(code, store_history=store_history, silent=silent)
387 387 except:
388 388 status = u'error'
389 389 # FIXME: this code right now isn't being used yet by default,
390 390 # because the run_cell() call above directly fires off exception
391 391 # reporting. This code, therefore, is only active in the scenario
392 392 # where runlines itself has an unhandled exception. We need to
393 393 # uniformize this, for all exception construction to come from a
394 394 # single location in the codbase.
395 395 etype, evalue, tb = sys.exc_info()
396 396 tb_list = traceback.format_exception(etype, evalue, tb)
397 397 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
398 398 else:
399 399 status = u'ok'
400 400 finally:
401 401 # Restore raw_input.
402 402 if py3compat.PY3:
403 403 __builtin__.input = self._sys_raw_input
404 404 else:
405 405 __builtin__.raw_input = self._sys_raw_input
406 406
407 407 reply_content[u'status'] = status
408 408
409 409 # Return the execution counter so clients can display prompts
410 410 reply_content['execution_count'] = shell.execution_count - 1
411 411
412 412 # FIXME - fish exception info out of shell, possibly left there by
413 413 # runlines. We'll need to clean up this logic later.
414 414 if shell._reply_content is not None:
415 415 reply_content.update(shell._reply_content)
416 416 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
417 417 reply_content['engine_info'] = e_info
418 418 # reset after use
419 419 shell._reply_content = None
420 420
421 421 if 'traceback' in reply_content:
422 422 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
423 423
424 424
425 425 # At this point, we can tell whether the main code execution succeeded
426 426 # or not. If it did, we proceed to evaluate user_variables/expressions
427 427 if reply_content['status'] == 'ok':
428 428 reply_content[u'user_variables'] = \
429 429 shell.user_variables(content.get(u'user_variables', []))
430 430 reply_content[u'user_expressions'] = \
431 431 shell.user_expressions(content.get(u'user_expressions', {}))
432 432 else:
433 433 # If there was an error, don't even try to compute variables or
434 434 # expressions
435 435 reply_content[u'user_variables'] = {}
436 436 reply_content[u'user_expressions'] = {}
437 437
438 438 # Payloads should be retrieved regardless of outcome, so we can both
439 439 # recover partial output (that could have been generated early in a
440 440 # block, before an error) and clear the payload system always.
441 441 reply_content[u'payload'] = shell.payload_manager.read_payload()
442 442 # Be agressive about clearing the payload because we don't want
443 443 # it to sit in memory until the next execute_request comes in.
444 444 shell.payload_manager.clear_payload()
445 445
446 446 # Flush output before sending the reply.
447 447 sys.stdout.flush()
448 448 sys.stderr.flush()
449 449 # FIXME: on rare occasions, the flush doesn't seem to make it to the
450 450 # clients... This seems to mitigate the problem, but we definitely need
451 451 # to better understand what's going on.
452 452 if self._execute_sleep:
453 453 time.sleep(self._execute_sleep)
454 454
455 455 # Send the reply.
456 456 reply_content = json_clean(reply_content)
457 457
458 458 md['status'] = reply_content['status']
459 459 if reply_content['status'] == 'error' and \
460 460 reply_content['ename'] == 'UnmetDependency':
461 461 md['dependencies_met'] = False
462 462
463 463 reply_msg = self.session.send(stream, u'execute_reply',
464 464 reply_content, parent, metadata=md,
465 465 ident=ident)
466 466
467 467 self.log.debug("%s", reply_msg)
468 468
469 469 if not silent and reply_msg['content']['status'] == u'error':
470 470 self._abort_queues()
471 471
472 472 self._publish_status(u'idle', parent)
473 473
474 474 def complete_request(self, stream, ident, parent):
475 475 txt, matches = self._complete(parent)
476 476 matches = {'matches' : matches,
477 477 'matched_text' : txt,
478 478 'status' : 'ok'}
479 479 matches = json_clean(matches)
480 480 completion_msg = self.session.send(stream, 'complete_reply',
481 481 matches, parent, ident)
482 482 self.log.debug("%s", completion_msg)
483 483
484 484 def object_info_request(self, stream, ident, parent):
485 485 content = parent['content']
486 486 object_info = self.shell.object_inspect(content['oname'],
487 487 detail_level = content.get('detail_level', 0)
488 488 )
489 489 # Before we send this object over, we scrub it for JSON usage
490 490 oinfo = json_clean(object_info)
491 491 msg = self.session.send(stream, 'object_info_reply',
492 492 oinfo, parent, ident)
493 493 self.log.debug("%s", msg)
494 494
495 495 def history_request(self, stream, ident, parent):
496 496 # We need to pull these out, as passing **kwargs doesn't work with
497 497 # unicode keys before Python 2.6.5.
498 498 hist_access_type = parent['content']['hist_access_type']
499 499 raw = parent['content']['raw']
500 500 output = parent['content']['output']
501 501 if hist_access_type == 'tail':
502 502 n = parent['content']['n']
503 503 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
504 504 include_latest=True)
505 505
506 506 elif hist_access_type == 'range':
507 507 session = parent['content']['session']
508 508 start = parent['content']['start']
509 509 stop = parent['content']['stop']
510 510 hist = self.shell.history_manager.get_range(session, start, stop,
511 511 raw=raw, output=output)
512 512
513 513 elif hist_access_type == 'search':
514 514 n = parent['content'].get('n')
515 515 unique = parent['content'].get('unique', False)
516 516 pattern = parent['content']['pattern']
517 517 hist = self.shell.history_manager.search(
518 518 pattern, raw=raw, output=output, n=n, unique=unique)
519 519
520 520 else:
521 521 hist = []
522 522 hist = list(hist)
523 523 content = {'history' : hist}
524 524 content = json_clean(content)
525 525 msg = self.session.send(stream, 'history_reply',
526 526 content, parent, ident)
527 527 self.log.debug("Sending history reply with %i entries", len(hist))
528 528
529 529 def connect_request(self, stream, ident, parent):
530 530 if self._recorded_ports is not None:
531 531 content = self._recorded_ports.copy()
532 532 else:
533 533 content = {}
534 534 msg = self.session.send(stream, 'connect_reply',
535 535 content, parent, ident)
536 536 self.log.debug("%s", msg)
537 537
538 538 def kernel_info_request(self, stream, ident, parent):
539 539 vinfo = {
540 540 'protocol_version': protocol_version,
541 541 'ipython_version': ipython_version,
542 542 'language_version': language_version,
543 543 'language': 'python',
544 544 }
545 545 msg = self.session.send(stream, 'kernel_info_reply',
546 546 vinfo, parent, ident)
547 547 self.log.debug("%s", msg)
548 548
549 549 def shutdown_request(self, stream, ident, parent):
550 550 self.shell.exit_now = True
551 551 content = dict(status='ok')
552 552 content.update(parent['content'])
553 553 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
554 554 # same content, but different msg_id for broadcasting on IOPub
555 555 self._shutdown_message = self.session.msg(u'shutdown_reply',
556 556 content, parent
557 557 )
558 558
559 559 self._at_shutdown()
560 560 # call sys.exit after a short delay
561 561 loop = ioloop.IOLoop.instance()
562 562 loop.add_timeout(time.time()+0.1, loop.stop)
563 563
564 564 #---------------------------------------------------------------------------
565 565 # Engine methods
566 566 #---------------------------------------------------------------------------
567 567
568 568 def apply_request(self, stream, ident, parent):
569 569 try:
570 570 content = parent[u'content']
571 571 bufs = parent[u'buffers']
572 572 msg_id = parent['header']['msg_id']
573 573 except:
574 574 self.log.error("Got bad msg: %s", parent, exc_info=True)
575 575 return
576 576
577 577 self._publish_status(u'busy', parent)
578 578
579 579 # Set the parent message of the display hook and out streams.
580 580 shell = self.shell
581 581 shell.displayhook.set_parent(parent)
582 582 shell.display_pub.set_parent(parent)
583 583 shell.data_pub.set_parent(parent)
584 584 try:
585 585 sys.stdout.set_parent(parent)
586 586 except AttributeError:
587 587 pass
588 588 try:
589 589 sys.stderr.set_parent(parent)
590 590 except AttributeError:
591 591 pass
592 592
593 593 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
594 594 # self.iopub_socket.send(pyin_msg)
595 595 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
596 596 md = self._make_metadata(parent['metadata'])
597 597 try:
598 598 working = shell.user_ns
599 599
600 600 prefix = "_"+str(msg_id).replace("-","")+"_"
601 601
602 602 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
603 603
604 604 fname = getattr(f, '__name__', 'f')
605 605
606 606 fname = prefix+"f"
607 607 argname = prefix+"args"
608 608 kwargname = prefix+"kwargs"
609 609 resultname = prefix+"result"
610 610
611 611 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
612 612 # print ns
613 613 working.update(ns)
614 614 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
615 615 try:
616 616 exec code in shell.user_global_ns, shell.user_ns
617 617 result = working.get(resultname)
618 618 finally:
619 619 for key in ns.iterkeys():
620 620 working.pop(key)
621 621
622 622 result_buf = serialize_object(result,
623 623 buffer_threshold=self.session.buffer_threshold,
624 624 item_threshold=self.session.item_threshold,
625 625 )
626 626
627 627 except:
628 628 # invoke IPython traceback formatting
629 629 shell.showtraceback()
630 630 # FIXME - fish exception info out of shell, possibly left there by
631 631 # run_code. We'll need to clean up this logic later.
632 632 reply_content = {}
633 633 if shell._reply_content is not None:
634 634 reply_content.update(shell._reply_content)
635 635 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
636 636 reply_content['engine_info'] = e_info
637 637 # reset after use
638 638 shell._reply_content = None
639 639
640 640 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
641 641 ident=self._topic('pyerr'))
642 642 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
643 643 result_buf = []
644 644
645 645 if reply_content['ename'] == 'UnmetDependency':
646 646 md['dependencies_met'] = False
647 647 else:
648 648 reply_content = {'status' : 'ok'}
649 649
650 650 # put 'ok'/'error' status in header, for scheduler introspection:
651 651 md['status'] = reply_content['status']
652 652
653 653 # flush i/o
654 654 sys.stdout.flush()
655 655 sys.stderr.flush()
656 656
657 657 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
658 658 parent=parent, ident=ident,buffers=result_buf, metadata=md)
659 659
660 660 self._publish_status(u'idle', parent)
661 661
662 662 #---------------------------------------------------------------------------
663 663 # Control messages
664 664 #---------------------------------------------------------------------------
665 665
666 666 def abort_request(self, stream, ident, parent):
667 667 """abort a specifig msg by id"""
668 668 msg_ids = parent['content'].get('msg_ids', None)
669 669 if isinstance(msg_ids, basestring):
670 670 msg_ids = [msg_ids]
671 671 if not msg_ids:
672 672 self.abort_queues()
673 673 for mid in msg_ids:
674 674 self.aborted.add(str(mid))
675 675
676 676 content = dict(status='ok')
677 677 reply_msg = self.session.send(stream, 'abort_reply', content=content,
678 678 parent=parent, ident=ident)
679 679 self.log.debug("%s", reply_msg)
680 680
681 681 def clear_request(self, stream, idents, parent):
682 682 """Clear our namespace."""
683 683 self.shell.reset(False)
684 684 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
685 685 content = dict(status='ok'))
686 686
687 687
688 688 #---------------------------------------------------------------------------
689 689 # Protected interface
690 690 #---------------------------------------------------------------------------
691 691
692 692 def _wrap_exception(self, method=None):
693 693 # import here, because _wrap_exception is only used in parallel,
694 694 # and parallel has higher min pyzmq version
695 695 from IPython.parallel.error import wrap_exception
696 696 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
697 697 content = wrap_exception(e_info)
698 698 return content
699 699
700 700 def _topic(self, topic):
701 701 """prefixed topic for IOPub messages"""
702 702 if self.int_id >= 0:
703 703 base = "engine.%i" % self.int_id
704 704 else:
705 705 base = "kernel.%s" % self.ident
706 706
707 707 return py3compat.cast_bytes("%s.%s" % (base, topic))
708 708
709 709 def _abort_queues(self):
710 710 for stream in self.shell_streams:
711 711 if stream:
712 712 self._abort_queue(stream)
713 713
714 714 def _abort_queue(self, stream):
715 715 poller = zmq.Poller()
716 716 poller.register(stream.socket, zmq.POLLIN)
717 717 while True:
718 718 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
719 719 if msg is None:
720 720 return
721 721
722 722 self.log.info("Aborting:")
723 723 self.log.info("%s", msg)
724 724 msg_type = msg['header']['msg_type']
725 725 reply_type = msg_type.split('_')[0] + '_reply'
726 726
727 727 status = {'status' : 'aborted'}
728 728 md = {'engine' : self.ident}
729 729 md.update(status)
730 730 reply_msg = self.session.send(stream, reply_type, metadata=md,
731 731 content=status, parent=msg, ident=idents)
732 732 self.log.debug("%s", reply_msg)
733 733 # We need to wait a bit for requests to come in. This can probably
734 734 # be set shorter for true asynchronous clients.
735 735 poller.poll(50)
736 736
737 737
738 738 def _no_raw_input(self):
739 739 """Raise StdinNotImplentedError if active frontend doesn't support
740 740 stdin."""
741 741 raise StdinNotImplementedError("raw_input was called, but this "
742 742 "frontend does not support stdin.")
743 743
744 744 def _raw_input(self, prompt, ident, parent):
745 745 # Flush output before making the request.
746 746 sys.stderr.flush()
747 747 sys.stdout.flush()
748 748 # flush the stdin socket, to purge stale replies
749 749 while True:
750 750 try:
751 751 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
752 752 except zmq.ZMQError as e:
753 753 if e.errno == zmq.EAGAIN:
754 754 break
755 755 else:
756 756 raise
757 757
758 758 # Send the input request.
759 759 content = json_clean(dict(prompt=prompt))
760 760 self.session.send(self.stdin_socket, u'input_request', content, parent,
761 761 ident=ident)
762 762
763 763 # Await a response.
764 764 while True:
765 765 try:
766 766 ident, reply = self.session.recv(self.stdin_socket, 0)
767 767 except Exception:
768 768 self.log.warn("Invalid Message:", exc_info=True)
769 except KeyboardInterrupt:
770 # re-raise KeyboardInterrupt, to truncate traceback
771 raise KeyboardInterrupt
769 772 else:
770 773 break
771 774 try:
772 775 value = py3compat.unicode_to_str(reply['content']['value'])
773 776 except:
774 777 self.log.error("Got bad raw_input reply: ")
775 778 self.log.error("%s", parent)
776 779 value = ''
777 780 if value == '\x04':
778 781 # EOF
779 782 raise EOFError
780 783 return value
781 784
782 785 def _complete(self, msg):
783 786 c = msg['content']
784 787 try:
785 788 cpos = int(c['cursor_pos'])
786 789 except:
787 790 # If we don't get something that we can convert to an integer, at
788 791 # least attempt the completion guessing the cursor is at the end of
789 792 # the text, if there's any, and otherwise of the line
790 793 cpos = len(c['text'])
791 794 if cpos==0:
792 795 cpos = len(c['line'])
793 796 return self.shell.complete(c['text'], c['line'], cpos)
794 797
795 798 def _at_shutdown(self):
796 799 """Actions taken at shutdown by the kernel, called by python's atexit.
797 800 """
798 801 # io.rprint("Kernel at_shutdown") # dbg
799 802 if self._shutdown_message is not None:
800 803 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
801 804 self.log.debug("%s", self._shutdown_message)
802 805 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
803 806
General Comments 0
You need to be logged in to leave comments. Login now