##// END OF EJS Templates
interrogate kernel_info to get protocol version for adaptation
MinRK -
Show More
@@ -1,150 +1,134 b''
1 """Tornado handlers for WebSocket <-> ZMQ sockets.
1 """Tornado handlers for WebSocket <-> ZMQ sockets."""
2 2
3 Authors:
4
5 * Brian Granger
6 """
7
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008-2011 The IPython Development Team
10 #
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
14
15 #-----------------------------------------------------------------------------
16 # Imports
17 #-----------------------------------------------------------------------------
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
18 5
19 6 try:
20 7 from urllib.parse import urlparse # Py 3
21 8 except ImportError:
22 9 from urlparse import urlparse # Py 2
23 10
24 11 try:
25 12 from http.cookies import SimpleCookie # Py 3
26 13 except ImportError:
27 14 from Cookie import SimpleCookie # Py 2
28 15 import logging
29 16 from tornado import web
30 17 from tornado import websocket
31 18
32 19 from zmq.utils import jsonapi
33 20
34 21 from IPython.kernel.zmq.session import Session
35 22 from IPython.utils.jsonutil import date_default
36 23 from IPython.utils.py3compat import PY3, cast_unicode
37 24
38 25 from .handlers import IPythonHandler
39 26
40 #-----------------------------------------------------------------------------
41 # ZMQ handlers
42 #-----------------------------------------------------------------------------
43 27
44 28 class ZMQStreamHandler(websocket.WebSocketHandler):
45 29
46 30 def same_origin(self):
47 31 """Check to see that origin and host match in the headers."""
48 32
49 33 # The difference between version 8 and 13 is that in 8 the
50 34 # client sends a "Sec-Websocket-Origin" header and in 13 it's
51 35 # simply "Origin".
52 36 if self.request.headers.get("Sec-WebSocket-Version") in ("7", "8"):
53 37 origin_header = self.request.headers.get("Sec-Websocket-Origin")
54 38 else:
55 39 origin_header = self.request.headers.get("Origin")
56 40
57 41 host = self.request.headers.get("Host")
58 42
59 43 # If no header is provided, assume we can't verify origin
60 44 if(origin_header is None or host is None):
61 45 return False
62 46
63 47 parsed_origin = urlparse(origin_header)
64 48 origin = parsed_origin.netloc
65 49
66 50 # Check to see that origin matches host directly, including ports
67 51 return origin == host
68 52
69 53 def clear_cookie(self, *args, **kwargs):
70 54 """meaningless for websockets"""
71 55 pass
72 56
73 57 def _reserialize_reply(self, msg_list):
74 58 """Reserialize a reply message using JSON.
75 59
76 60 This takes the msg list from the ZMQ socket, unserializes it using
77 61 self.session and then serializes the result using JSON. This method
78 62 should be used by self._on_zmq_reply to build messages that can
79 63 be sent back to the browser.
80 64 """
81 65 idents, msg_list = self.session.feed_identities(msg_list)
82 66 msg = self.session.unserialize(msg_list)
83 67 try:
84 68 msg['header'].pop('date')
85 69 except KeyError:
86 70 pass
87 71 try:
88 72 msg['parent_header'].pop('date')
89 73 except KeyError:
90 74 pass
91 75 msg.pop('buffers')
92 76 return jsonapi.dumps(msg, default=date_default)
93 77
94 78 def _on_zmq_reply(self, msg_list):
95 79 # Sometimes this gets triggered when the on_close method is scheduled in the
96 80 # eventloop but hasn't been called.
97 81 if self.stream.closed(): return
98 82 try:
99 83 msg = self._reserialize_reply(msg_list)
100 84 except Exception:
101 85 self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
102 86 else:
103 87 self.write_message(msg)
104 88
105 89 def allow_draft76(self):
106 90 """Allow draft 76, until browsers such as Safari update to RFC 6455.
107 91
108 92 This has been disabled by default in tornado in release 2.2.0, and
109 93 support will be removed in later versions.
110 94 """
111 95 return True
112 96
113 97
114 98 class AuthenticatedZMQStreamHandler(ZMQStreamHandler, IPythonHandler):
115 99
116 100 def open(self, kernel_id):
117 101 # Check to see that origin matches host directly, including ports
118 102 if not self.same_origin():
119 103 self.log.warn("Cross Origin WebSocket Attempt.")
120 104 raise web.HTTPError(404)
121 105
122 106 self.kernel_id = cast_unicode(kernel_id, 'ascii')
123 107 self.session = Session(config=self.config)
124 108 self.save_on_message = self.on_message
125 109 self.on_message = self.on_first_message
126 110
127 111 def _inject_cookie_message(self, msg):
128 112 """Inject the first message, which is the document cookie,
129 113 for authentication."""
130 114 if not PY3 and isinstance(msg, unicode):
131 115 # Cookie constructor doesn't accept unicode strings
132 116 # under Python 2.x for some reason
133 117 msg = msg.encode('utf8', 'replace')
134 118 try:
135 119 identity, msg = msg.split(':', 1)
136 120 self.session.session = cast_unicode(identity, 'ascii')
137 121 except Exception:
138 122 logging.error("First ws message didn't have the form 'identity:[cookie]' - %r", msg)
139 123
140 124 try:
141 125 self.request._cookies = SimpleCookie(msg)
142 126 except:
143 127 self.log.warn("couldn't parse cookie string: %s",msg, exc_info=True)
144 128
145 129 def on_first_message(self, msg):
146 130 self._inject_cookie_message(msg)
147 131 if self.get_current_user() is None:
148 132 self.log.warn("Couldn't authenticate WebSocket connection")
149 133 raise web.HTTPError(403)
150 134 self.on_message = self.save_on_message
@@ -1,198 +1,211 b''
1 """Tornado handlers for the notebook.
1 """Tornado handlers for kernels."""
2 2
3 Authors:
4
5 * Brian Granger
6 """
7
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008-2011 The IPython Development Team
10 #
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
14
15 #-----------------------------------------------------------------------------
16 # Imports
17 #-----------------------------------------------------------------------------
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
18 5
19 6 import logging
20 7 from tornado import web
21 8
22 9 from zmq.utils import jsonapi
23 10
24 11 from IPython.utils.jsonutil import date_default
12 from IPython.utils.py3compat import string_types
25 13 from IPython.html.utils import url_path_join, url_escape
26 14
27 15 from ...base.handlers import IPythonHandler, json_errors
28 16 from ...base.zmqhandlers import AuthenticatedZMQStreamHandler
29 17
30 #-----------------------------------------------------------------------------
31 # Kernel handlers
32 #-----------------------------------------------------------------------------
33
18 from IPython.core.release import kernel_protocol_version
34 19
35 20 class MainKernelHandler(IPythonHandler):
36 21
37 22 @web.authenticated
38 23 @json_errors
39 24 def get(self):
40 25 km = self.kernel_manager
41 26 self.finish(jsonapi.dumps(km.list_kernels()))
42 27
43 28 @web.authenticated
44 29 @json_errors
45 30 def post(self):
46 31 km = self.kernel_manager
47 32 kernel_id = km.start_kernel()
48 33 model = km.kernel_model(kernel_id)
49 34 location = url_path_join(self.base_url, 'api', 'kernels', kernel_id)
50 35 self.set_header('Location', url_escape(location))
51 36 self.set_status(201)
52 37 self.finish(jsonapi.dumps(model))
53 38
54 39
55 40 class KernelHandler(IPythonHandler):
56 41
57 42 SUPPORTED_METHODS = ('DELETE', 'GET')
58 43
59 44 @web.authenticated
60 45 @json_errors
61 46 def get(self, kernel_id):
62 47 km = self.kernel_manager
63 48 km._check_kernel_id(kernel_id)
64 49 model = km.kernel_model(kernel_id)
65 50 self.finish(jsonapi.dumps(model))
66 51
67 52 @web.authenticated
68 53 @json_errors
69 54 def delete(self, kernel_id):
70 55 km = self.kernel_manager
71 56 km.shutdown_kernel(kernel_id)
72 57 self.set_status(204)
73 58 self.finish()
74 59
75 60
76 61 class KernelActionHandler(IPythonHandler):
77 62
78 63 @web.authenticated
79 64 @json_errors
80 65 def post(self, kernel_id, action):
81 66 km = self.kernel_manager
82 67 if action == 'interrupt':
83 68 km.interrupt_kernel(kernel_id)
84 69 self.set_status(204)
85 70 if action == 'restart':
86 71 km.restart_kernel(kernel_id)
87 72 model = km.kernel_model(kernel_id)
88 73 self.set_header('Location', '{0}api/kernels/{1}'.format(self.base_url, kernel_id))
89 74 self.write(jsonapi.dumps(model))
90 75 self.finish()
91 76
92 77
93 78 class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
94 79
95 80 def create_stream(self):
96 81 km = self.kernel_manager
97 82 meth = getattr(km, 'connect_%s' % self.channel)
98 83 self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)
84 self.kernel_info_channel = None
85 self.kernel_info_channel = km.connect_shell(self.kernel_id)
86 self.kernel_info_channel.on_recv(self._handle_kernel_info)
87 self._request_kernel_info()
88
89 def _request_kernel_info(self):
90 self.log.debug("requesting kernel info")
91 self.session.send(self.kernel_info_channel, "kernel_info_request")
92
93 def _handle_kernel_info(self, msg):
94 idents,msg = self.session.feed_identities(msg)
95 try:
96 msg = self.session.unserialize(msg)
97 except:
98 self.log.error("Bad kernel_info reply", exc_info=True)
99 self._request_kernel_info()
100 return
101 else:
102 if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in msg['content']:
103 self.log.error("Kernel info request failed, assuming current %s", msg['content'])
104 else:
105 protocol_version = msg['content']['protocol_version']
106 if protocol_version != kernel_protocol_version:
107 self.session.adapt_version = int(protocol_version.split('.')[0])
108 self.log.info("adapting kernel to %s" % protocol_version)
109 self.kernel_info_channel.close()
110 self.kernel_info_channel = None
111
99 112
100 113 def initialize(self, *args, **kwargs):
101 114 self.zmq_stream = None
102 115
103 116 def on_first_message(self, msg):
104 117 try:
105 118 super(ZMQChannelHandler, self).on_first_message(msg)
106 119 except web.HTTPError:
107 120 self.close()
108 121 return
109 122 try:
110 123 self.create_stream()
111 124 except web.HTTPError:
112 125 # WebSockets don't response to traditional error codes so we
113 126 # close the connection.
114 127 if not self.stream.closed():
115 128 self.stream.close()
116 129 self.close()
117 130 else:
118 131 self.zmq_stream.on_recv(self._on_zmq_reply)
119 132
120 133 def on_message(self, msg):
121 134 msg = jsonapi.loads(msg)
122 135 self.session.send(self.zmq_stream, msg)
123 136
124 137 def on_close(self):
125 138 # This method can be called twice, once by self.kernel_died and once
126 139 # from the WebSocket close event. If the WebSocket connection is
127 140 # closed before the ZMQ streams are setup, they could be None.
128 141 if self.zmq_stream is not None and not self.zmq_stream.closed():
129 142 self.zmq_stream.on_recv(None)
130 143 # close the socket directly, don't wait for the stream
131 144 socket = self.zmq_stream.socket
132 145 self.zmq_stream.close()
133 146 socket.close()
134 147
135 148
136 149 class IOPubHandler(ZMQChannelHandler):
137 150 channel = 'iopub'
138 151
139 152 def create_stream(self):
140 153 super(IOPubHandler, self).create_stream()
141 154 km = self.kernel_manager
142 155 km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
143 156 km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
144 157
145 158 def on_close(self):
146 159 km = self.kernel_manager
147 160 if self.kernel_id in km:
148 161 km.remove_restart_callback(
149 162 self.kernel_id, self.on_kernel_restarted,
150 163 )
151 164 km.remove_restart_callback(
152 165 self.kernel_id, self.on_restart_failed, 'dead',
153 166 )
154 167 super(IOPubHandler, self).on_close()
155 168
156 169 def _send_status_message(self, status):
157 170 msg = self.session.msg("status",
158 171 {'execution_state': status}
159 172 )
160 173 self.write_message(jsonapi.dumps(msg, default=date_default))
161 174
162 175 def on_kernel_restarted(self):
163 176 logging.warn("kernel %s restarted", self.kernel_id)
164 177 self._send_status_message('restarting')
165 178
166 179 def on_restart_failed(self):
167 180 logging.error("kernel %s restarted failed!", self.kernel_id)
168 181 self._send_status_message('dead')
169 182
170 183 def on_message(self, msg):
171 184 """IOPub messages make no sense"""
172 185 pass
173 186
174 187
175 188 class ShellHandler(ZMQChannelHandler):
176 189 channel = 'shell'
177 190
178 191
179 192 class StdinHandler(ZMQChannelHandler):
180 193 channel = 'stdin'
181 194
182 195
183 196 #-----------------------------------------------------------------------------
184 197 # URL to handler mappings
185 198 #-----------------------------------------------------------------------------
186 199
187 200
188 201 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
189 202 _kernel_action_regex = r"(?P<action>restart|interrupt)"
190 203
191 204 default_handlers = [
192 205 (r"/api/kernels", MainKernelHandler),
193 206 (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
194 207 (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
195 208 (r"/api/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler),
196 209 (r"/api/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
197 210 (r"/api/kernels/%s/stdin" % _kernel_id_regex, StdinHandler)
198 211 ]
@@ -1,82 +1,85 b''
1 1 """Blocking channels
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 5 #-----------------------------------------------------------------------------
6 6 # Copyright (C) 2013 The IPython Development Team
7 7 #
8 8 # Distributed under the terms of the BSD License. The full license is in
9 9 # the file COPYING.txt, distributed as part of this software.
10 10 #-----------------------------------------------------------------------------
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Imports
14 14 #-----------------------------------------------------------------------------
15 15
16 16 try:
17 17 from queue import Queue, Empty # Py 3
18 18 except ImportError:
19 19 from Queue import Queue, Empty # Py 2
20 20
21 21 from IPython.kernel.channels import IOPubChannel, HBChannel, \
22 22 ShellChannel, StdInChannel
23 23
24 24 #-----------------------------------------------------------------------------
25 25 # Blocking kernel manager
26 26 #-----------------------------------------------------------------------------
27 27
28 28
29 29 class BlockingChannelMixin(object):
30 30
31 31 def __init__(self, *args, **kwds):
32 32 super(BlockingChannelMixin, self).__init__(*args, **kwds)
33 33 self._in_queue = Queue()
34 34
35 35 def call_handlers(self, msg):
36 36 self._in_queue.put(msg)
37 37
38 38 def get_msg(self, block=True, timeout=None):
39 39 """ Gets a message if there is one that is ready. """
40 40 if timeout is None:
41 41 # Queue.get(timeout=None) has stupid uninteruptible
42 42 # behavior, so wait for a week instead
43 43 timeout = 604800
44 44 return self._in_queue.get(block, timeout)
45 45
46 46 def get_msgs(self):
47 47 """ Get all messages that are currently ready. """
48 48 msgs = []
49 49 while True:
50 50 try:
51 51 msgs.append(self.get_msg(block=False))
52 52 except Empty:
53 53 break
54 54 return msgs
55 55
56 56 def msg_ready(self):
57 57 """ Is there a message that has been received? """
58 58 return not self._in_queue.empty()
59 59
60 60
61 61 class BlockingIOPubChannel(BlockingChannelMixin, IOPubChannel):
62 62 pass
63 63
64 64
65 65 class BlockingShellChannel(BlockingChannelMixin, ShellChannel):
66 pass
66 def call_handlers(self, msg):
67 if msg['msg_type'] == 'kernel_info_reply':
68 self._handle_kernel_info_reply(msg)
69 return super(BlockingShellChannel, self).call_handlers(msg)
67 70
68 71
69 72 class BlockingStdInChannel(BlockingChannelMixin, StdInChannel):
70 73 pass
71 74
72 75
73 76 class BlockingHBChannel(HBChannel):
74 77
75 78 # This kernel needs quicker monitoring, shorten to 1 sec.
76 79 # less than 0.5s is unreliable, and will get occasional
77 80 # false reports of missed beats.
78 81 time_to_dead = 1.
79 82
80 83 def call_handlers(self, since_last_heartbeat):
81 84 """ Pause beating on missed heartbeat. """
82 85 pass
@@ -1,626 +1,639 b''
1 1 """Base classes to manage a Client's interaction with a running kernel"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import absolute_import
7 7
8 8 import atexit
9 9 import errno
10 10 from threading import Thread
11 11 import time
12 12
13 13 import zmq
14 14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
15 15 # during garbage collection of threads at exit:
16 16 from zmq import ZMQError
17 17 from zmq.eventloop import ioloop, zmqstream
18 18
19 # Local imports
19 from IPython.core.release import kernel_protocol_version_info
20
20 21 from .channelsabc import (
21 22 ShellChannelABC, IOPubChannelABC,
22 23 HBChannelABC, StdInChannelABC,
23 24 )
24 25 from IPython.utils.py3compat import string_types, iteritems
25 26
26 27 #-----------------------------------------------------------------------------
27 28 # Constants and exceptions
28 29 #-----------------------------------------------------------------------------
29 30
31 major_protocol_version = kernel_protocol_version_info[0]
32
30 33 class InvalidPortNumber(Exception):
31 34 pass
32 35
33 36 #-----------------------------------------------------------------------------
34 37 # Utility functions
35 38 #-----------------------------------------------------------------------------
36 39
37 40 # some utilities to validate message structure, these might get moved elsewhere
38 41 # if they prove to have more generic utility
39 42
40 43 def validate_string_list(lst):
41 44 """Validate that the input is a list of strings.
42 45
43 46 Raises ValueError if not."""
44 47 if not isinstance(lst, list):
45 48 raise ValueError('input %r must be a list' % lst)
46 49 for x in lst:
47 50 if not isinstance(x, string_types):
48 51 raise ValueError('element %r in list must be a string' % x)
49 52
50 53
51 54 def validate_string_dict(dct):
52 55 """Validate that the input is a dict with string keys and values.
53 56
54 57 Raises ValueError if not."""
55 58 for k,v in iteritems(dct):
56 59 if not isinstance(k, string_types):
57 60 raise ValueError('key %r in dict must be a string' % k)
58 61 if not isinstance(v, string_types):
59 62 raise ValueError('value %r in dict must be a string' % v)
60 63
61 64
62 65 #-----------------------------------------------------------------------------
63 66 # ZMQ Socket Channel classes
64 67 #-----------------------------------------------------------------------------
65 68
66 69 class ZMQSocketChannel(Thread):
67 70 """The base class for the channels that use ZMQ sockets."""
68 71 context = None
69 72 session = None
70 73 socket = None
71 74 ioloop = None
72 75 stream = None
73 76 _address = None
74 77 _exiting = False
75 78 proxy_methods = []
76 79
77 80 def __init__(self, context, session, address):
78 81 """Create a channel.
79 82
80 83 Parameters
81 84 ----------
82 85 context : :class:`zmq.Context`
83 86 The ZMQ context to use.
84 87 session : :class:`session.Session`
85 88 The session to use.
86 89 address : zmq url
87 90 Standard (ip, port) tuple that the kernel is listening on.
88 91 """
89 92 super(ZMQSocketChannel, self).__init__()
90 93 self.daemon = True
91 94
92 95 self.context = context
93 96 self.session = session
94 97 if isinstance(address, tuple):
95 98 if address[1] == 0:
96 99 message = 'The port number for a channel cannot be 0.'
97 100 raise InvalidPortNumber(message)
98 101 address = "tcp://%s:%i" % address
99 102 self._address = address
100 103 atexit.register(self._notice_exit)
101 104
102 105 def _notice_exit(self):
103 106 self._exiting = True
104 107
105 108 def _run_loop(self):
106 109 """Run my loop, ignoring EINTR events in the poller"""
107 110 while True:
108 111 try:
109 112 self.ioloop.start()
110 113 except ZMQError as e:
111 114 if e.errno == errno.EINTR:
112 115 continue
113 116 else:
114 117 raise
115 118 except Exception:
116 119 if self._exiting:
117 120 break
118 121 else:
119 122 raise
120 123 else:
121 124 break
122 125
123 126 def stop(self):
124 127 """Stop the channel's event loop and join its thread.
125 128
126 129 This calls :meth:`~threading.Thread.join` and returns when the thread
127 130 terminates. :class:`RuntimeError` will be raised if
128 131 :meth:`~threading.Thread.start` is called again.
129 132 """
130 133 if self.ioloop is not None:
131 134 self.ioloop.stop()
132 135 self.join()
133 136 self.close()
134 137
135 138 def close(self):
136 139 if self.ioloop is not None:
137 140 try:
138 141 self.ioloop.close(all_fds=True)
139 142 except Exception:
140 143 pass
141 144 if self.socket is not None:
142 145 try:
143 146 self.socket.close(linger=0)
144 147 except Exception:
145 148 pass
146 149 self.socket = None
147 150
148 151 @property
149 152 def address(self):
150 153 """Get the channel's address as a zmq url string.
151 154
152 155 These URLS have the form: 'tcp://127.0.0.1:5555'.
153 156 """
154 157 return self._address
155 158
156 159 def _queue_send(self, msg):
157 160 """Queue a message to be sent from the IOLoop's thread.
158 161
159 162 Parameters
160 163 ----------
161 164 msg : message to send
162 165
163 166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
164 167 thread control of the action.
165 168 """
166 169 def thread_send():
167 170 self.session.send(self.stream, msg)
168 171 self.ioloop.add_callback(thread_send)
169 172
170 173 def _handle_recv(self, msg):
171 174 """Callback for stream.on_recv.
172 175
173 176 Unpacks message, and calls handlers with it.
174 177 """
175 178 ident,smsg = self.session.feed_identities(msg)
176 self.call_handlers(self.session.unserialize(smsg))
179 msg = self.session.unserialize(smsg)
180 self.call_handlers(msg)
177 181
178 182
179 183
180 184 class ShellChannel(ZMQSocketChannel):
181 185 """The shell channel for issuing request/replies to the kernel."""
182 186
183 187 command_queue = None
184 188 # flag for whether execute requests should be allowed to call raw_input:
185 189 allow_stdin = True
186 190 proxy_methods = [
187 191 'execute',
188 192 'complete',
189 193 'inspect',
190 194 'history',
191 195 'kernel_info',
192 196 'shutdown',
193 197 ]
194 198
195 199 def __init__(self, context, session, address):
196 200 super(ShellChannel, self).__init__(context, session, address)
197 201 self.ioloop = ioloop.IOLoop()
198
202
199 203 def run(self):
200 204 """The thread's main activity. Call start() instead."""
201 205 self.socket = self.context.socket(zmq.DEALER)
202 206 self.socket.linger = 1000
203 207 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
204 208 self.socket.connect(self.address)
205 209 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
206 210 self.stream.on_recv(self._handle_recv)
207 211 self._run_loop()
208 212
209 213 def call_handlers(self, msg):
210 214 """This method is called in the ioloop thread when a message arrives.
211 215
212 216 Subclasses should override this method to handle incoming messages.
213 217 It is important to remember that this method is called in the thread
214 218 so that some logic must be done to ensure that the application level
215 219 handlers are called in the application thread.
216 220 """
217 221 raise NotImplementedError('call_handlers must be defined in a subclass.')
218 222
219 223 def execute(self, code, silent=False, store_history=True,
220 224 user_expressions=None, allow_stdin=None):
221 225 """Execute code in the kernel.
222 226
223 227 Parameters
224 228 ----------
225 229 code : str
226 230 A string of Python code.
227 231
228 232 silent : bool, optional (default False)
229 233 If set, the kernel will execute the code as quietly possible, and
230 234 will force store_history to be False.
231 235
232 236 store_history : bool, optional (default True)
233 237 If set, the kernel will store command history. This is forced
234 238 to be False if silent is True.
235 239
236 240 user_expressions : dict, optional
237 241 A dict mapping names to expressions to be evaluated in the user's
238 242 dict. The expression values are returned as strings formatted using
239 243 :func:`repr`.
240 244
241 245 allow_stdin : bool, optional (default self.allow_stdin)
242 246 Flag for whether the kernel can send stdin requests to frontends.
243 247
244 248 Some frontends (e.g. the Notebook) do not support stdin requests.
245 249 If raw_input is called from code executed from such a frontend, a
246 250 StdinNotImplementedError will be raised.
247 251
248 252 Returns
249 253 -------
250 254 The msg_id of the message sent.
251 255 """
252 256 if user_expressions is None:
253 257 user_expressions = {}
254 258 if allow_stdin is None:
255 259 allow_stdin = self.allow_stdin
256 260
257 261
258 262 # Don't waste network traffic if inputs are invalid
259 263 if not isinstance(code, string_types):
260 264 raise ValueError('code %r must be a string' % code)
261 265 validate_string_dict(user_expressions)
262 266
263 267 # Create class for content/msg creation. Related to, but possibly
264 268 # not in Session.
265 269 content = dict(code=code, silent=silent, store_history=store_history,
266 270 user_expressions=user_expressions,
267 271 allow_stdin=allow_stdin,
268 272 )
269 273 msg = self.session.msg('execute_request', content)
270 274 self._queue_send(msg)
271 275 return msg['header']['msg_id']
272 276
273 277 def complete(self, code, cursor_pos=None):
274 278 """Tab complete text in the kernel's namespace.
275 279
276 280 Parameters
277 281 ----------
278 282 code : str
279 283 The context in which completion is requested.
280 284 Can be anything between a variable name and an entire cell.
281 285 cursor_pos : int, optional
282 286 The position of the cursor in the block of code where the completion was requested.
283 287 Default: ``len(code)``
284 288
285 289 Returns
286 290 -------
287 291 The msg_id of the message sent.
288 292 """
289 293 if cursor_pos is None:
290 294 cursor_pos = len(code)
291 295 content = dict(code=code, cursor_pos=cursor_pos)
292 296 msg = self.session.msg('complete_request', content)
293 297 self._queue_send(msg)
294 298 return msg['header']['msg_id']
295 299
296 300 def inspect(self, code, cursor_pos=None, detail_level=0):
297 301 """Get metadata information about an object in the kernel's namespace.
298 302
299 303 It is up to the kernel to determine the appropriate object to inspect.
300 304
301 305 Parameters
302 306 ----------
303 307 code : str
304 308 The context in which info is requested.
305 309 Can be anything between a variable name and an entire cell.
306 310 cursor_pos : int, optional
307 311 The position of the cursor in the block of code where the info was requested.
308 312 Default: ``len(code)``
309 313 detail_level : int, optional
310 314 The level of detail for the introspection (0-2)
311 315
312 316 Returns
313 317 -------
314 318 The msg_id of the message sent.
315 319 """
316 320 if cursor_pos is None:
317 321 cursor_pos = len(code)
318 322 content = dict(code=code, cursor_pos=cursor_pos,
319 323 detail_level=detail_level,
320 324 )
321 325 msg = self.session.msg('inspect_request', content)
322 326 self._queue_send(msg)
323 327 return msg['header']['msg_id']
324 328
325 329 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
326 330 """Get entries from the kernel's history list.
327 331
328 332 Parameters
329 333 ----------
330 334 raw : bool
331 335 If True, return the raw input.
332 336 output : bool
333 337 If True, then return the output as well.
334 338 hist_access_type : str
335 339 'range' (fill in session, start and stop params), 'tail' (fill in n)
336 340 or 'search' (fill in pattern param).
337 341
338 342 session : int
339 343 For a range request, the session from which to get lines. Session
340 344 numbers are positive integers; negative ones count back from the
341 345 current session.
342 346 start : int
343 347 The first line number of a history range.
344 348 stop : int
345 349 The final (excluded) line number of a history range.
346 350
347 351 n : int
348 352 The number of lines of history to get for a tail request.
349 353
350 354 pattern : str
351 355 The glob-syntax pattern for a search request.
352 356
353 357 Returns
354 358 -------
355 359 The msg_id of the message sent.
356 360 """
357 361 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
358 362 **kwargs)
359 363 msg = self.session.msg('history_request', content)
360 364 self._queue_send(msg)
361 365 return msg['header']['msg_id']
362 366
363 367 def kernel_info(self):
364 368 """Request kernel info."""
365 369 msg = self.session.msg('kernel_info_request')
366 370 self._queue_send(msg)
367 371 return msg['header']['msg_id']
372
373 def _handle_kernel_info_reply(self, msg):
374 """handle kernel info reply
375
376 sets protocol adaptation version
377 """
378 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
379 if adapt_version != major_protocol_version:
380 self.session.adapt_version = adapt_version
368 381
369 382 def shutdown(self, restart=False):
370 383 """Request an immediate kernel shutdown.
371 384
372 385 Upon receipt of the (empty) reply, client code can safely assume that
373 386 the kernel has shut down and it's safe to forcefully terminate it if
374 387 it's still alive.
375 388
376 389 The kernel will send the reply via a function registered with Python's
377 390 atexit module, ensuring it's truly done as the kernel is done with all
378 391 normal operation.
379 392 """
380 393 # Send quit message to kernel. Once we implement kernel-side setattr,
381 394 # this should probably be done that way, but for now this will do.
382 395 msg = self.session.msg('shutdown_request', {'restart':restart})
383 396 self._queue_send(msg)
384 397 return msg['header']['msg_id']
385 398
386 399
387 400
388 401 class IOPubChannel(ZMQSocketChannel):
389 402 """The iopub channel which listens for messages that the kernel publishes.
390 403
391 404 This channel is where all output is published to frontends.
392 405 """
393 406
394 407 def __init__(self, context, session, address):
395 408 super(IOPubChannel, self).__init__(context, session, address)
396 409 self.ioloop = ioloop.IOLoop()
397 410
398 411 def run(self):
399 412 """The thread's main activity. Call start() instead."""
400 413 self.socket = self.context.socket(zmq.SUB)
401 414 self.socket.linger = 1000
402 415 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
403 416 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
404 417 self.socket.connect(self.address)
405 418 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
406 419 self.stream.on_recv(self._handle_recv)
407 420 self._run_loop()
408 421
409 422 def call_handlers(self, msg):
410 423 """This method is called in the ioloop thread when a message arrives.
411 424
412 425 Subclasses should override this method to handle incoming messages.
413 426 It is important to remember that this method is called in the thread
414 427 so that some logic must be done to ensure that the application leve
415 428 handlers are called in the application thread.
416 429 """
417 430 raise NotImplementedError('call_handlers must be defined in a subclass.')
418 431
419 432 def flush(self, timeout=1.0):
420 433 """Immediately processes all pending messages on the iopub channel.
421 434
422 435 Callers should use this method to ensure that :meth:`call_handlers`
423 436 has been called for all messages that have been received on the
424 437 0MQ SUB socket of this channel.
425 438
426 439 This method is thread safe.
427 440
428 441 Parameters
429 442 ----------
430 443 timeout : float, optional
431 444 The maximum amount of time to spend flushing, in seconds. The
432 445 default is one second.
433 446 """
434 447 # We do the IOLoop callback process twice to ensure that the IOLoop
435 448 # gets to perform at least one full poll.
436 449 stop_time = time.time() + timeout
437 450 for i in range(2):
438 451 self._flushed = False
439 452 self.ioloop.add_callback(self._flush)
440 453 while not self._flushed and time.time() < stop_time:
441 454 time.sleep(0.01)
442 455
443 456 def _flush(self):
444 457 """Callback for :method:`self.flush`."""
445 458 self.stream.flush()
446 459 self._flushed = True
447 460
448 461
449 462 class StdInChannel(ZMQSocketChannel):
450 463 """The stdin channel to handle raw_input requests that the kernel makes."""
451 464
452 465 msg_queue = None
453 466 proxy_methods = ['input']
454 467
455 468 def __init__(self, context, session, address):
456 469 super(StdInChannel, self).__init__(context, session, address)
457 470 self.ioloop = ioloop.IOLoop()
458 471
459 472 def run(self):
460 473 """The thread's main activity. Call start() instead."""
461 474 self.socket = self.context.socket(zmq.DEALER)
462 475 self.socket.linger = 1000
463 476 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
464 477 self.socket.connect(self.address)
465 478 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
466 479 self.stream.on_recv(self._handle_recv)
467 480 self._run_loop()
468 481
469 482 def call_handlers(self, msg):
470 483 """This method is called in the ioloop thread when a message arrives.
471 484
472 485 Subclasses should override this method to handle incoming messages.
473 486 It is important to remember that this method is called in the thread
474 487 so that some logic must be done to ensure that the application leve
475 488 handlers are called in the application thread.
476 489 """
477 490 raise NotImplementedError('call_handlers must be defined in a subclass.')
478 491
479 492 def input(self, string):
480 493 """Send a string of raw input to the kernel."""
481 494 content = dict(value=string)
482 495 msg = self.session.msg('input_reply', content)
483 496 self._queue_send(msg)
484 497
485 498
486 499 class HBChannel(ZMQSocketChannel):
487 500 """The heartbeat channel which monitors the kernel heartbeat.
488 501
489 502 Note that the heartbeat channel is paused by default. As long as you start
490 503 this channel, the kernel manager will ensure that it is paused and un-paused
491 504 as appropriate.
492 505 """
493 506
494 507 time_to_dead = 3.0
495 508 socket = None
496 509 poller = None
497 510 _running = None
498 511 _pause = None
499 512 _beating = None
500 513
501 514 def __init__(self, context, session, address):
502 515 super(HBChannel, self).__init__(context, session, address)
503 516 self._running = False
504 517 self._pause =True
505 518 self.poller = zmq.Poller()
506 519
507 520 def _create_socket(self):
508 521 if self.socket is not None:
509 522 # close previous socket, before opening a new one
510 523 self.poller.unregister(self.socket)
511 524 self.socket.close()
512 525 self.socket = self.context.socket(zmq.REQ)
513 526 self.socket.linger = 1000
514 527 self.socket.connect(self.address)
515 528
516 529 self.poller.register(self.socket, zmq.POLLIN)
517 530
518 531 def _poll(self, start_time):
519 532 """poll for heartbeat replies until we reach self.time_to_dead.
520 533
521 534 Ignores interrupts, and returns the result of poll(), which
522 535 will be an empty list if no messages arrived before the timeout,
523 536 or the event tuple if there is a message to receive.
524 537 """
525 538
526 539 until_dead = self.time_to_dead - (time.time() - start_time)
527 540 # ensure poll at least once
528 541 until_dead = max(until_dead, 1e-3)
529 542 events = []
530 543 while True:
531 544 try:
532 545 events = self.poller.poll(1000 * until_dead)
533 546 except ZMQError as e:
534 547 if e.errno == errno.EINTR:
535 548 # ignore interrupts during heartbeat
536 549 # this may never actually happen
537 550 until_dead = self.time_to_dead - (time.time() - start_time)
538 551 until_dead = max(until_dead, 1e-3)
539 552 pass
540 553 else:
541 554 raise
542 555 except Exception:
543 556 if self._exiting:
544 557 break
545 558 else:
546 559 raise
547 560 else:
548 561 break
549 562 return events
550 563
551 564 def run(self):
552 565 """The thread's main activity. Call start() instead."""
553 566 self._create_socket()
554 567 self._running = True
555 568 self._beating = True
556 569
557 570 while self._running:
558 571 if self._pause:
559 572 # just sleep, and skip the rest of the loop
560 573 time.sleep(self.time_to_dead)
561 574 continue
562 575
563 576 since_last_heartbeat = 0.0
564 577 # io.rprint('Ping from HB channel') # dbg
565 578 # no need to catch EFSM here, because the previous event was
566 579 # either a recv or connect, which cannot be followed by EFSM
567 580 self.socket.send(b'ping')
568 581 request_time = time.time()
569 582 ready = self._poll(request_time)
570 583 if ready:
571 584 self._beating = True
572 585 # the poll above guarantees we have something to recv
573 586 self.socket.recv()
574 587 # sleep the remainder of the cycle
575 588 remainder = self.time_to_dead - (time.time() - request_time)
576 589 if remainder > 0:
577 590 time.sleep(remainder)
578 591 continue
579 592 else:
580 593 # nothing was received within the time limit, signal heart failure
581 594 self._beating = False
582 595 since_last_heartbeat = time.time() - request_time
583 596 self.call_handlers(since_last_heartbeat)
584 597 # and close/reopen the socket, because the REQ/REP cycle has been broken
585 598 self._create_socket()
586 599 continue
587 600
588 601 def pause(self):
589 602 """Pause the heartbeat."""
590 603 self._pause = True
591 604
592 605 def unpause(self):
593 606 """Unpause the heartbeat."""
594 607 self._pause = False
595 608
596 609 def is_beating(self):
597 610 """Is the heartbeat running and responsive (and not paused)."""
598 611 if self.is_alive() and not self._pause and self._beating:
599 612 return True
600 613 else:
601 614 return False
602 615
603 616 def stop(self):
604 617 """Stop the channel's event loop and join its thread."""
605 618 self._running = False
606 619 super(HBChannel, self).stop()
607 620
608 621 def call_handlers(self, since_last_heartbeat):
609 622 """This method is called in the ioloop thread when a message arrives.
610 623
611 624 Subclasses should override this method to handle incoming messages.
612 625 It is important to remember that this method is called in the thread
613 626 so that some logic must be done to ensure that the application level
614 627 handlers are called in the application thread.
615 628 """
616 629 raise NotImplementedError('call_handlers must be defined in a subclass.')
617 630
618 631
619 632 #---------------------------------------------------------------------#-----------------------------------------------------------------------------
620 633 # ABC Registration
621 634 #-----------------------------------------------------------------------------
622 635
623 636 ShellChannelABC.register(ShellChannel)
624 637 IOPubChannelABC.register(IOPubChannel)
625 638 HBChannelABC.register(HBChannel)
626 639 StdInChannelABC.register(StdInChannel)
@@ -1,215 +1,219 b''
1 1 """Defines a KernelManager that provides signals and slots."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from IPython.external.qt import QtCore
7 7
8 8 from IPython.utils.traitlets import HasTraits, Type
9 9 from .util import MetaQObjectHasTraits, SuperQObject
10 10
11 11
12 12 class ChannelQObject(SuperQObject):
13 13
14 14 # Emitted when the channel is started.
15 15 started = QtCore.Signal()
16 16
17 17 # Emitted when the channel is stopped.
18 18 stopped = QtCore.Signal()
19 19
20 20 #---------------------------------------------------------------------------
21 21 # Channel interface
22 22 #---------------------------------------------------------------------------
23 23
24 24 def start(self):
25 25 """ Reimplemented to emit signal.
26 26 """
27 27 super(ChannelQObject, self).start()
28 28 self.started.emit()
29 29
30 30 def stop(self):
31 31 """ Reimplemented to emit signal.
32 32 """
33 33 super(ChannelQObject, self).stop()
34 34 self.stopped.emit()
35 35
36 36 #---------------------------------------------------------------------------
37 37 # InProcessChannel interface
38 38 #---------------------------------------------------------------------------
39 39
40 40 def call_handlers_later(self, *args, **kwds):
41 41 """ Call the message handlers later.
42 42 """
43 43 do_later = lambda: self.call_handlers(*args, **kwds)
44 44 QtCore.QTimer.singleShot(0, do_later)
45 45
46 46 def process_events(self):
47 47 """ Process any pending GUI events.
48 48 """
49 49 QtCore.QCoreApplication.instance().processEvents()
50 50
51 51
52 52 class QtShellChannelMixin(ChannelQObject):
53 53
54 54 # Emitted when any message is received.
55 55 message_received = QtCore.Signal(object)
56 56
57 57 # Emitted when a reply has been received for the corresponding request type.
58 58 execute_reply = QtCore.Signal(object)
59 59 complete_reply = QtCore.Signal(object)
60 60 inspect_reply = QtCore.Signal(object)
61 61 history_reply = QtCore.Signal(object)
62 kernel_info_reply = QtCore.Signal(object)
62 63
63 64 #---------------------------------------------------------------------------
64 65 # 'ShellChannel' interface
65 66 #---------------------------------------------------------------------------
66 67
67 68 def call_handlers(self, msg):
68 69 """ Reimplemented to emit signals instead of making callbacks.
69 70 """
70 71 # Emit the generic signal.
71 72 self.message_received.emit(msg)
72 73
73 74 # Emit signals for specialized message types.
74 75 msg_type = msg['header']['msg_type']
76 if msg_type == 'kernel_info_reply':
77 self._handle_kernel_info_reply(msg)
78
75 79 signal = getattr(self, msg_type, None)
76 80 if signal:
77 81 signal.emit(msg)
78 82
79 83
80 84 class QtIOPubChannelMixin(ChannelQObject):
81 85
82 86 # Emitted when any message is received.
83 87 message_received = QtCore.Signal(object)
84 88
85 89 # Emitted when a message of type 'stream' is received.
86 90 stream_received = QtCore.Signal(object)
87 91
88 92 # Emitted when a message of type 'execute_input' is received.
89 93 execute_input_received = QtCore.Signal(object)
90 94
91 95 # Emitted when a message of type 'execute_result' is received.
92 96 execute_result_received = QtCore.Signal(object)
93 97
94 98 # Emitted when a message of type 'error' is received.
95 99 error_received = QtCore.Signal(object)
96 100
97 101 # Emitted when a message of type 'display_data' is received
98 102 display_data_received = QtCore.Signal(object)
99 103
100 104 # Emitted when a crash report message is received from the kernel's
101 105 # last-resort sys.excepthook.
102 106 crash_received = QtCore.Signal(object)
103 107
104 108 # Emitted when a shutdown is noticed.
105 109 shutdown_reply_received = QtCore.Signal(object)
106 110
107 111 #---------------------------------------------------------------------------
108 112 # 'IOPubChannel' interface
109 113 #---------------------------------------------------------------------------
110 114
111 115 def call_handlers(self, msg):
112 116 """ Reimplemented to emit signals instead of making callbacks.
113 117 """
114 118 # Emit the generic signal.
115 119 self.message_received.emit(msg)
116 120 # Emit signals for specialized message types.
117 121 msg_type = msg['header']['msg_type']
118 122 signal = getattr(self, msg_type + '_received', None)
119 123 if signal:
120 124 signal.emit(msg)
121 125 elif msg_type in ('stdout', 'stderr'):
122 126 self.stream_received.emit(msg)
123 127
124 128 def flush(self):
125 129 """ Reimplemented to ensure that signals are dispatched immediately.
126 130 """
127 131 super(QtIOPubChannelMixin, self).flush()
128 132 QtCore.QCoreApplication.instance().processEvents()
129 133
130 134
131 135 class QtStdInChannelMixin(ChannelQObject):
132 136
133 137 # Emitted when any message is received.
134 138 message_received = QtCore.Signal(object)
135 139
136 140 # Emitted when an input request is received.
137 141 input_requested = QtCore.Signal(object)
138 142
139 143 #---------------------------------------------------------------------------
140 144 # 'StdInChannel' interface
141 145 #---------------------------------------------------------------------------
142 146
143 147 def call_handlers(self, msg):
144 148 """ Reimplemented to emit signals instead of making callbacks.
145 149 """
146 150 # Emit the generic signal.
147 151 self.message_received.emit(msg)
148 152
149 153 # Emit signals for specialized message types.
150 154 msg_type = msg['header']['msg_type']
151 155 if msg_type == 'input_request':
152 156 self.input_requested.emit(msg)
153 157
154 158
155 159 class QtHBChannelMixin(ChannelQObject):
156 160
157 161 # Emitted when the kernel has died.
158 162 kernel_died = QtCore.Signal(object)
159 163
160 164 #---------------------------------------------------------------------------
161 165 # 'HBChannel' interface
162 166 #---------------------------------------------------------------------------
163 167
164 168 def call_handlers(self, since_last_heartbeat):
165 169 """ Reimplemented to emit signals instead of making callbacks.
166 170 """
167 171 # Emit the generic signal.
168 172 self.kernel_died.emit(since_last_heartbeat)
169 173
170 174
171 175 class QtKernelRestarterMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
172 176
173 177 _timer = None
174 178
175 179
176 180 class QtKernelManagerMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
177 181 """ A KernelClient that provides signals and slots.
178 182 """
179 183
180 184 kernel_restarted = QtCore.Signal()
181 185
182 186
183 187 class QtKernelClientMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
184 188 """ A KernelClient that provides signals and slots.
185 189 """
186 190
187 191 # Emitted when the kernel client has started listening.
188 192 started_channels = QtCore.Signal()
189 193
190 194 # Emitted when the kernel client has stopped listening.
191 195 stopped_channels = QtCore.Signal()
192 196
193 197 # Use Qt-specific channel classes that emit signals.
194 198 iopub_channel_class = Type(QtIOPubChannelMixin)
195 199 shell_channel_class = Type(QtShellChannelMixin)
196 200 stdin_channel_class = Type(QtStdInChannelMixin)
197 201 hb_channel_class = Type(QtHBChannelMixin)
198 202
199 203 #---------------------------------------------------------------------------
200 204 # 'KernelClient' interface
201 205 #---------------------------------------------------------------------------
202 206
203 207 #------ Channel management -------------------------------------------------
204 208
205 209 def start_channels(self, *args, **kw):
206 210 """ Reimplemented to emit signal.
207 211 """
208 212 super(QtKernelClientMixin, self).start_channels(*args, **kw)
209 213 self.started_channels.emit()
210 214
211 215 def stop_channels(self):
212 216 """ Reimplemented to emit signal.
213 217 """
214 218 super(QtKernelClientMixin, self).stop_channels()
215 219 self.stopped_channels.emit()
General Comments 0
You need to be logged in to leave comments. Login now