##// END OF EJS Templates
ensure raw_input returns str in zmq shell...
MinRK -
Show More
@@ -1,804 +1,804 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports
19 19 import __builtin__
20 20 import sys
21 21 import time
22 22 import traceback
23 23 import logging
24 24 import uuid
25 25
26 26 from datetime import datetime
27 27 from signal import (
28 28 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
29 29 )
30 30
31 31 # System library imports
32 32 import zmq
33 33 from zmq.eventloop import ioloop
34 34 from zmq.eventloop.zmqstream import ZMQStream
35 35
36 36 # Local imports
37 37 from IPython.config.configurable import Configurable
38 38 from IPython.core.error import StdinNotImplementedError
39 39 from IPython.core import release
40 40 from IPython.utils import io
41 41 from IPython.utils import py3compat
42 42 from IPython.utils.jsonutil import json_clean
43 43 from IPython.utils.traitlets import (
44 44 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode,
45 45 Type
46 46 )
47 47
48 48 from serialize import serialize_object, unpack_apply_message
49 49 from session import Session
50 50 from zmqshell import ZMQInteractiveShell
51 51
52 52
53 53 #-----------------------------------------------------------------------------
54 54 # Main kernel class
55 55 #-----------------------------------------------------------------------------
56 56
57 57 protocol_version = list(release.kernel_protocol_version_info)
58 58 ipython_version = list(release.version_info)
59 59 language_version = list(sys.version_info[:3])
60 60
61 61
62 62 class Kernel(Configurable):
63 63
64 64 #---------------------------------------------------------------------------
65 65 # Kernel interface
66 66 #---------------------------------------------------------------------------
67 67
68 68 # attribute to override with a GUI
69 69 eventloop = Any(None)
70 70 def _eventloop_changed(self, name, old, new):
71 71 """schedule call to eventloop from IOLoop"""
72 72 loop = ioloop.IOLoop.instance()
73 73 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
74 74
75 75 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
76 76 shell_class = Type(ZMQInteractiveShell)
77 77
78 78 session = Instance(Session)
79 79 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
80 80 shell_streams = List()
81 81 control_stream = Instance(ZMQStream)
82 82 iopub_socket = Instance(zmq.Socket)
83 83 stdin_socket = Instance(zmq.Socket)
84 84 log = Instance(logging.Logger)
85 85
86 86 user_module = Any()
87 87 def _user_module_changed(self, name, old, new):
88 88 if self.shell is not None:
89 89 self.shell.user_module = new
90 90
91 91 user_ns = Dict(default_value=None)
92 92 def _user_ns_changed(self, name, old, new):
93 93 if self.shell is not None:
94 94 self.shell.user_ns = new
95 95 self.shell.init_user_ns()
96 96
97 97 # identities:
98 98 int_id = Integer(-1)
99 99 ident = Unicode()
100 100
101 101 def _ident_default(self):
102 102 return unicode(uuid.uuid4())
103 103
104 104
105 105 # Private interface
106 106
107 107 # Time to sleep after flushing the stdout/err buffers in each execute
108 108 # cycle. While this introduces a hard limit on the minimal latency of the
109 109 # execute cycle, it helps prevent output synchronization problems for
110 110 # clients.
111 111 # Units are in seconds. The minimum zmq latency on local host is probably
112 112 # ~150 microseconds, set this to 500us for now. We may need to increase it
113 113 # a little if it's not enough after more interactive testing.
114 114 _execute_sleep = Float(0.0005, config=True)
115 115
116 116 # Frequency of the kernel's event loop.
117 117 # Units are in seconds, kernel subclasses for GUI toolkits may need to
118 118 # adapt to milliseconds.
119 119 _poll_interval = Float(0.05, config=True)
120 120
121 121 # If the shutdown was requested over the network, we leave here the
122 122 # necessary reply message so it can be sent by our registered atexit
123 123 # handler. This ensures that the reply is only sent to clients truly at
124 124 # the end of our shutdown process (which happens after the underlying
125 125 # IPython shell's own shutdown).
126 126 _shutdown_message = None
127 127
128 128 # This is a dict of port number that the kernel is listening on. It is set
129 129 # by record_ports and used by connect_request.
130 130 _recorded_ports = Dict()
131 131
132 132 # A reference to the Python builtin 'raw_input' function.
133 133 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
134 134 _sys_raw_input = Any()
135 135
136 136 # set of aborted msg_ids
137 137 aborted = Set()
138 138
139 139
140 140 def __init__(self, **kwargs):
141 141 super(Kernel, self).__init__(**kwargs)
142 142
143 143 # Initialize the InteractiveShell subclass
144 144 self.shell = self.shell_class.instance(parent=self,
145 145 profile_dir = self.profile_dir,
146 146 user_module = self.user_module,
147 147 user_ns = self.user_ns,
148 148 )
149 149 self.shell.displayhook.session = self.session
150 150 self.shell.displayhook.pub_socket = self.iopub_socket
151 151 self.shell.displayhook.topic = self._topic('pyout')
152 152 self.shell.display_pub.session = self.session
153 153 self.shell.display_pub.pub_socket = self.iopub_socket
154 154 self.shell.data_pub.session = self.session
155 155 self.shell.data_pub.pub_socket = self.iopub_socket
156 156
157 157 # TMP - hack while developing
158 158 self.shell._reply_content = None
159 159
160 160 # Build dict of handlers for message types
161 161 msg_types = [ 'execute_request', 'complete_request',
162 162 'object_info_request', 'history_request',
163 163 'kernel_info_request',
164 164 'connect_request', 'shutdown_request',
165 165 'apply_request',
166 166 ]
167 167 self.shell_handlers = {}
168 168 for msg_type in msg_types:
169 169 self.shell_handlers[msg_type] = getattr(self, msg_type)
170 170
171 171 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
172 172 self.control_handlers = {}
173 173 for msg_type in control_msg_types:
174 174 self.control_handlers[msg_type] = getattr(self, msg_type)
175 175
176 176 def dispatch_control(self, msg):
177 177 """dispatch control requests"""
178 178 idents,msg = self.session.feed_identities(msg, copy=False)
179 179 try:
180 180 msg = self.session.unserialize(msg, content=True, copy=False)
181 181 except:
182 182 self.log.error("Invalid Control Message", exc_info=True)
183 183 return
184 184
185 185 self.log.debug("Control received: %s", msg)
186 186
187 187 header = msg['header']
188 188 msg_id = header['msg_id']
189 189 msg_type = header['msg_type']
190 190
191 191 handler = self.control_handlers.get(msg_type, None)
192 192 if handler is None:
193 193 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
194 194 else:
195 195 try:
196 196 handler(self.control_stream, idents, msg)
197 197 except Exception:
198 198 self.log.error("Exception in control handler:", exc_info=True)
199 199
200 200 def dispatch_shell(self, stream, msg):
201 201 """dispatch shell requests"""
202 202 # flush control requests first
203 203 if self.control_stream:
204 204 self.control_stream.flush()
205 205
206 206 idents,msg = self.session.feed_identities(msg, copy=False)
207 207 try:
208 208 msg = self.session.unserialize(msg, content=True, copy=False)
209 209 except:
210 210 self.log.error("Invalid Message", exc_info=True)
211 211 return
212 212
213 213 header = msg['header']
214 214 msg_id = header['msg_id']
215 215 msg_type = msg['header']['msg_type']
216 216
217 217 # Print some info about this message and leave a '--->' marker, so it's
218 218 # easier to trace visually the message chain when debugging. Each
219 219 # handler prints its message at the end.
220 220 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
221 221 self.log.debug(' Content: %s\n --->\n ', msg['content'])
222 222
223 223 if msg_id in self.aborted:
224 224 self.aborted.remove(msg_id)
225 225 # is it safe to assume a msg_id will not be resubmitted?
226 226 reply_type = msg_type.split('_')[0] + '_reply'
227 227 status = {'status' : 'aborted'}
228 228 md = {'engine' : self.ident}
229 229 md.update(status)
230 230 reply_msg = self.session.send(stream, reply_type, metadata=md,
231 231 content=status, parent=msg, ident=idents)
232 232 return
233 233
234 234 handler = self.shell_handlers.get(msg_type, None)
235 235 if handler is None:
236 236 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
237 237 else:
238 238 # ensure default_int_handler during handler call
239 239 sig = signal(SIGINT, default_int_handler)
240 240 try:
241 241 handler(stream, idents, msg)
242 242 except Exception:
243 243 self.log.error("Exception in message handler:", exc_info=True)
244 244 finally:
245 245 signal(SIGINT, sig)
246 246
247 247 def enter_eventloop(self):
248 248 """enter eventloop"""
249 249 self.log.info("entering eventloop")
250 250 # restore default_int_handler
251 251 signal(SIGINT, default_int_handler)
252 252 while self.eventloop is not None:
253 253 try:
254 254 self.eventloop(self)
255 255 except KeyboardInterrupt:
256 256 # Ctrl-C shouldn't crash the kernel
257 257 self.log.error("KeyboardInterrupt caught in kernel")
258 258 continue
259 259 else:
260 260 # eventloop exited cleanly, this means we should stop (right?)
261 261 self.eventloop = None
262 262 break
263 263 self.log.info("exiting eventloop")
264 264
265 265 def start(self):
266 266 """register dispatchers for streams"""
267 267 self.shell.exit_now = False
268 268 if self.control_stream:
269 269 self.control_stream.on_recv(self.dispatch_control, copy=False)
270 270
271 271 def make_dispatcher(stream):
272 272 def dispatcher(msg):
273 273 return self.dispatch_shell(stream, msg)
274 274 return dispatcher
275 275
276 276 for s in self.shell_streams:
277 277 s.on_recv(make_dispatcher(s), copy=False)
278 278
279 279 # publish idle status
280 280 self._publish_status('starting')
281 281
282 282 def do_one_iteration(self):
283 283 """step eventloop just once"""
284 284 if self.control_stream:
285 285 self.control_stream.flush()
286 286 for stream in self.shell_streams:
287 287 # handle at most one request per iteration
288 288 stream.flush(zmq.POLLIN, 1)
289 289 stream.flush(zmq.POLLOUT)
290 290
291 291
292 292 def record_ports(self, ports):
293 293 """Record the ports that this kernel is using.
294 294
295 295 The creator of the Kernel instance must call this methods if they
296 296 want the :meth:`connect_request` method to return the port numbers.
297 297 """
298 298 self._recorded_ports = ports
299 299
300 300 #---------------------------------------------------------------------------
301 301 # Kernel request handlers
302 302 #---------------------------------------------------------------------------
303 303
304 304 def _make_metadata(self, other=None):
305 305 """init metadata dict, for execute/apply_reply"""
306 306 new_md = {
307 307 'dependencies_met' : True,
308 308 'engine' : self.ident,
309 309 'started': datetime.now(),
310 310 }
311 311 if other:
312 312 new_md.update(other)
313 313 return new_md
314 314
315 315 def _publish_pyin(self, code, parent, execution_count):
316 316 """Publish the code request on the pyin stream."""
317 317
318 318 self.session.send(self.iopub_socket, u'pyin',
319 319 {u'code':code, u'execution_count': execution_count},
320 320 parent=parent, ident=self._topic('pyin')
321 321 )
322 322
323 323 def _publish_status(self, status, parent=None):
324 324 """send status (busy/idle) on IOPub"""
325 325 self.session.send(self.iopub_socket,
326 326 u'status',
327 327 {u'execution_state': status},
328 328 parent=parent,
329 329 ident=self._topic('status'),
330 330 )
331 331
332 332
333 333 def execute_request(self, stream, ident, parent):
334 334 """handle an execute_request"""
335 335
336 336 self._publish_status(u'busy', parent)
337 337
338 338 try:
339 339 content = parent[u'content']
340 340 code = content[u'code']
341 341 silent = content[u'silent']
342 342 store_history = content.get(u'store_history', not silent)
343 343 except:
344 344 self.log.error("Got bad msg: ")
345 345 self.log.error("%s", parent)
346 346 return
347 347
348 348 md = self._make_metadata(parent['metadata'])
349 349
350 350 shell = self.shell # we'll need this a lot here
351 351
352 352 # Replace raw_input. Note that is not sufficient to replace
353 353 # raw_input in the user namespace.
354 354 if content.get('allow_stdin', False):
355 355 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
356 356 else:
357 357 raw_input = lambda prompt='' : self._no_raw_input()
358 358
359 359 if py3compat.PY3:
360 360 self._sys_raw_input = __builtin__.input
361 361 __builtin__.input = raw_input
362 362 else:
363 363 self._sys_raw_input = __builtin__.raw_input
364 364 __builtin__.raw_input = raw_input
365 365
366 366 # Set the parent message of the display hook and out streams.
367 367 shell.displayhook.set_parent(parent)
368 368 shell.display_pub.set_parent(parent)
369 369 shell.data_pub.set_parent(parent)
370 370 try:
371 371 sys.stdout.set_parent(parent)
372 372 except AttributeError:
373 373 pass
374 374 try:
375 375 sys.stderr.set_parent(parent)
376 376 except AttributeError:
377 377 pass
378 378
379 379 # Re-broadcast our input for the benefit of listening clients, and
380 380 # start computing output
381 381 if not silent:
382 382 self._publish_pyin(code, parent, shell.execution_count)
383 383
384 384 reply_content = {}
385 385 try:
386 386 # FIXME: the shell calls the exception handler itself.
387 387 shell.run_cell(code, store_history=store_history, silent=silent)
388 388 except:
389 389 status = u'error'
390 390 # FIXME: this code right now isn't being used yet by default,
391 391 # because the run_cell() call above directly fires off exception
392 392 # reporting. This code, therefore, is only active in the scenario
393 393 # where runlines itself has an unhandled exception. We need to
394 394 # uniformize this, for all exception construction to come from a
395 395 # single location in the codbase.
396 396 etype, evalue, tb = sys.exc_info()
397 397 tb_list = traceback.format_exception(etype, evalue, tb)
398 398 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
399 399 else:
400 400 status = u'ok'
401 401 finally:
402 402 # Restore raw_input.
403 403 if py3compat.PY3:
404 404 __builtin__.input = self._sys_raw_input
405 405 else:
406 406 __builtin__.raw_input = self._sys_raw_input
407 407
408 408 reply_content[u'status'] = status
409 409
410 410 # Return the execution counter so clients can display prompts
411 411 reply_content['execution_count'] = shell.execution_count - 1
412 412
413 413 # FIXME - fish exception info out of shell, possibly left there by
414 414 # runlines. We'll need to clean up this logic later.
415 415 if shell._reply_content is not None:
416 416 reply_content.update(shell._reply_content)
417 417 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
418 418 reply_content['engine_info'] = e_info
419 419 # reset after use
420 420 shell._reply_content = None
421 421
422 422 if 'traceback' in reply_content:
423 423 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
424 424
425 425
426 426 # At this point, we can tell whether the main code execution succeeded
427 427 # or not. If it did, we proceed to evaluate user_variables/expressions
428 428 if reply_content['status'] == 'ok':
429 429 reply_content[u'user_variables'] = \
430 430 shell.user_variables(content.get(u'user_variables', []))
431 431 reply_content[u'user_expressions'] = \
432 432 shell.user_expressions(content.get(u'user_expressions', {}))
433 433 else:
434 434 # If there was an error, don't even try to compute variables or
435 435 # expressions
436 436 reply_content[u'user_variables'] = {}
437 437 reply_content[u'user_expressions'] = {}
438 438
439 439 # Payloads should be retrieved regardless of outcome, so we can both
440 440 # recover partial output (that could have been generated early in a
441 441 # block, before an error) and clear the payload system always.
442 442 reply_content[u'payload'] = shell.payload_manager.read_payload()
443 443 # Be agressive about clearing the payload because we don't want
444 444 # it to sit in memory until the next execute_request comes in.
445 445 shell.payload_manager.clear_payload()
446 446
447 447 # Flush output before sending the reply.
448 448 sys.stdout.flush()
449 449 sys.stderr.flush()
450 450 # FIXME: on rare occasions, the flush doesn't seem to make it to the
451 451 # clients... This seems to mitigate the problem, but we definitely need
452 452 # to better understand what's going on.
453 453 if self._execute_sleep:
454 454 time.sleep(self._execute_sleep)
455 455
456 456 # Send the reply.
457 457 reply_content = json_clean(reply_content)
458 458
459 459 md['status'] = reply_content['status']
460 460 if reply_content['status'] == 'error' and \
461 461 reply_content['ename'] == 'UnmetDependency':
462 462 md['dependencies_met'] = False
463 463
464 464 reply_msg = self.session.send(stream, u'execute_reply',
465 465 reply_content, parent, metadata=md,
466 466 ident=ident)
467 467
468 468 self.log.debug("%s", reply_msg)
469 469
470 470 if not silent and reply_msg['content']['status'] == u'error':
471 471 self._abort_queues()
472 472
473 473 self._publish_status(u'idle', parent)
474 474
475 475 def complete_request(self, stream, ident, parent):
476 476 txt, matches = self._complete(parent)
477 477 matches = {'matches' : matches,
478 478 'matched_text' : txt,
479 479 'status' : 'ok'}
480 480 matches = json_clean(matches)
481 481 completion_msg = self.session.send(stream, 'complete_reply',
482 482 matches, parent, ident)
483 483 self.log.debug("%s", completion_msg)
484 484
485 485 def object_info_request(self, stream, ident, parent):
486 486 content = parent['content']
487 487 object_info = self.shell.object_inspect(content['oname'],
488 488 detail_level = content.get('detail_level', 0)
489 489 )
490 490 # Before we send this object over, we scrub it for JSON usage
491 491 oinfo = json_clean(object_info)
492 492 msg = self.session.send(stream, 'object_info_reply',
493 493 oinfo, parent, ident)
494 494 self.log.debug("%s", msg)
495 495
496 496 def history_request(self, stream, ident, parent):
497 497 # We need to pull these out, as passing **kwargs doesn't work with
498 498 # unicode keys before Python 2.6.5.
499 499 hist_access_type = parent['content']['hist_access_type']
500 500 raw = parent['content']['raw']
501 501 output = parent['content']['output']
502 502 if hist_access_type == 'tail':
503 503 n = parent['content']['n']
504 504 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
505 505 include_latest=True)
506 506
507 507 elif hist_access_type == 'range':
508 508 session = parent['content']['session']
509 509 start = parent['content']['start']
510 510 stop = parent['content']['stop']
511 511 hist = self.shell.history_manager.get_range(session, start, stop,
512 512 raw=raw, output=output)
513 513
514 514 elif hist_access_type == 'search':
515 515 n = parent['content'].get('n')
516 516 unique = parent['content'].get('unique', False)
517 517 pattern = parent['content']['pattern']
518 518 hist = self.shell.history_manager.search(
519 519 pattern, raw=raw, output=output, n=n, unique=unique)
520 520
521 521 else:
522 522 hist = []
523 523 hist = list(hist)
524 524 content = {'history' : hist}
525 525 content = json_clean(content)
526 526 msg = self.session.send(stream, 'history_reply',
527 527 content, parent, ident)
528 528 self.log.debug("Sending history reply with %i entries", len(hist))
529 529
530 530 def connect_request(self, stream, ident, parent):
531 531 if self._recorded_ports is not None:
532 532 content = self._recorded_ports.copy()
533 533 else:
534 534 content = {}
535 535 msg = self.session.send(stream, 'connect_reply',
536 536 content, parent, ident)
537 537 self.log.debug("%s", msg)
538 538
539 539 def kernel_info_request(self, stream, ident, parent):
540 540 vinfo = {
541 541 'protocol_version': protocol_version,
542 542 'ipython_version': ipython_version,
543 543 'language_version': language_version,
544 544 'language': 'python',
545 545 }
546 546 msg = self.session.send(stream, 'kernel_info_reply',
547 547 vinfo, parent, ident)
548 548 self.log.debug("%s", msg)
549 549
550 550 def shutdown_request(self, stream, ident, parent):
551 551 self.shell.exit_now = True
552 552 content = dict(status='ok')
553 553 content.update(parent['content'])
554 554 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
555 555 # same content, but different msg_id for broadcasting on IOPub
556 556 self._shutdown_message = self.session.msg(u'shutdown_reply',
557 557 content, parent
558 558 )
559 559
560 560 self._at_shutdown()
561 561 # call sys.exit after a short delay
562 562 loop = ioloop.IOLoop.instance()
563 563 loop.add_timeout(time.time()+0.1, loop.stop)
564 564
565 565 #---------------------------------------------------------------------------
566 566 # Engine methods
567 567 #---------------------------------------------------------------------------
568 568
569 569 def apply_request(self, stream, ident, parent):
570 570 try:
571 571 content = parent[u'content']
572 572 bufs = parent[u'buffers']
573 573 msg_id = parent['header']['msg_id']
574 574 except:
575 575 self.log.error("Got bad msg: %s", parent, exc_info=True)
576 576 return
577 577
578 578 self._publish_status(u'busy', parent)
579 579
580 580 # Set the parent message of the display hook and out streams.
581 581 shell = self.shell
582 582 shell.displayhook.set_parent(parent)
583 583 shell.display_pub.set_parent(parent)
584 584 shell.data_pub.set_parent(parent)
585 585 try:
586 586 sys.stdout.set_parent(parent)
587 587 except AttributeError:
588 588 pass
589 589 try:
590 590 sys.stderr.set_parent(parent)
591 591 except AttributeError:
592 592 pass
593 593
594 594 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
595 595 # self.iopub_socket.send(pyin_msg)
596 596 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
597 597 md = self._make_metadata(parent['metadata'])
598 598 try:
599 599 working = shell.user_ns
600 600
601 601 prefix = "_"+str(msg_id).replace("-","")+"_"
602 602
603 603 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
604 604
605 605 fname = getattr(f, '__name__', 'f')
606 606
607 607 fname = prefix+"f"
608 608 argname = prefix+"args"
609 609 kwargname = prefix+"kwargs"
610 610 resultname = prefix+"result"
611 611
612 612 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
613 613 # print ns
614 614 working.update(ns)
615 615 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
616 616 try:
617 617 exec code in shell.user_global_ns, shell.user_ns
618 618 result = working.get(resultname)
619 619 finally:
620 620 for key in ns.iterkeys():
621 621 working.pop(key)
622 622
623 623 result_buf = serialize_object(result,
624 624 buffer_threshold=self.session.buffer_threshold,
625 625 item_threshold=self.session.item_threshold,
626 626 )
627 627
628 628 except:
629 629 # invoke IPython traceback formatting
630 630 shell.showtraceback()
631 631 # FIXME - fish exception info out of shell, possibly left there by
632 632 # run_code. We'll need to clean up this logic later.
633 633 reply_content = {}
634 634 if shell._reply_content is not None:
635 635 reply_content.update(shell._reply_content)
636 636 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
637 637 reply_content['engine_info'] = e_info
638 638 # reset after use
639 639 shell._reply_content = None
640 640
641 641 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
642 642 ident=self._topic('pyerr'))
643 643 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
644 644 result_buf = []
645 645
646 646 if reply_content['ename'] == 'UnmetDependency':
647 647 md['dependencies_met'] = False
648 648 else:
649 649 reply_content = {'status' : 'ok'}
650 650
651 651 # put 'ok'/'error' status in header, for scheduler introspection:
652 652 md['status'] = reply_content['status']
653 653
654 654 # flush i/o
655 655 sys.stdout.flush()
656 656 sys.stderr.flush()
657 657
658 658 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
659 659 parent=parent, ident=ident,buffers=result_buf, metadata=md)
660 660
661 661 self._publish_status(u'idle', parent)
662 662
663 663 #---------------------------------------------------------------------------
664 664 # Control messages
665 665 #---------------------------------------------------------------------------
666 666
667 667 def abort_request(self, stream, ident, parent):
668 668 """abort a specifig msg by id"""
669 669 msg_ids = parent['content'].get('msg_ids', None)
670 670 if isinstance(msg_ids, basestring):
671 671 msg_ids = [msg_ids]
672 672 if not msg_ids:
673 673 self.abort_queues()
674 674 for mid in msg_ids:
675 675 self.aborted.add(str(mid))
676 676
677 677 content = dict(status='ok')
678 678 reply_msg = self.session.send(stream, 'abort_reply', content=content,
679 679 parent=parent, ident=ident)
680 680 self.log.debug("%s", reply_msg)
681 681
682 682 def clear_request(self, stream, idents, parent):
683 683 """Clear our namespace."""
684 684 self.shell.reset(False)
685 685 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
686 686 content = dict(status='ok'))
687 687
688 688
689 689 #---------------------------------------------------------------------------
690 690 # Protected interface
691 691 #---------------------------------------------------------------------------
692 692
693 693 def _wrap_exception(self, method=None):
694 694 # import here, because _wrap_exception is only used in parallel,
695 695 # and parallel has higher min pyzmq version
696 696 from IPython.parallel.error import wrap_exception
697 697 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
698 698 content = wrap_exception(e_info)
699 699 return content
700 700
701 701 def _topic(self, topic):
702 702 """prefixed topic for IOPub messages"""
703 703 if self.int_id >= 0:
704 704 base = "engine.%i" % self.int_id
705 705 else:
706 706 base = "kernel.%s" % self.ident
707 707
708 708 return py3compat.cast_bytes("%s.%s" % (base, topic))
709 709
710 710 def _abort_queues(self):
711 711 for stream in self.shell_streams:
712 712 if stream:
713 713 self._abort_queue(stream)
714 714
715 715 def _abort_queue(self, stream):
716 716 poller = zmq.Poller()
717 717 poller.register(stream.socket, zmq.POLLIN)
718 718 while True:
719 719 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
720 720 if msg is None:
721 721 return
722 722
723 723 self.log.info("Aborting:")
724 724 self.log.info("%s", msg)
725 725 msg_type = msg['header']['msg_type']
726 726 reply_type = msg_type.split('_')[0] + '_reply'
727 727
728 728 status = {'status' : 'aborted'}
729 729 md = {'engine' : self.ident}
730 730 md.update(status)
731 731 reply_msg = self.session.send(stream, reply_type, metadata=md,
732 732 content=status, parent=msg, ident=idents)
733 733 self.log.debug("%s", reply_msg)
734 734 # We need to wait a bit for requests to come in. This can probably
735 735 # be set shorter for true asynchronous clients.
736 736 poller.poll(50)
737 737
738 738
739 739 def _no_raw_input(self):
740 740 """Raise StdinNotImplentedError if active frontend doesn't support
741 741 stdin."""
742 742 raise StdinNotImplementedError("raw_input was called, but this "
743 743 "frontend does not support stdin.")
744 744
745 745 def _raw_input(self, prompt, ident, parent):
746 746 # Flush output before making the request.
747 747 sys.stderr.flush()
748 748 sys.stdout.flush()
749 749 # flush the stdin socket, to purge stale replies
750 750 while True:
751 751 try:
752 752 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
753 753 except zmq.ZMQError as e:
754 754 if e.errno == zmq.EAGAIN:
755 755 break
756 756 else:
757 757 raise
758 758
759 759 # Send the input request.
760 760 content = json_clean(dict(prompt=prompt))
761 761 self.session.send(self.stdin_socket, u'input_request', content, parent,
762 762 ident=ident)
763 763
764 764 # Await a response.
765 765 while True:
766 766 try:
767 767 ident, reply = self.session.recv(self.stdin_socket, 0)
768 768 except Exception:
769 769 self.log.warn("Invalid Message:", exc_info=True)
770 770 else:
771 771 break
772 772 try:
773 value = reply['content']['value']
773 value = py3compat.unicode_to_str(reply['content']['value'])
774 774 except:
775 775 self.log.error("Got bad raw_input reply: ")
776 776 self.log.error("%s", parent)
777 777 value = ''
778 778 if value == '\x04':
779 779 # EOF
780 780 raise EOFError
781 781 return value
782 782
783 783 def _complete(self, msg):
784 784 c = msg['content']
785 785 try:
786 786 cpos = int(c['cursor_pos'])
787 787 except:
788 788 # If we don't get something that we can convert to an integer, at
789 789 # least attempt the completion guessing the cursor is at the end of
790 790 # the text, if there's any, and otherwise of the line
791 791 cpos = len(c['text'])
792 792 if cpos==0:
793 793 cpos = len(c['line'])
794 794 return self.shell.complete(c['text'], c['line'], cpos)
795 795
796 796 def _at_shutdown(self):
797 797 """Actions taken at shutdown by the kernel, called by python's atexit.
798 798 """
799 799 # io.rprint("Kernel at_shutdown") # dbg
800 800 if self._shutdown_message is not None:
801 801 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
802 802 self.log.debug("%s", self._shutdown_message)
803 803 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
804 804
General Comments 0
You need to be logged in to leave comments. Login now