##// END OF EJS Templates
Merge pull request #6374 from dhirschfeld/about_queues...
Min RK -
r17746:13e42d67 merge
parent child Browse files
Show More
@@ -1,677 +1,676 b''
1 """Base class for a kernel that talks to frontends over 0MQ."""
1 """Base class for a kernel that talks to frontends over 0MQ."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import print_function
6 from __future__ import print_function
7
7
8 import sys
8 import sys
9 import time
9 import time
10 import logging
10 import logging
11 import uuid
11 import uuid
12
12
13 from datetime import datetime
13 from datetime import datetime
14 from signal import (
14 from signal import (
15 signal, default_int_handler, SIGINT
15 signal, default_int_handler, SIGINT
16 )
16 )
17
17
18 import zmq
18 import zmq
19 from zmq.eventloop import ioloop
19 from zmq.eventloop import ioloop
20 from zmq.eventloop.zmqstream import ZMQStream
20 from zmq.eventloop.zmqstream import ZMQStream
21
21
22 from IPython.config.configurable import Configurable
22 from IPython.config.configurable import Configurable
23 from IPython.core.error import StdinNotImplementedError
23 from IPython.core.error import StdinNotImplementedError
24 from IPython.core import release
24 from IPython.core import release
25 from IPython.utils import py3compat
25 from IPython.utils import py3compat
26 from IPython.utils.py3compat import unicode_type, string_types
26 from IPython.utils.py3compat import unicode_type, string_types
27 from IPython.utils.jsonutil import json_clean
27 from IPython.utils.jsonutil import json_clean
28 from IPython.utils.traitlets import (
28 from IPython.utils.traitlets import (
29 Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool,
29 Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool,
30 )
30 )
31
31
32 from .session import Session
32 from .session import Session
33
33
34
34
35 class Kernel(Configurable):
35 class Kernel(Configurable):
36
36
37 #---------------------------------------------------------------------------
37 #---------------------------------------------------------------------------
38 # Kernel interface
38 # Kernel interface
39 #---------------------------------------------------------------------------
39 #---------------------------------------------------------------------------
40
40
41 # attribute to override with a GUI
41 # attribute to override with a GUI
42 eventloop = Any(None)
42 eventloop = Any(None)
43 def _eventloop_changed(self, name, old, new):
43 def _eventloop_changed(self, name, old, new):
44 """schedule call to eventloop from IOLoop"""
44 """schedule call to eventloop from IOLoop"""
45 loop = ioloop.IOLoop.instance()
45 loop = ioloop.IOLoop.instance()
46 loop.add_callback(self.enter_eventloop)
46 loop.add_callback(self.enter_eventloop)
47
47
48 session = Instance(Session)
48 session = Instance(Session)
49 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
49 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
50 shell_streams = List()
50 shell_streams = List()
51 control_stream = Instance(ZMQStream)
51 control_stream = Instance(ZMQStream)
52 iopub_socket = Instance(zmq.Socket)
52 iopub_socket = Instance(zmq.Socket)
53 stdin_socket = Instance(zmq.Socket)
53 stdin_socket = Instance(zmq.Socket)
54 log = Instance(logging.Logger)
54 log = Instance(logging.Logger)
55
55
56 # identities:
56 # identities:
57 int_id = Integer(-1)
57 int_id = Integer(-1)
58 ident = Unicode()
58 ident = Unicode()
59
59
60 def _ident_default(self):
60 def _ident_default(self):
61 return unicode_type(uuid.uuid4())
61 return unicode_type(uuid.uuid4())
62
62
63 # Private interface
63 # Private interface
64
64
65 _darwin_app_nap = Bool(True, config=True,
65 _darwin_app_nap = Bool(True, config=True,
66 help="""Whether to use appnope for compatiblity with OS X App Nap.
66 help="""Whether to use appnope for compatiblity with OS X App Nap.
67
67
68 Only affects OS X >= 10.9.
68 Only affects OS X >= 10.9.
69 """
69 """
70 )
70 )
71
71
72 # track associations with current request
72 # track associations with current request
73 _allow_stdin = Bool(False)
73 _allow_stdin = Bool(False)
74 _parent_header = Dict()
74 _parent_header = Dict()
75 _parent_ident = Any(b'')
75 _parent_ident = Any(b'')
76 # Time to sleep after flushing the stdout/err buffers in each execute
76 # Time to sleep after flushing the stdout/err buffers in each execute
77 # cycle. While this introduces a hard limit on the minimal latency of the
77 # cycle. While this introduces a hard limit on the minimal latency of the
78 # execute cycle, it helps prevent output synchronization problems for
78 # execute cycle, it helps prevent output synchronization problems for
79 # clients.
79 # clients.
80 # Units are in seconds. The minimum zmq latency on local host is probably
80 # Units are in seconds. The minimum zmq latency on local host is probably
81 # ~150 microseconds, set this to 500us for now. We may need to increase it
81 # ~150 microseconds, set this to 500us for now. We may need to increase it
82 # a little if it's not enough after more interactive testing.
82 # a little if it's not enough after more interactive testing.
83 _execute_sleep = Float(0.0005, config=True)
83 _execute_sleep = Float(0.0005, config=True)
84
84
85 # Frequency of the kernel's event loop.
85 # Frequency of the kernel's event loop.
86 # Units are in seconds, kernel subclasses for GUI toolkits may need to
86 # Units are in seconds, kernel subclasses for GUI toolkits may need to
87 # adapt to milliseconds.
87 # adapt to milliseconds.
88 _poll_interval = Float(0.05, config=True)
88 _poll_interval = Float(0.05, config=True)
89
89
90 # If the shutdown was requested over the network, we leave here the
90 # If the shutdown was requested over the network, we leave here the
91 # necessary reply message so it can be sent by our registered atexit
91 # necessary reply message so it can be sent by our registered atexit
92 # handler. This ensures that the reply is only sent to clients truly at
92 # handler. This ensures that the reply is only sent to clients truly at
93 # the end of our shutdown process (which happens after the underlying
93 # the end of our shutdown process (which happens after the underlying
94 # IPython shell's own shutdown).
94 # IPython shell's own shutdown).
95 _shutdown_message = None
95 _shutdown_message = None
96
96
97 # This is a dict of port number that the kernel is listening on. It is set
97 # This is a dict of port number that the kernel is listening on. It is set
98 # by record_ports and used by connect_request.
98 # by record_ports and used by connect_request.
99 _recorded_ports = Dict()
99 _recorded_ports = Dict()
100
100
101 # set of aborted msg_ids
101 # set of aborted msg_ids
102 aborted = Set()
102 aborted = Set()
103
103
104 # Track execution count here. For IPython, we override this to use the
104 # Track execution count here. For IPython, we override this to use the
105 # execution count we store in the shell.
105 # execution count we store in the shell.
106 execution_count = 0
106 execution_count = 0
107
107
108
108
109 def __init__(self, **kwargs):
109 def __init__(self, **kwargs):
110 super(Kernel, self).__init__(**kwargs)
110 super(Kernel, self).__init__(**kwargs)
111
111
112 # Build dict of handlers for message types
112 # Build dict of handlers for message types
113 msg_types = [ 'execute_request', 'complete_request',
113 msg_types = [ 'execute_request', 'complete_request',
114 'inspect_request', 'history_request',
114 'inspect_request', 'history_request',
115 'kernel_info_request',
115 'kernel_info_request',
116 'connect_request', 'shutdown_request',
116 'connect_request', 'shutdown_request',
117 'apply_request',
117 'apply_request',
118 ]
118 ]
119 self.shell_handlers = {}
119 self.shell_handlers = {}
120 for msg_type in msg_types:
120 for msg_type in msg_types:
121 self.shell_handlers[msg_type] = getattr(self, msg_type)
121 self.shell_handlers[msg_type] = getattr(self, msg_type)
122
122
123 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
123 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
124 self.control_handlers = {}
124 self.control_handlers = {}
125 for msg_type in control_msg_types:
125 for msg_type in control_msg_types:
126 self.control_handlers[msg_type] = getattr(self, msg_type)
126 self.control_handlers[msg_type] = getattr(self, msg_type)
127
127
128
128
129 def dispatch_control(self, msg):
129 def dispatch_control(self, msg):
130 """dispatch control requests"""
130 """dispatch control requests"""
131 idents,msg = self.session.feed_identities(msg, copy=False)
131 idents,msg = self.session.feed_identities(msg, copy=False)
132 try:
132 try:
133 msg = self.session.unserialize(msg, content=True, copy=False)
133 msg = self.session.unserialize(msg, content=True, copy=False)
134 except:
134 except:
135 self.log.error("Invalid Control Message", exc_info=True)
135 self.log.error("Invalid Control Message", exc_info=True)
136 return
136 return
137
137
138 self.log.debug("Control received: %s", msg)
138 self.log.debug("Control received: %s", msg)
139
139
140 # Set the parent message for side effects.
140 # Set the parent message for side effects.
141 self.set_parent(idents, msg)
141 self.set_parent(idents, msg)
142 self._publish_status(u'busy')
142 self._publish_status(u'busy')
143
143
144 header = msg['header']
144 header = msg['header']
145 msg_type = header['msg_type']
145 msg_type = header['msg_type']
146
146
147 handler = self.control_handlers.get(msg_type, None)
147 handler = self.control_handlers.get(msg_type, None)
148 if handler is None:
148 if handler is None:
149 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
149 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
150 else:
150 else:
151 try:
151 try:
152 handler(self.control_stream, idents, msg)
152 handler(self.control_stream, idents, msg)
153 except Exception:
153 except Exception:
154 self.log.error("Exception in control handler:", exc_info=True)
154 self.log.error("Exception in control handler:", exc_info=True)
155
155
156 sys.stdout.flush()
156 sys.stdout.flush()
157 sys.stderr.flush()
157 sys.stderr.flush()
158 self._publish_status(u'idle')
158 self._publish_status(u'idle')
159
159
160 def dispatch_shell(self, stream, msg):
160 def dispatch_shell(self, stream, msg):
161 """dispatch shell requests"""
161 """dispatch shell requests"""
162 # flush control requests first
162 # flush control requests first
163 if self.control_stream:
163 if self.control_stream:
164 self.control_stream.flush()
164 self.control_stream.flush()
165
165
166 idents,msg = self.session.feed_identities(msg, copy=False)
166 idents,msg = self.session.feed_identities(msg, copy=False)
167 try:
167 try:
168 msg = self.session.unserialize(msg, content=True, copy=False)
168 msg = self.session.unserialize(msg, content=True, copy=False)
169 except:
169 except:
170 self.log.error("Invalid Message", exc_info=True)
170 self.log.error("Invalid Message", exc_info=True)
171 return
171 return
172
172
173 # Set the parent message for side effects.
173 # Set the parent message for side effects.
174 self.set_parent(idents, msg)
174 self.set_parent(idents, msg)
175 self._publish_status(u'busy')
175 self._publish_status(u'busy')
176
176
177 header = msg['header']
177 header = msg['header']
178 msg_id = header['msg_id']
178 msg_id = header['msg_id']
179 msg_type = msg['header']['msg_type']
179 msg_type = msg['header']['msg_type']
180
180
181 # Print some info about this message and leave a '--->' marker, so it's
181 # Print some info about this message and leave a '--->' marker, so it's
182 # easier to trace visually the message chain when debugging. Each
182 # easier to trace visually the message chain when debugging. Each
183 # handler prints its message at the end.
183 # handler prints its message at the end.
184 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
184 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
185 self.log.debug(' Content: %s\n --->\n ', msg['content'])
185 self.log.debug(' Content: %s\n --->\n ', msg['content'])
186
186
187 if msg_id in self.aborted:
187 if msg_id in self.aborted:
188 self.aborted.remove(msg_id)
188 self.aborted.remove(msg_id)
189 # is it safe to assume a msg_id will not be resubmitted?
189 # is it safe to assume a msg_id will not be resubmitted?
190 reply_type = msg_type.split('_')[0] + '_reply'
190 reply_type = msg_type.split('_')[0] + '_reply'
191 status = {'status' : 'aborted'}
191 status = {'status' : 'aborted'}
192 md = {'engine' : self.ident}
192 md = {'engine' : self.ident}
193 md.update(status)
193 md.update(status)
194 self.session.send(stream, reply_type, metadata=md,
194 self.session.send(stream, reply_type, metadata=md,
195 content=status, parent=msg, ident=idents)
195 content=status, parent=msg, ident=idents)
196 return
196 return
197
197
198 handler = self.shell_handlers.get(msg_type, None)
198 handler = self.shell_handlers.get(msg_type, None)
199 if handler is None:
199 if handler is None:
200 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
200 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
201 else:
201 else:
202 # ensure default_int_handler during handler call
202 # ensure default_int_handler during handler call
203 sig = signal(SIGINT, default_int_handler)
203 sig = signal(SIGINT, default_int_handler)
204 self.log.debug("%s: %s", msg_type, msg)
204 self.log.debug("%s: %s", msg_type, msg)
205 try:
205 try:
206 handler(stream, idents, msg)
206 handler(stream, idents, msg)
207 except Exception:
207 except Exception:
208 self.log.error("Exception in message handler:", exc_info=True)
208 self.log.error("Exception in message handler:", exc_info=True)
209 finally:
209 finally:
210 signal(SIGINT, sig)
210 signal(SIGINT, sig)
211
211
212 sys.stdout.flush()
212 sys.stdout.flush()
213 sys.stderr.flush()
213 sys.stderr.flush()
214 self._publish_status(u'idle')
214 self._publish_status(u'idle')
215
215
216 def enter_eventloop(self):
216 def enter_eventloop(self):
217 """enter eventloop"""
217 """enter eventloop"""
218 self.log.info("entering eventloop %s", self.eventloop)
218 self.log.info("entering eventloop %s", self.eventloop)
219 for stream in self.shell_streams:
219 for stream in self.shell_streams:
220 # flush any pending replies,
220 # flush any pending replies,
221 # which may be skipped by entering the eventloop
221 # which may be skipped by entering the eventloop
222 stream.flush(zmq.POLLOUT)
222 stream.flush(zmq.POLLOUT)
223 # restore default_int_handler
223 # restore default_int_handler
224 signal(SIGINT, default_int_handler)
224 signal(SIGINT, default_int_handler)
225 while self.eventloop is not None:
225 while self.eventloop is not None:
226 try:
226 try:
227 self.eventloop(self)
227 self.eventloop(self)
228 except KeyboardInterrupt:
228 except KeyboardInterrupt:
229 # Ctrl-C shouldn't crash the kernel
229 # Ctrl-C shouldn't crash the kernel
230 self.log.error("KeyboardInterrupt caught in kernel")
230 self.log.error("KeyboardInterrupt caught in kernel")
231 continue
231 continue
232 else:
232 else:
233 # eventloop exited cleanly, this means we should stop (right?)
233 # eventloop exited cleanly, this means we should stop (right?)
234 self.eventloop = None
234 self.eventloop = None
235 break
235 break
236 self.log.info("exiting eventloop")
236 self.log.info("exiting eventloop")
237
237
238 def start(self):
238 def start(self):
239 """register dispatchers for streams"""
239 """register dispatchers for streams"""
240 if self.control_stream:
240 if self.control_stream:
241 self.control_stream.on_recv(self.dispatch_control, copy=False)
241 self.control_stream.on_recv(self.dispatch_control, copy=False)
242
242
243 def make_dispatcher(stream):
243 def make_dispatcher(stream):
244 def dispatcher(msg):
244 def dispatcher(msg):
245 return self.dispatch_shell(stream, msg)
245 return self.dispatch_shell(stream, msg)
246 return dispatcher
246 return dispatcher
247
247
248 for s in self.shell_streams:
248 for s in self.shell_streams:
249 s.on_recv(make_dispatcher(s), copy=False)
249 s.on_recv(make_dispatcher(s), copy=False)
250
250
251 # publish idle status
251 # publish idle status
252 self._publish_status('starting')
252 self._publish_status('starting')
253
253
254 def do_one_iteration(self):
254 def do_one_iteration(self):
255 """step eventloop just once"""
255 """step eventloop just once"""
256 if self.control_stream:
256 if self.control_stream:
257 self.control_stream.flush()
257 self.control_stream.flush()
258 for stream in self.shell_streams:
258 for stream in self.shell_streams:
259 # handle at most one request per iteration
259 # handle at most one request per iteration
260 stream.flush(zmq.POLLIN, 1)
260 stream.flush(zmq.POLLIN, 1)
261 stream.flush(zmq.POLLOUT)
261 stream.flush(zmq.POLLOUT)
262
262
263
263
264 def record_ports(self, ports):
264 def record_ports(self, ports):
265 """Record the ports that this kernel is using.
265 """Record the ports that this kernel is using.
266
266
267 The creator of the Kernel instance must call this methods if they
267 The creator of the Kernel instance must call this methods if they
268 want the :meth:`connect_request` method to return the port numbers.
268 want the :meth:`connect_request` method to return the port numbers.
269 """
269 """
270 self._recorded_ports = ports
270 self._recorded_ports = ports
271
271
272 #---------------------------------------------------------------------------
272 #---------------------------------------------------------------------------
273 # Kernel request handlers
273 # Kernel request handlers
274 #---------------------------------------------------------------------------
274 #---------------------------------------------------------------------------
275
275
276 def _make_metadata(self, other=None):
276 def _make_metadata(self, other=None):
277 """init metadata dict, for execute/apply_reply"""
277 """init metadata dict, for execute/apply_reply"""
278 new_md = {
278 new_md = {
279 'dependencies_met' : True,
279 'dependencies_met' : True,
280 'engine' : self.ident,
280 'engine' : self.ident,
281 'started': datetime.now(),
281 'started': datetime.now(),
282 }
282 }
283 if other:
283 if other:
284 new_md.update(other)
284 new_md.update(other)
285 return new_md
285 return new_md
286
286
287 def _publish_execute_input(self, code, parent, execution_count):
287 def _publish_execute_input(self, code, parent, execution_count):
288 """Publish the code request on the iopub stream."""
288 """Publish the code request on the iopub stream."""
289
289
290 self.session.send(self.iopub_socket, u'execute_input',
290 self.session.send(self.iopub_socket, u'execute_input',
291 {u'code':code, u'execution_count': execution_count},
291 {u'code':code, u'execution_count': execution_count},
292 parent=parent, ident=self._topic('execute_input')
292 parent=parent, ident=self._topic('execute_input')
293 )
293 )
294
294
295 def _publish_status(self, status, parent=None):
295 def _publish_status(self, status, parent=None):
296 """send status (busy/idle) on IOPub"""
296 """send status (busy/idle) on IOPub"""
297 self.session.send(self.iopub_socket,
297 self.session.send(self.iopub_socket,
298 u'status',
298 u'status',
299 {u'execution_state': status},
299 {u'execution_state': status},
300 parent=parent or self._parent_header,
300 parent=parent or self._parent_header,
301 ident=self._topic('status'),
301 ident=self._topic('status'),
302 )
302 )
303
303
304 def set_parent(self, ident, parent):
304 def set_parent(self, ident, parent):
305 """Set the current parent_header
305 """Set the current parent_header
306
306
307 Side effects (IOPub messages) and replies are associated with
307 Side effects (IOPub messages) and replies are associated with
308 the request that caused them via the parent_header.
308 the request that caused them via the parent_header.
309
309
310 The parent identity is used to route input_request messages
310 The parent identity is used to route input_request messages
311 on the stdin channel.
311 on the stdin channel.
312 """
312 """
313 self._parent_ident = ident
313 self._parent_ident = ident
314 self._parent_header = parent
314 self._parent_header = parent
315
315
316 def send_response(self, stream, msg_or_type, content=None, ident=None,
316 def send_response(self, stream, msg_or_type, content=None, ident=None,
317 buffers=None, track=False, header=None, metadata=None):
317 buffers=None, track=False, header=None, metadata=None):
318 """Send a response to the message we're currently processing.
318 """Send a response to the message we're currently processing.
319
319
320 This accepts all the parameters of :meth:`IPython.kernel.zmq.session.Session.send`
320 This accepts all the parameters of :meth:`IPython.kernel.zmq.session.Session.send`
321 except ``parent``.
321 except ``parent``.
322
322
323 This relies on :meth:`set_parent` having been called for the current
323 This relies on :meth:`set_parent` having been called for the current
324 message.
324 message.
325 """
325 """
326 return self.session.send(stream, msg_or_type, content, self._parent_header,
326 return self.session.send(stream, msg_or_type, content, self._parent_header,
327 ident, buffers, track, header, metadata)
327 ident, buffers, track, header, metadata)
328
328
329 def execute_request(self, stream, ident, parent):
329 def execute_request(self, stream, ident, parent):
330 """handle an execute_request"""
330 """handle an execute_request"""
331
331
332 try:
332 try:
333 content = parent[u'content']
333 content = parent[u'content']
334 code = py3compat.cast_unicode_py2(content[u'code'])
334 code = py3compat.cast_unicode_py2(content[u'code'])
335 silent = content[u'silent']
335 silent = content[u'silent']
336 store_history = content.get(u'store_history', not silent)
336 store_history = content.get(u'store_history', not silent)
337 user_expressions = content.get('user_expressions', {})
337 user_expressions = content.get('user_expressions', {})
338 allow_stdin = content.get('allow_stdin', False)
338 allow_stdin = content.get('allow_stdin', False)
339 except:
339 except:
340 self.log.error("Got bad msg: ")
340 self.log.error("Got bad msg: ")
341 self.log.error("%s", parent)
341 self.log.error("%s", parent)
342 return
342 return
343
343
344 md = self._make_metadata(parent['metadata'])
344 md = self._make_metadata(parent['metadata'])
345
345
346 # Re-broadcast our input for the benefit of listening clients, and
346 # Re-broadcast our input for the benefit of listening clients, and
347 # start computing output
347 # start computing output
348 if not silent:
348 if not silent:
349 self.execution_count += 1
349 self.execution_count += 1
350 self._publish_execute_input(code, parent, self.execution_count)
350 self._publish_execute_input(code, parent, self.execution_count)
351
351
352 reply_content = self.do_execute(code, silent, store_history,
352 reply_content = self.do_execute(code, silent, store_history,
353 user_expressions, allow_stdin)
353 user_expressions, allow_stdin)
354
354
355 # Flush output before sending the reply.
355 # Flush output before sending the reply.
356 sys.stdout.flush()
356 sys.stdout.flush()
357 sys.stderr.flush()
357 sys.stderr.flush()
358 # FIXME: on rare occasions, the flush doesn't seem to make it to the
358 # FIXME: on rare occasions, the flush doesn't seem to make it to the
359 # clients... This seems to mitigate the problem, but we definitely need
359 # clients... This seems to mitigate the problem, but we definitely need
360 # to better understand what's going on.
360 # to better understand what's going on.
361 if self._execute_sleep:
361 if self._execute_sleep:
362 time.sleep(self._execute_sleep)
362 time.sleep(self._execute_sleep)
363
363
364 # Send the reply.
364 # Send the reply.
365 reply_content = json_clean(reply_content)
365 reply_content = json_clean(reply_content)
366
366
367 md['status'] = reply_content['status']
367 md['status'] = reply_content['status']
368 if reply_content['status'] == 'error' and \
368 if reply_content['status'] == 'error' and \
369 reply_content['ename'] == 'UnmetDependency':
369 reply_content['ename'] == 'UnmetDependency':
370 md['dependencies_met'] = False
370 md['dependencies_met'] = False
371
371
372 reply_msg = self.session.send(stream, u'execute_reply',
372 reply_msg = self.session.send(stream, u'execute_reply',
373 reply_content, parent, metadata=md,
373 reply_content, parent, metadata=md,
374 ident=ident)
374 ident=ident)
375
375
376 self.log.debug("%s", reply_msg)
376 self.log.debug("%s", reply_msg)
377
377
378 if not silent and reply_msg['content']['status'] == u'error':
378 if not silent and reply_msg['content']['status'] == u'error':
379 self._abort_queues()
379 self._abort_queues()
380
380
381 def do_execute(self, code, silent, store_history=True,
381 def do_execute(self, code, silent, store_history=True,
382 user_experssions=None, allow_stdin=False):
382 user_experssions=None, allow_stdin=False):
383 """Execute user code. Must be overridden by subclasses.
383 """Execute user code. Must be overridden by subclasses.
384 """
384 """
385 raise NotImplementedError
385 raise NotImplementedError
386
386
387 def complete_request(self, stream, ident, parent):
387 def complete_request(self, stream, ident, parent):
388 content = parent['content']
388 content = parent['content']
389 code = content['code']
389 code = content['code']
390 cursor_pos = content['cursor_pos']
390 cursor_pos = content['cursor_pos']
391
391
392 matches = self.do_complete(code, cursor_pos)
392 matches = self.do_complete(code, cursor_pos)
393 matches = json_clean(matches)
393 matches = json_clean(matches)
394 completion_msg = self.session.send(stream, 'complete_reply',
394 completion_msg = self.session.send(stream, 'complete_reply',
395 matches, parent, ident)
395 matches, parent, ident)
396 self.log.debug("%s", completion_msg)
396 self.log.debug("%s", completion_msg)
397
397
398 def do_complete(self, code, cursor_pos):
398 def do_complete(self, code, cursor_pos):
399 """Override in subclasses to find completions.
399 """Override in subclasses to find completions.
400 """
400 """
401 return {'matches' : [],
401 return {'matches' : [],
402 'cursor_end' : cursor_pos,
402 'cursor_end' : cursor_pos,
403 'cursor_start' : cursor_pos,
403 'cursor_start' : cursor_pos,
404 'metadata' : {},
404 'metadata' : {},
405 'status' : 'ok'}
405 'status' : 'ok'}
406
406
407 def inspect_request(self, stream, ident, parent):
407 def inspect_request(self, stream, ident, parent):
408 content = parent['content']
408 content = parent['content']
409
409
410 reply_content = self.do_inspect(content['code'], content['cursor_pos'],
410 reply_content = self.do_inspect(content['code'], content['cursor_pos'],
411 content.get('detail_level', 0))
411 content.get('detail_level', 0))
412 # Before we send this object over, we scrub it for JSON usage
412 # Before we send this object over, we scrub it for JSON usage
413 reply_content = json_clean(reply_content)
413 reply_content = json_clean(reply_content)
414 msg = self.session.send(stream, 'inspect_reply',
414 msg = self.session.send(stream, 'inspect_reply',
415 reply_content, parent, ident)
415 reply_content, parent, ident)
416 self.log.debug("%s", msg)
416 self.log.debug("%s", msg)
417
417
418 def do_inspect(self, code, cursor_pos, detail_level=0):
418 def do_inspect(self, code, cursor_pos, detail_level=0):
419 """Override in subclasses to allow introspection.
419 """Override in subclasses to allow introspection.
420 """
420 """
421 return {'status': 'ok', 'data':{}, 'metadata':{}, 'found':False}
421 return {'status': 'ok', 'data':{}, 'metadata':{}, 'found':False}
422
422
423 def history_request(self, stream, ident, parent):
423 def history_request(self, stream, ident, parent):
424 content = parent['content']
424 content = parent['content']
425
425
426 reply_content = self.do_history(**content)
426 reply_content = self.do_history(**content)
427
427
428 reply_content = json_clean(reply_content)
428 reply_content = json_clean(reply_content)
429 msg = self.session.send(stream, 'history_reply',
429 msg = self.session.send(stream, 'history_reply',
430 reply_content, parent, ident)
430 reply_content, parent, ident)
431 self.log.debug("%s", msg)
431 self.log.debug("%s", msg)
432
432
433 def do_history(self, hist_access_type, output, raw, session=None, start=None,
433 def do_history(self, hist_access_type, output, raw, session=None, start=None,
434 stop=None, n=None, pattern=None, unique=False):
434 stop=None, n=None, pattern=None, unique=False):
435 """Override in subclasses to access history.
435 """Override in subclasses to access history.
436 """
436 """
437 return {'history': []}
437 return {'history': []}
438
438
439 def connect_request(self, stream, ident, parent):
439 def connect_request(self, stream, ident, parent):
440 if self._recorded_ports is not None:
440 if self._recorded_ports is not None:
441 content = self._recorded_ports.copy()
441 content = self._recorded_ports.copy()
442 else:
442 else:
443 content = {}
443 content = {}
444 msg = self.session.send(stream, 'connect_reply',
444 msg = self.session.send(stream, 'connect_reply',
445 content, parent, ident)
445 content, parent, ident)
446 self.log.debug("%s", msg)
446 self.log.debug("%s", msg)
447
447
448 @property
448 @property
449 def kernel_info(self):
449 def kernel_info(self):
450 return {
450 return {
451 'protocol_version': release.kernel_protocol_version,
451 'protocol_version': release.kernel_protocol_version,
452 'implementation': self.implementation,
452 'implementation': self.implementation,
453 'implementation_version': self.implementation_version,
453 'implementation_version': self.implementation_version,
454 'language': self.language,
454 'language': self.language,
455 'language_version': self.language_version,
455 'language_version': self.language_version,
456 'banner': self.banner,
456 'banner': self.banner,
457 }
457 }
458
458
459 def kernel_info_request(self, stream, ident, parent):
459 def kernel_info_request(self, stream, ident, parent):
460 msg = self.session.send(stream, 'kernel_info_reply',
460 msg = self.session.send(stream, 'kernel_info_reply',
461 self.kernel_info, parent, ident)
461 self.kernel_info, parent, ident)
462 self.log.debug("%s", msg)
462 self.log.debug("%s", msg)
463
463
464 def shutdown_request(self, stream, ident, parent):
464 def shutdown_request(self, stream, ident, parent):
465 content = self.do_shutdown(parent['content']['restart'])
465 content = self.do_shutdown(parent['content']['restart'])
466 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
466 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
467 # same content, but different msg_id for broadcasting on IOPub
467 # same content, but different msg_id for broadcasting on IOPub
468 self._shutdown_message = self.session.msg(u'shutdown_reply',
468 self._shutdown_message = self.session.msg(u'shutdown_reply',
469 content, parent
469 content, parent
470 )
470 )
471
471
472 self._at_shutdown()
472 self._at_shutdown()
473 # call sys.exit after a short delay
473 # call sys.exit after a short delay
474 loop = ioloop.IOLoop.instance()
474 loop = ioloop.IOLoop.instance()
475 loop.add_timeout(time.time()+0.1, loop.stop)
475 loop.add_timeout(time.time()+0.1, loop.stop)
476
476
477 def do_shutdown(self, restart):
477 def do_shutdown(self, restart):
478 """Override in subclasses to do things when the frontend shuts down the
478 """Override in subclasses to do things when the frontend shuts down the
479 kernel.
479 kernel.
480 """
480 """
481 return {'status': 'ok', 'restart': restart}
481 return {'status': 'ok', 'restart': restart}
482
482
483 #---------------------------------------------------------------------------
483 #---------------------------------------------------------------------------
484 # Engine methods
484 # Engine methods
485 #---------------------------------------------------------------------------
485 #---------------------------------------------------------------------------
486
486
487 def apply_request(self, stream, ident, parent):
487 def apply_request(self, stream, ident, parent):
488 try:
488 try:
489 content = parent[u'content']
489 content = parent[u'content']
490 bufs = parent[u'buffers']
490 bufs = parent[u'buffers']
491 msg_id = parent['header']['msg_id']
491 msg_id = parent['header']['msg_id']
492 except:
492 except:
493 self.log.error("Got bad msg: %s", parent, exc_info=True)
493 self.log.error("Got bad msg: %s", parent, exc_info=True)
494 return
494 return
495
495
496 md = self._make_metadata(parent['metadata'])
496 md = self._make_metadata(parent['metadata'])
497
497
498 reply_content, result_buf = self.do_apply(content, bufs, msg_id, md)
498 reply_content, result_buf = self.do_apply(content, bufs, msg_id, md)
499
499
500 # put 'ok'/'error' status in header, for scheduler introspection:
500 # put 'ok'/'error' status in header, for scheduler introspection:
501 md['status'] = reply_content['status']
501 md['status'] = reply_content['status']
502
502
503 # flush i/o
503 # flush i/o
504 sys.stdout.flush()
504 sys.stdout.flush()
505 sys.stderr.flush()
505 sys.stderr.flush()
506
506
507 self.session.send(stream, u'apply_reply', reply_content,
507 self.session.send(stream, u'apply_reply', reply_content,
508 parent=parent, ident=ident,buffers=result_buf, metadata=md)
508 parent=parent, ident=ident,buffers=result_buf, metadata=md)
509
509
510 def do_apply(self, content, bufs, msg_id, reply_metadata):
510 def do_apply(self, content, bufs, msg_id, reply_metadata):
511 """Override in subclasses to support the IPython parallel framework.
511 """Override in subclasses to support the IPython parallel framework.
512 """
512 """
513 raise NotImplementedError
513 raise NotImplementedError
514
514
515 #---------------------------------------------------------------------------
515 #---------------------------------------------------------------------------
516 # Control messages
516 # Control messages
517 #---------------------------------------------------------------------------
517 #---------------------------------------------------------------------------
518
518
519 def abort_request(self, stream, ident, parent):
519 def abort_request(self, stream, ident, parent):
520 """abort a specifig msg by id"""
520 """abort a specific msg by id"""
521 msg_ids = parent['content'].get('msg_ids', None)
521 msg_ids = parent['content'].get('msg_ids', None)
522 if isinstance(msg_ids, string_types):
522 if isinstance(msg_ids, string_types):
523 msg_ids = [msg_ids]
523 msg_ids = [msg_ids]
524 if not msg_ids:
524 if not msg_ids:
525 self.abort_queues()
525 self._abort_queues()
526 for mid in msg_ids:
526 for mid in msg_ids:
527 self.aborted.add(str(mid))
527 self.aborted.add(str(mid))
528
528
529 content = dict(status='ok')
529 content = dict(status='ok')
530 reply_msg = self.session.send(stream, 'abort_reply', content=content,
530 reply_msg = self.session.send(stream, 'abort_reply', content=content,
531 parent=parent, ident=ident)
531 parent=parent, ident=ident)
532 self.log.debug("%s", reply_msg)
532 self.log.debug("%s", reply_msg)
533
533
534 def clear_request(self, stream, idents, parent):
534 def clear_request(self, stream, idents, parent):
535 """Clear our namespace."""
535 """Clear our namespace."""
536 content = self.do_clear()
536 content = self.do_clear()
537 self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
537 self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
538 content = content)
538 content = content)
539
539
540 def do_clear(self):
540 def do_clear(self):
541 """Override in subclasses to clear the namespace
541 """Override in subclasses to clear the namespace
542
542
543 This is only required for IPython.parallel.
543 This is only required for IPython.parallel.
544 """
544 """
545 raise NotImplementedError
545 raise NotImplementedError
546
546
547 #---------------------------------------------------------------------------
547 #---------------------------------------------------------------------------
548 # Protected interface
548 # Protected interface
549 #---------------------------------------------------------------------------
549 #---------------------------------------------------------------------------
550
550
551 def _topic(self, topic):
551 def _topic(self, topic):
552 """prefixed topic for IOPub messages"""
552 """prefixed topic for IOPub messages"""
553 if self.int_id >= 0:
553 if self.int_id >= 0:
554 base = "engine.%i" % self.int_id
554 base = "engine.%i" % self.int_id
555 else:
555 else:
556 base = "kernel.%s" % self.ident
556 base = "kernel.%s" % self.ident
557
557
558 return py3compat.cast_bytes("%s.%s" % (base, topic))
558 return py3compat.cast_bytes("%s.%s" % (base, topic))
559
559
560 def _abort_queues(self):
560 def _abort_queues(self):
561 for stream in self.shell_streams:
561 for stream in self.shell_streams:
562 if stream:
562 if stream:
563 self._abort_queue(stream)
563 self._abort_queue(stream)
564
564
565 def _abort_queue(self, stream):
565 def _abort_queue(self, stream):
566 poller = zmq.Poller()
566 poller = zmq.Poller()
567 poller.register(stream.socket, zmq.POLLIN)
567 poller.register(stream.socket, zmq.POLLIN)
568 while True:
568 while True:
569 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
569 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
570 if msg is None:
570 if msg is None:
571 return
571 return
572
572
573 self.log.info("Aborting:")
573 self.log.info("Aborting:")
574 self.log.info("%s", msg)
574 self.log.info("%s", msg)
575 msg_type = msg['header']['msg_type']
575 msg_type = msg['header']['msg_type']
576 reply_type = msg_type.split('_')[0] + '_reply'
576 reply_type = msg_type.split('_')[0] + '_reply'
577
577
578 status = {'status' : 'aborted'}
578 status = {'status' : 'aborted'}
579 md = {'engine' : self.ident}
579 md = {'engine' : self.ident}
580 md.update(status)
580 md.update(status)
581 reply_msg = self.session.send(stream, reply_type, metadata=md,
581 reply_msg = self.session.send(stream, reply_type, metadata=md,
582 content=status, parent=msg, ident=idents)
582 content=status, parent=msg, ident=idents)
583 self.log.debug("%s", reply_msg)
583 self.log.debug("%s", reply_msg)
584 # We need to wait a bit for requests to come in. This can probably
584 # We need to wait a bit for requests to come in. This can probably
585 # be set shorter for true asynchronous clients.
585 # be set shorter for true asynchronous clients.
586 poller.poll(50)
586 poller.poll(50)
587
587
588
588
589 def _no_raw_input(self):
589 def _no_raw_input(self):
590 """Raise StdinNotImplentedError if active frontend doesn't support
590 """Raise StdinNotImplentedError if active frontend doesn't support
591 stdin."""
591 stdin."""
592 raise StdinNotImplementedError("raw_input was called, but this "
592 raise StdinNotImplementedError("raw_input was called, but this "
593 "frontend does not support stdin.")
593 "frontend does not support stdin.")
594
594
595 def getpass(self, prompt=''):
595 def getpass(self, prompt=''):
596 """Forward getpass to frontends
596 """Forward getpass to frontends
597
597
598 Raises
598 Raises
599 ------
599 ------
600 StdinNotImplentedError if active frontend doesn't support stdin.
600 StdinNotImplentedError if active frontend doesn't support stdin.
601 """
601 """
602 if not self._allow_stdin:
602 if not self._allow_stdin:
603 raise StdinNotImplementedError(
603 raise StdinNotImplementedError(
604 "getpass was called, but this frontend does not support input requests."
604 "getpass was called, but this frontend does not support input requests."
605 )
605 )
606 return self._input_request(prompt,
606 return self._input_request(prompt,
607 self._parent_ident,
607 self._parent_ident,
608 self._parent_header,
608 self._parent_header,
609 password=True,
609 password=True,
610 )
610 )
611
611
612 def raw_input(self, prompt=''):
612 def raw_input(self, prompt=''):
613 """Forward raw_input to frontends
613 """Forward raw_input to frontends
614
614
615 Raises
615 Raises
616 ------
616 ------
617 StdinNotImplentedError if active frontend doesn't support stdin.
617 StdinNotImplentedError if active frontend doesn't support stdin.
618 """
618 """
619 if not self._allow_stdin:
619 if not self._allow_stdin:
620 raise StdinNotImplementedError(
620 raise StdinNotImplementedError(
621 "raw_input was called, but this frontend does not support input requests."
621 "raw_input was called, but this frontend does not support input requests."
622 )
622 )
623 return self._input_request(prompt,
623 return self._input_request(prompt,
624 self._parent_ident,
624 self._parent_ident,
625 self._parent_header,
625 self._parent_header,
626 password=False,
626 password=False,
627 )
627 )
628
628
629 def _input_request(self, prompt, ident, parent, password=False):
629 def _input_request(self, prompt, ident, parent, password=False):
630 # Flush output before making the request.
630 # Flush output before making the request.
631 sys.stderr.flush()
631 sys.stderr.flush()
632 sys.stdout.flush()
632 sys.stdout.flush()
633 # flush the stdin socket, to purge stale replies
633 # flush the stdin socket, to purge stale replies
634 while True:
634 while True:
635 try:
635 try:
636 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
636 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
637 except zmq.ZMQError as e:
637 except zmq.ZMQError as e:
638 if e.errno == zmq.EAGAIN:
638 if e.errno == zmq.EAGAIN:
639 break
639 break
640 else:
640 else:
641 raise
641 raise
642
642
643 # Send the input request.
643 # Send the input request.
644 content = json_clean(dict(prompt=prompt, password=password))
644 content = json_clean(dict(prompt=prompt, password=password))
645 self.session.send(self.stdin_socket, u'input_request', content, parent,
645 self.session.send(self.stdin_socket, u'input_request', content, parent,
646 ident=ident)
646 ident=ident)
647
647
648 # Await a response.
648 # Await a response.
649 while True:
649 while True:
650 try:
650 try:
651 ident, reply = self.session.recv(self.stdin_socket, 0)
651 ident, reply = self.session.recv(self.stdin_socket, 0)
652 except Exception:
652 except Exception:
653 self.log.warn("Invalid Message:", exc_info=True)
653 self.log.warn("Invalid Message:", exc_info=True)
654 except KeyboardInterrupt:
654 except KeyboardInterrupt:
655 # re-raise KeyboardInterrupt, to truncate traceback
655 # re-raise KeyboardInterrupt, to truncate traceback
656 raise KeyboardInterrupt
656 raise KeyboardInterrupt
657 else:
657 else:
658 break
658 break
659 try:
659 try:
660 value = py3compat.unicode_to_str(reply['content']['value'])
660 value = py3compat.unicode_to_str(reply['content']['value'])
661 except:
661 except:
662 self.log.error("Bad input_reply: %s", parent)
662 self.log.error("Bad input_reply: %s", parent)
663 value = ''
663 value = ''
664 if value == '\x04':
664 if value == '\x04':
665 # EOF
665 # EOF
666 raise EOFError
666 raise EOFError
667 return value
667 return value
668
668
669 def _at_shutdown(self):
669 def _at_shutdown(self):
670 """Actions taken at shutdown by the kernel, called by python's atexit.
670 """Actions taken at shutdown by the kernel, called by python's atexit.
671 """
671 """
672 # io.rprint("Kernel at_shutdown") # dbg
672 # io.rprint("Kernel at_shutdown") # dbg
673 if self._shutdown_message is not None:
673 if self._shutdown_message is not None:
674 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
674 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
675 self.log.debug("%s", self._shutdown_message)
675 self.log.debug("%s", self._shutdown_message)
676 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
676 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
677
General Comments 0
You need to be logged in to leave comments. Login now