##// END OF EJS Templates
Don't die if stderr/stdout do not support set_parent() #2925...
y-p -
Show More
@@ -1,780 +1,792 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple interactive kernel that talks to a frontend over 0MQ.
2 """A simple interactive kernel that talks to a frontend over 0MQ.
3
3
4 Things to do:
4 Things to do:
5
5
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 call set_parent on all the PUB objects with the message about to be executed.
7 call set_parent on all the PUB objects with the message about to be executed.
8 * Implement random port and security key logic.
8 * Implement random port and security key logic.
9 * Implement control messages.
9 * Implement control messages.
10 * Implement event loop and poll version.
10 * Implement event loop and poll version.
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 # Standard library imports
18 # Standard library imports
19 import __builtin__
19 import __builtin__
20 import sys
20 import sys
21 import time
21 import time
22 import traceback
22 import traceback
23 import logging
23 import logging
24 import uuid
24 import uuid
25
25
26 from datetime import datetime
26 from datetime import datetime
27 from signal import (
27 from signal import (
28 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
28 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
29 )
29 )
30
30
31 # System library imports
31 # System library imports
32 import zmq
32 import zmq
33 from zmq.eventloop import ioloop
33 from zmq.eventloop import ioloop
34 from zmq.eventloop.zmqstream import ZMQStream
34 from zmq.eventloop.zmqstream import ZMQStream
35
35
36 # Local imports
36 # Local imports
37 from IPython.config.configurable import Configurable
37 from IPython.config.configurable import Configurable
38 from IPython.core.error import StdinNotImplementedError
38 from IPython.core.error import StdinNotImplementedError
39 from IPython.core import release
39 from IPython.core import release
40 from IPython.utils import io
40 from IPython.utils import io
41 from IPython.utils import py3compat
41 from IPython.utils import py3compat
42 from IPython.utils.jsonutil import json_clean
42 from IPython.utils.jsonutil import json_clean
43 from IPython.utils.traitlets import (
43 from IPython.utils.traitlets import (
44 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode,
44 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode,
45 Type
45 Type
46 )
46 )
47
47
48 from serialize import serialize_object, unpack_apply_message
48 from serialize import serialize_object, unpack_apply_message
49 from session import Session
49 from session import Session
50 from zmqshell import ZMQInteractiveShell
50 from zmqshell import ZMQInteractiveShell
51
51
52
52
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54 # Main kernel class
54 # Main kernel class
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56
56
57 protocol_version = list(release.kernel_protocol_version_info)
57 protocol_version = list(release.kernel_protocol_version_info)
58 ipython_version = list(release.version_info)
58 ipython_version = list(release.version_info)
59 language_version = list(sys.version_info[:3])
59 language_version = list(sys.version_info[:3])
60
60
61
61
62 class Kernel(Configurable):
62 class Kernel(Configurable):
63
63
64 #---------------------------------------------------------------------------
64 #---------------------------------------------------------------------------
65 # Kernel interface
65 # Kernel interface
66 #---------------------------------------------------------------------------
66 #---------------------------------------------------------------------------
67
67
68 # attribute to override with a GUI
68 # attribute to override with a GUI
69 eventloop = Any(None)
69 eventloop = Any(None)
70 def _eventloop_changed(self, name, old, new):
70 def _eventloop_changed(self, name, old, new):
71 """schedule call to eventloop from IOLoop"""
71 """schedule call to eventloop from IOLoop"""
72 loop = ioloop.IOLoop.instance()
72 loop = ioloop.IOLoop.instance()
73 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
73 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
74
74
75 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
75 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
76 shell_class = Type(ZMQInteractiveShell)
76 shell_class = Type(ZMQInteractiveShell)
77
77
78 session = Instance(Session)
78 session = Instance(Session)
79 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
79 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
80 shell_streams = List()
80 shell_streams = List()
81 control_stream = Instance(ZMQStream)
81 control_stream = Instance(ZMQStream)
82 iopub_socket = Instance(zmq.Socket)
82 iopub_socket = Instance(zmq.Socket)
83 stdin_socket = Instance(zmq.Socket)
83 stdin_socket = Instance(zmq.Socket)
84 log = Instance(logging.Logger)
84 log = Instance(logging.Logger)
85
85
86 user_module = Any()
86 user_module = Any()
87 def _user_module_changed(self, name, old, new):
87 def _user_module_changed(self, name, old, new):
88 if self.shell is not None:
88 if self.shell is not None:
89 self.shell.user_module = new
89 self.shell.user_module = new
90
90
91 user_ns = Dict(default_value=None)
91 user_ns = Dict(default_value=None)
92 def _user_ns_changed(self, name, old, new):
92 def _user_ns_changed(self, name, old, new):
93 if self.shell is not None:
93 if self.shell is not None:
94 self.shell.user_ns = new
94 self.shell.user_ns = new
95 self.shell.init_user_ns()
95 self.shell.init_user_ns()
96
96
97 # identities:
97 # identities:
98 int_id = Integer(-1)
98 int_id = Integer(-1)
99 ident = Unicode()
99 ident = Unicode()
100
100
101 def _ident_default(self):
101 def _ident_default(self):
102 return unicode(uuid.uuid4())
102 return unicode(uuid.uuid4())
103
103
104
104
105 # Private interface
105 # Private interface
106
106
107 # Time to sleep after flushing the stdout/err buffers in each execute
107 # Time to sleep after flushing the stdout/err buffers in each execute
108 # cycle. While this introduces a hard limit on the minimal latency of the
108 # cycle. While this introduces a hard limit on the minimal latency of the
109 # execute cycle, it helps prevent output synchronization problems for
109 # execute cycle, it helps prevent output synchronization problems for
110 # clients.
110 # clients.
111 # Units are in seconds. The minimum zmq latency on local host is probably
111 # Units are in seconds. The minimum zmq latency on local host is probably
112 # ~150 microseconds, set this to 500us for now. We may need to increase it
112 # ~150 microseconds, set this to 500us for now. We may need to increase it
113 # a little if it's not enough after more interactive testing.
113 # a little if it's not enough after more interactive testing.
114 _execute_sleep = Float(0.0005, config=True)
114 _execute_sleep = Float(0.0005, config=True)
115
115
116 # Frequency of the kernel's event loop.
116 # Frequency of the kernel's event loop.
117 # Units are in seconds, kernel subclasses for GUI toolkits may need to
117 # Units are in seconds, kernel subclasses for GUI toolkits may need to
118 # adapt to milliseconds.
118 # adapt to milliseconds.
119 _poll_interval = Float(0.05, config=True)
119 _poll_interval = Float(0.05, config=True)
120
120
121 # If the shutdown was requested over the network, we leave here the
121 # If the shutdown was requested over the network, we leave here the
122 # necessary reply message so it can be sent by our registered atexit
122 # necessary reply message so it can be sent by our registered atexit
123 # handler. This ensures that the reply is only sent to clients truly at
123 # handler. This ensures that the reply is only sent to clients truly at
124 # the end of our shutdown process (which happens after the underlying
124 # the end of our shutdown process (which happens after the underlying
125 # IPython shell's own shutdown).
125 # IPython shell's own shutdown).
126 _shutdown_message = None
126 _shutdown_message = None
127
127
128 # This is a dict of port number that the kernel is listening on. It is set
128 # This is a dict of port number that the kernel is listening on. It is set
129 # by record_ports and used by connect_request.
129 # by record_ports and used by connect_request.
130 _recorded_ports = Dict()
130 _recorded_ports = Dict()
131
131
132 # A reference to the Python builtin 'raw_input' function.
132 # A reference to the Python builtin 'raw_input' function.
133 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
133 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
134 _sys_raw_input = Any()
134 _sys_raw_input = Any()
135
135
136 # set of aborted msg_ids
136 # set of aborted msg_ids
137 aborted = Set()
137 aborted = Set()
138
138
139
139
140 def __init__(self, **kwargs):
140 def __init__(self, **kwargs):
141 super(Kernel, self).__init__(**kwargs)
141 super(Kernel, self).__init__(**kwargs)
142
142
143 # Initialize the InteractiveShell subclass
143 # Initialize the InteractiveShell subclass
144 self.shell = self.shell_class.instance(config=self.config,
144 self.shell = self.shell_class.instance(config=self.config,
145 profile_dir = self.profile_dir,
145 profile_dir = self.profile_dir,
146 user_module = self.user_module,
146 user_module = self.user_module,
147 user_ns = self.user_ns,
147 user_ns = self.user_ns,
148 )
148 )
149 self.shell.displayhook.session = self.session
149 self.shell.displayhook.session = self.session
150 self.shell.displayhook.pub_socket = self.iopub_socket
150 self.shell.displayhook.pub_socket = self.iopub_socket
151 self.shell.displayhook.topic = self._topic('pyout')
151 self.shell.displayhook.topic = self._topic('pyout')
152 self.shell.display_pub.session = self.session
152 self.shell.display_pub.session = self.session
153 self.shell.display_pub.pub_socket = self.iopub_socket
153 self.shell.display_pub.pub_socket = self.iopub_socket
154 self.shell.data_pub.session = self.session
154 self.shell.data_pub.session = self.session
155 self.shell.data_pub.pub_socket = self.iopub_socket
155 self.shell.data_pub.pub_socket = self.iopub_socket
156
156
157 # TMP - hack while developing
157 # TMP - hack while developing
158 self.shell._reply_content = None
158 self.shell._reply_content = None
159
159
160 # Build dict of handlers for message types
160 # Build dict of handlers for message types
161 msg_types = [ 'execute_request', 'complete_request',
161 msg_types = [ 'execute_request', 'complete_request',
162 'object_info_request', 'history_request',
162 'object_info_request', 'history_request',
163 'kernel_info_request',
163 'kernel_info_request',
164 'connect_request', 'shutdown_request',
164 'connect_request', 'shutdown_request',
165 'apply_request',
165 'apply_request',
166 ]
166 ]
167 self.shell_handlers = {}
167 self.shell_handlers = {}
168 for msg_type in msg_types:
168 for msg_type in msg_types:
169 self.shell_handlers[msg_type] = getattr(self, msg_type)
169 self.shell_handlers[msg_type] = getattr(self, msg_type)
170
170
171 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
171 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
172 self.control_handlers = {}
172 self.control_handlers = {}
173 for msg_type in control_msg_types:
173 for msg_type in control_msg_types:
174 self.control_handlers[msg_type] = getattr(self, msg_type)
174 self.control_handlers[msg_type] = getattr(self, msg_type)
175
175
176 def dispatch_control(self, msg):
176 def dispatch_control(self, msg):
177 """dispatch control requests"""
177 """dispatch control requests"""
178 idents,msg = self.session.feed_identities(msg, copy=False)
178 idents,msg = self.session.feed_identities(msg, copy=False)
179 try:
179 try:
180 msg = self.session.unserialize(msg, content=True, copy=False)
180 msg = self.session.unserialize(msg, content=True, copy=False)
181 except:
181 except:
182 self.log.error("Invalid Control Message", exc_info=True)
182 self.log.error("Invalid Control Message", exc_info=True)
183 return
183 return
184
184
185 self.log.debug("Control received: %s", msg)
185 self.log.debug("Control received: %s", msg)
186
186
187 header = msg['header']
187 header = msg['header']
188 msg_id = header['msg_id']
188 msg_id = header['msg_id']
189 msg_type = header['msg_type']
189 msg_type = header['msg_type']
190
190
191 handler = self.control_handlers.get(msg_type, None)
191 handler = self.control_handlers.get(msg_type, None)
192 if handler is None:
192 if handler is None:
193 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
193 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
194 else:
194 else:
195 try:
195 try:
196 handler(self.control_stream, idents, msg)
196 handler(self.control_stream, idents, msg)
197 except Exception:
197 except Exception:
198 self.log.error("Exception in control handler:", exc_info=True)
198 self.log.error("Exception in control handler:", exc_info=True)
199
199
200 def dispatch_shell(self, stream, msg):
200 def dispatch_shell(self, stream, msg):
201 """dispatch shell requests"""
201 """dispatch shell requests"""
202 # flush control requests first
202 # flush control requests first
203 if self.control_stream:
203 if self.control_stream:
204 self.control_stream.flush()
204 self.control_stream.flush()
205
205
206 idents,msg = self.session.feed_identities(msg, copy=False)
206 idents,msg = self.session.feed_identities(msg, copy=False)
207 try:
207 try:
208 msg = self.session.unserialize(msg, content=True, copy=False)
208 msg = self.session.unserialize(msg, content=True, copy=False)
209 except:
209 except:
210 self.log.error("Invalid Message", exc_info=True)
210 self.log.error("Invalid Message", exc_info=True)
211 return
211 return
212
212
213 header = msg['header']
213 header = msg['header']
214 msg_id = header['msg_id']
214 msg_id = header['msg_id']
215 msg_type = msg['header']['msg_type']
215 msg_type = msg['header']['msg_type']
216
216
217 # Print some info about this message and leave a '--->' marker, so it's
217 # Print some info about this message and leave a '--->' marker, so it's
218 # easier to trace visually the message chain when debugging. Each
218 # easier to trace visually the message chain when debugging. Each
219 # handler prints its message at the end.
219 # handler prints its message at the end.
220 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
220 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
221 self.log.debug(' Content: %s\n --->\n ', msg['content'])
221 self.log.debug(' Content: %s\n --->\n ', msg['content'])
222
222
223 if msg_id in self.aborted:
223 if msg_id in self.aborted:
224 self.aborted.remove(msg_id)
224 self.aborted.remove(msg_id)
225 # is it safe to assume a msg_id will not be resubmitted?
225 # is it safe to assume a msg_id will not be resubmitted?
226 reply_type = msg_type.split('_')[0] + '_reply'
226 reply_type = msg_type.split('_')[0] + '_reply'
227 status = {'status' : 'aborted'}
227 status = {'status' : 'aborted'}
228 md = {'engine' : self.ident}
228 md = {'engine' : self.ident}
229 md.update(status)
229 md.update(status)
230 reply_msg = self.session.send(stream, reply_type, metadata=md,
230 reply_msg = self.session.send(stream, reply_type, metadata=md,
231 content=status, parent=msg, ident=idents)
231 content=status, parent=msg, ident=idents)
232 return
232 return
233
233
234 handler = self.shell_handlers.get(msg_type, None)
234 handler = self.shell_handlers.get(msg_type, None)
235 if handler is None:
235 if handler is None:
236 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
236 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
237 else:
237 else:
238 # ensure default_int_handler during handler call
238 # ensure default_int_handler during handler call
239 sig = signal(SIGINT, default_int_handler)
239 sig = signal(SIGINT, default_int_handler)
240 try:
240 try:
241 handler(stream, idents, msg)
241 handler(stream, idents, msg)
242 except Exception:
242 except Exception:
243 self.log.error("Exception in message handler:", exc_info=True)
243 self.log.error("Exception in message handler:", exc_info=True)
244 finally:
244 finally:
245 signal(SIGINT, sig)
245 signal(SIGINT, sig)
246
246
247 def enter_eventloop(self):
247 def enter_eventloop(self):
248 """enter eventloop"""
248 """enter eventloop"""
249 self.log.info("entering eventloop")
249 self.log.info("entering eventloop")
250 # restore default_int_handler
250 # restore default_int_handler
251 signal(SIGINT, default_int_handler)
251 signal(SIGINT, default_int_handler)
252 while self.eventloop is not None:
252 while self.eventloop is not None:
253 try:
253 try:
254 self.eventloop(self)
254 self.eventloop(self)
255 except KeyboardInterrupt:
255 except KeyboardInterrupt:
256 # Ctrl-C shouldn't crash the kernel
256 # Ctrl-C shouldn't crash the kernel
257 self.log.error("KeyboardInterrupt caught in kernel")
257 self.log.error("KeyboardInterrupt caught in kernel")
258 continue
258 continue
259 else:
259 else:
260 # eventloop exited cleanly, this means we should stop (right?)
260 # eventloop exited cleanly, this means we should stop (right?)
261 self.eventloop = None
261 self.eventloop = None
262 break
262 break
263 self.log.info("exiting eventloop")
263 self.log.info("exiting eventloop")
264
264
265 def start(self):
265 def start(self):
266 """register dispatchers for streams"""
266 """register dispatchers for streams"""
267 self.shell.exit_now = False
267 self.shell.exit_now = False
268 if self.control_stream:
268 if self.control_stream:
269 self.control_stream.on_recv(self.dispatch_control, copy=False)
269 self.control_stream.on_recv(self.dispatch_control, copy=False)
270
270
271 def make_dispatcher(stream):
271 def make_dispatcher(stream):
272 def dispatcher(msg):
272 def dispatcher(msg):
273 return self.dispatch_shell(stream, msg)
273 return self.dispatch_shell(stream, msg)
274 return dispatcher
274 return dispatcher
275
275
276 for s in self.shell_streams:
276 for s in self.shell_streams:
277 s.on_recv(make_dispatcher(s), copy=False)
277 s.on_recv(make_dispatcher(s), copy=False)
278
278
279 def do_one_iteration(self):
279 def do_one_iteration(self):
280 """step eventloop just once"""
280 """step eventloop just once"""
281 if self.control_stream:
281 if self.control_stream:
282 self.control_stream.flush()
282 self.control_stream.flush()
283 for stream in self.shell_streams:
283 for stream in self.shell_streams:
284 # handle at most one request per iteration
284 # handle at most one request per iteration
285 stream.flush(zmq.POLLIN, 1)
285 stream.flush(zmq.POLLIN, 1)
286 stream.flush(zmq.POLLOUT)
286 stream.flush(zmq.POLLOUT)
287
287
288
288
289 def record_ports(self, ports):
289 def record_ports(self, ports):
290 """Record the ports that this kernel is using.
290 """Record the ports that this kernel is using.
291
291
292 The creator of the Kernel instance must call this methods if they
292 The creator of the Kernel instance must call this methods if they
293 want the :meth:`connect_request` method to return the port numbers.
293 want the :meth:`connect_request` method to return the port numbers.
294 """
294 """
295 self._recorded_ports = ports
295 self._recorded_ports = ports
296
296
297 #---------------------------------------------------------------------------
297 #---------------------------------------------------------------------------
298 # Kernel request handlers
298 # Kernel request handlers
299 #---------------------------------------------------------------------------
299 #---------------------------------------------------------------------------
300
300
301 def _make_metadata(self, other=None):
301 def _make_metadata(self, other=None):
302 """init metadata dict, for execute/apply_reply"""
302 """init metadata dict, for execute/apply_reply"""
303 new_md = {
303 new_md = {
304 'dependencies_met' : True,
304 'dependencies_met' : True,
305 'engine' : self.ident,
305 'engine' : self.ident,
306 'started': datetime.now(),
306 'started': datetime.now(),
307 }
307 }
308 if other:
308 if other:
309 new_md.update(other)
309 new_md.update(other)
310 return new_md
310 return new_md
311
311
312 def _publish_pyin(self, code, parent, execution_count):
312 def _publish_pyin(self, code, parent, execution_count):
313 """Publish the code request on the pyin stream."""
313 """Publish the code request on the pyin stream."""
314
314
315 self.session.send(self.iopub_socket, u'pyin',
315 self.session.send(self.iopub_socket, u'pyin',
316 {u'code':code, u'execution_count': execution_count},
316 {u'code':code, u'execution_count': execution_count},
317 parent=parent, ident=self._topic('pyin')
317 parent=parent, ident=self._topic('pyin')
318 )
318 )
319
319
320 def _publish_status(self, status, parent=None):
320 def _publish_status(self, status, parent=None):
321 """send status (busy/idle) on IOPub"""
321 """send status (busy/idle) on IOPub"""
322 self.session.send(self.iopub_socket,
322 self.session.send(self.iopub_socket,
323 u'status',
323 u'status',
324 {u'execution_state': status},
324 {u'execution_state': status},
325 parent=parent,
325 parent=parent,
326 ident=self._topic('status'),
326 ident=self._topic('status'),
327 )
327 )
328
328
329
329
330 def execute_request(self, stream, ident, parent):
330 def execute_request(self, stream, ident, parent):
331 """handle an execute_request"""
331 """handle an execute_request"""
332
332
333 self._publish_status(u'busy', parent)
333 self._publish_status(u'busy', parent)
334
334
335 try:
335 try:
336 content = parent[u'content']
336 content = parent[u'content']
337 code = content[u'code']
337 code = content[u'code']
338 silent = content[u'silent']
338 silent = content[u'silent']
339 store_history = content.get(u'store_history', not silent)
339 store_history = content.get(u'store_history', not silent)
340 except:
340 except:
341 self.log.error("Got bad msg: ")
341 self.log.error("Got bad msg: ")
342 self.log.error("%s", parent)
342 self.log.error("%s", parent)
343 return
343 return
344
344
345 md = self._make_metadata(parent['metadata'])
345 md = self._make_metadata(parent['metadata'])
346
346
347 shell = self.shell # we'll need this a lot here
347 shell = self.shell # we'll need this a lot here
348
348
349 # Replace raw_input. Note that is not sufficient to replace
349 # Replace raw_input. Note that is not sufficient to replace
350 # raw_input in the user namespace.
350 # raw_input in the user namespace.
351 if content.get('allow_stdin', False):
351 if content.get('allow_stdin', False):
352 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
352 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
353 else:
353 else:
354 raw_input = lambda prompt='' : self._no_raw_input()
354 raw_input = lambda prompt='' : self._no_raw_input()
355
355
356 if py3compat.PY3:
356 if py3compat.PY3:
357 self._sys_raw_input = __builtin__.input
357 self._sys_raw_input = __builtin__.input
358 __builtin__.input = raw_input
358 __builtin__.input = raw_input
359 else:
359 else:
360 self._sys_raw_input = __builtin__.raw_input
360 self._sys_raw_input = __builtin__.raw_input
361 __builtin__.raw_input = raw_input
361 __builtin__.raw_input = raw_input
362
362
363 # Set the parent message of the display hook and out streams.
363 # Set the parent message of the display hook and out streams.
364 shell.displayhook.set_parent(parent)
364 shell.displayhook.set_parent(parent)
365 shell.display_pub.set_parent(parent)
365 shell.display_pub.set_parent(parent)
366 shell.data_pub.set_parent(parent)
366 shell.data_pub.set_parent(parent)
367 sys.stdout.set_parent(parent)
367 try:
368 sys.stderr.set_parent(parent)
368 sys.stdout.set_parent(parent)
369 except AttributeError:
370 pass
371 try:
372 sys.stderr.set_parent(parent)
373 except AttributeError:
374 pass
369
375
370 # Re-broadcast our input for the benefit of listening clients, and
376 # Re-broadcast our input for the benefit of listening clients, and
371 # start computing output
377 # start computing output
372 if not silent:
378 if not silent:
373 self._publish_pyin(code, parent, shell.execution_count)
379 self._publish_pyin(code, parent, shell.execution_count)
374
380
375 reply_content = {}
381 reply_content = {}
376 try:
382 try:
377 # FIXME: the shell calls the exception handler itself.
383 # FIXME: the shell calls the exception handler itself.
378 shell.run_cell(code, store_history=store_history, silent=silent)
384 shell.run_cell(code, store_history=store_history, silent=silent)
379 except:
385 except:
380 status = u'error'
386 status = u'error'
381 # FIXME: this code right now isn't being used yet by default,
387 # FIXME: this code right now isn't being used yet by default,
382 # because the run_cell() call above directly fires off exception
388 # because the run_cell() call above directly fires off exception
383 # reporting. This code, therefore, is only active in the scenario
389 # reporting. This code, therefore, is only active in the scenario
384 # where runlines itself has an unhandled exception. We need to
390 # where runlines itself has an unhandled exception. We need to
385 # uniformize this, for all exception construction to come from a
391 # uniformize this, for all exception construction to come from a
386 # single location in the codbase.
392 # single location in the codbase.
387 etype, evalue, tb = sys.exc_info()
393 etype, evalue, tb = sys.exc_info()
388 tb_list = traceback.format_exception(etype, evalue, tb)
394 tb_list = traceback.format_exception(etype, evalue, tb)
389 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
395 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
390 else:
396 else:
391 status = u'ok'
397 status = u'ok'
392 finally:
398 finally:
393 # Restore raw_input.
399 # Restore raw_input.
394 if py3compat.PY3:
400 if py3compat.PY3:
395 __builtin__.input = self._sys_raw_input
401 __builtin__.input = self._sys_raw_input
396 else:
402 else:
397 __builtin__.raw_input = self._sys_raw_input
403 __builtin__.raw_input = self._sys_raw_input
398
404
399 reply_content[u'status'] = status
405 reply_content[u'status'] = status
400
406
401 # Return the execution counter so clients can display prompts
407 # Return the execution counter so clients can display prompts
402 reply_content['execution_count'] = shell.execution_count - 1
408 reply_content['execution_count'] = shell.execution_count - 1
403
409
404 # FIXME - fish exception info out of shell, possibly left there by
410 # FIXME - fish exception info out of shell, possibly left there by
405 # runlines. We'll need to clean up this logic later.
411 # runlines. We'll need to clean up this logic later.
406 if shell._reply_content is not None:
412 if shell._reply_content is not None:
407 reply_content.update(shell._reply_content)
413 reply_content.update(shell._reply_content)
408 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
414 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
409 reply_content['engine_info'] = e_info
415 reply_content['engine_info'] = e_info
410 # reset after use
416 # reset after use
411 shell._reply_content = None
417 shell._reply_content = None
412
418
413 if 'traceback' in reply_content:
419 if 'traceback' in reply_content:
414 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
420 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
415
421
416
422
417 # At this point, we can tell whether the main code execution succeeded
423 # At this point, we can tell whether the main code execution succeeded
418 # or not. If it did, we proceed to evaluate user_variables/expressions
424 # or not. If it did, we proceed to evaluate user_variables/expressions
419 if reply_content['status'] == 'ok':
425 if reply_content['status'] == 'ok':
420 reply_content[u'user_variables'] = \
426 reply_content[u'user_variables'] = \
421 shell.user_variables(content.get(u'user_variables', []))
427 shell.user_variables(content.get(u'user_variables', []))
422 reply_content[u'user_expressions'] = \
428 reply_content[u'user_expressions'] = \
423 shell.user_expressions(content.get(u'user_expressions', {}))
429 shell.user_expressions(content.get(u'user_expressions', {}))
424 else:
430 else:
425 # If there was an error, don't even try to compute variables or
431 # If there was an error, don't even try to compute variables or
426 # expressions
432 # expressions
427 reply_content[u'user_variables'] = {}
433 reply_content[u'user_variables'] = {}
428 reply_content[u'user_expressions'] = {}
434 reply_content[u'user_expressions'] = {}
429
435
430 # Payloads should be retrieved regardless of outcome, so we can both
436 # Payloads should be retrieved regardless of outcome, so we can both
431 # recover partial output (that could have been generated early in a
437 # recover partial output (that could have been generated early in a
432 # block, before an error) and clear the payload system always.
438 # block, before an error) and clear the payload system always.
433 reply_content[u'payload'] = shell.payload_manager.read_payload()
439 reply_content[u'payload'] = shell.payload_manager.read_payload()
434 # Be agressive about clearing the payload because we don't want
440 # Be agressive about clearing the payload because we don't want
435 # it to sit in memory until the next execute_request comes in.
441 # it to sit in memory until the next execute_request comes in.
436 shell.payload_manager.clear_payload()
442 shell.payload_manager.clear_payload()
437
443
438 # Flush output before sending the reply.
444 # Flush output before sending the reply.
439 sys.stdout.flush()
445 sys.stdout.flush()
440 sys.stderr.flush()
446 sys.stderr.flush()
441 # FIXME: on rare occasions, the flush doesn't seem to make it to the
447 # FIXME: on rare occasions, the flush doesn't seem to make it to the
442 # clients... This seems to mitigate the problem, but we definitely need
448 # clients... This seems to mitigate the problem, but we definitely need
443 # to better understand what's going on.
449 # to better understand what's going on.
444 if self._execute_sleep:
450 if self._execute_sleep:
445 time.sleep(self._execute_sleep)
451 time.sleep(self._execute_sleep)
446
452
447 # Send the reply.
453 # Send the reply.
448 reply_content = json_clean(reply_content)
454 reply_content = json_clean(reply_content)
449
455
450 md['status'] = reply_content['status']
456 md['status'] = reply_content['status']
451 if reply_content['status'] == 'error' and \
457 if reply_content['status'] == 'error' and \
452 reply_content['ename'] == 'UnmetDependency':
458 reply_content['ename'] == 'UnmetDependency':
453 md['dependencies_met'] = False
459 md['dependencies_met'] = False
454
460
455 reply_msg = self.session.send(stream, u'execute_reply',
461 reply_msg = self.session.send(stream, u'execute_reply',
456 reply_content, parent, metadata=md,
462 reply_content, parent, metadata=md,
457 ident=ident)
463 ident=ident)
458
464
459 self.log.debug("%s", reply_msg)
465 self.log.debug("%s", reply_msg)
460
466
461 if not silent and reply_msg['content']['status'] == u'error':
467 if not silent and reply_msg['content']['status'] == u'error':
462 self._abort_queues()
468 self._abort_queues()
463
469
464 self._publish_status(u'idle', parent)
470 self._publish_status(u'idle', parent)
465
471
466 def complete_request(self, stream, ident, parent):
472 def complete_request(self, stream, ident, parent):
467 txt, matches = self._complete(parent)
473 txt, matches = self._complete(parent)
468 matches = {'matches' : matches,
474 matches = {'matches' : matches,
469 'matched_text' : txt,
475 'matched_text' : txt,
470 'status' : 'ok'}
476 'status' : 'ok'}
471 matches = json_clean(matches)
477 matches = json_clean(matches)
472 completion_msg = self.session.send(stream, 'complete_reply',
478 completion_msg = self.session.send(stream, 'complete_reply',
473 matches, parent, ident)
479 matches, parent, ident)
474 self.log.debug("%s", completion_msg)
480 self.log.debug("%s", completion_msg)
475
481
476 def object_info_request(self, stream, ident, parent):
482 def object_info_request(self, stream, ident, parent):
477 content = parent['content']
483 content = parent['content']
478 object_info = self.shell.object_inspect(content['oname'],
484 object_info = self.shell.object_inspect(content['oname'],
479 detail_level = content.get('detail_level', 0)
485 detail_level = content.get('detail_level', 0)
480 )
486 )
481 # Before we send this object over, we scrub it for JSON usage
487 # Before we send this object over, we scrub it for JSON usage
482 oinfo = json_clean(object_info)
488 oinfo = json_clean(object_info)
483 msg = self.session.send(stream, 'object_info_reply',
489 msg = self.session.send(stream, 'object_info_reply',
484 oinfo, parent, ident)
490 oinfo, parent, ident)
485 self.log.debug("%s", msg)
491 self.log.debug("%s", msg)
486
492
487 def history_request(self, stream, ident, parent):
493 def history_request(self, stream, ident, parent):
488 # We need to pull these out, as passing **kwargs doesn't work with
494 # We need to pull these out, as passing **kwargs doesn't work with
489 # unicode keys before Python 2.6.5.
495 # unicode keys before Python 2.6.5.
490 hist_access_type = parent['content']['hist_access_type']
496 hist_access_type = parent['content']['hist_access_type']
491 raw = parent['content']['raw']
497 raw = parent['content']['raw']
492 output = parent['content']['output']
498 output = parent['content']['output']
493 if hist_access_type == 'tail':
499 if hist_access_type == 'tail':
494 n = parent['content']['n']
500 n = parent['content']['n']
495 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
501 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
496 include_latest=True)
502 include_latest=True)
497
503
498 elif hist_access_type == 'range':
504 elif hist_access_type == 'range':
499 session = parent['content']['session']
505 session = parent['content']['session']
500 start = parent['content']['start']
506 start = parent['content']['start']
501 stop = parent['content']['stop']
507 stop = parent['content']['stop']
502 hist = self.shell.history_manager.get_range(session, start, stop,
508 hist = self.shell.history_manager.get_range(session, start, stop,
503 raw=raw, output=output)
509 raw=raw, output=output)
504
510
505 elif hist_access_type == 'search':
511 elif hist_access_type == 'search':
506 n = parent['content'].get('n')
512 n = parent['content'].get('n')
507 unique = parent['content'].get('unique', False)
513 unique = parent['content'].get('unique', False)
508 pattern = parent['content']['pattern']
514 pattern = parent['content']['pattern']
509 hist = self.shell.history_manager.search(
515 hist = self.shell.history_manager.search(
510 pattern, raw=raw, output=output, n=n, unique=unique)
516 pattern, raw=raw, output=output, n=n, unique=unique)
511
517
512 else:
518 else:
513 hist = []
519 hist = []
514 hist = list(hist)
520 hist = list(hist)
515 content = {'history' : hist}
521 content = {'history' : hist}
516 content = json_clean(content)
522 content = json_clean(content)
517 msg = self.session.send(stream, 'history_reply',
523 msg = self.session.send(stream, 'history_reply',
518 content, parent, ident)
524 content, parent, ident)
519 self.log.debug("Sending history reply with %i entries", len(hist))
525 self.log.debug("Sending history reply with %i entries", len(hist))
520
526
521 def connect_request(self, stream, ident, parent):
527 def connect_request(self, stream, ident, parent):
522 if self._recorded_ports is not None:
528 if self._recorded_ports is not None:
523 content = self._recorded_ports.copy()
529 content = self._recorded_ports.copy()
524 else:
530 else:
525 content = {}
531 content = {}
526 msg = self.session.send(stream, 'connect_reply',
532 msg = self.session.send(stream, 'connect_reply',
527 content, parent, ident)
533 content, parent, ident)
528 self.log.debug("%s", msg)
534 self.log.debug("%s", msg)
529
535
530 def kernel_info_request(self, stream, ident, parent):
536 def kernel_info_request(self, stream, ident, parent):
531 vinfo = {
537 vinfo = {
532 'protocol_version': protocol_version,
538 'protocol_version': protocol_version,
533 'ipython_version': ipython_version,
539 'ipython_version': ipython_version,
534 'language_version': language_version,
540 'language_version': language_version,
535 'language': 'python',
541 'language': 'python',
536 }
542 }
537 msg = self.session.send(stream, 'kernel_info_reply',
543 msg = self.session.send(stream, 'kernel_info_reply',
538 vinfo, parent, ident)
544 vinfo, parent, ident)
539 self.log.debug("%s", msg)
545 self.log.debug("%s", msg)
540
546
541 def shutdown_request(self, stream, ident, parent):
547 def shutdown_request(self, stream, ident, parent):
542 self.shell.exit_now = True
548 self.shell.exit_now = True
543 content = dict(status='ok')
549 content = dict(status='ok')
544 content.update(parent['content'])
550 content.update(parent['content'])
545 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
551 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
546 # same content, but different msg_id for broadcasting on IOPub
552 # same content, but different msg_id for broadcasting on IOPub
547 self._shutdown_message = self.session.msg(u'shutdown_reply',
553 self._shutdown_message = self.session.msg(u'shutdown_reply',
548 content, parent
554 content, parent
549 )
555 )
550
556
551 self._at_shutdown()
557 self._at_shutdown()
552 # call sys.exit after a short delay
558 # call sys.exit after a short delay
553 loop = ioloop.IOLoop.instance()
559 loop = ioloop.IOLoop.instance()
554 loop.add_timeout(time.time()+0.1, loop.stop)
560 loop.add_timeout(time.time()+0.1, loop.stop)
555
561
556 #---------------------------------------------------------------------------
562 #---------------------------------------------------------------------------
557 # Engine methods
563 # Engine methods
558 #---------------------------------------------------------------------------
564 #---------------------------------------------------------------------------
559
565
560 def apply_request(self, stream, ident, parent):
566 def apply_request(self, stream, ident, parent):
561 try:
567 try:
562 content = parent[u'content']
568 content = parent[u'content']
563 bufs = parent[u'buffers']
569 bufs = parent[u'buffers']
564 msg_id = parent['header']['msg_id']
570 msg_id = parent['header']['msg_id']
565 except:
571 except:
566 self.log.error("Got bad msg: %s", parent, exc_info=True)
572 self.log.error("Got bad msg: %s", parent, exc_info=True)
567 return
573 return
568
574
569 self._publish_status(u'busy', parent)
575 self._publish_status(u'busy', parent)
570
576
571 # Set the parent message of the display hook and out streams.
577 # Set the parent message of the display hook and out streams.
572 shell = self.shell
578 shell = self.shell
573 shell.displayhook.set_parent(parent)
579 shell.displayhook.set_parent(parent)
574 shell.display_pub.set_parent(parent)
580 shell.display_pub.set_parent(parent)
575 shell.data_pub.set_parent(parent)
581 shell.data_pub.set_parent(parent)
576 sys.stdout.set_parent(parent)
582 try:
577 sys.stderr.set_parent(parent)
583 sys.stdout.set_parent(parent)
584 except AttributeError:
585 pass
586 try:
587 sys.stderr.set_parent(parent)
588 except AttributeError:
589 pass
578
590
579 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
591 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
580 # self.iopub_socket.send(pyin_msg)
592 # self.iopub_socket.send(pyin_msg)
581 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
593 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
582 md = self._make_metadata(parent['metadata'])
594 md = self._make_metadata(parent['metadata'])
583 try:
595 try:
584 working = shell.user_ns
596 working = shell.user_ns
585
597
586 prefix = "_"+str(msg_id).replace("-","")+"_"
598 prefix = "_"+str(msg_id).replace("-","")+"_"
587
599
588 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
600 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
589
601
590 fname = getattr(f, '__name__', 'f')
602 fname = getattr(f, '__name__', 'f')
591
603
592 fname = prefix+"f"
604 fname = prefix+"f"
593 argname = prefix+"args"
605 argname = prefix+"args"
594 kwargname = prefix+"kwargs"
606 kwargname = prefix+"kwargs"
595 resultname = prefix+"result"
607 resultname = prefix+"result"
596
608
597 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
609 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
598 # print ns
610 # print ns
599 working.update(ns)
611 working.update(ns)
600 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
612 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
601 try:
613 try:
602 exec code in shell.user_global_ns, shell.user_ns
614 exec code in shell.user_global_ns, shell.user_ns
603 result = working.get(resultname)
615 result = working.get(resultname)
604 finally:
616 finally:
605 for key in ns.iterkeys():
617 for key in ns.iterkeys():
606 working.pop(key)
618 working.pop(key)
607
619
608 result_buf = serialize_object(result,
620 result_buf = serialize_object(result,
609 buffer_threshold=self.session.buffer_threshold,
621 buffer_threshold=self.session.buffer_threshold,
610 item_threshold=self.session.item_threshold,
622 item_threshold=self.session.item_threshold,
611 )
623 )
612
624
613 except:
625 except:
614 # invoke IPython traceback formatting
626 # invoke IPython traceback formatting
615 shell.showtraceback()
627 shell.showtraceback()
616 # FIXME - fish exception info out of shell, possibly left there by
628 # FIXME - fish exception info out of shell, possibly left there by
617 # run_code. We'll need to clean up this logic later.
629 # run_code. We'll need to clean up this logic later.
618 reply_content = {}
630 reply_content = {}
619 if shell._reply_content is not None:
631 if shell._reply_content is not None:
620 reply_content.update(shell._reply_content)
632 reply_content.update(shell._reply_content)
621 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
633 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
622 reply_content['engine_info'] = e_info
634 reply_content['engine_info'] = e_info
623 # reset after use
635 # reset after use
624 shell._reply_content = None
636 shell._reply_content = None
625
637
626 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
638 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
627 ident=self._topic('pyerr'))
639 ident=self._topic('pyerr'))
628 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
640 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
629 result_buf = []
641 result_buf = []
630
642
631 if reply_content['ename'] == 'UnmetDependency':
643 if reply_content['ename'] == 'UnmetDependency':
632 md['dependencies_met'] = False
644 md['dependencies_met'] = False
633 else:
645 else:
634 reply_content = {'status' : 'ok'}
646 reply_content = {'status' : 'ok'}
635
647
636 # put 'ok'/'error' status in header, for scheduler introspection:
648 # put 'ok'/'error' status in header, for scheduler introspection:
637 md['status'] = reply_content['status']
649 md['status'] = reply_content['status']
638
650
639 # flush i/o
651 # flush i/o
640 sys.stdout.flush()
652 sys.stdout.flush()
641 sys.stderr.flush()
653 sys.stderr.flush()
642
654
643 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
655 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
644 parent=parent, ident=ident,buffers=result_buf, metadata=md)
656 parent=parent, ident=ident,buffers=result_buf, metadata=md)
645
657
646 self._publish_status(u'idle', parent)
658 self._publish_status(u'idle', parent)
647
659
648 #---------------------------------------------------------------------------
660 #---------------------------------------------------------------------------
649 # Control messages
661 # Control messages
650 #---------------------------------------------------------------------------
662 #---------------------------------------------------------------------------
651
663
652 def abort_request(self, stream, ident, parent):
664 def abort_request(self, stream, ident, parent):
653 """abort a specifig msg by id"""
665 """abort a specifig msg by id"""
654 msg_ids = parent['content'].get('msg_ids', None)
666 msg_ids = parent['content'].get('msg_ids', None)
655 if isinstance(msg_ids, basestring):
667 if isinstance(msg_ids, basestring):
656 msg_ids = [msg_ids]
668 msg_ids = [msg_ids]
657 if not msg_ids:
669 if not msg_ids:
658 self.abort_queues()
670 self.abort_queues()
659 for mid in msg_ids:
671 for mid in msg_ids:
660 self.aborted.add(str(mid))
672 self.aborted.add(str(mid))
661
673
662 content = dict(status='ok')
674 content = dict(status='ok')
663 reply_msg = self.session.send(stream, 'abort_reply', content=content,
675 reply_msg = self.session.send(stream, 'abort_reply', content=content,
664 parent=parent, ident=ident)
676 parent=parent, ident=ident)
665 self.log.debug("%s", reply_msg)
677 self.log.debug("%s", reply_msg)
666
678
667 def clear_request(self, stream, idents, parent):
679 def clear_request(self, stream, idents, parent):
668 """Clear our namespace."""
680 """Clear our namespace."""
669 self.shell.reset(False)
681 self.shell.reset(False)
670 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
682 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
671 content = dict(status='ok'))
683 content = dict(status='ok'))
672
684
673
685
674 #---------------------------------------------------------------------------
686 #---------------------------------------------------------------------------
675 # Protected interface
687 # Protected interface
676 #---------------------------------------------------------------------------
688 #---------------------------------------------------------------------------
677
689
678 def _wrap_exception(self, method=None):
690 def _wrap_exception(self, method=None):
679 # import here, because _wrap_exception is only used in parallel,
691 # import here, because _wrap_exception is only used in parallel,
680 # and parallel has higher min pyzmq version
692 # and parallel has higher min pyzmq version
681 from IPython.parallel.error import wrap_exception
693 from IPython.parallel.error import wrap_exception
682 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
694 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
683 content = wrap_exception(e_info)
695 content = wrap_exception(e_info)
684 return content
696 return content
685
697
686 def _topic(self, topic):
698 def _topic(self, topic):
687 """prefixed topic for IOPub messages"""
699 """prefixed topic for IOPub messages"""
688 if self.int_id >= 0:
700 if self.int_id >= 0:
689 base = "engine.%i" % self.int_id
701 base = "engine.%i" % self.int_id
690 else:
702 else:
691 base = "kernel.%s" % self.ident
703 base = "kernel.%s" % self.ident
692
704
693 return py3compat.cast_bytes("%s.%s" % (base, topic))
705 return py3compat.cast_bytes("%s.%s" % (base, topic))
694
706
695 def _abort_queues(self):
707 def _abort_queues(self):
696 for stream in self.shell_streams:
708 for stream in self.shell_streams:
697 if stream:
709 if stream:
698 self._abort_queue(stream)
710 self._abort_queue(stream)
699
711
700 def _abort_queue(self, stream):
712 def _abort_queue(self, stream):
701 poller = zmq.Poller()
713 poller = zmq.Poller()
702 poller.register(stream.socket, zmq.POLLIN)
714 poller.register(stream.socket, zmq.POLLIN)
703 while True:
715 while True:
704 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
716 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
705 if msg is None:
717 if msg is None:
706 return
718 return
707
719
708 self.log.info("Aborting:")
720 self.log.info("Aborting:")
709 self.log.info("%s", msg)
721 self.log.info("%s", msg)
710 msg_type = msg['header']['msg_type']
722 msg_type = msg['header']['msg_type']
711 reply_type = msg_type.split('_')[0] + '_reply'
723 reply_type = msg_type.split('_')[0] + '_reply'
712
724
713 status = {'status' : 'aborted'}
725 status = {'status' : 'aborted'}
714 md = {'engine' : self.ident}
726 md = {'engine' : self.ident}
715 md.update(status)
727 md.update(status)
716 reply_msg = self.session.send(stream, reply_type, metadata=md,
728 reply_msg = self.session.send(stream, reply_type, metadata=md,
717 content=status, parent=msg, ident=idents)
729 content=status, parent=msg, ident=idents)
718 self.log.debug("%s", reply_msg)
730 self.log.debug("%s", reply_msg)
719 # We need to wait a bit for requests to come in. This can probably
731 # We need to wait a bit for requests to come in. This can probably
720 # be set shorter for true asynchronous clients.
732 # be set shorter for true asynchronous clients.
721 poller.poll(50)
733 poller.poll(50)
722
734
723
735
724 def _no_raw_input(self):
736 def _no_raw_input(self):
725 """Raise StdinNotImplentedError if active frontend doesn't support
737 """Raise StdinNotImplentedError if active frontend doesn't support
726 stdin."""
738 stdin."""
727 raise StdinNotImplementedError("raw_input was called, but this "
739 raise StdinNotImplementedError("raw_input was called, but this "
728 "frontend does not support stdin.")
740 "frontend does not support stdin.")
729
741
730 def _raw_input(self, prompt, ident, parent):
742 def _raw_input(self, prompt, ident, parent):
731 # Flush output before making the request.
743 # Flush output before making the request.
732 sys.stderr.flush()
744 sys.stderr.flush()
733 sys.stdout.flush()
745 sys.stdout.flush()
734
746
735 # Send the input request.
747 # Send the input request.
736 content = json_clean(dict(prompt=prompt))
748 content = json_clean(dict(prompt=prompt))
737 self.session.send(self.stdin_socket, u'input_request', content, parent,
749 self.session.send(self.stdin_socket, u'input_request', content, parent,
738 ident=ident)
750 ident=ident)
739
751
740 # Await a response.
752 # Await a response.
741 while True:
753 while True:
742 try:
754 try:
743 ident, reply = self.session.recv(self.stdin_socket, 0)
755 ident, reply = self.session.recv(self.stdin_socket, 0)
744 except Exception:
756 except Exception:
745 self.log.warn("Invalid Message:", exc_info=True)
757 self.log.warn("Invalid Message:", exc_info=True)
746 else:
758 else:
747 break
759 break
748 try:
760 try:
749 value = reply['content']['value']
761 value = reply['content']['value']
750 except:
762 except:
751 self.log.error("Got bad raw_input reply: ")
763 self.log.error("Got bad raw_input reply: ")
752 self.log.error("%s", parent)
764 self.log.error("%s", parent)
753 value = ''
765 value = ''
754 if value == '\x04':
766 if value == '\x04':
755 # EOF
767 # EOF
756 raise EOFError
768 raise EOFError
757 return value
769 return value
758
770
759 def _complete(self, msg):
771 def _complete(self, msg):
760 c = msg['content']
772 c = msg['content']
761 try:
773 try:
762 cpos = int(c['cursor_pos'])
774 cpos = int(c['cursor_pos'])
763 except:
775 except:
764 # If we don't get something that we can convert to an integer, at
776 # If we don't get something that we can convert to an integer, at
765 # least attempt the completion guessing the cursor is at the end of
777 # least attempt the completion guessing the cursor is at the end of
766 # the text, if there's any, and otherwise of the line
778 # the text, if there's any, and otherwise of the line
767 cpos = len(c['text'])
779 cpos = len(c['text'])
768 if cpos==0:
780 if cpos==0:
769 cpos = len(c['line'])
781 cpos = len(c['line'])
770 return self.shell.complete(c['text'], c['line'], cpos)
782 return self.shell.complete(c['text'], c['line'], cpos)
771
783
772 def _at_shutdown(self):
784 def _at_shutdown(self):
773 """Actions taken at shutdown by the kernel, called by python's atexit.
785 """Actions taken at shutdown by the kernel, called by python's atexit.
774 """
786 """
775 # io.rprint("Kernel at_shutdown") # dbg
787 # io.rprint("Kernel at_shutdown") # dbg
776 if self._shutdown_message is not None:
788 if self._shutdown_message is not None:
777 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
789 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
778 self.log.debug("%s", self._shutdown_message)
790 self.log.debug("%s", self._shutdown_message)
779 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
791 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
780
792
General Comments 0
You need to be logged in to leave comments. Login now