##// END OF EJS Templates
Stub out kernel methods to allow overriding a subset of them.
Thomas Kluyver -
Show More
@@ -1,286 +1,290 b''
1 1 """The IPython kernel implementation"""
2 2
3 3 import getpass
4 4 import sys
5 5 import traceback
6 6
7 7 from IPython.core import release
8 8 from IPython.utils.py3compat import builtin_mod, PY3
9 9 from IPython.utils.tokenutil import token_at_cursor
10 10 from IPython.utils.traitlets import Instance, Type, Any
11 11
12 12 from .kernelbase import KernelBase
13 13 from .serialize import serialize_object, unpack_apply_message
14 14 from .zmqshell import ZMQInteractiveShell
15 15
16 16 class Kernel(KernelBase):
17 17 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
18 18 shell_class = Type(ZMQInteractiveShell)
19 19
20 20 user_module = Any()
21 21 def _user_module_changed(self, name, old, new):
22 22 if self.shell is not None:
23 23 self.shell.user_module = new
24 24
25 25 user_ns = Instance(dict, args=None, allow_none=True)
26 26 def _user_ns_changed(self, name, old, new):
27 27 if self.shell is not None:
28 28 self.shell.user_ns = new
29 29 self.shell.init_user_ns()
30 30
31 31 # A reference to the Python builtin 'raw_input' function.
32 32 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
33 33 _sys_raw_input = Any()
34 34 _sys_eval_input = Any()
35 35
36 36 def __init__(self, **kwargs):
37 37 super(Kernel, self).__init__(**kwargs)
38 38
39 39 # Initialize the InteractiveShell subclass
40 40 self.shell = self.shell_class.instance(parent=self,
41 41 profile_dir = self.profile_dir,
42 42 user_module = self.user_module,
43 43 user_ns = self.user_ns,
44 44 kernel = self,
45 45 )
46 46 self.shell.displayhook.session = self.session
47 47 self.shell.displayhook.pub_socket = self.iopub_socket
48 48 self.shell.displayhook.topic = self._topic('execute_result')
49 49 self.shell.display_pub.session = self.session
50 50 self.shell.display_pub.pub_socket = self.iopub_socket
51 51 self.shell.data_pub.session = self.session
52 52 self.shell.data_pub.pub_socket = self.iopub_socket
53 53
54 54 # TMP - hack while developing
55 55 self.shell._reply_content = None
56 56
57 57 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
58 58 comm_manager = self.shell.comm_manager
59 59 for msg_type in comm_msg_types:
60 60 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
61 61
62 62 # Kernel info fields
63 63 implementation = 'ipython'
64 64 implementation_version = release.version
65 65 language = 'python'
66 66 language_version = sys.version.split()[0]
67 67 @property
68 68 def banner(self):
69 69 return self.shell.banner
70 70
71 def start(self):
72 self.shell.exit_now = False
73 super(Kernel, self).start()
74
71 75 def set_parent(self, ident, parent):
72 76 """Overridden from parent to tell the display hook and output streams
73 77 about the parent message.
74 78 """
75 79 super(Kernel, self).set_parent(ident, parent)
76 80 self.shell.set_parent(parent)
77 81
78 82 def _forward_input(self, allow_stdin=False):
79 83 """Forward raw_input and getpass to the current frontend.
80 84
81 85 via input_request
82 86 """
83 87 self._allow_stdin = allow_stdin
84 88
85 89 if PY3:
86 90 self._sys_raw_input = builtin_mod.input
87 91 builtin_mod.input = self.raw_input
88 92 else:
89 93 self._sys_raw_input = builtin_mod.raw_input
90 94 self._sys_eval_input = builtin_mod.input
91 95 builtin_mod.raw_input = self.raw_input
92 96 builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt))
93 97 self._save_getpass = getpass.getpass
94 98 getpass.getpass = self.getpass
95 99
96 100 def _restore_input(self):
97 101 """Restore raw_input, getpass"""
98 102 if PY3:
99 103 builtin_mod.input = self._sys_raw_input
100 104 else:
101 105 builtin_mod.raw_input = self._sys_raw_input
102 106 builtin_mod.input = self._sys_eval_input
103 107
104 108 getpass.getpass = self._save_getpass
105 109
106 110 @property
107 111 def execution_count(self):
108 112 return self.shell.execution_count
109 113
110 114 @execution_count.setter
111 115 def execution_count(self, value):
112 116 # Ignore the incrememnting done by KernelBase, in favour of our shell's
113 117 # execution counter.
114 118 pass
115 119
116 120 def do_execute(self, code, silent, store_history=True,
117 121 user_expressions=None, allow_stdin=False):
118 122 shell = self.shell # we'll need this a lot here
119 123
120 124 self._forward_input(allow_stdin)
121 125
122 126 reply_content = {}
123 127 # FIXME: the shell calls the exception handler itself.
124 128 shell._reply_content = None
125 129 try:
126 130 shell.run_cell(code, store_history=store_history, silent=silent)
127 131 except:
128 132 status = u'error'
129 133 # FIXME: this code right now isn't being used yet by default,
130 134 # because the run_cell() call above directly fires off exception
131 135 # reporting. This code, therefore, is only active in the scenario
132 136 # where runlines itself has an unhandled exception. We need to
133 137 # uniformize this, for all exception construction to come from a
134 138 # single location in the codbase.
135 139 etype, evalue, tb = sys.exc_info()
136 140 tb_list = traceback.format_exception(etype, evalue, tb)
137 141 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
138 142 else:
139 143 status = u'ok'
140 144 finally:
141 145 self._restore_input()
142 146
143 147 reply_content[u'status'] = status
144 148
145 149 # Return the execution counter so clients can display prompts
146 150 reply_content['execution_count'] = shell.execution_count - 1
147 151
148 152 # FIXME - fish exception info out of shell, possibly left there by
149 153 # runlines. We'll need to clean up this logic later.
150 154 if shell._reply_content is not None:
151 155 reply_content.update(shell._reply_content)
152 156 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
153 157 reply_content['engine_info'] = e_info
154 158 # reset after use
155 159 shell._reply_content = None
156 160
157 161 if 'traceback' in reply_content:
158 162 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
159 163
160 164
161 165 # At this point, we can tell whether the main code execution succeeded
162 166 # or not. If it did, we proceed to evaluate user_expressions
163 167 if reply_content['status'] == 'ok':
164 168 reply_content[u'user_expressions'] = \
165 169 shell.user_expressions(user_expressions or {})
166 170 else:
167 171 # If there was an error, don't even try to compute expressions
168 172 reply_content[u'user_expressions'] = {}
169 173
170 174 # Payloads should be retrieved regardless of outcome, so we can both
171 175 # recover partial output (that could have been generated early in a
172 176 # block, before an error) and clear the payload system always.
173 177 reply_content[u'payload'] = shell.payload_manager.read_payload()
174 178 # Be agressive about clearing the payload because we don't want
175 179 # it to sit in memory until the next execute_request comes in.
176 180 shell.payload_manager.clear_payload()
177 181
178 182 return reply_content
179 183
180 184 def do_complete(self, code, cursor_pos):
181 185 txt, matches = self.shell.complete('', code, cursor_pos)
182 186 return {'matches' : matches,
183 187 'cursor_end' : cursor_pos,
184 188 'cursor_start' : cursor_pos - len(txt),
185 189 'metadata' : {},
186 190 'status' : 'ok'}
187 191
188 192 def do_inspect(self, code, cursor_pos, detail_level=0):
189 193 name = token_at_cursor(code, cursor_pos)
190 194 info = self.shell.object_inspect(name)
191 195
192 196 reply_content = {'status' : 'ok'}
193 197 reply_content['data'] = data = {}
194 198 reply_content['metadata'] = {}
195 199 reply_content['found'] = info['found']
196 200 if info['found']:
197 201 info_text = self.shell.object_inspect_text(
198 202 name,
199 203 detail_level=detail_level,
200 204 )
201 205 data['text/plain'] = info_text
202 206
203 207 return reply_content
204 208
205 209 def do_history(self, hist_access_type, output, raw, session=None, start=None,
206 210 stop=None, n=None, pattern=None, unique=False):
207 211 if hist_access_type == 'tail':
208 212 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
209 213 include_latest=True)
210 214
211 215 elif hist_access_type == 'range':
212 216 hist = self.shell.history_manager.get_range(session, start, stop,
213 217 raw=raw, output=output)
214 218
215 219 elif hist_access_type == 'search':
216 220 hist = self.shell.history_manager.search(
217 221 pattern, raw=raw, output=output, n=n, unique=unique)
218 222 else:
219 223 hist = []
220 224
221 225 return {'history' : list(hist)}
222 226
223 227 def do_shutdown(self, restart):
224 228 self.shell.exit_now = True
225 229 return dict(status='ok', restart=restart)
226 230
227 231 def do_apply(self, content, bufs, msg_id, reply_metadata):
228 232 shell = self.shell
229 233 try:
230 234 working = shell.user_ns
231 235
232 236 prefix = "_"+str(msg_id).replace("-","")+"_"
233 237
234 238 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
235 239
236 240 fname = getattr(f, '__name__', 'f')
237 241
238 242 fname = prefix+"f"
239 243 argname = prefix+"args"
240 244 kwargname = prefix+"kwargs"
241 245 resultname = prefix+"result"
242 246
243 247 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
244 248 # print ns
245 249 working.update(ns)
246 250 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
247 251 try:
248 252 exec(code, shell.user_global_ns, shell.user_ns)
249 253 result = working.get(resultname)
250 254 finally:
251 255 for key in ns:
252 256 working.pop(key)
253 257
254 258 result_buf = serialize_object(result,
255 259 buffer_threshold=self.session.buffer_threshold,
256 260 item_threshold=self.session.item_threshold,
257 261 )
258 262
259 263 except:
260 264 # invoke IPython traceback formatting
261 265 shell.showtraceback()
262 266 # FIXME - fish exception info out of shell, possibly left there by
263 267 # run_code. We'll need to clean up this logic later.
264 268 reply_content = {}
265 269 if shell._reply_content is not None:
266 270 reply_content.update(shell._reply_content)
267 271 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
268 272 reply_content['engine_info'] = e_info
269 273 # reset after use
270 274 shell._reply_content = None
271 275
272 276 self.send_response(self.iopub_socket, u'error', reply_content,
273 277 ident=self._topic('error'))
274 278 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
275 279 result_buf = []
276 280
277 281 if reply_content['ename'] == 'UnmetDependency':
278 282 reply_metadata['dependencies_met'] = False
279 283 else:
280 284 reply_content = {'status' : 'ok'}
281 285
282 286 return reply_content, result_buf
283 287
284 288 def do_clear(self):
285 289 self.shell.reset(False)
286 290 return dict(status='ok')
@@ -1,646 +1,689 b''
1 1 """Base class for a kernel that talks to frontends over 0MQ."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import print_function
7 7
8 8 import sys
9 9 import time
10 10 import logging
11 11 import uuid
12 12
13 13 from datetime import datetime
14 14 from signal import (
15 15 signal, default_int_handler, SIGINT
16 16 )
17 17
18 18 import zmq
19 19 from zmq.eventloop import ioloop
20 20 from zmq.eventloop.zmqstream import ZMQStream
21 21
22 22 from IPython.config.configurable import Configurable
23 23 from IPython.core.error import StdinNotImplementedError
24 24 from IPython.core import release
25 25 from IPython.utils import py3compat
26 26 from IPython.utils.py3compat import unicode_type, string_types
27 27 from IPython.utils.jsonutil import json_clean
28 28 from IPython.utils.traitlets import (
29 29 Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool,
30 30 )
31 31
32 32 from .session import Session
33 33
34 34
35 35 #-----------------------------------------------------------------------------
36 36 # Main kernel class
37 37 #-----------------------------------------------------------------------------
38 38
39 39 class KernelBase(Configurable):
40 40
41 41 #---------------------------------------------------------------------------
42 42 # Kernel interface
43 43 #---------------------------------------------------------------------------
44 44
45 45 # attribute to override with a GUI
46 46 eventloop = Any(None)
47 47 def _eventloop_changed(self, name, old, new):
48 48 """schedule call to eventloop from IOLoop"""
49 49 loop = ioloop.IOLoop.instance()
50 50 loop.add_callback(self.enter_eventloop)
51 51
52 52 session = Instance(Session)
53 53 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
54 54 shell_streams = List()
55 55 control_stream = Instance(ZMQStream)
56 56 iopub_socket = Instance(zmq.Socket)
57 57 stdin_socket = Instance(zmq.Socket)
58 58 log = Instance(logging.Logger)
59 59
60 60 # identities:
61 61 int_id = Integer(-1)
62 62 ident = Unicode()
63 63
64 64 def _ident_default(self):
65 65 return unicode_type(uuid.uuid4())
66 66
67 67 # Private interface
68 68
69 69 _darwin_app_nap = Bool(True, config=True,
70 70 help="""Whether to use appnope for compatiblity with OS X App Nap.
71 71
72 72 Only affects OS X >= 10.9.
73 73 """
74 74 )
75 75
76 76 # track associations with current request
77 77 _allow_stdin = Bool(False)
78 78 _parent_header = Dict()
79 79 _parent_ident = Any(b'')
80 80 # Time to sleep after flushing the stdout/err buffers in each execute
81 81 # cycle. While this introduces a hard limit on the minimal latency of the
82 82 # execute cycle, it helps prevent output synchronization problems for
83 83 # clients.
84 84 # Units are in seconds. The minimum zmq latency on local host is probably
85 85 # ~150 microseconds, set this to 500us for now. We may need to increase it
86 86 # a little if it's not enough after more interactive testing.
87 87 _execute_sleep = Float(0.0005, config=True)
88 88
89 89 # Frequency of the kernel's event loop.
90 90 # Units are in seconds, kernel subclasses for GUI toolkits may need to
91 91 # adapt to milliseconds.
92 92 _poll_interval = Float(0.05, config=True)
93 93
94 94 # If the shutdown was requested over the network, we leave here the
95 95 # necessary reply message so it can be sent by our registered atexit
96 96 # handler. This ensures that the reply is only sent to clients truly at
97 97 # the end of our shutdown process (which happens after the underlying
98 98 # IPython shell's own shutdown).
99 99 _shutdown_message = None
100 100
101 101 # This is a dict of port number that the kernel is listening on. It is set
102 102 # by record_ports and used by connect_request.
103 103 _recorded_ports = Dict()
104 104
105 105 # set of aborted msg_ids
106 106 aborted = Set()
107 107
108 108 # Track execution count here. For IPython, we override this to use the
109 109 # execution count we store in the shell.
110 110 execution_count = 0
111 111
112 112
113 113 def __init__(self, **kwargs):
114 114 super(KernelBase, self).__init__(**kwargs)
115 115
116 116 # Build dict of handlers for message types
117 117 msg_types = [ 'execute_request', 'complete_request',
118 118 'inspect_request', 'history_request',
119 119 'kernel_info_request',
120 120 'connect_request', 'shutdown_request',
121 121 'apply_request',
122 122 ]
123 123 self.shell_handlers = {}
124 124 for msg_type in msg_types:
125 125 self.shell_handlers[msg_type] = getattr(self, msg_type)
126 126
127 127 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
128 128 self.control_handlers = {}
129 129 for msg_type in control_msg_types:
130 130 self.control_handlers[msg_type] = getattr(self, msg_type)
131 131
132 132
133 133 def dispatch_control(self, msg):
134 134 """dispatch control requests"""
135 135 idents,msg = self.session.feed_identities(msg, copy=False)
136 136 try:
137 137 msg = self.session.unserialize(msg, content=True, copy=False)
138 138 except:
139 139 self.log.error("Invalid Control Message", exc_info=True)
140 140 return
141 141
142 142 self.log.debug("Control received: %s", msg)
143 143
144 144 header = msg['header']
145 145 msg_type = header['msg_type']
146 146
147 147 handler = self.control_handlers.get(msg_type, None)
148 148 if handler is None:
149 149 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
150 150 else:
151 151 try:
152 152 handler(self.control_stream, idents, msg)
153 153 except Exception:
154 154 self.log.error("Exception in control handler:", exc_info=True)
155 155
156 156 def dispatch_shell(self, stream, msg):
157 157 """dispatch shell requests"""
158 158 # flush control requests first
159 159 if self.control_stream:
160 160 self.control_stream.flush()
161 161
162 162 idents,msg = self.session.feed_identities(msg, copy=False)
163 163 try:
164 164 msg = self.session.unserialize(msg, content=True, copy=False)
165 165 except:
166 166 self.log.error("Invalid Message", exc_info=True)
167 167 return
168 168
169 169 header = msg['header']
170 170 msg_id = header['msg_id']
171 171 msg_type = msg['header']['msg_type']
172 172
173 173 # Print some info about this message and leave a '--->' marker, so it's
174 174 # easier to trace visually the message chain when debugging. Each
175 175 # handler prints its message at the end.
176 176 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
177 177 self.log.debug(' Content: %s\n --->\n ', msg['content'])
178 178
179 179 if msg_id in self.aborted:
180 180 self.aborted.remove(msg_id)
181 181 # is it safe to assume a msg_id will not be resubmitted?
182 182 reply_type = msg_type.split('_')[0] + '_reply'
183 183 status = {'status' : 'aborted'}
184 184 md = {'engine' : self.ident}
185 185 md.update(status)
186 186 self.session.send(stream, reply_type, metadata=md,
187 187 content=status, parent=msg, ident=idents)
188 188 return
189 189
190 190 handler = self.shell_handlers.get(msg_type, None)
191 191 if handler is None:
192 192 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
193 193 else:
194 194 # ensure default_int_handler during handler call
195 195 sig = signal(SIGINT, default_int_handler)
196 196 self.log.debug("%s: %s", msg_type, msg)
197 197 try:
198 198 handler(stream, idents, msg)
199 199 except Exception:
200 200 self.log.error("Exception in message handler:", exc_info=True)
201 201 finally:
202 202 signal(SIGINT, sig)
203 203
204 204 def enter_eventloop(self):
205 205 """enter eventloop"""
206 206 self.log.info("entering eventloop %s", self.eventloop)
207 207 for stream in self.shell_streams:
208 208 # flush any pending replies,
209 209 # which may be skipped by entering the eventloop
210 210 stream.flush(zmq.POLLOUT)
211 211 # restore default_int_handler
212 212 signal(SIGINT, default_int_handler)
213 213 while self.eventloop is not None:
214 214 try:
215 215 self.eventloop(self)
216 216 except KeyboardInterrupt:
217 217 # Ctrl-C shouldn't crash the kernel
218 218 self.log.error("KeyboardInterrupt caught in kernel")
219 219 continue
220 220 else:
221 221 # eventloop exited cleanly, this means we should stop (right?)
222 222 self.eventloop = None
223 223 break
224 224 self.log.info("exiting eventloop")
225 225
226 226 def start(self):
227 227 """register dispatchers for streams"""
228 self.shell.exit_now = False
229 228 if self.control_stream:
230 229 self.control_stream.on_recv(self.dispatch_control, copy=False)
231 230
232 231 def make_dispatcher(stream):
233 232 def dispatcher(msg):
234 233 return self.dispatch_shell(stream, msg)
235 234 return dispatcher
236 235
237 236 for s in self.shell_streams:
238 237 s.on_recv(make_dispatcher(s), copy=False)
239 238
240 239 # publish idle status
241 240 self._publish_status('starting')
242 241
243 242 def do_one_iteration(self):
244 243 """step eventloop just once"""
245 244 if self.control_stream:
246 245 self.control_stream.flush()
247 246 for stream in self.shell_streams:
248 247 # handle at most one request per iteration
249 248 stream.flush(zmq.POLLIN, 1)
250 249 stream.flush(zmq.POLLOUT)
251 250
252 251
253 252 def record_ports(self, ports):
254 253 """Record the ports that this kernel is using.
255 254
256 255 The creator of the Kernel instance must call this methods if they
257 256 want the :meth:`connect_request` method to return the port numbers.
258 257 """
259 258 self._recorded_ports = ports
260 259
261 260 #---------------------------------------------------------------------------
262 261 # Kernel request handlers
263 262 #---------------------------------------------------------------------------
264 263
265 264 def _make_metadata(self, other=None):
266 265 """init metadata dict, for execute/apply_reply"""
267 266 new_md = {
268 267 'dependencies_met' : True,
269 268 'engine' : self.ident,
270 269 'started': datetime.now(),
271 270 }
272 271 if other:
273 272 new_md.update(other)
274 273 return new_md
275 274
276 275 def _publish_execute_input(self, code, parent, execution_count):
277 276 """Publish the code request on the iopub stream."""
278 277
279 278 self.session.send(self.iopub_socket, u'execute_input',
280 279 {u'code':code, u'execution_count': execution_count},
281 280 parent=parent, ident=self._topic('execute_input')
282 281 )
283 282
284 283 def _publish_status(self, status, parent=None):
285 284 """send status (busy/idle) on IOPub"""
286 285 self.session.send(self.iopub_socket,
287 286 u'status',
288 287 {u'execution_state': status},
289 288 parent=parent,
290 289 ident=self._topic('status'),
291 290 )
292 291
293 292 def set_parent(self, ident, parent):
294 293 """Set the current parent_header
295 294
296 295 Side effects (IOPub messages) and replies are associated with
297 296 the request that caused them via the parent_header.
298 297
299 298 The parent identity is used to route input_request messages
300 299 on the stdin channel.
301 300 """
302 301 self._parent_ident = ident
303 302 self._parent_header = parent
304 303
305 304 def send_response(self, stream, msg_or_type, content=None, ident=None,
306 305 buffers=None, track=False, header=None, metadata=None):
307 306 """Send a response to the message we're currently processing.
308 307
309 308 This accepts all the parameters of :meth:`IPython.kernel.zmq.session.Session.send`
310 309 except ``parent``.
311 310
312 311 This relies on :meth:`set_parent` having been called for the current
313 312 message.
314 313 """
315 314 return self.session.send(stream, msg_or_type, content, self._parent_header,
316 315 ident, buffers, track, header, metadata)
317 316
318 317 def execute_request(self, stream, ident, parent):
319 318 """handle an execute_request"""
320 319
321 320 self._publish_status(u'busy', parent)
322 321
323 322 try:
324 323 content = parent[u'content']
325 324 code = py3compat.cast_unicode_py2(content[u'code'])
326 325 silent = content[u'silent']
327 326 store_history = content.get(u'store_history', not silent)
328 327 user_expressions = content.get('user_expressions', {})
329 328 allow_stdin = content.get('allow_stdin', False)
330 329 except:
331 330 self.log.error("Got bad msg: ")
332 331 self.log.error("%s", parent)
333 332 return
334 333
335 334 md = self._make_metadata(parent['metadata'])
336 335
337 336 # Set the parent message of the display hook and out streams.
338 337 self.set_parent(ident, parent)
339 338
340 339 # Re-broadcast our input for the benefit of listening clients, and
341 340 # start computing output
342 341 if not silent:
343 342 self.execution_count += 1
344 343 self._publish_execute_input(code, parent, self.execution_count)
345 344
346 345 reply_content = self.do_execute(code, silent, store_history,
347 346 user_expressions, allow_stdin)
348 347
349 348 # Flush output before sending the reply.
350 349 sys.stdout.flush()
351 350 sys.stderr.flush()
352 351 # FIXME: on rare occasions, the flush doesn't seem to make it to the
353 352 # clients... This seems to mitigate the problem, but we definitely need
354 353 # to better understand what's going on.
355 354 if self._execute_sleep:
356 355 time.sleep(self._execute_sleep)
357 356
358 357 # Send the reply.
359 358 reply_content = json_clean(reply_content)
360 359
361 360 md['status'] = reply_content['status']
362 361 if reply_content['status'] == 'error' and \
363 362 reply_content['ename'] == 'UnmetDependency':
364 363 md['dependencies_met'] = False
365 364
366 365 reply_msg = self.session.send(stream, u'execute_reply',
367 366 reply_content, parent, metadata=md,
368 367 ident=ident)
369 368
370 369 self.log.debug("%s", reply_msg)
371 370
372 371 if not silent and reply_msg['content']['status'] == u'error':
373 372 self._abort_queues()
374 373
375 374 self._publish_status(u'idle', parent)
376 375
376 def do_execute(self, code, silent, store_history=True,
377 user_experssions=None, allow_stdin=False):
378 """Execute user code. Must be overridden by subclasses.
379 """
380 raise NotImplementedError
381
377 382 def complete_request(self, stream, ident, parent):
378 383 content = parent['content']
379 384 code = content['code']
380 385 cursor_pos = content['cursor_pos']
381 386
382 387 matches = self.do_complete(code, cursor_pos)
383 388 matches = json_clean(matches)
384 389 completion_msg = self.session.send(stream, 'complete_reply',
385 390 matches, parent, ident)
386 391 self.log.debug("%s", completion_msg)
387 392
393 def do_complete(self, code, cursor_pos):
394 """Override in subclasses to find completions.
395 """
396 return {'matches' : [],
397 'cursor_end' : cursor_pos,
398 'cursor_start' : cursor_pos,
399 'metadata' : {},
400 'status' : 'ok'}
401
388 402 def inspect_request(self, stream, ident, parent):
389 403 content = parent['content']
390 404
391 405 reply_content = self.do_inspect(content['code'], content['cursor_pos'],
392 406 content.get('detail_level', 0))
393 407 # Before we send this object over, we scrub it for JSON usage
394 408 reply_content = json_clean(reply_content)
395 409 msg = self.session.send(stream, 'inspect_reply',
396 410 reply_content, parent, ident)
397 411 self.log.debug("%s", msg)
398 412
413 def do_inspect(self, code, cursor_pos, detail_level=0):
414 """Override in subclasses to allow introspection.
415 """
416 return {'status': 'ok', 'data':{}, 'metadata':{}, 'found':False}
417
399 418 def history_request(self, stream, ident, parent):
400 419 # We need to pull these out, as passing **kwargs doesn't work with
401 420 # unicode keys before Python 2.6.5.
402 421 content = parent['content']
403 422
404 423 reply_content = self.do_history(content['hist_access_type'],
405 424 content['output'], content['raw'],
406 425 content.get('session', None),
407 426 content.get('start', None),
408 427 content.get('stop', None),
409 428 content.get('n', None),
410 429 content.get('pattern', None),
411 430 content.get('unique', False),
412 431 )
413 432
414 433 reply_content = json_clean(reply_content)
415 434 msg = self.session.send(stream, 'history_reply',
416 435 reply_content, parent, ident)
417 436 self.log.debug("%s", msg)
418 437
438 def do_history(self, hist_access_type, output, raw, session=None, start=None,
439 stop=None, n=None, pattern=None, unique=False):
440 """Override in subclasses to access history.
441 """
442 return {'history': []}
443
419 444 def connect_request(self, stream, ident, parent):
420 445 if self._recorded_ports is not None:
421 446 content = self._recorded_ports.copy()
422 447 else:
423 448 content = {}
424 449 msg = self.session.send(stream, 'connect_reply',
425 450 content, parent, ident)
426 451 self.log.debug("%s", msg)
427 452
428 453 @property
429 454 def kernel_info(self):
430 455 return {
431 456 'protocol_version': release.kernel_protocol_version,
432 457 'implementation': self.implementation,
433 458 'implementation_version': self.implementation_version,
434 459 'language': self.language,
435 460 'language_version': self.language_version,
436 461 'banner': self.banner,
437 462 }
438 463
439 464 def kernel_info_request(self, stream, ident, parent):
440 465 msg = self.session.send(stream, 'kernel_info_reply',
441 466 self.kernel_info, parent, ident)
442 467 self.log.debug("%s", msg)
443 468
444 469 def shutdown_request(self, stream, ident, parent):
445 470 content = self.do_shutdown(parent['content']['restart'])
446 471 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
447 472 # same content, but different msg_id for broadcasting on IOPub
448 473 self._shutdown_message = self.session.msg(u'shutdown_reply',
449 474 content, parent
450 475 )
451 476
452 477 self._at_shutdown()
453 478 # call sys.exit after a short delay
454 479 loop = ioloop.IOLoop.instance()
455 480 loop.add_timeout(time.time()+0.1, loop.stop)
456 481
482 def do_shutdown(self, restart):
483 """Override in subclasses to do things when the frontend shuts down the
484 kernel.
485 """
486 return {'status': 'ok', 'restart': restart}
487
457 488 #---------------------------------------------------------------------------
458 489 # Engine methods
459 490 #---------------------------------------------------------------------------
460 491
461 492 def apply_request(self, stream, ident, parent):
462 493 try:
463 494 content = parent[u'content']
464 495 bufs = parent[u'buffers']
465 496 msg_id = parent['header']['msg_id']
466 497 except:
467 498 self.log.error("Got bad msg: %s", parent, exc_info=True)
468 499 return
469 500
470 501 self._publish_status(u'busy', parent)
471 502
472 503 # Set the parent message of the display hook and out streams.
473 504 self.set_parent(ident, parent)
474 505
475 506 md = self._make_metadata(parent['metadata'])
476 507
477 508 reply_content, result_buf = self.do_apply(content, bufs, msg_id, md)
478 509
479 510 # put 'ok'/'error' status in header, for scheduler introspection:
480 511 md['status'] = reply_content['status']
481 512
482 513 # flush i/o
483 514 sys.stdout.flush()
484 515 sys.stderr.flush()
485 516
486 517 self.session.send(stream, u'apply_reply', reply_content,
487 518 parent=parent, ident=ident,buffers=result_buf, metadata=md)
488 519
489 520 self._publish_status(u'idle', parent)
490 521
522 def do_apply(self, content, bufs, msg_id, reply_metadata):
523 """Override in subclasses to support the IPython parallel framework.
524 """
525 raise NotImplementedError
526
491 527 #---------------------------------------------------------------------------
492 528 # Control messages
493 529 #---------------------------------------------------------------------------
494 530
495 531 def abort_request(self, stream, ident, parent):
496 532 """abort a specifig msg by id"""
497 533 msg_ids = parent['content'].get('msg_ids', None)
498 534 if isinstance(msg_ids, string_types):
499 535 msg_ids = [msg_ids]
500 536 if not msg_ids:
501 537 self.abort_queues()
502 538 for mid in msg_ids:
503 539 self.aborted.add(str(mid))
504 540
505 541 content = dict(status='ok')
506 542 reply_msg = self.session.send(stream, 'abort_reply', content=content,
507 543 parent=parent, ident=ident)
508 544 self.log.debug("%s", reply_msg)
509 545
510 546 def clear_request(self, stream, idents, parent):
511 547 """Clear our namespace."""
512 548 content = self.do_clear()
513 549 self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
514 550 content = content)
515 551
552 def do_clear(self):
553 """Override in subclasses to clear the namespace
554
555 This is only required for IPython.parallel.
556 """
557 raise NotImplementedError
558
516 559 #---------------------------------------------------------------------------
517 560 # Protected interface
518 561 #---------------------------------------------------------------------------
519 562
520 563 def _topic(self, topic):
521 564 """prefixed topic for IOPub messages"""
522 565 if self.int_id >= 0:
523 566 base = "engine.%i" % self.int_id
524 567 else:
525 568 base = "kernel.%s" % self.ident
526 569
527 570 return py3compat.cast_bytes("%s.%s" % (base, topic))
528 571
529 572 def _abort_queues(self):
530 573 for stream in self.shell_streams:
531 574 if stream:
532 575 self._abort_queue(stream)
533 576
534 577 def _abort_queue(self, stream):
535 578 poller = zmq.Poller()
536 579 poller.register(stream.socket, zmq.POLLIN)
537 580 while True:
538 581 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
539 582 if msg is None:
540 583 return
541 584
542 585 self.log.info("Aborting:")
543 586 self.log.info("%s", msg)
544 587 msg_type = msg['header']['msg_type']
545 588 reply_type = msg_type.split('_')[0] + '_reply'
546 589
547 590 status = {'status' : 'aborted'}
548 591 md = {'engine' : self.ident}
549 592 md.update(status)
550 593 reply_msg = self.session.send(stream, reply_type, metadata=md,
551 594 content=status, parent=msg, ident=idents)
552 595 self.log.debug("%s", reply_msg)
553 596 # We need to wait a bit for requests to come in. This can probably
554 597 # be set shorter for true asynchronous clients.
555 598 poller.poll(50)
556 599
557 600
558 601 def _no_raw_input(self):
559 602 """Raise StdinNotImplentedError if active frontend doesn't support
560 603 stdin."""
561 604 raise StdinNotImplementedError("raw_input was called, but this "
562 605 "frontend does not support stdin.")
563 606
564 607 def getpass(self, prompt=''):
565 608 """Forward getpass to frontends
566 609
567 610 Raises
568 611 ------
569 612 StdinNotImplentedError if active frontend doesn't support stdin.
570 613 """
571 614 if not self._allow_stdin:
572 615 raise StdinNotImplementedError(
573 616 "getpass was called, but this frontend does not support input requests."
574 617 )
575 618 return self._input_request(prompt,
576 619 self._parent_ident,
577 620 self._parent_header,
578 621 password=True,
579 622 )
580 623
581 624 def raw_input(self, prompt=''):
582 625 """Forward raw_input to frontends
583 626
584 627 Raises
585 628 ------
586 629 StdinNotImplentedError if active frontend doesn't support stdin.
587 630 """
588 631 if not self._allow_stdin:
589 632 raise StdinNotImplementedError(
590 633 "raw_input was called, but this frontend does not support input requests."
591 634 )
592 635 return self._input_request(prompt,
593 636 self._parent_ident,
594 637 self._parent_header,
595 638 password=False,
596 639 )
597 640
598 641 def _input_request(self, prompt, ident, parent, password=False):
599 642 # Flush output before making the request.
600 643 sys.stderr.flush()
601 644 sys.stdout.flush()
602 645 # flush the stdin socket, to purge stale replies
603 646 while True:
604 647 try:
605 648 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
606 649 except zmq.ZMQError as e:
607 650 if e.errno == zmq.EAGAIN:
608 651 break
609 652 else:
610 653 raise
611 654
612 655 # Send the input request.
613 656 content = json_clean(dict(prompt=prompt, password=password))
614 657 self.session.send(self.stdin_socket, u'input_request', content, parent,
615 658 ident=ident)
616 659
617 660 # Await a response.
618 661 while True:
619 662 try:
620 663 ident, reply = self.session.recv(self.stdin_socket, 0)
621 664 except Exception:
622 665 self.log.warn("Invalid Message:", exc_info=True)
623 666 except KeyboardInterrupt:
624 667 # re-raise KeyboardInterrupt, to truncate traceback
625 668 raise KeyboardInterrupt
626 669 else:
627 670 break
628 671 try:
629 672 value = py3compat.unicode_to_str(reply['content']['value'])
630 673 except:
631 674 self.log.error("Bad input_reply: %s", parent)
632 675 value = ''
633 676 if value == '\x04':
634 677 # EOF
635 678 raise EOFError
636 679 return value
637 680
638 681 def _at_shutdown(self):
639 682 """Actions taken at shutdown by the kernel, called by python's atexit.
640 683 """
641 684 # io.rprint("Kernel at_shutdown") # dbg
642 685 if self._shutdown_message is not None:
643 686 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
644 687 self.log.debug("%s", self._shutdown_message)
645 688 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
646 689
General Comments 0
You need to be logged in to leave comments. Login now