##// END OF EJS Templates
BUG: raw_input logic incorrect for in-process terminal frontend.
epatters -
Show More
@@ -1,75 +1,87 b''
1 1 """ Implements a fully blocking kernel manager.
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 5 #-----------------------------------------------------------------------------
6 6 # Copyright (C) 2012 The IPython Development Team
7 7 #
8 8 # Distributed under the terms of the BSD License. The full license is in
9 9 # the file COPYING.txt, distributed as part of this software.
10 10 #-----------------------------------------------------------------------------
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Imports
14 14 #-----------------------------------------------------------------------------
15 15 from __future__ import print_function
16 16
17 17 # Standard library imports.
18 18 import Queue
19 19 from threading import Event
20 20
21 21 # Local imports.
22 from IPython.utils.io import raw_print
22 23 from IPython.utils.traitlets import Type
23 24 from kernelmanager import InProcessKernelManager, ShellInProcessChannel, \
24 25 SubInProcessChannel, StdInInProcessChannel
25 26
26 27 #-----------------------------------------------------------------------------
27 28 # Utility classes
28 29 #-----------------------------------------------------------------------------
29 30
30 31 class BlockingChannelMixin(object):
31 32
32 33 def __init__(self, *args, **kwds):
33 34 super(BlockingChannelMixin, self).__init__(*args, **kwds)
34 35 self._in_queue = Queue.Queue()
35 36
36 37 def call_handlers(self, msg):
37 38 self._in_queue.put(msg)
38 39
39 40 def get_msg(self, block=True, timeout=None):
40 41 """ Gets a message if there is one that is ready. """
41 42 return self._in_queue.get(block, timeout)
42 43
43 44 def get_msgs(self):
44 45 """ Get all messages that are currently ready. """
45 46 msgs = []
46 47 while True:
47 48 try:
48 49 msgs.append(self.get_msg(block=False))
49 50 except Queue.Empty:
50 51 break
51 52 return msgs
52 53
53 54 def msg_ready(self):
54 55 """ Is there a message that has been received? """
55 56 return not self._in_queue.empty()
56 57
57 58 #-----------------------------------------------------------------------------
58 59 # Blocking kernel manager
59 60 #-----------------------------------------------------------------------------
60 61
61 62 class BlockingShellInProcessChannel(BlockingChannelMixin, ShellInProcessChannel):
62 63 pass
63 64
64 65 class BlockingSubInProcessChannel(BlockingChannelMixin, SubInProcessChannel):
65 66 pass
66 67
67 68 class BlockingStdInInProcessChannel(BlockingChannelMixin, StdInInProcessChannel):
68 pass
69
70 def call_handlers(self, msg):
71 """ Overridden for the in-process channel.
72
73 This methods simply calls raw_input directly.
74 """
75 msg_type = msg['header']['msg_type']
76 if msg_type == 'input_request':
77 raw_input = self.manager.kernel.sys_raw_input
78 prompt = msg['content']['prompt']
79 raw_print(prompt, end='')
80 self.input(raw_input())
69 81
70 82 class BlockingInProcessKernelManager(InProcessKernelManager):
71 83
72 84 # The classes to use for the various channels.
73 85 shell_channel_class = Type(BlockingShellInProcessChannel)
74 86 sub_channel_class = Type(BlockingSubInProcessChannel)
75 87 stdin_channel_class = Type(BlockingStdInInProcessChannel)
@@ -1,66 +1,83 b''
1 1 #-------------------------------------------------------------------------------
2 2 # Copyright (C) 2012 The IPython Development Team
3 3 #
4 4 # Distributed under the terms of the BSD License. The full license is in
5 5 # the file COPYING, distributed as part of this software.
6 6 #-------------------------------------------------------------------------------
7 7
8 8 #-----------------------------------------------------------------------------
9 9 # Imports
10 10 #-----------------------------------------------------------------------------
11 11 from __future__ import print_function
12 12
13 13 # Standard library imports
14 from StringIO import StringIO
15 import sys
14 16 import unittest
15 17
16 18 # Local imports
17 19 from IPython.inprocess.blockingkernelmanager import \
18 20 BlockingInProcessKernelManager
19 21 from IPython.inprocess.ipkernel import InProcessKernel
20 22 from IPython.utils.io import capture_output
21 23
22 24 #-----------------------------------------------------------------------------
23 25 # Test case
24 26 #-----------------------------------------------------------------------------
25 27
26 28 class InProcessKernelTestCase(unittest.TestCase):
27 29
28 30 def test_pylab(self):
29 31 """ Does pylab work in the in-process kernel?
30 32 """
31 33 km = BlockingInProcessKernelManager()
32 34 km.start_kernel()
33 35 km.shell_channel.execute('%pylab')
34 36 msg = get_stream_message(km)
35 37 self.assert_('Welcome to pylab' in msg['content']['data'])
36 38
39 def test_raw_input(self):
40 """ Does the in-process kernel handle raw_input correctly?
41 """
42 km = BlockingInProcessKernelManager()
43 km.start_kernel()
44
45 io = StringIO('foobar\n')
46 sys_stdin = sys.stdin
47 sys.stdin = io
48 try:
49 km.shell_channel.execute('x = raw_input()')
50 finally:
51 sys.stdin = sys_stdin
52 self.assertEqual(km.kernel.shell.user_ns.get('x'), 'foobar')
53
37 54 def test_stdout(self):
38 55 """ Does the in-process kernel correctly capture IO?
39 56 """
40 57 kernel = InProcessKernel()
41 58
42 59 with capture_output() as io:
43 60 kernel.shell.run_cell('print("foo")')
44 61 self.assertEqual(io.stdout, 'foo\n')
45 62
46 63 km = BlockingInProcessKernelManager(kernel=kernel)
47 64 kernel.frontends.append(km)
48 65 km.shell_channel.execute('print("bar")')
49 66 msg = get_stream_message(km)
50 67 self.assertEqual(msg['content']['data'], 'bar\n')
51 68
52 69 #-----------------------------------------------------------------------------
53 70 # Utility functions
54 71 #-----------------------------------------------------------------------------
55 72
56 73 def get_stream_message(kernel_manager, timeout=5):
57 74 """ Gets a single stream message synchronously from the sub channel.
58 75 """
59 76 while True:
60 77 msg = kernel_manager.sub_channel.get_msg(timeout=timeout)
61 78 if msg['header']['msg_type'] == 'stream':
62 79 return msg
63 80
64 81
65 82 if __name__ == '__main__':
66 83 unittest.main()
@@ -1,903 +1,912 b''
1 1 #!/usr/bin/env python
2 2 """A simple interactive kernel that talks to a frontend over 0MQ.
3 3
4 4 Things to do:
5 5
6 6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 7 call set_parent on all the PUB objects with the message about to be executed.
8 8 * Implement random port and security key logic.
9 9 * Implement control messages.
10 10 * Implement event loop and poll version.
11 11 """
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16 from __future__ import print_function
17 17
18 18 # Standard library imports
19 19 import __builtin__
20 20 import atexit
21 21 import sys
22 22 import time
23 23 import traceback
24 24 import logging
25 25 import uuid
26 26
27 27 from datetime import datetime
28 28 from signal import (
29 29 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
30 30 )
31 31
32 32 # System library imports
33 33 import zmq
34 34 from zmq.eventloop import ioloop
35 35 from zmq.eventloop.zmqstream import ZMQStream
36 36
37 37 # Local imports
38 38 from IPython.config.configurable import Configurable
39 39 from IPython.config.application import boolean_flag, catch_config_error
40 40 from IPython.core.application import ProfileDir
41 41 from IPython.core.error import StdinNotImplementedError
42 42 from IPython.core.shellapp import (
43 43 InteractiveShellApp, shell_flags, shell_aliases
44 44 )
45 45 from IPython.utils import io
46 46 from IPython.utils import py3compat
47 47 from IPython.utils.frame import extract_module_locals
48 48 from IPython.utils.jsonutil import json_clean
49 49 from IPython.utils.traitlets import (
50 50 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode,
51 51 Type
52 52 )
53 53
54 54 from entry_point import base_launch_kernel
55 55 from kernelapp import KernelApp, kernel_flags, kernel_aliases
56 56 from serialize import serialize_object, unpack_apply_message
57 57 from session import Session, Message
58 58 from zmqshell import ZMQInteractiveShell
59 59
60 60
61 61 #-----------------------------------------------------------------------------
62 62 # Main kernel class
63 63 #-----------------------------------------------------------------------------
64 64
65 65 class Kernel(Configurable):
66 66
67 67 #---------------------------------------------------------------------------
68 68 # Kernel interface
69 69 #---------------------------------------------------------------------------
70 70
71 71 # attribute to override with a GUI
72 72 eventloop = Any(None)
73 73 def _eventloop_changed(self, name, old, new):
74 74 """schedule call to eventloop from IOLoop"""
75 75 loop = ioloop.IOLoop.instance()
76 76 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
77 77
78 78 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
79 79 shell_class = Type(ZMQInteractiveShell)
80 80
81 81 session = Instance(Session)
82 82 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
83 83 shell_streams = List()
84 84 control_stream = Instance(ZMQStream)
85 85 iopub_socket = Instance(zmq.Socket)
86 86 stdin_socket = Instance(zmq.Socket)
87 sys_raw_input = Any()
87 88 log = Instance(logging.Logger)
88 89
89 90 user_module = Any()
90 91 def _user_module_changed(self, name, old, new):
91 92 if self.shell is not None:
92 93 self.shell.user_module = new
93 94
94 95 user_ns = Dict(default_value=None)
95 96 def _user_ns_changed(self, name, old, new):
96 97 if self.shell is not None:
97 98 self.shell.user_ns = new
98 99 self.shell.init_user_ns()
99 100
100 101 # identities:
101 102 int_id = Integer(-1)
102 103 ident = Unicode()
103 104
104 105 def _ident_default(self):
105 106 return unicode(uuid.uuid4())
106 107
107 108
108 109 # Private interface
109 110
110 111 # Time to sleep after flushing the stdout/err buffers in each execute
111 112 # cycle. While this introduces a hard limit on the minimal latency of the
112 113 # execute cycle, it helps prevent output synchronization problems for
113 114 # clients.
114 115 # Units are in seconds. The minimum zmq latency on local host is probably
115 116 # ~150 microseconds, set this to 500us for now. We may need to increase it
116 117 # a little if it's not enough after more interactive testing.
117 118 _execute_sleep = Float(0.0005, config=True)
118 119
119 120 # Frequency of the kernel's event loop.
120 121 # Units are in seconds, kernel subclasses for GUI toolkits may need to
121 122 # adapt to milliseconds.
122 123 _poll_interval = Float(0.05, config=True)
123 124
124 125 # If the shutdown was requested over the network, we leave here the
125 126 # necessary reply message so it can be sent by our registered atexit
126 127 # handler. This ensures that the reply is only sent to clients truly at
127 128 # the end of our shutdown process (which happens after the underlying
128 129 # IPython shell's own shutdown).
129 130 _shutdown_message = None
130 131
131 132 # This is a dict of port number that the kernel is listening on. It is set
132 133 # by record_ports and used by connect_request.
133 134 _recorded_ports = Dict()
134 135
135 136 # set of aborted msg_ids
136 137 aborted = Set()
137 138
138 139
139 140 def __init__(self, **kwargs):
140 141 super(Kernel, self).__init__(**kwargs)
141 142
142 143 # Initialize the InteractiveShell subclass
143 144 self.shell = self.shell_class.instance(config=self.config,
144 145 profile_dir = self.profile_dir,
145 146 user_module = self.user_module,
146 147 user_ns = self.user_ns,
147 148 )
148 149 self.shell.displayhook.session = self.session
149 150 self.shell.displayhook.pub_socket = self.iopub_socket
150 151 self.shell.displayhook.topic = self._topic('pyout')
151 152 self.shell.display_pub.session = self.session
152 153 self.shell.display_pub.pub_socket = self.iopub_socket
153 154 self.shell.data_pub.session = self.session
154 155 self.shell.data_pub.pub_socket = self.iopub_socket
155 156
156 157 # TMP - hack while developing
157 158 self.shell._reply_content = None
158 159
159 160 # Build dict of handlers for message types
160 161 msg_types = [ 'execute_request', 'complete_request',
161 162 'object_info_request', 'history_request',
162 163 'connect_request', 'shutdown_request',
163 164 'apply_request',
164 165 ]
165 166 self.shell_handlers = {}
166 167 for msg_type in msg_types:
167 168 self.shell_handlers[msg_type] = getattr(self, msg_type)
168 169
169 170 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
170 171 self.control_handlers = {}
171 172 for msg_type in control_msg_types:
172 173 self.control_handlers[msg_type] = getattr(self, msg_type)
173 174
174 175 def dispatch_control(self, msg):
175 176 """dispatch control requests"""
176 177 idents,msg = self.session.feed_identities(msg, copy=False)
177 178 try:
178 179 msg = self.session.unserialize(msg, content=True, copy=False)
179 180 except:
180 181 self.log.error("Invalid Control Message", exc_info=True)
181 182 return
182 183
183 184 self.log.debug("Control received: %s", msg)
184 185
185 186 header = msg['header']
186 187 msg_id = header['msg_id']
187 188 msg_type = header['msg_type']
188 189
189 190 handler = self.control_handlers.get(msg_type, None)
190 191 if handler is None:
191 192 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
192 193 else:
193 194 try:
194 195 handler(self.control_stream, idents, msg)
195 196 except Exception:
196 197 self.log.error("Exception in control handler:", exc_info=True)
197 198
198 199 def dispatch_shell(self, stream, msg):
199 200 """dispatch shell requests"""
200 201 # flush control requests first
201 202 if self.control_stream:
202 203 self.control_stream.flush()
203 204
204 205 idents,msg = self.session.feed_identities(msg, copy=False)
205 206 try:
206 207 msg = self.session.unserialize(msg, content=True, copy=False)
207 208 except:
208 209 self.log.error("Invalid Message", exc_info=True)
209 210 return
210 211
211 212 header = msg['header']
212 213 msg_id = header['msg_id']
213 214 msg_type = msg['header']['msg_type']
214 215
215 216 # Print some info about this message and leave a '--->' marker, so it's
216 217 # easier to trace visually the message chain when debugging. Each
217 218 # handler prints its message at the end.
218 219 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
219 220 self.log.debug(' Content: %s\n --->\n ', msg['content'])
220 221
221 222 if msg_id in self.aborted:
222 223 self.aborted.remove(msg_id)
223 224 # is it safe to assume a msg_id will not be resubmitted?
224 225 reply_type = msg_type.split('_')[0] + '_reply'
225 226 status = {'status' : 'aborted'}
226 227 md = {'engine' : self.ident}
227 228 md.update(status)
228 229 reply_msg = self.session.send(stream, reply_type, metadata=md,
229 230 content=status, parent=msg, ident=idents)
230 231 return
231 232
232 233 handler = self.shell_handlers.get(msg_type, None)
233 234 if handler is None:
234 235 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
235 236 else:
236 237 # ensure default_int_handler during handler call
237 238 sig = signal(SIGINT, default_int_handler)
238 239 try:
239 240 handler(stream, idents, msg)
240 241 except Exception:
241 242 self.log.error("Exception in message handler:", exc_info=True)
242 243 finally:
243 244 signal(SIGINT, sig)
244 245
245 246 def enter_eventloop(self):
246 247 """enter eventloop"""
247 248 self.log.info("entering eventloop")
248 249 # restore default_int_handler
249 250 signal(SIGINT, default_int_handler)
250 251 while self.eventloop is not None:
251 252 try:
252 253 self.eventloop(self)
253 254 except KeyboardInterrupt:
254 255 # Ctrl-C shouldn't crash the kernel
255 256 self.log.error("KeyboardInterrupt caught in kernel")
256 257 continue
257 258 else:
258 259 # eventloop exited cleanly, this means we should stop (right?)
259 260 self.eventloop = None
260 261 break
261 262 self.log.info("exiting eventloop")
262 263
263 264 def start(self):
264 265 """register dispatchers for streams"""
265 266 self.shell.exit_now = False
266 267 if self.control_stream:
267 268 self.control_stream.on_recv(self.dispatch_control, copy=False)
268 269
269 270 def make_dispatcher(stream):
270 271 def dispatcher(msg):
271 272 return self.dispatch_shell(stream, msg)
272 273 return dispatcher
273 274
274 275 for s in self.shell_streams:
275 276 s.on_recv(make_dispatcher(s), copy=False)
276 277
277 278 def do_one_iteration(self):
278 279 """step eventloop just once"""
279 280 if self.control_stream:
280 281 self.control_stream.flush()
281 282 for stream in self.shell_streams:
282 283 # handle at most one request per iteration
283 284 stream.flush(zmq.POLLIN, 1)
284 285 stream.flush(zmq.POLLOUT)
285 286
286 287
287 288 def record_ports(self, ports):
288 289 """Record the ports that this kernel is using.
289 290
290 291 The creator of the Kernel instance must call this methods if they
291 292 want the :meth:`connect_request` method to return the port numbers.
292 293 """
293 294 self._recorded_ports = ports
294 295
295 296 #---------------------------------------------------------------------------
296 297 # Kernel request handlers
297 298 #---------------------------------------------------------------------------
298 299
299 300 def _make_metadata(self, other=None):
300 301 """init metadata dict, for execute/apply_reply"""
301 302 new_md = {
302 303 'dependencies_met' : True,
303 304 'engine' : self.ident,
304 305 'started': datetime.now(),
305 306 }
306 307 if other:
307 308 new_md.update(other)
308 309 return new_md
309 310
310 311 def _publish_pyin(self, code, parent, execution_count):
311 312 """Publish the code request on the pyin stream."""
312 313
313 314 self.session.send(self.iopub_socket, u'pyin',
314 315 {u'code':code, u'execution_count': execution_count},
315 316 parent=parent, ident=self._topic('pyin')
316 317 )
317 318
318 319 def _publish_status(self, status, parent=None):
319 320 """send status (busy/idle) on IOPub"""
320 321 self.session.send(self.iopub_socket,
321 322 u'status',
322 323 {u'execution_state': status},
323 324 parent=parent,
324 325 ident=self._topic('status'),
325 326 )
326 327
327 328
328 329 def execute_request(self, stream, ident, parent):
329 330 """handle an execute_request"""
330 331
331 332 self._publish_status(u'busy', parent)
332 333
333 334 try:
334 335 content = parent[u'content']
335 336 code = content[u'code']
336 337 silent = content[u'silent']
337 338 store_history = content.get(u'store_history', not silent)
338 339 except:
339 340 self.log.error("Got bad msg: ")
340 341 self.log.error("%s", parent)
341 342 return
342 343
343 344 md = self._make_metadata(parent['metadata'])
344 345
345 346 shell = self.shell # we'll need this a lot here
346 347
347 348 # Replace raw_input. Note that is not sufficient to replace
348 349 # raw_input in the user namespace.
349 350 if content.get('allow_stdin', False):
350 351 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
351 352 else:
352 353 raw_input = lambda prompt='' : self._no_raw_input()
353 354
354 355 if py3compat.PY3:
356 self.sys_raw_input = __builtin__.input
355 357 __builtin__.input = raw_input
356 358 else:
359 self.sys_raw_input = __builtin__.raw_input
357 360 __builtin__.raw_input = raw_input
358 361
359 362 # Set the parent message of the display hook and out streams.
360 363 shell.displayhook.set_parent(parent)
361 364 shell.display_pub.set_parent(parent)
362 365 shell.data_pub.set_parent(parent)
363 366 sys.stdout.set_parent(parent)
364 367 sys.stderr.set_parent(parent)
365 368
366 369 # Re-broadcast our input for the benefit of listening clients, and
367 370 # start computing output
368 371 if not silent:
369 372 self._publish_pyin(code, parent, shell.execution_count)
370 373
371 374 reply_content = {}
372 375 try:
373 376 # FIXME: the shell calls the exception handler itself.
374 377 shell.run_cell(code, store_history=store_history, silent=silent)
375 378 except:
376 379 status = u'error'
377 380 # FIXME: this code right now isn't being used yet by default,
378 381 # because the run_cell() call above directly fires off exception
379 382 # reporting. This code, therefore, is only active in the scenario
380 383 # where runlines itself has an unhandled exception. We need to
381 384 # uniformize this, for all exception construction to come from a
382 385 # single location in the codbase.
383 386 etype, evalue, tb = sys.exc_info()
384 387 tb_list = traceback.format_exception(etype, evalue, tb)
385 388 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
386 389 else:
387 390 status = u'ok'
391 finally:
392 # Restore raw_input.
393 if py3compat.PY3:
394 __builtin__.input = self.sys_raw_input
395 else:
396 __builtin__.raw_input = self.sys_raw_input
388 397
389 398 reply_content[u'status'] = status
390 399
391 400 # Return the execution counter so clients can display prompts
392 401 reply_content['execution_count'] = shell.execution_count - 1
393 402
394 403 # FIXME - fish exception info out of shell, possibly left there by
395 404 # runlines. We'll need to clean up this logic later.
396 405 if shell._reply_content is not None:
397 406 reply_content.update(shell._reply_content)
398 407 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
399 408 reply_content['engine_info'] = e_info
400 409 # reset after use
401 410 shell._reply_content = None
402 411
403 412 # At this point, we can tell whether the main code execution succeeded
404 413 # or not. If it did, we proceed to evaluate user_variables/expressions
405 414 if reply_content['status'] == 'ok':
406 415 reply_content[u'user_variables'] = \
407 416 shell.user_variables(content.get(u'user_variables', []))
408 417 reply_content[u'user_expressions'] = \
409 418 shell.user_expressions(content.get(u'user_expressions', {}))
410 419 else:
411 420 # If there was an error, don't even try to compute variables or
412 421 # expressions
413 422 reply_content[u'user_variables'] = {}
414 423 reply_content[u'user_expressions'] = {}
415 424
416 425 # Payloads should be retrieved regardless of outcome, so we can both
417 426 # recover partial output (that could have been generated early in a
418 427 # block, before an error) and clear the payload system always.
419 428 reply_content[u'payload'] = shell.payload_manager.read_payload()
420 429 # Be agressive about clearing the payload because we don't want
421 430 # it to sit in memory until the next execute_request comes in.
422 431 shell.payload_manager.clear_payload()
423 432
424 433 # Flush output before sending the reply.
425 434 sys.stdout.flush()
426 435 sys.stderr.flush()
427 436 # FIXME: on rare occasions, the flush doesn't seem to make it to the
428 437 # clients... This seems to mitigate the problem, but we definitely need
429 438 # to better understand what's going on.
430 439 if self._execute_sleep:
431 440 time.sleep(self._execute_sleep)
432 441
433 442 # Send the reply.
434 443 reply_content = json_clean(reply_content)
435 444
436 445 md['status'] = reply_content['status']
437 446 if reply_content['status'] == 'error' and \
438 447 reply_content['ename'] == 'UnmetDependency':
439 448 md['dependencies_met'] = False
440 449
441 450 reply_msg = self.session.send(stream, u'execute_reply',
442 451 reply_content, parent, metadata=md,
443 452 ident=ident)
444 453
445 454 self.log.debug("%s", reply_msg)
446 455
447 456 if not silent and reply_msg['content']['status'] == u'error':
448 457 self._abort_queues()
449 458
450 459 self._publish_status(u'idle', parent)
451 460
452 461 def complete_request(self, stream, ident, parent):
453 462 txt, matches = self._complete(parent)
454 463 matches = {'matches' : matches,
455 464 'matched_text' : txt,
456 465 'status' : 'ok'}
457 466 matches = json_clean(matches)
458 467 completion_msg = self.session.send(stream, 'complete_reply',
459 468 matches, parent, ident)
460 469 self.log.debug("%s", completion_msg)
461 470
462 471 def object_info_request(self, stream, ident, parent):
463 472 content = parent['content']
464 473 object_info = self.shell.object_inspect(content['oname'],
465 474 detail_level = content.get('detail_level', 0)
466 475 )
467 476 # Before we send this object over, we scrub it for JSON usage
468 477 oinfo = json_clean(object_info)
469 478 msg = self.session.send(stream, 'object_info_reply',
470 479 oinfo, parent, ident)
471 480 self.log.debug("%s", msg)
472 481
473 482 def history_request(self, stream, ident, parent):
474 483 # We need to pull these out, as passing **kwargs doesn't work with
475 484 # unicode keys before Python 2.6.5.
476 485 hist_access_type = parent['content']['hist_access_type']
477 486 raw = parent['content']['raw']
478 487 output = parent['content']['output']
479 488 if hist_access_type == 'tail':
480 489 n = parent['content']['n']
481 490 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
482 491 include_latest=True)
483 492
484 493 elif hist_access_type == 'range':
485 494 session = parent['content']['session']
486 495 start = parent['content']['start']
487 496 stop = parent['content']['stop']
488 497 hist = self.shell.history_manager.get_range(session, start, stop,
489 498 raw=raw, output=output)
490 499
491 500 elif hist_access_type == 'search':
492 501 pattern = parent['content']['pattern']
493 502 hist = self.shell.history_manager.search(pattern, raw=raw,
494 503 output=output)
495 504
496 505 else:
497 506 hist = []
498 507 hist = list(hist)
499 508 content = {'history' : hist}
500 509 content = json_clean(content)
501 510 msg = self.session.send(stream, 'history_reply',
502 511 content, parent, ident)
503 512 self.log.debug("Sending history reply with %i entries", len(hist))
504 513
505 514 def connect_request(self, stream, ident, parent):
506 515 if self._recorded_ports is not None:
507 516 content = self._recorded_ports.copy()
508 517 else:
509 518 content = {}
510 519 msg = self.session.send(stream, 'connect_reply',
511 520 content, parent, ident)
512 521 self.log.debug("%s", msg)
513 522
514 523 def shutdown_request(self, stream, ident, parent):
515 524 self.shell.exit_now = True
516 525 content = dict(status='ok')
517 526 content.update(parent['content'])
518 527 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
519 528 # same content, but different msg_id for broadcasting on IOPub
520 529 self._shutdown_message = self.session.msg(u'shutdown_reply',
521 530 content, parent
522 531 )
523 532
524 533 self._at_shutdown()
525 534 # call sys.exit after a short delay
526 535 loop = ioloop.IOLoop.instance()
527 536 loop.add_timeout(time.time()+0.1, loop.stop)
528 537
529 538 #---------------------------------------------------------------------------
530 539 # Engine methods
531 540 #---------------------------------------------------------------------------
532 541
533 542 def apply_request(self, stream, ident, parent):
534 543 try:
535 544 content = parent[u'content']
536 545 bufs = parent[u'buffers']
537 546 msg_id = parent['header']['msg_id']
538 547 except:
539 548 self.log.error("Got bad msg: %s", parent, exc_info=True)
540 549 return
541 550
542 551 self._publish_status(u'busy', parent)
543 552
544 553 # Set the parent message of the display hook and out streams.
545 554 shell = self.shell
546 555 shell.displayhook.set_parent(parent)
547 556 shell.display_pub.set_parent(parent)
548 557 shell.data_pub.set_parent(parent)
549 558 sys.stdout.set_parent(parent)
550 559 sys.stderr.set_parent(parent)
551 560
552 561 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
553 562 # self.iopub_socket.send(pyin_msg)
554 563 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
555 564 md = self._make_metadata(parent['metadata'])
556 565 try:
557 566 working = shell.user_ns
558 567
559 568 prefix = "_"+str(msg_id).replace("-","")+"_"
560 569
561 570 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
562 571
563 572 fname = getattr(f, '__name__', 'f')
564 573
565 574 fname = prefix+"f"
566 575 argname = prefix+"args"
567 576 kwargname = prefix+"kwargs"
568 577 resultname = prefix+"result"
569 578
570 579 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
571 580 # print ns
572 581 working.update(ns)
573 582 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
574 583 try:
575 584 exec code in shell.user_global_ns, shell.user_ns
576 585 result = working.get(resultname)
577 586 finally:
578 587 for key in ns.iterkeys():
579 588 working.pop(key)
580 589
581 590 result_buf = serialize_object(result,
582 591 buffer_threshold=self.session.buffer_threshold,
583 592 item_threshold=self.session.item_threshold,
584 593 )
585 594
586 595 except:
587 596 # invoke IPython traceback formatting
588 597 shell.showtraceback()
589 598 # FIXME - fish exception info out of shell, possibly left there by
590 599 # run_code. We'll need to clean up this logic later.
591 600 reply_content = {}
592 601 if shell._reply_content is not None:
593 602 reply_content.update(shell._reply_content)
594 603 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
595 604 reply_content['engine_info'] = e_info
596 605 # reset after use
597 606 shell._reply_content = None
598 607
599 608 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
600 609 ident=self._topic('pyerr'))
601 610 result_buf = []
602 611
603 612 if reply_content['ename'] == 'UnmetDependency':
604 613 md['dependencies_met'] = False
605 614 else:
606 615 reply_content = {'status' : 'ok'}
607 616
608 617 # put 'ok'/'error' status in header, for scheduler introspection:
609 618 md['status'] = reply_content['status']
610 619
611 620 # flush i/o
612 621 sys.stdout.flush()
613 622 sys.stderr.flush()
614 623
615 624 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
616 625 parent=parent, ident=ident,buffers=result_buf, metadata=md)
617 626
618 627 self._publish_status(u'idle', parent)
619 628
620 629 #---------------------------------------------------------------------------
621 630 # Control messages
622 631 #---------------------------------------------------------------------------
623 632
624 633 def abort_request(self, stream, ident, parent):
625 634 """abort a specifig msg by id"""
626 635 msg_ids = parent['content'].get('msg_ids', None)
627 636 if isinstance(msg_ids, basestring):
628 637 msg_ids = [msg_ids]
629 638 if not msg_ids:
630 639 self.abort_queues()
631 640 for mid in msg_ids:
632 641 self.aborted.add(str(mid))
633 642
634 643 content = dict(status='ok')
635 644 reply_msg = self.session.send(stream, 'abort_reply', content=content,
636 645 parent=parent, ident=ident)
637 646 self.log.debug("%s", reply_msg)
638 647
639 648 def clear_request(self, stream, idents, parent):
640 649 """Clear our namespace."""
641 650 self.shell.reset(False)
642 651 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
643 652 content = dict(status='ok'))
644 653
645 654
646 655 #---------------------------------------------------------------------------
647 656 # Protected interface
648 657 #---------------------------------------------------------------------------
649 658
650 659 def _wrap_exception(self, method=None):
651 660 # import here, because _wrap_exception is only used in parallel,
652 661 # and parallel has higher min pyzmq version
653 662 from IPython.parallel.error import wrap_exception
654 663 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
655 664 content = wrap_exception(e_info)
656 665 return content
657 666
658 667 def _topic(self, topic):
659 668 """prefixed topic for IOPub messages"""
660 669 if self.int_id >= 0:
661 670 base = "engine.%i" % self.int_id
662 671 else:
663 672 base = "kernel.%s" % self.ident
664 673
665 674 return py3compat.cast_bytes("%s.%s" % (base, topic))
666 675
667 676 def _abort_queues(self):
668 677 for stream in self.shell_streams:
669 678 if stream:
670 679 self._abort_queue(stream)
671 680
672 681 def _abort_queue(self, stream):
673 682 poller = zmq.Poller()
674 683 poller.register(stream.socket, zmq.POLLIN)
675 684 while True:
676 685 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
677 686 if msg is None:
678 687 return
679 688
680 689 self.log.info("Aborting:")
681 690 self.log.info("%s", msg)
682 691 msg_type = msg['header']['msg_type']
683 692 reply_type = msg_type.split('_')[0] + '_reply'
684 693
685 694 status = {'status' : 'aborted'}
686 695 md = {'engine' : self.ident}
687 696 md.update(status)
688 697 reply_msg = self.session.send(stream, reply_type, metadata=md,
689 698 content=status, parent=msg, ident=idents)
690 699 self.log.debug("%s", reply_msg)
691 700 # We need to wait a bit for requests to come in. This can probably
692 701 # be set shorter for true asynchronous clients.
693 702 poller.poll(50)
694 703
695 704
696 705 def _no_raw_input(self):
697 706 """Raise StdinNotImplentedError if active frontend doesn't support
698 707 stdin."""
699 708 raise StdinNotImplementedError("raw_input was called, but this "
700 709 "frontend does not support stdin.")
701 710
702 711 def _raw_input(self, prompt, ident, parent):
703 712 # Flush output before making the request.
704 713 sys.stderr.flush()
705 714 sys.stdout.flush()
706 715
707 716 # Send the input request.
708 717 content = json_clean(dict(prompt=prompt))
709 718 self.session.send(self.stdin_socket, u'input_request', content, parent,
710 719 ident=ident)
711 720
712 721 # Await a response.
713 722 while True:
714 723 try:
715 724 ident, reply = self.session.recv(self.stdin_socket, 0)
716 725 except Exception:
717 726 self.log.warn("Invalid Message:", exc_info=True)
718 727 else:
719 728 break
720 729 try:
721 730 value = reply['content']['value']
722 731 except:
723 732 self.log.error("Got bad raw_input reply: ")
724 733 self.log.error("%s", parent)
725 734 value = ''
726 735 if value == '\x04':
727 736 # EOF
728 737 raise EOFError
729 738 return value
730 739
731 740 def _complete(self, msg):
732 741 c = msg['content']
733 742 try:
734 743 cpos = int(c['cursor_pos'])
735 744 except:
736 745 # If we don't get something that we can convert to an integer, at
737 746 # least attempt the completion guessing the cursor is at the end of
738 747 # the text, if there's any, and otherwise of the line
739 748 cpos = len(c['text'])
740 749 if cpos==0:
741 750 cpos = len(c['line'])
742 751 return self.shell.complete(c['text'], c['line'], cpos)
743 752
744 753 def _at_shutdown(self):
745 754 """Actions taken at shutdown by the kernel, called by python's atexit.
746 755 """
747 756 # io.rprint("Kernel at_shutdown") # dbg
748 757 if self._shutdown_message is not None:
749 758 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
750 759 self.log.debug("%s", self._shutdown_message)
751 760 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
752 761
753 762 #-----------------------------------------------------------------------------
754 763 # Aliases and Flags for the IPKernelApp
755 764 #-----------------------------------------------------------------------------
756 765
757 766 flags = dict(kernel_flags)
758 767 flags.update(shell_flags)
759 768
760 769 addflag = lambda *args: flags.update(boolean_flag(*args))
761 770
762 771 flags['pylab'] = (
763 772 {'IPKernelApp' : {'pylab' : 'auto'}},
764 773 """Pre-load matplotlib and numpy for interactive use with
765 774 the default matplotlib backend."""
766 775 )
767 776
768 777 aliases = dict(kernel_aliases)
769 778 aliases.update(shell_aliases)
770 779
771 780 #-----------------------------------------------------------------------------
772 781 # The IPKernelApp class
773 782 #-----------------------------------------------------------------------------
774 783
775 784 class IPKernelApp(KernelApp, InteractiveShellApp):
776 785 name = 'ipkernel'
777 786
778 787 aliases = Dict(aliases)
779 788 flags = Dict(flags)
780 789 classes = [Kernel, ZMQInteractiveShell, ProfileDir, Session]
781 790
782 791 @catch_config_error
783 792 def initialize(self, argv=None):
784 793 super(IPKernelApp, self).initialize(argv)
785 794 self.init_path()
786 795 self.init_shell()
787 796 self.init_gui_pylab()
788 797 self.init_extensions()
789 798 self.init_code()
790 799
791 800 def init_kernel(self):
792 801
793 802 shell_stream = ZMQStream(self.shell_socket)
794 803
795 804 kernel = Kernel(config=self.config, session=self.session,
796 805 shell_streams=[shell_stream],
797 806 iopub_socket=self.iopub_socket,
798 807 stdin_socket=self.stdin_socket,
799 808 log=self.log,
800 809 profile_dir=self.profile_dir,
801 810 )
802 811 self.kernel = kernel
803 812 kernel.record_ports(self.ports)
804 813 shell = kernel.shell
805 814
806 815 def init_gui_pylab(self):
807 816 """Enable GUI event loop integration, taking pylab into account."""
808 817
809 818 # Provide a wrapper for :meth:`InteractiveShellApp.init_gui_pylab`
810 819 # to ensure that any exception is printed straight to stderr.
811 820 # Normally _showtraceback associates the reply with an execution,
812 821 # which means frontends will never draw it, as this exception
813 822 # is not associated with any execute request.
814 823
815 824 shell = self.shell
816 825 _showtraceback = shell._showtraceback
817 826 try:
818 827 # replace pyerr-sending traceback with stderr
819 828 def print_tb(etype, evalue, stb):
820 829 print ("GUI event loop or pylab initialization failed",
821 830 file=io.stderr)
822 831 print (shell.InteractiveTB.stb2text(stb), file=io.stderr)
823 832 shell._showtraceback = print_tb
824 833 InteractiveShellApp.init_gui_pylab(self)
825 834 finally:
826 835 shell._showtraceback = _showtraceback
827 836
828 837 def init_shell(self):
829 838 self.shell = self.kernel.shell
830 839 self.shell.configurables.append(self)
831 840
832 841
833 842 #-----------------------------------------------------------------------------
834 843 # Kernel main and launch functions
835 844 #-----------------------------------------------------------------------------
836 845
837 846 def launch_kernel(*args, **kwargs):
838 847 """Launches a localhost IPython kernel, binding to the specified ports.
839 848
840 849 This function simply calls entry_point.base_launch_kernel with the right
841 850 first command to start an ipkernel. See base_launch_kernel for arguments.
842 851
843 852 Returns
844 853 -------
845 854 A tuple of form:
846 855 (kernel_process, shell_port, iopub_port, stdin_port, hb_port)
847 856 where kernel_process is a Popen object and the ports are integers.
848 857 """
849 858 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
850 859 *args, **kwargs)
851 860
852 861
853 862 def embed_kernel(module=None, local_ns=None, **kwargs):
854 863 """Embed and start an IPython kernel in a given scope.
855 864
856 865 Parameters
857 866 ----------
858 867 module : ModuleType, optional
859 868 The module to load into IPython globals (default: caller)
860 869 local_ns : dict, optional
861 870 The namespace to load into IPython user namespace (default: caller)
862 871
863 872 kwargs : various, optional
864 873 Further keyword args are relayed to the KernelApp constructor,
865 874 allowing configuration of the Kernel. Will only have an effect
866 875 on the first embed_kernel call for a given process.
867 876
868 877 """
869 878 # get the app if it exists, or set it up if it doesn't
870 879 if IPKernelApp.initialized():
871 880 app = IPKernelApp.instance()
872 881 else:
873 882 app = IPKernelApp.instance(**kwargs)
874 883 app.initialize([])
875 884 # Undo unnecessary sys module mangling from init_sys_modules.
876 885 # This would not be necessary if we could prevent it
877 886 # in the first place by using a different InteractiveShell
878 887 # subclass, as in the regular embed case.
879 888 main = app.kernel.shell._orig_sys_modules_main_mod
880 889 if main is not None:
881 890 sys.modules[app.kernel.shell._orig_sys_modules_main_name] = main
882 891
883 892 # load the calling scope if not given
884 893 (caller_module, caller_locals) = extract_module_locals(1)
885 894 if module is None:
886 895 module = caller_module
887 896 if local_ns is None:
888 897 local_ns = caller_locals
889 898
890 899 app.kernel.user_module = module
891 900 app.kernel.user_ns = local_ns
892 901 app.shell.set_completer_frame()
893 902 app.start()
894 903
895 904 def main():
896 905 """Run an IPKernel as an application"""
897 906 app = IPKernelApp.instance()
898 907 app.initialize()
899 908 app.start()
900 909
901 910
902 911 if __name__ == '__main__':
903 912 main()
General Comments 0
You need to be logged in to leave comments. Login now