##// END OF EJS Templates
Stub out kernel methods to allow overriding a subset of them.
Thomas Kluyver -
Show More
@@ -1,286 +1,290 b''
1 """The IPython kernel implementation"""
1 """The IPython kernel implementation"""
2
2
3 import getpass
3 import getpass
4 import sys
4 import sys
5 import traceback
5 import traceback
6
6
7 from IPython.core import release
7 from IPython.core import release
8 from IPython.utils.py3compat import builtin_mod, PY3
8 from IPython.utils.py3compat import builtin_mod, PY3
9 from IPython.utils.tokenutil import token_at_cursor
9 from IPython.utils.tokenutil import token_at_cursor
10 from IPython.utils.traitlets import Instance, Type, Any
10 from IPython.utils.traitlets import Instance, Type, Any
11
11
12 from .kernelbase import KernelBase
12 from .kernelbase import KernelBase
13 from .serialize import serialize_object, unpack_apply_message
13 from .serialize import serialize_object, unpack_apply_message
14 from .zmqshell import ZMQInteractiveShell
14 from .zmqshell import ZMQInteractiveShell
15
15
16 class Kernel(KernelBase):
16 class Kernel(KernelBase):
17 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
17 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
18 shell_class = Type(ZMQInteractiveShell)
18 shell_class = Type(ZMQInteractiveShell)
19
19
20 user_module = Any()
20 user_module = Any()
21 def _user_module_changed(self, name, old, new):
21 def _user_module_changed(self, name, old, new):
22 if self.shell is not None:
22 if self.shell is not None:
23 self.shell.user_module = new
23 self.shell.user_module = new
24
24
25 user_ns = Instance(dict, args=None, allow_none=True)
25 user_ns = Instance(dict, args=None, allow_none=True)
26 def _user_ns_changed(self, name, old, new):
26 def _user_ns_changed(self, name, old, new):
27 if self.shell is not None:
27 if self.shell is not None:
28 self.shell.user_ns = new
28 self.shell.user_ns = new
29 self.shell.init_user_ns()
29 self.shell.init_user_ns()
30
30
31 # A reference to the Python builtin 'raw_input' function.
31 # A reference to the Python builtin 'raw_input' function.
32 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
32 # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3)
33 _sys_raw_input = Any()
33 _sys_raw_input = Any()
34 _sys_eval_input = Any()
34 _sys_eval_input = Any()
35
35
36 def __init__(self, **kwargs):
36 def __init__(self, **kwargs):
37 super(Kernel, self).__init__(**kwargs)
37 super(Kernel, self).__init__(**kwargs)
38
38
39 # Initialize the InteractiveShell subclass
39 # Initialize the InteractiveShell subclass
40 self.shell = self.shell_class.instance(parent=self,
40 self.shell = self.shell_class.instance(parent=self,
41 profile_dir = self.profile_dir,
41 profile_dir = self.profile_dir,
42 user_module = self.user_module,
42 user_module = self.user_module,
43 user_ns = self.user_ns,
43 user_ns = self.user_ns,
44 kernel = self,
44 kernel = self,
45 )
45 )
46 self.shell.displayhook.session = self.session
46 self.shell.displayhook.session = self.session
47 self.shell.displayhook.pub_socket = self.iopub_socket
47 self.shell.displayhook.pub_socket = self.iopub_socket
48 self.shell.displayhook.topic = self._topic('execute_result')
48 self.shell.displayhook.topic = self._topic('execute_result')
49 self.shell.display_pub.session = self.session
49 self.shell.display_pub.session = self.session
50 self.shell.display_pub.pub_socket = self.iopub_socket
50 self.shell.display_pub.pub_socket = self.iopub_socket
51 self.shell.data_pub.session = self.session
51 self.shell.data_pub.session = self.session
52 self.shell.data_pub.pub_socket = self.iopub_socket
52 self.shell.data_pub.pub_socket = self.iopub_socket
53
53
54 # TMP - hack while developing
54 # TMP - hack while developing
55 self.shell._reply_content = None
55 self.shell._reply_content = None
56
56
57 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
57 comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ]
58 comm_manager = self.shell.comm_manager
58 comm_manager = self.shell.comm_manager
59 for msg_type in comm_msg_types:
59 for msg_type in comm_msg_types:
60 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
60 self.shell_handlers[msg_type] = getattr(comm_manager, msg_type)
61
61
62 # Kernel info fields
62 # Kernel info fields
63 implementation = 'ipython'
63 implementation = 'ipython'
64 implementation_version = release.version
64 implementation_version = release.version
65 language = 'python'
65 language = 'python'
66 language_version = sys.version.split()[0]
66 language_version = sys.version.split()[0]
67 @property
67 @property
68 def banner(self):
68 def banner(self):
69 return self.shell.banner
69 return self.shell.banner
70
70
71 def start(self):
72 self.shell.exit_now = False
73 super(Kernel, self).start()
74
71 def set_parent(self, ident, parent):
75 def set_parent(self, ident, parent):
72 """Overridden from parent to tell the display hook and output streams
76 """Overridden from parent to tell the display hook and output streams
73 about the parent message.
77 about the parent message.
74 """
78 """
75 super(Kernel, self).set_parent(ident, parent)
79 super(Kernel, self).set_parent(ident, parent)
76 self.shell.set_parent(parent)
80 self.shell.set_parent(parent)
77
81
78 def _forward_input(self, allow_stdin=False):
82 def _forward_input(self, allow_stdin=False):
79 """Forward raw_input and getpass to the current frontend.
83 """Forward raw_input and getpass to the current frontend.
80
84
81 via input_request
85 via input_request
82 """
86 """
83 self._allow_stdin = allow_stdin
87 self._allow_stdin = allow_stdin
84
88
85 if PY3:
89 if PY3:
86 self._sys_raw_input = builtin_mod.input
90 self._sys_raw_input = builtin_mod.input
87 builtin_mod.input = self.raw_input
91 builtin_mod.input = self.raw_input
88 else:
92 else:
89 self._sys_raw_input = builtin_mod.raw_input
93 self._sys_raw_input = builtin_mod.raw_input
90 self._sys_eval_input = builtin_mod.input
94 self._sys_eval_input = builtin_mod.input
91 builtin_mod.raw_input = self.raw_input
95 builtin_mod.raw_input = self.raw_input
92 builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt))
96 builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt))
93 self._save_getpass = getpass.getpass
97 self._save_getpass = getpass.getpass
94 getpass.getpass = self.getpass
98 getpass.getpass = self.getpass
95
99
96 def _restore_input(self):
100 def _restore_input(self):
97 """Restore raw_input, getpass"""
101 """Restore raw_input, getpass"""
98 if PY3:
102 if PY3:
99 builtin_mod.input = self._sys_raw_input
103 builtin_mod.input = self._sys_raw_input
100 else:
104 else:
101 builtin_mod.raw_input = self._sys_raw_input
105 builtin_mod.raw_input = self._sys_raw_input
102 builtin_mod.input = self._sys_eval_input
106 builtin_mod.input = self._sys_eval_input
103
107
104 getpass.getpass = self._save_getpass
108 getpass.getpass = self._save_getpass
105
109
106 @property
110 @property
107 def execution_count(self):
111 def execution_count(self):
108 return self.shell.execution_count
112 return self.shell.execution_count
109
113
110 @execution_count.setter
114 @execution_count.setter
111 def execution_count(self, value):
115 def execution_count(self, value):
112 # Ignore the incrememnting done by KernelBase, in favour of our shell's
116 # Ignore the incrememnting done by KernelBase, in favour of our shell's
113 # execution counter.
117 # execution counter.
114 pass
118 pass
115
119
116 def do_execute(self, code, silent, store_history=True,
120 def do_execute(self, code, silent, store_history=True,
117 user_expressions=None, allow_stdin=False):
121 user_expressions=None, allow_stdin=False):
118 shell = self.shell # we'll need this a lot here
122 shell = self.shell # we'll need this a lot here
119
123
120 self._forward_input(allow_stdin)
124 self._forward_input(allow_stdin)
121
125
122 reply_content = {}
126 reply_content = {}
123 # FIXME: the shell calls the exception handler itself.
127 # FIXME: the shell calls the exception handler itself.
124 shell._reply_content = None
128 shell._reply_content = None
125 try:
129 try:
126 shell.run_cell(code, store_history=store_history, silent=silent)
130 shell.run_cell(code, store_history=store_history, silent=silent)
127 except:
131 except:
128 status = u'error'
132 status = u'error'
129 # FIXME: this code right now isn't being used yet by default,
133 # FIXME: this code right now isn't being used yet by default,
130 # because the run_cell() call above directly fires off exception
134 # because the run_cell() call above directly fires off exception
131 # reporting. This code, therefore, is only active in the scenario
135 # reporting. This code, therefore, is only active in the scenario
132 # where runlines itself has an unhandled exception. We need to
136 # where runlines itself has an unhandled exception. We need to
133 # uniformize this, for all exception construction to come from a
137 # uniformize this, for all exception construction to come from a
134 # single location in the codbase.
138 # single location in the codbase.
135 etype, evalue, tb = sys.exc_info()
139 etype, evalue, tb = sys.exc_info()
136 tb_list = traceback.format_exception(etype, evalue, tb)
140 tb_list = traceback.format_exception(etype, evalue, tb)
137 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
141 reply_content.update(shell._showtraceback(etype, evalue, tb_list))
138 else:
142 else:
139 status = u'ok'
143 status = u'ok'
140 finally:
144 finally:
141 self._restore_input()
145 self._restore_input()
142
146
143 reply_content[u'status'] = status
147 reply_content[u'status'] = status
144
148
145 # Return the execution counter so clients can display prompts
149 # Return the execution counter so clients can display prompts
146 reply_content['execution_count'] = shell.execution_count - 1
150 reply_content['execution_count'] = shell.execution_count - 1
147
151
148 # FIXME - fish exception info out of shell, possibly left there by
152 # FIXME - fish exception info out of shell, possibly left there by
149 # runlines. We'll need to clean up this logic later.
153 # runlines. We'll need to clean up this logic later.
150 if shell._reply_content is not None:
154 if shell._reply_content is not None:
151 reply_content.update(shell._reply_content)
155 reply_content.update(shell._reply_content)
152 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
156 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute')
153 reply_content['engine_info'] = e_info
157 reply_content['engine_info'] = e_info
154 # reset after use
158 # reset after use
155 shell._reply_content = None
159 shell._reply_content = None
156
160
157 if 'traceback' in reply_content:
161 if 'traceback' in reply_content:
158 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
162 self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback']))
159
163
160
164
161 # At this point, we can tell whether the main code execution succeeded
165 # At this point, we can tell whether the main code execution succeeded
162 # or not. If it did, we proceed to evaluate user_expressions
166 # or not. If it did, we proceed to evaluate user_expressions
163 if reply_content['status'] == 'ok':
167 if reply_content['status'] == 'ok':
164 reply_content[u'user_expressions'] = \
168 reply_content[u'user_expressions'] = \
165 shell.user_expressions(user_expressions or {})
169 shell.user_expressions(user_expressions or {})
166 else:
170 else:
167 # If there was an error, don't even try to compute expressions
171 # If there was an error, don't even try to compute expressions
168 reply_content[u'user_expressions'] = {}
172 reply_content[u'user_expressions'] = {}
169
173
170 # Payloads should be retrieved regardless of outcome, so we can both
174 # Payloads should be retrieved regardless of outcome, so we can both
171 # recover partial output (that could have been generated early in a
175 # recover partial output (that could have been generated early in a
172 # block, before an error) and clear the payload system always.
176 # block, before an error) and clear the payload system always.
173 reply_content[u'payload'] = shell.payload_manager.read_payload()
177 reply_content[u'payload'] = shell.payload_manager.read_payload()
174 # Be agressive about clearing the payload because we don't want
178 # Be agressive about clearing the payload because we don't want
175 # it to sit in memory until the next execute_request comes in.
179 # it to sit in memory until the next execute_request comes in.
176 shell.payload_manager.clear_payload()
180 shell.payload_manager.clear_payload()
177
181
178 return reply_content
182 return reply_content
179
183
180 def do_complete(self, code, cursor_pos):
184 def do_complete(self, code, cursor_pos):
181 txt, matches = self.shell.complete('', code, cursor_pos)
185 txt, matches = self.shell.complete('', code, cursor_pos)
182 return {'matches' : matches,
186 return {'matches' : matches,
183 'cursor_end' : cursor_pos,
187 'cursor_end' : cursor_pos,
184 'cursor_start' : cursor_pos - len(txt),
188 'cursor_start' : cursor_pos - len(txt),
185 'metadata' : {},
189 'metadata' : {},
186 'status' : 'ok'}
190 'status' : 'ok'}
187
191
188 def do_inspect(self, code, cursor_pos, detail_level=0):
192 def do_inspect(self, code, cursor_pos, detail_level=0):
189 name = token_at_cursor(code, cursor_pos)
193 name = token_at_cursor(code, cursor_pos)
190 info = self.shell.object_inspect(name)
194 info = self.shell.object_inspect(name)
191
195
192 reply_content = {'status' : 'ok'}
196 reply_content = {'status' : 'ok'}
193 reply_content['data'] = data = {}
197 reply_content['data'] = data = {}
194 reply_content['metadata'] = {}
198 reply_content['metadata'] = {}
195 reply_content['found'] = info['found']
199 reply_content['found'] = info['found']
196 if info['found']:
200 if info['found']:
197 info_text = self.shell.object_inspect_text(
201 info_text = self.shell.object_inspect_text(
198 name,
202 name,
199 detail_level=detail_level,
203 detail_level=detail_level,
200 )
204 )
201 data['text/plain'] = info_text
205 data['text/plain'] = info_text
202
206
203 return reply_content
207 return reply_content
204
208
205 def do_history(self, hist_access_type, output, raw, session=None, start=None,
209 def do_history(self, hist_access_type, output, raw, session=None, start=None,
206 stop=None, n=None, pattern=None, unique=False):
210 stop=None, n=None, pattern=None, unique=False):
207 if hist_access_type == 'tail':
211 if hist_access_type == 'tail':
208 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
212 hist = self.shell.history_manager.get_tail(n, raw=raw, output=output,
209 include_latest=True)
213 include_latest=True)
210
214
211 elif hist_access_type == 'range':
215 elif hist_access_type == 'range':
212 hist = self.shell.history_manager.get_range(session, start, stop,
216 hist = self.shell.history_manager.get_range(session, start, stop,
213 raw=raw, output=output)
217 raw=raw, output=output)
214
218
215 elif hist_access_type == 'search':
219 elif hist_access_type == 'search':
216 hist = self.shell.history_manager.search(
220 hist = self.shell.history_manager.search(
217 pattern, raw=raw, output=output, n=n, unique=unique)
221 pattern, raw=raw, output=output, n=n, unique=unique)
218 else:
222 else:
219 hist = []
223 hist = []
220
224
221 return {'history' : list(hist)}
225 return {'history' : list(hist)}
222
226
223 def do_shutdown(self, restart):
227 def do_shutdown(self, restart):
224 self.shell.exit_now = True
228 self.shell.exit_now = True
225 return dict(status='ok', restart=restart)
229 return dict(status='ok', restart=restart)
226
230
227 def do_apply(self, content, bufs, msg_id, reply_metadata):
231 def do_apply(self, content, bufs, msg_id, reply_metadata):
228 shell = self.shell
232 shell = self.shell
229 try:
233 try:
230 working = shell.user_ns
234 working = shell.user_ns
231
235
232 prefix = "_"+str(msg_id).replace("-","")+"_"
236 prefix = "_"+str(msg_id).replace("-","")+"_"
233
237
234 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
238 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
235
239
236 fname = getattr(f, '__name__', 'f')
240 fname = getattr(f, '__name__', 'f')
237
241
238 fname = prefix+"f"
242 fname = prefix+"f"
239 argname = prefix+"args"
243 argname = prefix+"args"
240 kwargname = prefix+"kwargs"
244 kwargname = prefix+"kwargs"
241 resultname = prefix+"result"
245 resultname = prefix+"result"
242
246
243 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
247 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
244 # print ns
248 # print ns
245 working.update(ns)
249 working.update(ns)
246 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
250 code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
247 try:
251 try:
248 exec(code, shell.user_global_ns, shell.user_ns)
252 exec(code, shell.user_global_ns, shell.user_ns)
249 result = working.get(resultname)
253 result = working.get(resultname)
250 finally:
254 finally:
251 for key in ns:
255 for key in ns:
252 working.pop(key)
256 working.pop(key)
253
257
254 result_buf = serialize_object(result,
258 result_buf = serialize_object(result,
255 buffer_threshold=self.session.buffer_threshold,
259 buffer_threshold=self.session.buffer_threshold,
256 item_threshold=self.session.item_threshold,
260 item_threshold=self.session.item_threshold,
257 )
261 )
258
262
259 except:
263 except:
260 # invoke IPython traceback formatting
264 # invoke IPython traceback formatting
261 shell.showtraceback()
265 shell.showtraceback()
262 # FIXME - fish exception info out of shell, possibly left there by
266 # FIXME - fish exception info out of shell, possibly left there by
263 # run_code. We'll need to clean up this logic later.
267 # run_code. We'll need to clean up this logic later.
264 reply_content = {}
268 reply_content = {}
265 if shell._reply_content is not None:
269 if shell._reply_content is not None:
266 reply_content.update(shell._reply_content)
270 reply_content.update(shell._reply_content)
267 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
271 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply')
268 reply_content['engine_info'] = e_info
272 reply_content['engine_info'] = e_info
269 # reset after use
273 # reset after use
270 shell._reply_content = None
274 shell._reply_content = None
271
275
272 self.send_response(self.iopub_socket, u'error', reply_content,
276 self.send_response(self.iopub_socket, u'error', reply_content,
273 ident=self._topic('error'))
277 ident=self._topic('error'))
274 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
278 self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback']))
275 result_buf = []
279 result_buf = []
276
280
277 if reply_content['ename'] == 'UnmetDependency':
281 if reply_content['ename'] == 'UnmetDependency':
278 reply_metadata['dependencies_met'] = False
282 reply_metadata['dependencies_met'] = False
279 else:
283 else:
280 reply_content = {'status' : 'ok'}
284 reply_content = {'status' : 'ok'}
281
285
282 return reply_content, result_buf
286 return reply_content, result_buf
283
287
284 def do_clear(self):
288 def do_clear(self):
285 self.shell.reset(False)
289 self.shell.reset(False)
286 return dict(status='ok')
290 return dict(status='ok')
@@ -1,646 +1,689 b''
1 """Base class for a kernel that talks to frontends over 0MQ."""
1 """Base class for a kernel that talks to frontends over 0MQ."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import print_function
6 from __future__ import print_function
7
7
8 import sys
8 import sys
9 import time
9 import time
10 import logging
10 import logging
11 import uuid
11 import uuid
12
12
13 from datetime import datetime
13 from datetime import datetime
14 from signal import (
14 from signal import (
15 signal, default_int_handler, SIGINT
15 signal, default_int_handler, SIGINT
16 )
16 )
17
17
18 import zmq
18 import zmq
19 from zmq.eventloop import ioloop
19 from zmq.eventloop import ioloop
20 from zmq.eventloop.zmqstream import ZMQStream
20 from zmq.eventloop.zmqstream import ZMQStream
21
21
22 from IPython.config.configurable import Configurable
22 from IPython.config.configurable import Configurable
23 from IPython.core.error import StdinNotImplementedError
23 from IPython.core.error import StdinNotImplementedError
24 from IPython.core import release
24 from IPython.core import release
25 from IPython.utils import py3compat
25 from IPython.utils import py3compat
26 from IPython.utils.py3compat import unicode_type, string_types
26 from IPython.utils.py3compat import unicode_type, string_types
27 from IPython.utils.jsonutil import json_clean
27 from IPython.utils.jsonutil import json_clean
28 from IPython.utils.traitlets import (
28 from IPython.utils.traitlets import (
29 Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool,
29 Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool,
30 )
30 )
31
31
32 from .session import Session
32 from .session import Session
33
33
34
34
35 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
36 # Main kernel class
36 # Main kernel class
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38
38
39 class KernelBase(Configurable):
39 class KernelBase(Configurable):
40
40
41 #---------------------------------------------------------------------------
41 #---------------------------------------------------------------------------
42 # Kernel interface
42 # Kernel interface
43 #---------------------------------------------------------------------------
43 #---------------------------------------------------------------------------
44
44
45 # attribute to override with a GUI
45 # attribute to override with a GUI
46 eventloop = Any(None)
46 eventloop = Any(None)
47 def _eventloop_changed(self, name, old, new):
47 def _eventloop_changed(self, name, old, new):
48 """schedule call to eventloop from IOLoop"""
48 """schedule call to eventloop from IOLoop"""
49 loop = ioloop.IOLoop.instance()
49 loop = ioloop.IOLoop.instance()
50 loop.add_callback(self.enter_eventloop)
50 loop.add_callback(self.enter_eventloop)
51
51
52 session = Instance(Session)
52 session = Instance(Session)
53 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
53 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
54 shell_streams = List()
54 shell_streams = List()
55 control_stream = Instance(ZMQStream)
55 control_stream = Instance(ZMQStream)
56 iopub_socket = Instance(zmq.Socket)
56 iopub_socket = Instance(zmq.Socket)
57 stdin_socket = Instance(zmq.Socket)
57 stdin_socket = Instance(zmq.Socket)
58 log = Instance(logging.Logger)
58 log = Instance(logging.Logger)
59
59
60 # identities:
60 # identities:
61 int_id = Integer(-1)
61 int_id = Integer(-1)
62 ident = Unicode()
62 ident = Unicode()
63
63
64 def _ident_default(self):
64 def _ident_default(self):
65 return unicode_type(uuid.uuid4())
65 return unicode_type(uuid.uuid4())
66
66
67 # Private interface
67 # Private interface
68
68
69 _darwin_app_nap = Bool(True, config=True,
69 _darwin_app_nap = Bool(True, config=True,
70 help="""Whether to use appnope for compatiblity with OS X App Nap.
70 help="""Whether to use appnope for compatiblity with OS X App Nap.
71
71
72 Only affects OS X >= 10.9.
72 Only affects OS X >= 10.9.
73 """
73 """
74 )
74 )
75
75
76 # track associations with current request
76 # track associations with current request
77 _allow_stdin = Bool(False)
77 _allow_stdin = Bool(False)
78 _parent_header = Dict()
78 _parent_header = Dict()
79 _parent_ident = Any(b'')
79 _parent_ident = Any(b'')
80 # Time to sleep after flushing the stdout/err buffers in each execute
80 # Time to sleep after flushing the stdout/err buffers in each execute
81 # cycle. While this introduces a hard limit on the minimal latency of the
81 # cycle. While this introduces a hard limit on the minimal latency of the
82 # execute cycle, it helps prevent output synchronization problems for
82 # execute cycle, it helps prevent output synchronization problems for
83 # clients.
83 # clients.
84 # Units are in seconds. The minimum zmq latency on local host is probably
84 # Units are in seconds. The minimum zmq latency on local host is probably
85 # ~150 microseconds, set this to 500us for now. We may need to increase it
85 # ~150 microseconds, set this to 500us for now. We may need to increase it
86 # a little if it's not enough after more interactive testing.
86 # a little if it's not enough after more interactive testing.
87 _execute_sleep = Float(0.0005, config=True)
87 _execute_sleep = Float(0.0005, config=True)
88
88
89 # Frequency of the kernel's event loop.
89 # Frequency of the kernel's event loop.
90 # Units are in seconds, kernel subclasses for GUI toolkits may need to
90 # Units are in seconds, kernel subclasses for GUI toolkits may need to
91 # adapt to milliseconds.
91 # adapt to milliseconds.
92 _poll_interval = Float(0.05, config=True)
92 _poll_interval = Float(0.05, config=True)
93
93
94 # If the shutdown was requested over the network, we leave here the
94 # If the shutdown was requested over the network, we leave here the
95 # necessary reply message so it can be sent by our registered atexit
95 # necessary reply message so it can be sent by our registered atexit
96 # handler. This ensures that the reply is only sent to clients truly at
96 # handler. This ensures that the reply is only sent to clients truly at
97 # the end of our shutdown process (which happens after the underlying
97 # the end of our shutdown process (which happens after the underlying
98 # IPython shell's own shutdown).
98 # IPython shell's own shutdown).
99 _shutdown_message = None
99 _shutdown_message = None
100
100
101 # This is a dict of port number that the kernel is listening on. It is set
101 # This is a dict of port number that the kernel is listening on. It is set
102 # by record_ports and used by connect_request.
102 # by record_ports and used by connect_request.
103 _recorded_ports = Dict()
103 _recorded_ports = Dict()
104
104
105 # set of aborted msg_ids
105 # set of aborted msg_ids
106 aborted = Set()
106 aborted = Set()
107
107
108 # Track execution count here. For IPython, we override this to use the
108 # Track execution count here. For IPython, we override this to use the
109 # execution count we store in the shell.
109 # execution count we store in the shell.
110 execution_count = 0
110 execution_count = 0
111
111
112
112
113 def __init__(self, **kwargs):
113 def __init__(self, **kwargs):
114 super(KernelBase, self).__init__(**kwargs)
114 super(KernelBase, self).__init__(**kwargs)
115
115
116 # Build dict of handlers for message types
116 # Build dict of handlers for message types
117 msg_types = [ 'execute_request', 'complete_request',
117 msg_types = [ 'execute_request', 'complete_request',
118 'inspect_request', 'history_request',
118 'inspect_request', 'history_request',
119 'kernel_info_request',
119 'kernel_info_request',
120 'connect_request', 'shutdown_request',
120 'connect_request', 'shutdown_request',
121 'apply_request',
121 'apply_request',
122 ]
122 ]
123 self.shell_handlers = {}
123 self.shell_handlers = {}
124 for msg_type in msg_types:
124 for msg_type in msg_types:
125 self.shell_handlers[msg_type] = getattr(self, msg_type)
125 self.shell_handlers[msg_type] = getattr(self, msg_type)
126
126
127 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
127 control_msg_types = msg_types + [ 'clear_request', 'abort_request' ]
128 self.control_handlers = {}
128 self.control_handlers = {}
129 for msg_type in control_msg_types:
129 for msg_type in control_msg_types:
130 self.control_handlers[msg_type] = getattr(self, msg_type)
130 self.control_handlers[msg_type] = getattr(self, msg_type)
131
131
132
132
133 def dispatch_control(self, msg):
133 def dispatch_control(self, msg):
134 """dispatch control requests"""
134 """dispatch control requests"""
135 idents,msg = self.session.feed_identities(msg, copy=False)
135 idents,msg = self.session.feed_identities(msg, copy=False)
136 try:
136 try:
137 msg = self.session.unserialize(msg, content=True, copy=False)
137 msg = self.session.unserialize(msg, content=True, copy=False)
138 except:
138 except:
139 self.log.error("Invalid Control Message", exc_info=True)
139 self.log.error("Invalid Control Message", exc_info=True)
140 return
140 return
141
141
142 self.log.debug("Control received: %s", msg)
142 self.log.debug("Control received: %s", msg)
143
143
144 header = msg['header']
144 header = msg['header']
145 msg_type = header['msg_type']
145 msg_type = header['msg_type']
146
146
147 handler = self.control_handlers.get(msg_type, None)
147 handler = self.control_handlers.get(msg_type, None)
148 if handler is None:
148 if handler is None:
149 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
149 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type)
150 else:
150 else:
151 try:
151 try:
152 handler(self.control_stream, idents, msg)
152 handler(self.control_stream, idents, msg)
153 except Exception:
153 except Exception:
154 self.log.error("Exception in control handler:", exc_info=True)
154 self.log.error("Exception in control handler:", exc_info=True)
155
155
156 def dispatch_shell(self, stream, msg):
156 def dispatch_shell(self, stream, msg):
157 """dispatch shell requests"""
157 """dispatch shell requests"""
158 # flush control requests first
158 # flush control requests first
159 if self.control_stream:
159 if self.control_stream:
160 self.control_stream.flush()
160 self.control_stream.flush()
161
161
162 idents,msg = self.session.feed_identities(msg, copy=False)
162 idents,msg = self.session.feed_identities(msg, copy=False)
163 try:
163 try:
164 msg = self.session.unserialize(msg, content=True, copy=False)
164 msg = self.session.unserialize(msg, content=True, copy=False)
165 except:
165 except:
166 self.log.error("Invalid Message", exc_info=True)
166 self.log.error("Invalid Message", exc_info=True)
167 return
167 return
168
168
169 header = msg['header']
169 header = msg['header']
170 msg_id = header['msg_id']
170 msg_id = header['msg_id']
171 msg_type = msg['header']['msg_type']
171 msg_type = msg['header']['msg_type']
172
172
173 # Print some info about this message and leave a '--->' marker, so it's
173 # Print some info about this message and leave a '--->' marker, so it's
174 # easier to trace visually the message chain when debugging. Each
174 # easier to trace visually the message chain when debugging. Each
175 # handler prints its message at the end.
175 # handler prints its message at the end.
176 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
176 self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type)
177 self.log.debug(' Content: %s\n --->\n ', msg['content'])
177 self.log.debug(' Content: %s\n --->\n ', msg['content'])
178
178
179 if msg_id in self.aborted:
179 if msg_id in self.aborted:
180 self.aborted.remove(msg_id)
180 self.aborted.remove(msg_id)
181 # is it safe to assume a msg_id will not be resubmitted?
181 # is it safe to assume a msg_id will not be resubmitted?
182 reply_type = msg_type.split('_')[0] + '_reply'
182 reply_type = msg_type.split('_')[0] + '_reply'
183 status = {'status' : 'aborted'}
183 status = {'status' : 'aborted'}
184 md = {'engine' : self.ident}
184 md = {'engine' : self.ident}
185 md.update(status)
185 md.update(status)
186 self.session.send(stream, reply_type, metadata=md,
186 self.session.send(stream, reply_type, metadata=md,
187 content=status, parent=msg, ident=idents)
187 content=status, parent=msg, ident=idents)
188 return
188 return
189
189
190 handler = self.shell_handlers.get(msg_type, None)
190 handler = self.shell_handlers.get(msg_type, None)
191 if handler is None:
191 if handler is None:
192 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
192 self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type)
193 else:
193 else:
194 # ensure default_int_handler during handler call
194 # ensure default_int_handler during handler call
195 sig = signal(SIGINT, default_int_handler)
195 sig = signal(SIGINT, default_int_handler)
196 self.log.debug("%s: %s", msg_type, msg)
196 self.log.debug("%s: %s", msg_type, msg)
197 try:
197 try:
198 handler(stream, idents, msg)
198 handler(stream, idents, msg)
199 except Exception:
199 except Exception:
200 self.log.error("Exception in message handler:", exc_info=True)
200 self.log.error("Exception in message handler:", exc_info=True)
201 finally:
201 finally:
202 signal(SIGINT, sig)
202 signal(SIGINT, sig)
203
203
204 def enter_eventloop(self):
204 def enter_eventloop(self):
205 """enter eventloop"""
205 """enter eventloop"""
206 self.log.info("entering eventloop %s", self.eventloop)
206 self.log.info("entering eventloop %s", self.eventloop)
207 for stream in self.shell_streams:
207 for stream in self.shell_streams:
208 # flush any pending replies,
208 # flush any pending replies,
209 # which may be skipped by entering the eventloop
209 # which may be skipped by entering the eventloop
210 stream.flush(zmq.POLLOUT)
210 stream.flush(zmq.POLLOUT)
211 # restore default_int_handler
211 # restore default_int_handler
212 signal(SIGINT, default_int_handler)
212 signal(SIGINT, default_int_handler)
213 while self.eventloop is not None:
213 while self.eventloop is not None:
214 try:
214 try:
215 self.eventloop(self)
215 self.eventloop(self)
216 except KeyboardInterrupt:
216 except KeyboardInterrupt:
217 # Ctrl-C shouldn't crash the kernel
217 # Ctrl-C shouldn't crash the kernel
218 self.log.error("KeyboardInterrupt caught in kernel")
218 self.log.error("KeyboardInterrupt caught in kernel")
219 continue
219 continue
220 else:
220 else:
221 # eventloop exited cleanly, this means we should stop (right?)
221 # eventloop exited cleanly, this means we should stop (right?)
222 self.eventloop = None
222 self.eventloop = None
223 break
223 break
224 self.log.info("exiting eventloop")
224 self.log.info("exiting eventloop")
225
225
226 def start(self):
226 def start(self):
227 """register dispatchers for streams"""
227 """register dispatchers for streams"""
228 self.shell.exit_now = False
229 if self.control_stream:
228 if self.control_stream:
230 self.control_stream.on_recv(self.dispatch_control, copy=False)
229 self.control_stream.on_recv(self.dispatch_control, copy=False)
231
230
232 def make_dispatcher(stream):
231 def make_dispatcher(stream):
233 def dispatcher(msg):
232 def dispatcher(msg):
234 return self.dispatch_shell(stream, msg)
233 return self.dispatch_shell(stream, msg)
235 return dispatcher
234 return dispatcher
236
235
237 for s in self.shell_streams:
236 for s in self.shell_streams:
238 s.on_recv(make_dispatcher(s), copy=False)
237 s.on_recv(make_dispatcher(s), copy=False)
239
238
240 # publish idle status
239 # publish idle status
241 self._publish_status('starting')
240 self._publish_status('starting')
242
241
243 def do_one_iteration(self):
242 def do_one_iteration(self):
244 """step eventloop just once"""
243 """step eventloop just once"""
245 if self.control_stream:
244 if self.control_stream:
246 self.control_stream.flush()
245 self.control_stream.flush()
247 for stream in self.shell_streams:
246 for stream in self.shell_streams:
248 # handle at most one request per iteration
247 # handle at most one request per iteration
249 stream.flush(zmq.POLLIN, 1)
248 stream.flush(zmq.POLLIN, 1)
250 stream.flush(zmq.POLLOUT)
249 stream.flush(zmq.POLLOUT)
251
250
252
251
253 def record_ports(self, ports):
252 def record_ports(self, ports):
254 """Record the ports that this kernel is using.
253 """Record the ports that this kernel is using.
255
254
256 The creator of the Kernel instance must call this methods if they
255 The creator of the Kernel instance must call this methods if they
257 want the :meth:`connect_request` method to return the port numbers.
256 want the :meth:`connect_request` method to return the port numbers.
258 """
257 """
259 self._recorded_ports = ports
258 self._recorded_ports = ports
260
259
261 #---------------------------------------------------------------------------
260 #---------------------------------------------------------------------------
262 # Kernel request handlers
261 # Kernel request handlers
263 #---------------------------------------------------------------------------
262 #---------------------------------------------------------------------------
264
263
265 def _make_metadata(self, other=None):
264 def _make_metadata(self, other=None):
266 """init metadata dict, for execute/apply_reply"""
265 """init metadata dict, for execute/apply_reply"""
267 new_md = {
266 new_md = {
268 'dependencies_met' : True,
267 'dependencies_met' : True,
269 'engine' : self.ident,
268 'engine' : self.ident,
270 'started': datetime.now(),
269 'started': datetime.now(),
271 }
270 }
272 if other:
271 if other:
273 new_md.update(other)
272 new_md.update(other)
274 return new_md
273 return new_md
275
274
276 def _publish_execute_input(self, code, parent, execution_count):
275 def _publish_execute_input(self, code, parent, execution_count):
277 """Publish the code request on the iopub stream."""
276 """Publish the code request on the iopub stream."""
278
277
279 self.session.send(self.iopub_socket, u'execute_input',
278 self.session.send(self.iopub_socket, u'execute_input',
280 {u'code':code, u'execution_count': execution_count},
279 {u'code':code, u'execution_count': execution_count},
281 parent=parent, ident=self._topic('execute_input')
280 parent=parent, ident=self._topic('execute_input')
282 )
281 )
283
282
284 def _publish_status(self, status, parent=None):
283 def _publish_status(self, status, parent=None):
285 """send status (busy/idle) on IOPub"""
284 """send status (busy/idle) on IOPub"""
286 self.session.send(self.iopub_socket,
285 self.session.send(self.iopub_socket,
287 u'status',
286 u'status',
288 {u'execution_state': status},
287 {u'execution_state': status},
289 parent=parent,
288 parent=parent,
290 ident=self._topic('status'),
289 ident=self._topic('status'),
291 )
290 )
292
291
293 def set_parent(self, ident, parent):
292 def set_parent(self, ident, parent):
294 """Set the current parent_header
293 """Set the current parent_header
295
294
296 Side effects (IOPub messages) and replies are associated with
295 Side effects (IOPub messages) and replies are associated with
297 the request that caused them via the parent_header.
296 the request that caused them via the parent_header.
298
297
299 The parent identity is used to route input_request messages
298 The parent identity is used to route input_request messages
300 on the stdin channel.
299 on the stdin channel.
301 """
300 """
302 self._parent_ident = ident
301 self._parent_ident = ident
303 self._parent_header = parent
302 self._parent_header = parent
304
303
305 def send_response(self, stream, msg_or_type, content=None, ident=None,
304 def send_response(self, stream, msg_or_type, content=None, ident=None,
306 buffers=None, track=False, header=None, metadata=None):
305 buffers=None, track=False, header=None, metadata=None):
307 """Send a response to the message we're currently processing.
306 """Send a response to the message we're currently processing.
308
307
309 This accepts all the parameters of :meth:`IPython.kernel.zmq.session.Session.send`
308 This accepts all the parameters of :meth:`IPython.kernel.zmq.session.Session.send`
310 except ``parent``.
309 except ``parent``.
311
310
312 This relies on :meth:`set_parent` having been called for the current
311 This relies on :meth:`set_parent` having been called for the current
313 message.
312 message.
314 """
313 """
315 return self.session.send(stream, msg_or_type, content, self._parent_header,
314 return self.session.send(stream, msg_or_type, content, self._parent_header,
316 ident, buffers, track, header, metadata)
315 ident, buffers, track, header, metadata)
317
316
318 def execute_request(self, stream, ident, parent):
317 def execute_request(self, stream, ident, parent):
319 """handle an execute_request"""
318 """handle an execute_request"""
320
319
321 self._publish_status(u'busy', parent)
320 self._publish_status(u'busy', parent)
322
321
323 try:
322 try:
324 content = parent[u'content']
323 content = parent[u'content']
325 code = py3compat.cast_unicode_py2(content[u'code'])
324 code = py3compat.cast_unicode_py2(content[u'code'])
326 silent = content[u'silent']
325 silent = content[u'silent']
327 store_history = content.get(u'store_history', not silent)
326 store_history = content.get(u'store_history', not silent)
328 user_expressions = content.get('user_expressions', {})
327 user_expressions = content.get('user_expressions', {})
329 allow_stdin = content.get('allow_stdin', False)
328 allow_stdin = content.get('allow_stdin', False)
330 except:
329 except:
331 self.log.error("Got bad msg: ")
330 self.log.error("Got bad msg: ")
332 self.log.error("%s", parent)
331 self.log.error("%s", parent)
333 return
332 return
334
333
335 md = self._make_metadata(parent['metadata'])
334 md = self._make_metadata(parent['metadata'])
336
335
337 # Set the parent message of the display hook and out streams.
336 # Set the parent message of the display hook and out streams.
338 self.set_parent(ident, parent)
337 self.set_parent(ident, parent)
339
338
340 # Re-broadcast our input for the benefit of listening clients, and
339 # Re-broadcast our input for the benefit of listening clients, and
341 # start computing output
340 # start computing output
342 if not silent:
341 if not silent:
343 self.execution_count += 1
342 self.execution_count += 1
344 self._publish_execute_input(code, parent, self.execution_count)
343 self._publish_execute_input(code, parent, self.execution_count)
345
344
346 reply_content = self.do_execute(code, silent, store_history,
345 reply_content = self.do_execute(code, silent, store_history,
347 user_expressions, allow_stdin)
346 user_expressions, allow_stdin)
348
347
349 # Flush output before sending the reply.
348 # Flush output before sending the reply.
350 sys.stdout.flush()
349 sys.stdout.flush()
351 sys.stderr.flush()
350 sys.stderr.flush()
352 # FIXME: on rare occasions, the flush doesn't seem to make it to the
351 # FIXME: on rare occasions, the flush doesn't seem to make it to the
353 # clients... This seems to mitigate the problem, but we definitely need
352 # clients... This seems to mitigate the problem, but we definitely need
354 # to better understand what's going on.
353 # to better understand what's going on.
355 if self._execute_sleep:
354 if self._execute_sleep:
356 time.sleep(self._execute_sleep)
355 time.sleep(self._execute_sleep)
357
356
358 # Send the reply.
357 # Send the reply.
359 reply_content = json_clean(reply_content)
358 reply_content = json_clean(reply_content)
360
359
361 md['status'] = reply_content['status']
360 md['status'] = reply_content['status']
362 if reply_content['status'] == 'error' and \
361 if reply_content['status'] == 'error' and \
363 reply_content['ename'] == 'UnmetDependency':
362 reply_content['ename'] == 'UnmetDependency':
364 md['dependencies_met'] = False
363 md['dependencies_met'] = False
365
364
366 reply_msg = self.session.send(stream, u'execute_reply',
365 reply_msg = self.session.send(stream, u'execute_reply',
367 reply_content, parent, metadata=md,
366 reply_content, parent, metadata=md,
368 ident=ident)
367 ident=ident)
369
368
370 self.log.debug("%s", reply_msg)
369 self.log.debug("%s", reply_msg)
371
370
372 if not silent and reply_msg['content']['status'] == u'error':
371 if not silent and reply_msg['content']['status'] == u'error':
373 self._abort_queues()
372 self._abort_queues()
374
373
375 self._publish_status(u'idle', parent)
374 self._publish_status(u'idle', parent)
376
375
376 def do_execute(self, code, silent, store_history=True,
377 user_experssions=None, allow_stdin=False):
378 """Execute user code. Must be overridden by subclasses.
379 """
380 raise NotImplementedError
381
377 def complete_request(self, stream, ident, parent):
382 def complete_request(self, stream, ident, parent):
378 content = parent['content']
383 content = parent['content']
379 code = content['code']
384 code = content['code']
380 cursor_pos = content['cursor_pos']
385 cursor_pos = content['cursor_pos']
381
386
382 matches = self.do_complete(code, cursor_pos)
387 matches = self.do_complete(code, cursor_pos)
383 matches = json_clean(matches)
388 matches = json_clean(matches)
384 completion_msg = self.session.send(stream, 'complete_reply',
389 completion_msg = self.session.send(stream, 'complete_reply',
385 matches, parent, ident)
390 matches, parent, ident)
386 self.log.debug("%s", completion_msg)
391 self.log.debug("%s", completion_msg)
387
392
393 def do_complete(self, code, cursor_pos):
394 """Override in subclasses to find completions.
395 """
396 return {'matches' : [],
397 'cursor_end' : cursor_pos,
398 'cursor_start' : cursor_pos,
399 'metadata' : {},
400 'status' : 'ok'}
401
388 def inspect_request(self, stream, ident, parent):
402 def inspect_request(self, stream, ident, parent):
389 content = parent['content']
403 content = parent['content']
390
404
391 reply_content = self.do_inspect(content['code'], content['cursor_pos'],
405 reply_content = self.do_inspect(content['code'], content['cursor_pos'],
392 content.get('detail_level', 0))
406 content.get('detail_level', 0))
393 # Before we send this object over, we scrub it for JSON usage
407 # Before we send this object over, we scrub it for JSON usage
394 reply_content = json_clean(reply_content)
408 reply_content = json_clean(reply_content)
395 msg = self.session.send(stream, 'inspect_reply',
409 msg = self.session.send(stream, 'inspect_reply',
396 reply_content, parent, ident)
410 reply_content, parent, ident)
397 self.log.debug("%s", msg)
411 self.log.debug("%s", msg)
398
412
413 def do_inspect(self, code, cursor_pos, detail_level=0):
414 """Override in subclasses to allow introspection.
415 """
416 return {'status': 'ok', 'data':{}, 'metadata':{}, 'found':False}
417
399 def history_request(self, stream, ident, parent):
418 def history_request(self, stream, ident, parent):
400 # We need to pull these out, as passing **kwargs doesn't work with
419 # We need to pull these out, as passing **kwargs doesn't work with
401 # unicode keys before Python 2.6.5.
420 # unicode keys before Python 2.6.5.
402 content = parent['content']
421 content = parent['content']
403
422
404 reply_content = self.do_history(content['hist_access_type'],
423 reply_content = self.do_history(content['hist_access_type'],
405 content['output'], content['raw'],
424 content['output'], content['raw'],
406 content.get('session', None),
425 content.get('session', None),
407 content.get('start', None),
426 content.get('start', None),
408 content.get('stop', None),
427 content.get('stop', None),
409 content.get('n', None),
428 content.get('n', None),
410 content.get('pattern', None),
429 content.get('pattern', None),
411 content.get('unique', False),
430 content.get('unique', False),
412 )
431 )
413
432
414 reply_content = json_clean(reply_content)
433 reply_content = json_clean(reply_content)
415 msg = self.session.send(stream, 'history_reply',
434 msg = self.session.send(stream, 'history_reply',
416 reply_content, parent, ident)
435 reply_content, parent, ident)
417 self.log.debug("%s", msg)
436 self.log.debug("%s", msg)
418
437
438 def do_history(self, hist_access_type, output, raw, session=None, start=None,
439 stop=None, n=None, pattern=None, unique=False):
440 """Override in subclasses to access history.
441 """
442 return {'history': []}
443
419 def connect_request(self, stream, ident, parent):
444 def connect_request(self, stream, ident, parent):
420 if self._recorded_ports is not None:
445 if self._recorded_ports is not None:
421 content = self._recorded_ports.copy()
446 content = self._recorded_ports.copy()
422 else:
447 else:
423 content = {}
448 content = {}
424 msg = self.session.send(stream, 'connect_reply',
449 msg = self.session.send(stream, 'connect_reply',
425 content, parent, ident)
450 content, parent, ident)
426 self.log.debug("%s", msg)
451 self.log.debug("%s", msg)
427
452
428 @property
453 @property
429 def kernel_info(self):
454 def kernel_info(self):
430 return {
455 return {
431 'protocol_version': release.kernel_protocol_version,
456 'protocol_version': release.kernel_protocol_version,
432 'implementation': self.implementation,
457 'implementation': self.implementation,
433 'implementation_version': self.implementation_version,
458 'implementation_version': self.implementation_version,
434 'language': self.language,
459 'language': self.language,
435 'language_version': self.language_version,
460 'language_version': self.language_version,
436 'banner': self.banner,
461 'banner': self.banner,
437 }
462 }
438
463
439 def kernel_info_request(self, stream, ident, parent):
464 def kernel_info_request(self, stream, ident, parent):
440 msg = self.session.send(stream, 'kernel_info_reply',
465 msg = self.session.send(stream, 'kernel_info_reply',
441 self.kernel_info, parent, ident)
466 self.kernel_info, parent, ident)
442 self.log.debug("%s", msg)
467 self.log.debug("%s", msg)
443
468
444 def shutdown_request(self, stream, ident, parent):
469 def shutdown_request(self, stream, ident, parent):
445 content = self.do_shutdown(parent['content']['restart'])
470 content = self.do_shutdown(parent['content']['restart'])
446 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
471 self.session.send(stream, u'shutdown_reply', content, parent, ident=ident)
447 # same content, but different msg_id for broadcasting on IOPub
472 # same content, but different msg_id for broadcasting on IOPub
448 self._shutdown_message = self.session.msg(u'shutdown_reply',
473 self._shutdown_message = self.session.msg(u'shutdown_reply',
449 content, parent
474 content, parent
450 )
475 )
451
476
452 self._at_shutdown()
477 self._at_shutdown()
453 # call sys.exit after a short delay
478 # call sys.exit after a short delay
454 loop = ioloop.IOLoop.instance()
479 loop = ioloop.IOLoop.instance()
455 loop.add_timeout(time.time()+0.1, loop.stop)
480 loop.add_timeout(time.time()+0.1, loop.stop)
456
481
482 def do_shutdown(self, restart):
483 """Override in subclasses to do things when the frontend shuts down the
484 kernel.
485 """
486 return {'status': 'ok', 'restart': restart}
487
457 #---------------------------------------------------------------------------
488 #---------------------------------------------------------------------------
458 # Engine methods
489 # Engine methods
459 #---------------------------------------------------------------------------
490 #---------------------------------------------------------------------------
460
491
461 def apply_request(self, stream, ident, parent):
492 def apply_request(self, stream, ident, parent):
462 try:
493 try:
463 content = parent[u'content']
494 content = parent[u'content']
464 bufs = parent[u'buffers']
495 bufs = parent[u'buffers']
465 msg_id = parent['header']['msg_id']
496 msg_id = parent['header']['msg_id']
466 except:
497 except:
467 self.log.error("Got bad msg: %s", parent, exc_info=True)
498 self.log.error("Got bad msg: %s", parent, exc_info=True)
468 return
499 return
469
500
470 self._publish_status(u'busy', parent)
501 self._publish_status(u'busy', parent)
471
502
472 # Set the parent message of the display hook and out streams.
503 # Set the parent message of the display hook and out streams.
473 self.set_parent(ident, parent)
504 self.set_parent(ident, parent)
474
505
475 md = self._make_metadata(parent['metadata'])
506 md = self._make_metadata(parent['metadata'])
476
507
477 reply_content, result_buf = self.do_apply(content, bufs, msg_id, md)
508 reply_content, result_buf = self.do_apply(content, bufs, msg_id, md)
478
509
479 # put 'ok'/'error' status in header, for scheduler introspection:
510 # put 'ok'/'error' status in header, for scheduler introspection:
480 md['status'] = reply_content['status']
511 md['status'] = reply_content['status']
481
512
482 # flush i/o
513 # flush i/o
483 sys.stdout.flush()
514 sys.stdout.flush()
484 sys.stderr.flush()
515 sys.stderr.flush()
485
516
486 self.session.send(stream, u'apply_reply', reply_content,
517 self.session.send(stream, u'apply_reply', reply_content,
487 parent=parent, ident=ident,buffers=result_buf, metadata=md)
518 parent=parent, ident=ident,buffers=result_buf, metadata=md)
488
519
489 self._publish_status(u'idle', parent)
520 self._publish_status(u'idle', parent)
490
521
522 def do_apply(self, content, bufs, msg_id, reply_metadata):
523 """Override in subclasses to support the IPython parallel framework.
524 """
525 raise NotImplementedError
526
491 #---------------------------------------------------------------------------
527 #---------------------------------------------------------------------------
492 # Control messages
528 # Control messages
493 #---------------------------------------------------------------------------
529 #---------------------------------------------------------------------------
494
530
495 def abort_request(self, stream, ident, parent):
531 def abort_request(self, stream, ident, parent):
496 """abort a specifig msg by id"""
532 """abort a specifig msg by id"""
497 msg_ids = parent['content'].get('msg_ids', None)
533 msg_ids = parent['content'].get('msg_ids', None)
498 if isinstance(msg_ids, string_types):
534 if isinstance(msg_ids, string_types):
499 msg_ids = [msg_ids]
535 msg_ids = [msg_ids]
500 if not msg_ids:
536 if not msg_ids:
501 self.abort_queues()
537 self.abort_queues()
502 for mid in msg_ids:
538 for mid in msg_ids:
503 self.aborted.add(str(mid))
539 self.aborted.add(str(mid))
504
540
505 content = dict(status='ok')
541 content = dict(status='ok')
506 reply_msg = self.session.send(stream, 'abort_reply', content=content,
542 reply_msg = self.session.send(stream, 'abort_reply', content=content,
507 parent=parent, ident=ident)
543 parent=parent, ident=ident)
508 self.log.debug("%s", reply_msg)
544 self.log.debug("%s", reply_msg)
509
545
510 def clear_request(self, stream, idents, parent):
546 def clear_request(self, stream, idents, parent):
511 """Clear our namespace."""
547 """Clear our namespace."""
512 content = self.do_clear()
548 content = self.do_clear()
513 self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
549 self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
514 content = content)
550 content = content)
515
551
552 def do_clear(self):
553 """Override in subclasses to clear the namespace
554
555 This is only required for IPython.parallel.
556 """
557 raise NotImplementedError
558
516 #---------------------------------------------------------------------------
559 #---------------------------------------------------------------------------
517 # Protected interface
560 # Protected interface
518 #---------------------------------------------------------------------------
561 #---------------------------------------------------------------------------
519
562
520 def _topic(self, topic):
563 def _topic(self, topic):
521 """prefixed topic for IOPub messages"""
564 """prefixed topic for IOPub messages"""
522 if self.int_id >= 0:
565 if self.int_id >= 0:
523 base = "engine.%i" % self.int_id
566 base = "engine.%i" % self.int_id
524 else:
567 else:
525 base = "kernel.%s" % self.ident
568 base = "kernel.%s" % self.ident
526
569
527 return py3compat.cast_bytes("%s.%s" % (base, topic))
570 return py3compat.cast_bytes("%s.%s" % (base, topic))
528
571
529 def _abort_queues(self):
572 def _abort_queues(self):
530 for stream in self.shell_streams:
573 for stream in self.shell_streams:
531 if stream:
574 if stream:
532 self._abort_queue(stream)
575 self._abort_queue(stream)
533
576
534 def _abort_queue(self, stream):
577 def _abort_queue(self, stream):
535 poller = zmq.Poller()
578 poller = zmq.Poller()
536 poller.register(stream.socket, zmq.POLLIN)
579 poller.register(stream.socket, zmq.POLLIN)
537 while True:
580 while True:
538 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
581 idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
539 if msg is None:
582 if msg is None:
540 return
583 return
541
584
542 self.log.info("Aborting:")
585 self.log.info("Aborting:")
543 self.log.info("%s", msg)
586 self.log.info("%s", msg)
544 msg_type = msg['header']['msg_type']
587 msg_type = msg['header']['msg_type']
545 reply_type = msg_type.split('_')[0] + '_reply'
588 reply_type = msg_type.split('_')[0] + '_reply'
546
589
547 status = {'status' : 'aborted'}
590 status = {'status' : 'aborted'}
548 md = {'engine' : self.ident}
591 md = {'engine' : self.ident}
549 md.update(status)
592 md.update(status)
550 reply_msg = self.session.send(stream, reply_type, metadata=md,
593 reply_msg = self.session.send(stream, reply_type, metadata=md,
551 content=status, parent=msg, ident=idents)
594 content=status, parent=msg, ident=idents)
552 self.log.debug("%s", reply_msg)
595 self.log.debug("%s", reply_msg)
553 # We need to wait a bit for requests to come in. This can probably
596 # We need to wait a bit for requests to come in. This can probably
554 # be set shorter for true asynchronous clients.
597 # be set shorter for true asynchronous clients.
555 poller.poll(50)
598 poller.poll(50)
556
599
557
600
558 def _no_raw_input(self):
601 def _no_raw_input(self):
559 """Raise StdinNotImplentedError if active frontend doesn't support
602 """Raise StdinNotImplentedError if active frontend doesn't support
560 stdin."""
603 stdin."""
561 raise StdinNotImplementedError("raw_input was called, but this "
604 raise StdinNotImplementedError("raw_input was called, but this "
562 "frontend does not support stdin.")
605 "frontend does not support stdin.")
563
606
564 def getpass(self, prompt=''):
607 def getpass(self, prompt=''):
565 """Forward getpass to frontends
608 """Forward getpass to frontends
566
609
567 Raises
610 Raises
568 ------
611 ------
569 StdinNotImplentedError if active frontend doesn't support stdin.
612 StdinNotImplentedError if active frontend doesn't support stdin.
570 """
613 """
571 if not self._allow_stdin:
614 if not self._allow_stdin:
572 raise StdinNotImplementedError(
615 raise StdinNotImplementedError(
573 "getpass was called, but this frontend does not support input requests."
616 "getpass was called, but this frontend does not support input requests."
574 )
617 )
575 return self._input_request(prompt,
618 return self._input_request(prompt,
576 self._parent_ident,
619 self._parent_ident,
577 self._parent_header,
620 self._parent_header,
578 password=True,
621 password=True,
579 )
622 )
580
623
581 def raw_input(self, prompt=''):
624 def raw_input(self, prompt=''):
582 """Forward raw_input to frontends
625 """Forward raw_input to frontends
583
626
584 Raises
627 Raises
585 ------
628 ------
586 StdinNotImplentedError if active frontend doesn't support stdin.
629 StdinNotImplentedError if active frontend doesn't support stdin.
587 """
630 """
588 if not self._allow_stdin:
631 if not self._allow_stdin:
589 raise StdinNotImplementedError(
632 raise StdinNotImplementedError(
590 "raw_input was called, but this frontend does not support input requests."
633 "raw_input was called, but this frontend does not support input requests."
591 )
634 )
592 return self._input_request(prompt,
635 return self._input_request(prompt,
593 self._parent_ident,
636 self._parent_ident,
594 self._parent_header,
637 self._parent_header,
595 password=False,
638 password=False,
596 )
639 )
597
640
598 def _input_request(self, prompt, ident, parent, password=False):
641 def _input_request(self, prompt, ident, parent, password=False):
599 # Flush output before making the request.
642 # Flush output before making the request.
600 sys.stderr.flush()
643 sys.stderr.flush()
601 sys.stdout.flush()
644 sys.stdout.flush()
602 # flush the stdin socket, to purge stale replies
645 # flush the stdin socket, to purge stale replies
603 while True:
646 while True:
604 try:
647 try:
605 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
648 self.stdin_socket.recv_multipart(zmq.NOBLOCK)
606 except zmq.ZMQError as e:
649 except zmq.ZMQError as e:
607 if e.errno == zmq.EAGAIN:
650 if e.errno == zmq.EAGAIN:
608 break
651 break
609 else:
652 else:
610 raise
653 raise
611
654
612 # Send the input request.
655 # Send the input request.
613 content = json_clean(dict(prompt=prompt, password=password))
656 content = json_clean(dict(prompt=prompt, password=password))
614 self.session.send(self.stdin_socket, u'input_request', content, parent,
657 self.session.send(self.stdin_socket, u'input_request', content, parent,
615 ident=ident)
658 ident=ident)
616
659
617 # Await a response.
660 # Await a response.
618 while True:
661 while True:
619 try:
662 try:
620 ident, reply = self.session.recv(self.stdin_socket, 0)
663 ident, reply = self.session.recv(self.stdin_socket, 0)
621 except Exception:
664 except Exception:
622 self.log.warn("Invalid Message:", exc_info=True)
665 self.log.warn("Invalid Message:", exc_info=True)
623 except KeyboardInterrupt:
666 except KeyboardInterrupt:
624 # re-raise KeyboardInterrupt, to truncate traceback
667 # re-raise KeyboardInterrupt, to truncate traceback
625 raise KeyboardInterrupt
668 raise KeyboardInterrupt
626 else:
669 else:
627 break
670 break
628 try:
671 try:
629 value = py3compat.unicode_to_str(reply['content']['value'])
672 value = py3compat.unicode_to_str(reply['content']['value'])
630 except:
673 except:
631 self.log.error("Bad input_reply: %s", parent)
674 self.log.error("Bad input_reply: %s", parent)
632 value = ''
675 value = ''
633 if value == '\x04':
676 if value == '\x04':
634 # EOF
677 # EOF
635 raise EOFError
678 raise EOFError
636 return value
679 return value
637
680
638 def _at_shutdown(self):
681 def _at_shutdown(self):
639 """Actions taken at shutdown by the kernel, called by python's atexit.
682 """Actions taken at shutdown by the kernel, called by python's atexit.
640 """
683 """
641 # io.rprint("Kernel at_shutdown") # dbg
684 # io.rprint("Kernel at_shutdown") # dbg
642 if self._shutdown_message is not None:
685 if self._shutdown_message is not None:
643 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
686 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
644 self.log.debug("%s", self._shutdown_message)
687 self.log.debug("%s", self._shutdown_message)
645 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
688 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
646
689
General Comments 0
You need to be logged in to leave comments. Login now