##// END OF EJS Templates
Be explicit about default value for unique
Takafumi Arakaki -
Show More
@@ -1,780 +1,780 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple interactive kernel that talks to a frontend over 0MQ.
2 """A simple interactive kernel that talks to a frontend over 0MQ.
3
3
4 Things to do:
4 Things to do:
5
5
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 call set_parent on all the PUB objects with the message about to be executed.
7 call set_parent on all the PUB objects with the message about to be executed.
8 * Implement random port and security key logic.
8 * Implement random port and security key logic.
9 * Implement control messages.
9 * Implement control messages.
10 * Implement event loop and poll version.
10 * Implement event loop and poll version.
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 # Standard library imports
18 # Standard library imports
19 import __builtin__
19 import __builtin__
20 import sys
20 import sys
21 import time
21 import time
22 import traceback
22 import traceback
23 import logging
23 import logging
24 import uuid
24 import uuid
25
25
26 from datetime import datetime
26 from datetime import datetime
27 from signal import (
27 from signal import (
28 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
28 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
29 )
29 )
30
30
31 # System library imports
31 # System library imports
32 import zmq
32 import zmq
33 from zmq.eventloop import ioloop
33 from zmq.eventloop import ioloop
34 from zmq.eventloop.zmqstream import ZMQStream
34 from zmq.eventloop.zmqstream import ZMQStream
35
35
36 # Local imports
36 # Local imports
37 from IPython.config.configurable import Configurable
37 from IPython.config.configurable import Configurable
38 from IPython.core.error import StdinNotImplementedError
38 from IPython.core.error import StdinNotImplementedError
39 from IPython.core import release
39 from IPython.core import release
40 from IPython.utils import io
40 from IPython.utils import io
41 from IPython.utils import py3compat
41 from IPython.utils import py3compat
42 from IPython.utils.jsonutil import json_clean
42 from IPython.utils.jsonutil import json_clean
43 from IPython.utils.traitlets import (
43 from IPython.utils.traitlets import (
44 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode,
44 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode,
45 Type
45 Type
46 )
46 )
47
47
48 from serialize import serialize_object, unpack_apply_message
48 from serialize import serialize_object, unpack_apply_message
49 from session import Session
49 from session import Session
50 from zmqshell import ZMQInteractiveShell
50 from zmqshell import ZMQInteractiveShell
51
51
52
52
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54 # Main kernel class
54 # Main kernel class
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56
56
57 protocol_version = list(release.kernel_protocol_version_info)
57 protocol_version = list(release.kernel_protocol_version_info)
58 ipython_version = list(release.version_info)
58 ipython_version = list(release.version_info)
59 language_version = list(sys.version_info[:3])
59 language_version = list(sys.version_info[:3])
60
60
61
61
62 class Kernel(Configurable):
62 class Kernel(Configurable):
63
63
64 #---------------------------------------------------------------------------
64 #---------------------------------------------------------------------------
65 # Kernel interface
65 # Kernel interface
66 #---------------------------------------------------------------------------
66 #---------------------------------------------------------------------------
67
67
68 # attribute to override with a GUI
68 # attribute to override with a GUI
69 eventloop = Any(None)
69 eventloop = Any(None)
70 def _eventloop_changed(self, name, old, new):
70 def _eventloop_changed(self, name, old, new):
71 """schedule call to eventloop from IOLoop"""
71 """schedule call to eventloop from IOLoop"""
72 loop = ioloop.IOLoop.instance()
72 loop = ioloop.IOLoop.instance()
73 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
73 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
74
74
75 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
75 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
76 shell_class = Type(ZMQInteractiveShell)
76 shell_class = Type(ZMQInteractiveShell)
77
77
78 session = Instance(Session)
78 session = Instance(Session)
79 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
79 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
80 shell_streams = List()
80 shell_streams = List()
81 control_stream = Instance(ZMQStream)
81 control_stream = Instance(ZMQStream)
82 iopub_socket = Instance(zmq.Socket)
82 iopub_socket = Instance(zmq.Socket)
83 stdin_socket = Instance(zmq.Socket)
83 stdin_socket = Instance(zmq.Socket)
84 log = Instance(logging.Logger)
84 log = Instance(logging.Logger)
85
85
86 user_module = Any()
86 user_module = Any()
87 def _user_module_changed(self, name, old, new):
87 def _user_module_changed(self, name, old, new):
88 if self.shell is not None:
88 if self.shell is not None:
89 self.shell.user_module = new
89 self.shell.user_module = new
90
90
91 user_ns = Dict(default_value=None)
91 user_ns = Dict(default_value=None)
92 def _user_ns_changed(self, name, old, new):
92 def _user_ns_changed(self, name, old, new):
93 if self.shell is not None:
93 if self.shell is not None:
94 self.shell.user_ns = new
94 self.shell.user_ns = new
95 self.shell.init_user_ns()
95 self.shell.init_user_ns()
96
96
97 # identities:
97 # identities:
98 int_id = Integer(-1)
98 int_id = Integer(-1)
99 ident = Unicode()
99 ident = Unicode()
100
100
101 def _ident_default(self):
101 def _ident_default(self):
102 return unicode(uuid.uuid4())
102 return unicode(uuid.uuid4())
103
103
104
104
105 # Private interface
105 # Private interface
106
106
107 # Time to sleep after flushing the stdout/err buffers in each execute
107 # Time to sleep after flushing the stdout/err buffers in each execute
108 # cycle. While this introduces a hard limit on the minimal latency of the
108 # cycle. While this introduces a hard limit on the minimal latency of the
109 # execute cycle, it helps prevent output synchronization problems for
109 # execute cycle, it helps prevent output synchronization problems for
110 # clients.
110 # clients.
111 # Units are in seconds. The minimum zmq latency on local host is probably
111 # Units are in seconds. The minimum zmq latency on local host is probably
112 # ~150 microseconds, set this to 500us for now. We may need to increase it
112 # ~150 microseconds, set this to 500us for now. We may need to increase it
113 # a little if it's not enough after more interactive testing.
113 # a little if it's not enough after more interactive testing.
114 _execute_sleep = Float(0.0005, config=True)
114 _execute_sleep = Float(0.0005, config=True)
115
115
116 # Frequency of the kernel's event loop.
116 # Frequency of the kernel's event loop.
117 # Units are in seconds, kernel subclasses for GUI toolkits may need to
117 # Units are in seconds, kernel subclasses for GUI toolkits may need to
118 # adapt to milliseconds.
118 # adapt to milliseconds.
119 _poll_interval = Float(0.05, config=True)
119 _poll_interval = Float(0.05, config=True)
120
120
121 # If the shutdown was requested over the network, we leave here the
121 # If the shutdown was requested over the network, we leave here the
122 # necessary reply message so it can be sent by our registered atexit
122 # necessary reply message so it can be sent by our registered atexit
123 # handler. This ensures that the reply is only sent to clients truly at
123 # handler. This ensures that the reply is only sent to clients truly at
124 # the end of our shutdown process (which happens after the underlying
124 # the end of our shutdown process (which happens after the underlying
125 # IPython shell's own shutdown).
125 # IPython shell's own shutdown).
126 _shutdown_message = None
126 _shutdown_message = None
127
127
128 # This is a dict of port number that the kernel is listening on. It is set
128 # This is a dict of port number that the kernel is listening on. It is set
129 # by record_ports and used by connect_request.
129 # by record_ports and used by connect_request.
130 _recorded_ports = Dict()
130 _recorded_ports = Dict()
131
131
132 # A reference to the Python builtin 'raw_input' function.
132 # A reference to the Python builtin 'raw_input' function.
133 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
133 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
134 _sys_raw_input = Any()
134 _sys_raw_input = Any()
135
135
136 # set of aborted msg_ids
136 # set of aborted msg_ids
137 aborted = Set()
137 aborted = Set()
138
138
139
139
140 def __init__(self, **kwargs):
140 def __init__(self, **kwargs):
141 super(Kernel, self).__init__(**kwargs)
141 super(Kernel, self).__init__(**kwargs)
142
142
143 # Initialize the InteractiveShell subclass
143 # Initialize the InteractiveShell subclass
144 self.shell = self.shell_class.instance(config=self.config,
144 self.shell = self.shell_class.instance(config=self.config,
145 profile_dir = self.profile_dir,
145 profile_dir = self.profile_dir,
146 user_module = self.user_module,
146 user_module = self.user_module,
147 user_ns = self.user_ns,
147 user_ns = self.user_ns,
148 )
148 )
149 self.shell.displayhook.session = self.session
149 self.shell.displayhook.session = self.session
150 self.shell.displayhook.pub_socket = self.iopub_socket
150 self.shell.displayhook.pub_socket = self.iopub_socket
151 self.shell.displayhook.topic = self._topic('pyout')
151 self.shell.displayhook.topic = self._topic('pyout')
152 self.shell.display_pub.session = self.session
152 self.shell.display_pub.session = self.session
153 self.shell.display_pub.pub_socket = self.iopub_socket
153 self.shell.display_pub.pub_socket = self.iopub_socket
154 self.shell.data_pub.session = self.session
154 self.shell.data_pub.session = self.session
155 self.shell.data_pub.pub_socket = self.iopub_socket
155 self.shell.data_pub.pub_socket = self.iopub_socket
156
156
157 # TMP - hack while developing
157 # TMP - hack while developing
158 self.shell._reply_content = None
158 self.shell._reply_content = None
159
159
160 # Build dict of handlers for message types
160 # Build dict of handlers for message types
161 msg_types = [ 'execute_request', 'complete_request',
161 msg_types = [ 'execute_request', 'complete_request',
162 'object_info_request', 'history_request',
162 'object_info_request', 'history_request',
163 'kernel_info_request',
163 'kernel_info_request',
164 'connect_request', 'shutdown_request',
164 'connect_request', 'shutdown_request',
165 'apply_request',
165 'apply_request',
166 ]
166 ]
167 self.shell_handlers = {}
167 self.shell_handlers = {}
168 for msg_type in msg_types:
168 for msg_type in msg_types:
169 self.shell_handlers[msg_type] = getattr(self, msg_type)
169 self.shell_handlers[msg_type] = getattr(self, msg_type)
170
170
171 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
171 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
172 self.control_handlers = {}
172 self.control_handlers = {}
173 for msg_type in control_msg_types:
173 for msg_type in control_msg_types:
174 self.control_handlers[msg_type] = getattr(self, msg_type)
174 self.control_handlers[msg_type] = getattr(self, msg_type)
175
175
176 def dispatch_control(self, msg):
176 def dispatch_control(self, msg):
177 """dispatch control requests"""
177 """dispatch control requests"""
178 idents,msg = self.session.feed_identities(msg, copy=False)
178 idents,msg = self.session.feed_identities(msg, copy=False)
179 try:
179 try:
180 msg = self.session.unserialize(msg, content=True, copy=False)
180 msg = self.session.unserialize(msg, content=True, copy=False)
181 except:
181 except:
182 self.log.error("Invalid Control Message", exc_info=True)
182 self.log.error("Invalid Control Message", exc_info=True)
183 return
183 return
184
184
185 self.log.debug("Control received: %s", msg)
185 self.log.debug("Control received: %s", msg)
186
186
187 header = msg['header']
187 header = msg['header']
188 msg_id = header['msg_id']
188 msg_id = header['msg_id']
189 msg_type = header['msg_type']
189 msg_type = header['msg_type']
190
190
191 handler = self.control_handlers.get(msg_type, None)
191 handler = self.control_handlers.get(msg_type, None)
192 if handler is None:
192 if handler is None:
193 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
193 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
194 else:
194 else:
195 try:
195 try:
196 handler(self.control_stream, idents, msg)
196 handler(self.control_stream, idents, msg)
197 except Exception:
197 except Exception:
198 self.log.error("Exception in control handler:", exc_info=True)
198 self.log.error("Exception in control handler:", exc_info=True)
199
199
200 def dispatch_shell(self, stream, msg):
200 def dispatch_shell(self, stream, msg):
201 """dispatch shell requests"""
201 """dispatch shell requests"""
202 # flush control requests first
202 # flush control requests first
203 if self.control_stream:
203 if self.control_stream:
204 self.control_stream.flush()
204 self.control_stream.flush()
205
205
206 idents,msg = self.session.feed_identities(msg, copy=False)
206 idents,msg = self.session.feed_identities(msg, copy=False)
207 try:
207 try:
208 msg = self.session.unserialize(msg, content=True, copy=False)
208 msg = self.session.unserialize(msg, content=True, copy=False)
209 except:
209 except:
210 self.log.error("Invalid Message", exc_info=True)
210 self.log.error("Invalid Message", exc_info=True)
211 return
211 return
212
212
213 header = msg['header']
213 header = msg['header']
214 msg_id = header['msg_id']
214 msg_id = header['msg_id']
215 msg_type = msg['header']['msg_type']
215 msg_type = msg['header']['msg_type']
216
216
217 # Print some info about this message and leave a '--->' marker, so it's
217 # Print some info about this message and leave a '--->' marker, so it's
218 # easier to trace visually the message chain when debugging. Each
218 # easier to trace visually the message chain when debugging. Each
219 # handler prints its message at the end.
219 # handler prints its message at the end.
220 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
220 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
221 self.log.debug(' Content: %s\n --->\n ', msg['content'])
221 self.log.debug(' Content: %s\n --->\n ', msg['content'])
222
222
223 if msg_id in self.aborted:
223 if msg_id in self.aborted:
224 self.aborted.remove(msg_id)
224 self.aborted.remove(msg_id)
225 # is it safe to assume a msg_id will not be resubmitted?
225 # is it safe to assume a msg_id will not be resubmitted?
226 reply_type = msg_type.split('_')[0] + '_reply'
226 reply_type = msg_type.split('_')[0] + '_reply'
227 status = {'status' : 'aborted'}
227 status = {'status' : 'aborted'}
228 md = {'engine' : self.ident}
228 md = {'engine' : self.ident}
229 md.update(status)
229 md.update(status)
230 reply_msg = self.session.send(stream, reply_type, metadata=md,
230 reply_msg = self.session.send(stream, reply_type, metadata=md,
231 content=status, parent=msg, ident=idents)
231 content=status, parent=msg, ident=idents)
232 return
232 return
233
233
234 handler = self.shell_handlers.get(msg_type, None)
234 handler = self.shell_handlers.get(msg_type, None)
235 if handler is None:
235 if handler is None:
236 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
236 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
237 else:
237 else:
238 # ensure default_int_handler during handler call
238 # ensure default_int_handler during handler call
239 sig = signal(SIGINT, default_int_handler)
239 sig = signal(SIGINT, default_int_handler)
240 try:
240 try:
241 handler(stream, idents, msg)
241 handler(stream, idents, msg)
242 except Exception:
242 except Exception:
243 self.log.error("Exception in message handler:", exc_info=True)
243 self.log.error("Exception in message handler:", exc_info=True)
244 finally:
244 finally:
245 signal(SIGINT, sig)
245 signal(SIGINT, sig)
246
246
247 def enter_eventloop(self):
247 def enter_eventloop(self):
248 """enter eventloop"""
248 """enter eventloop"""
249 self.log.info("entering eventloop")
249 self.log.info("entering eventloop")
250 # restore default_int_handler
250 # restore default_int_handler
251 signal(SIGINT, default_int_handler)
251 signal(SIGINT, default_int_handler)
252 while self.eventloop is not None:
252 while self.eventloop is not None:
253 try:
253 try:
254 self.eventloop(self)
254 self.eventloop(self)
255 except KeyboardInterrupt:
255 except KeyboardInterrupt:
256 # Ctrl-C shouldn't crash the kernel
256 # Ctrl-C shouldn't crash the kernel
257 self.log.error("KeyboardInterrupt caught in kernel")
257 self.log.error("KeyboardInterrupt caught in kernel")
258 continue
258 continue
259 else:
259 else:
260 # eventloop exited cleanly, this means we should stop (right?)
260 # eventloop exited cleanly, this means we should stop (right?)
261 self.eventloop = None
261 self.eventloop = None
262 break
262 break
263 self.log.info("exiting eventloop")
263 self.log.info("exiting eventloop")
264
264
265 def start(self):
265 def start(self):
266 """register dispatchers for streams"""
266 """register dispatchers for streams"""
267 self.shell.exit_now = False
267 self.shell.exit_now = False
268 if self.control_stream:
268 if self.control_stream:
269 self.control_stream.on_recv(self.dispatch_control, copy=False)
269 self.control_stream.on_recv(self.dispatch_control, copy=False)
270
270
271 def make_dispatcher(stream):
271 def make_dispatcher(stream):
272 def dispatcher(msg):
272 def dispatcher(msg):
273 return self.dispatch_shell(stream, msg)
273 return self.dispatch_shell(stream, msg)
274 return dispatcher
274 return dispatcher
275
275
276 for s in self.shell_streams:
276 for s in self.shell_streams:
277 s.on_recv(make_dispatcher(s), copy=False)
277 s.on_recv(make_dispatcher(s), copy=False)
278
278
279 def do_one_iteration(self):
279 def do_one_iteration(self):
280 """step eventloop just once"""
280 """step eventloop just once"""
281 if self.control_stream:
281 if self.control_stream:
282 self.control_stream.flush()
282 self.control_stream.flush()
283 for stream in self.shell_streams:
283 for stream in self.shell_streams:
284 # handle at most one request per iteration
284 # handle at most one request per iteration
285 stream.flush(zmq.POLLIN, 1)
285 stream.flush(zmq.POLLIN, 1)
286 stream.flush(zmq.POLLOUT)
286 stream.flush(zmq.POLLOUT)
287
287
288
288
289 def record_ports(self, ports):
289 def record_ports(self, ports):
290 """Record the ports that this kernel is using.
290 """Record the ports that this kernel is using.
291
291
292 The creator of the Kernel instance must call this methods if they
292 The creator of the Kernel instance must call this methods if they
293 want the :meth:`connect_request` method to return the port numbers.
293 want the :meth:`connect_request` method to return the port numbers.
294 """
294 """
295 self._recorded_ports = ports
295 self._recorded_ports = ports
296
296
297 #---------------------------------------------------------------------------
297 #---------------------------------------------------------------------------
298 # Kernel request handlers
298 # Kernel request handlers
299 #---------------------------------------------------------------------------
299 #---------------------------------------------------------------------------
300
300
301 def _make_metadata(self, other=None):
301 def _make_metadata(self, other=None):
302 """init metadata dict, for execute/apply_reply"""
302 """init metadata dict, for execute/apply_reply"""
303 new_md = {
303 new_md = {
304 'dependencies_met' : True,
304 'dependencies_met' : True,
305 'engine' : self.ident,
305 'engine' : self.ident,
306 'started': datetime.now(),
306 'started': datetime.now(),
307 }
307 }
308 if other:
308 if other:
309 new_md.update(other)
309 new_md.update(other)
310 return new_md
310 return new_md
311
311
312 def _publish_pyin(self, code, parent, execution_count):
312 def _publish_pyin(self, code, parent, execution_count):
313 """Publish the code request on the pyin stream."""
313 """Publish the code request on the pyin stream."""
314
314
315 self.session.send(self.iopub_socket, u'pyin',
315 self.session.send(self.iopub_socket, u'pyin',
316 {u'code':code, u'execution_count': execution_count},
316 {u'code':code, u'execution_count': execution_count},
317 parent=parent, ident=self._topic('pyin')
317 parent=parent, ident=self._topic('pyin')
318 )
318 )
319
319
320 def _publish_status(self, status, parent=None):
320 def _publish_status(self, status, parent=None):
321 """send status (busy/idle) on IOPub"""
321 """send status (busy/idle) on IOPub"""
322 self.session.send(self.iopub_socket,
322 self.session.send(self.iopub_socket,
323 u'status',
323 u'status',
324 {u'execution_state': status},
324 {u'execution_state': status},
325 parent=parent,
325 parent=parent,
326 ident=self._topic('status'),
326 ident=self._topic('status'),
327 )
327 )
328
328
329
329
330 def execute_request(self, stream, ident, parent):
330 def execute_request(self, stream, ident, parent):
331 """handle an execute_request"""
331 """handle an execute_request"""
332
332
333 self._publish_status(u'busy', parent)
333 self._publish_status(u'busy', parent)
334
334
335 try:
335 try:
336 content = parent[u'content']
336 content = parent[u'content']
337 code = content[u'code']
337 code = content[u'code']
338 silent = content[u'silent']
338 silent = content[u'silent']
339 store_history = content.get(u'store_history', not silent)
339 store_history = content.get(u'store_history', not silent)
340 except:
340 except:
341 self.log.error("Got bad msg: ")
341 self.log.error("Got bad msg: ")
342 self.log.error("%s", parent)
342 self.log.error("%s", parent)
343 return
343 return
344
344
345 md = self._make_metadata(parent['metadata'])
345 md = self._make_metadata(parent['metadata'])
346
346
347 shell = self.shell # we'll need this a lot here
347 shell = self.shell # we'll need this a lot here
348
348
349 # Replace raw_input. Note that is not sufficient to replace
349 # Replace raw_input. Note that is not sufficient to replace
350 # raw_input in the user namespace.
350 # raw_input in the user namespace.
351 if content.get('allow_stdin', False):
351 if content.get('allow_stdin', False):
352 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
352 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
353 else:
353 else:
354 raw_input = lambda prompt='' : self._no_raw_input()
354 raw_input = lambda prompt='' : self._no_raw_input()
355
355
356 if py3compat.PY3:
356 if py3compat.PY3:
357 self._sys_raw_input = __builtin__.input
357 self._sys_raw_input = __builtin__.input
358 __builtin__.input = raw_input
358 __builtin__.input = raw_input
359 else:
359 else:
360 self._sys_raw_input = __builtin__.raw_input
360 self._sys_raw_input = __builtin__.raw_input
361 __builtin__.raw_input = raw_input
361 __builtin__.raw_input = raw_input
362
362
363 # Set the parent message of the display hook and out streams.
363 # Set the parent message of the display hook and out streams.
364 shell.displayhook.set_parent(parent)
364 shell.displayhook.set_parent(parent)
365 shell.display_pub.set_parent(parent)
365 shell.display_pub.set_parent(parent)
366 shell.data_pub.set_parent(parent)
366 shell.data_pub.set_parent(parent)
367 sys.stdout.set_parent(parent)
367 sys.stdout.set_parent(parent)
368 sys.stderr.set_parent(parent)
368 sys.stderr.set_parent(parent)
369
369
370 # Re-broadcast our input for the benefit of listening clients, and
370 # Re-broadcast our input for the benefit of listening clients, and
371 # start computing output
371 # start computing output
372 if not silent:
372 if not silent:
373 self._publish_pyin(code, parent, shell.execution_count)
373 self._publish_pyin(code, parent, shell.execution_count)
374
374
375 reply_content = {}
375 reply_content = {}
376 try:
376 try:
377 # FIXME: the shell calls the exception handler itself.
377 # FIXME: the shell calls the exception handler itself.
378 shell.run_cell(code, store_history=store_history, silent=silent)
378 shell.run_cell(code, store_history=store_history, silent=silent)
379 except:
379 except:
380 status = u'error'
380 status = u'error'
381 # FIXME: this code right now isn't being used yet by default,
381 # FIXME: this code right now isn't being used yet by default,
382 # because the run_cell() call above directly fires off exception
382 # because the run_cell() call above directly fires off exception
383 # reporting. This code, therefore, is only active in the scenario
383 # reporting. This code, therefore, is only active in the scenario
384 # where runlines itself has an unhandled exception. We need to
384 # where runlines itself has an unhandled exception. We need to
385 # uniformize this, for all exception construction to come from a
385 # uniformize this, for all exception construction to come from a
386 # single location in the codbase.
386 # single location in the codbase.
387 etype, evalue, tb = sys.exc_info()
387 etype, evalue, tb = sys.exc_info()
388 tb_list = traceback.format_exception(etype, evalue, tb)
388 tb_list = traceback.format_exception(etype, evalue, tb)
389 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
389 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
390 else:
390 else:
391 status = u'ok'
391 status = u'ok'
392 finally:
392 finally:
393 # Restore raw_input.
393 # Restore raw_input.
394 if py3compat.PY3:
394 if py3compat.PY3:
395 __builtin__.input = self._sys_raw_input
395 __builtin__.input = self._sys_raw_input
396 else:
396 else:
397 __builtin__.raw_input = self._sys_raw_input
397 __builtin__.raw_input = self._sys_raw_input
398
398
399 reply_content[u'status'] = status
399 reply_content[u'status'] = status
400
400
401 # Return the execution counter so clients can display prompts
401 # Return the execution counter so clients can display prompts
402 reply_content['execution_count'] = shell.execution_count - 1
402 reply_content['execution_count'] = shell.execution_count - 1
403
403
404 # FIXME - fish exception info out of shell, possibly left there by
404 # FIXME - fish exception info out of shell, possibly left there by
405 # runlines. We'll need to clean up this logic later.
405 # runlines. We'll need to clean up this logic later.
406 if shell._reply_content is not None:
406 if shell._reply_content is not None:
407 reply_content.update(shell._reply_content)
407 reply_content.update(shell._reply_content)
408 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
408 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
409 reply_content['engine_info'] = e_info
409 reply_content['engine_info'] = e_info
410 # reset after use
410 # reset after use
411 shell._reply_content = None
411 shell._reply_content = None
412
412
413 if 'traceback' in reply_content:
413 if 'traceback' in reply_content:
414 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
414 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
415
415
416
416
417 # At this point, we can tell whether the main code execution succeeded
417 # At this point, we can tell whether the main code execution succeeded
418 # or not. If it did, we proceed to evaluate user_variables/expressions
418 # or not. If it did, we proceed to evaluate user_variables/expressions
419 if reply_content['status'] == 'ok':
419 if reply_content['status'] == 'ok':
420 reply_content[u'user_variables'] = \
420 reply_content[u'user_variables'] = \
421 shell.user_variables(content.get(u'user_variables', []))
421 shell.user_variables(content.get(u'user_variables', []))
422 reply_content[u'user_expressions'] = \
422 reply_content[u'user_expressions'] = \
423 shell.user_expressions(content.get(u'user_expressions', {}))
423 shell.user_expressions(content.get(u'user_expressions', {}))
424 else:
424 else:
425 # If there was an error, don't even try to compute variables or
425 # If there was an error, don't even try to compute variables or
426 # expressions
426 # expressions
427 reply_content[u'user_variables'] = {}
427 reply_content[u'user_variables'] = {}
428 reply_content[u'user_expressions'] = {}
428 reply_content[u'user_expressions'] = {}
429
429
430 # Payloads should be retrieved regardless of outcome, so we can both
430 # Payloads should be retrieved regardless of outcome, so we can both
431 # recover partial output (that could have been generated early in a
431 # recover partial output (that could have been generated early in a
432 # block, before an error) and clear the payload system always.
432 # block, before an error) and clear the payload system always.
433 reply_content[u'payload'] = shell.payload_manager.read_payload()
433 reply_content[u'payload'] = shell.payload_manager.read_payload()
434 # Be agressive about clearing the payload because we don't want
434 # Be agressive about clearing the payload because we don't want
435 # it to sit in memory until the next execute_request comes in.
435 # it to sit in memory until the next execute_request comes in.
436 shell.payload_manager.clear_payload()
436 shell.payload_manager.clear_payload()
437
437
438 # Flush output before sending the reply.
438 # Flush output before sending the reply.
439 sys.stdout.flush()
439 sys.stdout.flush()
440 sys.stderr.flush()
440 sys.stderr.flush()
441 # FIXME: on rare occasions, the flush doesn't seem to make it to the
441 # FIXME: on rare occasions, the flush doesn't seem to make it to the
442 # clients... This seems to mitigate the problem, but we definitely need
442 # clients... This seems to mitigate the problem, but we definitely need
443 # to better understand what's going on.
443 # to better understand what's going on.
444 if self._execute_sleep:
444 if self._execute_sleep:
445 time.sleep(self._execute_sleep)
445 time.sleep(self._execute_sleep)
446
446
447 # Send the reply.
447 # Send the reply.
448 reply_content = json_clean(reply_content)
448 reply_content = json_clean(reply_content)
449
449
450 md['status'] = reply_content['status']
450 md['status'] = reply_content['status']
451 if reply_content['status'] == 'error' and \
451 if reply_content['status'] == 'error' and \
452 reply_content['ename'] == 'UnmetDependency':
452 reply_content['ename'] == 'UnmetDependency':
453 md['dependencies_met'] = False
453 md['dependencies_met'] = False
454
454
455 reply_msg = self.session.send(stream, u'execute_reply',
455 reply_msg = self.session.send(stream, u'execute_reply',
456 reply_content, parent, metadata=md,
456 reply_content, parent, metadata=md,
457 ident=ident)
457 ident=ident)
458
458
459 self.log.debug("%s", reply_msg)
459 self.log.debug("%s", reply_msg)
460
460
461 if not silent and reply_msg['content']['status'] == u'error':
461 if not silent and reply_msg['content']['status'] == u'error':
462 self._abort_queues()
462 self._abort_queues()
463
463
464 self._publish_status(u'idle', parent)
464 self._publish_status(u'idle', parent)
465
465
466 def complete_request(self, stream, ident, parent):
466 def complete_request(self, stream, ident, parent):
467 txt, matches = self._complete(parent)
467 txt, matches = self._complete(parent)
468 matches = {'matches' : matches,
468 matches = {'matches' : matches,
469 'matched_text' : txt,
469 'matched_text' : txt,
470 'status' : 'ok'}
470 'status' : 'ok'}
471 matches = json_clean(matches)
471 matches = json_clean(matches)
472 completion_msg = self.session.send(stream, 'complete_reply',
472 completion_msg = self.session.send(stream, 'complete_reply',
473 matches, parent, ident)
473 matches, parent, ident)
474 self.log.debug("%s", completion_msg)
474 self.log.debug("%s", completion_msg)
475
475
476 def object_info_request(self, stream, ident, parent):
476 def object_info_request(self, stream, ident, parent):
477 content = parent['content']
477 content = parent['content']
478 object_info = self.shell.object_inspect(content['oname'],
478 object_info = self.shell.object_inspect(content['oname'],
479 detail_level = content.get('detail_level', 0)
479 detail_level = content.get('detail_level', 0)
480 )
480 )
481 # Before we send this object over, we scrub it for JSON usage
481 # Before we send this object over, we scrub it for JSON usage
482 oinfo = json_clean(object_info)
482 oinfo = json_clean(object_info)
483 msg = self.session.send(stream, 'object_info_reply',
483 msg = self.session.send(stream, 'object_info_reply',
484 oinfo, parent, ident)
484 oinfo, parent, ident)
485 self.log.debug("%s", msg)
485 self.log.debug("%s", msg)
486
486
487 def history_request(self, stream, ident, parent):
487 def history_request(self, stream, ident, parent):
488 # We need to pull these out, as passing **kwargs doesn't work with
488 # We need to pull these out, as passing **kwargs doesn't work with
489 # unicode keys before Python 2.6.5.
489 # unicode keys before Python 2.6.5.
490 hist_access_type = parent['content']['hist_access_type']
490 hist_access_type = parent['content']['hist_access_type']
491 raw = parent['content']['raw']
491 raw = parent['content']['raw']
492 output = parent['content']['output']
492 output = parent['content']['output']
493 if hist_access_type == 'tail':
493 if hist_access_type == 'tail':
494 n = parent['content']['n']
494 n = parent['content']['n']
495 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
495 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
496 include_latest=True)
496 include_latest=True)
497
497
498 elif hist_access_type == 'range':
498 elif hist_access_type == 'range':
499 session = parent['content']['session']
499 session = parent['content']['session']
500 start = parent['content']['start']
500 start = parent['content']['start']
501 stop = parent['content']['stop']
501 stop = parent['content']['stop']
502 hist = self.shell.history_manager.get_range(session, start, stop,
502 hist = self.shell.history_manager.get_range(session, start, stop,
503 raw=raw, output=output)
503 raw=raw, output=output)
504
504
505 elif hist_access_type == 'search':
505 elif hist_access_type == 'search':
506 n = parent['content'].get('n')
506 n = parent['content'].get('n')
507 unique = parent['content'].get('unique')
507 unique = parent['content'].get('unique', False)
508 pattern = parent['content']['pattern']
508 pattern = parent['content']['pattern']
509 hist = self.shell.history_manager.search(
509 hist = self.shell.history_manager.search(
510 pattern, raw=raw, output=output, n=n, unique=unique)
510 pattern, raw=raw, output=output, n=n, unique=unique)
511
511
512 else:
512 else:
513 hist = []
513 hist = []
514 hist = list(hist)
514 hist = list(hist)
515 content = {'history' : hist}
515 content = {'history' : hist}
516 content = json_clean(content)
516 content = json_clean(content)
517 msg = self.session.send(stream, 'history_reply',
517 msg = self.session.send(stream, 'history_reply',
518 content, parent, ident)
518 content, parent, ident)
519 self.log.debug("Sending history reply with %i entries", len(hist))
519 self.log.debug("Sending history reply with %i entries", len(hist))
520
520
521 def connect_request(self, stream, ident, parent):
521 def connect_request(self, stream, ident, parent):
522 if self._recorded_ports is not None:
522 if self._recorded_ports is not None:
523 content = self._recorded_ports.copy()
523 content = self._recorded_ports.copy()
524 else:
524 else:
525 content = {}
525 content = {}
526 msg = self.session.send(stream, 'connect_reply',
526 msg = self.session.send(stream, 'connect_reply',
527 content, parent, ident)
527 content, parent, ident)
528 self.log.debug("%s", msg)
528 self.log.debug("%s", msg)
529
529
530 def kernel_info_request(self, stream, ident, parent):
530 def kernel_info_request(self, stream, ident, parent):
531 vinfo = {
531 vinfo = {
532 'protocol_version': protocol_version,
532 'protocol_version': protocol_version,
533 'ipython_version': ipython_version,
533 'ipython_version': ipython_version,
534 'language_version': language_version,
534 'language_version': language_version,
535 'language': 'python',
535 'language': 'python',
536 }
536 }
537 msg = self.session.send(stream, 'kernel_info_reply',
537 msg = self.session.send(stream, 'kernel_info_reply',
538 vinfo, parent, ident)
538 vinfo, parent, ident)
539 self.log.debug("%s", msg)
539 self.log.debug("%s", msg)
540
540
541 def shutdown_request(self, stream, ident, parent):
541 def shutdown_request(self, stream, ident, parent):
542 self.shell.exit_now = True
542 self.shell.exit_now = True
543 content = dict(status='ok')
543 content = dict(status='ok')
544 content.update(parent['content'])
544 content.update(parent['content'])
545 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
545 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
546 # same content, but different msg_id for broadcasting on IOPub
546 # same content, but different msg_id for broadcasting on IOPub
547 self._shutdown_message = self.session.msg(u'shutdown_reply',
547 self._shutdown_message = self.session.msg(u'shutdown_reply',
548 content, parent
548 content, parent
549 )
549 )
550
550
551 self._at_shutdown()
551 self._at_shutdown()
552 # call sys.exit after a short delay
552 # call sys.exit after a short delay
553 loop = ioloop.IOLoop.instance()
553 loop = ioloop.IOLoop.instance()
554 loop.add_timeout(time.time()+0.1, loop.stop)
554 loop.add_timeout(time.time()+0.1, loop.stop)
555
555
556 #---------------------------------------------------------------------------
556 #---------------------------------------------------------------------------
557 # Engine methods
557 # Engine methods
558 #---------------------------------------------------------------------------
558 #---------------------------------------------------------------------------
559
559
560 def apply_request(self, stream, ident, parent):
560 def apply_request(self, stream, ident, parent):
561 try:
561 try:
562 content = parent[u'content']
562 content = parent[u'content']
563 bufs = parent[u'buffers']
563 bufs = parent[u'buffers']
564 msg_id = parent['header']['msg_id']
564 msg_id = parent['header']['msg_id']
565 except:
565 except:
566 self.log.error("Got bad msg: %s", parent, exc_info=True)
566 self.log.error("Got bad msg: %s", parent, exc_info=True)
567 return
567 return
568
568
569 self._publish_status(u'busy', parent)
569 self._publish_status(u'busy', parent)
570
570
571 # Set the parent message of the display hook and out streams.
571 # Set the parent message of the display hook and out streams.
572 shell = self.shell
572 shell = self.shell
573 shell.displayhook.set_parent(parent)
573 shell.displayhook.set_parent(parent)
574 shell.display_pub.set_parent(parent)
574 shell.display_pub.set_parent(parent)
575 shell.data_pub.set_parent(parent)
575 shell.data_pub.set_parent(parent)
576 sys.stdout.set_parent(parent)
576 sys.stdout.set_parent(parent)
577 sys.stderr.set_parent(parent)
577 sys.stderr.set_parent(parent)
578
578
579 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
579 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
580 # self.iopub_socket.send(pyin_msg)
580 # self.iopub_socket.send(pyin_msg)
581 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
581 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
582 md = self._make_metadata(parent['metadata'])
582 md = self._make_metadata(parent['metadata'])
583 try:
583 try:
584 working = shell.user_ns
584 working = shell.user_ns
585
585
586 prefix = "_"+str(msg_id).replace("-","")+"_"
586 prefix = "_"+str(msg_id).replace("-","")+"_"
587
587
588 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
588 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
589
589
590 fname = getattr(f, '__name__', 'f')
590 fname = getattr(f, '__name__', 'f')
591
591
592 fname = prefix+"f"
592 fname = prefix+"f"
593 argname = prefix+"args"
593 argname = prefix+"args"
594 kwargname = prefix+"kwargs"
594 kwargname = prefix+"kwargs"
595 resultname = prefix+"result"
595 resultname = prefix+"result"
596
596
597 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
597 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
598 # print ns
598 # print ns
599 working.update(ns)
599 working.update(ns)
600 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
600 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
601 try:
601 try:
602 exec code in shell.user_global_ns, shell.user_ns
602 exec code in shell.user_global_ns, shell.user_ns
603 result = working.get(resultname)
603 result = working.get(resultname)
604 finally:
604 finally:
605 for key in ns.iterkeys():
605 for key in ns.iterkeys():
606 working.pop(key)
606 working.pop(key)
607
607
608 result_buf = serialize_object(result,
608 result_buf = serialize_object(result,
609 buffer_threshold=self.session.buffer_threshold,
609 buffer_threshold=self.session.buffer_threshold,
610 item_threshold=self.session.item_threshold,
610 item_threshold=self.session.item_threshold,
611 )
611 )
612
612
613 except:
613 except:
614 # invoke IPython traceback formatting
614 # invoke IPython traceback formatting
615 shell.showtraceback()
615 shell.showtraceback()
616 # FIXME - fish exception info out of shell, possibly left there by
616 # FIXME - fish exception info out of shell, possibly left there by
617 # run_code. We'll need to clean up this logic later.
617 # run_code. We'll need to clean up this logic later.
618 reply_content = {}
618 reply_content = {}
619 if shell._reply_content is not None:
619 if shell._reply_content is not None:
620 reply_content.update(shell._reply_content)
620 reply_content.update(shell._reply_content)
621 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
621 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
622 reply_content['engine_info'] = e_info
622 reply_content['engine_info'] = e_info
623 # reset after use
623 # reset after use
624 shell._reply_content = None
624 shell._reply_content = None
625
625
626 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
626 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
627 ident=self._topic('pyerr'))
627 ident=self._topic('pyerr'))
628 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
628 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
629 result_buf = []
629 result_buf = []
630
630
631 if reply_content['ename'] == 'UnmetDependency':
631 if reply_content['ename'] == 'UnmetDependency':
632 md['dependencies_met'] = False
632 md['dependencies_met'] = False
633 else:
633 else:
634 reply_content = {'status' : 'ok'}
634 reply_content = {'status' : 'ok'}
635
635
636 # put 'ok'/'error' status in header, for scheduler introspection:
636 # put 'ok'/'error' status in header, for scheduler introspection:
637 md['status'] = reply_content['status']
637 md['status'] = reply_content['status']
638
638
639 # flush i/o
639 # flush i/o
640 sys.stdout.flush()
640 sys.stdout.flush()
641 sys.stderr.flush()
641 sys.stderr.flush()
642
642
643 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
643 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
644 parent=parent, ident=ident,buffers=result_buf, metadata=md)
644 parent=parent, ident=ident,buffers=result_buf, metadata=md)
645
645
646 self._publish_status(u'idle', parent)
646 self._publish_status(u'idle', parent)
647
647
648 #---------------------------------------------------------------------------
648 #---------------------------------------------------------------------------
649 # Control messages
649 # Control messages
650 #---------------------------------------------------------------------------
650 #---------------------------------------------------------------------------
651
651
652 def abort_request(self, stream, ident, parent):
652 def abort_request(self, stream, ident, parent):
653 """abort a specifig msg by id"""
653 """abort a specifig msg by id"""
654 msg_ids = parent['content'].get('msg_ids', None)
654 msg_ids = parent['content'].get('msg_ids', None)
655 if isinstance(msg_ids, basestring):
655 if isinstance(msg_ids, basestring):
656 msg_ids = [msg_ids]
656 msg_ids = [msg_ids]
657 if not msg_ids:
657 if not msg_ids:
658 self.abort_queues()
658 self.abort_queues()
659 for mid in msg_ids:
659 for mid in msg_ids:
660 self.aborted.add(str(mid))
660 self.aborted.add(str(mid))
661
661
662 content = dict(status='ok')
662 content = dict(status='ok')
663 reply_msg = self.session.send(stream, 'abort_reply', content=content,
663 reply_msg = self.session.send(stream, 'abort_reply', content=content,
664 parent=parent, ident=ident)
664 parent=parent, ident=ident)
665 self.log.debug("%s", reply_msg)
665 self.log.debug("%s", reply_msg)
666
666
667 def clear_request(self, stream, idents, parent):
667 def clear_request(self, stream, idents, parent):
668 """Clear our namespace."""
668 """Clear our namespace."""
669 self.shell.reset(False)
669 self.shell.reset(False)
670 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
670 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
671 content = dict(status='ok'))
671 content = dict(status='ok'))
672
672
673
673
674 #---------------------------------------------------------------------------
674 #---------------------------------------------------------------------------
675 # Protected interface
675 # Protected interface
676 #---------------------------------------------------------------------------
676 #---------------------------------------------------------------------------
677
677
678 def _wrap_exception(self, method=None):
678 def _wrap_exception(self, method=None):
679 # import here, because _wrap_exception is only used in parallel,
679 # import here, because _wrap_exception is only used in parallel,
680 # and parallel has higher min pyzmq version
680 # and parallel has higher min pyzmq version
681 from IPython.parallel.error import wrap_exception
681 from IPython.parallel.error import wrap_exception
682 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
682 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
683 content = wrap_exception(e_info)
683 content = wrap_exception(e_info)
684 return content
684 return content
685
685
686 def _topic(self, topic):
686 def _topic(self, topic):
687 """prefixed topic for IOPub messages"""
687 """prefixed topic for IOPub messages"""
688 if self.int_id >= 0:
688 if self.int_id >= 0:
689 base = "engine.%i" % self.int_id
689 base = "engine.%i" % self.int_id
690 else:
690 else:
691 base = "kernel.%s" % self.ident
691 base = "kernel.%s" % self.ident
692
692
693 return py3compat.cast_bytes("%s.%s" % (base, topic))
693 return py3compat.cast_bytes("%s.%s" % (base, topic))
694
694
695 def _abort_queues(self):
695 def _abort_queues(self):
696 for stream in self.shell_streams:
696 for stream in self.shell_streams:
697 if stream:
697 if stream:
698 self._abort_queue(stream)
698 self._abort_queue(stream)
699
699
700 def _abort_queue(self, stream):
700 def _abort_queue(self, stream):
701 poller = zmq.Poller()
701 poller = zmq.Poller()
702 poller.register(stream.socket, zmq.POLLIN)
702 poller.register(stream.socket, zmq.POLLIN)
703 while True:
703 while True:
704 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
704 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
705 if msg is None:
705 if msg is None:
706 return
706 return
707
707
708 self.log.info("Aborting:")
708 self.log.info("Aborting:")
709 self.log.info("%s", msg)
709 self.log.info("%s", msg)
710 msg_type = msg['header']['msg_type']
710 msg_type = msg['header']['msg_type']
711 reply_type = msg_type.split('_')[0] + '_reply'
711 reply_type = msg_type.split('_')[0] + '_reply'
712
712
713 status = {'status' : 'aborted'}
713 status = {'status' : 'aborted'}
714 md = {'engine' : self.ident}
714 md = {'engine' : self.ident}
715 md.update(status)
715 md.update(status)
716 reply_msg = self.session.send(stream, reply_type, metadata=md,
716 reply_msg = self.session.send(stream, reply_type, metadata=md,
717 content=status, parent=msg, ident=idents)
717 content=status, parent=msg, ident=idents)
718 self.log.debug("%s", reply_msg)
718 self.log.debug("%s", reply_msg)
719 # We need to wait a bit for requests to come in. This can probably
719 # We need to wait a bit for requests to come in. This can probably
720 # be set shorter for true asynchronous clients.
720 # be set shorter for true asynchronous clients.
721 poller.poll(50)
721 poller.poll(50)
722
722
723
723
724 def _no_raw_input(self):
724 def _no_raw_input(self):
725 """Raise StdinNotImplentedError if active frontend doesn't support
725 """Raise StdinNotImplentedError if active frontend doesn't support
726 stdin."""
726 stdin."""
727 raise StdinNotImplementedError("raw_input was called, but this "
727 raise StdinNotImplementedError("raw_input was called, but this "
728 "frontend does not support stdin.")
728 "frontend does not support stdin.")
729
729
730 def _raw_input(self, prompt, ident, parent):
730 def _raw_input(self, prompt, ident, parent):
731 # Flush output before making the request.
731 # Flush output before making the request.
732 sys.stderr.flush()
732 sys.stderr.flush()
733 sys.stdout.flush()
733 sys.stdout.flush()
734
734
735 # Send the input request.
735 # Send the input request.
736 content = json_clean(dict(prompt=prompt))
736 content = json_clean(dict(prompt=prompt))
737 self.session.send(self.stdin_socket, u'input_request', content, parent,
737 self.session.send(self.stdin_socket, u'input_request', content, parent,
738 ident=ident)
738 ident=ident)
739
739
740 # Await a response.
740 # Await a response.
741 while True:
741 while True:
742 try:
742 try:
743 ident, reply = self.session.recv(self.stdin_socket, 0)
743 ident, reply = self.session.recv(self.stdin_socket, 0)
744 except Exception:
744 except Exception:
745 self.log.warn("Invalid Message:", exc_info=True)
745 self.log.warn("Invalid Message:", exc_info=True)
746 else:
746 else:
747 break
747 break
748 try:
748 try:
749 value = reply['content']['value']
749 value = reply['content']['value']
750 except:
750 except:
751 self.log.error("Got bad raw_input reply: ")
751 self.log.error("Got bad raw_input reply: ")
752 self.log.error("%s", parent)
752 self.log.error("%s", parent)
753 value = ''
753 value = ''
754 if value == '\x04':
754 if value == '\x04':
755 # EOF
755 # EOF
756 raise EOFError
756 raise EOFError
757 return value
757 return value
758
758
759 def _complete(self, msg):
759 def _complete(self, msg):
760 c = msg['content']
760 c = msg['content']
761 try:
761 try:
762 cpos = int(c['cursor_pos'])
762 cpos = int(c['cursor_pos'])
763 except:
763 except:
764 # If we don't get something that we can convert to an integer, at
764 # If we don't get something that we can convert to an integer, at
765 # least attempt the completion guessing the cursor is at the end of
765 # least attempt the completion guessing the cursor is at the end of
766 # the text, if there's any, and otherwise of the line
766 # the text, if there's any, and otherwise of the line
767 cpos = len(c['text'])
767 cpos = len(c['text'])
768 if cpos==0:
768 if cpos==0:
769 cpos = len(c['line'])
769 cpos = len(c['line'])
770 return self.shell.complete(c['text'], c['line'], cpos)
770 return self.shell.complete(c['text'], c['line'], cpos)
771
771
772 def _at_shutdown(self):
772 def _at_shutdown(self):
773 """Actions taken at shutdown by the kernel, called by python's atexit.
773 """Actions taken at shutdown by the kernel, called by python's atexit.
774 """
774 """
775 # io.rprint("Kernel at_shutdown") # dbg
775 # io.rprint("Kernel at_shutdown") # dbg
776 if self._shutdown_message is not None:
776 if self._shutdown_message is not None:
777 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
777 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
778 self.log.debug("%s", self._shutdown_message)
778 self.log.debug("%s", self._shutdown_message)
779 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
779 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
780
780
General Comments 0
You need to be logged in to leave comments. Login now