##// END OF EJS Templates
flush replies when entering an eventloop...
MinRK -
Show More
1 NO CONTENT: modified file
NO CONTENT: modified file
@@ -1,795 +1,799 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """An interactive kernel that talks to frontends over 0MQ."""
2 """An interactive kernel that talks to frontends over 0MQ."""
3
3
4 #-----------------------------------------------------------------------------
4 #-----------------------------------------------------------------------------
5 # Imports
5 # Imports
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 from __future__ import print_function
7 from __future__ import print_function
8
8
9 # Standard library imports
9 # Standard library imports
10 import sys
10 import sys
11 import time
11 import time
12 import traceback
12 import traceback
13 import logging
13 import logging
14 import uuid
14 import uuid
15
15
16 from datetime import datetime
16 from datetime import datetime
17 from signal import (
17 from signal import (
18 signal, default_int_handler, SIGINT
18 signal, default_int_handler, SIGINT
19 )
19 )
20
20
21 # System library imports
21 # System library imports
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24 from zmq.eventloop.zmqstream import ZMQStream
24 from zmq.eventloop.zmqstream import ZMQStream
25
25
26 # Local imports
26 # Local imports
27 from IPython.config.configurable import Configurable
27 from IPython.config.configurable import Configurable
28 from IPython.core.error import StdinNotImplementedError
28 from IPython.core.error import StdinNotImplementedError
29 from IPython.core import release
29 from IPython.core import release
30 from IPython.utils import py3compat
30 from IPython.utils import py3compat
31 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
31 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
32 from IPython.utils.jsonutil import json_clean
32 from IPython.utils.jsonutil import json_clean
33 from IPython.utils.traitlets import (
33 from IPython.utils.traitlets import (
34 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
34 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
35 Type, Bool,
35 Type, Bool,
36 )
36 )
37
37
38 from .serialize import serialize_object, unpack_apply_message
38 from .serialize import serialize_object, unpack_apply_message
39 from .session import Session
39 from .session import Session
40 from .zmqshell import ZMQInteractiveShell
40 from .zmqshell import ZMQInteractiveShell
41
41
42
42
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44 # Main kernel class
44 # Main kernel class
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46
46
47 protocol_version = list(release.kernel_protocol_version_info)
47 protocol_version = list(release.kernel_protocol_version_info)
48 ipython_version = list(release.version_info)
48 ipython_version = list(release.version_info)
49 language_version = list(sys.version_info[:3])
49 language_version = list(sys.version_info[:3])
50
50
51
51
52 class Kernel(Configurable):
52 class Kernel(Configurable):
53
53
54 #---------------------------------------------------------------------------
54 #---------------------------------------------------------------------------
55 # Kernel interface
55 # Kernel interface
56 #---------------------------------------------------------------------------
56 #---------------------------------------------------------------------------
57
57
58 # attribute to override with a GUI
58 # attribute to override with a GUI
59 eventloop = Any(None)
59 eventloop = Any(None)
60 def _eventloop_changed(self, name, old, new):
60 def _eventloop_changed(self, name, old, new):
61 """schedule call to eventloop from IOLoop"""
61 """schedule call to eventloop from IOLoop"""
62 loop = ioloop.IOLoop.instance()
62 loop = ioloop.IOLoop.instance()
63 loop.add_timeout(time.time()+0.1, self.enter_eventloop)
63 loop.add_callback(self.enter_eventloop)
64
64
65 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
65 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
66 shell_class = Type(ZMQInteractiveShell)
66 shell_class = Type(ZMQInteractiveShell)
67
67
68 session = Instance(Session)
68 session = Instance(Session)
69 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
69 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
70 shell_streams = List()
70 shell_streams = List()
71 control_stream = Instance(ZMQStream)
71 control_stream = Instance(ZMQStream)
72 iopub_socket = Instance(zmq.Socket)
72 iopub_socket = Instance(zmq.Socket)
73 stdin_socket = Instance(zmq.Socket)
73 stdin_socket = Instance(zmq.Socket)
74 log = Instance(logging.Logger)
74 log = Instance(logging.Logger)
75
75
76 user_module = Any()
76 user_module = Any()
77 def _user_module_changed(self, name, old, new):
77 def _user_module_changed(self, name, old, new):
78 if self.shell is not None:
78 if self.shell is not None:
79 self.shell.user_module = new
79 self.shell.user_module = new
80
80
81 user_ns = Instance(dict, args=None, allow_none=True)
81 user_ns = Instance(dict, args=None, allow_none=True)
82 def _user_ns_changed(self, name, old, new):
82 def _user_ns_changed(self, name, old, new):
83 if self.shell is not None:
83 if self.shell is not None:
84 self.shell.user_ns = new
84 self.shell.user_ns = new
85 self.shell.init_user_ns()
85 self.shell.init_user_ns()
86
86
87 # identities:
87 # identities:
88 int_id = Integer(-1)
88 int_id = Integer(-1)
89 ident = Unicode()
89 ident = Unicode()
90
90
91 def _ident_default(self):
91 def _ident_default(self):
92 return unicode_type(uuid.uuid4())
92 return unicode_type(uuid.uuid4())
93
93
94 # Private interface
94 # Private interface
95
95
96 _darwin_app_nap = Bool(True, config=True,
96 _darwin_app_nap = Bool(True, config=True,
97 help="""Whether to use appnope for compatiblity with OS X App Nap.
97 help="""Whether to use appnope for compatiblity with OS X App Nap.
98
98
99 Only affects OS X >= 10.9.
99 Only affects OS X >= 10.9.
100 """
100 """
101 )
101 )
102
102
103 # Time to sleep after flushing the stdout/err buffers in each execute
103 # Time to sleep after flushing the stdout/err buffers in each execute
104 # cycle. While this introduces a hard limit on the minimal latency of the
104 # cycle. While this introduces a hard limit on the minimal latency of the
105 # execute cycle, it helps prevent output synchronization problems for
105 # execute cycle, it helps prevent output synchronization problems for
106 # clients.
106 # clients.
107 # Units are in seconds. The minimum zmq latency on local host is probably
107 # Units are in seconds. The minimum zmq latency on local host is probably
108 # ~150 microseconds, set this to 500us for now. We may need to increase it
108 # ~150 microseconds, set this to 500us for now. We may need to increase it
109 # a little if it's not enough after more interactive testing.
109 # a little if it's not enough after more interactive testing.
110 _execute_sleep = Float(0.0005, config=True)
110 _execute_sleep = Float(0.0005, config=True)
111
111
112 # Frequency of the kernel's event loop.
112 # Frequency of the kernel's event loop.
113 # Units are in seconds, kernel subclasses for GUI toolkits may need to
113 # Units are in seconds, kernel subclasses for GUI toolkits may need to
114 # adapt to milliseconds.
114 # adapt to milliseconds.
115 _poll_interval = Float(0.05, config=True)
115 _poll_interval = Float(0.05, config=True)
116
116
117 # If the shutdown was requested over the network, we leave here the
117 # If the shutdown was requested over the network, we leave here the
118 # necessary reply message so it can be sent by our registered atexit
118 # necessary reply message so it can be sent by our registered atexit
119 # handler. This ensures that the reply is only sent to clients truly at
119 # handler. This ensures that the reply is only sent to clients truly at
120 # the end of our shutdown process (which happens after the underlying
120 # the end of our shutdown process (which happens after the underlying
121 # IPython shell's own shutdown).
121 # IPython shell's own shutdown).
122 _shutdown_message = None
122 _shutdown_message = None
123
123
124 # This is a dict of port number that the kernel is listening on. It is set
124 # This is a dict of port number that the kernel is listening on. It is set
125 # by record_ports and used by connect_request.
125 # by record_ports and used by connect_request.
126 _recorded_ports = Dict()
126 _recorded_ports = Dict()
127
127
128 # A reference to the Python builtin 'raw_input' function.
128 # A reference to the Python builtin 'raw_input' function.
129 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
129 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
130 _sys_raw_input = Any()
130 _sys_raw_input = Any()
131 _sys_eval_input = Any()
131 _sys_eval_input = Any()
132
132
133 # set of aborted msg_ids
133 # set of aborted msg_ids
134 aborted = Set()
134 aborted = Set()
135
135
136
136
137 def __init__(self, **kwargs):
137 def __init__(self, **kwargs):
138 super(Kernel, self).__init__(**kwargs)
138 super(Kernel, self).__init__(**kwargs)
139
139
140 # Initialize the InteractiveShell subclass
140 # Initialize the InteractiveShell subclass
141 self.shell = self.shell_class.instance(parent=self,
141 self.shell = self.shell_class.instance(parent=self,
142 profile_dir = self.profile_dir,
142 profile_dir = self.profile_dir,
143 user_module = self.user_module,
143 user_module = self.user_module,
144 user_ns = self.user_ns,
144 user_ns = self.user_ns,
145 kernel = self,
145 kernel = self,
146 )
146 )
147 self.shell.displayhook.session = self.session
147 self.shell.displayhook.session = self.session
148 self.shell.displayhook.pub_socket = self.iopub_socket
148 self.shell.displayhook.pub_socket = self.iopub_socket
149 self.shell.displayhook.topic = self._topic('pyout')
149 self.shell.displayhook.topic = self._topic('pyout')
150 self.shell.display_pub.session = self.session
150 self.shell.display_pub.session = self.session
151 self.shell.display_pub.pub_socket = self.iopub_socket
151 self.shell.display_pub.pub_socket = self.iopub_socket
152 self.shell.data_pub.session = self.session
152 self.shell.data_pub.session = self.session
153 self.shell.data_pub.pub_socket = self.iopub_socket
153 self.shell.data_pub.pub_socket = self.iopub_socket
154
154
155 # TMP - hack while developing
155 # TMP - hack while developing
156 self.shell._reply_content = None
156 self.shell._reply_content = None
157
157
158 # Build dict of handlers for message types
158 # Build dict of handlers for message types
159 msg_types = [ 'execute_request', 'complete_request',
159 msg_types = [ 'execute_request', 'complete_request',
160 'object_info_request', 'history_request',
160 'object_info_request', 'history_request',
161 'kernel_info_request',
161 'kernel_info_request',
162 'connect_request', 'shutdown_request',
162 'connect_request', 'shutdown_request',
163 'apply_request',
163 'apply_request',
164 ]
164 ]
165 self.shell_handlers = {}
165 self.shell_handlers = {}
166 for msg_type in msg_types:
166 for msg_type in msg_types:
167 self.shell_handlers[msg_type] = getattr(self, msg_type)
167 self.shell_handlers[msg_type] = getattr(self, msg_type)
168
168
169 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
169 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
170 comm_manager = self.shell.comm_manager
170 comm_manager = self.shell.comm_manager
171 for msg_type in comm_msg_types:
171 for msg_type in comm_msg_types:
172 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
172 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
173
173
174 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
174 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
175 self.control_handlers = {}
175 self.control_handlers = {}
176 for msg_type in control_msg_types:
176 for msg_type in control_msg_types:
177 self.control_handlers[msg_type] = getattr(self, msg_type)
177 self.control_handlers[msg_type] = getattr(self, msg_type)
178
178
179
179
180 def dispatch_control(self, msg):
180 def dispatch_control(self, msg):
181 """dispatch control requests"""
181 """dispatch control requests"""
182 idents,msg = self.session.feed_identities(msg, copy=False)
182 idents,msg = self.session.feed_identities(msg, copy=False)
183 try:
183 try:
184 msg = self.session.unserialize(msg, content=True, copy=False)
184 msg = self.session.unserialize(msg, content=True, copy=False)
185 except:
185 except:
186 self.log.error("Invalid Control Message", exc_info=True)
186 self.log.error("Invalid Control Message", exc_info=True)
187 return
187 return
188
188
189 self.log.debug("Control received: %s", msg)
189 self.log.debug("Control received: %s", msg)
190
190
191 header = msg['header']
191 header = msg['header']
192 msg_id = header['msg_id']
192 msg_id = header['msg_id']
193 msg_type = header['msg_type']
193 msg_type = header['msg_type']
194
194
195 handler = self.control_handlers.get(msg_type, None)
195 handler = self.control_handlers.get(msg_type, None)
196 if handler is None:
196 if handler is None:
197 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
197 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
198 else:
198 else:
199 try:
199 try:
200 handler(self.control_stream, idents, msg)
200 handler(self.control_stream, idents, msg)
201 except Exception:
201 except Exception:
202 self.log.error("Exception in control handler:", exc_info=True)
202 self.log.error("Exception in control handler:", exc_info=True)
203
203
204 def dispatch_shell(self, stream, msg):
204 def dispatch_shell(self, stream, msg):
205 """dispatch shell requests"""
205 """dispatch shell requests"""
206 # flush control requests first
206 # flush control requests first
207 if self.control_stream:
207 if self.control_stream:
208 self.control_stream.flush()
208 self.control_stream.flush()
209
209
210 idents,msg = self.session.feed_identities(msg, copy=False)
210 idents,msg = self.session.feed_identities(msg, copy=False)
211 try:
211 try:
212 msg = self.session.unserialize(msg, content=True, copy=False)
212 msg = self.session.unserialize(msg, content=True, copy=False)
213 except:
213 except:
214 self.log.error("Invalid Message", exc_info=True)
214 self.log.error("Invalid Message", exc_info=True)
215 return
215 return
216
216
217 header = msg['header']
217 header = msg['header']
218 msg_id = header['msg_id']
218 msg_id = header['msg_id']
219 msg_type = msg['header']['msg_type']
219 msg_type = msg['header']['msg_type']
220
220
221 # Print some info about this message and leave a '--->' marker, so it's
221 # Print some info about this message and leave a '--->' marker, so it's
222 # easier to trace visually the message chain when debugging. Each
222 # easier to trace visually the message chain when debugging. Each
223 # handler prints its message at the end.
223 # handler prints its message at the end.
224 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
224 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
225 self.log.debug(' Content: %s\n --->\n ', msg['content'])
225 self.log.debug(' Content: %s\n --->\n ', msg['content'])
226
226
227 if msg_id in self.aborted:
227 if msg_id in self.aborted:
228 self.aborted.remove(msg_id)
228 self.aborted.remove(msg_id)
229 # is it safe to assume a msg_id will not be resubmitted?
229 # is it safe to assume a msg_id will not be resubmitted?
230 reply_type = msg_type.split('_')[0] + '_reply'
230 reply_type = msg_type.split('_')[0] + '_reply'
231 status = {'status' : 'aborted'}
231 status = {'status' : 'aborted'}
232 md = {'engine' : self.ident}
232 md = {'engine' : self.ident}
233 md.update(status)
233 md.update(status)
234 reply_msg = self.session.send(stream, reply_type, metadata=md,
234 reply_msg = self.session.send(stream, reply_type, metadata=md,
235 content=status, parent=msg, ident=idents)
235 content=status, parent=msg, ident=idents)
236 return
236 return
237
237
238 handler = self.shell_handlers.get(msg_type, None)
238 handler = self.shell_handlers.get(msg_type, None)
239 if handler is None:
239 if handler is None:
240 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
240 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
241 else:
241 else:
242 # ensure default_int_handler during handler call
242 # ensure default_int_handler during handler call
243 sig = signal(SIGINT, default_int_handler)
243 sig = signal(SIGINT, default_int_handler)
244 try:
244 try:
245 handler(stream, idents, msg)
245 handler(stream, idents, msg)
246 except Exception:
246 except Exception:
247 self.log.error("Exception in message handler:", exc_info=True)
247 self.log.error("Exception in message handler:", exc_info=True)
248 finally:
248 finally:
249 signal(SIGINT, sig)
249 signal(SIGINT, sig)
250
250
251 def enter_eventloop(self):
251 def enter_eventloop(self):
252 """enter eventloop"""
252 """enter eventloop"""
253 self.log.info("entering eventloop")
253 self.log.info("entering eventloop %s", self.eventloop)
254 for stream in self.shell_streams:
255 # flush any pending replies,
256 # which may be skipped by entering the eventloop
257 stream.flush(zmq.POLLOUT)
254 # restore default_int_handler
258 # restore default_int_handler
255 signal(SIGINT, default_int_handler)
259 signal(SIGINT, default_int_handler)
256 while self.eventloop is not None:
260 while self.eventloop is not None:
257 try:
261 try:
258 self.eventloop(self)
262 self.eventloop(self)
259 except KeyboardInterrupt:
263 except KeyboardInterrupt:
260 # Ctrl-C shouldn't crash the kernel
264 # Ctrl-C shouldn't crash the kernel
261 self.log.error("KeyboardInterrupt caught in kernel")
265 self.log.error("KeyboardInterrupt caught in kernel")
262 continue
266 continue
263 else:
267 else:
264 # eventloop exited cleanly, this means we should stop (right?)
268 # eventloop exited cleanly, this means we should stop (right?)
265 self.eventloop = None
269 self.eventloop = None
266 break
270 break
267 self.log.info("exiting eventloop")
271 self.log.info("exiting eventloop")
268
272
269 def start(self):
273 def start(self):
270 """register dispatchers for streams"""
274 """register dispatchers for streams"""
271 self.shell.exit_now = False
275 self.shell.exit_now = False
272 if self.control_stream:
276 if self.control_stream:
273 self.control_stream.on_recv(self.dispatch_control, copy=False)
277 self.control_stream.on_recv(self.dispatch_control, copy=False)
274
278
275 def make_dispatcher(stream):
279 def make_dispatcher(stream):
276 def dispatcher(msg):
280 def dispatcher(msg):
277 return self.dispatch_shell(stream, msg)
281 return self.dispatch_shell(stream, msg)
278 return dispatcher
282 return dispatcher
279
283
280 for s in self.shell_streams:
284 for s in self.shell_streams:
281 s.on_recv(make_dispatcher(s), copy=False)
285 s.on_recv(make_dispatcher(s), copy=False)
282
286
283 # publish idle status
287 # publish idle status
284 self._publish_status('starting')
288 self._publish_status('starting')
285
289
286 def do_one_iteration(self):
290 def do_one_iteration(self):
287 """step eventloop just once"""
291 """step eventloop just once"""
288 if self.control_stream:
292 if self.control_stream:
289 self.control_stream.flush()
293 self.control_stream.flush()
290 for stream in self.shell_streams:
294 for stream in self.shell_streams:
291 # handle at most one request per iteration
295 # handle at most one request per iteration
292 stream.flush(zmq.POLLIN, 1)
296 stream.flush(zmq.POLLIN, 1)
293 stream.flush(zmq.POLLOUT)
297 stream.flush(zmq.POLLOUT)
294
298
295
299
296 def record_ports(self, ports):
300 def record_ports(self, ports):
297 """Record the ports that this kernel is using.
301 """Record the ports that this kernel is using.
298
302
299 The creator of the Kernel instance must call this methods if they
303 The creator of the Kernel instance must call this methods if they
300 want the :meth:`connect_request` method to return the port numbers.
304 want the :meth:`connect_request` method to return the port numbers.
301 """
305 """
302 self._recorded_ports = ports
306 self._recorded_ports = ports
303
307
304 #---------------------------------------------------------------------------
308 #---------------------------------------------------------------------------
305 # Kernel request handlers
309 # Kernel request handlers
306 #---------------------------------------------------------------------------
310 #---------------------------------------------------------------------------
307
311
308 def _make_metadata(self, other=None):
312 def _make_metadata(self, other=None):
309 """init metadata dict, for execute/apply_reply"""
313 """init metadata dict, for execute/apply_reply"""
310 new_md = {
314 new_md = {
311 'dependencies_met' : True,
315 'dependencies_met' : True,
312 'engine' : self.ident,
316 'engine' : self.ident,
313 'started': datetime.now(),
317 'started': datetime.now(),
314 }
318 }
315 if other:
319 if other:
316 new_md.update(other)
320 new_md.update(other)
317 return new_md
321 return new_md
318
322
319 def _publish_pyin(self, code, parent, execution_count):
323 def _publish_pyin(self, code, parent, execution_count):
320 """Publish the code request on the pyin stream."""
324 """Publish the code request on the pyin stream."""
321
325
322 self.session.send(self.iopub_socket, u'pyin',
326 self.session.send(self.iopub_socket, u'pyin',
323 {u'code':code, u'execution_count': execution_count},
327 {u'code':code, u'execution_count': execution_count},
324 parent=parent, ident=self._topic('pyin')
328 parent=parent, ident=self._topic('pyin')
325 )
329 )
326
330
327 def _publish_status(self, status, parent=None):
331 def _publish_status(self, status, parent=None):
328 """send status (busy/idle) on IOPub"""
332 """send status (busy/idle) on IOPub"""
329 self.session.send(self.iopub_socket,
333 self.session.send(self.iopub_socket,
330 u'status',
334 u'status',
331 {u'execution_state': status},
335 {u'execution_state': status},
332 parent=parent,
336 parent=parent,
333 ident=self._topic('status'),
337 ident=self._topic('status'),
334 )
338 )
335
339
336
340
337 def execute_request(self, stream, ident, parent):
341 def execute_request(self, stream, ident, parent):
338 """handle an execute_request"""
342 """handle an execute_request"""
339
343
340 self._publish_status(u'busy', parent)
344 self._publish_status(u'busy', parent)
341
345
342 try:
346 try:
343 content = parent[u'content']
347 content = parent[u'content']
344 code = py3compat.cast_unicode_py2(content[u'code'])
348 code = py3compat.cast_unicode_py2(content[u'code'])
345 silent = content[u'silent']
349 silent = content[u'silent']
346 store_history = content.get(u'store_history', not silent)
350 store_history = content.get(u'store_history', not silent)
347 except:
351 except:
348 self.log.error("Got bad msg: ")
352 self.log.error("Got bad msg: ")
349 self.log.error("%s", parent)
353 self.log.error("%s", parent)
350 return
354 return
351
355
352 md = self._make_metadata(parent['metadata'])
356 md = self._make_metadata(parent['metadata'])
353
357
354 shell = self.shell # we'll need this a lot here
358 shell = self.shell # we'll need this a lot here
355
359
356 # Replace raw_input. Note that is not sufficient to replace
360 # Replace raw_input. Note that is not sufficient to replace
357 # raw_input in the user namespace.
361 # raw_input in the user namespace.
358 if content.get('allow_stdin', False):
362 if content.get('allow_stdin', False):
359 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
363 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
360 input = lambda prompt='': eval(raw_input(prompt))
364 input = lambda prompt='': eval(raw_input(prompt))
361 else:
365 else:
362 raw_input = input = lambda prompt='' : self._no_raw_input()
366 raw_input = input = lambda prompt='' : self._no_raw_input()
363
367
364 if py3compat.PY3:
368 if py3compat.PY3:
365 self._sys_raw_input = builtin_mod.input
369 self._sys_raw_input = builtin_mod.input
366 builtin_mod.input = raw_input
370 builtin_mod.input = raw_input
367 else:
371 else:
368 self._sys_raw_input = builtin_mod.raw_input
372 self._sys_raw_input = builtin_mod.raw_input
369 self._sys_eval_input = builtin_mod.input
373 self._sys_eval_input = builtin_mod.input
370 builtin_mod.raw_input = raw_input
374 builtin_mod.raw_input = raw_input
371 builtin_mod.input = input
375 builtin_mod.input = input
372
376
373 # Set the parent message of the display hook and out streams.
377 # Set the parent message of the display hook and out streams.
374 shell.set_parent(parent)
378 shell.set_parent(parent)
375
379
376 # Re-broadcast our input for the benefit of listening clients, and
380 # Re-broadcast our input for the benefit of listening clients, and
377 # start computing output
381 # start computing output
378 if not silent:
382 if not silent:
379 self._publish_pyin(code, parent, shell.execution_count)
383 self._publish_pyin(code, parent, shell.execution_count)
380
384
381 reply_content = {}
385 reply_content = {}
382 try:
386 try:
383 # FIXME: the shell calls the exception handler itself.
387 # FIXME: the shell calls the exception handler itself.
384 shell.run_cell(code, store_history=store_history, silent=silent)
388 shell.run_cell(code, store_history=store_history, silent=silent)
385 except:
389 except:
386 status = u'error'
390 status = u'error'
387 # FIXME: this code right now isn't being used yet by default,
391 # FIXME: this code right now isn't being used yet by default,
388 # because the run_cell() call above directly fires off exception
392 # because the run_cell() call above directly fires off exception
389 # reporting. This code, therefore, is only active in the scenario
393 # reporting. This code, therefore, is only active in the scenario
390 # where runlines itself has an unhandled exception. We need to
394 # where runlines itself has an unhandled exception. We need to
391 # uniformize this, for all exception construction to come from a
395 # uniformize this, for all exception construction to come from a
392 # single location in the codbase.
396 # single location in the codbase.
393 etype, evalue, tb = sys.exc_info()
397 etype, evalue, tb = sys.exc_info()
394 tb_list = traceback.format_exception(etype, evalue, tb)
398 tb_list = traceback.format_exception(etype, evalue, tb)
395 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
399 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
396 else:
400 else:
397 status = u'ok'
401 status = u'ok'
398 finally:
402 finally:
399 # Restore raw_input.
403 # Restore raw_input.
400 if py3compat.PY3:
404 if py3compat.PY3:
401 builtin_mod.input = self._sys_raw_input
405 builtin_mod.input = self._sys_raw_input
402 else:
406 else:
403 builtin_mod.raw_input = self._sys_raw_input
407 builtin_mod.raw_input = self._sys_raw_input
404 builtin_mod.input = self._sys_eval_input
408 builtin_mod.input = self._sys_eval_input
405
409
406 reply_content[u'status'] = status
410 reply_content[u'status'] = status
407
411
408 # Return the execution counter so clients can display prompts
412 # Return the execution counter so clients can display prompts
409 reply_content['execution_count'] = shell.execution_count - 1
413 reply_content['execution_count'] = shell.execution_count - 1
410
414
411 # FIXME - fish exception info out of shell, possibly left there by
415 # FIXME - fish exception info out of shell, possibly left there by
412 # runlines. We'll need to clean up this logic later.
416 # runlines. We'll need to clean up this logic later.
413 if shell._reply_content is not None:
417 if shell._reply_content is not None:
414 reply_content.update(shell._reply_content)
418 reply_content.update(shell._reply_content)
415 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
419 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
416 reply_content['engine_info'] = e_info
420 reply_content['engine_info'] = e_info
417 # reset after use
421 # reset after use
418 shell._reply_content = None
422 shell._reply_content = None
419
423
420 if 'traceback' in reply_content:
424 if 'traceback' in reply_content:
421 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
425 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
422
426
423
427
424 # At this point, we can tell whether the main code execution succeeded
428 # At this point, we can tell whether the main code execution succeeded
425 # or not. If it did, we proceed to evaluate user_variables/expressions
429 # or not. If it did, we proceed to evaluate user_variables/expressions
426 if reply_content['status'] == 'ok':
430 if reply_content['status'] == 'ok':
427 reply_content[u'user_variables'] = \
431 reply_content[u'user_variables'] = \
428 shell.user_variables(content.get(u'user_variables', []))
432 shell.user_variables(content.get(u'user_variables', []))
429 reply_content[u'user_expressions'] = \
433 reply_content[u'user_expressions'] = \
430 shell.user_expressions(content.get(u'user_expressions', {}))
434 shell.user_expressions(content.get(u'user_expressions', {}))
431 else:
435 else:
432 # If there was an error, don't even try to compute variables or
436 # If there was an error, don't even try to compute variables or
433 # expressions
437 # expressions
434 reply_content[u'user_variables'] = {}
438 reply_content[u'user_variables'] = {}
435 reply_content[u'user_expressions'] = {}
439 reply_content[u'user_expressions'] = {}
436
440
437 # Payloads should be retrieved regardless of outcome, so we can both
441 # Payloads should be retrieved regardless of outcome, so we can both
438 # recover partial output (that could have been generated early in a
442 # recover partial output (that could have been generated early in a
439 # block, before an error) and clear the payload system always.
443 # block, before an error) and clear the payload system always.
440 reply_content[u'payload'] = shell.payload_manager.read_payload()
444 reply_content[u'payload'] = shell.payload_manager.read_payload()
441 # Be agressive about clearing the payload because we don't want
445 # Be agressive about clearing the payload because we don't want
442 # it to sit in memory until the next execute_request comes in.
446 # it to sit in memory until the next execute_request comes in.
443 shell.payload_manager.clear_payload()
447 shell.payload_manager.clear_payload()
444
448
445 # Flush output before sending the reply.
449 # Flush output before sending the reply.
446 sys.stdout.flush()
450 sys.stdout.flush()
447 sys.stderr.flush()
451 sys.stderr.flush()
448 # FIXME: on rare occasions, the flush doesn't seem to make it to the
452 # FIXME: on rare occasions, the flush doesn't seem to make it to the
449 # clients... This seems to mitigate the problem, but we definitely need
453 # clients... This seems to mitigate the problem, but we definitely need
450 # to better understand what's going on.
454 # to better understand what's going on.
451 if self._execute_sleep:
455 if self._execute_sleep:
452 time.sleep(self._execute_sleep)
456 time.sleep(self._execute_sleep)
453
457
454 # Send the reply.
458 # Send the reply.
455 reply_content = json_clean(reply_content)
459 reply_content = json_clean(reply_content)
456
460
457 md['status'] = reply_content['status']
461 md['status'] = reply_content['status']
458 if reply_content['status'] == 'error' and \
462 if reply_content['status'] == 'error' and \
459 reply_content['ename'] == 'UnmetDependency':
463 reply_content['ename'] == 'UnmetDependency':
460 md['dependencies_met'] = False
464 md['dependencies_met'] = False
461
465
462 reply_msg = self.session.send(stream, u'execute_reply',
466 reply_msg = self.session.send(stream, u'execute_reply',
463 reply_content, parent, metadata=md,
467 reply_content, parent, metadata=md,
464 ident=ident)
468 ident=ident)
465
469
466 self.log.debug("%s", reply_msg)
470 self.log.debug("%s", reply_msg)
467
471
468 if not silent and reply_msg['content']['status'] == u'error':
472 if not silent and reply_msg['content']['status'] == u'error':
469 self._abort_queues()
473 self._abort_queues()
470
474
471 self._publish_status(u'idle', parent)
475 self._publish_status(u'idle', parent)
472
476
473 def complete_request(self, stream, ident, parent):
477 def complete_request(self, stream, ident, parent):
474 txt, matches = self._complete(parent)
478 txt, matches = self._complete(parent)
475 matches = {'matches' : matches,
479 matches = {'matches' : matches,
476 'matched_text' : txt,
480 'matched_text' : txt,
477 'status' : 'ok'}
481 'status' : 'ok'}
478 matches = json_clean(matches)
482 matches = json_clean(matches)
479 completion_msg = self.session.send(stream, 'complete_reply',
483 completion_msg = self.session.send(stream, 'complete_reply',
480 matches, parent, ident)
484 matches, parent, ident)
481 self.log.debug("%s", completion_msg)
485 self.log.debug("%s", completion_msg)
482
486
483 def object_info_request(self, stream, ident, parent):
487 def object_info_request(self, stream, ident, parent):
484 content = parent['content']
488 content = parent['content']
485 object_info = self.shell.object_inspect(content['oname'],
489 object_info = self.shell.object_inspect(content['oname'],
486 detail_level = content.get('detail_level', 0)
490 detail_level = content.get('detail_level', 0)
487 )
491 )
488 # Before we send this object over, we scrub it for JSON usage
492 # Before we send this object over, we scrub it for JSON usage
489 oinfo = json_clean(object_info)
493 oinfo = json_clean(object_info)
490 msg = self.session.send(stream, 'object_info_reply',
494 msg = self.session.send(stream, 'object_info_reply',
491 oinfo, parent, ident)
495 oinfo, parent, ident)
492 self.log.debug("%s", msg)
496 self.log.debug("%s", msg)
493
497
494 def history_request(self, stream, ident, parent):
498 def history_request(self, stream, ident, parent):
495 # We need to pull these out, as passing **kwargs doesn't work with
499 # We need to pull these out, as passing **kwargs doesn't work with
496 # unicode keys before Python 2.6.5.
500 # unicode keys before Python 2.6.5.
497 hist_access_type = parent['content']['hist_access_type']
501 hist_access_type = parent['content']['hist_access_type']
498 raw = parent['content']['raw']
502 raw = parent['content']['raw']
499 output = parent['content']['output']
503 output = parent['content']['output']
500 if hist_access_type == 'tail':
504 if hist_access_type == 'tail':
501 n = parent['content']['n']
505 n = parent['content']['n']
502 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
506 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
503 include_latest=True)
507 include_latest=True)
504
508
505 elif hist_access_type == 'range':
509 elif hist_access_type == 'range':
506 session = parent['content']['session']
510 session = parent['content']['session']
507 start = parent['content']['start']
511 start = parent['content']['start']
508 stop = parent['content']['stop']
512 stop = parent['content']['stop']
509 hist = self.shell.history_manager.get_range(session, start, stop,
513 hist = self.shell.history_manager.get_range(session, start, stop,
510 raw=raw, output=output)
514 raw=raw, output=output)
511
515
512 elif hist_access_type == 'search':
516 elif hist_access_type == 'search':
513 n = parent['content'].get('n')
517 n = parent['content'].get('n')
514 unique = parent['content'].get('unique', False)
518 unique = parent['content'].get('unique', False)
515 pattern = parent['content']['pattern']
519 pattern = parent['content']['pattern']
516 hist = self.shell.history_manager.search(
520 hist = self.shell.history_manager.search(
517 pattern, raw=raw, output=output, n=n, unique=unique)
521 pattern, raw=raw, output=output, n=n, unique=unique)
518
522
519 else:
523 else:
520 hist = []
524 hist = []
521 hist = list(hist)
525 hist = list(hist)
522 content = {'history' : hist}
526 content = {'history' : hist}
523 content = json_clean(content)
527 content = json_clean(content)
524 msg = self.session.send(stream, 'history_reply',
528 msg = self.session.send(stream, 'history_reply',
525 content, parent, ident)
529 content, parent, ident)
526 self.log.debug("Sending history reply with %i entries", len(hist))
530 self.log.debug("Sending history reply with %i entries", len(hist))
527
531
528 def connect_request(self, stream, ident, parent):
532 def connect_request(self, stream, ident, parent):
529 if self._recorded_ports is not None:
533 if self._recorded_ports is not None:
530 content = self._recorded_ports.copy()
534 content = self._recorded_ports.copy()
531 else:
535 else:
532 content = {}
536 content = {}
533 msg = self.session.send(stream, 'connect_reply',
537 msg = self.session.send(stream, 'connect_reply',
534 content, parent, ident)
538 content, parent, ident)
535 self.log.debug("%s", msg)
539 self.log.debug("%s", msg)
536
540
537 def kernel_info_request(self, stream, ident, parent):
541 def kernel_info_request(self, stream, ident, parent):
538 vinfo = {
542 vinfo = {
539 'protocol_version': protocol_version,
543 'protocol_version': protocol_version,
540 'ipython_version': ipython_version,
544 'ipython_version': ipython_version,
541 'language_version': language_version,
545 'language_version': language_version,
542 'language': 'python',
546 'language': 'python',
543 }
547 }
544 msg = self.session.send(stream, 'kernel_info_reply',
548 msg = self.session.send(stream, 'kernel_info_reply',
545 vinfo, parent, ident)
549 vinfo, parent, ident)
546 self.log.debug("%s", msg)
550 self.log.debug("%s", msg)
547
551
548 def shutdown_request(self, stream, ident, parent):
552 def shutdown_request(self, stream, ident, parent):
549 self.shell.exit_now = True
553 self.shell.exit_now = True
550 content = dict(status='ok')
554 content = dict(status='ok')
551 content.update(parent['content'])
555 content.update(parent['content'])
552 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
556 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
553 # same content, but different msg_id for broadcasting on IOPub
557 # same content, but different msg_id for broadcasting on IOPub
554 self._shutdown_message = self.session.msg(u'shutdown_reply',
558 self._shutdown_message = self.session.msg(u'shutdown_reply',
555 content, parent
559 content, parent
556 )
560 )
557
561
558 self._at_shutdown()
562 self._at_shutdown()
559 # call sys.exit after a short delay
563 # call sys.exit after a short delay
560 loop = ioloop.IOLoop.instance()
564 loop = ioloop.IOLoop.instance()
561 loop.add_timeout(time.time()+0.1, loop.stop)
565 loop.add_timeout(time.time()+0.1, loop.stop)
562
566
563 #---------------------------------------------------------------------------
567 #---------------------------------------------------------------------------
564 # Engine methods
568 # Engine methods
565 #---------------------------------------------------------------------------
569 #---------------------------------------------------------------------------
566
570
567 def apply_request(self, stream, ident, parent):
571 def apply_request(self, stream, ident, parent):
568 try:
572 try:
569 content = parent[u'content']
573 content = parent[u'content']
570 bufs = parent[u'buffers']
574 bufs = parent[u'buffers']
571 msg_id = parent['header']['msg_id']
575 msg_id = parent['header']['msg_id']
572 except:
576 except:
573 self.log.error("Got bad msg: %s", parent, exc_info=True)
577 self.log.error("Got bad msg: %s", parent, exc_info=True)
574 return
578 return
575
579
576 self._publish_status(u'busy', parent)
580 self._publish_status(u'busy', parent)
577
581
578 # Set the parent message of the display hook and out streams.
582 # Set the parent message of the display hook and out streams.
579 shell = self.shell
583 shell = self.shell
580 shell.set_parent(parent)
584 shell.set_parent(parent)
581
585
582 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
586 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
583 # self.iopub_socket.send(pyin_msg)
587 # self.iopub_socket.send(pyin_msg)
584 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
588 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
585 md = self._make_metadata(parent['metadata'])
589 md = self._make_metadata(parent['metadata'])
586 try:
590 try:
587 working = shell.user_ns
591 working = shell.user_ns
588
592
589 prefix = "_"+str(msg_id).replace("-","")+"_"
593 prefix = "_"+str(msg_id).replace("-","")+"_"
590
594
591 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
595 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
592
596
593 fname = getattr(f, '__name__', 'f')
597 fname = getattr(f, '__name__', 'f')
594
598
595 fname = prefix+"f"
599 fname = prefix+"f"
596 argname = prefix+"args"
600 argname = prefix+"args"
597 kwargname = prefix+"kwargs"
601 kwargname = prefix+"kwargs"
598 resultname = prefix+"result"
602 resultname = prefix+"result"
599
603
600 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
604 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
601 # print ns
605 # print ns
602 working.update(ns)
606 working.update(ns)
603 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
607 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
604 try:
608 try:
605 exec(code, shell.user_global_ns, shell.user_ns)
609 exec(code, shell.user_global_ns, shell.user_ns)
606 result = working.get(resultname)
610 result = working.get(resultname)
607 finally:
611 finally:
608 for key in ns:
612 for key in ns:
609 working.pop(key)
613 working.pop(key)
610
614
611 result_buf = serialize_object(result,
615 result_buf = serialize_object(result,
612 buffer_threshold=self.session.buffer_threshold,
616 buffer_threshold=self.session.buffer_threshold,
613 item_threshold=self.session.item_threshold,
617 item_threshold=self.session.item_threshold,
614 )
618 )
615
619
616 except:
620 except:
617 # invoke IPython traceback formatting
621 # invoke IPython traceback formatting
618 shell.showtraceback()
622 shell.showtraceback()
619 # FIXME - fish exception info out of shell, possibly left there by
623 # FIXME - fish exception info out of shell, possibly left there by
620 # run_code. We'll need to clean up this logic later.
624 # run_code. We'll need to clean up this logic later.
621 reply_content = {}
625 reply_content = {}
622 if shell._reply_content is not None:
626 if shell._reply_content is not None:
623 reply_content.update(shell._reply_content)
627 reply_content.update(shell._reply_content)
624 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
628 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
625 reply_content['engine_info'] = e_info
629 reply_content['engine_info'] = e_info
626 # reset after use
630 # reset after use
627 shell._reply_content = None
631 shell._reply_content = None
628
632
629 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
633 self.session.send(self.iopub_socket, u'pyerr', reply_content, parent=parent,
630 ident=self._topic('pyerr'))
634 ident=self._topic('pyerr'))
631 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
635 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
632 result_buf = []
636 result_buf = []
633
637
634 if reply_content['ename'] == 'UnmetDependency':
638 if reply_content['ename'] == 'UnmetDependency':
635 md['dependencies_met'] = False
639 md['dependencies_met'] = False
636 else:
640 else:
637 reply_content = {'status' : 'ok'}
641 reply_content = {'status' : 'ok'}
638
642
639 # put 'ok'/'error' status in header, for scheduler introspection:
643 # put 'ok'/'error' status in header, for scheduler introspection:
640 md['status'] = reply_content['status']
644 md['status'] = reply_content['status']
641
645
642 # flush i/o
646 # flush i/o
643 sys.stdout.flush()
647 sys.stdout.flush()
644 sys.stderr.flush()
648 sys.stderr.flush()
645
649
646 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
650 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
647 parent=parent, ident=ident,buffers=result_buf, metadata=md)
651 parent=parent, ident=ident,buffers=result_buf, metadata=md)
648
652
649 self._publish_status(u'idle', parent)
653 self._publish_status(u'idle', parent)
650
654
651 #---------------------------------------------------------------------------
655 #---------------------------------------------------------------------------
652 # Control messages
656 # Control messages
653 #---------------------------------------------------------------------------
657 #---------------------------------------------------------------------------
654
658
655 def abort_request(self, stream, ident, parent):
659 def abort_request(self, stream, ident, parent):
656 """abort a specifig msg by id"""
660 """abort a specifig msg by id"""
657 msg_ids = parent['content'].get('msg_ids', None)
661 msg_ids = parent['content'].get('msg_ids', None)
658 if isinstance(msg_ids, string_types):
662 if isinstance(msg_ids, string_types):
659 msg_ids = [msg_ids]
663 msg_ids = [msg_ids]
660 if not msg_ids:
664 if not msg_ids:
661 self.abort_queues()
665 self.abort_queues()
662 for mid in msg_ids:
666 for mid in msg_ids:
663 self.aborted.add(str(mid))
667 self.aborted.add(str(mid))
664
668
665 content = dict(status='ok')
669 content = dict(status='ok')
666 reply_msg = self.session.send(stream, 'abort_reply', content=content,
670 reply_msg = self.session.send(stream, 'abort_reply', content=content,
667 parent=parent, ident=ident)
671 parent=parent, ident=ident)
668 self.log.debug("%s", reply_msg)
672 self.log.debug("%s", reply_msg)
669
673
670 def clear_request(self, stream, idents, parent):
674 def clear_request(self, stream, idents, parent):
671 """Clear our namespace."""
675 """Clear our namespace."""
672 self.shell.reset(False)
676 self.shell.reset(False)
673 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
677 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
674 content = dict(status='ok'))
678 content = dict(status='ok'))
675
679
676
680
677 #---------------------------------------------------------------------------
681 #---------------------------------------------------------------------------
678 # Protected interface
682 # Protected interface
679 #---------------------------------------------------------------------------
683 #---------------------------------------------------------------------------
680
684
681 def _wrap_exception(self, method=None):
685 def _wrap_exception(self, method=None):
682 # import here, because _wrap_exception is only used in parallel,
686 # import here, because _wrap_exception is only used in parallel,
683 # and parallel has higher min pyzmq version
687 # and parallel has higher min pyzmq version
684 from IPython.parallel.error import wrap_exception
688 from IPython.parallel.error import wrap_exception
685 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
689 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
686 content = wrap_exception(e_info)
690 content = wrap_exception(e_info)
687 return content
691 return content
688
692
689 def _topic(self, topic):
693 def _topic(self, topic):
690 """prefixed topic for IOPub messages"""
694 """prefixed topic for IOPub messages"""
691 if self.int_id >= 0:
695 if self.int_id >= 0:
692 base = "engine.%i" % self.int_id
696 base = "engine.%i" % self.int_id
693 else:
697 else:
694 base = "kernel.%s" % self.ident
698 base = "kernel.%s" % self.ident
695
699
696 return py3compat.cast_bytes("%s.%s" % (base, topic))
700 return py3compat.cast_bytes("%s.%s" % (base, topic))
697
701
698 def _abort_queues(self):
702 def _abort_queues(self):
699 for stream in self.shell_streams:
703 for stream in self.shell_streams:
700 if stream:
704 if stream:
701 self._abort_queue(stream)
705 self._abort_queue(stream)
702
706
703 def _abort_queue(self, stream):
707 def _abort_queue(self, stream):
704 poller = zmq.Poller()
708 poller = zmq.Poller()
705 poller.register(stream.socket, zmq.POLLIN)
709 poller.register(stream.socket, zmq.POLLIN)
706 while True:
710 while True:
707 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
711 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
708 if msg is None:
712 if msg is None:
709 return
713 return
710
714
711 self.log.info("Aborting:")
715 self.log.info("Aborting:")
712 self.log.info("%s", msg)
716 self.log.info("%s", msg)
713 msg_type = msg['header']['msg_type']
717 msg_type = msg['header']['msg_type']
714 reply_type = msg_type.split('_')[0] + '_reply'
718 reply_type = msg_type.split('_')[0] + '_reply'
715
719
716 status = {'status' : 'aborted'}
720 status = {'status' : 'aborted'}
717 md = {'engine' : self.ident}
721 md = {'engine' : self.ident}
718 md.update(status)
722 md.update(status)
719 reply_msg = self.session.send(stream, reply_type, metadata=md,
723 reply_msg = self.session.send(stream, reply_type, metadata=md,
720 content=status, parent=msg, ident=idents)
724 content=status, parent=msg, ident=idents)
721 self.log.debug("%s", reply_msg)
725 self.log.debug("%s", reply_msg)
722 # We need to wait a bit for requests to come in. This can probably
726 # We need to wait a bit for requests to come in. This can probably
723 # be set shorter for true asynchronous clients.
727 # be set shorter for true asynchronous clients.
724 poller.poll(50)
728 poller.poll(50)
725
729
726
730
727 def _no_raw_input(self):
731 def _no_raw_input(self):
728 """Raise StdinNotImplentedError if active frontend doesn't support
732 """Raise StdinNotImplentedError if active frontend doesn't support
729 stdin."""
733 stdin."""
730 raise StdinNotImplementedError("raw_input was called, but this "
734 raise StdinNotImplementedError("raw_input was called, but this "
731 "frontend does not support stdin.")
735 "frontend does not support stdin.")
732
736
733 def _raw_input(self, prompt, ident, parent):
737 def _raw_input(self, prompt, ident, parent):
734 # Flush output before making the request.
738 # Flush output before making the request.
735 sys.stderr.flush()
739 sys.stderr.flush()
736 sys.stdout.flush()
740 sys.stdout.flush()
737 # flush the stdin socket, to purge stale replies
741 # flush the stdin socket, to purge stale replies
738 while True:
742 while True:
739 try:
743 try:
740 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
744 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
741 except zmq.ZMQError as e:
745 except zmq.ZMQError as e:
742 if e.errno == zmq.EAGAIN:
746 if e.errno == zmq.EAGAIN:
743 break
747 break
744 else:
748 else:
745 raise
749 raise
746
750
747 # Send the input request.
751 # Send the input request.
748 content = json_clean(dict(prompt=prompt))
752 content = json_clean(dict(prompt=prompt))
749 self.session.send(self.stdin_socket, u'input_request', content, parent,
753 self.session.send(self.stdin_socket, u'input_request', content, parent,
750 ident=ident)
754 ident=ident)
751
755
752 # Await a response.
756 # Await a response.
753 while True:
757 while True:
754 try:
758 try:
755 ident, reply = self.session.recv(self.stdin_socket, 0)
759 ident, reply = self.session.recv(self.stdin_socket, 0)
756 except Exception:
760 except Exception:
757 self.log.warn("Invalid Message:", exc_info=True)
761 self.log.warn("Invalid Message:", exc_info=True)
758 except KeyboardInterrupt:
762 except KeyboardInterrupt:
759 # re-raise KeyboardInterrupt, to truncate traceback
763 # re-raise KeyboardInterrupt, to truncate traceback
760 raise KeyboardInterrupt
764 raise KeyboardInterrupt
761 else:
765 else:
762 break
766 break
763 try:
767 try:
764 value = py3compat.unicode_to_str(reply['content']['value'])
768 value = py3compat.unicode_to_str(reply['content']['value'])
765 except:
769 except:
766 self.log.error("Got bad raw_input reply: ")
770 self.log.error("Got bad raw_input reply: ")
767 self.log.error("%s", parent)
771 self.log.error("%s", parent)
768 value = ''
772 value = ''
769 if value == '\x04':
773 if value == '\x04':
770 # EOF
774 # EOF
771 raise EOFError
775 raise EOFError
772 return value
776 return value
773
777
774 def _complete(self, msg):
778 def _complete(self, msg):
775 c = msg['content']
779 c = msg['content']
776 try:
780 try:
777 cpos = int(c['cursor_pos'])
781 cpos = int(c['cursor_pos'])
778 except:
782 except:
779 # If we don't get something that we can convert to an integer, at
783 # If we don't get something that we can convert to an integer, at
780 # least attempt the completion guessing the cursor is at the end of
784 # least attempt the completion guessing the cursor is at the end of
781 # the text, if there's any, and otherwise of the line
785 # the text, if there's any, and otherwise of the line
782 cpos = len(c['text'])
786 cpos = len(c['text'])
783 if cpos==0:
787 if cpos==0:
784 cpos = len(c['line'])
788 cpos = len(c['line'])
785 return self.shell.complete(c['text'], c['line'], cpos)
789 return self.shell.complete(c['text'], c['line'], cpos)
786
790
787 def _at_shutdown(self):
791 def _at_shutdown(self):
788 """Actions taken at shutdown by the kernel, called by python's atexit.
792 """Actions taken at shutdown by the kernel, called by python's atexit.
789 """
793 """
790 # io.rprint("Kernel at_shutdown") # dbg
794 # io.rprint("Kernel at_shutdown") # dbg
791 if self._shutdown_message is not None:
795 if self._shutdown_message is not None:
792 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
796 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
793 self.log.debug("%s", self._shutdown_message)
797 self.log.debug("%s", self._shutdown_message)
794 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
798 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
795
799
General Comments 0
You need to be logged in to leave comments. Login now