##// END OF EJS Templates
Be explicit about default value for unique
Takafumi Arakaki -
Show More
@@ -1,780 +1,780 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports
19 19 import __builtin__
20 20 import sys
21 21 import time
22 22 import traceback
23 23 import logging
24 24 import uuid
25 25
26 26 from datetime import datetime
27 27 from signal import (
28 28 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
29 29 )
30 30
31 31 # System library imports
32 32 import zmq
33 33 from zmq.eventloop import ioloop
34 34 from zmq.eventloop.zmqstream import ZMQStream
35 35
36 36 # Local imports
37 37 from IPython.config.configurable import Configurable
38 38 from IPython.core.error import StdinNotImplementedError
39 39 from IPython.core import release
40 40 from IPython.utils import io
41 41 from IPython.utils import py3compat
42 42 from IPython.utils.jsonutil import json_clean
43 43 from IPython.utils.traitlets import (
44 44 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode,
45 45 Type
46 46 )
47 47
48 48 from serialize import serialize_object, unpack_apply_message
49 49 from session import Session
50 50 from zmqshell import ZMQInteractiveShell
51 51
52 52
53 53 #-----------------------------------------------------------------------------
54 54 # Main kernel class
55 55 #-----------------------------------------------------------------------------
56 56
57 57 protocol_version = list(release.kernel_protocol_version_info)
58 58 ipython_version = list(release.version_info)
59 59 language_version = list(sys.version_info[:3])
60 60
61 61
62 62 class Kernel(Configurable):
63 63
64 64 #---------------------------------------------------------------------------
65 65 # Kernel interface
66 66 #---------------------------------------------------------------------------
67 67
68 68 # attribute to override with a GUI
69 69 eventloop = Any(None)
70 70 def _eventloop_changed(self, name, old, new):
71 71 """schedule call to eventloop from IOLoop"""
72 72 loop = ioloop.IOLoop.instance()
73 73 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
74 74
75 75 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
76 76 shell_class = Type(ZMQInteractiveShell)
77 77
78 78 session = Instance(Session)
79 79 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
80 80 shell_streams = List()
81 81 control_stream = Instance(ZMQStream)
82 82 iopub_socket = Instance(zmq.Socket)
83 83 stdin_socket = Instance(zmq.Socket)
84 84 log = Instance(logging.Logger)
85 85
86 86 user_module = Any()
87 87 def _user_module_changed(self, name, old, new):
88 88 if self.shell is not None:
89 89 self.shell.user_module = new
90 90
91 91 user_ns = Dict(default_value=None)
92 92 def _user_ns_changed(self, name, old, new):
93 93 if self.shell is not None:
94 94 self.shell.user_ns = new
95 95 self.shell.init_user_ns()
96 96
97 97 # identities:
98 98 int_id = Integer(-1)
99 99 ident = Unicode()
100 100
101 101 def _ident_default(self):
102 102 return unicode(uuid.uuid4())
103 103
104 104
105 105 # Private interface
106 106
107 107 # Time to sleep after flushing the stdout/err buffers in each execute
108 108 # cycle. While this introduces a hard limit on the minimal latency of the
109 109 # execute cycle, it helps prevent output synchronization problems for
110 110 # clients.
111 111 # Units are in seconds. The minimum zmq latency on local host is probably
112 112 # ~150 microseconds, set this to 500us for now. We may need to increase it
113 113 # a little if it's not enough after more interactive testing.
114 114 _execute_sleep = Float(0.0005, config=True)
115 115
116 116 # Frequency of the kernel's event loop.
117 117 # Units are in seconds, kernel subclasses for GUI toolkits may need to
118 118 # adapt to milliseconds.
119 119 _poll_interval = Float(0.05, config=True)
120 120
121 121 # If the shutdown was requested over the network, we leave here the
122 122 # necessary reply message so it can be sent by our registered atexit
123 123 # handler. This ensures that the reply is only sent to clients truly at
124 124 # the end of our shutdown process (which happens after the underlying
125 125 # IPython shell's own shutdown).
126 126 _shutdown_message = None
127 127
128 128 # This is a dict of port number that the kernel is listening on. It is set
129 129 # by record_ports and used by connect_request.
130 130 _recorded_ports = Dict()
131 131
132 132 # A reference to the Python builtin 'raw_input' function.
133 133 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
134 134 _sys_raw_input = Any()
135 135
136 136 # set of aborted msg_ids
137 137 aborted = Set()
138 138
139 139
140 140 def __init__(self, **kwargs):
141 141 super(Kernel, self).__init__(**kwargs)
142 142
143 143 # Initialize the InteractiveShell subclass
144 144 self.shell = self.shell_class.instance(config=self.config,
145 145 profile_dir = self.profile_dir,
146 146 user_module = self.user_module,
147 147 user_ns = self.user_ns,
148 148 )
149 149 self.shell.displayhook.session = self.session
150 150 self.shell.displayhook.pub_socket = self.iopub_socket
151 151 self.shell.displayhook.topic = self._topic('pyout')
152 152 self.shell.display_pub.session = self.session
153 153 self.shell.display_pub.pub_socket = self.iopub_socket
154 154 self.shell.data_pub.session = self.session
155 155 self.shell.data_pub.pub_socket = self.iopub_socket
156 156
157 157 # TMP - hack while developing
158 158 self.shell._reply_content = None
159 159
160 160 # Build dict of handlers for message types
161 161 msg_types = [ 'execute_request', 'complete_request',
162 162 'object_info_request', 'history_request',
163 163 'kernel_info_request',
164 164 'connect_request', 'shutdown_request',
165 165 'apply_request',
166 166 ]
167 167 self.shell_handlers = {}
168 168 for msg_type in msg_types:
169 169 self.shell_handlers[msg_type] = getattr(self, msg_type)
170 170
171 171 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
172 172 self.control_handlers = {}
173 173 for msg_type in control_msg_types:
174 174 self.control_handlers[msg_type] = getattr(self, msg_type)
175 175
176 176 def dispatch_control(self, msg):
177 177 """dispatch control requests"""
178 178 idents,msg = self.session.feed_identities(msg, copy=False)
179 179 try:
180 180 msg = self.session.unserialize(msg, content=True, copy=False)
181 181 except:
182 182 self.log.error("Invalid Control Message", exc_info=True)
183 183 return
184 184
185 185 self.log.debug("Control received: %s", msg)
186 186
187 187 header = msg['header']
188 188 msg_id = header['msg_id']
189 189 msg_type = header['msg_type']
190 190
191 191 handler = self.control_handlers.get(msg_type, None)
192 192 if handler is None:
193 193 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
194 194 else:
195 195 try:
196 196 handler(self.control_stream, idents, msg)
197 197 except Exception:
198 198 self.log.error("Exception in control handler:", exc_info=True)
199 199
200 200 def dispatch_shell(self, stream, msg):
201 201 """dispatch shell requests"""
202 202 # flush control requests first
203 203 if self.control_stream:
204 204 self.control_stream.flush()
205 205
206 206 idents,msg = self.session.feed_identities(msg, copy=False)
207 207 try:
208 208 msg = self.session.unserialize(msg, content=True, copy=False)
209 209 except:
210 210 self.log.error("Invalid Message", exc_info=True)
211 211 return
212 212
213 213 header = msg['header']
214 214 msg_id = header['msg_id']
215 215 msg_type = msg['header']['msg_type']
216 216
217 217 # Print some info about this message and leave a '--->' marker, so it's
218 218 # easier to trace visually the message chain when debugging. Each
219 219 # handler prints its message at the end.
220 220 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
221 221 self.log.debug(' Content: %s\n --->\n ', msg['content'])
222 222
223 223 if msg_id in self.aborted:
224 224 self.aborted.remove(msg_id)
225 225 # is it safe to assume a msg_id will not be resubmitted?
226 226 reply_type = msg_type.split('_')[0] + '_reply'
227 227 status = {'status' : 'aborted'}
228 228 md = {'engine' : self.ident}
229 229 md.update(status)
230 230 reply_msg = self.session.send(stream, reply_type, metadata=md,
231 231 content=status, parent=msg, ident=idents)
232 232 return
233 233
234 234 handler = self.shell_handlers.get(msg_type, None)
235 235 if handler is None:
236 236 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
237 237 else:
238 238 # ensure default_int_handler during handler call
239 239 sig = signal(SIGINT, default_int_handler)
240 240 try:
241 241 handler(stream, idents, msg)
242 242 except Exception:
243 243 self.log.error("Exception in message handler:", exc_info=True)
244 244 finally:
245 245 signal(SIGINT, sig)
246 246
247 247 def enter_eventloop(self):
248 248 """enter eventloop"""
249 249 self.log.info("entering eventloop")
250 250 # restore default_int_handler
251 251 signal(SIGINT, default_int_handler)
252 252 while self.eventloop is not None:
253 253 try:
254 254 self.eventloop(self)
255 255 except KeyboardInterrupt:
256 256 # Ctrl-C shouldn't crash the kernel
257 257 self.log.error("KeyboardInterrupt caught in kernel")
258 258 continue
259 259 else:
260 260 # eventloop exited cleanly, this means we should stop (right?)
261 261 self.eventloop = None
262 262 break
263 263 self.log.info("exiting eventloop")
264 264
265 265 def start(self):
266 266 """register dispatchers for streams"""
267 267 self.shell.exit_now = False
268 268 if self.control_stream:
269 269 self.control_stream.on_recv(self.dispatch_control, copy=False)
270 270
271 271 def make_dispatcher(stream):
272 272 def dispatcher(msg):
273 273 return self.dispatch_shell(stream, msg)
274 274 return dispatcher
275 275
276 276 for s in self.shell_streams:
277 277 s.on_recv(make_dispatcher(s), copy=False)
278 278
279 279 def do_one_iteration(self):
280 280 """step eventloop just once"""
281 281 if self.control_stream:
282 282 self.control_stream.flush()
283 283 for stream in self.shell_streams:
284 284 # handle at most one request per iteration
285 285 stream.flush(zmq.POLLIN, 1)
286 286 stream.flush(zmq.POLLOUT)
287 287
288 288
289 289 def record_ports(self, ports):
290 290 """Record the ports that this kernel is using.
291 291
292 292 The creator of the Kernel instance must call this methods if they
293 293 want the :meth:`connect_request` method to return the port numbers.
294 294 """
295 295 self._recorded_ports = ports
296 296
297 297 #---------------------------------------------------------------------------
298 298 # Kernel request handlers
299 299 #---------------------------------------------------------------------------
300 300
301 301 def _make_metadata(self, other=None):
302 302 """init metadata dict, for execute/apply_reply"""
303 303 new_md = {
304 304 'dependencies_met' : True,
305 305 'engine' : self.ident,
306 306 'started': datetime.now(),
307 307 }
308 308 if other:
309 309 new_md.update(other)
310 310 return new_md
311 311
312 312 def _publish_pyin(self, code, parent, execution_count):
313 313 """Publish the code request on the pyin stream."""
314 314
315 315 self.session.send(self.iopub_socket, u'pyin',
316 316 {u'code':code, u'execution_count': execution_count},
317 317 parent=parent, ident=self._topic('pyin')
318 318 )
319 319
320 320 def _publish_status(self, status, parent=None):
321 321 """send status (busy/idle) on IOPub"""
322 322 self.session.send(self.iopub_socket,
323 323 u'status',
324 324 {u'execution_state': status},
325 325 parent=parent,
326 326 ident=self._topic('status'),
327 327 )
328 328
329 329
330 330 def execute_request(self, stream, ident, parent):
331 331 """handle an execute_request"""
332 332
333 333 self._publish_status(u'busy', parent)
334 334
335 335 try:
336 336 content = parent[u'content']
337 337 code = content[u'code']
338 338 silent = content[u'silent']
339 339 store_history = content.get(u'store_history', not silent)
340 340 except:
341 341 self.log.error("Got bad msg: ")
342 342 self.log.error("%s", parent)
343 343 return
344 344
345 345 md = self._make_metadata(parent['metadata'])
346 346
347 347 shell = self.shell # we'll need this a lot here
348 348
349 349 # Replace raw_input. Note that is not sufficient to replace
350 350 # raw_input in the user namespace.
351 351 if content.get('allow_stdin', False):
352 352 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
353 353 else:
354 354 raw_input = lambda prompt='' : self._no_raw_input()
355 355
356 356 if py3compat.PY3:
357 357 self._sys_raw_input = __builtin__.input
358 358 __builtin__.input = raw_input
359 359 else:
360 360 self._sys_raw_input = __builtin__.raw_input
361 361 __builtin__.raw_input = raw_input
362 362
363 363 # Set the parent message of the display hook and out streams.
364 364 shell.displayhook.set_parent(parent)
365 365 shell.display_pub.set_parent(parent)
366 366 shell.data_pub.set_parent(parent)
367 367 sys.stdout.set_parent(parent)
368 368 sys.stderr.set_parent(parent)
369 369
370 370 # Re-broadcast our input for the benefit of listening clients, and
371 371 # start computing output
372 372 if not silent:
373 373 self._publish_pyin(code, parent, shell.execution_count)
374 374
375 375 reply_content = {}
376 376 try:
377 377 # FIXME: the shell calls the exception handler itself.
378 378 shell.run_cell(code, store_history=store_history, silent=silent)
379 379 except:
380 380 status = u'error'
381 381 # FIXME: this code right now isn't being used yet by default,
382 382 # because the run_cell() call above directly fires off exception
383 383 # reporting. This code, therefore, is only active in the scenario
384 384 # where runlines itself has an unhandled exception. We need to
385 385 # uniformize this, for all exception construction to come from a
386 386 # single location in the codbase.
387 387 etype, evalue, tb = sys.exc_info()
388 388 tb_list = traceback.format_exception(etype, evalue, tb)
389 389 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
390 390 else:
391 391 status = u'ok'
392 392 finally:
393 393 # Restore raw_input.
394 394 if py3compat.PY3:
395 395 __builtin__.input = self._sys_raw_input
396 396 else:
397 397 __builtin__.raw_input = self._sys_raw_input
398 398
399 399 reply_content[u'status'] = status
400 400
401 401 # Return the execution counter so clients can display prompts
402 402 reply_content['execution_count'] = shell.execution_count - 1
403 403
404 404 # FIXME - fish exception info out of shell, possibly left there by
405 405 # runlines. We'll need to clean up this logic later.
406 406 if shell._reply_content is not None:
407 407 reply_content.update(shell._reply_content)
408 408 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
409 409 reply_content['engine_info'] = e_info
410 410 # reset after use
411 411 shell._reply_content = None
412 412
413 413 if 'traceback' in reply_content:
414 414 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
415 415
416 416
417 417 # At this point, we can tell whether the main code execution succeeded
418 418 # or not. If it did, we proceed to evaluate user_variables/expressions
419 419 if reply_content['status'] == 'ok':
420 420 reply_content[u'user_variables'] = \
421 421 shell.user_variables(content.get(u'user_variables', []))
422 422 reply_content[u'user_expressions'] = \
423 423 shell.user_expressions(content.get(u'user_expressions', {}))
424 424 else:
425 425 # If there was an error, don't even try to compute variables or
426 426 # expressions
427 427 reply_content[u'user_variables'] = {}
428 428 reply_content[u'user_expressions'] = {}
429 429
430 430 # Payloads should be retrieved regardless of outcome, so we can both
431 431 # recover partial output (that could have been generated early in a
432 432 # block, before an error) and clear the payload system always.
433 433 reply_content[u'payload'] = shell.payload_manager.read_payload()
434 434 # Be agressive about clearing the payload because we don't want
435 435 # it to sit in memory until the next execute_request comes in.
436 436 shell.payload_manager.clear_payload()
437 437
438 438 # Flush output before sending the reply.
439 439 sys.stdout.flush()
440 440 sys.stderr.flush()
441 441 # FIXME: on rare occasions, the flush doesn't seem to make it to the
442 442 # clients... This seems to mitigate the problem, but we definitely need
443 443 # to better understand what's going on.
444 444 if self._execute_sleep:
445 445 time.sleep(self._execute_sleep)
446 446
447 447 # Send the reply.
448 448 reply_content = json_clean(reply_content)
449 449
450 450 md['status'] = reply_content['status']
451 451 if reply_content['status'] == 'error' and \
452 452 reply_content['ename'] == 'UnmetDependency':
453 453 md['dependencies_met'] = False
454 454
455 455 reply_msg = self.session.send(stream, u'execute_reply',
456 456 reply_content, parent, metadata=md,
457 457 ident=ident)
458 458
459 459 self.log.debug("%s", reply_msg)
460 460
461 461 if not silent and reply_msg['content']['status'] == u'error':
462 462 self._abort_queues()
463 463
464 464 self._publish_status(u'idle', parent)
465 465
466 466 def complete_request(self, stream, ident, parent):
467 467 txt, matches = self._complete(parent)
468 468 matches = {'matches' : matches,
469 469 'matched_text' : txt,
470 470 'status' : 'ok'}
471 471 matches = json_clean(matches)
472 472 completion_msg = self.session.send(stream, 'complete_reply',
473 473 matches, parent, ident)
474 474 self.log.debug("%s", completion_msg)
475 475
476 476 def object_info_request(self, stream, ident, parent):
477 477 content = parent['content']
478 478 object_info = self.shell.object_inspect(content['oname'],
479 479 detail_level = content.get('detail_level', 0)
480 480 )
481 481 # Before we send this object over, we scrub it for JSON usage
482 482 oinfo = json_clean(object_info)
483 483 msg = self.session.send(stream, 'object_info_reply',
484 484 oinfo, parent, ident)
485 485 self.log.debug("%s", msg)
486 486
487 487 def history_request(self, stream, ident, parent):
488 488 # We need to pull these out, as passing **kwargs doesn't work with
489 489 # unicode keys before Python 2.6.5.
490 490 hist_access_type = parent['content']['hist_access_type']
491 491 raw = parent['content']['raw']
492 492 output = parent['content']['output']
493 493 if hist_access_type == 'tail':
494 494 n = parent['content']['n']
495 495 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
496 496 include_latest=True)
497 497
498 498 elif hist_access_type == 'range':
499 499 session = parent['content']['session']
500 500 start = parent['content']['start']
501 501 stop = parent['content']['stop']
502 502 hist = self.shell.history_manager.get_range(session, start, stop,
503 503 raw=raw, output=output)
504 504
505 505 elif hist_access_type == 'search':
506 506 n = parent['content'].get('n')
507 unique = parent['content'].get('unique')
507 unique = parent['content'].get('unique', False)
508 508 pattern = parent['content']['pattern']
509 509 hist = self.shell.history_manager.search(
510 510 pattern, raw=raw, output=output, n=n, unique=unique)
511 511
512 512 else:
513 513 hist = []
514 514 hist = list(hist)
515 515 content = {'history' : hist}
516 516 content = json_clean(content)
517 517 msg = self.session.send(stream, 'history_reply',
518 518 content, parent, ident)
519 519 self.log.debug("Sending history reply with %i entries", len(hist))
520 520
521 521 def connect_request(self, stream, ident, parent):
522 522 if self._recorded_ports is not None:
523 523 content = self._recorded_ports.copy()
524 524 else:
525 525 content = {}
526 526 msg = self.session.send(stream, 'connect_reply',
527 527 content, parent, ident)
528 528 self.log.debug("%s", msg)
529 529
530 530 def kernel_info_request(self, stream, ident, parent):
531 531 vinfo = {
532 532 'protocol_version': protocol_version,
533 533 'ipython_version': ipython_version,
534 534 'language_version': language_version,
535 535 'language': 'python',
536 536 }
537 537 msg = self.session.send(stream, 'kernel_info_reply',
538 538 vinfo, parent, ident)
539 539 self.log.debug("%s", msg)
540 540
541 541 def shutdown_request(self, stream, ident, parent):
542 542 self.shell.exit_now = True
543 543 content = dict(status='ok')
544 544 content.update(parent['content'])
545 545 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
546 546 # same content, but different msg_id for broadcasting on IOPub
547 547 self._shutdown_message = self.session.msg(u'shutdown_reply',
548 548 content, parent
549 549 )
550 550
551 551 self._at_shutdown()
552 552 # call sys.exit after a short delay
553 553 loop = ioloop.IOLoop.instance()
554 554 loop.add_timeout(time.time()+0.1, loop.stop)
555 555
556 556 #---------------------------------------------------------------------------
557 557 # Engine methods
558 558 #---------------------------------------------------------------------------
559 559
560 560 def apply_request(self, stream, ident, parent):
561 561 try:
562 562 content = parent[u'content']
563 563 bufs = parent[u'buffers']
564 564 msg_id = parent['header']['msg_id']
565 565 except:
566 566 self.log.error("Got bad msg: %s", parent, exc_info=True)
567 567 return
568 568
569 569 self._publish_status(u'busy', parent)
570 570
571 571 # Set the parent message of the display hook and out streams.
572 572 shell = self.shell
573 573 shell.displayhook.set_parent(parent)
574 574 shell.display_pub.set_parent(parent)
575 575 shell.data_pub.set_parent(parent)
576 576 sys.stdout.set_parent(parent)
577 577 sys.stderr.set_parent(parent)
578 578
579 579 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
580 580 # self.iopub_socket.send(pyin_msg)
581 581 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
582 582 md = self._make_metadata(parent['metadata'])
583 583 try:
584 584 working = shell.user_ns
585 585
586 586 prefix = "_"+str(msg_id).replace("-","")+"_"
587 587
588 588 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
589 589
590 590 fname = getattr(f, '__name__', 'f')
591 591
592 592 fname = prefix+"f"
593 593 argname = prefix+"args"
594 594 kwargname = prefix+"kwargs"
595 595 resultname = prefix+"result"
596 596
597 597 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
598 598 # print ns
599 599 working.update(ns)
600 600 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
601 601 try:
602 602 exec code in shell.user_global_ns, shell.user_ns
603 603 result = working.get(resultname)
604 604 finally:
605 605 for key in ns.iterkeys():
606 606 working.pop(key)
607 607
608 608 result_buf = serialize_object(result,
609 609 buffer_threshold=self.session.buffer_threshold,
610 610 item_threshold=self.session.item_threshold,
611 611 )
612 612
613 613 except:
614 614 # invoke IPython traceback formatting
615 615 shell.showtraceback()
616 616 # FIXME - fish exception info out of shell, possibly left there by
617 617 # run_code. We'll need to clean up this logic later.
618 618 reply_content = {}
619 619 if shell._reply_content is not None:
620 620 reply_content.update(shell._reply_content)
621 621 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
622 622 reply_content['engine_info'] = e_info
623 623 # reset after use
624 624 shell._reply_content = None
625 625
626 626 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
627 627 ident=self._topic('pyerr'))
628 628 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
629 629 result_buf = []
630 630
631 631 if reply_content['ename'] == 'UnmetDependency':
632 632 md['dependencies_met'] = False
633 633 else:
634 634 reply_content = {'status' : 'ok'}
635 635
636 636 # put 'ok'/'error' status in header, for scheduler introspection:
637 637 md['status'] = reply_content['status']
638 638
639 639 # flush i/o
640 640 sys.stdout.flush()
641 641 sys.stderr.flush()
642 642
643 643 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
644 644 parent=parent, ident=ident,buffers=result_buf, metadata=md)
645 645
646 646 self._publish_status(u'idle', parent)
647 647
648 648 #---------------------------------------------------------------------------
649 649 # Control messages
650 650 #---------------------------------------------------------------------------
651 651
652 652 def abort_request(self, stream, ident, parent):
653 653 """abort a specifig msg by id"""
654 654 msg_ids = parent['content'].get('msg_ids', None)
655 655 if isinstance(msg_ids, basestring):
656 656 msg_ids = [msg_ids]
657 657 if not msg_ids:
658 658 self.abort_queues()
659 659 for mid in msg_ids:
660 660 self.aborted.add(str(mid))
661 661
662 662 content = dict(status='ok')
663 663 reply_msg = self.session.send(stream, 'abort_reply', content=content,
664 664 parent=parent, ident=ident)
665 665 self.log.debug("%s", reply_msg)
666 666
667 667 def clear_request(self, stream, idents, parent):
668 668 """Clear our namespace."""
669 669 self.shell.reset(False)
670 670 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
671 671 content = dict(status='ok'))
672 672
673 673
674 674 #---------------------------------------------------------------------------
675 675 # Protected interface
676 676 #---------------------------------------------------------------------------
677 677
678 678 def _wrap_exception(self, method=None):
679 679 # import here, because _wrap_exception is only used in parallel,
680 680 # and parallel has higher min pyzmq version
681 681 from IPython.parallel.error import wrap_exception
682 682 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
683 683 content = wrap_exception(e_info)
684 684 return content
685 685
686 686 def _topic(self, topic):
687 687 """prefixed topic for IOPub messages"""
688 688 if self.int_id >= 0:
689 689 base = "engine.%i" % self.int_id
690 690 else:
691 691 base = "kernel.%s" % self.ident
692 692
693 693 return py3compat.cast_bytes("%s.%s" % (base, topic))
694 694
695 695 def _abort_queues(self):
696 696 for stream in self.shell_streams:
697 697 if stream:
698 698 self._abort_queue(stream)
699 699
700 700 def _abort_queue(self, stream):
701 701 poller = zmq.Poller()
702 702 poller.register(stream.socket, zmq.POLLIN)
703 703 while True:
704 704 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
705 705 if msg is None:
706 706 return
707 707
708 708 self.log.info("Aborting:")
709 709 self.log.info("%s", msg)
710 710 msg_type = msg['header']['msg_type']
711 711 reply_type = msg_type.split('_')[0] + '_reply'
712 712
713 713 status = {'status' : 'aborted'}
714 714 md = {'engine' : self.ident}
715 715 md.update(status)
716 716 reply_msg = self.session.send(stream, reply_type, metadata=md,
717 717 content=status, parent=msg, ident=idents)
718 718 self.log.debug("%s", reply_msg)
719 719 # We need to wait a bit for requests to come in. This can probably
720 720 # be set shorter for true asynchronous clients.
721 721 poller.poll(50)
722 722
723 723
724 724 def _no_raw_input(self):
725 725 """Raise StdinNotImplentedError if active frontend doesn't support
726 726 stdin."""
727 727 raise StdinNotImplementedError("raw_input was called, but this "
728 728 "frontend does not support stdin.")
729 729
730 730 def _raw_input(self, prompt, ident, parent):
731 731 # Flush output before making the request.
732 732 sys.stderr.flush()
733 733 sys.stdout.flush()
734 734
735 735 # Send the input request.
736 736 content = json_clean(dict(prompt=prompt))
737 737 self.session.send(self.stdin_socket, u'input_request', content, parent,
738 738 ident=ident)
739 739
740 740 # Await a response.
741 741 while True:
742 742 try:
743 743 ident, reply = self.session.recv(self.stdin_socket, 0)
744 744 except Exception:
745 745 self.log.warn("Invalid Message:", exc_info=True)
746 746 else:
747 747 break
748 748 try:
749 749 value = reply['content']['value']
750 750 except:
751 751 self.log.error("Got bad raw_input reply: ")
752 752 self.log.error("%s", parent)
753 753 value = ''
754 754 if value == '\x04':
755 755 # EOF
756 756 raise EOFError
757 757 return value
758 758
759 759 def _complete(self, msg):
760 760 c = msg['content']
761 761 try:
762 762 cpos = int(c['cursor_pos'])
763 763 except:
764 764 # If we don't get something that we can convert to an integer, at
765 765 # least attempt the completion guessing the cursor is at the end of
766 766 # the text, if there's any, and otherwise of the line
767 767 cpos = len(c['text'])
768 768 if cpos==0:
769 769 cpos = len(c['line'])
770 770 return self.shell.complete(c['text'], c['line'], cpos)
771 771
772 772 def _at_shutdown(self):
773 773 """Actions taken at shutdown by the kernel, called by python's atexit.
774 774 """
775 775 # io.rprint("Kernel at_shutdown") # dbg
776 776 if self._shutdown_message is not None:
777 777 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
778 778 self.log.debug("%s", self._shutdown_message)
779 779 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
780 780
General Comments 0
You need to be logged in to leave comments. Login now