##// END OF EJS Templates
use shell namespaces in apply_request
MinRK -
Show More
@@ -1,880 +1,880 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports
19 19 import __builtin__
20 20 import atexit
21 21 import sys
22 22 import time
23 23 import traceback
24 24 import logging
25 25 import uuid
26 26
27 27 from datetime import datetime
28 28 from signal import (
29 29 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
30 30 )
31 31
32 32 # System library imports
33 33 import zmq
34 34 from zmq.eventloop import ioloop
35 35 from zmq.eventloop.zmqstream import ZMQStream
36 36
37 37 # Local imports
38 38 from IPython.core import pylabtools
39 39 from IPython.config.configurable import Configurable
40 40 from IPython.config.application import boolean_flag, catch_config_error
41 41 from IPython.core.application import ProfileDir
42 42 from IPython.core.error import StdinNotImplementedError
43 43 from IPython.core.shellapp import (
44 44 InteractiveShellApp, shell_flags, shell_aliases
45 45 )
46 46 from IPython.utils import io
47 47 from IPython.utils import py3compat
48 48 from IPython.utils.frame import extract_module_locals
49 49 from IPython.utils.jsonutil import json_clean
50 50 from IPython.utils.traitlets import (
51 51 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode
52 52 )
53 53
54 54 from entry_point import base_launch_kernel
55 55 from kernelapp import KernelApp, kernel_flags, kernel_aliases
56 56 from serialize import serialize_object, unpack_apply_message
57 57 from session import Session, Message
58 58 from zmqshell import ZMQInteractiveShell
59 59
60 60
61 61 #-----------------------------------------------------------------------------
62 62 # Main kernel class
63 63 #-----------------------------------------------------------------------------
64 64
65 65 class Kernel(Configurable):
66 66
67 67 #---------------------------------------------------------------------------
68 68 # Kernel interface
69 69 #---------------------------------------------------------------------------
70 70
71 71 # attribute to override with a GUI
72 72 eventloop = Any(None)
73 73 def _eventloop_changed(self, name, old, new):
74 74 """schedule call to eventloop from IOLoop"""
75 75 loop = ioloop.IOLoop.instance()
76 76 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
77 77
78 78 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
79 79 session = Instance(Session)
80 80 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
81 81 shell_streams = List()
82 82 control_stream = Instance(ZMQStream)
83 83 iopub_stream = Instance(ZMQStream)
84 84 stdin_socket = Instance(zmq.Socket)
85 85 log = Instance(logging.Logger)
86 86
87 87 user_module = Instance('types.ModuleType')
88 88 def _user_module_changed(self, name, old, new):
89 89 if self.shell is not None:
90 90 self.shell.user_module = new
91 91
92 92 user_ns = Dict(default_value=None)
93 93 def _user_ns_changed(self, name, old, new):
94 94 if self.shell is not None:
95 95 self.shell.user_ns = new
96 96 self.shell.init_user_ns()
97 97
98 98 # identities:
99 99 int_id = Integer(-1)
100 100 ident = Unicode()
101 101
102 102 def _ident_default(self):
103 103 return unicode(uuid.uuid4())
104 104
105 105
106 106 # Private interface
107 107
108 108 # Time to sleep after flushing the stdout/err buffers in each execute
109 109 # cycle. While this introduces a hard limit on the minimal latency of the
110 110 # execute cycle, it helps prevent output synchronization problems for
111 111 # clients.
112 112 # Units are in seconds. The minimum zmq latency on local host is probably
113 113 # ~150 microseconds, set this to 500us for now. We may need to increase it
114 114 # a little if it's not enough after more interactive testing.
115 115 _execute_sleep = Float(0.0005, config=True)
116 116
117 117 # Frequency of the kernel's event loop.
118 118 # Units are in seconds, kernel subclasses for GUI toolkits may need to
119 119 # adapt to milliseconds.
120 120 _poll_interval = Float(0.05, config=True)
121 121
122 122 # If the shutdown was requested over the network, we leave here the
123 123 # necessary reply message so it can be sent by our registered atexit
124 124 # handler. This ensures that the reply is only sent to clients truly at
125 125 # the end of our shutdown process (which happens after the underlying
126 126 # IPython shell's own shutdown).
127 127 _shutdown_message = None
128 128
129 129 # This is a dict of port number that the kernel is listening on. It is set
130 130 # by record_ports and used by connect_request.
131 131 _recorded_ports = Dict()
132 132
133 133 # set of aborted msg_ids
134 134 aborted = Set()
135 135
136 136
137 137 def __init__(self, **kwargs):
138 138 super(Kernel, self).__init__(**kwargs)
139 139
140 140 # Initialize the InteractiveShell subclass
141 141 self.shell = ZMQInteractiveShell.instance(config=self.config,
142 142 profile_dir = self.profile_dir,
143 143 user_module = self.user_module,
144 144 user_ns = self.user_ns,
145 145 )
146 146 self.shell.displayhook.session = self.session
147 147 self.shell.displayhook.pub_socket = self.iopub_stream.socket
148 148 self.shell.display_pub.session = self.session
149 149 self.shell.display_pub.pub_socket = self.iopub_stream.socket
150 150
151 151 # TMP - hack while developing
152 152 self.shell._reply_content = None
153 153
154 154 # Build dict of handlers for message types
155 155 msg_types = [ 'execute_request', 'complete_request',
156 156 'object_info_request', 'history_request',
157 157 'connect_request', 'shutdown_request',
158 158 'apply_request',
159 159 ]
160 160 self.shell_handlers = {}
161 161 for msg_type in msg_types:
162 162 self.shell_handlers[msg_type] = getattr(self, msg_type)
163 163
164 164 control_msg_types = [ 'clear_request', 'abort_request' ]
165 165 self.control_handlers = {}
166 166 for msg_type in control_msg_types:
167 167 self.control_handlers[msg_type] = getattr(self, msg_type)
168 168
169 169 def dispatch_control(self, msg):
170 170 """dispatch control requests"""
171 171 idents,msg = self.session.feed_identities(msg, copy=False)
172 172 try:
173 173 msg = self.session.unserialize(msg, content=True, copy=False)
174 174 except:
175 175 self.log.error("Invalid Control Message", exc_info=True)
176 176 return
177 177
178 178 self.log.debug("Control received: %s", msg)
179 179
180 180 header = msg['header']
181 181 msg_id = header['msg_id']
182 182 msg_type = header['msg_type']
183 183
184 184 handler = self.control_handlers.get(msg_type, None)
185 185 if handler is None:
186 186 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
187 187 else:
188 188 try:
189 189 handler(self.control_stream, idents, msg)
190 190 except Exception:
191 191 self.log.error("Exception in control handler:", exc_info=True)
192 192
193 193 def dispatch_shell(self, stream, msg):
194 194 """dispatch shell requests"""
195 195 # flush control requests first
196 196 if self.control_stream:
197 197 self.control_stream.flush()
198 198
199 199 idents,msg = self.session.feed_identities(msg, copy=False)
200 200 try:
201 201 msg = self.session.unserialize(msg, content=True, copy=False)
202 202 except:
203 203 self.log.error("Invalid Message", exc_info=True)
204 204 return
205 205
206 206 header = msg['header']
207 207 msg_id = header['msg_id']
208 208 msg_type = msg['header']['msg_type']
209 209
210 210 # Print some info about this message and leave a '--->' marker, so it's
211 211 # easier to trace visually the message chain when debugging. Each
212 212 # handler prints its message at the end.
213 213 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
214 214 self.log.debug(' Content: %s\n --->\n ', msg['content'])
215 215
216 216 if msg_id in self.aborted:
217 217 self.aborted.remove(msg_id)
218 218 # is it safe to assume a msg_id will not be resubmitted?
219 219 reply_type = msg_type.split('_')[0] + '_reply'
220 220 status = {'status' : 'aborted'}
221 221 reply_msg = self.session.send(stream, reply_type, subheader=status,
222 222 content=status, parent=msg, ident=idents)
223 223 return
224 224
225 225 handler = self.shell_handlers.get(msg_type, None)
226 226 if handler is None:
227 227 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
228 228 else:
229 229 # ensure default_int_handler during handler call
230 230 sig = signal(SIGINT, default_int_handler)
231 231 try:
232 232 handler(stream, idents, msg)
233 233 except Exception:
234 234 self.log.error("Exception in message handler:", exc_info=True)
235 235 finally:
236 236 signal(SIGINT, sig)
237 237
238 238 def enter_eventloop(self):
239 239 """enter eventloop"""
240 240 self.log.critical("entering eventloop")
241 241 # restore default_int_handler
242 242 signal(SIGINT, default_int_handler)
243 243 self.eventloop(self)
244 244 self.log.critical("exiting eventloop")
245 245 # if eventloop exits, IOLoop should stop
246 246 ioloop.IOLoop.instance().stop()
247 247
248 248 def start(self):
249 249 """register dispatchers for streams"""
250 250 if self.control_stream:
251 251 self.control_stream.on_recv(self.dispatch_control, copy=False)
252 252
253 253 def make_dispatcher(stream):
254 254 def dispatcher(msg):
255 255 return self.dispatch_shell(stream, msg)
256 256 return dispatcher
257 257
258 258 for s in self.shell_streams:
259 259 s.on_recv(make_dispatcher(s), copy=False)
260 260
261 261 def do_one_iteration(self):
262 262 """step eventloop just once"""
263 263 if self.control_stream:
264 264 self.control_stream.flush()
265 265 for stream in self.shell_streams:
266 266 # handle at most one request per iteration
267 267 stream.flush(zmq.POLLIN, 1)
268 268 stream.flush(zmq.POLLOUT)
269 269 self.iopub_stream.flush()
270 270
271 271
272 272 def record_ports(self, ports):
273 273 """Record the ports that this kernel is using.
274 274
275 275 The creator of the Kernel instance must call this methods if they
276 276 want the :meth:`connect_request` method to return the port numbers.
277 277 """
278 278 self._recorded_ports = ports
279 279
280 280 #---------------------------------------------------------------------------
281 281 # Kernel request handlers
282 282 #---------------------------------------------------------------------------
283 283
284 284 def _publish_pyin(self, code, parent, execution_count):
285 285 """Publish the code request on the pyin stream."""
286 286
287 287 self.session.send(self.iopub_stream, u'pyin',
288 288 {u'code':code, u'execution_count': execution_count},
289 289 parent=parent, ident=self._topic('pyin')
290 290 )
291 291
292 292 def execute_request(self, stream, ident, parent):
293 293
294 294 self.session.send(self.iopub_stream,
295 295 u'status',
296 296 {u'execution_state':u'busy'},
297 297 parent=parent,
298 298 ident=self._topic('status'),
299 299 )
300 300
301 301 try:
302 302 content = parent[u'content']
303 303 code = content[u'code']
304 304 silent = content[u'silent']
305 305 except:
306 306 self.log.error("Got bad msg: ")
307 307 self.log.error("%s", parent)
308 308 return
309 309
310 310 shell = self.shell # we'll need this a lot here
311 311
312 312 # Replace raw_input. Note that is not sufficient to replace
313 313 # raw_input in the user namespace.
314 314 if content.get('allow_stdin', False):
315 315 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
316 316 else:
317 317 raw_input = lambda prompt='' : self._no_raw_input()
318 318
319 319 if py3compat.PY3:
320 320 __builtin__.input = raw_input
321 321 else:
322 322 __builtin__.raw_input = raw_input
323 323
324 324 # Set the parent message of the display hook and out streams.
325 325 shell.displayhook.set_parent(parent)
326 326 shell.display_pub.set_parent(parent)
327 327 sys.stdout.set_parent(parent)
328 328 sys.stderr.set_parent(parent)
329 329
330 330 # Re-broadcast our input for the benefit of listening clients, and
331 331 # start computing output
332 332 if not silent:
333 333 self._publish_pyin(code, parent, shell.execution_count)
334 334
335 335 reply_content = {}
336 336 try:
337 337 if silent:
338 338 # run_code uses 'exec' mode, so no displayhook will fire, and it
339 339 # doesn't call logging or history manipulations. Print
340 340 # statements in that code will obviously still execute.
341 341 shell.run_code(code)
342 342 else:
343 343 # FIXME: the shell calls the exception handler itself.
344 344 shell.run_cell(code, store_history=True)
345 345 except:
346 346 status = u'error'
347 347 # FIXME: this code right now isn't being used yet by default,
348 348 # because the run_cell() call above directly fires off exception
349 349 # reporting. This code, therefore, is only active in the scenario
350 350 # where runlines itself has an unhandled exception. We need to
351 351 # uniformize this, for all exception construction to come from a
352 352 # single location in the codbase.
353 353 etype, evalue, tb = sys.exc_info()
354 354 tb_list = traceback.format_exception(etype, evalue, tb)
355 355 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
356 356 else:
357 357 status = u'ok'
358 358
359 359 reply_content[u'status'] = status
360 360
361 361 # Return the execution counter so clients can display prompts
362 362 reply_content['execution_count'] = shell.execution_count -1
363 363
364 364 # FIXME - fish exception info out of shell, possibly left there by
365 365 # runlines. We'll need to clean up this logic later.
366 366 if shell._reply_content is not None:
367 367 reply_content.update(shell._reply_content)
368 368 # reset after use
369 369 shell._reply_content = None
370 370
371 371 # At this point, we can tell whether the main code execution succeeded
372 372 # or not. If it did, we proceed to evaluate user_variables/expressions
373 373 if reply_content['status'] == 'ok':
374 374 reply_content[u'user_variables'] = \
375 375 shell.user_variables(content.get(u'user_variables', []))
376 376 reply_content[u'user_expressions'] = \
377 377 shell.user_expressions(content.get(u'user_expressions', {}))
378 378 else:
379 379 # If there was an error, don't even try to compute variables or
380 380 # expressions
381 381 reply_content[u'user_variables'] = {}
382 382 reply_content[u'user_expressions'] = {}
383 383
384 384 # Payloads should be retrieved regardless of outcome, so we can both
385 385 # recover partial output (that could have been generated early in a
386 386 # block, before an error) and clear the payload system always.
387 387 reply_content[u'payload'] = shell.payload_manager.read_payload()
388 388 # Be agressive about clearing the payload because we don't want
389 389 # it to sit in memory until the next execute_request comes in.
390 390 shell.payload_manager.clear_payload()
391 391
392 392 # Flush output before sending the reply.
393 393 sys.stdout.flush()
394 394 sys.stderr.flush()
395 395 # FIXME: on rare occasions, the flush doesn't seem to make it to the
396 396 # clients... This seems to mitigate the problem, but we definitely need
397 397 # to better understand what's going on.
398 398 if self._execute_sleep:
399 399 time.sleep(self._execute_sleep)
400 400
401 401 # Send the reply.
402 402 reply_content = json_clean(reply_content)
403 403 reply_msg = self.session.send(stream, u'execute_reply',
404 404 reply_content, parent, ident=ident)
405 405 self.log.debug("%s", reply_msg)
406 406
407 407 if reply_msg['content']['status'] == u'error':
408 408 self._abort_queues()
409 409
410 410 self.session.send(self.iopub_stream,
411 411 u'status',
412 412 {u'execution_state':u'idle'},
413 413 parent=parent,
414 414 ident=self._topic('status'))
415 415
416 416 def complete_request(self, stream, ident, parent):
417 417 txt, matches = self._complete(parent)
418 418 matches = {'matches' : matches,
419 419 'matched_text' : txt,
420 420 'status' : 'ok'}
421 421 matches = json_clean(matches)
422 422 completion_msg = self.session.send(stream, 'complete_reply',
423 423 matches, parent, ident)
424 424 self.log.debug("%s", completion_msg)
425 425
426 426 def object_info_request(self, stream, ident, parent):
427 427 content = parent['content']
428 428 object_info = self.shell.object_inspect(content['oname'],
429 429 detail_level = content.get('detail_level', 0)
430 430 )
431 431 # Before we send this object over, we scrub it for JSON usage
432 432 oinfo = json_clean(object_info)
433 433 msg = self.session.send(stream, 'object_info_reply',
434 434 oinfo, parent, ident)
435 435 self.log.debug("%s", msg)
436 436
437 437 def history_request(self, stream, ident, parent):
438 438 # We need to pull these out, as passing **kwargs doesn't work with
439 439 # unicode keys before Python 2.6.5.
440 440 hist_access_type = parent['content']['hist_access_type']
441 441 raw = parent['content']['raw']
442 442 output = parent['content']['output']
443 443 if hist_access_type == 'tail':
444 444 n = parent['content']['n']
445 445 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
446 446 include_latest=True)
447 447
448 448 elif hist_access_type == 'range':
449 449 session = parent['content']['session']
450 450 start = parent['content']['start']
451 451 stop = parent['content']['stop']
452 452 hist = self.shell.history_manager.get_range(session, start, stop,
453 453 raw=raw, output=output)
454 454
455 455 elif hist_access_type == 'search':
456 456 pattern = parent['content']['pattern']
457 457 hist = self.shell.history_manager.search(pattern, raw=raw,
458 458 output=output)
459 459
460 460 else:
461 461 hist = []
462 462 hist = list(hist)
463 463 content = {'history' : hist}
464 464 content = json_clean(content)
465 465 msg = self.session.send(stream, 'history_reply',
466 466 content, parent, ident)
467 467 self.log.debug("Sending history reply with %i entries", len(hist))
468 468
469 469 def connect_request(self, stream, ident, parent):
470 470 if self._recorded_ports is not None:
471 471 content = self._recorded_ports.copy()
472 472 else:
473 473 content = {}
474 474 msg = self.session.send(stream, 'connect_reply',
475 475 content, parent, ident)
476 476 self.log.debug("%s", msg)
477 477
478 478 def shutdown_request(self, stream, ident, parent):
479 479 self.shell.exit_now = True
480 480 self._shutdown_message = self.session.msg(u'shutdown_reply',
481 481 parent['content'], parent
482 482 )
483 483 # self.session.send(stream, self._shutdown_message, ident=ident)
484 484
485 485 self._at_shutdown()
486 486 # call sys.exit after a short delay
487 487 ioloop.IOLoop.instance().add_timeout(time.time()+0.05, lambda : sys.exit(0))
488 488
489 489 #---------------------------------------------------------------------------
490 490 # Engine methods
491 491 #---------------------------------------------------------------------------
492 492
493 493 def apply_request(self, stream, ident, parent):
494 494 try:
495 495 content = parent[u'content']
496 496 bufs = parent[u'buffers']
497 497 msg_id = parent['header']['msg_id']
498 # bound = parent['header'].get('bound', False)
499 498 except:
500 499 self.log.error("Got bad msg: %s", parent, exc_info=True)
501 500 return
502 501 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
503 502 # self.iopub_stream.send(pyin_msg)
504 503 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
505 504 sub = {'dependencies_met' : True, 'engine' : self.ident,
506 505 'started': datetime.now()}
507 506 try:
508 507 # allow for not overriding displayhook
509 508 if hasattr(sys.displayhook, 'set_parent'):
510 509 sys.displayhook.set_parent(parent)
511 510 sys.stdout.set_parent(parent)
512 511 sys.stderr.set_parent(parent)
513 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
514 working = self.user_ns
515 # suffix =
512 working = self.shell.user_ns
513
516 514 prefix = "_"+str(msg_id).replace("-","")+"_"
517 515
518 516 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
519 # if bound:
520 # bound_ns = Namespace(working)
521 # args = [bound_ns]+list(args)
522 517
523 518 fname = getattr(f, '__name__', 'f')
524 519
525 520 fname = prefix+"f"
526 521 argname = prefix+"args"
527 522 kwargname = prefix+"kwargs"
528 523 resultname = prefix+"result"
529 524
530 525 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
531 526 # print ns
532 527 working.update(ns)
533 528 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
534 529 try:
535 exec code in working,working
530 exec code in self.shell.user_global_ns, self.shell.user_ns
536 531 result = working.get(resultname)
537 532 finally:
538 533 for key in ns.iterkeys():
539 534 working.pop(key)
540 # if bound:
541 # working.update(bound_ns)
542 535
543 536 packed_result,buf = serialize_object(result)
544 537 result_buf = [packed_result]+buf
545 538 except:
546 539 exc_content = self._wrap_exception('apply')
547 540 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
548 541 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
549 542 ident=self._topic('pyerr'))
550 543 reply_content = exc_content
551 544 result_buf = []
552 545
553 546 if exc_content['ename'] == 'UnmetDependency':
554 547 sub['dependencies_met'] = False
555 548 else:
556 549 reply_content = {'status' : 'ok'}
557 550
558 551 # put 'ok'/'error' status in header, for scheduler introspection:
559 552 sub['status'] = reply_content['status']
560 553
561 554 # flush i/o
562 555 sys.stdout.flush()
563 556 sys.stderr.flush()
564 557
565 558 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
566 559 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
567 560
568 561 #---------------------------------------------------------------------------
569 562 # Control messages
570 563 #---------------------------------------------------------------------------
571 564
572 565 def abort_request(self, stream, ident, parent):
573 566 """abort a specifig msg by id"""
574 567 msg_ids = parent['content'].get('msg_ids', None)
575 568 if isinstance(msg_ids, basestring):
576 569 msg_ids = [msg_ids]
577 570 if not msg_ids:
578 571 self.abort_queues()
579 572 for mid in msg_ids:
580 573 self.aborted.add(str(mid))
581 574
582 575 content = dict(status='ok')
583 576 reply_msg = self.session.send(stream, 'abort_reply', content=content,
584 577 parent=parent, ident=ident)
585 578 self.log.debug("%s", reply_msg)
586 579
587 580 def clear_request(self, stream, idents, parent):
588 581 """Clear our namespace."""
589 self.user_ns = {}
582 self.shell.reset(False)
590 583 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
591 584 content = dict(status='ok'))
592 self._initial_exec_lines()
593 585
594 586
595 587 #---------------------------------------------------------------------------
596 588 # Protected interface
597 589 #---------------------------------------------------------------------------
598 590
599 591
592 def _wrap_exception(self, method=None):
593 # import here, because _wrap_exception is only used in parallel,
594 # and parallel has higher min pyzmq version
595 from IPython.parallel.error import wrap_exception
596 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
597 content = wrap_exception(e_info)
598 return content
599
600 600 def _topic(self, topic):
601 601 """prefixed topic for IOPub messages"""
602 602 if self.int_id >= 0:
603 603 base = "engine.%i" % self.int_id
604 604 else:
605 605 base = "kernel.%s" % self.ident
606 606
607 607 return py3compat.cast_bytes("%s.%s" % (base, topic))
608 608
609 609 def _abort_queues(self):
610 610 for stream in self.shell_streams:
611 611 if stream:
612 612 self._abort_queue(stream)
613 613
614 614 def _abort_queue(self, stream):
615 615 while True:
616 616 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
617 617 if msg is None:
618 618 return
619 619
620 620 self.log.info("Aborting:")
621 621 self.log.info("%s", msg)
622 622 msg_type = msg['header']['msg_type']
623 623 reply_type = msg_type.split('_')[0] + '_reply'
624 624 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
625 625 # self.reply_stream.send(ident,zmq.SNDMORE)
626 626 # self.reply_stream.send_json(reply_msg)
627 627 reply_msg = self.session.send(stream, reply_type,
628 628 content={'status' : 'aborted'}, parent=msg, ident=idents)
629 629 self.log.debug("%s", reply_msg)
630 630 # We need to wait a bit for requests to come in. This can probably
631 631 # be set shorter for true asynchronous clients.
632 632 time.sleep(0.05)
633 633
634 634
635 635 def _no_raw_input(self):
636 636 """Raise StdinNotImplentedError if active frontend doesn't support
637 637 stdin."""
638 638 raise StdinNotImplementedError("raw_input was called, but this "
639 639 "frontend does not support stdin.")
640 640
641 641 def _raw_input(self, prompt, ident, parent):
642 642 # Flush output before making the request.
643 643 sys.stderr.flush()
644 644 sys.stdout.flush()
645 645
646 646 # Send the input request.
647 647 content = json_clean(dict(prompt=prompt))
648 648 self.session.send(self.stdin_socket, u'input_request', content, parent,
649 649 ident=ident)
650 650
651 651 # Await a response.
652 652 while True:
653 653 try:
654 654 ident, reply = self.session.recv(self.stdin_socket, 0)
655 655 except Exception:
656 656 self.log.warn("Invalid Message:", exc_info=True)
657 657 else:
658 658 break
659 659 try:
660 660 value = reply['content']['value']
661 661 except:
662 662 self.log.error("Got bad raw_input reply: ")
663 663 self.log.error("%s", parent)
664 664 value = ''
665 665 if value == '\x04':
666 666 # EOF
667 667 raise EOFError
668 668 return value
669 669
670 670 def _complete(self, msg):
671 671 c = msg['content']
672 672 try:
673 673 cpos = int(c['cursor_pos'])
674 674 except:
675 675 # If we don't get something that we can convert to an integer, at
676 676 # least attempt the completion guessing the cursor is at the end of
677 677 # the text, if there's any, and otherwise of the line
678 678 cpos = len(c['text'])
679 679 if cpos==0:
680 680 cpos = len(c['line'])
681 681 return self.shell.complete(c['text'], c['line'], cpos)
682 682
683 683 def _object_info(self, context):
684 684 symbol, leftover = self._symbol_from_context(context)
685 685 if symbol is not None and not leftover:
686 686 doc = getattr(symbol, '__doc__', '')
687 687 else:
688 688 doc = ''
689 689 object_info = dict(docstring = doc)
690 690 return object_info
691 691
692 692 def _symbol_from_context(self, context):
693 693 if not context:
694 694 return None, context
695 695
696 696 base_symbol_string = context[0]
697 697 symbol = self.shell.user_ns.get(base_symbol_string, None)
698 698 if symbol is None:
699 699 symbol = __builtin__.__dict__.get(base_symbol_string, None)
700 700 if symbol is None:
701 701 return None, context
702 702
703 703 context = context[1:]
704 704 for i, name in enumerate(context):
705 705 new_symbol = getattr(symbol, name, None)
706 706 if new_symbol is None:
707 707 return symbol, context[i:]
708 708 else:
709 709 symbol = new_symbol
710 710
711 711 return symbol, []
712 712
713 713 def _at_shutdown(self):
714 714 """Actions taken at shutdown by the kernel, called by python's atexit.
715 715 """
716 716 # io.rprint("Kernel at_shutdown") # dbg
717 717 if self._shutdown_message is not None:
718 718 self.session.send(self.iopub_stream, self._shutdown_message, ident=self._topic('shutdown'))
719 719 self.log.debug("%s", self._shutdown_message)
720 720 [ s.flush(zmq.POLLOUT) for s in self.shell_streams + [self.iopub_stream] ]
721 721
722 722 #-----------------------------------------------------------------------------
723 723 # Aliases and Flags for the IPKernelApp
724 724 #-----------------------------------------------------------------------------
725 725
726 726 flags = dict(kernel_flags)
727 727 flags.update(shell_flags)
728 728
729 729 addflag = lambda *args: flags.update(boolean_flag(*args))
730 730
731 731 flags['pylab'] = (
732 732 {'IPKernelApp' : {'pylab' : 'auto'}},
733 733 """Pre-load matplotlib and numpy for interactive use with
734 734 the default matplotlib backend."""
735 735 )
736 736
737 737 aliases = dict(kernel_aliases)
738 738 aliases.update(shell_aliases)
739 739
740 740 # it's possible we don't want short aliases for *all* of these:
741 741 aliases.update(dict(
742 742 pylab='IPKernelApp.pylab',
743 743 ))
744 744
745 745 #-----------------------------------------------------------------------------
746 746 # The IPKernelApp class
747 747 #-----------------------------------------------------------------------------
748 748
749 749 class IPKernelApp(KernelApp, InteractiveShellApp):
750 750 name = 'ipkernel'
751 751
752 752 aliases = Dict(aliases)
753 753 flags = Dict(flags)
754 754 classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]
755 755
756 756 # configurables
757 757 pylab = CaselessStrEnum(['tk', 'qt', 'wx', 'gtk', 'osx', 'inline', 'auto'],
758 758 config=True,
759 759 help="""Pre-load matplotlib and numpy for interactive use,
760 760 selecting a particular matplotlib backend and loop integration.
761 761 """
762 762 )
763 763
764 764 @catch_config_error
765 765 def initialize(self, argv=None):
766 766 super(IPKernelApp, self).initialize(argv)
767 767 self.init_path()
768 768 self.init_shell()
769 769 self.init_extensions()
770 770 self.init_code()
771 771
772 772 def init_kernel(self):
773 773
774 774 shell_stream = ZMQStream(self.shell_socket)
775 775 iopub_stream = ZMQStream(self.iopub_socket)
776 776
777 777 kernel = Kernel(config=self.config, session=self.session,
778 778 shell_streams=[shell_stream],
779 779 iopub_stream=iopub_stream,
780 780 stdin_socket=self.stdin_socket,
781 781 log=self.log,
782 782 profile_dir=self.profile_dir,
783 783 )
784 784 self.kernel = kernel
785 785 kernel.record_ports(self.ports)
786 786 shell = kernel.shell
787 787 if self.pylab:
788 788 try:
789 789 gui, backend = pylabtools.find_gui_and_backend(self.pylab)
790 790 shell.enable_pylab(gui, import_all=self.pylab_import_all)
791 791 except Exception:
792 792 self.log.error("Pylab initialization failed", exc_info=True)
793 793 # print exception straight to stdout, because normally
794 794 # _showtraceback associates the reply with an execution,
795 795 # which means frontends will never draw it, as this exception
796 796 # is not associated with any execute request.
797 797
798 798 # replace pyerr-sending traceback with stdout
799 799 _showtraceback = shell._showtraceback
800 800 def print_tb(etype, evalue, stb):
801 801 print ("Error initializing pylab, pylab mode will not "
802 802 "be active", file=io.stderr)
803 803 print (shell.InteractiveTB.stb2text(stb), file=io.stdout)
804 804 shell._showtraceback = print_tb
805 805
806 806 # send the traceback over stdout
807 807 shell.showtraceback(tb_offset=0)
808 808
809 809 # restore proper _showtraceback method
810 810 shell._showtraceback = _showtraceback
811 811
812 812
813 813 def init_shell(self):
814 814 self.shell = self.kernel.shell
815 815 self.shell.configurables.append(self)
816 816
817 817
818 818 #-----------------------------------------------------------------------------
819 819 # Kernel main and launch functions
820 820 #-----------------------------------------------------------------------------
821 821
822 822 def launch_kernel(*args, **kwargs):
823 823 """Launches a localhost IPython kernel, binding to the specified ports.
824 824
825 825 This function simply calls entry_point.base_launch_kernel with the right
826 826 first command to start an ipkernel. See base_launch_kernel for arguments.
827 827
828 828 Returns
829 829 -------
830 830 A tuple of form:
831 831 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
832 832 where kernel_process is a Popen object and the ports are integers.
833 833 """
834 834 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
835 835 *args, **kwargs)
836 836
837 837
838 838 def embed_kernel(module=None, local_ns=None, **kwargs):
839 839 """Embed and start an IPython kernel in a given scope.
840 840
841 841 Parameters
842 842 ----------
843 843 module : ModuleType, optional
844 844 The module to load into IPython globals (default: caller)
845 845 local_ns : dict, optional
846 846 The namespace to load into IPython user namespace (default: caller)
847 847
848 848 kwargs : various, optional
849 849 Further keyword args are relayed to the KernelApp constructor,
850 850 allowing configuration of the Kernel. Will only have an effect
851 851 on the first embed_kernel call for a given process.
852 852
853 853 """
854 854 # get the app if it exists, or set it up if it doesn't
855 855 if IPKernelApp.initialized():
856 856 app = IPKernelApp.instance()
857 857 else:
858 858 app = IPKernelApp.instance(**kwargs)
859 859 app.initialize([])
860 860
861 861 # load the calling scope if not given
862 862 (caller_module, caller_locals) = extract_module_locals(1)
863 863 if module is None:
864 864 module = caller_module
865 865 if local_ns is None:
866 866 local_ns = caller_locals
867 867
868 868 app.kernel.user_module = module
869 869 app.kernel.user_ns = local_ns
870 870 app.start()
871 871
872 872 def main():
873 873 """Run an IPKernel as an application"""
874 874 app = IPKernelApp.instance()
875 875 app.initialize()
876 876 app.start()
877 877
878 878
879 879 if __name__ == '__main__':
880 880 main()
General Comments 0
You need to be logged in to leave comments. Login now