##// END OF EJS Templates
ensure raw_input returns str in zmq shell...
MinRK -
Show More
@@ -1,804 +1,804 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple interactive kernel that talks to a frontend over 0MQ.
2 """A simple interactive kernel that talks to a frontend over 0MQ.
3
3
4 Things to do:
4 Things to do:
5
5
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
6 * Implement `set_parent` logic. Right before doing exec, the Kernel should
7 call set_parent on all the PUB objects with the message about to be executed.
7 call set_parent on all the PUB objects with the message about to be executed.
8 * Implement random port and security key logic.
8 * Implement random port and security key logic.
9 * Implement control messages.
9 * Implement control messages.
10 * Implement event loop and poll version.
10 * Implement event loop and poll version.
11 """
11 """
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 # Standard library imports
18 # Standard library imports
19 import __builtin__
19 import __builtin__
20 import sys
20 import sys
21 import time
21 import time
22 import traceback
22 import traceback
23 import logging
23 import logging
24 import uuid
24 import uuid
25
25
26 from datetime import datetime
26 from datetime import datetime
27 from signal import (
27 from signal import (
28 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
28 signal, getsignal, default_int_handler, SIGINT, SIG_IGN
29 )
29 )
30
30
31 # System library imports
31 # System library imports
32 import zmq
32 import zmq
33 from zmq.eventloop import ioloop
33 from zmq.eventloop import ioloop
34 from zmq.eventloop.zmqstream import ZMQStream
34 from zmq.eventloop.zmqstream import ZMQStream
35
35
36 # Local imports
36 # Local imports
37 from IPython.config.configurable import Configurable
37 from IPython.config.configurable import Configurable
38 from IPython.core.error import StdinNotImplementedError
38 from IPython.core.error import StdinNotImplementedError
39 from IPython.core import release
39 from IPython.core import release
40 from IPython.utils import io
40 from IPython.utils import io
41 from IPython.utils import py3compat
41 from IPython.utils import py3compat
42 from IPython.utils.jsonutil import json_clean
42 from IPython.utils.jsonutil import json_clean
43 from IPython.utils.traitlets import (
43 from IPython.utils.traitlets import (
44 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode,
44 Any, Instance, Float, Dict, CaselessStrEnum, List, Set, Integer, Unicode,
45 Type
45 Type
46 )
46 )
47
47
48 from serialize import serialize_object, unpack_apply_message
48 from serialize import serialize_object, unpack_apply_message
49 from session import Session
49 from session import Session
50 from zmqshell import ZMQInteractiveShell
50 from zmqshell import ZMQInteractiveShell
51
51
52
52
53 #-----------------------------------------------------------------------------
53 #-----------------------------------------------------------------------------
54 # Main kernel class
54 # Main kernel class
55 #-----------------------------------------------------------------------------
55 #-----------------------------------------------------------------------------
56
56
57 protocol_version = list(release.kernel_protocol_version_info)
57 protocol_version = list(release.kernel_protocol_version_info)
58 ipython_version = list(release.version_info)
58 ipython_version = list(release.version_info)
59 language_version = list(sys.version_info[:3])
59 language_version = list(sys.version_info[:3])
60
60
61
61
62 class Kernel(Configurable):
62 class Kernel(Configurable):
63
63
64 #---------------------------------------------------------------------------
64 #---------------------------------------------------------------------------
65 # Kernel interface
65 # Kernel interface
66 #---------------------------------------------------------------------------
66 #---------------------------------------------------------------------------
67
67
68 # attribute to override with a GUI
68 # attribute to override with a GUI
69 eventloop = Any(None)
69 eventloop = Any(None)
70 def _eventloop_changed(self, name, old, new):
70 def _eventloop_changed(self, name, old, new):
71 """schedule call to eventloop from IOLoop"""
71 """schedule call to eventloop from IOLoop"""
72 loop = ioloop.IOLoop.instance()
72 loop = ioloop.IOLoop.instance()
73 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
73 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
74
74
75 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
75 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
76 shell_class = Type(ZMQInteractiveShell)
76 shell_class = Type(ZMQInteractiveShell)
77
77
78 session = Instance(Session)
78 session = Instance(Session)
79 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
79 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
80 shell_streams = List()
80 shell_streams = List()
81 control_stream = Instance(ZMQStream)
81 control_stream = Instance(ZMQStream)
82 iopub_socket = Instance(zmq.Socket)
82 iopub_socket = Instance(zmq.Socket)
83 stdin_socket = Instance(zmq.Socket)
83 stdin_socket = Instance(zmq.Socket)
84 log = Instance(logging.Logger)
84 log = Instance(logging.Logger)
85
85
86 user_module = Any()
86 user_module = Any()
87 def _user_module_changed(self, name, old, new):
87 def _user_module_changed(self, name, old, new):
88 if self.shell is not None:
88 if self.shell is not None:
89 self.shell.user_module = new
89 self.shell.user_module = new
90
90
91 user_ns = Dict(default_value=None)
91 user_ns = Dict(default_value=None)
92 def _user_ns_changed(self, name, old, new):
92 def _user_ns_changed(self, name, old, new):
93 if self.shell is not None:
93 if self.shell is not None:
94 self.shell.user_ns = new
94 self.shell.user_ns = new
95 self.shell.init_user_ns()
95 self.shell.init_user_ns()
96
96
97 # identities:
97 # identities:
98 int_id = Integer(-1)
98 int_id = Integer(-1)
99 ident = Unicode()
99 ident = Unicode()
100
100
101 def _ident_default(self):
101 def _ident_default(self):
102 return unicode(uuid.uuid4())
102 return unicode(uuid.uuid4())
103
103
104
104
105 # Private interface
105 # Private interface
106
106
107 # Time to sleep after flushing the stdout/err buffers in each execute
107 # Time to sleep after flushing the stdout/err buffers in each execute
108 # cycle. While this introduces a hard limit on the minimal latency of the
108 # cycle. While this introduces a hard limit on the minimal latency of the
109 # execute cycle, it helps prevent output synchronization problems for
109 # execute cycle, it helps prevent output synchronization problems for
110 # clients.
110 # clients.
111 # Units are in seconds. The minimum zmq latency on local host is probably
111 # Units are in seconds. The minimum zmq latency on local host is probably
112 # ~150 microseconds, set this to 500us for now. We may need to increase it
112 # ~150 microseconds, set this to 500us for now. We may need to increase it
113 # a little if it's not enough after more interactive testing.
113 # a little if it's not enough after more interactive testing.
114 _execute_sleep = Float(0.0005, config=True)
114 _execute_sleep = Float(0.0005, config=True)
115
115
116 # Frequency of the kernel's event loop.
116 # Frequency of the kernel's event loop.
117 # Units are in seconds, kernel subclasses for GUI toolkits may need to
117 # Units are in seconds, kernel subclasses for GUI toolkits may need to
118 # adapt to milliseconds.
118 # adapt to milliseconds.
119 _poll_interval = Float(0.05, config=True)
119 _poll_interval = Float(0.05, config=True)
120
120
121 # If the shutdown was requested over the network, we leave here the
121 # If the shutdown was requested over the network, we leave here the
122 # necessary reply message so it can be sent by our registered atexit
122 # necessary reply message so it can be sent by our registered atexit
123 # handler. This ensures that the reply is only sent to clients truly at
123 # handler. This ensures that the reply is only sent to clients truly at
124 # the end of our shutdown process (which happens after the underlying
124 # the end of our shutdown process (which happens after the underlying
125 # IPython shell's own shutdown).
125 # IPython shell's own shutdown).
126 _shutdown_message = None
126 _shutdown_message = None
127
127
128 # This is a dict of port number that the kernel is listening on. It is set
128 # This is a dict of port number that the kernel is listening on. It is set
129 # by record_ports and used by connect_request.
129 # by record_ports and used by connect_request.
130 _recorded_ports = Dict()
130 _recorded_ports = Dict()
131
131
132 # A reference to the Python builtin 'raw_input' function.
132 # A reference to the Python builtin 'raw_input' function.
133 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
133 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
134 _sys_raw_input = Any()
134 _sys_raw_input = Any()
135
135
136 # set of aborted msg_ids
136 # set of aborted msg_ids
137 aborted = Set()
137 aborted = Set()
138
138
139
139
140 def __init__(self, **kwargs):
140 def __init__(self, **kwargs):
141 super(Kernel, self).__init__(**kwargs)
141 super(Kernel, self).__init__(**kwargs)
142
142
143 # Initialize the InteractiveShell subclass
143 # Initialize the InteractiveShell subclass
144 self.shell = self.shell_class.instance(parent=self,
144 self.shell = self.shell_class.instance(parent=self,
145 profile_dir = self.profile_dir,
145 profile_dir = self.profile_dir,
146 user_module = self.user_module,
146 user_module = self.user_module,
147 user_ns = self.user_ns,
147 user_ns = self.user_ns,
148 )
148 )
149 self.shell.displayhook.session = self.session
149 self.shell.displayhook.session = self.session
150 self.shell.displayhook.pub_socket = self.iopub_socket
150 self.shell.displayhook.pub_socket = self.iopub_socket
151 self.shell.displayhook.topic = self._topic('pyout')
151 self.shell.displayhook.topic = self._topic('pyout')
152 self.shell.display_pub.session = self.session
152 self.shell.display_pub.session = self.session
153 self.shell.display_pub.pub_socket = self.iopub_socket
153 self.shell.display_pub.pub_socket = self.iopub_socket
154 self.shell.data_pub.session = self.session
154 self.shell.data_pub.session = self.session
155 self.shell.data_pub.pub_socket = self.iopub_socket
155 self.shell.data_pub.pub_socket = self.iopub_socket
156
156
157 # TMP - hack while developing
157 # TMP - hack while developing
158 self.shell._reply_content = None
158 self.shell._reply_content = None
159
159
160 # Build dict of handlers for message types
160 # Build dict of handlers for message types
161 msg_types = [ 'execute_request', 'complete_request',
161 msg_types = [ 'execute_request', 'complete_request',
162 'object_info_request', 'history_request',
162 'object_info_request', 'history_request',
163 'kernel_info_request',
163 'kernel_info_request',
164 'connect_request', 'shutdown_request',
164 'connect_request', 'shutdown_request',
165 'apply_request',
165 'apply_request',
166 ]
166 ]
167 self.shell_handlers = {}
167 self.shell_handlers = {}
168 for msg_type in msg_types:
168 for msg_type in msg_types:
169 self.shell_handlers[msg_type] = getattr(self, msg_type)
169 self.shell_handlers[msg_type] = getattr(self, msg_type)
170
170
171 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
171 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
172 self.control_handlers = {}
172 self.control_handlers = {}
173 for msg_type in control_msg_types:
173 for msg_type in control_msg_types:
174 self.control_handlers[msg_type] = getattr(self, msg_type)
174 self.control_handlers[msg_type] = getattr(self, msg_type)
175
175
176 def dispatch_control(self, msg):
176 def dispatch_control(self, msg):
177 """dispatch control requests"""
177 """dispatch control requests"""
178 idents,msg = self.session.feed_identities(msg, copy=False)
178 idents,msg = self.session.feed_identities(msg, copy=False)
179 try:
179 try:
180 msg = self.session.unserialize(msg, content=True, copy=False)
180 msg = self.session.unserialize(msg, content=True, copy=False)
181 except:
181 except:
182 self.log.error("Invalid Control Message", exc_info=True)
182 self.log.error("Invalid Control Message", exc_info=True)
183 return
183 return
184
184
185 self.log.debug("Control received: %s", msg)
185 self.log.debug("Control received: %s", msg)
186
186
187 header = msg['header']
187 header = msg['header']
188 msg_id = header['msg_id']
188 msg_id = header['msg_id']
189 msg_type = header['msg_type']
189 msg_type = header['msg_type']
190
190
191 handler = self.control_handlers.get(msg_type, None)
191 handler = self.control_handlers.get(msg_type, None)
192 if handler is None:
192 if handler is None:
193 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
193 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
194 else:
194 else:
195 try:
195 try:
196 handler(self.control_stream, idents, msg)
196 handler(self.control_stream, idents, msg)
197 except Exception:
197 except Exception:
198 self.log.error("Exception in control handler:", exc_info=True)
198 self.log.error("Exception in control handler:", exc_info=True)
199
199
200 def dispatch_shell(self, stream, msg):
200 def dispatch_shell(self, stream, msg):
201 """dispatch shell requests"""
201 """dispatch shell requests"""
202 # flush control requests first
202 # flush control requests first
203 if self.control_stream:
203 if self.control_stream:
204 self.control_stream.flush()
204 self.control_stream.flush()
205
205
206 idents,msg = self.session.feed_identities(msg, copy=False)
206 idents,msg = self.session.feed_identities(msg, copy=False)
207 try:
207 try:
208 msg = self.session.unserialize(msg, content=True, copy=False)
208 msg = self.session.unserialize(msg, content=True, copy=False)
209 except:
209 except:
210 self.log.error("Invalid Message", exc_info=True)
210 self.log.error("Invalid Message", exc_info=True)
211 return
211 return
212
212
213 header = msg['header']
213 header = msg['header']
214 msg_id = header['msg_id']
214 msg_id = header['msg_id']
215 msg_type = msg['header']['msg_type']
215 msg_type = msg['header']['msg_type']
216
216
217 # Print some info about this message and leave a '--->' marker, so it's
217 # Print some info about this message and leave a '--->' marker, so it's
218 # easier to trace visually the message chain when debugging. Each
218 # easier to trace visually the message chain when debugging. Each
219 # handler prints its message at the end.
219 # handler prints its message at the end.
220 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
220 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
221 self.log.debug(' Content: %s\n --->\n ', msg['content'])
221 self.log.debug(' Content: %s\n --->\n ', msg['content'])
222
222
223 if msg_id in self.aborted:
223 if msg_id in self.aborted:
224 self.aborted.remove(msg_id)
224 self.aborted.remove(msg_id)
225 # is it safe to assume a msg_id will not be resubmitted?
225 # is it safe to assume a msg_id will not be resubmitted?
226 reply_type = msg_type.split('_')[0] + '_reply'
226 reply_type = msg_type.split('_')[0] + '_reply'
227 status = {'status' : 'aborted'}
227 status = {'status' : 'aborted'}
228 md = {'engine' : self.ident}
228 md = {'engine' : self.ident}
229 md.update(status)
229 md.update(status)
230 reply_msg = self.session.send(stream, reply_type, metadata=md,
230 reply_msg = self.session.send(stream, reply_type, metadata=md,
231 content=status, parent=msg, ident=idents)
231 content=status, parent=msg, ident=idents)
232 return
232 return
233
233
234 handler = self.shell_handlers.get(msg_type, None)
234 handler = self.shell_handlers.get(msg_type, None)
235 if handler is None:
235 if handler is None:
236 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
236 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
237 else:
237 else:
238 # ensure default_int_handler during handler call
238 # ensure default_int_handler during handler call
239 sig = signal(SIGINT, default_int_handler)
239 sig = signal(SIGINT, default_int_handler)
240 try:
240 try:
241 handler(stream, idents, msg)
241 handler(stream, idents, msg)
242 except Exception:
242 except Exception:
243 self.log.error("Exception in message handler:", exc_info=True)
243 self.log.error("Exception in message handler:", exc_info=True)
244 finally:
244 finally:
245 signal(SIGINT, sig)
245 signal(SIGINT, sig)
246
246
247 def enter_eventloop(self):
247 def enter_eventloop(self):
248 """enter eventloop"""
248 """enter eventloop"""
249 self.log.info("entering eventloop")
249 self.log.info("entering eventloop")
250 # restore default_int_handler
250 # restore default_int_handler
251 signal(SIGINT, default_int_handler)
251 signal(SIGINT, default_int_handler)
252 while self.eventloop is not None:
252 while self.eventloop is not None:
253 try:
253 try:
254 self.eventloop(self)
254 self.eventloop(self)
255 except KeyboardInterrupt:
255 except KeyboardInterrupt:
256 # Ctrl-C shouldn't crash the kernel
256 # Ctrl-C shouldn't crash the kernel
257 self.log.error("KeyboardInterrupt caught in kernel")
257 self.log.error("KeyboardInterrupt caught in kernel")
258 continue
258 continue
259 else:
259 else:
260 # eventloop exited cleanly, this means we should stop (right?)
260 # eventloop exited cleanly, this means we should stop (right?)
261 self.eventloop = None
261 self.eventloop = None
262 break
262 break
263 self.log.info("exiting eventloop")
263 self.log.info("exiting eventloop")
264
264
265 def start(self):
265 def start(self):
266 """register dispatchers for streams"""
266 """register dispatchers for streams"""
267 self.shell.exit_now = False
267 self.shell.exit_now = False
268 if self.control_stream:
268 if self.control_stream:
269 self.control_stream.on_recv(self.dispatch_control, copy=False)
269 self.control_stream.on_recv(self.dispatch_control, copy=False)
270
270
271 def make_dispatcher(stream):
271 def make_dispatcher(stream):
272 def dispatcher(msg):
272 def dispatcher(msg):
273 return self.dispatch_shell(stream, msg)
273 return self.dispatch_shell(stream, msg)
274 return dispatcher
274 return dispatcher
275
275
276 for s in self.shell_streams:
276 for s in self.shell_streams:
277 s.on_recv(make_dispatcher(s), copy=False)
277 s.on_recv(make_dispatcher(s), copy=False)
278
278
279 # publish idle status
279 # publish idle status
280 self._publish_status('starting')
280 self._publish_status('starting')
281
281
282 def do_one_iteration(self):
282 def do_one_iteration(self):
283 """step eventloop just once"""
283 """step eventloop just once"""
284 if self.control_stream:
284 if self.control_stream:
285 self.control_stream.flush()
285 self.control_stream.flush()
286 for stream in self.shell_streams:
286 for stream in self.shell_streams:
287 # handle at most one request per iteration
287 # handle at most one request per iteration
288 stream.flush(zmq.POLLIN, 1)
288 stream.flush(zmq.POLLIN, 1)
289 stream.flush(zmq.POLLOUT)
289 stream.flush(zmq.POLLOUT)
290
290
291
291
292 def record_ports(self, ports):
292 def record_ports(self, ports):
293 """Record the ports that this kernel is using.
293 """Record the ports that this kernel is using.
294
294
295 The creator of the Kernel instance must call this methods if they
295 The creator of the Kernel instance must call this methods if they
296 want the :meth:`connect_request` method to return the port numbers.
296 want the :meth:`connect_request` method to return the port numbers.
297 """
297 """
298 self._recorded_ports = ports
298 self._recorded_ports = ports
299
299
300 #---------------------------------------------------------------------------
300 #---------------------------------------------------------------------------
301 # Kernel request handlers
301 # Kernel request handlers
302 #---------------------------------------------------------------------------
302 #---------------------------------------------------------------------------
303
303
304 def _make_metadata(self, other=None):
304 def _make_metadata(self, other=None):
305 """init metadata dict, for execute/apply_reply"""
305 """init metadata dict, for execute/apply_reply"""
306 new_md = {
306 new_md = {
307 'dependencies_met' : True,
307 'dependencies_met' : True,
308 'engine' : self.ident,
308 'engine' : self.ident,
309 'started': datetime.now(),
309 'started': datetime.now(),
310 }
310 }
311 if other:
311 if other:
312 new_md.update(other)
312 new_md.update(other)
313 return new_md
313 return new_md
314
314
315 def _publish_pyin(self, code, parent, execution_count):
315 def _publish_pyin(self, code, parent, execution_count):
316 """Publish the code request on the pyin stream."""
316 """Publish the code request on the pyin stream."""
317
317
318 self.session.send(self.iopub_socket, u'pyin',
318 self.session.send(self.iopub_socket, u'pyin',
319 {u'code':code, u'execution_count': execution_count},
319 {u'code':code, u'execution_count': execution_count},
320 parent=parent, ident=self._topic('pyin')
320 parent=parent, ident=self._topic('pyin')
321 )
321 )
322
322
323 def _publish_status(self, status, parent=None):
323 def _publish_status(self, status, parent=None):
324 """send status (busy/idle) on IOPub"""
324 """send status (busy/idle) on IOPub"""
325 self.session.send(self.iopub_socket,
325 self.session.send(self.iopub_socket,
326 u'status',
326 u'status',
327 {u'execution_state': status},
327 {u'execution_state': status},
328 parent=parent,
328 parent=parent,
329 ident=self._topic('status'),
329 ident=self._topic('status'),
330 )
330 )
331
331
332
332
333 def execute_request(self, stream, ident, parent):
333 def execute_request(self, stream, ident, parent):
334 """handle an execute_request"""
334 """handle an execute_request"""
335
335
336 self._publish_status(u'busy', parent)
336 self._publish_status(u'busy', parent)
337
337
338 try:
338 try:
339 content = parent[u'content']
339 content = parent[u'content']
340 code = content[u'code']
340 code = content[u'code']
341 silent = content[u'silent']
341 silent = content[u'silent']
342 store_history = content.get(u'store_history', not silent)
342 store_history = content.get(u'store_history', not silent)
343 except:
343 except:
344 self.log.error("Got bad msg: ")
344 self.log.error("Got bad msg: ")
345 self.log.error("%s", parent)
345 self.log.error("%s", parent)
346 return
346 return
347
347
348 md = self._make_metadata(parent['metadata'])
348 md = self._make_metadata(parent['metadata'])
349
349
350 shell = self.shell # we'll need this a lot here
350 shell = self.shell # we'll need this a lot here
351
351
352 # Replace raw_input. Note that is not sufficient to replace
352 # Replace raw_input. Note that is not sufficient to replace
353 # raw_input in the user namespace.
353 # raw_input in the user namespace.
354 if content.get('allow_stdin', False):
354 if content.get('allow_stdin', False):
355 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
355 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
356 else:
356 else:
357 raw_input = lambda prompt='' : self._no_raw_input()
357 raw_input = lambda prompt='' : self._no_raw_input()
358
358
359 if py3compat.PY3:
359 if py3compat.PY3:
360 self._sys_raw_input = __builtin__.input
360 self._sys_raw_input = __builtin__.input
361 __builtin__.input = raw_input
361 __builtin__.input = raw_input
362 else:
362 else:
363 self._sys_raw_input = __builtin__.raw_input
363 self._sys_raw_input = __builtin__.raw_input
364 __builtin__.raw_input = raw_input
364 __builtin__.raw_input = raw_input
365
365
366 # Set the parent message of the display hook and out streams.
366 # Set the parent message of the display hook and out streams.
367 shell.displayhook.set_parent(parent)
367 shell.displayhook.set_parent(parent)
368 shell.display_pub.set_parent(parent)
368 shell.display_pub.set_parent(parent)
369 shell.data_pub.set_parent(parent)
369 shell.data_pub.set_parent(parent)
370 try:
370 try:
371 sys.stdout.set_parent(parent)
371 sys.stdout.set_parent(parent)
372 except AttributeError:
372 except AttributeError:
373 pass
373 pass
374 try:
374 try:
375 sys.stderr.set_parent(parent)
375 sys.stderr.set_parent(parent)
376 except AttributeError:
376 except AttributeError:
377 pass
377 pass
378
378
379 # Re-broadcast our input for the benefit of listening clients, and
379 # Re-broadcast our input for the benefit of listening clients, and
380 # start computing output
380 # start computing output
381 if not silent:
381 if not silent:
382 self._publish_pyin(code, parent, shell.execution_count)
382 self._publish_pyin(code, parent, shell.execution_count)
383
383
384 reply_content = {}
384 reply_content = {}
385 try:
385 try:
386 # FIXME: the shell calls the exception handler itself.
386 # FIXME: the shell calls the exception handler itself.
387 shell.run_cell(code, store_history=store_history, silent=silent)
387 shell.run_cell(code, store_history=store_history, silent=silent)
388 except:
388 except:
389 status = u'error'
389 status = u'error'
390 # FIXME: this code right now isn't being used yet by default,
390 # FIXME: this code right now isn't being used yet by default,
391 # because the run_cell() call above directly fires off exception
391 # because the run_cell() call above directly fires off exception
392 # reporting. This code, therefore, is only active in the scenario
392 # reporting. This code, therefore, is only active in the scenario
393 # where runlines itself has an unhandled exception. We need to
393 # where runlines itself has an unhandled exception. We need to
394 # uniformize this, for all exception construction to come from a
394 # uniformize this, for all exception construction to come from a
395 # single location in the codbase.
395 # single location in the codbase.
396 etype, evalue, tb = sys.exc_info()
396 etype, evalue, tb = sys.exc_info()
397 tb_list = traceback.format_exception(etype, evalue, tb)
397 tb_list = traceback.format_exception(etype, evalue, tb)
398 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
398 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
399 else:
399 else:
400 status = u'ok'
400 status = u'ok'
401 finally:
401 finally:
402 # Restore raw_input.
402 # Restore raw_input.
403 if py3compat.PY3:
403 if py3compat.PY3:
404 __builtin__.input = self._sys_raw_input
404 __builtin__.input = self._sys_raw_input
405 else:
405 else:
406 __builtin__.raw_input = self._sys_raw_input
406 __builtin__.raw_input = self._sys_raw_input
407
407
408 reply_content[u'status'] = status
408 reply_content[u'status'] = status
409
409
410 # Return the execution counter so clients can display prompts
410 # Return the execution counter so clients can display prompts
411 reply_content['execution_count'] = shell.execution_count - 1
411 reply_content['execution_count'] = shell.execution_count - 1
412
412
413 # FIXME - fish exception info out of shell, possibly left there by
413 # FIXME - fish exception info out of shell, possibly left there by
414 # runlines. We'll need to clean up this logic later.
414 # runlines. We'll need to clean up this logic later.
415 if shell._reply_content is not None:
415 if shell._reply_content is not None:
416 reply_content.update(shell._reply_content)
416 reply_content.update(shell._reply_content)
417 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
417 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
418 reply_content['engine_info'] = e_info
418 reply_content['engine_info'] = e_info
419 # reset after use
419 # reset after use
420 shell._reply_content = None
420 shell._reply_content = None
421
421
422 if 'traceback' in reply_content:
422 if 'traceback' in reply_content:
423 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
423 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
424
424
425
425
426 # At this point, we can tell whether the main code execution succeeded
426 # At this point, we can tell whether the main code execution succeeded
427 # or not. If it did, we proceed to evaluate user_variables/expressions
427 # or not. If it did, we proceed to evaluate user_variables/expressions
428 if reply_content['status'] == 'ok':
428 if reply_content['status'] == 'ok':
429 reply_content[u'user_variables'] = \
429 reply_content[u'user_variables'] = \
430 shell.user_variables(content.get(u'user_variables', []))
430 shell.user_variables(content.get(u'user_variables', []))
431 reply_content[u'user_expressions'] = \
431 reply_content[u'user_expressions'] = \
432 shell.user_expressions(content.get(u'user_expressions', {}))
432 shell.user_expressions(content.get(u'user_expressions', {}))
433 else:
433 else:
434 # If there was an error, don't even try to compute variables or
434 # If there was an error, don't even try to compute variables or
435 # expressions
435 # expressions
436 reply_content[u'user_variables'] = {}
436 reply_content[u'user_variables'] = {}
437 reply_content[u'user_expressions'] = {}
437 reply_content[u'user_expressions'] = {}
438
438
439 # Payloads should be retrieved regardless of outcome, so we can both
439 # Payloads should be retrieved regardless of outcome, so we can both
440 # recover partial output (that could have been generated early in a
440 # recover partial output (that could have been generated early in a
441 # block, before an error) and clear the payload system always.
441 # block, before an error) and clear the payload system always.
442 reply_content[u'payload'] = shell.payload_manager.read_payload()
442 reply_content[u'payload'] = shell.payload_manager.read_payload()
443 # Be agressive about clearing the payload because we don't want
443 # Be agressive about clearing the payload because we don't want
444 # it to sit in memory until the next execute_request comes in.
444 # it to sit in memory until the next execute_request comes in.
445 shell.payload_manager.clear_payload()
445 shell.payload_manager.clear_payload()
446
446
447 # Flush output before sending the reply.
447 # Flush output before sending the reply.
448 sys.stdout.flush()
448 sys.stdout.flush()
449 sys.stderr.flush()
449 sys.stderr.flush()
450 # FIXME: on rare occasions, the flush doesn't seem to make it to the
450 # FIXME: on rare occasions, the flush doesn't seem to make it to the
451 # clients... This seems to mitigate the problem, but we definitely need
451 # clients... This seems to mitigate the problem, but we definitely need
452 # to better understand what's going on.
452 # to better understand what's going on.
453 if self._execute_sleep:
453 if self._execute_sleep:
454 time.sleep(self._execute_sleep)
454 time.sleep(self._execute_sleep)
455
455
456 # Send the reply.
456 # Send the reply.
457 reply_content = json_clean(reply_content)
457 reply_content = json_clean(reply_content)
458
458
459 md['status'] = reply_content['status']
459 md['status'] = reply_content['status']
460 if reply_content['status'] == 'error' and \
460 if reply_content['status'] == 'error' and \
461 reply_content['ename'] == 'UnmetDependency':
461 reply_content['ename'] == 'UnmetDependency':
462 md['dependencies_met'] = False
462 md['dependencies_met'] = False
463
463
464 reply_msg = self.session.send(stream, u'execute_reply',
464 reply_msg = self.session.send(stream, u'execute_reply',
465 reply_content, parent, metadata=md,
465 reply_content, parent, metadata=md,
466 ident=ident)
466 ident=ident)
467
467
468 self.log.debug("%s", reply_msg)
468 self.log.debug("%s", reply_msg)
469
469
470 if not silent and reply_msg['content']['status'] == u'error':
470 if not silent and reply_msg['content']['status'] == u'error':
471 self._abort_queues()
471 self._abort_queues()
472
472
473 self._publish_status(u'idle', parent)
473 self._publish_status(u'idle', parent)
474
474
475 def complete_request(self, stream, ident, parent):
475 def complete_request(self, stream, ident, parent):
476 txt, matches = self._complete(parent)
476 txt, matches = self._complete(parent)
477 matches = {'matches' : matches,
477 matches = {'matches' : matches,
478 'matched_text' : txt,
478 'matched_text' : txt,
479 'status' : 'ok'}
479 'status' : 'ok'}
480 matches = json_clean(matches)
480 matches = json_clean(matches)
481 completion_msg = self.session.send(stream, 'complete_reply',
481 completion_msg = self.session.send(stream, 'complete_reply',
482 matches, parent, ident)
482 matches, parent, ident)
483 self.log.debug("%s", completion_msg)
483 self.log.debug("%s", completion_msg)
484
484
485 def object_info_request(self, stream, ident, parent):
485 def object_info_request(self, stream, ident, parent):
486 content = parent['content']
486 content = parent['content']
487 object_info = self.shell.object_inspect(content['oname'],
487 object_info = self.shell.object_inspect(content['oname'],
488 detail_level = content.get('detail_level', 0)
488 detail_level = content.get('detail_level', 0)
489 )
489 )
490 # Before we send this object over, we scrub it for JSON usage
490 # Before we send this object over, we scrub it for JSON usage
491 oinfo = json_clean(object_info)
491 oinfo = json_clean(object_info)
492 msg = self.session.send(stream, 'object_info_reply',
492 msg = self.session.send(stream, 'object_info_reply',
493 oinfo, parent, ident)
493 oinfo, parent, ident)
494 self.log.debug("%s", msg)
494 self.log.debug("%s", msg)
495
495
496 def history_request(self, stream, ident, parent):
496 def history_request(self, stream, ident, parent):
497 # We need to pull these out, as passing **kwargs doesn't work with
497 # We need to pull these out, as passing **kwargs doesn't work with
498 # unicode keys before Python 2.6.5.
498 # unicode keys before Python 2.6.5.
499 hist_access_type = parent['content']['hist_access_type']
499 hist_access_type = parent['content']['hist_access_type']
500 raw = parent['content']['raw']
500 raw = parent['content']['raw']
501 output = parent['content']['output']
501 output = parent['content']['output']
502 if hist_access_type == 'tail':
502 if hist_access_type == 'tail':
503 n = parent['content']['n']
503 n = parent['content']['n']
504 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
504 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
505 include_latest=True)
505 include_latest=True)
506
506
507 elif hist_access_type == 'range':
507 elif hist_access_type == 'range':
508 session = parent['content']['session']
508 session = parent['content']['session']
509 start = parent['content']['start']
509 start = parent['content']['start']
510 stop = parent['content']['stop']
510 stop = parent['content']['stop']
511 hist = self.shell.history_manager.get_range(session, start, stop,
511 hist = self.shell.history_manager.get_range(session, start, stop,
512 raw=raw, output=output)
512 raw=raw, output=output)
513
513
514 elif hist_access_type == 'search':
514 elif hist_access_type == 'search':
515 n = parent['content'].get('n')
515 n = parent['content'].get('n')
516 unique = parent['content'].get('unique', False)
516 unique = parent['content'].get('unique', False)
517 pattern = parent['content']['pattern']
517 pattern = parent['content']['pattern']
518 hist = self.shell.history_manager.search(
518 hist = self.shell.history_manager.search(
519 pattern, raw=raw, output=output, n=n, unique=unique)
519 pattern, raw=raw, output=output, n=n, unique=unique)
520
520
521 else:
521 else:
522 hist = []
522 hist = []
523 hist = list(hist)
523 hist = list(hist)
524 content = {'history' : hist}
524 content = {'history' : hist}
525 content = json_clean(content)
525 content = json_clean(content)
526 msg = self.session.send(stream, 'history_reply',
526 msg = self.session.send(stream, 'history_reply',
527 content, parent, ident)
527 content, parent, ident)
528 self.log.debug("Sending history reply with %i entries", len(hist))
528 self.log.debug("Sending history reply with %i entries", len(hist))
529
529
530 def connect_request(self, stream, ident, parent):
530 def connect_request(self, stream, ident, parent):
531 if self._recorded_ports is not None:
531 if self._recorded_ports is not None:
532 content = self._recorded_ports.copy()
532 content = self._recorded_ports.copy()
533 else:
533 else:
534 content = {}
534 content = {}
535 msg = self.session.send(stream, 'connect_reply',
535 msg = self.session.send(stream, 'connect_reply',
536 content, parent, ident)
536 content, parent, ident)
537 self.log.debug("%s", msg)
537 self.log.debug("%s", msg)
538
538
539 def kernel_info_request(self, stream, ident, parent):
539 def kernel_info_request(self, stream, ident, parent):
540 vinfo = {
540 vinfo = {
541 'protocol_version': protocol_version,
541 'protocol_version': protocol_version,
542 'ipython_version': ipython_version,
542 'ipython_version': ipython_version,
543 'language_version': language_version,
543 'language_version': language_version,
544 'language': 'python',
544 'language': 'python',
545 }
545 }
546 msg = self.session.send(stream, 'kernel_info_reply',
546 msg = self.session.send(stream, 'kernel_info_reply',
547 vinfo, parent, ident)
547 vinfo, parent, ident)
548 self.log.debug("%s", msg)
548 self.log.debug("%s", msg)
549
549
550 def shutdown_request(self, stream, ident, parent):
550 def shutdown_request(self, stream, ident, parent):
551 self.shell.exit_now = True
551 self.shell.exit_now = True
552 content = dict(status='ok')
552 content = dict(status='ok')
553 content.update(parent['content'])
553 content.update(parent['content'])
554 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
554 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
555 # same content, but different msg_id for broadcasting on IOPub
555 # same content, but different msg_id for broadcasting on IOPub
556 self._shutdown_message = self.session.msg(u'shutdown_reply',
556 self._shutdown_message = self.session.msg(u'shutdown_reply',
557 content, parent
557 content, parent
558 )
558 )
559
559
560 self._at_shutdown()
560 self._at_shutdown()
561 # call sys.exit after a short delay
561 # call sys.exit after a short delay
562 loop = ioloop.IOLoop.instance()
562 loop = ioloop.IOLoop.instance()
563 loop.add_timeout(time.time()+0.1, loop.stop)
563 loop.add_timeout(time.time()+0.1, loop.stop)
564
564
565 #---------------------------------------------------------------------------
565 #---------------------------------------------------------------------------
566 # Engine methods
566 # Engine methods
567 #---------------------------------------------------------------------------
567 #---------------------------------------------------------------------------
568
568
569 def apply_request(self, stream, ident, parent):
569 def apply_request(self, stream, ident, parent):
570 try:
570 try:
571 content = parent[u'content']
571 content = parent[u'content']
572 bufs = parent[u'buffers']
572 bufs = parent[u'buffers']
573 msg_id = parent['header']['msg_id']
573 msg_id = parent['header']['msg_id']
574 except:
574 except:
575 self.log.error("Got bad msg: %s", parent, exc_info=True)
575 self.log.error("Got bad msg: %s", parent, exc_info=True)
576 return
576 return
577
577
578 self._publish_status(u'busy', parent)
578 self._publish_status(u'busy', parent)
579
579
580 # Set the parent message of the display hook and out streams.
580 # Set the parent message of the display hook and out streams.
581 shell = self.shell
581 shell = self.shell
582 shell.displayhook.set_parent(parent)
582 shell.displayhook.set_parent(parent)
583 shell.display_pub.set_parent(parent)
583 shell.display_pub.set_parent(parent)
584 shell.data_pub.set_parent(parent)
584 shell.data_pub.set_parent(parent)
585 try:
585 try:
586 sys.stdout.set_parent(parent)
586 sys.stdout.set_parent(parent)
587 except AttributeError:
587 except AttributeError:
588 pass
588 pass
589 try:
589 try:
590 sys.stderr.set_parent(parent)
590 sys.stderr.set_parent(parent)
591 except AttributeError:
591 except AttributeError:
592 pass
592 pass
593
593
594 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
594 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
595 # self.iopub_socket.send(pyin_msg)
595 # self.iopub_socket.send(pyin_msg)
596 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
596 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
597 md = self._make_metadata(parent['metadata'])
597 md = self._make_metadata(parent['metadata'])
598 try:
598 try:
599 working = shell.user_ns
599 working = shell.user_ns
600
600
601 prefix = "_"+str(msg_id).replace("-","")+"_"
601 prefix = "_"+str(msg_id).replace("-","")+"_"
602
602
603 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
603 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
604
604
605 fname = getattr(f, '__name__', 'f')
605 fname = getattr(f, '__name__', 'f')
606
606
607 fname = prefix+"f"
607 fname = prefix+"f"
608 argname = prefix+"args"
608 argname = prefix+"args"
609 kwargname = prefix+"kwargs"
609 kwargname = prefix+"kwargs"
610 resultname = prefix+"result"
610 resultname = prefix+"result"
611
611
612 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
612 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
613 # print ns
613 # print ns
614 working.update(ns)
614 working.update(ns)
615 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
615 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
616 try:
616 try:
617 exec code in shell.user_global_ns, shell.user_ns
617 exec code in shell.user_global_ns, shell.user_ns
618 result = working.get(resultname)
618 result = working.get(resultname)
619 finally:
619 finally:
620 for key in ns.iterkeys():
620 for key in ns.iterkeys():
621 working.pop(key)
621 working.pop(key)
622
622
623 result_buf = serialize_object(result,
623 result_buf = serialize_object(result,
624 buffer_threshold=self.session.buffer_threshold,
624 buffer_threshold=self.session.buffer_threshold,
625 item_threshold=self.session.item_threshold,
625 item_threshold=self.session.item_threshold,
626 )
626 )
627
627
628 except:
628 except:
629 # invoke IPython traceback formatting
629 # invoke IPython traceback formatting
630 shell.showtraceback()
630 shell.showtraceback()
631 # FIXME - fish exception info out of shell, possibly left there by
631 # FIXME - fish exception info out of shell, possibly left there by
632 # run_code. We'll need to clean up this logic later.
632 # run_code. We'll need to clean up this logic later.
633 reply_content = {}
633 reply_content = {}
634 if shell._reply_content is not None:
634 if shell._reply_content is not None:
635 reply_content.update(shell._reply_content)
635 reply_content.update(shell._reply_content)
636 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
636 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
637 reply_content['engine_info'] = e_info
637 reply_content['engine_info'] = e_info
638 # reset after use
638 # reset after use
639 shell._reply_content = None
639 shell._reply_content = None
640
640
641 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
641 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
642 ident=self._topic('pyerr'))
642 ident=self._topic('pyerr'))
643 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
643 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
644 result_buf = []
644 result_buf = []
645
645
646 if reply_content['ename'] == 'UnmetDependency':
646 if reply_content['ename'] == 'UnmetDependency':
647 md['dependencies_met'] = False
647 md['dependencies_met'] = False
648 else:
648 else:
649 reply_content = {'status' : 'ok'}
649 reply_content = {'status' : 'ok'}
650
650
651 # put 'ok'/'error' status in header, for scheduler introspection:
651 # put 'ok'/'error' status in header, for scheduler introspection:
652 md['status'] = reply_content['status']
652 md['status'] = reply_content['status']
653
653
654 # flush i/o
654 # flush i/o
655 sys.stdout.flush()
655 sys.stdout.flush()
656 sys.stderr.flush()
656 sys.stderr.flush()
657
657
658 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
658 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
659 parent=parent, ident=ident,buffers=result_buf, metadata=md)
659 parent=parent, ident=ident,buffers=result_buf, metadata=md)
660
660
661 self._publish_status(u'idle', parent)
661 self._publish_status(u'idle', parent)
662
662
663 #---------------------------------------------------------------------------
663 #---------------------------------------------------------------------------
664 # Control messages
664 # Control messages
665 #---------------------------------------------------------------------------
665 #---------------------------------------------------------------------------
666
666
667 def abort_request(self, stream, ident, parent):
667 def abort_request(self, stream, ident, parent):
668 """abort a specifig msg by id"""
668 """abort a specifig msg by id"""
669 msg_ids = parent['content'].get('msg_ids', None)
669 msg_ids = parent['content'].get('msg_ids', None)
670 if isinstance(msg_ids, basestring):
670 if isinstance(msg_ids, basestring):
671 msg_ids = [msg_ids]
671 msg_ids = [msg_ids]
672 if not msg_ids:
672 if not msg_ids:
673 self.abort_queues()
673 self.abort_queues()
674 for mid in msg_ids:
674 for mid in msg_ids:
675 self.aborted.add(str(mid))
675 self.aborted.add(str(mid))
676
676
677 content = dict(status='ok')
677 content = dict(status='ok')
678 reply_msg = self.session.send(stream, 'abort_reply', content=content,
678 reply_msg = self.session.send(stream, 'abort_reply', content=content,
679 parent=parent, ident=ident)
679 parent=parent, ident=ident)
680 self.log.debug("%s", reply_msg)
680 self.log.debug("%s", reply_msg)
681
681
682 def clear_request(self, stream, idents, parent):
682 def clear_request(self, stream, idents, parent):
683 """Clear our namespace."""
683 """Clear our namespace."""
684 self.shell.reset(False)
684 self.shell.reset(False)
685 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
685 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
686 content = dict(status='ok'))
686 content = dict(status='ok'))
687
687
688
688
689 #---------------------------------------------------------------------------
689 #---------------------------------------------------------------------------
690 # Protected interface
690 # Protected interface
691 #---------------------------------------------------------------------------
691 #---------------------------------------------------------------------------
692
692
693 def _wrap_exception(self, method=None):
693 def _wrap_exception(self, method=None):
694 # import here, because _wrap_exception is only used in parallel,
694 # import here, because _wrap_exception is only used in parallel,
695 # and parallel has higher min pyzmq version
695 # and parallel has higher min pyzmq version
696 from IPython.parallel.error import wrap_exception
696 from IPython.parallel.error import wrap_exception
697 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
697 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
698 content = wrap_exception(e_info)
698 content = wrap_exception(e_info)
699 return content
699 return content
700
700
701 def _topic(self, topic):
701 def _topic(self, topic):
702 """prefixed topic for IOPub messages"""
702 """prefixed topic for IOPub messages"""
703 if self.int_id >= 0:
703 if self.int_id >= 0:
704 base = "engine.%i" % self.int_id
704 base = "engine.%i" % self.int_id
705 else:
705 else:
706 base = "kernel.%s" % self.ident
706 base = "kernel.%s" % self.ident
707
707
708 return py3compat.cast_bytes("%s.%s" % (base, topic))
708 return py3compat.cast_bytes("%s.%s" % (base, topic))
709
709
710 def _abort_queues(self):
710 def _abort_queues(self):
711 for stream in self.shell_streams:
711 for stream in self.shell_streams:
712 if stream:
712 if stream:
713 self._abort_queue(stream)
713 self._abort_queue(stream)
714
714
715 def _abort_queue(self, stream):
715 def _abort_queue(self, stream):
716 poller = zmq.Poller()
716 poller = zmq.Poller()
717 poller.register(stream.socket, zmq.POLLIN)
717 poller.register(stream.socket, zmq.POLLIN)
718 while True:
718 while True:
719 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
719 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
720 if msg is None:
720 if msg is None:
721 return
721 return
722
722
723 self.log.info("Aborting:")
723 self.log.info("Aborting:")
724 self.log.info("%s", msg)
724 self.log.info("%s", msg)
725 msg_type = msg['header']['msg_type']
725 msg_type = msg['header']['msg_type']
726 reply_type = msg_type.split('_')[0] + '_reply'
726 reply_type = msg_type.split('_')[0] + '_reply'
727
727
728 status = {'status' : 'aborted'}
728 status = {'status' : 'aborted'}
729 md = {'engine' : self.ident}
729 md = {'engine' : self.ident}
730 md.update(status)
730 md.update(status)
731 reply_msg = self.session.send(stream, reply_type, metadata=md,
731 reply_msg = self.session.send(stream, reply_type, metadata=md,
732 content=status, parent=msg, ident=idents)
732 content=status, parent=msg, ident=idents)
733 self.log.debug("%s", reply_msg)
733 self.log.debug("%s", reply_msg)
734 # We need to wait a bit for requests to come in. This can probably
734 # We need to wait a bit for requests to come in. This can probably
735 # be set shorter for true asynchronous clients.
735 # be set shorter for true asynchronous clients.
736 poller.poll(50)
736 poller.poll(50)
737
737
738
738
739 def _no_raw_input(self):
739 def _no_raw_input(self):
740 """Raise StdinNotImplentedError if active frontend doesn't support
740 """Raise StdinNotImplentedError if active frontend doesn't support
741 stdin."""
741 stdin."""
742 raise StdinNotImplementedError("raw_input was called, but this "
742 raise StdinNotImplementedError("raw_input was called, but this "
743 "frontend does not support stdin.")
743 "frontend does not support stdin.")
744
744
745 def _raw_input(self, prompt, ident, parent):
745 def _raw_input(self, prompt, ident, parent):
746 # Flush output before making the request.
746 # Flush output before making the request.
747 sys.stderr.flush()
747 sys.stderr.flush()
748 sys.stdout.flush()
748 sys.stdout.flush()
749 # flush the stdin socket, to purge stale replies
749 # flush the stdin socket, to purge stale replies
750 while True:
750 while True:
751 try:
751 try:
752 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
752 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
753 except zmq.ZMQError as e:
753 except zmq.ZMQError as e:
754 if e.errno == zmq.EAGAIN:
754 if e.errno == zmq.EAGAIN:
755 break
755 break
756 else:
756 else:
757 raise
757 raise
758
758
759 # Send the input request.
759 # Send the input request.
760 content = json_clean(dict(prompt=prompt))
760 content = json_clean(dict(prompt=prompt))
761 self.session.send(self.stdin_socket, u'input_request', content, parent,
761 self.session.send(self.stdin_socket, u'input_request', content, parent,
762 ident=ident)
762 ident=ident)
763
763
764 # Await a response.
764 # Await a response.
765 while True:
765 while True:
766 try:
766 try:
767 ident, reply = self.session.recv(self.stdin_socket, 0)
767 ident, reply = self.session.recv(self.stdin_socket, 0)
768 except Exception:
768 except Exception:
769 self.log.warn("Invalid Message:", exc_info=True)
769 self.log.warn("Invalid Message:", exc_info=True)
770 else:
770 else:
771 break
771 break
772 try:
772 try:
773 value = reply['content']['value']
773 value = py3compat.unicode_to_str(reply['content']['value'])
774 except:
774 except:
775 self.log.error("Got bad raw_input reply: ")
775 self.log.error("Got bad raw_input reply: ")
776 self.log.error("%s", parent)
776 self.log.error("%s", parent)
777 value = ''
777 value = ''
778 if value == '\x04':
778 if value == '\x04':
779 # EOF
779 # EOF
780 raise EOFError
780 raise EOFError
781 return value
781 return value
782
782
783 def _complete(self, msg):
783 def _complete(self, msg):
784 c = msg['content']
784 c = msg['content']
785 try:
785 try:
786 cpos = int(c['cursor_pos'])
786 cpos = int(c['cursor_pos'])
787 except:
787 except:
788 # If we don't get something that we can convert to an integer, at
788 # If we don't get something that we can convert to an integer, at
789 # least attempt the completion guessing the cursor is at the end of
789 # least attempt the completion guessing the cursor is at the end of
790 # the text, if there's any, and otherwise of the line
790 # the text, if there's any, and otherwise of the line
791 cpos = len(c['text'])
791 cpos = len(c['text'])
792 if cpos==0:
792 if cpos==0:
793 cpos = len(c['line'])
793 cpos = len(c['line'])
794 return self.shell.complete(c['text'], c['line'], cpos)
794 return self.shell.complete(c['text'], c['line'], cpos)
795
795
796 def _at_shutdown(self):
796 def _at_shutdown(self):
797 """Actions taken at shutdown by the kernel, called by python's atexit.
797 """Actions taken at shutdown by the kernel, called by python's atexit.
798 """
798 """
799 # io.rprint("Kernel at_shutdown") # dbg
799 # io.rprint("Kernel at_shutdown") # dbg
800 if self._shutdown_message is not None:
800 if self._shutdown_message is not None:
801 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
801 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
802 self.log.debug("%s", self._shutdown_message)
802 self.log.debug("%s", self._shutdown_message)
803 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
803 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
804
804
General Comments 0
You need to be logged in to leave comments. Login now