##// END OF EJS Templates
flush replies when entering an eventloop...
MinRK -
Show More
1 NO CONTENT: modified file
@@ -1,795 +1,799 b''
1 1 #!/usr/bin/env python
2 2 """An interactive kernel that talks to frontends over 0MQ."""
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Imports
6 6 #-----------------------------------------------------------------------------
7 7 from __future__ import print_function
8 8
9 9 # Standard library imports
10 10 import sys
11 11 import time
12 12 import traceback
13 13 import logging
14 14 import uuid
15 15
16 16 from datetime import datetime
17 17 from signal import (
18 18 signal, default_int_handler, SIGINT
19 19 )
20 20
21 21 # System library imports
22 22 import zmq
23 23 from zmq.eventloop import ioloop
24 24 from zmq.eventloop.zmqstream import ZMQStream
25 25
26 26 # Local imports
27 27 from IPython.config.configurable import Configurable
28 28 from IPython.core.error import StdinNotImplementedError
29 29 from IPython.core import release
30 30 from IPython.utils import py3compat
31 31 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
32 32 from IPython.utils.jsonutil import json_clean
33 33 from IPython.utils.traitlets import (
34 34 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
35 35 Type, Bool,
36 36 )
37 37
38 38 from .serialize import serialize_object, unpack_apply_message
39 39 from .session import Session
40 40 from .zmqshell import ZMQInteractiveShell
41 41
42 42
43 43 #-----------------------------------------------------------------------------
44 44 # Main kernel class
45 45 #-----------------------------------------------------------------------------
46 46
47 47 protocol_version = list(release.kernel_protocol_version_info)
48 48 ipython_version = list(release.version_info)
49 49 language_version = list(sys.version_info[:3])
50 50
51 51
52 52 class Kernel(Configurable):
53 53
54 54 #---------------------------------------------------------------------------
55 55 # Kernel interface
56 56 #---------------------------------------------------------------------------
57 57
58 58 # attribute to override with a GUI
59 59 eventloop = Any(None)
60 60 def _eventloop_changed(self, name, old, new):
61 61 """schedule call to eventloop from IOLoop"""
62 62 loop = ioloop.IOLoop.instance()
63 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
63 loop.add_callback(self.enter_eventloop)
64 64
65 65 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
66 66 shell_class = Type(ZMQInteractiveShell)
67 67
68 68 session = Instance(Session)
69 69 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
70 70 shell_streams = List()
71 71 control_stream = Instance(ZMQStream)
72 72 iopub_socket = Instance(zmq.Socket)
73 73 stdin_socket = Instance(zmq.Socket)
74 74 log = Instance(logging.Logger)
75 75
76 76 user_module = Any()
77 77 def _user_module_changed(self, name, old, new):
78 78 if self.shell is not None:
79 79 self.shell.user_module = new
80 80
81 81 user_ns = Instance(dict, args=None, allow_none=True)
82 82 def _user_ns_changed(self, name, old, new):
83 83 if self.shell is not None:
84 84 self.shell.user_ns = new
85 85 self.shell.init_user_ns()
86 86
87 87 # identities:
88 88 int_id = Integer(-1)
89 89 ident = Unicode()
90 90
91 91 def _ident_default(self):
92 92 return unicode_type(uuid.uuid4())
93 93
94 94 # Private interface
95 95
96 96 _darwin_app_nap = Bool(True, config=True,
97 97 help="""Whether to use appnope for compatiblity with OS X App Nap.
98 98
99 99 Only affects OS X >= 10.9.
100 100 """
101 101 )
102 102
103 103 # Time to sleep after flushing the stdout/err buffers in each execute
104 104 # cycle. While this introduces a hard limit on the minimal latency of the
105 105 # execute cycle, it helps prevent output synchronization problems for
106 106 # clients.
107 107 # Units are in seconds. The minimum zmq latency on local host is probably
108 108 # ~150 microseconds, set this to 500us for now. We may need to increase it
109 109 # a little if it's not enough after more interactive testing.
110 110 _execute_sleep = Float(0.0005, config=True)
111 111
112 112 # Frequency of the kernel's event loop.
113 113 # Units are in seconds, kernel subclasses for GUI toolkits may need to
114 114 # adapt to milliseconds.
115 115 _poll_interval = Float(0.05, config=True)
116 116
117 117 # If the shutdown was requested over the network, we leave here the
118 118 # necessary reply message so it can be sent by our registered atexit
119 119 # handler. This ensures that the reply is only sent to clients truly at
120 120 # the end of our shutdown process (which happens after the underlying
121 121 # IPython shell's own shutdown).
122 122 _shutdown_message = None
123 123
124 124 # This is a dict of port number that the kernel is listening on. It is set
125 125 # by record_ports and used by connect_request.
126 126 _recorded_ports = Dict()
127 127
128 128 # A reference to the Python builtin 'raw_input' function.
129 129 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
130 130 _sys_raw_input = Any()
131 131 _sys_eval_input = Any()
132 132
133 133 # set of aborted msg_ids
134 134 aborted = Set()
135 135
136 136
137 137 def __init__(self, **kwargs):
138 138 super(Kernel, self).__init__(**kwargs)
139 139
140 140 # Initialize the InteractiveShell subclass
141 141 self.shell = self.shell_class.instance(parent=self,
142 142 profile_dir = self.profile_dir,
143 143 user_module = self.user_module,
144 144 user_ns = self.user_ns,
145 145 kernel = self,
146 146 )
147 147 self.shell.displayhook.session = self.session
148 148 self.shell.displayhook.pub_socket = self.iopub_socket
149 149 self.shell.displayhook.topic = self._topic('pyout')
150 150 self.shell.display_pub.session = self.session
151 151 self.shell.display_pub.pub_socket = self.iopub_socket
152 152 self.shell.data_pub.session = self.session
153 153 self.shell.data_pub.pub_socket = self.iopub_socket
154 154
155 155 # TMP - hack while developing
156 156 self.shell._reply_content = None
157 157
158 158 # Build dict of handlers for message types
159 159 msg_types = [ 'execute_request', 'complete_request',
160 160 'object_info_request', 'history_request',
161 161 'kernel_info_request',
162 162 'connect_request', 'shutdown_request',
163 163 'apply_request',
164 164 ]
165 165 self.shell_handlers = {}
166 166 for msg_type in msg_types:
167 167 self.shell_handlers[msg_type] = getattr(self, msg_type)
168 168
169 169 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
170 170 comm_manager = self.shell.comm_manager
171 171 for msg_type in comm_msg_types:
172 172 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
173 173
174 174 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
175 175 self.control_handlers = {}
176 176 for msg_type in control_msg_types:
177 177 self.control_handlers[msg_type] = getattr(self, msg_type)
178 178
179 179
180 180 def dispatch_control(self, msg):
181 181 """dispatch control requests"""
182 182 idents,msg = self.session.feed_identities(msg, copy=False)
183 183 try:
184 184 msg = self.session.unserialize(msg, content=True, copy=False)
185 185 except:
186 186 self.log.error("Invalid Control Message", exc_info=True)
187 187 return
188 188
189 189 self.log.debug("Control received: %s", msg)
190 190
191 191 header = msg['header']
192 192 msg_id = header['msg_id']
193 193 msg_type = header['msg_type']
194 194
195 195 handler = self.control_handlers.get(msg_type, None)
196 196 if handler is None:
197 197 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
198 198 else:
199 199 try:
200 200 handler(self.control_stream, idents, msg)
201 201 except Exception:
202 202 self.log.error("Exception in control handler:", exc_info=True)
203 203
204 204 def dispatch_shell(self, stream, msg):
205 205 """dispatch shell requests"""
206 206 # flush control requests first
207 207 if self.control_stream:
208 208 self.control_stream.flush()
209 209
210 210 idents,msg = self.session.feed_identities(msg, copy=False)
211 211 try:
212 212 msg = self.session.unserialize(msg, content=True, copy=False)
213 213 except:
214 214 self.log.error("Invalid Message", exc_info=True)
215 215 return
216 216
217 217 header = msg['header']
218 218 msg_id = header['msg_id']
219 219 msg_type = msg['header']['msg_type']
220 220
221 221 # Print some info about this message and leave a '--->' marker, so it's
222 222 # easier to trace visually the message chain when debugging. Each
223 223 # handler prints its message at the end.
224 224 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
225 225 self.log.debug(' Content: %s\n --->\n ', msg['content'])
226 226
227 227 if msg_id in self.aborted:
228 228 self.aborted.remove(msg_id)
229 229 # is it safe to assume a msg_id will not be resubmitted?
230 230 reply_type = msg_type.split('_')[0] + '_reply'
231 231 status = {'status' : 'aborted'}
232 232 md = {'engine' : self.ident}
233 233 md.update(status)
234 234 reply_msg = self.session.send(stream, reply_type, metadata=md,
235 235 content=status, parent=msg, ident=idents)
236 236 return
237 237
238 238 handler = self.shell_handlers.get(msg_type, None)
239 239 if handler is None:
240 240 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
241 241 else:
242 242 # ensure default_int_handler during handler call
243 243 sig = signal(SIGINT, default_int_handler)
244 244 try:
245 245 handler(stream, idents, msg)
246 246 except Exception:
247 247 self.log.error("Exception in message handler:", exc_info=True)
248 248 finally:
249 249 signal(SIGINT, sig)
250 250
251 251 def enter_eventloop(self):
252 252 """enter eventloop"""
253 self.log.info("entering eventloop")
253 self.log.info("entering eventloop %s", self.eventloop)
254 for stream in self.shell_streams:
255 # flush any pending replies,
256 # which may be skipped by entering the eventloop
257 stream.flush(zmq.POLLOUT)
254 258 # restore default_int_handler
255 259 signal(SIGINT, default_int_handler)
256 260 while self.eventloop is not None:
257 261 try:
258 262 self.eventloop(self)
259 263 except KeyboardInterrupt:
260 264 # Ctrl-C shouldn't crash the kernel
261 265 self.log.error("KeyboardInterrupt caught in kernel")
262 266 continue
263 267 else:
264 268 # eventloop exited cleanly, this means we should stop (right?)
265 269 self.eventloop = None
266 270 break
267 271 self.log.info("exiting eventloop")
268 272
269 273 def start(self):
270 274 """register dispatchers for streams"""
271 275 self.shell.exit_now = False
272 276 if self.control_stream:
273 277 self.control_stream.on_recv(self.dispatch_control, copy=False)
274 278
275 279 def make_dispatcher(stream):
276 280 def dispatcher(msg):
277 281 return self.dispatch_shell(stream, msg)
278 282 return dispatcher
279 283
280 284 for s in self.shell_streams:
281 285 s.on_recv(make_dispatcher(s), copy=False)
282 286
283 287 # publish idle status
284 288 self._publish_status('starting')
285 289
286 290 def do_one_iteration(self):
287 291 """step eventloop just once"""
288 292 if self.control_stream:
289 293 self.control_stream.flush()
290 294 for stream in self.shell_streams:
291 295 # handle at most one request per iteration
292 296 stream.flush(zmq.POLLIN, 1)
293 297 stream.flush(zmq.POLLOUT)
294 298
295 299
296 300 def record_ports(self, ports):
297 301 """Record the ports that this kernel is using.
298 302
299 303 The creator of the Kernel instance must call this methods if they
300 304 want the :meth:`connect_request` method to return the port numbers.
301 305 """
302 306 self._recorded_ports = ports
303 307
304 308 #---------------------------------------------------------------------------
305 309 # Kernel request handlers
306 310 #---------------------------------------------------------------------------
307 311
308 312 def _make_metadata(self, other=None):
309 313 """init metadata dict, for execute/apply_reply"""
310 314 new_md = {
311 315 'dependencies_met' : True,
312 316 'engine' : self.ident,
313 317 'started': datetime.now(),
314 318 }
315 319 if other:
316 320 new_md.update(other)
317 321 return new_md
318 322
319 323 def _publish_pyin(self, code, parent, execution_count):
320 324 """Publish the code request on the pyin stream."""
321 325
322 326 self.session.send(self.iopub_socket, u'pyin',
323 327 {u'code':code, u'execution_count': execution_count},
324 328 parent=parent, ident=self._topic('pyin')
325 329 )
326 330
327 331 def _publish_status(self, status, parent=None):
328 332 """send status (busy/idle) on IOPub"""
329 333 self.session.send(self.iopub_socket,
330 334 u'status',
331 335 {u'execution_state': status},
332 336 parent=parent,
333 337 ident=self._topic('status'),
334 338 )
335 339
336 340
337 341 def execute_request(self, stream, ident, parent):
338 342 """handle an execute_request"""
339 343
340 344 self._publish_status(u'busy', parent)
341 345
342 346 try:
343 347 content = parent[u'content']
344 348 code = py3compat.cast_unicode_py2(content[u'code'])
345 349 silent = content[u'silent']
346 350 store_history = content.get(u'store_history', not silent)
347 351 except:
348 352 self.log.error("Got bad msg: ")
349 353 self.log.error("%s", parent)
350 354 return
351 355
352 356 md = self._make_metadata(parent['metadata'])
353 357
354 358 shell = self.shell # we'll need this a lot here
355 359
356 360 # Replace raw_input. Note that is not sufficient to replace
357 361 # raw_input in the user namespace.
358 362 if content.get('allow_stdin', False):
359 363 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
360 364 input = lambda prompt='': eval(raw_input(prompt))
361 365 else:
362 366 raw_input = input = lambda prompt='' : self._no_raw_input()
363 367
364 368 if py3compat.PY3:
365 369 self._sys_raw_input = builtin_mod.input
366 370 builtin_mod.input = raw_input
367 371 else:
368 372 self._sys_raw_input = builtin_mod.raw_input
369 373 self._sys_eval_input = builtin_mod.input
370 374 builtin_mod.raw_input = raw_input
371 375 builtin_mod.input = input
372 376
373 377 # Set the parent message of the display hook and out streams.
374 378 shell.set_parent(parent)
375 379
376 380 # Re-broadcast our input for the benefit of listening clients, and
377 381 # start computing output
378 382 if not silent:
379 383 self._publish_pyin(code, parent, shell.execution_count)
380 384
381 385 reply_content = {}
382 386 try:
383 387 # FIXME: the shell calls the exception handler itself.
384 388 shell.run_cell(code, store_history=store_history, silent=silent)
385 389 except:
386 390 status = u'error'
387 391 # FIXME: this code right now isn't being used yet by default,
388 392 # because the run_cell() call above directly fires off exception
389 393 # reporting. This code, therefore, is only active in the scenario
390 394 # where runlines itself has an unhandled exception. We need to
391 395 # uniformize this, for all exception construction to come from a
392 396 # single location in the codbase.
393 397 etype, evalue, tb = sys.exc_info()
394 398 tb_list = traceback.format_exception(etype, evalue, tb)
395 399 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
396 400 else:
397 401 status = u'ok'
398 402 finally:
399 403 # Restore raw_input.
400 404 if py3compat.PY3:
401 405 builtin_mod.input = self._sys_raw_input
402 406 else:
403 407 builtin_mod.raw_input = self._sys_raw_input
404 408 builtin_mod.input = self._sys_eval_input
405 409
406 410 reply_content[u'status'] = status
407 411
408 412 # Return the execution counter so clients can display prompts
409 413 reply_content['execution_count'] = shell.execution_count - 1
410 414
411 415 # FIXME - fish exception info out of shell, possibly left there by
412 416 # runlines. We'll need to clean up this logic later.
413 417 if shell._reply_content is not None:
414 418 reply_content.update(shell._reply_content)
415 419 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
416 420 reply_content['engine_info'] = e_info
417 421 # reset after use
418 422 shell._reply_content = None
419 423
420 424 if 'traceback' in reply_content:
421 425 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
422 426
423 427
424 428 # At this point, we can tell whether the main code execution succeeded
425 429 # or not. If it did, we proceed to evaluate user_variables/expressions
426 430 if reply_content['status'] == 'ok':
427 431 reply_content[u'user_variables'] = \
428 432 shell.user_variables(content.get(u'user_variables', []))
429 433 reply_content[u'user_expressions'] = \
430 434 shell.user_expressions(content.get(u'user_expressions', {}))
431 435 else:
432 436 # If there was an error, don't even try to compute variables or
433 437 # expressions
434 438 reply_content[u'user_variables'] = {}
435 439 reply_content[u'user_expressions'] = {}
436 440
437 441 # Payloads should be retrieved regardless of outcome, so we can both
438 442 # recover partial output (that could have been generated early in a
439 443 # block, before an error) and clear the payload system always.
440 444 reply_content[u'payload'] = shell.payload_manager.read_payload()
441 445 # Be agressive about clearing the payload because we don't want
442 446 # it to sit in memory until the next execute_request comes in.
443 447 shell.payload_manager.clear_payload()
444 448
445 449 # Flush output before sending the reply.
446 450 sys.stdout.flush()
447 451 sys.stderr.flush()
448 452 # FIXME: on rare occasions, the flush doesn't seem to make it to the
449 453 # clients... This seems to mitigate the problem, but we definitely need
450 454 # to better understand what's going on.
451 455 if self._execute_sleep:
452 456 time.sleep(self._execute_sleep)
453 457
454 458 # Send the reply.
455 459 reply_content = json_clean(reply_content)
456 460
457 461 md['status'] = reply_content['status']
458 462 if reply_content['status'] == 'error' and \
459 463 reply_content['ename'] == 'UnmetDependency':
460 464 md['dependencies_met'] = False
461 465
462 466 reply_msg = self.session.send(stream, u'execute_reply',
463 467 reply_content, parent, metadata=md,
464 468 ident=ident)
465 469
466 470 self.log.debug("%s", reply_msg)
467 471
468 472 if not silent and reply_msg['content']['status'] == u'error':
469 473 self._abort_queues()
470 474
471 475 self._publish_status(u'idle', parent)
472 476
473 477 def complete_request(self, stream, ident, parent):
474 478 txt, matches = self._complete(parent)
475 479 matches = {'matches' : matches,
476 480 'matched_text' : txt,
477 481 'status' : 'ok'}
478 482 matches = json_clean(matches)
479 483 completion_msg = self.session.send(stream, 'complete_reply',
480 484 matches, parent, ident)
481 485 self.log.debug("%s", completion_msg)
482 486
483 487 def object_info_request(self, stream, ident, parent):
484 488 content = parent['content']
485 489 object_info = self.shell.object_inspect(content['oname'],
486 490 detail_level = content.get('detail_level', 0)
487 491 )
488 492 # Before we send this object over, we scrub it for JSON usage
489 493 oinfo = json_clean(object_info)
490 494 msg = self.session.send(stream, 'object_info_reply',
491 495 oinfo, parent, ident)
492 496 self.log.debug("%s", msg)
493 497
494 498 def history_request(self, stream, ident, parent):
495 499 # We need to pull these out, as passing **kwargs doesn't work with
496 500 # unicode keys before Python 2.6.5.
497 501 hist_access_type = parent['content']['hist_access_type']
498 502 raw = parent['content']['raw']
499 503 output = parent['content']['output']
500 504 if hist_access_type == 'tail':
501 505 n = parent['content']['n']
502 506 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
503 507 include_latest=True)
504 508
505 509 elif hist_access_type == 'range':
506 510 session = parent['content']['session']
507 511 start = parent['content']['start']
508 512 stop = parent['content']['stop']
509 513 hist = self.shell.history_manager.get_range(session, start, stop,
510 514 raw=raw, output=output)
511 515
512 516 elif hist_access_type == 'search':
513 517 n = parent['content'].get('n')
514 518 unique = parent['content'].get('unique', False)
515 519 pattern = parent['content']['pattern']
516 520 hist = self.shell.history_manager.search(
517 521 pattern, raw=raw, output=output, n=n, unique=unique)
518 522
519 523 else:
520 524 hist = []
521 525 hist = list(hist)
522 526 content = {'history' : hist}
523 527 content = json_clean(content)
524 528 msg = self.session.send(stream, 'history_reply',
525 529 content, parent, ident)
526 530 self.log.debug("Sending history reply with %i entries", len(hist))
527 531
528 532 def connect_request(self, stream, ident, parent):
529 533 if self._recorded_ports is not None:
530 534 content = self._recorded_ports.copy()
531 535 else:
532 536 content = {}
533 537 msg = self.session.send(stream, 'connect_reply',
534 538 content, parent, ident)
535 539 self.log.debug("%s", msg)
536 540
537 541 def kernel_info_request(self, stream, ident, parent):
538 542 vinfo = {
539 543 'protocol_version': protocol_version,
540 544 'ipython_version': ipython_version,
541 545 'language_version': language_version,
542 546 'language': 'python',
543 547 }
544 548 msg = self.session.send(stream, 'kernel_info_reply',
545 549 vinfo, parent, ident)
546 550 self.log.debug("%s", msg)
547 551
548 552 def shutdown_request(self, stream, ident, parent):
549 553 self.shell.exit_now = True
550 554 content = dict(status='ok')
551 555 content.update(parent['content'])
552 556 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
553 557 # same content, but different msg_id for broadcasting on IOPub
554 558 self._shutdown_message = self.session.msg(u'shutdown_reply',
555 559 content, parent
556 560 )
557 561
558 562 self._at_shutdown()
559 563 # call sys.exit after a short delay
560 564 loop = ioloop.IOLoop.instance()
561 565 loop.add_timeout(time.time()+0.1, loop.stop)
562 566
563 567 #---------------------------------------------------------------------------
564 568 # Engine methods
565 569 #---------------------------------------------------------------------------
566 570
567 571 def apply_request(self, stream, ident, parent):
568 572 try:
569 573 content = parent[u'content']
570 574 bufs = parent[u'buffers']
571 575 msg_id = parent['header']['msg_id']
572 576 except:
573 577 self.log.error("Got bad msg: %s", parent, exc_info=True)
574 578 return
575 579
576 580 self._publish_status(u'busy', parent)
577 581
578 582 # Set the parent message of the display hook and out streams.
579 583 shell = self.shell
580 584 shell.set_parent(parent)
581 585
582 586 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
583 587 # self.iopub_socket.send(pyin_msg)
584 588 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
585 589 md = self._make_metadata(parent['metadata'])
586 590 try:
587 591 working = shell.user_ns
588 592
589 593 prefix = "_"+str(msg_id).replace("-","")+"_"
590 594
591 595 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
592 596
593 597 fname = getattr(f, '__name__', 'f')
594 598
595 599 fname = prefix+"f"
596 600 argname = prefix+"args"
597 601 kwargname = prefix+"kwargs"
598 602 resultname = prefix+"result"
599 603
600 604 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
601 605 # print ns
602 606 working.update(ns)
603 607 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
604 608 try:
605 609 exec(code, shell.user_global_ns, shell.user_ns)
606 610 result = working.get(resultname)
607 611 finally:
608 612 for key in ns:
609 613 working.pop(key)
610 614
611 615 result_buf = serialize_object(result,
612 616 buffer_threshold=self.session.buffer_threshold,
613 617 item_threshold=self.session.item_threshold,
614 618 )
615 619
616 620 except:
617 621 # invoke IPython traceback formatting
618 622 shell.showtraceback()
619 623 # FIXME - fish exception info out of shell, possibly left there by
620 624 # run_code. We'll need to clean up this logic later.
621 625 reply_content = {}
622 626 if shell._reply_content is not None:
623 627 reply_content.update(shell._reply_content)
624 628 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
625 629 reply_content['engine_info'] = e_info
626 630 # reset after use
627 631 shell._reply_content = None
628 632
629 633 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
630 634 ident=self._topic('pyerr'))
631 635 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
632 636 result_buf = []
633 637
634 638 if reply_content['ename'] == 'UnmetDependency':
635 639 md['dependencies_met'] = False
636 640 else:
637 641 reply_content = {'status' : 'ok'}
638 642
639 643 # put 'ok'/'error' status in header, for scheduler introspection:
640 644 md['status'] = reply_content['status']
641 645
642 646 # flush i/o
643 647 sys.stdout.flush()
644 648 sys.stderr.flush()
645 649
646 650 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
647 651 parent=parent, ident=ident,buffers=result_buf, metadata=md)
648 652
649 653 self._publish_status(u'idle', parent)
650 654
651 655 #---------------------------------------------------------------------------
652 656 # Control messages
653 657 #---------------------------------------------------------------------------
654 658
655 659 def abort_request(self, stream, ident, parent):
656 660 """abort a specifig msg by id"""
657 661 msg_ids = parent['content'].get('msg_ids', None)
658 662 if isinstance(msg_ids, string_types):
659 663 msg_ids = [msg_ids]
660 664 if not msg_ids:
661 665 self.abort_queues()
662 666 for mid in msg_ids:
663 667 self.aborted.add(str(mid))
664 668
665 669 content = dict(status='ok')
666 670 reply_msg = self.session.send(stream, 'abort_reply', content=content,
667 671 parent=parent, ident=ident)
668 672 self.log.debug("%s", reply_msg)
669 673
670 674 def clear_request(self, stream, idents, parent):
671 675 """Clear our namespace."""
672 676 self.shell.reset(False)
673 677 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
674 678 content = dict(status='ok'))
675 679
676 680
677 681 #---------------------------------------------------------------------------
678 682 # Protected interface
679 683 #---------------------------------------------------------------------------
680 684
681 685 def _wrap_exception(self, method=None):
682 686 # import here, because _wrap_exception is only used in parallel,
683 687 # and parallel has higher min pyzmq version
684 688 from IPython.parallel.error import wrap_exception
685 689 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
686 690 content = wrap_exception(e_info)
687 691 return content
688 692
689 693 def _topic(self, topic):
690 694 """prefixed topic for IOPub messages"""
691 695 if self.int_id >= 0:
692 696 base = "engine.%i" % self.int_id
693 697 else:
694 698 base = "kernel.%s" % self.ident
695 699
696 700 return py3compat.cast_bytes("%s.%s" % (base, topic))
697 701
698 702 def _abort_queues(self):
699 703 for stream in self.shell_streams:
700 704 if stream:
701 705 self._abort_queue(stream)
702 706
703 707 def _abort_queue(self, stream):
704 708 poller = zmq.Poller()
705 709 poller.register(stream.socket, zmq.POLLIN)
706 710 while True:
707 711 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
708 712 if msg is None:
709 713 return
710 714
711 715 self.log.info("Aborting:")
712 716 self.log.info("%s", msg)
713 717 msg_type = msg['header']['msg_type']
714 718 reply_type = msg_type.split('_')[0] + '_reply'
715 719
716 720 status = {'status' : 'aborted'}
717 721 md = {'engine' : self.ident}
718 722 md.update(status)
719 723 reply_msg = self.session.send(stream, reply_type, metadata=md,
720 724 content=status, parent=msg, ident=idents)
721 725 self.log.debug("%s", reply_msg)
722 726 # We need to wait a bit for requests to come in. This can probably
723 727 # be set shorter for true asynchronous clients.
724 728 poller.poll(50)
725 729
726 730
727 731 def _no_raw_input(self):
728 732 """Raise StdinNotImplentedError if active frontend doesn't support
729 733 stdin."""
730 734 raise StdinNotImplementedError("raw_input was called, but this "
731 735 "frontend does not support stdin.")
732 736
733 737 def _raw_input(self, prompt, ident, parent):
734 738 # Flush output before making the request.
735 739 sys.stderr.flush()
736 740 sys.stdout.flush()
737 741 # flush the stdin socket, to purge stale replies
738 742 while True:
739 743 try:
740 744 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
741 745 except zmq.ZMQError as e:
742 746 if e.errno == zmq.EAGAIN:
743 747 break
744 748 else:
745 749 raise
746 750
747 751 # Send the input request.
748 752 content = json_clean(dict(prompt=prompt))
749 753 self.session.send(self.stdin_socket, u'input_request', content, parent,
750 754 ident=ident)
751 755
752 756 # Await a response.
753 757 while True:
754 758 try:
755 759 ident, reply = self.session.recv(self.stdin_socket, 0)
756 760 except Exception:
757 761 self.log.warn("Invalid Message:", exc_info=True)
758 762 except KeyboardInterrupt:
759 763 # re-raise KeyboardInterrupt, to truncate traceback
760 764 raise KeyboardInterrupt
761 765 else:
762 766 break
763 767 try:
764 768 value = py3compat.unicode_to_str(reply['content']['value'])
765 769 except:
766 770 self.log.error("Got bad raw_input reply: ")
767 771 self.log.error("%s", parent)
768 772 value = ''
769 773 if value == '\x04':
770 774 # EOF
771 775 raise EOFError
772 776 return value
773 777
774 778 def _complete(self, msg):
775 779 c = msg['content']
776 780 try:
777 781 cpos = int(c['cursor_pos'])
778 782 except:
779 783 # If we don't get something that we can convert to an integer, at
780 784 # least attempt the completion guessing the cursor is at the end of
781 785 # the text, if there's any, and otherwise of the line
782 786 cpos = len(c['text'])
783 787 if cpos==0:
784 788 cpos = len(c['line'])
785 789 return self.shell.complete(c['text'], c['line'], cpos)
786 790
787 791 def _at_shutdown(self):
788 792 """Actions taken at shutdown by the kernel, called by python's atexit.
789 793 """
790 794 # io.rprint("Kernel at_shutdown") # dbg
791 795 if self._shutdown_message is not None:
792 796 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
793 797 self.log.debug("%s", self._shutdown_message)
794 798 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
795 799
General Comments 0
You need to be logged in to leave comments. Login now