##// END OF EJS Templates
track parent in kernel...
MinRK -
Show More
@@ -1,809 +1,846 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """An interactive kernel that talks to frontends over 0MQ."""
2 """An interactive kernel that talks to frontends over 0MQ."""
3
3
4 # Copyright (c) IPython Development Team.
4 # Copyright (c) IPython Development Team.
5 # Distributed under the terms of the Modified BSD License.
5 # Distributed under the terms of the Modified BSD License.
6
6
7 from __future__ import print_function
7 from __future__ import print_function
8
8
9 import getpass
9 import getpass
10 import sys
10 import sys
11 import time
11 import time
12 import traceback
12 import traceback
13 import logging
13 import logging
14 import uuid
14 import uuid
15
15
16 from datetime import datetime
16 from datetime import datetime
17 from signal import (
17 from signal import (
18 signal, default_int_handler, SIGINT
18 signal, default_int_handler, SIGINT
19 )
19 )
20
20
21 import zmq
21 import zmq
22 from zmq.eventloop import ioloop
22 from zmq.eventloop import ioloop
23 from zmq.eventloop.zmqstream import ZMQStream
23 from zmq.eventloop.zmqstream import ZMQStream
24
24
25 from IPython.config.configurable import Configurable
25 from IPython.config.configurable import Configurable
26 from IPython.core.error import StdinNotImplementedError
26 from IPython.core.error import StdinNotImplementedError
27 from IPython.core import release
27 from IPython.core import release
28 from IPython.utils import py3compat
28 from IPython.utils import py3compat
29 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
29 from IPython.utils.py3compat import builtin_mod, unicode_type, string_types
30 from IPython.utils.jsonutil import json_clean
30 from IPython.utils.jsonutil import json_clean
31 from IPython.utils.traitlets import (
31 from IPython.utils.traitlets import (
32 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
32 Any, Instance, Float, Dict, List, Set, Integer, Unicode,
33 Type, Bool,
33 Type, Bool,
34 )
34 )
35
35
36 from .serialize import serialize_object, unpack_apply_message
36 from .serialize import serialize_object, unpack_apply_message
37 from .session import Session
37 from .session import Session
38 from .zmqshell import ZMQInteractiveShell
38 from .zmqshell import ZMQInteractiveShell
39
39
40
40
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42 # Main kernel class
42 # Main kernel class
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44
44
45 protocol_version = release.kernel_protocol_version
45 protocol_version = release.kernel_protocol_version
46 ipython_version = release.version
46 ipython_version = release.version
47 language_version = sys.version.split()[0]
47 language_version = sys.version.split()[0]
48
48
49
49
50 class Kernel(Configurable):
50 class Kernel(Configurable):
51
51
52 #---------------------------------------------------------------------------
52 #---------------------------------------------------------------------------
53 # Kernel interface
53 # Kernel interface
54 #---------------------------------------------------------------------------
54 #---------------------------------------------------------------------------
55
55
56 # attribute to override with a GUI
56 # attribute to override with a GUI
57 eventloop = Any(None)
57 eventloop = Any(None)
58 def _eventloop_changed(self, name, old, new):
58 def _eventloop_changed(self, name, old, new):
59 """schedule call to eventloop from IOLoop"""
59 """schedule call to eventloop from IOLoop"""
60 loop = ioloop.IOLoop.instance()
60 loop = ioloop.IOLoop.instance()
61 loop.add_callback(self.enter_eventloop)
61 loop.add_callback(self.enter_eventloop)
62
62
63 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
63 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
64 shell_class = Type(ZMQInteractiveShell)
64 shell_class = Type(ZMQInteractiveShell)
65
65
66 session = Instance(Session)
66 session = Instance(Session)
67 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
67 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
68 shell_streams = List()
68 shell_streams = List()
69 control_stream = Instance(ZMQStream)
69 control_stream = Instance(ZMQStream)
70 iopub_socket = Instance(zmq.Socket)
70 iopub_socket = Instance(zmq.Socket)
71 stdin_socket = Instance(zmq.Socket)
71 stdin_socket = Instance(zmq.Socket)
72 log = Instance(logging.Logger)
72 log = Instance(logging.Logger)
73
73
74 user_module = Any()
74 user_module = Any()
75 def _user_module_changed(self, name, old, new):
75 def _user_module_changed(self, name, old, new):
76 if self.shell is not None:
76 if self.shell is not None:
77 self.shell.user_module = new
77 self.shell.user_module = new
78
78
79 user_ns = Instance(dict, args=None, allow_none=True)
79 user_ns = Instance(dict, args=None, allow_none=True)
80 def _user_ns_changed(self, name, old, new):
80 def _user_ns_changed(self, name, old, new):
81 if self.shell is not None:
81 if self.shell is not None:
82 self.shell.user_ns = new
82 self.shell.user_ns = new
83 self.shell.init_user_ns()
83 self.shell.init_user_ns()
84
84
85 # identities:
85 # identities:
86 int_id = Integer(-1)
86 int_id = Integer(-1)
87 ident = Unicode()
87 ident = Unicode()
88
88
89 def _ident_default(self):
89 def _ident_default(self):
90 return unicode_type(uuid.uuid4())
90 return unicode_type(uuid.uuid4())
91
91
92 # Private interface
92 # Private interface
93
93
94 _darwin_app_nap = Bool(True, config=True,
94 _darwin_app_nap = Bool(True, config=True,
95 help="""Whether to use appnope for compatiblity with OS X App Nap.
95 help="""Whether to use appnope for compatiblity with OS X App Nap.
96
96
97 Only affects OS X >= 10.9.
97 Only affects OS X >= 10.9.
98 """
98 """
99 )
99 )
100
100
101 # track associations with current request
102 _allow_stdin = Bool(False)
103 _parent_header = Dict()
104 _parent_ident = Any(b'')
101 # Time to sleep after flushing the stdout/err buffers in each execute
105 # Time to sleep after flushing the stdout/err buffers in each execute
102 # cycle. While this introduces a hard limit on the minimal latency of the
106 # cycle. While this introduces a hard limit on the minimal latency of the
103 # execute cycle, it helps prevent output synchronization problems for
107 # execute cycle, it helps prevent output synchronization problems for
104 # clients.
108 # clients.
105 # Units are in seconds. The minimum zmq latency on local host is probably
109 # Units are in seconds. The minimum zmq latency on local host is probably
106 # ~150 microseconds, set this to 500us for now. We may need to increase it
110 # ~150 microseconds, set this to 500us for now. We may need to increase it
107 # a little if it's not enough after more interactive testing.
111 # a little if it's not enough after more interactive testing.
108 _execute_sleep = Float(0.0005, config=True)
112 _execute_sleep = Float(0.0005, config=True)
109
113
110 # Frequency of the kernel's event loop.
114 # Frequency of the kernel's event loop.
111 # Units are in seconds, kernel subclasses for GUI toolkits may need to
115 # Units are in seconds, kernel subclasses for GUI toolkits may need to
112 # adapt to milliseconds.
116 # adapt to milliseconds.
113 _poll_interval = Float(0.05, config=True)
117 _poll_interval = Float(0.05, config=True)
114
118
115 # If the shutdown was requested over the network, we leave here the
119 # If the shutdown was requested over the network, we leave here the
116 # necessary reply message so it can be sent by our registered atexit
120 # necessary reply message so it can be sent by our registered atexit
117 # handler. This ensures that the reply is only sent to clients truly at
121 # handler. This ensures that the reply is only sent to clients truly at
118 # the end of our shutdown process (which happens after the underlying
122 # the end of our shutdown process (which happens after the underlying
119 # IPython shell's own shutdown).
123 # IPython shell's own shutdown).
120 _shutdown_message = None
124 _shutdown_message = None
121
125
122 # This is a dict of port number that the kernel is listening on. It is set
126 # This is a dict of port number that the kernel is listening on. It is set
123 # by record_ports and used by connect_request.
127 # by record_ports and used by connect_request.
124 _recorded_ports = Dict()
128 _recorded_ports = Dict()
125
129
126 # A reference to the Python builtin 'raw_input' function.
130 # A reference to the Python builtin 'raw_input' function.
127 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
131 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
128 _sys_raw_input = Any()
132 _sys_raw_input = Any()
129 _sys_eval_input = Any()
133 _sys_eval_input = Any()
130
134
131 # set of aborted msg_ids
135 # set of aborted msg_ids
132 aborted = Set()
136 aborted = Set()
133
137
134
138
135 def __init__(self, **kwargs):
139 def __init__(self, **kwargs):
136 super(Kernel, self).__init__(**kwargs)
140 super(Kernel, self).__init__(**kwargs)
137
141
138 # Initialize the InteractiveShell subclass
142 # Initialize the InteractiveShell subclass
139 self.shell = self.shell_class.instance(parent=self,
143 self.shell = self.shell_class.instance(parent=self,
140 profile_dir = self.profile_dir,
144 profile_dir = self.profile_dir,
141 user_module = self.user_module,
145 user_module = self.user_module,
142 user_ns = self.user_ns,
146 user_ns = self.user_ns,
143 kernel = self,
147 kernel = self,
144 )
148 )
145 self.shell.displayhook.session = self.session
149 self.shell.displayhook.session = self.session
146 self.shell.displayhook.pub_socket = self.iopub_socket
150 self.shell.displayhook.pub_socket = self.iopub_socket
147 self.shell.displayhook.topic = self._topic('execute_result')
151 self.shell.displayhook.topic = self._topic('execute_result')
148 self.shell.display_pub.session = self.session
152 self.shell.display_pub.session = self.session
149 self.shell.display_pub.pub_socket = self.iopub_socket
153 self.shell.display_pub.pub_socket = self.iopub_socket
150 self.shell.data_pub.session = self.session
154 self.shell.data_pub.session = self.session
151 self.shell.data_pub.pub_socket = self.iopub_socket
155 self.shell.data_pub.pub_socket = self.iopub_socket
152
156
153 # TMP - hack while developing
157 # TMP - hack while developing
154 self.shell._reply_content = None
158 self.shell._reply_content = None
155
159
156 # Build dict of handlers for message types
160 # Build dict of handlers for message types
157 msg_types = [ 'execute_request', 'complete_request',
161 msg_types = [ 'execute_request', 'complete_request',
158 'object_info_request', 'history_request',
162 'object_info_request', 'history_request',
159 'kernel_info_request',
163 'kernel_info_request',
160 'connect_request', 'shutdown_request',
164 'connect_request', 'shutdown_request',
161 'apply_request',
165 'apply_request',
162 ]
166 ]
163 self.shell_handlers = {}
167 self.shell_handlers = {}
164 for msg_type in msg_types:
168 for msg_type in msg_types:
165 self.shell_handlers[msg_type] = getattr(self, msg_type)
169 self.shell_handlers[msg_type] = getattr(self, msg_type)
166
170
167 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
171 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
168 comm_manager = self.shell.comm_manager
172 comm_manager = self.shell.comm_manager
169 for msg_type in comm_msg_types:
173 for msg_type in comm_msg_types:
170 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
174 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
171
175
172 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
176 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
173 self.control_handlers = {}
177 self.control_handlers = {}
174 for msg_type in control_msg_types:
178 for msg_type in control_msg_types:
175 self.control_handlers[msg_type] = getattr(self, msg_type)
179 self.control_handlers[msg_type] = getattr(self, msg_type)
176
180
177
181
178 def dispatch_control(self, msg):
182 def dispatch_control(self, msg):
179 """dispatch control requests"""
183 """dispatch control requests"""
180 idents,msg = self.session.feed_identities(msg, copy=False)
184 idents,msg = self.session.feed_identities(msg, copy=False)
181 try:
185 try:
182 msg = self.session.unserialize(msg, content=True, copy=False)
186 msg = self.session.unserialize(msg, content=True, copy=False)
183 except:
187 except:
184 self.log.error("Invalid Control Message", exc_info=True)
188 self.log.error("Invalid Control Message", exc_info=True)
185 return
189 return
186
190
187 self.log.debug("Control received: %s", msg)
191 self.log.debug("Control received: %s", msg)
188
192
189 header = msg['header']
193 header = msg['header']
190 msg_id = header['msg_id']
194 msg_id = header['msg_id']
191 msg_type = header['msg_type']
195 msg_type = header['msg_type']
192
196
193 handler = self.control_handlers.get(msg_type, None)
197 handler = self.control_handlers.get(msg_type, None)
194 if handler is None:
198 if handler is None:
195 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
199 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
196 else:
200 else:
197 try:
201 try:
198 handler(self.control_stream, idents, msg)
202 handler(self.control_stream, idents, msg)
199 except Exception:
203 except Exception:
200 self.log.error("Exception in control handler:", exc_info=True)
204 self.log.error("Exception in control handler:", exc_info=True)
201
205
202 def dispatch_shell(self, stream, msg):
206 def dispatch_shell(self, stream, msg):
203 """dispatch shell requests"""
207 """dispatch shell requests"""
204 # flush control requests first
208 # flush control requests first
205 if self.control_stream:
209 if self.control_stream:
206 self.control_stream.flush()
210 self.control_stream.flush()
207
211
208 idents,msg = self.session.feed_identities(msg, copy=False)
212 idents,msg = self.session.feed_identities(msg, copy=False)
209 try:
213 try:
210 msg = self.session.unserialize(msg, content=True, copy=False)
214 msg = self.session.unserialize(msg, content=True, copy=False)
211 except:
215 except:
212 self.log.error("Invalid Message", exc_info=True)
216 self.log.error("Invalid Message", exc_info=True)
213 return
217 return
214
218
215 header = msg['header']
219 header = msg['header']
216 msg_id = header['msg_id']
220 msg_id = header['msg_id']
217 msg_type = msg['header']['msg_type']
221 msg_type = msg['header']['msg_type']
218
222
219 # Print some info about this message and leave a '--->' marker, so it's
223 # Print some info about this message and leave a '--->' marker, so it's
220 # easier to trace visually the message chain when debugging. Each
224 # easier to trace visually the message chain when debugging. Each
221 # handler prints its message at the end.
225 # handler prints its message at the end.
222 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
226 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
223 self.log.debug(' Content: %s\n --->\n ', msg['content'])
227 self.log.debug(' Content: %s\n --->\n ', msg['content'])
224
228
225 if msg_id in self.aborted:
229 if msg_id in self.aborted:
226 self.aborted.remove(msg_id)
230 self.aborted.remove(msg_id)
227 # is it safe to assume a msg_id will not be resubmitted?
231 # is it safe to assume a msg_id will not be resubmitted?
228 reply_type = msg_type.split('_')[0] + '_reply'
232 reply_type = msg_type.split('_')[0] + '_reply'
229 status = {'status' : 'aborted'}
233 status = {'status' : 'aborted'}
230 md = {'engine' : self.ident}
234 md = {'engine' : self.ident}
231 md.update(status)
235 md.update(status)
232 reply_msg = self.session.send(stream, reply_type, metadata=md,
236 reply_msg = self.session.send(stream, reply_type, metadata=md,
233 content=status, parent=msg, ident=idents)
237 content=status, parent=msg, ident=idents)
234 return
238 return
235
239
236 handler = self.shell_handlers.get(msg_type, None)
240 handler = self.shell_handlers.get(msg_type, None)
237 if handler is None:
241 if handler is None:
238 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
242 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
239 else:
243 else:
240 # ensure default_int_handler during handler call
244 # ensure default_int_handler during handler call
241 sig = signal(SIGINT, default_int_handler)
245 sig = signal(SIGINT, default_int_handler)
242 try:
246 try:
243 handler(stream, idents, msg)
247 handler(stream, idents, msg)
244 except Exception:
248 except Exception:
245 self.log.error("Exception in message handler:", exc_info=True)
249 self.log.error("Exception in message handler:", exc_info=True)
246 finally:
250 finally:
247 signal(SIGINT, sig)
251 signal(SIGINT, sig)
248
252
249 def enter_eventloop(self):
253 def enter_eventloop(self):
250 """enter eventloop"""
254 """enter eventloop"""
251 self.log.info("entering eventloop %s", self.eventloop)
255 self.log.info("entering eventloop %s", self.eventloop)
252 for stream in self.shell_streams:
256 for stream in self.shell_streams:
253 # flush any pending replies,
257 # flush any pending replies,
254 # which may be skipped by entering the eventloop
258 # which may be skipped by entering the eventloop
255 stream.flush(zmq.POLLOUT)
259 stream.flush(zmq.POLLOUT)
256 # restore default_int_handler
260 # restore default_int_handler
257 signal(SIGINT, default_int_handler)
261 signal(SIGINT, default_int_handler)
258 while self.eventloop is not None:
262 while self.eventloop is not None:
259 try:
263 try:
260 self.eventloop(self)
264 self.eventloop(self)
261 except KeyboardInterrupt:
265 except KeyboardInterrupt:
262 # Ctrl-C shouldn't crash the kernel
266 # Ctrl-C shouldn't crash the kernel
263 self.log.error("KeyboardInterrupt caught in kernel")
267 self.log.error("KeyboardInterrupt caught in kernel")
264 continue
268 continue
265 else:
269 else:
266 # eventloop exited cleanly, this means we should stop (right?)
270 # eventloop exited cleanly, this means we should stop (right?)
267 self.eventloop = None
271 self.eventloop = None
268 break
272 break
269 self.log.info("exiting eventloop")
273 self.log.info("exiting eventloop")
270
274
271 def start(self):
275 def start(self):
272 """register dispatchers for streams"""
276 """register dispatchers for streams"""
273 self.shell.exit_now = False
277 self.shell.exit_now = False
274 if self.control_stream:
278 if self.control_stream:
275 self.control_stream.on_recv(self.dispatch_control, copy=False)
279 self.control_stream.on_recv(self.dispatch_control, copy=False)
276
280
277 def make_dispatcher(stream):
281 def make_dispatcher(stream):
278 def dispatcher(msg):
282 def dispatcher(msg):
279 return self.dispatch_shell(stream, msg)
283 return self.dispatch_shell(stream, msg)
280 return dispatcher
284 return dispatcher
281
285
282 for s in self.shell_streams:
286 for s in self.shell_streams:
283 s.on_recv(make_dispatcher(s), copy=False)
287 s.on_recv(make_dispatcher(s), copy=False)
284
288
285 # publish idle status
289 # publish idle status
286 self._publish_status('starting')
290 self._publish_status('starting')
287
291
288 def do_one_iteration(self):
292 def do_one_iteration(self):
289 """step eventloop just once"""
293 """step eventloop just once"""
290 if self.control_stream:
294 if self.control_stream:
291 self.control_stream.flush()
295 self.control_stream.flush()
292 for stream in self.shell_streams:
296 for stream in self.shell_streams:
293 # handle at most one request per iteration
297 # handle at most one request per iteration
294 stream.flush(zmq.POLLIN, 1)
298 stream.flush(zmq.POLLIN, 1)
295 stream.flush(zmq.POLLOUT)
299 stream.flush(zmq.POLLOUT)
296
300
297
301
298 def record_ports(self, ports):
302 def record_ports(self, ports):
299 """Record the ports that this kernel is using.
303 """Record the ports that this kernel is using.
300
304
301 The creator of the Kernel instance must call this methods if they
305 The creator of the Kernel instance must call this methods if they
302 want the :meth:`connect_request` method to return the port numbers.
306 want the :meth:`connect_request` method to return the port numbers.
303 """
307 """
304 self._recorded_ports = ports
308 self._recorded_ports = ports
305
309
306 #---------------------------------------------------------------------------
310 #---------------------------------------------------------------------------
307 # Kernel request handlers
311 # Kernel request handlers
308 #---------------------------------------------------------------------------
312 #---------------------------------------------------------------------------
309
313
310 def _make_metadata(self, other=None):
314 def _make_metadata(self, other=None):
311 """init metadata dict, for execute/apply_reply"""
315 """init metadata dict, for execute/apply_reply"""
312 new_md = {
316 new_md = {
313 'dependencies_met' : True,
317 'dependencies_met' : True,
314 'engine' : self.ident,
318 'engine' : self.ident,
315 'started': datetime.now(),
319 'started': datetime.now(),
316 }
320 }
317 if other:
321 if other:
318 new_md.update(other)
322 new_md.update(other)
319 return new_md
323 return new_md
320
324
321 def _publish_execute_input(self, code, parent, execution_count):
325 def _publish_execute_input(self, code, parent, execution_count):
322 """Publish the code request on the iopub stream."""
326 """Publish the code request on the iopub stream."""
323
327
324 self.session.send(self.iopub_socket, u'execute_input',
328 self.session.send(self.iopub_socket, u'execute_input',
325 {u'code':code, u'execution_count': execution_count},
329 {u'code':code, u'execution_count': execution_count},
326 parent=parent, ident=self._topic('execute_input')
330 parent=parent, ident=self._topic('execute_input')
327 )
331 )
328
332
329 def _publish_status(self, status, parent=None):
333 def _publish_status(self, status, parent=None):
330 """send status (busy/idle) on IOPub"""
334 """send status (busy/idle) on IOPub"""
331 self.session.send(self.iopub_socket,
335 self.session.send(self.iopub_socket,
332 u'status',
336 u'status',
333 {u'execution_state': status},
337 {u'execution_state': status},
334 parent=parent,
338 parent=parent,
335 ident=self._topic('status'),
339 ident=self._topic('status'),
336 )
340 )
337
341
338 def _forward_raw_input(self, ident, parent, allow_stdin):
342 def _forward_input(self, allow_stdin=False):
339 """Replace raw_input. Note that is not sufficient to replace
343 """Forward raw_input and getpass to the current frontend.
340 raw_input in the user namespace.
344
345 via input_request
341 """
346 """
347 self._allow_stdin = allow_stdin
342
348
343 if allow_stdin:
344 raw_input = lambda prompt='': self._raw_input(prompt, ident, parent)
345 input = lambda prompt='': eval(raw_input(prompt))
346 _getpass = lambda prompt='', stream=None: self._raw_input(
347 prompt, ident, parent, password=True
348 )
349 else:
350 _getpass = raw_input = input = lambda prompt='' : self._no_raw_input()
351
352
353 if py3compat.PY3:
349 if py3compat.PY3:
354 self._sys_raw_input = builtin_mod.input
350 self._sys_raw_input = builtin_mod.input
355 builtin_mod.input = raw_input
351 builtin_mod.input = self.raw_input
356 else:
352 else:
357 self._sys_raw_input = builtin_mod.raw_input
353 self._sys_raw_input = builtin_mod.raw_input
358 self._sys_eval_input = builtin_mod.input
354 self._sys_eval_input = builtin_mod.input
359 builtin_mod.raw_input = raw_input
355 builtin_mod.raw_input = self.raw_input
360 builtin_mod.input = input
356 builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt))
361 self._save_getpass = getpass.getpass
357 self._save_getpass = getpass.getpass
362 getpass.getpass = _getpass
358 getpass.getpass = self.getpass
363
359
364 def _restore_raw_input(self):
360 def _restore_input(self):
365 # Restore raw_input
361 """Restore raw_input, getpass"""
366 if py3compat.PY3:
362 if py3compat.PY3:
367 builtin_mod.input = self._sys_raw_input
363 builtin_mod.input = self._sys_raw_input
368 else:
364 else:
369 builtin_mod.raw_input = self._sys_raw_input
365 builtin_mod.raw_input = self._sys_raw_input
370 builtin_mod.input = self._sys_eval_input
366 builtin_mod.input = self._sys_eval_input
371
367
372 getpass.getpass = self._save_getpass
368 getpass.getpass = self._save_getpass
373
369
370 def set_parent(self, ident, parent):
371 """Record the parent state
372
373 For associating side effects with their requests.
374 """
375 self._parent_ident = ident
376 self._parent_header = parent
377 self.shell.set_parent(parent)
378
374 def execute_request(self, stream, ident, parent):
379 def execute_request(self, stream, ident, parent):
375 """handle an execute_request"""
380 """handle an execute_request"""
376
381
377 self._publish_status(u'busy', parent)
382 self._publish_status(u'busy', parent)
378
383
379 try:
384 try:
380 content = parent[u'content']
385 content = parent[u'content']
381 code = py3compat.cast_unicode_py2(content[u'code'])
386 code = py3compat.cast_unicode_py2(content[u'code'])
382 silent = content[u'silent']
387 silent = content[u'silent']
383 store_history = content.get(u'store_history', not silent)
388 store_history = content.get(u'store_history', not silent)
384 except:
389 except:
385 self.log.error("Got bad msg: ")
390 self.log.error("Got bad msg: ")
386 self.log.error("%s", parent)
391 self.log.error("%s", parent)
387 return
392 return
388
393
389 md = self._make_metadata(parent['metadata'])
394 md = self._make_metadata(parent['metadata'])
390
395
391 shell = self.shell # we'll need this a lot here
396 shell = self.shell # we'll need this a lot here
392
397
393 self._forward_raw_input(ident, parent, content.get('allow_stdin', False))
398 self._forward_input(content.get('allow_stdin', False))
394
395 # Set the parent message of the display hook and out streams.
399 # Set the parent message of the display hook and out streams.
396 shell.set_parent(parent)
400 self.set_parent(ident, parent)
397
401
398 # Re-broadcast our input for the benefit of listening clients, and
402 # Re-broadcast our input for the benefit of listening clients, and
399 # start computing output
403 # start computing output
400 if not silent:
404 if not silent:
401 self._publish_execute_input(code, parent, shell.execution_count)
405 self._publish_execute_input(code, parent, shell.execution_count)
402
406
403 reply_content = {}
407 reply_content = {}
404 # FIXME: the shell calls the exception handler itself.
408 # FIXME: the shell calls the exception handler itself.
405 shell._reply_content = None
409 shell._reply_content = None
406 try:
410 try:
407 shell.run_cell(code, store_history=store_history, silent=silent)
411 shell.run_cell(code, store_history=store_history, silent=silent)
408 except:
412 except:
409 status = u'error'
413 status = u'error'
410 # FIXME: this code right now isn't being used yet by default,
414 # FIXME: this code right now isn't being used yet by default,
411 # because the run_cell() call above directly fires off exception
415 # because the run_cell() call above directly fires off exception
412 # reporting. This code, therefore, is only active in the scenario
416 # reporting. This code, therefore, is only active in the scenario
413 # where runlines itself has an unhandled exception. We need to
417 # where runlines itself has an unhandled exception. We need to
414 # uniformize this, for all exception construction to come from a
418 # uniformize this, for all exception construction to come from a
415 # single location in the codbase.
419 # single location in the codbase.
416 etype, evalue, tb = sys.exc_info()
420 etype, evalue, tb = sys.exc_info()
417 tb_list = traceback.format_exception(etype, evalue, tb)
421 tb_list = traceback.format_exception(etype, evalue, tb)
418 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
422 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
419 else:
423 else:
420 status = u'ok'
424 status = u'ok'
421 finally:
425 finally:
422 self._restore_raw_input()
426 self._restore_input()
423
427
424 reply_content[u'status'] = status
428 reply_content[u'status'] = status
425
429
426 # Return the execution counter so clients can display prompts
430 # Return the execution counter so clients can display prompts
427 reply_content['execution_count'] = shell.execution_count - 1
431 reply_content['execution_count'] = shell.execution_count - 1
428
432
429 # FIXME - fish exception info out of shell, possibly left there by
433 # FIXME - fish exception info out of shell, possibly left there by
430 # runlines. We'll need to clean up this logic later.
434 # runlines. We'll need to clean up this logic later.
431 if shell._reply_content is not None:
435 if shell._reply_content is not None:
432 reply_content.update(shell._reply_content)
436 reply_content.update(shell._reply_content)
433 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
437 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
434 reply_content['engine_info'] = e_info
438 reply_content['engine_info'] = e_info
435 # reset after use
439 # reset after use
436 shell._reply_content = None
440 shell._reply_content = None
437
441
438 if 'traceback' in reply_content:
442 if 'traceback' in reply_content:
439 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
443 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
440
444
441
445
442 # At this point, we can tell whether the main code execution succeeded
446 # At this point, we can tell whether the main code execution succeeded
443 # or not. If it did, we proceed to evaluate user_expressions
447 # or not. If it did, we proceed to evaluate user_expressions
444 if reply_content['status'] == 'ok':
448 if reply_content['status'] == 'ok':
445 reply_content[u'user_expressions'] = \
449 reply_content[u'user_expressions'] = \
446 shell.user_expressions(content.get(u'user_expressions', {}))
450 shell.user_expressions(content.get(u'user_expressions', {}))
447 else:
451 else:
448 # If there was an error, don't even try to compute expressions
452 # If there was an error, don't even try to compute expressions
449 reply_content[u'user_expressions'] = {}
453 reply_content[u'user_expressions'] = {}
450
454
451 # Payloads should be retrieved regardless of outcome, so we can both
455 # Payloads should be retrieved regardless of outcome, so we can both
452 # recover partial output (that could have been generated early in a
456 # recover partial output (that could have been generated early in a
453 # block, before an error) and clear the payload system always.
457 # block, before an error) and clear the payload system always.
454 reply_content[u'payload'] = shell.payload_manager.read_payload()
458 reply_content[u'payload'] = shell.payload_manager.read_payload()
455 # Be agressive about clearing the payload because we don't want
459 # Be agressive about clearing the payload because we don't want
456 # it to sit in memory until the next execute_request comes in.
460 # it to sit in memory until the next execute_request comes in.
457 shell.payload_manager.clear_payload()
461 shell.payload_manager.clear_payload()
458
462
459 # Flush output before sending the reply.
463 # Flush output before sending the reply.
460 sys.stdout.flush()
464 sys.stdout.flush()
461 sys.stderr.flush()
465 sys.stderr.flush()
462 # FIXME: on rare occasions, the flush doesn't seem to make it to the
466 # FIXME: on rare occasions, the flush doesn't seem to make it to the
463 # clients... This seems to mitigate the problem, but we definitely need
467 # clients... This seems to mitigate the problem, but we definitely need
464 # to better understand what's going on.
468 # to better understand what's going on.
465 if self._execute_sleep:
469 if self._execute_sleep:
466 time.sleep(self._execute_sleep)
470 time.sleep(self._execute_sleep)
467
471
468 # Send the reply.
472 # Send the reply.
469 reply_content = json_clean(reply_content)
473 reply_content = json_clean(reply_content)
470
474
471 md['status'] = reply_content['status']
475 md['status'] = reply_content['status']
472 if reply_content['status'] == 'error' and \
476 if reply_content['status'] == 'error' and \
473 reply_content['ename'] == 'UnmetDependency':
477 reply_content['ename'] == 'UnmetDependency':
474 md['dependencies_met'] = False
478 md['dependencies_met'] = False
475
479
476 reply_msg = self.session.send(stream, u'execute_reply',
480 reply_msg = self.session.send(stream, u'execute_reply',
477 reply_content, parent, metadata=md,
481 reply_content, parent, metadata=md,
478 ident=ident)
482 ident=ident)
479
483
480 self.log.debug("%s", reply_msg)
484 self.log.debug("%s", reply_msg)
481
485
482 if not silent and reply_msg['content']['status'] == u'error':
486 if not silent and reply_msg['content']['status'] == u'error':
483 self._abort_queues()
487 self._abort_queues()
484
488
485 self._publish_status(u'idle', parent)
489 self._publish_status(u'idle', parent)
486
490
487 def complete_request(self, stream, ident, parent):
491 def complete_request(self, stream, ident, parent):
488 txt, matches = self._complete(parent)
492 txt, matches = self._complete(parent)
489 matches = {'matches' : matches,
493 matches = {'matches' : matches,
490 'matched_text' : txt,
494 'matched_text' : txt,
491 'status' : 'ok'}
495 'status' : 'ok'}
492 matches = json_clean(matches)
496 matches = json_clean(matches)
493 completion_msg = self.session.send(stream, 'complete_reply',
497 completion_msg = self.session.send(stream, 'complete_reply',
494 matches, parent, ident)
498 matches, parent, ident)
495 self.log.debug("%s", completion_msg)
499 self.log.debug("%s", completion_msg)
496
500
497 def object_info_request(self, stream, ident, parent):
501 def object_info_request(self, stream, ident, parent):
498 content = parent['content']
502 content = parent['content']
499 object_info = self.shell.object_inspect(content['oname'],
503 object_info = self.shell.object_inspect(content['oname'],
500 detail_level = content.get('detail_level', 0)
504 detail_level = content.get('detail_level', 0)
501 )
505 )
502 # Before we send this object over, we scrub it for JSON usage
506 # Before we send this object over, we scrub it for JSON usage
503 oinfo = json_clean(object_info)
507 oinfo = json_clean(object_info)
504 msg = self.session.send(stream, 'object_info_reply',
508 msg = self.session.send(stream, 'object_info_reply',
505 oinfo, parent, ident)
509 oinfo, parent, ident)
506 self.log.debug("%s", msg)
510 self.log.debug("%s", msg)
507
511
508 def history_request(self, stream, ident, parent):
512 def history_request(self, stream, ident, parent):
509 # We need to pull these out, as passing **kwargs doesn't work with
513 # We need to pull these out, as passing **kwargs doesn't work with
510 # unicode keys before Python 2.6.5.
514 # unicode keys before Python 2.6.5.
511 hist_access_type = parent['content']['hist_access_type']
515 hist_access_type = parent['content']['hist_access_type']
512 raw = parent['content']['raw']
516 raw = parent['content']['raw']
513 output = parent['content']['output']
517 output = parent['content']['output']
514 if hist_access_type == 'tail':
518 if hist_access_type == 'tail':
515 n = parent['content']['n']
519 n = parent['content']['n']
516 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
520 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
517 include_latest=True)
521 include_latest=True)
518
522
519 elif hist_access_type == 'range':
523 elif hist_access_type == 'range':
520 session = parent['content']['session']
524 session = parent['content']['session']
521 start = parent['content']['start']
525 start = parent['content']['start']
522 stop = parent['content']['stop']
526 stop = parent['content']['stop']
523 hist = self.shell.history_manager.get_range(session, start, stop,
527 hist = self.shell.history_manager.get_range(session, start, stop,
524 raw=raw, output=output)
528 raw=raw, output=output)
525
529
526 elif hist_access_type == 'search':
530 elif hist_access_type == 'search':
527 n = parent['content'].get('n')
531 n = parent['content'].get('n')
528 unique = parent['content'].get('unique', False)
532 unique = parent['content'].get('unique', False)
529 pattern = parent['content']['pattern']
533 pattern = parent['content']['pattern']
530 hist = self.shell.history_manager.search(
534 hist = self.shell.history_manager.search(
531 pattern, raw=raw, output=output, n=n, unique=unique)
535 pattern, raw=raw, output=output, n=n, unique=unique)
532
536
533 else:
537 else:
534 hist = []
538 hist = []
535 hist = list(hist)
539 hist = list(hist)
536 content = {'history' : hist}
540 content = {'history' : hist}
537 content = json_clean(content)
541 content = json_clean(content)
538 msg = self.session.send(stream, 'history_reply',
542 msg = self.session.send(stream, 'history_reply',
539 content, parent, ident)
543 content, parent, ident)
540 self.log.debug("Sending history reply with %i entries", len(hist))
544 self.log.debug("Sending history reply with %i entries", len(hist))
541
545
542 def connect_request(self, stream, ident, parent):
546 def connect_request(self, stream, ident, parent):
543 if self._recorded_ports is not None:
547 if self._recorded_ports is not None:
544 content = self._recorded_ports.copy()
548 content = self._recorded_ports.copy()
545 else:
549 else:
546 content = {}
550 content = {}
547 msg = self.session.send(stream, 'connect_reply',
551 msg = self.session.send(stream, 'connect_reply',
548 content, parent, ident)
552 content, parent, ident)
549 self.log.debug("%s", msg)
553 self.log.debug("%s", msg)
550
554
551 def kernel_info_request(self, stream, ident, parent):
555 def kernel_info_request(self, stream, ident, parent):
552 vinfo = {
556 vinfo = {
553 'protocol_version': protocol_version,
557 'protocol_version': protocol_version,
554 'ipython_version': ipython_version,
558 'ipython_version': ipython_version,
555 'language_version': language_version,
559 'language_version': language_version,
556 'language': 'python',
560 'language': 'python',
557 }
561 }
558 msg = self.session.send(stream, 'kernel_info_reply',
562 msg = self.session.send(stream, 'kernel_info_reply',
559 vinfo, parent, ident)
563 vinfo, parent, ident)
560 self.log.debug("%s", msg)
564 self.log.debug("%s", msg)
561
565
562 def shutdown_request(self, stream, ident, parent):
566 def shutdown_request(self, stream, ident, parent):
563 self.shell.exit_now = True
567 self.shell.exit_now = True
564 content = dict(status='ok')
568 content = dict(status='ok')
565 content.update(parent['content'])
569 content.update(parent['content'])
566 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
570 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
567 # same content, but different msg_id for broadcasting on IOPub
571 # same content, but different msg_id for broadcasting on IOPub
568 self._shutdown_message = self.session.msg(u'shutdown_reply',
572 self._shutdown_message = self.session.msg(u'shutdown_reply',
569 content, parent
573 content, parent
570 )
574 )
571
575
572 self._at_shutdown()
576 self._at_shutdown()
573 # call sys.exit after a short delay
577 # call sys.exit after a short delay
574 loop = ioloop.IOLoop.instance()
578 loop = ioloop.IOLoop.instance()
575 loop.add_timeout(time.time()+0.1, loop.stop)
579 loop.add_timeout(time.time()+0.1, loop.stop)
576
580
577 #---------------------------------------------------------------------------
581 #---------------------------------------------------------------------------
578 # Engine methods
582 # Engine methods
579 #---------------------------------------------------------------------------
583 #---------------------------------------------------------------------------
580
584
581 def apply_request(self, stream, ident, parent):
585 def apply_request(self, stream, ident, parent):
582 try:
586 try:
583 content = parent[u'content']
587 content = parent[u'content']
584 bufs = parent[u'buffers']
588 bufs = parent[u'buffers']
585 msg_id = parent['header']['msg_id']
589 msg_id = parent['header']['msg_id']
586 except:
590 except:
587 self.log.error("Got bad msg: %s", parent, exc_info=True)
591 self.log.error("Got bad msg: %s", parent, exc_info=True)
588 return
592 return
589
593
590 self._publish_status(u'busy', parent)
594 self._publish_status(u'busy', parent)
591
595
592 # Set the parent message of the display hook and out streams.
596 # Set the parent message of the display hook and out streams.
593 shell = self.shell
597 shell = self.shell
594 shell.set_parent(parent)
598 shell.set_parent(parent)
595
599
596 # execute_input_msg = self.session.msg(u'execute_input',{u'code':code}, parent=parent)
600 # execute_input_msg = self.session.msg(u'execute_input',{u'code':code}, parent=parent)
597 # self.iopub_socket.send(execute_input_msg)
601 # self.iopub_socket.send(execute_input_msg)
598 # self.session.send(self.iopub_socket, u'execute_input', {u'code':code},parent=parent)
602 # self.session.send(self.iopub_socket, u'execute_input', {u'code':code},parent=parent)
599 md = self._make_metadata(parent['metadata'])
603 md = self._make_metadata(parent['metadata'])
600 try:
604 try:
601 working = shell.user_ns
605 working = shell.user_ns
602
606
603 prefix = "_"+str(msg_id).replace("-","")+"_"
607 prefix = "_"+str(msg_id).replace("-","")+"_"
604
608
605 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
609 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
606
610
607 fname = getattr(f, '__name__', 'f')
611 fname = getattr(f, '__name__', 'f')
608
612
609 fname = prefix+"f"
613 fname = prefix+"f"
610 argname = prefix+"args"
614 argname = prefix+"args"
611 kwargname = prefix+"kwargs"
615 kwargname = prefix+"kwargs"
612 resultname = prefix+"result"
616 resultname = prefix+"result"
613
617
614 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
618 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
615 # print ns
619 # print ns
616 working.update(ns)
620 working.update(ns)
617 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
621 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
618 try:
622 try:
619 exec(code, shell.user_global_ns, shell.user_ns)
623 exec(code, shell.user_global_ns, shell.user_ns)
620 result = working.get(resultname)
624 result = working.get(resultname)
621 finally:
625 finally:
622 for key in ns:
626 for key in ns:
623 working.pop(key)
627 working.pop(key)
624
628
625 result_buf = serialize_object(result,
629 result_buf = serialize_object(result,
626 buffer_threshold=self.session.buffer_threshold,
630 buffer_threshold=self.session.buffer_threshold,
627 item_threshold=self.session.item_threshold,
631 item_threshold=self.session.item_threshold,
628 )
632 )
629
633
630 except:
634 except:
631 # invoke IPython traceback formatting
635 # invoke IPython traceback formatting
632 shell.showtraceback()
636 shell.showtraceback()
633 # FIXME - fish exception info out of shell, possibly left there by
637 # FIXME - fish exception info out of shell, possibly left there by
634 # run_code. We'll need to clean up this logic later.
638 # run_code. We'll need to clean up this logic later.
635 reply_content = {}
639 reply_content = {}
636 if shell._reply_content is not None:
640 if shell._reply_content is not None:
637 reply_content.update(shell._reply_content)
641 reply_content.update(shell._reply_content)
638 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
642 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
639 reply_content['engine_info'] = e_info
643 reply_content['engine_info'] = e_info
640 # reset after use
644 # reset after use
641 shell._reply_content = None
645 shell._reply_content = None
642
646
643 self.session.send(self.iopub_socket, u'error', reply_content, parent=parent,
647 self.session.send(self.iopub_socket, u'error', reply_content, parent=parent,
644 ident=self._topic('error'))
648 ident=self._topic('error'))
645 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
649 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
646 result_buf = []
650 result_buf = []
647
651
648 if reply_content['ename'] == 'UnmetDependency':
652 if reply_content['ename'] == 'UnmetDependency':
649 md['dependencies_met'] = False
653 md['dependencies_met'] = False
650 else:
654 else:
651 reply_content = {'status' : 'ok'}
655 reply_content = {'status' : 'ok'}
652
656
653 # put 'ok'/'error' status in header, for scheduler introspection:
657 # put 'ok'/'error' status in header, for scheduler introspection:
654 md['status'] = reply_content['status']
658 md['status'] = reply_content['status']
655
659
656 # flush i/o
660 # flush i/o
657 sys.stdout.flush()
661 sys.stdout.flush()
658 sys.stderr.flush()
662 sys.stderr.flush()
659
663
660 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
664 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
661 parent=parent, ident=ident,buffers=result_buf, metadata=md)
665 parent=parent, ident=ident,buffers=result_buf, metadata=md)
662
666
663 self._publish_status(u'idle', parent)
667 self._publish_status(u'idle', parent)
664
668
665 #---------------------------------------------------------------------------
669 #---------------------------------------------------------------------------
666 # Control messages
670 # Control messages
667 #---------------------------------------------------------------------------
671 #---------------------------------------------------------------------------
668
672
669 def abort_request(self, stream, ident, parent):
673 def abort_request(self, stream, ident, parent):
670 """abort a specifig msg by id"""
674 """abort a specifig msg by id"""
671 msg_ids = parent['content'].get('msg_ids', None)
675 msg_ids = parent['content'].get('msg_ids', None)
672 if isinstance(msg_ids, string_types):
676 if isinstance(msg_ids, string_types):
673 msg_ids = [msg_ids]
677 msg_ids = [msg_ids]
674 if not msg_ids:
678 if not msg_ids:
675 self.abort_queues()
679 self.abort_queues()
676 for mid in msg_ids:
680 for mid in msg_ids:
677 self.aborted.add(str(mid))
681 self.aborted.add(str(mid))
678
682
679 content = dict(status='ok')
683 content = dict(status='ok')
680 reply_msg = self.session.send(stream, 'abort_reply', content=content,
684 reply_msg = self.session.send(stream, 'abort_reply', content=content,
681 parent=parent, ident=ident)
685 parent=parent, ident=ident)
682 self.log.debug("%s", reply_msg)
686 self.log.debug("%s", reply_msg)
683
687
684 def clear_request(self, stream, idents, parent):
688 def clear_request(self, stream, idents, parent):
685 """Clear our namespace."""
689 """Clear our namespace."""
686 self.shell.reset(False)
690 self.shell.reset(False)
687 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
691 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
688 content = dict(status='ok'))
692 content = dict(status='ok'))
689
693
690
694
691 #---------------------------------------------------------------------------
695 #---------------------------------------------------------------------------
692 # Protected interface
696 # Protected interface
693 #---------------------------------------------------------------------------
697 #---------------------------------------------------------------------------
694
698
695 def _wrap_exception(self, method=None):
699 def _wrap_exception(self, method=None):
696 # import here, because _wrap_exception is only used in parallel,
700 # import here, because _wrap_exception is only used in parallel,
697 # and parallel has higher min pyzmq version
701 # and parallel has higher min pyzmq version
698 from IPython.parallel.error import wrap_exception
702 from IPython.parallel.error import wrap_exception
699 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
703 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
700 content = wrap_exception(e_info)
704 content = wrap_exception(e_info)
701 return content
705 return content
702
706
703 def _topic(self, topic):
707 def _topic(self, topic):
704 """prefixed topic for IOPub messages"""
708 """prefixed topic for IOPub messages"""
705 if self.int_id >= 0:
709 if self.int_id >= 0:
706 base = "engine.%i" % self.int_id
710 base = "engine.%i" % self.int_id
707 else:
711 else:
708 base = "kernel.%s" % self.ident
712 base = "kernel.%s" % self.ident
709
713
710 return py3compat.cast_bytes("%s.%s" % (base, topic))
714 return py3compat.cast_bytes("%s.%s" % (base, topic))
711
715
712 def _abort_queues(self):
716 def _abort_queues(self):
713 for stream in self.shell_streams:
717 for stream in self.shell_streams:
714 if stream:
718 if stream:
715 self._abort_queue(stream)
719 self._abort_queue(stream)
716
720
717 def _abort_queue(self, stream):
721 def _abort_queue(self, stream):
718 poller = zmq.Poller()
722 poller = zmq.Poller()
719 poller.register(stream.socket, zmq.POLLIN)
723 poller.register(stream.socket, zmq.POLLIN)
720 while True:
724 while True:
721 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
725 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
722 if msg is None:
726 if msg is None:
723 return
727 return
724
728
725 self.log.info("Aborting:")
729 self.log.info("Aborting:")
726 self.log.info("%s", msg)
730 self.log.info("%s", msg)
727 msg_type = msg['header']['msg_type']
731 msg_type = msg['header']['msg_type']
728 reply_type = msg_type.split('_')[0] + '_reply'
732 reply_type = msg_type.split('_')[0] + '_reply'
729
733
730 status = {'status' : 'aborted'}
734 status = {'status' : 'aborted'}
731 md = {'engine' : self.ident}
735 md = {'engine' : self.ident}
732 md.update(status)
736 md.update(status)
733 reply_msg = self.session.send(stream, reply_type, metadata=md,
737 reply_msg = self.session.send(stream, reply_type, metadata=md,
734 content=status, parent=msg, ident=idents)
738 content=status, parent=msg, ident=idents)
735 self.log.debug("%s", reply_msg)
739 self.log.debug("%s", reply_msg)
736 # We need to wait a bit for requests to come in. This can probably
740 # We need to wait a bit for requests to come in. This can probably
737 # be set shorter for true asynchronous clients.
741 # be set shorter for true asynchronous clients.
738 poller.poll(50)
742 poller.poll(50)
739
743
740
744
741 def _no_raw_input(self):
745 def _no_raw_input(self):
742 """Raise StdinNotImplentedError if active frontend doesn't support
746 """Raise StdinNotImplentedError if active frontend doesn't support
743 stdin."""
747 stdin."""
744 raise StdinNotImplementedError("raw_input was called, but this "
748 raise StdinNotImplementedError("raw_input was called, but this "
745 "frontend does not support stdin.")
749 "frontend does not support stdin.")
746
750
747 def _raw_input(self, prompt, ident, parent, password=False):
751 def getpass(self, prompt=''):
752 """Forward getpass to frontends
753
754 Raises
755 ------
756 StdinNotImplentedError if active frontend doesn't support stdin.
757 """
758 if not self._allow_stdin:
759 raise StdinNotImplementedError(
760 "getpass was called, but this frontend does not support input requests."
761 )
762 return self._input_request(prompt,
763 self._parent_ident,
764 self._parent_header,
765 password=True,
766 )
767
768 def raw_input(self, prompt=''):
769 """Forward raw_input to frontends
770
771 Raises
772 ------
773 StdinNotImplentedError if active frontend doesn't support stdin.
774 """
775 if not self._allow_stdin:
776 raise StdinNotImplementedError(
777 "raw_input was called, but this frontend does not support input requests."
778 )
779 return self._input_request(prompt,
780 self._parent_ident,
781 self._parent_header,
782 password=False,
783 )
784
785 def _input_request(self, prompt, ident, parent, password=False):
748 # Flush output before making the request.
786 # Flush output before making the request.
749 sys.stderr.flush()
787 sys.stderr.flush()
750 sys.stdout.flush()
788 sys.stdout.flush()
751 # flush the stdin socket, to purge stale replies
789 # flush the stdin socket, to purge stale replies
752 while True:
790 while True:
753 try:
791 try:
754 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
792 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
755 except zmq.ZMQError as e:
793 except zmq.ZMQError as e:
756 if e.errno == zmq.EAGAIN:
794 if e.errno == zmq.EAGAIN:
757 break
795 break
758 else:
796 else:
759 raise
797 raise
760
798
761 # Send the input request.
799 # Send the input request.
762 content = json_clean(dict(prompt=prompt, password=password))
800 content = json_clean(dict(prompt=prompt, password=password))
763 self.session.send(self.stdin_socket, u'input_request', content, parent,
801 self.session.send(self.stdin_socket, u'input_request', content, parent,
764 ident=ident)
802 ident=ident)
765
803
766 # Await a response.
804 # Await a response.
767 while True:
805 while True:
768 try:
806 try:
769 ident, reply = self.session.recv(self.stdin_socket, 0)
807 ident, reply = self.session.recv(self.stdin_socket, 0)
770 except Exception:
808 except Exception:
771 self.log.warn("Invalid Message:", exc_info=True)
809 self.log.warn("Invalid Message:", exc_info=True)
772 except KeyboardInterrupt:
810 except KeyboardInterrupt:
773 # re-raise KeyboardInterrupt, to truncate traceback
811 # re-raise KeyboardInterrupt, to truncate traceback
774 raise KeyboardInterrupt
812 raise KeyboardInterrupt
775 else:
813 else:
776 break
814 break
777 try:
815 try:
778 value = py3compat.unicode_to_str(reply['content']['value'])
816 value = py3compat.unicode_to_str(reply['content']['value'])
779 except:
817 except:
780 self.log.error("Got bad raw_input reply: ")
818 self.log.error("Bad input_reply: %s", parent)
781 self.log.error("%s", parent)
782 value = ''
819 value = ''
783 if value == '\x04':
820 if value == '\x04':
784 # EOF
821 # EOF
785 raise EOFError
822 raise EOFError
786 return value
823 return value
787
824
788 def _complete(self, msg):
825 def _complete(self, msg):
789 c = msg['content']
826 c = msg['content']
790 try:
827 try:
791 cpos = int(c['cursor_pos'])
828 cpos = int(c['cursor_pos'])
792 except:
829 except:
793 # If we don't get something that we can convert to an integer, at
830 # If we don't get something that we can convert to an integer, at
794 # least attempt the completion guessing the cursor is at the end of
831 # least attempt the completion guessing the cursor is at the end of
795 # the text, if there's any, and otherwise of the line
832 # the text, if there's any, and otherwise of the line
796 cpos = len(c['text'])
833 cpos = len(c['text'])
797 if cpos==0:
834 if cpos==0:
798 cpos = len(c['line'])
835 cpos = len(c['line'])
799 return self.shell.complete(c['text'], c['line'], cpos)
836 return self.shell.complete(c['text'], c['line'], cpos)
800
837
801 def _at_shutdown(self):
838 def _at_shutdown(self):
802 """Actions taken at shutdown by the kernel, called by python's atexit.
839 """Actions taken at shutdown by the kernel, called by python's atexit.
803 """
840 """
804 # io.rprint("Kernel at_shutdown") # dbg
841 # io.rprint("Kernel at_shutdown") # dbg
805 if self._shutdown_message is not None:
842 if self._shutdown_message is not None:
806 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
843 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
807 self.log.debug("%s", self._shutdown_message)
844 self.log.debug("%s", self._shutdown_message)
808 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
845 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
809
846
General Comments 0
You need to be logged in to leave comments. Login now