##// END OF EJS Templates
interrogate kernel_info to get protocol version for adaptation
MinRK -
Show More
@@ -1,150 +1,134 b''
1 """Tornado handlers for WebSocket <-> ZMQ sockets.
1 """Tornado handlers for WebSocket <-> ZMQ sockets."""
2
2
3 Authors:
3 # Copyright (c) IPython Development Team.
4
4 # Distributed under the terms of the Modified BSD License.
5 * Brian Granger
6 """
7
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008-2011 The IPython Development Team
10 #
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
14
15 #-----------------------------------------------------------------------------
16 # Imports
17 #-----------------------------------------------------------------------------
18
5
19 try:
6 try:
20 from urllib.parse import urlparse # Py 3
7 from urllib.parse import urlparse # Py 3
21 except ImportError:
8 except ImportError:
22 from urlparse import urlparse # Py 2
9 from urlparse import urlparse # Py 2
23
10
24 try:
11 try:
25 from http.cookies import SimpleCookie # Py 3
12 from http.cookies import SimpleCookie # Py 3
26 except ImportError:
13 except ImportError:
27 from Cookie import SimpleCookie # Py 2
14 from Cookie import SimpleCookie # Py 2
28 import logging
15 import logging
29 from tornado import web
16 from tornado import web
30 from tornado import websocket
17 from tornado import websocket
31
18
32 from zmq.utils import jsonapi
19 from zmq.utils import jsonapi
33
20
34 from IPython.kernel.zmq.session import Session
21 from IPython.kernel.zmq.session import Session
35 from IPython.utils.jsonutil import date_default
22 from IPython.utils.jsonutil import date_default
36 from IPython.utils.py3compat import PY3, cast_unicode
23 from IPython.utils.py3compat import PY3, cast_unicode
37
24
38 from .handlers import IPythonHandler
25 from .handlers import IPythonHandler
39
26
40 #-----------------------------------------------------------------------------
41 # ZMQ handlers
42 #-----------------------------------------------------------------------------
43
27
44 class ZMQStreamHandler(websocket.WebSocketHandler):
28 class ZMQStreamHandler(websocket.WebSocketHandler):
45
29
46 def same_origin(self):
30 def same_origin(self):
47 """Check to see that origin and host match in the headers."""
31 """Check to see that origin and host match in the headers."""
48
32
49 # The difference between version 8 and 13 is that in 8 the
33 # The difference between version 8 and 13 is that in 8 the
50 # client sends a "Sec-Websocket-Origin" header and in 13 it's
34 # client sends a "Sec-Websocket-Origin" header and in 13 it's
51 # simply "Origin".
35 # simply "Origin".
52 if self.request.headers.get("Sec-WebSocket-Version") in ("7", "8"):
36 if self.request.headers.get("Sec-WebSocket-Version") in ("7", "8"):
53 origin_header = self.request.headers.get("Sec-Websocket-Origin")
37 origin_header = self.request.headers.get("Sec-Websocket-Origin")
54 else:
38 else:
55 origin_header = self.request.headers.get("Origin")
39 origin_header = self.request.headers.get("Origin")
56
40
57 host = self.request.headers.get("Host")
41 host = self.request.headers.get("Host")
58
42
59 # If no header is provided, assume we can't verify origin
43 # If no header is provided, assume we can't verify origin
60 if(origin_header is None or host is None):
44 if(origin_header is None or host is None):
61 return False
45 return False
62
46
63 parsed_origin = urlparse(origin_header)
47 parsed_origin = urlparse(origin_header)
64 origin = parsed_origin.netloc
48 origin = parsed_origin.netloc
65
49
66 # Check to see that origin matches host directly, including ports
50 # Check to see that origin matches host directly, including ports
67 return origin == host
51 return origin == host
68
52
69 def clear_cookie(self, *args, **kwargs):
53 def clear_cookie(self, *args, **kwargs):
70 """meaningless for websockets"""
54 """meaningless for websockets"""
71 pass
55 pass
72
56
73 def _reserialize_reply(self, msg_list):
57 def _reserialize_reply(self, msg_list):
74 """Reserialize a reply message using JSON.
58 """Reserialize a reply message using JSON.
75
59
76 This takes the msg list from the ZMQ socket, unserializes it using
60 This takes the msg list from the ZMQ socket, unserializes it using
77 self.session and then serializes the result using JSON. This method
61 self.session and then serializes the result using JSON. This method
78 should be used by self._on_zmq_reply to build messages that can
62 should be used by self._on_zmq_reply to build messages that can
79 be sent back to the browser.
63 be sent back to the browser.
80 """
64 """
81 idents, msg_list = self.session.feed_identities(msg_list)
65 idents, msg_list = self.session.feed_identities(msg_list)
82 msg = self.session.unserialize(msg_list)
66 msg = self.session.unserialize(msg_list)
83 try:
67 try:
84 msg['header'].pop('date')
68 msg['header'].pop('date')
85 except KeyError:
69 except KeyError:
86 pass
70 pass
87 try:
71 try:
88 msg['parent_header'].pop('date')
72 msg['parent_header'].pop('date')
89 except KeyError:
73 except KeyError:
90 pass
74 pass
91 msg.pop('buffers')
75 msg.pop('buffers')
92 return jsonapi.dumps(msg, default=date_default)
76 return jsonapi.dumps(msg, default=date_default)
93
77
94 def _on_zmq_reply(self, msg_list):
78 def _on_zmq_reply(self, msg_list):
95 # Sometimes this gets triggered when the on_close method is scheduled in the
79 # Sometimes this gets triggered when the on_close method is scheduled in the
96 # eventloop but hasn't been called.
80 # eventloop but hasn't been called.
97 if self.stream.closed(): return
81 if self.stream.closed(): return
98 try:
82 try:
99 msg = self._reserialize_reply(msg_list)
83 msg = self._reserialize_reply(msg_list)
100 except Exception:
84 except Exception:
101 self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
85 self.log.critical("Malformed message: %r" % msg_list, exc_info=True)
102 else:
86 else:
103 self.write_message(msg)
87 self.write_message(msg)
104
88
105 def allow_draft76(self):
89 def allow_draft76(self):
106 """Allow draft 76, until browsers such as Safari update to RFC 6455.
90 """Allow draft 76, until browsers such as Safari update to RFC 6455.
107
91
108 This has been disabled by default in tornado in release 2.2.0, and
92 This has been disabled by default in tornado in release 2.2.0, and
109 support will be removed in later versions.
93 support will be removed in later versions.
110 """
94 """
111 return True
95 return True
112
96
113
97
114 class AuthenticatedZMQStreamHandler(ZMQStreamHandler, IPythonHandler):
98 class AuthenticatedZMQStreamHandler(ZMQStreamHandler, IPythonHandler):
115
99
116 def open(self, kernel_id):
100 def open(self, kernel_id):
117 # Check to see that origin matches host directly, including ports
101 # Check to see that origin matches host directly, including ports
118 if not self.same_origin():
102 if not self.same_origin():
119 self.log.warn("Cross Origin WebSocket Attempt.")
103 self.log.warn("Cross Origin WebSocket Attempt.")
120 raise web.HTTPError(404)
104 raise web.HTTPError(404)
121
105
122 self.kernel_id = cast_unicode(kernel_id, 'ascii')
106 self.kernel_id = cast_unicode(kernel_id, 'ascii')
123 self.session = Session(config=self.config)
107 self.session = Session(config=self.config)
124 self.save_on_message = self.on_message
108 self.save_on_message = self.on_message
125 self.on_message = self.on_first_message
109 self.on_message = self.on_first_message
126
110
127 def _inject_cookie_message(self, msg):
111 def _inject_cookie_message(self, msg):
128 """Inject the first message, which is the document cookie,
112 """Inject the first message, which is the document cookie,
129 for authentication."""
113 for authentication."""
130 if not PY3 and isinstance(msg, unicode):
114 if not PY3 and isinstance(msg, unicode):
131 # Cookie constructor doesn't accept unicode strings
115 # Cookie constructor doesn't accept unicode strings
132 # under Python 2.x for some reason
116 # under Python 2.x for some reason
133 msg = msg.encode('utf8', 'replace')
117 msg = msg.encode('utf8', 'replace')
134 try:
118 try:
135 identity, msg = msg.split(':', 1)
119 identity, msg = msg.split(':', 1)
136 self.session.session = cast_unicode(identity, 'ascii')
120 self.session.session = cast_unicode(identity, 'ascii')
137 except Exception:
121 except Exception:
138 logging.error("First ws message didn't have the form 'identity:[cookie]' - %r", msg)
122 logging.error("First ws message didn't have the form 'identity:[cookie]' - %r", msg)
139
123
140 try:
124 try:
141 self.request._cookies = SimpleCookie(msg)
125 self.request._cookies = SimpleCookie(msg)
142 except:
126 except:
143 self.log.warn("couldn't parse cookie string: %s",msg, exc_info=True)
127 self.log.warn("couldn't parse cookie string: %s",msg, exc_info=True)
144
128
145 def on_first_message(self, msg):
129 def on_first_message(self, msg):
146 self._inject_cookie_message(msg)
130 self._inject_cookie_message(msg)
147 if self.get_current_user() is None:
131 if self.get_current_user() is None:
148 self.log.warn("Couldn't authenticate WebSocket connection")
132 self.log.warn("Couldn't authenticate WebSocket connection")
149 raise web.HTTPError(403)
133 raise web.HTTPError(403)
150 self.on_message = self.save_on_message
134 self.on_message = self.save_on_message
@@ -1,198 +1,211 b''
1 """Tornado handlers for the notebook.
1 """Tornado handlers for kernels."""
2
2
3 Authors:
3 # Copyright (c) IPython Development Team.
4
4 # Distributed under the terms of the Modified BSD License.
5 * Brian Granger
6 """
7
8 #-----------------------------------------------------------------------------
9 # Copyright (C) 2008-2011 The IPython Development Team
10 #
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
13 #-----------------------------------------------------------------------------
14
15 #-----------------------------------------------------------------------------
16 # Imports
17 #-----------------------------------------------------------------------------
18
5
19 import logging
6 import logging
20 from tornado import web
7 from tornado import web
21
8
22 from zmq.utils import jsonapi
9 from zmq.utils import jsonapi
23
10
24 from IPython.utils.jsonutil import date_default
11 from IPython.utils.jsonutil import date_default
12 from IPython.utils.py3compat import string_types
25 from IPython.html.utils import url_path_join, url_escape
13 from IPython.html.utils import url_path_join, url_escape
26
14
27 from ...base.handlers import IPythonHandler, json_errors
15 from ...base.handlers import IPythonHandler, json_errors
28 from ...base.zmqhandlers import AuthenticatedZMQStreamHandler
16 from ...base.zmqhandlers import AuthenticatedZMQStreamHandler
29
17
30 #-----------------------------------------------------------------------------
18 from IPython.core.release import kernel_protocol_version
31 # Kernel handlers
32 #-----------------------------------------------------------------------------
33
34
19
35 class MainKernelHandler(IPythonHandler):
20 class MainKernelHandler(IPythonHandler):
36
21
37 @web.authenticated
22 @web.authenticated
38 @json_errors
23 @json_errors
39 def get(self):
24 def get(self):
40 km = self.kernel_manager
25 km = self.kernel_manager
41 self.finish(jsonapi.dumps(km.list_kernels()))
26 self.finish(jsonapi.dumps(km.list_kernels()))
42
27
43 @web.authenticated
28 @web.authenticated
44 @json_errors
29 @json_errors
45 def post(self):
30 def post(self):
46 km = self.kernel_manager
31 km = self.kernel_manager
47 kernel_id = km.start_kernel()
32 kernel_id = km.start_kernel()
48 model = km.kernel_model(kernel_id)
33 model = km.kernel_model(kernel_id)
49 location = url_path_join(self.base_url, 'api', 'kernels', kernel_id)
34 location = url_path_join(self.base_url, 'api', 'kernels', kernel_id)
50 self.set_header('Location', url_escape(location))
35 self.set_header('Location', url_escape(location))
51 self.set_status(201)
36 self.set_status(201)
52 self.finish(jsonapi.dumps(model))
37 self.finish(jsonapi.dumps(model))
53
38
54
39
55 class KernelHandler(IPythonHandler):
40 class KernelHandler(IPythonHandler):
56
41
57 SUPPORTED_METHODS = ('DELETE', 'GET')
42 SUPPORTED_METHODS = ('DELETE', 'GET')
58
43
59 @web.authenticated
44 @web.authenticated
60 @json_errors
45 @json_errors
61 def get(self, kernel_id):
46 def get(self, kernel_id):
62 km = self.kernel_manager
47 km = self.kernel_manager
63 km._check_kernel_id(kernel_id)
48 km._check_kernel_id(kernel_id)
64 model = km.kernel_model(kernel_id)
49 model = km.kernel_model(kernel_id)
65 self.finish(jsonapi.dumps(model))
50 self.finish(jsonapi.dumps(model))
66
51
67 @web.authenticated
52 @web.authenticated
68 @json_errors
53 @json_errors
69 def delete(self, kernel_id):
54 def delete(self, kernel_id):
70 km = self.kernel_manager
55 km = self.kernel_manager
71 km.shutdown_kernel(kernel_id)
56 km.shutdown_kernel(kernel_id)
72 self.set_status(204)
57 self.set_status(204)
73 self.finish()
58 self.finish()
74
59
75
60
76 class KernelActionHandler(IPythonHandler):
61 class KernelActionHandler(IPythonHandler):
77
62
78 @web.authenticated
63 @web.authenticated
79 @json_errors
64 @json_errors
80 def post(self, kernel_id, action):
65 def post(self, kernel_id, action):
81 km = self.kernel_manager
66 km = self.kernel_manager
82 if action == 'interrupt':
67 if action == 'interrupt':
83 km.interrupt_kernel(kernel_id)
68 km.interrupt_kernel(kernel_id)
84 self.set_status(204)
69 self.set_status(204)
85 if action == 'restart':
70 if action == 'restart':
86 km.restart_kernel(kernel_id)
71 km.restart_kernel(kernel_id)
87 model = km.kernel_model(kernel_id)
72 model = km.kernel_model(kernel_id)
88 self.set_header('Location', '{0}api/kernels/{1}'.format(self.base_url, kernel_id))
73 self.set_header('Location', '{0}api/kernels/{1}'.format(self.base_url, kernel_id))
89 self.write(jsonapi.dumps(model))
74 self.write(jsonapi.dumps(model))
90 self.finish()
75 self.finish()
91
76
92
77
93 class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
78 class ZMQChannelHandler(AuthenticatedZMQStreamHandler):
94
79
95 def create_stream(self):
80 def create_stream(self):
96 km = self.kernel_manager
81 km = self.kernel_manager
97 meth = getattr(km, 'connect_%s' % self.channel)
82 meth = getattr(km, 'connect_%s' % self.channel)
98 self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)
83 self.zmq_stream = meth(self.kernel_id, identity=self.session.bsession)
84 self.kernel_info_channel = None
85 self.kernel_info_channel = km.connect_shell(self.kernel_id)
86 self.kernel_info_channel.on_recv(self._handle_kernel_info)
87 self._request_kernel_info()
88
89 def _request_kernel_info(self):
90 self.log.debug("requesting kernel info")
91 self.session.send(self.kernel_info_channel, "kernel_info_request")
92
93 def _handle_kernel_info(self, msg):
94 idents,msg = self.session.feed_identities(msg)
95 try:
96 msg = self.session.unserialize(msg)
97 except:
98 self.log.error("Bad kernel_info reply", exc_info=True)
99 self._request_kernel_info()
100 return
101 else:
102 if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in msg['content']:
103 self.log.error("Kernel info request failed, assuming current %s", msg['content'])
104 else:
105 protocol_version = msg['content']['protocol_version']
106 if protocol_version != kernel_protocol_version:
107 self.session.adapt_version = int(protocol_version.split('.')[0])
108 self.log.info("adapting kernel to %s" % protocol_version)
109 self.kernel_info_channel.close()
110 self.kernel_info_channel = None
111
99
112
100 def initialize(self, *args, **kwargs):
113 def initialize(self, *args, **kwargs):
101 self.zmq_stream = None
114 self.zmq_stream = None
102
115
103 def on_first_message(self, msg):
116 def on_first_message(self, msg):
104 try:
117 try:
105 super(ZMQChannelHandler, self).on_first_message(msg)
118 super(ZMQChannelHandler, self).on_first_message(msg)
106 except web.HTTPError:
119 except web.HTTPError:
107 self.close()
120 self.close()
108 return
121 return
109 try:
122 try:
110 self.create_stream()
123 self.create_stream()
111 except web.HTTPError:
124 except web.HTTPError:
112 # WebSockets don't response to traditional error codes so we
125 # WebSockets don't response to traditional error codes so we
113 # close the connection.
126 # close the connection.
114 if not self.stream.closed():
127 if not self.stream.closed():
115 self.stream.close()
128 self.stream.close()
116 self.close()
129 self.close()
117 else:
130 else:
118 self.zmq_stream.on_recv(self._on_zmq_reply)
131 self.zmq_stream.on_recv(self._on_zmq_reply)
119
132
120 def on_message(self, msg):
133 def on_message(self, msg):
121 msg = jsonapi.loads(msg)
134 msg = jsonapi.loads(msg)
122 self.session.send(self.zmq_stream, msg)
135 self.session.send(self.zmq_stream, msg)
123
136
124 def on_close(self):
137 def on_close(self):
125 # This method can be called twice, once by self.kernel_died and once
138 # This method can be called twice, once by self.kernel_died and once
126 # from the WebSocket close event. If the WebSocket connection is
139 # from the WebSocket close event. If the WebSocket connection is
127 # closed before the ZMQ streams are setup, they could be None.
140 # closed before the ZMQ streams are setup, they could be None.
128 if self.zmq_stream is not None and not self.zmq_stream.closed():
141 if self.zmq_stream is not None and not self.zmq_stream.closed():
129 self.zmq_stream.on_recv(None)
142 self.zmq_stream.on_recv(None)
130 # close the socket directly, don't wait for the stream
143 # close the socket directly, don't wait for the stream
131 socket = self.zmq_stream.socket
144 socket = self.zmq_stream.socket
132 self.zmq_stream.close()
145 self.zmq_stream.close()
133 socket.close()
146 socket.close()
134
147
135
148
136 class IOPubHandler(ZMQChannelHandler):
149 class IOPubHandler(ZMQChannelHandler):
137 channel = 'iopub'
150 channel = 'iopub'
138
151
139 def create_stream(self):
152 def create_stream(self):
140 super(IOPubHandler, self).create_stream()
153 super(IOPubHandler, self).create_stream()
141 km = self.kernel_manager
154 km = self.kernel_manager
142 km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
155 km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
143 km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
156 km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
144
157
145 def on_close(self):
158 def on_close(self):
146 km = self.kernel_manager
159 km = self.kernel_manager
147 if self.kernel_id in km:
160 if self.kernel_id in km:
148 km.remove_restart_callback(
161 km.remove_restart_callback(
149 self.kernel_id, self.on_kernel_restarted,
162 self.kernel_id, self.on_kernel_restarted,
150 )
163 )
151 km.remove_restart_callback(
164 km.remove_restart_callback(
152 self.kernel_id, self.on_restart_failed, 'dead',
165 self.kernel_id, self.on_restart_failed, 'dead',
153 )
166 )
154 super(IOPubHandler, self).on_close()
167 super(IOPubHandler, self).on_close()
155
168
156 def _send_status_message(self, status):
169 def _send_status_message(self, status):
157 msg = self.session.msg("status",
170 msg = self.session.msg("status",
158 {'execution_state': status}
171 {'execution_state': status}
159 )
172 )
160 self.write_message(jsonapi.dumps(msg, default=date_default))
173 self.write_message(jsonapi.dumps(msg, default=date_default))
161
174
162 def on_kernel_restarted(self):
175 def on_kernel_restarted(self):
163 logging.warn("kernel %s restarted", self.kernel_id)
176 logging.warn("kernel %s restarted", self.kernel_id)
164 self._send_status_message('restarting')
177 self._send_status_message('restarting')
165
178
166 def on_restart_failed(self):
179 def on_restart_failed(self):
167 logging.error("kernel %s restarted failed!", self.kernel_id)
180 logging.error("kernel %s restarted failed!", self.kernel_id)
168 self._send_status_message('dead')
181 self._send_status_message('dead')
169
182
170 def on_message(self, msg):
183 def on_message(self, msg):
171 """IOPub messages make no sense"""
184 """IOPub messages make no sense"""
172 pass
185 pass
173
186
174
187
175 class ShellHandler(ZMQChannelHandler):
188 class ShellHandler(ZMQChannelHandler):
176 channel = 'shell'
189 channel = 'shell'
177
190
178
191
179 class StdinHandler(ZMQChannelHandler):
192 class StdinHandler(ZMQChannelHandler):
180 channel = 'stdin'
193 channel = 'stdin'
181
194
182
195
183 #-----------------------------------------------------------------------------
196 #-----------------------------------------------------------------------------
184 # URL to handler mappings
197 # URL to handler mappings
185 #-----------------------------------------------------------------------------
198 #-----------------------------------------------------------------------------
186
199
187
200
188 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
201 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
189 _kernel_action_regex = r"(?P<action>restart|interrupt)"
202 _kernel_action_regex = r"(?P<action>restart|interrupt)"
190
203
191 default_handlers = [
204 default_handlers = [
192 (r"/api/kernels", MainKernelHandler),
205 (r"/api/kernels", MainKernelHandler),
193 (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
206 (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
194 (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
207 (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
195 (r"/api/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler),
208 (r"/api/kernels/%s/iopub" % _kernel_id_regex, IOPubHandler),
196 (r"/api/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
209 (r"/api/kernels/%s/shell" % _kernel_id_regex, ShellHandler),
197 (r"/api/kernels/%s/stdin" % _kernel_id_regex, StdinHandler)
210 (r"/api/kernels/%s/stdin" % _kernel_id_regex, StdinHandler)
198 ]
211 ]
@@ -1,82 +1,85 b''
1 """Blocking channels
1 """Blocking channels
2
2
3 Useful for test suites and blocking terminal interfaces.
3 Useful for test suites and blocking terminal interfaces.
4 """
4 """
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2013 The IPython Development Team
6 # Copyright (C) 2013 The IPython Development Team
7 #
7 #
8 # Distributed under the terms of the BSD License. The full license is in
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING.txt, distributed as part of this software.
9 # the file COPYING.txt, distributed as part of this software.
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Imports
13 # Imports
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 try:
16 try:
17 from queue import Queue, Empty # Py 3
17 from queue import Queue, Empty # Py 3
18 except ImportError:
18 except ImportError:
19 from Queue import Queue, Empty # Py 2
19 from Queue import Queue, Empty # Py 2
20
20
21 from IPython.kernel.channels import IOPubChannel, HBChannel, \
21 from IPython.kernel.channels import IOPubChannel, HBChannel, \
22 ShellChannel, StdInChannel
22 ShellChannel, StdInChannel
23
23
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25 # Blocking kernel manager
25 # Blocking kernel manager
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27
27
28
28
29 class BlockingChannelMixin(object):
29 class BlockingChannelMixin(object):
30
30
31 def __init__(self, *args, **kwds):
31 def __init__(self, *args, **kwds):
32 super(BlockingChannelMixin, self).__init__(*args, **kwds)
32 super(BlockingChannelMixin, self).__init__(*args, **kwds)
33 self._in_queue = Queue()
33 self._in_queue = Queue()
34
34
35 def call_handlers(self, msg):
35 def call_handlers(self, msg):
36 self._in_queue.put(msg)
36 self._in_queue.put(msg)
37
37
38 def get_msg(self, block=True, timeout=None):
38 def get_msg(self, block=True, timeout=None):
39 """ Gets a message if there is one that is ready. """
39 """ Gets a message if there is one that is ready. """
40 if timeout is None:
40 if timeout is None:
41 # Queue.get(timeout=None) has stupid uninteruptible
41 # Queue.get(timeout=None) has stupid uninteruptible
42 # behavior, so wait for a week instead
42 # behavior, so wait for a week instead
43 timeout = 604800
43 timeout = 604800
44 return self._in_queue.get(block, timeout)
44 return self._in_queue.get(block, timeout)
45
45
46 def get_msgs(self):
46 def get_msgs(self):
47 """ Get all messages that are currently ready. """
47 """ Get all messages that are currently ready. """
48 msgs = []
48 msgs = []
49 while True:
49 while True:
50 try:
50 try:
51 msgs.append(self.get_msg(block=False))
51 msgs.append(self.get_msg(block=False))
52 except Empty:
52 except Empty:
53 break
53 break
54 return msgs
54 return msgs
55
55
56 def msg_ready(self):
56 def msg_ready(self):
57 """ Is there a message that has been received? """
57 """ Is there a message that has been received? """
58 return not self._in_queue.empty()
58 return not self._in_queue.empty()
59
59
60
60
61 class BlockingIOPubChannel(BlockingChannelMixin, IOPubChannel):
61 class BlockingIOPubChannel(BlockingChannelMixin, IOPubChannel):
62 pass
62 pass
63
63
64
64
65 class BlockingShellChannel(BlockingChannelMixin, ShellChannel):
65 class BlockingShellChannel(BlockingChannelMixin, ShellChannel):
66 pass
66 def call_handlers(self, msg):
67 if msg['msg_type'] == 'kernel_info_reply':
68 self._handle_kernel_info_reply(msg)
69 return super(BlockingShellChannel, self).call_handlers(msg)
67
70
68
71
69 class BlockingStdInChannel(BlockingChannelMixin, StdInChannel):
72 class BlockingStdInChannel(BlockingChannelMixin, StdInChannel):
70 pass
73 pass
71
74
72
75
73 class BlockingHBChannel(HBChannel):
76 class BlockingHBChannel(HBChannel):
74
77
75 # This kernel needs quicker monitoring, shorten to 1 sec.
78 # This kernel needs quicker monitoring, shorten to 1 sec.
76 # less than 0.5s is unreliable, and will get occasional
79 # less than 0.5s is unreliable, and will get occasional
77 # false reports of missed beats.
80 # false reports of missed beats.
78 time_to_dead = 1.
81 time_to_dead = 1.
79
82
80 def call_handlers(self, since_last_heartbeat):
83 def call_handlers(self, since_last_heartbeat):
81 """ Pause beating on missed heartbeat. """
84 """ Pause beating on missed heartbeat. """
82 pass
85 pass
@@ -1,626 +1,639 b''
1 """Base classes to manage a Client's interaction with a running kernel"""
1 """Base classes to manage a Client's interaction with a running kernel"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import absolute_import
6 from __future__ import absolute_import
7
7
8 import atexit
8 import atexit
9 import errno
9 import errno
10 from threading import Thread
10 from threading import Thread
11 import time
11 import time
12
12
13 import zmq
13 import zmq
14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
15 # during garbage collection of threads at exit:
15 # during garbage collection of threads at exit:
16 from zmq import ZMQError
16 from zmq import ZMQError
17 from zmq.eventloop import ioloop, zmqstream
17 from zmq.eventloop import ioloop, zmqstream
18
18
19 # Local imports
19 from IPython.core.release import kernel_protocol_version_info
20
20 from .channelsabc import (
21 from .channelsabc import (
21 ShellChannelABC, IOPubChannelABC,
22 ShellChannelABC, IOPubChannelABC,
22 HBChannelABC, StdInChannelABC,
23 HBChannelABC, StdInChannelABC,
23 )
24 )
24 from IPython.utils.py3compat import string_types, iteritems
25 from IPython.utils.py3compat import string_types, iteritems
25
26
26 #-----------------------------------------------------------------------------
27 #-----------------------------------------------------------------------------
27 # Constants and exceptions
28 # Constants and exceptions
28 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
29
30
31 major_protocol_version = kernel_protocol_version_info[0]
32
30 class InvalidPortNumber(Exception):
33 class InvalidPortNumber(Exception):
31 pass
34 pass
32
35
33 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
34 # Utility functions
37 # Utility functions
35 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
36
39
37 # some utilities to validate message structure, these might get moved elsewhere
40 # some utilities to validate message structure, these might get moved elsewhere
38 # if they prove to have more generic utility
41 # if they prove to have more generic utility
39
42
40 def validate_string_list(lst):
43 def validate_string_list(lst):
41 """Validate that the input is a list of strings.
44 """Validate that the input is a list of strings.
42
45
43 Raises ValueError if not."""
46 Raises ValueError if not."""
44 if not isinstance(lst, list):
47 if not isinstance(lst, list):
45 raise ValueError('input %r must be a list' % lst)
48 raise ValueError('input %r must be a list' % lst)
46 for x in lst:
49 for x in lst:
47 if not isinstance(x, string_types):
50 if not isinstance(x, string_types):
48 raise ValueError('element %r in list must be a string' % x)
51 raise ValueError('element %r in list must be a string' % x)
49
52
50
53
51 def validate_string_dict(dct):
54 def validate_string_dict(dct):
52 """Validate that the input is a dict with string keys and values.
55 """Validate that the input is a dict with string keys and values.
53
56
54 Raises ValueError if not."""
57 Raises ValueError if not."""
55 for k,v in iteritems(dct):
58 for k,v in iteritems(dct):
56 if not isinstance(k, string_types):
59 if not isinstance(k, string_types):
57 raise ValueError('key %r in dict must be a string' % k)
60 raise ValueError('key %r in dict must be a string' % k)
58 if not isinstance(v, string_types):
61 if not isinstance(v, string_types):
59 raise ValueError('value %r in dict must be a string' % v)
62 raise ValueError('value %r in dict must be a string' % v)
60
63
61
64
62 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
63 # ZMQ Socket Channel classes
66 # ZMQ Socket Channel classes
64 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
65
68
66 class ZMQSocketChannel(Thread):
69 class ZMQSocketChannel(Thread):
67 """The base class for the channels that use ZMQ sockets."""
70 """The base class for the channels that use ZMQ sockets."""
68 context = None
71 context = None
69 session = None
72 session = None
70 socket = None
73 socket = None
71 ioloop = None
74 ioloop = None
72 stream = None
75 stream = None
73 _address = None
76 _address = None
74 _exiting = False
77 _exiting = False
75 proxy_methods = []
78 proxy_methods = []
76
79
77 def __init__(self, context, session, address):
80 def __init__(self, context, session, address):
78 """Create a channel.
81 """Create a channel.
79
82
80 Parameters
83 Parameters
81 ----------
84 ----------
82 context : :class:`zmq.Context`
85 context : :class:`zmq.Context`
83 The ZMQ context to use.
86 The ZMQ context to use.
84 session : :class:`session.Session`
87 session : :class:`session.Session`
85 The session to use.
88 The session to use.
86 address : zmq url
89 address : zmq url
87 Standard (ip, port) tuple that the kernel is listening on.
90 Standard (ip, port) tuple that the kernel is listening on.
88 """
91 """
89 super(ZMQSocketChannel, self).__init__()
92 super(ZMQSocketChannel, self).__init__()
90 self.daemon = True
93 self.daemon = True
91
94
92 self.context = context
95 self.context = context
93 self.session = session
96 self.session = session
94 if isinstance(address, tuple):
97 if isinstance(address, tuple):
95 if address[1] == 0:
98 if address[1] == 0:
96 message = 'The port number for a channel cannot be 0.'
99 message = 'The port number for a channel cannot be 0.'
97 raise InvalidPortNumber(message)
100 raise InvalidPortNumber(message)
98 address = "tcp://%s:%i" % address
101 address = "tcp://%s:%i" % address
99 self._address = address
102 self._address = address
100 atexit.register(self._notice_exit)
103 atexit.register(self._notice_exit)
101
104
102 def _notice_exit(self):
105 def _notice_exit(self):
103 self._exiting = True
106 self._exiting = True
104
107
105 def _run_loop(self):
108 def _run_loop(self):
106 """Run my loop, ignoring EINTR events in the poller"""
109 """Run my loop, ignoring EINTR events in the poller"""
107 while True:
110 while True:
108 try:
111 try:
109 self.ioloop.start()
112 self.ioloop.start()
110 except ZMQError as e:
113 except ZMQError as e:
111 if e.errno == errno.EINTR:
114 if e.errno == errno.EINTR:
112 continue
115 continue
113 else:
116 else:
114 raise
117 raise
115 except Exception:
118 except Exception:
116 if self._exiting:
119 if self._exiting:
117 break
120 break
118 else:
121 else:
119 raise
122 raise
120 else:
123 else:
121 break
124 break
122
125
123 def stop(self):
126 def stop(self):
124 """Stop the channel's event loop and join its thread.
127 """Stop the channel's event loop and join its thread.
125
128
126 This calls :meth:`~threading.Thread.join` and returns when the thread
129 This calls :meth:`~threading.Thread.join` and returns when the thread
127 terminates. :class:`RuntimeError` will be raised if
130 terminates. :class:`RuntimeError` will be raised if
128 :meth:`~threading.Thread.start` is called again.
131 :meth:`~threading.Thread.start` is called again.
129 """
132 """
130 if self.ioloop is not None:
133 if self.ioloop is not None:
131 self.ioloop.stop()
134 self.ioloop.stop()
132 self.join()
135 self.join()
133 self.close()
136 self.close()
134
137
135 def close(self):
138 def close(self):
136 if self.ioloop is not None:
139 if self.ioloop is not None:
137 try:
140 try:
138 self.ioloop.close(all_fds=True)
141 self.ioloop.close(all_fds=True)
139 except Exception:
142 except Exception:
140 pass
143 pass
141 if self.socket is not None:
144 if self.socket is not None:
142 try:
145 try:
143 self.socket.close(linger=0)
146 self.socket.close(linger=0)
144 except Exception:
147 except Exception:
145 pass
148 pass
146 self.socket = None
149 self.socket = None
147
150
148 @property
151 @property
149 def address(self):
152 def address(self):
150 """Get the channel's address as a zmq url string.
153 """Get the channel's address as a zmq url string.
151
154
152 These URLS have the form: 'tcp://127.0.0.1:5555'.
155 These URLS have the form: 'tcp://127.0.0.1:5555'.
153 """
156 """
154 return self._address
157 return self._address
155
158
156 def _queue_send(self, msg):
159 def _queue_send(self, msg):
157 """Queue a message to be sent from the IOLoop's thread.
160 """Queue a message to be sent from the IOLoop's thread.
158
161
159 Parameters
162 Parameters
160 ----------
163 ----------
161 msg : message to send
164 msg : message to send
162
165
163 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
164 thread control of the action.
167 thread control of the action.
165 """
168 """
166 def thread_send():
169 def thread_send():
167 self.session.send(self.stream, msg)
170 self.session.send(self.stream, msg)
168 self.ioloop.add_callback(thread_send)
171 self.ioloop.add_callback(thread_send)
169
172
170 def _handle_recv(self, msg):
173 def _handle_recv(self, msg):
171 """Callback for stream.on_recv.
174 """Callback for stream.on_recv.
172
175
173 Unpacks message, and calls handlers with it.
176 Unpacks message, and calls handlers with it.
174 """
177 """
175 ident,smsg = self.session.feed_identities(msg)
178 ident,smsg = self.session.feed_identities(msg)
176 self.call_handlers(self.session.unserialize(smsg))
179 msg = self.session.unserialize(smsg)
180 self.call_handlers(msg)
177
181
178
182
179
183
180 class ShellChannel(ZMQSocketChannel):
184 class ShellChannel(ZMQSocketChannel):
181 """The shell channel for issuing request/replies to the kernel."""
185 """The shell channel for issuing request/replies to the kernel."""
182
186
183 command_queue = None
187 command_queue = None
184 # flag for whether execute requests should be allowed to call raw_input:
188 # flag for whether execute requests should be allowed to call raw_input:
185 allow_stdin = True
189 allow_stdin = True
186 proxy_methods = [
190 proxy_methods = [
187 'execute',
191 'execute',
188 'complete',
192 'complete',
189 'inspect',
193 'inspect',
190 'history',
194 'history',
191 'kernel_info',
195 'kernel_info',
192 'shutdown',
196 'shutdown',
193 ]
197 ]
194
198
195 def __init__(self, context, session, address):
199 def __init__(self, context, session, address):
196 super(ShellChannel, self).__init__(context, session, address)
200 super(ShellChannel, self).__init__(context, session, address)
197 self.ioloop = ioloop.IOLoop()
201 self.ioloop = ioloop.IOLoop()
198
202
199 def run(self):
203 def run(self):
200 """The thread's main activity. Call start() instead."""
204 """The thread's main activity. Call start() instead."""
201 self.socket = self.context.socket(zmq.DEALER)
205 self.socket = self.context.socket(zmq.DEALER)
202 self.socket.linger = 1000
206 self.socket.linger = 1000
203 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
207 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
204 self.socket.connect(self.address)
208 self.socket.connect(self.address)
205 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
209 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
206 self.stream.on_recv(self._handle_recv)
210 self.stream.on_recv(self._handle_recv)
207 self._run_loop()
211 self._run_loop()
208
212
209 def call_handlers(self, msg):
213 def call_handlers(self, msg):
210 """This method is called in the ioloop thread when a message arrives.
214 """This method is called in the ioloop thread when a message arrives.
211
215
212 Subclasses should override this method to handle incoming messages.
216 Subclasses should override this method to handle incoming messages.
213 It is important to remember that this method is called in the thread
217 It is important to remember that this method is called in the thread
214 so that some logic must be done to ensure that the application level
218 so that some logic must be done to ensure that the application level
215 handlers are called in the application thread.
219 handlers are called in the application thread.
216 """
220 """
217 raise NotImplementedError('call_handlers must be defined in a subclass.')
221 raise NotImplementedError('call_handlers must be defined in a subclass.')
218
222
219 def execute(self, code, silent=False, store_history=True,
223 def execute(self, code, silent=False, store_history=True,
220 user_expressions=None, allow_stdin=None):
224 user_expressions=None, allow_stdin=None):
221 """Execute code in the kernel.
225 """Execute code in the kernel.
222
226
223 Parameters
227 Parameters
224 ----------
228 ----------
225 code : str
229 code : str
226 A string of Python code.
230 A string of Python code.
227
231
228 silent : bool, optional (default False)
232 silent : bool, optional (default False)
229 If set, the kernel will execute the code as quietly possible, and
233 If set, the kernel will execute the code as quietly possible, and
230 will force store_history to be False.
234 will force store_history to be False.
231
235
232 store_history : bool, optional (default True)
236 store_history : bool, optional (default True)
233 If set, the kernel will store command history. This is forced
237 If set, the kernel will store command history. This is forced
234 to be False if silent is True.
238 to be False if silent is True.
235
239
236 user_expressions : dict, optional
240 user_expressions : dict, optional
237 A dict mapping names to expressions to be evaluated in the user's
241 A dict mapping names to expressions to be evaluated in the user's
238 dict. The expression values are returned as strings formatted using
242 dict. The expression values are returned as strings formatted using
239 :func:`repr`.
243 :func:`repr`.
240
244
241 allow_stdin : bool, optional (default self.allow_stdin)
245 allow_stdin : bool, optional (default self.allow_stdin)
242 Flag for whether the kernel can send stdin requests to frontends.
246 Flag for whether the kernel can send stdin requests to frontends.
243
247
244 Some frontends (e.g. the Notebook) do not support stdin requests.
248 Some frontends (e.g. the Notebook) do not support stdin requests.
245 If raw_input is called from code executed from such a frontend, a
249 If raw_input is called from code executed from such a frontend, a
246 StdinNotImplementedError will be raised.
250 StdinNotImplementedError will be raised.
247
251
248 Returns
252 Returns
249 -------
253 -------
250 The msg_id of the message sent.
254 The msg_id of the message sent.
251 """
255 """
252 if user_expressions is None:
256 if user_expressions is None:
253 user_expressions = {}
257 user_expressions = {}
254 if allow_stdin is None:
258 if allow_stdin is None:
255 allow_stdin = self.allow_stdin
259 allow_stdin = self.allow_stdin
256
260
257
261
258 # Don't waste network traffic if inputs are invalid
262 # Don't waste network traffic if inputs are invalid
259 if not isinstance(code, string_types):
263 if not isinstance(code, string_types):
260 raise ValueError('code %r must be a string' % code)
264 raise ValueError('code %r must be a string' % code)
261 validate_string_dict(user_expressions)
265 validate_string_dict(user_expressions)
262
266
263 # Create class for content/msg creation. Related to, but possibly
267 # Create class for content/msg creation. Related to, but possibly
264 # not in Session.
268 # not in Session.
265 content = dict(code=code, silent=silent, store_history=store_history,
269 content = dict(code=code, silent=silent, store_history=store_history,
266 user_expressions=user_expressions,
270 user_expressions=user_expressions,
267 allow_stdin=allow_stdin,
271 allow_stdin=allow_stdin,
268 )
272 )
269 msg = self.session.msg('execute_request', content)
273 msg = self.session.msg('execute_request', content)
270 self._queue_send(msg)
274 self._queue_send(msg)
271 return msg['header']['msg_id']
275 return msg['header']['msg_id']
272
276
273 def complete(self, code, cursor_pos=None):
277 def complete(self, code, cursor_pos=None):
274 """Tab complete text in the kernel's namespace.
278 """Tab complete text in the kernel's namespace.
275
279
276 Parameters
280 Parameters
277 ----------
281 ----------
278 code : str
282 code : str
279 The context in which completion is requested.
283 The context in which completion is requested.
280 Can be anything between a variable name and an entire cell.
284 Can be anything between a variable name and an entire cell.
281 cursor_pos : int, optional
285 cursor_pos : int, optional
282 The position of the cursor in the block of code where the completion was requested.
286 The position of the cursor in the block of code where the completion was requested.
283 Default: ``len(code)``
287 Default: ``len(code)``
284
288
285 Returns
289 Returns
286 -------
290 -------
287 The msg_id of the message sent.
291 The msg_id of the message sent.
288 """
292 """
289 if cursor_pos is None:
293 if cursor_pos is None:
290 cursor_pos = len(code)
294 cursor_pos = len(code)
291 content = dict(code=code, cursor_pos=cursor_pos)
295 content = dict(code=code, cursor_pos=cursor_pos)
292 msg = self.session.msg('complete_request', content)
296 msg = self.session.msg('complete_request', content)
293 self._queue_send(msg)
297 self._queue_send(msg)
294 return msg['header']['msg_id']
298 return msg['header']['msg_id']
295
299
296 def inspect(self, code, cursor_pos=None, detail_level=0):
300 def inspect(self, code, cursor_pos=None, detail_level=0):
297 """Get metadata information about an object in the kernel's namespace.
301 """Get metadata information about an object in the kernel's namespace.
298
302
299 It is up to the kernel to determine the appropriate object to inspect.
303 It is up to the kernel to determine the appropriate object to inspect.
300
304
301 Parameters
305 Parameters
302 ----------
306 ----------
303 code : str
307 code : str
304 The context in which info is requested.
308 The context in which info is requested.
305 Can be anything between a variable name and an entire cell.
309 Can be anything between a variable name and an entire cell.
306 cursor_pos : int, optional
310 cursor_pos : int, optional
307 The position of the cursor in the block of code where the info was requested.
311 The position of the cursor in the block of code where the info was requested.
308 Default: ``len(code)``
312 Default: ``len(code)``
309 detail_level : int, optional
313 detail_level : int, optional
310 The level of detail for the introspection (0-2)
314 The level of detail for the introspection (0-2)
311
315
312 Returns
316 Returns
313 -------
317 -------
314 The msg_id of the message sent.
318 The msg_id of the message sent.
315 """
319 """
316 if cursor_pos is None:
320 if cursor_pos is None:
317 cursor_pos = len(code)
321 cursor_pos = len(code)
318 content = dict(code=code, cursor_pos=cursor_pos,
322 content = dict(code=code, cursor_pos=cursor_pos,
319 detail_level=detail_level,
323 detail_level=detail_level,
320 )
324 )
321 msg = self.session.msg('inspect_request', content)
325 msg = self.session.msg('inspect_request', content)
322 self._queue_send(msg)
326 self._queue_send(msg)
323 return msg['header']['msg_id']
327 return msg['header']['msg_id']
324
328
325 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
329 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
326 """Get entries from the kernel's history list.
330 """Get entries from the kernel's history list.
327
331
328 Parameters
332 Parameters
329 ----------
333 ----------
330 raw : bool
334 raw : bool
331 If True, return the raw input.
335 If True, return the raw input.
332 output : bool
336 output : bool
333 If True, then return the output as well.
337 If True, then return the output as well.
334 hist_access_type : str
338 hist_access_type : str
335 'range' (fill in session, start and stop params), 'tail' (fill in n)
339 'range' (fill in session, start and stop params), 'tail' (fill in n)
336 or 'search' (fill in pattern param).
340 or 'search' (fill in pattern param).
337
341
338 session : int
342 session : int
339 For a range request, the session from which to get lines. Session
343 For a range request, the session from which to get lines. Session
340 numbers are positive integers; negative ones count back from the
344 numbers are positive integers; negative ones count back from the
341 current session.
345 current session.
342 start : int
346 start : int
343 The first line number of a history range.
347 The first line number of a history range.
344 stop : int
348 stop : int
345 The final (excluded) line number of a history range.
349 The final (excluded) line number of a history range.
346
350
347 n : int
351 n : int
348 The number of lines of history to get for a tail request.
352 The number of lines of history to get for a tail request.
349
353
350 pattern : str
354 pattern : str
351 The glob-syntax pattern for a search request.
355 The glob-syntax pattern for a search request.
352
356
353 Returns
357 Returns
354 -------
358 -------
355 The msg_id of the message sent.
359 The msg_id of the message sent.
356 """
360 """
357 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
361 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
358 **kwargs)
362 **kwargs)
359 msg = self.session.msg('history_request', content)
363 msg = self.session.msg('history_request', content)
360 self._queue_send(msg)
364 self._queue_send(msg)
361 return msg['header']['msg_id']
365 return msg['header']['msg_id']
362
366
363 def kernel_info(self):
367 def kernel_info(self):
364 """Request kernel info."""
368 """Request kernel info."""
365 msg = self.session.msg('kernel_info_request')
369 msg = self.session.msg('kernel_info_request')
366 self._queue_send(msg)
370 self._queue_send(msg)
367 return msg['header']['msg_id']
371 return msg['header']['msg_id']
372
373 def _handle_kernel_info_reply(self, msg):
374 """handle kernel info reply
375
376 sets protocol adaptation version
377 """
378 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
379 if adapt_version != major_protocol_version:
380 self.session.adapt_version = adapt_version
368
381
369 def shutdown(self, restart=False):
382 def shutdown(self, restart=False):
370 """Request an immediate kernel shutdown.
383 """Request an immediate kernel shutdown.
371
384
372 Upon receipt of the (empty) reply, client code can safely assume that
385 Upon receipt of the (empty) reply, client code can safely assume that
373 the kernel has shut down and it's safe to forcefully terminate it if
386 the kernel has shut down and it's safe to forcefully terminate it if
374 it's still alive.
387 it's still alive.
375
388
376 The kernel will send the reply via a function registered with Python's
389 The kernel will send the reply via a function registered with Python's
377 atexit module, ensuring it's truly done as the kernel is done with all
390 atexit module, ensuring it's truly done as the kernel is done with all
378 normal operation.
391 normal operation.
379 """
392 """
380 # Send quit message to kernel. Once we implement kernel-side setattr,
393 # Send quit message to kernel. Once we implement kernel-side setattr,
381 # this should probably be done that way, but for now this will do.
394 # this should probably be done that way, but for now this will do.
382 msg = self.session.msg('shutdown_request', {'restart':restart})
395 msg = self.session.msg('shutdown_request', {'restart':restart})
383 self._queue_send(msg)
396 self._queue_send(msg)
384 return msg['header']['msg_id']
397 return msg['header']['msg_id']
385
398
386
399
387
400
388 class IOPubChannel(ZMQSocketChannel):
401 class IOPubChannel(ZMQSocketChannel):
389 """The iopub channel which listens for messages that the kernel publishes.
402 """The iopub channel which listens for messages that the kernel publishes.
390
403
391 This channel is where all output is published to frontends.
404 This channel is where all output is published to frontends.
392 """
405 """
393
406
394 def __init__(self, context, session, address):
407 def __init__(self, context, session, address):
395 super(IOPubChannel, self).__init__(context, session, address)
408 super(IOPubChannel, self).__init__(context, session, address)
396 self.ioloop = ioloop.IOLoop()
409 self.ioloop = ioloop.IOLoop()
397
410
398 def run(self):
411 def run(self):
399 """The thread's main activity. Call start() instead."""
412 """The thread's main activity. Call start() instead."""
400 self.socket = self.context.socket(zmq.SUB)
413 self.socket = self.context.socket(zmq.SUB)
401 self.socket.linger = 1000
414 self.socket.linger = 1000
402 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
415 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
403 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
416 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
404 self.socket.connect(self.address)
417 self.socket.connect(self.address)
405 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
418 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
406 self.stream.on_recv(self._handle_recv)
419 self.stream.on_recv(self._handle_recv)
407 self._run_loop()
420 self._run_loop()
408
421
409 def call_handlers(self, msg):
422 def call_handlers(self, msg):
410 """This method is called in the ioloop thread when a message arrives.
423 """This method is called in the ioloop thread when a message arrives.
411
424
412 Subclasses should override this method to handle incoming messages.
425 Subclasses should override this method to handle incoming messages.
413 It is important to remember that this method is called in the thread
426 It is important to remember that this method is called in the thread
414 so that some logic must be done to ensure that the application leve
427 so that some logic must be done to ensure that the application leve
415 handlers are called in the application thread.
428 handlers are called in the application thread.
416 """
429 """
417 raise NotImplementedError('call_handlers must be defined in a subclass.')
430 raise NotImplementedError('call_handlers must be defined in a subclass.')
418
431
419 def flush(self, timeout=1.0):
432 def flush(self, timeout=1.0):
420 """Immediately processes all pending messages on the iopub channel.
433 """Immediately processes all pending messages on the iopub channel.
421
434
422 Callers should use this method to ensure that :meth:`call_handlers`
435 Callers should use this method to ensure that :meth:`call_handlers`
423 has been called for all messages that have been received on the
436 has been called for all messages that have been received on the
424 0MQ SUB socket of this channel.
437 0MQ SUB socket of this channel.
425
438
426 This method is thread safe.
439 This method is thread safe.
427
440
428 Parameters
441 Parameters
429 ----------
442 ----------
430 timeout : float, optional
443 timeout : float, optional
431 The maximum amount of time to spend flushing, in seconds. The
444 The maximum amount of time to spend flushing, in seconds. The
432 default is one second.
445 default is one second.
433 """
446 """
434 # We do the IOLoop callback process twice to ensure that the IOLoop
447 # We do the IOLoop callback process twice to ensure that the IOLoop
435 # gets to perform at least one full poll.
448 # gets to perform at least one full poll.
436 stop_time = time.time() + timeout
449 stop_time = time.time() + timeout
437 for i in range(2):
450 for i in range(2):
438 self._flushed = False
451 self._flushed = False
439 self.ioloop.add_callback(self._flush)
452 self.ioloop.add_callback(self._flush)
440 while not self._flushed and time.time() < stop_time:
453 while not self._flushed and time.time() < stop_time:
441 time.sleep(0.01)
454 time.sleep(0.01)
442
455
443 def _flush(self):
456 def _flush(self):
444 """Callback for :method:`self.flush`."""
457 """Callback for :method:`self.flush`."""
445 self.stream.flush()
458 self.stream.flush()
446 self._flushed = True
459 self._flushed = True
447
460
448
461
449 class StdInChannel(ZMQSocketChannel):
462 class StdInChannel(ZMQSocketChannel):
450 """The stdin channel to handle raw_input requests that the kernel makes."""
463 """The stdin channel to handle raw_input requests that the kernel makes."""
451
464
452 msg_queue = None
465 msg_queue = None
453 proxy_methods = ['input']
466 proxy_methods = ['input']
454
467
455 def __init__(self, context, session, address):
468 def __init__(self, context, session, address):
456 super(StdInChannel, self).__init__(context, session, address)
469 super(StdInChannel, self).__init__(context, session, address)
457 self.ioloop = ioloop.IOLoop()
470 self.ioloop = ioloop.IOLoop()
458
471
459 def run(self):
472 def run(self):
460 """The thread's main activity. Call start() instead."""
473 """The thread's main activity. Call start() instead."""
461 self.socket = self.context.socket(zmq.DEALER)
474 self.socket = self.context.socket(zmq.DEALER)
462 self.socket.linger = 1000
475 self.socket.linger = 1000
463 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
476 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
464 self.socket.connect(self.address)
477 self.socket.connect(self.address)
465 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
478 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
466 self.stream.on_recv(self._handle_recv)
479 self.stream.on_recv(self._handle_recv)
467 self._run_loop()
480 self._run_loop()
468
481
469 def call_handlers(self, msg):
482 def call_handlers(self, msg):
470 """This method is called in the ioloop thread when a message arrives.
483 """This method is called in the ioloop thread when a message arrives.
471
484
472 Subclasses should override this method to handle incoming messages.
485 Subclasses should override this method to handle incoming messages.
473 It is important to remember that this method is called in the thread
486 It is important to remember that this method is called in the thread
474 so that some logic must be done to ensure that the application leve
487 so that some logic must be done to ensure that the application leve
475 handlers are called in the application thread.
488 handlers are called in the application thread.
476 """
489 """
477 raise NotImplementedError('call_handlers must be defined in a subclass.')
490 raise NotImplementedError('call_handlers must be defined in a subclass.')
478
491
479 def input(self, string):
492 def input(self, string):
480 """Send a string of raw input to the kernel."""
493 """Send a string of raw input to the kernel."""
481 content = dict(value=string)
494 content = dict(value=string)
482 msg = self.session.msg('input_reply', content)
495 msg = self.session.msg('input_reply', content)
483 self._queue_send(msg)
496 self._queue_send(msg)
484
497
485
498
486 class HBChannel(ZMQSocketChannel):
499 class HBChannel(ZMQSocketChannel):
487 """The heartbeat channel which monitors the kernel heartbeat.
500 """The heartbeat channel which monitors the kernel heartbeat.
488
501
489 Note that the heartbeat channel is paused by default. As long as you start
502 Note that the heartbeat channel is paused by default. As long as you start
490 this channel, the kernel manager will ensure that it is paused and un-paused
503 this channel, the kernel manager will ensure that it is paused and un-paused
491 as appropriate.
504 as appropriate.
492 """
505 """
493
506
494 time_to_dead = 3.0
507 time_to_dead = 3.0
495 socket = None
508 socket = None
496 poller = None
509 poller = None
497 _running = None
510 _running = None
498 _pause = None
511 _pause = None
499 _beating = None
512 _beating = None
500
513
501 def __init__(self, context, session, address):
514 def __init__(self, context, session, address):
502 super(HBChannel, self).__init__(context, session, address)
515 super(HBChannel, self).__init__(context, session, address)
503 self._running = False
516 self._running = False
504 self._pause =True
517 self._pause =True
505 self.poller = zmq.Poller()
518 self.poller = zmq.Poller()
506
519
507 def _create_socket(self):
520 def _create_socket(self):
508 if self.socket is not None:
521 if self.socket is not None:
509 # close previous socket, before opening a new one
522 # close previous socket, before opening a new one
510 self.poller.unregister(self.socket)
523 self.poller.unregister(self.socket)
511 self.socket.close()
524 self.socket.close()
512 self.socket = self.context.socket(zmq.REQ)
525 self.socket = self.context.socket(zmq.REQ)
513 self.socket.linger = 1000
526 self.socket.linger = 1000
514 self.socket.connect(self.address)
527 self.socket.connect(self.address)
515
528
516 self.poller.register(self.socket, zmq.POLLIN)
529 self.poller.register(self.socket, zmq.POLLIN)
517
530
518 def _poll(self, start_time):
531 def _poll(self, start_time):
519 """poll for heartbeat replies until we reach self.time_to_dead.
532 """poll for heartbeat replies until we reach self.time_to_dead.
520
533
521 Ignores interrupts, and returns the result of poll(), which
534 Ignores interrupts, and returns the result of poll(), which
522 will be an empty list if no messages arrived before the timeout,
535 will be an empty list if no messages arrived before the timeout,
523 or the event tuple if there is a message to receive.
536 or the event tuple if there is a message to receive.
524 """
537 """
525
538
526 until_dead = self.time_to_dead - (time.time() - start_time)
539 until_dead = self.time_to_dead - (time.time() - start_time)
527 # ensure poll at least once
540 # ensure poll at least once
528 until_dead = max(until_dead, 1e-3)
541 until_dead = max(until_dead, 1e-3)
529 events = []
542 events = []
530 while True:
543 while True:
531 try:
544 try:
532 events = self.poller.poll(1000 * until_dead)
545 events = self.poller.poll(1000 * until_dead)
533 except ZMQError as e:
546 except ZMQError as e:
534 if e.errno == errno.EINTR:
547 if e.errno == errno.EINTR:
535 # ignore interrupts during heartbeat
548 # ignore interrupts during heartbeat
536 # this may never actually happen
549 # this may never actually happen
537 until_dead = self.time_to_dead - (time.time() - start_time)
550 until_dead = self.time_to_dead - (time.time() - start_time)
538 until_dead = max(until_dead, 1e-3)
551 until_dead = max(until_dead, 1e-3)
539 pass
552 pass
540 else:
553 else:
541 raise
554 raise
542 except Exception:
555 except Exception:
543 if self._exiting:
556 if self._exiting:
544 break
557 break
545 else:
558 else:
546 raise
559 raise
547 else:
560 else:
548 break
561 break
549 return events
562 return events
550
563
551 def run(self):
564 def run(self):
552 """The thread's main activity. Call start() instead."""
565 """The thread's main activity. Call start() instead."""
553 self._create_socket()
566 self._create_socket()
554 self._running = True
567 self._running = True
555 self._beating = True
568 self._beating = True
556
569
557 while self._running:
570 while self._running:
558 if self._pause:
571 if self._pause:
559 # just sleep, and skip the rest of the loop
572 # just sleep, and skip the rest of the loop
560 time.sleep(self.time_to_dead)
573 time.sleep(self.time_to_dead)
561 continue
574 continue
562
575
563 since_last_heartbeat = 0.0
576 since_last_heartbeat = 0.0
564 # io.rprint('Ping from HB channel') # dbg
577 # io.rprint('Ping from HB channel') # dbg
565 # no need to catch EFSM here, because the previous event was
578 # no need to catch EFSM here, because the previous event was
566 # either a recv or connect, which cannot be followed by EFSM
579 # either a recv or connect, which cannot be followed by EFSM
567 self.socket.send(b'ping')
580 self.socket.send(b'ping')
568 request_time = time.time()
581 request_time = time.time()
569 ready = self._poll(request_time)
582 ready = self._poll(request_time)
570 if ready:
583 if ready:
571 self._beating = True
584 self._beating = True
572 # the poll above guarantees we have something to recv
585 # the poll above guarantees we have something to recv
573 self.socket.recv()
586 self.socket.recv()
574 # sleep the remainder of the cycle
587 # sleep the remainder of the cycle
575 remainder = self.time_to_dead - (time.time() - request_time)
588 remainder = self.time_to_dead - (time.time() - request_time)
576 if remainder > 0:
589 if remainder > 0:
577 time.sleep(remainder)
590 time.sleep(remainder)
578 continue
591 continue
579 else:
592 else:
580 # nothing was received within the time limit, signal heart failure
593 # nothing was received within the time limit, signal heart failure
581 self._beating = False
594 self._beating = False
582 since_last_heartbeat = time.time() - request_time
595 since_last_heartbeat = time.time() - request_time
583 self.call_handlers(since_last_heartbeat)
596 self.call_handlers(since_last_heartbeat)
584 # and close/reopen the socket, because the REQ/REP cycle has been broken
597 # and close/reopen the socket, because the REQ/REP cycle has been broken
585 self._create_socket()
598 self._create_socket()
586 continue
599 continue
587
600
588 def pause(self):
601 def pause(self):
589 """Pause the heartbeat."""
602 """Pause the heartbeat."""
590 self._pause = True
603 self._pause = True
591
604
592 def unpause(self):
605 def unpause(self):
593 """Unpause the heartbeat."""
606 """Unpause the heartbeat."""
594 self._pause = False
607 self._pause = False
595
608
596 def is_beating(self):
609 def is_beating(self):
597 """Is the heartbeat running and responsive (and not paused)."""
610 """Is the heartbeat running and responsive (and not paused)."""
598 if self.is_alive() and not self._pause and self._beating:
611 if self.is_alive() and not self._pause and self._beating:
599 return True
612 return True
600 else:
613 else:
601 return False
614 return False
602
615
603 def stop(self):
616 def stop(self):
604 """Stop the channel's event loop and join its thread."""
617 """Stop the channel's event loop and join its thread."""
605 self._running = False
618 self._running = False
606 super(HBChannel, self).stop()
619 super(HBChannel, self).stop()
607
620
608 def call_handlers(self, since_last_heartbeat):
621 def call_handlers(self, since_last_heartbeat):
609 """This method is called in the ioloop thread when a message arrives.
622 """This method is called in the ioloop thread when a message arrives.
610
623
611 Subclasses should override this method to handle incoming messages.
624 Subclasses should override this method to handle incoming messages.
612 It is important to remember that this method is called in the thread
625 It is important to remember that this method is called in the thread
613 so that some logic must be done to ensure that the application level
626 so that some logic must be done to ensure that the application level
614 handlers are called in the application thread.
627 handlers are called in the application thread.
615 """
628 """
616 raise NotImplementedError('call_handlers must be defined in a subclass.')
629 raise NotImplementedError('call_handlers must be defined in a subclass.')
617
630
618
631
619 #---------------------------------------------------------------------#-----------------------------------------------------------------------------
632 #---------------------------------------------------------------------#-----------------------------------------------------------------------------
620 # ABC Registration
633 # ABC Registration
621 #-----------------------------------------------------------------------------
634 #-----------------------------------------------------------------------------
622
635
623 ShellChannelABC.register(ShellChannel)
636 ShellChannelABC.register(ShellChannel)
624 IOPubChannelABC.register(IOPubChannel)
637 IOPubChannelABC.register(IOPubChannel)
625 HBChannelABC.register(HBChannel)
638 HBChannelABC.register(HBChannel)
626 StdInChannelABC.register(StdInChannel)
639 StdInChannelABC.register(StdInChannel)
@@ -1,215 +1,219 b''
1 """Defines a KernelManager that provides signals and slots."""
1 """Defines a KernelManager that provides signals and slots."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from IPython.external.qt import QtCore
6 from IPython.external.qt import QtCore
7
7
8 from IPython.utils.traitlets import HasTraits, Type
8 from IPython.utils.traitlets import HasTraits, Type
9 from .util import MetaQObjectHasTraits, SuperQObject
9 from .util import MetaQObjectHasTraits, SuperQObject
10
10
11
11
12 class ChannelQObject(SuperQObject):
12 class ChannelQObject(SuperQObject):
13
13
14 # Emitted when the channel is started.
14 # Emitted when the channel is started.
15 started = QtCore.Signal()
15 started = QtCore.Signal()
16
16
17 # Emitted when the channel is stopped.
17 # Emitted when the channel is stopped.
18 stopped = QtCore.Signal()
18 stopped = QtCore.Signal()
19
19
20 #---------------------------------------------------------------------------
20 #---------------------------------------------------------------------------
21 # Channel interface
21 # Channel interface
22 #---------------------------------------------------------------------------
22 #---------------------------------------------------------------------------
23
23
24 def start(self):
24 def start(self):
25 """ Reimplemented to emit signal.
25 """ Reimplemented to emit signal.
26 """
26 """
27 super(ChannelQObject, self).start()
27 super(ChannelQObject, self).start()
28 self.started.emit()
28 self.started.emit()
29
29
30 def stop(self):
30 def stop(self):
31 """ Reimplemented to emit signal.
31 """ Reimplemented to emit signal.
32 """
32 """
33 super(ChannelQObject, self).stop()
33 super(ChannelQObject, self).stop()
34 self.stopped.emit()
34 self.stopped.emit()
35
35
36 #---------------------------------------------------------------------------
36 #---------------------------------------------------------------------------
37 # InProcessChannel interface
37 # InProcessChannel interface
38 #---------------------------------------------------------------------------
38 #---------------------------------------------------------------------------
39
39
40 def call_handlers_later(self, *args, **kwds):
40 def call_handlers_later(self, *args, **kwds):
41 """ Call the message handlers later.
41 """ Call the message handlers later.
42 """
42 """
43 do_later = lambda: self.call_handlers(*args, **kwds)
43 do_later = lambda: self.call_handlers(*args, **kwds)
44 QtCore.QTimer.singleShot(0, do_later)
44 QtCore.QTimer.singleShot(0, do_later)
45
45
46 def process_events(self):
46 def process_events(self):
47 """ Process any pending GUI events.
47 """ Process any pending GUI events.
48 """
48 """
49 QtCore.QCoreApplication.instance().processEvents()
49 QtCore.QCoreApplication.instance().processEvents()
50
50
51
51
52 class QtShellChannelMixin(ChannelQObject):
52 class QtShellChannelMixin(ChannelQObject):
53
53
54 # Emitted when any message is received.
54 # Emitted when any message is received.
55 message_received = QtCore.Signal(object)
55 message_received = QtCore.Signal(object)
56
56
57 # Emitted when a reply has been received for the corresponding request type.
57 # Emitted when a reply has been received for the corresponding request type.
58 execute_reply = QtCore.Signal(object)
58 execute_reply = QtCore.Signal(object)
59 complete_reply = QtCore.Signal(object)
59 complete_reply = QtCore.Signal(object)
60 inspect_reply = QtCore.Signal(object)
60 inspect_reply = QtCore.Signal(object)
61 history_reply = QtCore.Signal(object)
61 history_reply = QtCore.Signal(object)
62 kernel_info_reply = QtCore.Signal(object)
62
63
63 #---------------------------------------------------------------------------
64 #---------------------------------------------------------------------------
64 # 'ShellChannel' interface
65 # 'ShellChannel' interface
65 #---------------------------------------------------------------------------
66 #---------------------------------------------------------------------------
66
67
67 def call_handlers(self, msg):
68 def call_handlers(self, msg):
68 """ Reimplemented to emit signals instead of making callbacks.
69 """ Reimplemented to emit signals instead of making callbacks.
69 """
70 """
70 # Emit the generic signal.
71 # Emit the generic signal.
71 self.message_received.emit(msg)
72 self.message_received.emit(msg)
72
73
73 # Emit signals for specialized message types.
74 # Emit signals for specialized message types.
74 msg_type = msg['header']['msg_type']
75 msg_type = msg['header']['msg_type']
76 if msg_type == 'kernel_info_reply':
77 self._handle_kernel_info_reply(msg)
78
75 signal = getattr(self, msg_type, None)
79 signal = getattr(self, msg_type, None)
76 if signal:
80 if signal:
77 signal.emit(msg)
81 signal.emit(msg)
78
82
79
83
80 class QtIOPubChannelMixin(ChannelQObject):
84 class QtIOPubChannelMixin(ChannelQObject):
81
85
82 # Emitted when any message is received.
86 # Emitted when any message is received.
83 message_received = QtCore.Signal(object)
87 message_received = QtCore.Signal(object)
84
88
85 # Emitted when a message of type 'stream' is received.
89 # Emitted when a message of type 'stream' is received.
86 stream_received = QtCore.Signal(object)
90 stream_received = QtCore.Signal(object)
87
91
88 # Emitted when a message of type 'execute_input' is received.
92 # Emitted when a message of type 'execute_input' is received.
89 execute_input_received = QtCore.Signal(object)
93 execute_input_received = QtCore.Signal(object)
90
94
91 # Emitted when a message of type 'execute_result' is received.
95 # Emitted when a message of type 'execute_result' is received.
92 execute_result_received = QtCore.Signal(object)
96 execute_result_received = QtCore.Signal(object)
93
97
94 # Emitted when a message of type 'error' is received.
98 # Emitted when a message of type 'error' is received.
95 error_received = QtCore.Signal(object)
99 error_received = QtCore.Signal(object)
96
100
97 # Emitted when a message of type 'display_data' is received
101 # Emitted when a message of type 'display_data' is received
98 display_data_received = QtCore.Signal(object)
102 display_data_received = QtCore.Signal(object)
99
103
100 # Emitted when a crash report message is received from the kernel's
104 # Emitted when a crash report message is received from the kernel's
101 # last-resort sys.excepthook.
105 # last-resort sys.excepthook.
102 crash_received = QtCore.Signal(object)
106 crash_received = QtCore.Signal(object)
103
107
104 # Emitted when a shutdown is noticed.
108 # Emitted when a shutdown is noticed.
105 shutdown_reply_received = QtCore.Signal(object)
109 shutdown_reply_received = QtCore.Signal(object)
106
110
107 #---------------------------------------------------------------------------
111 #---------------------------------------------------------------------------
108 # 'IOPubChannel' interface
112 # 'IOPubChannel' interface
109 #---------------------------------------------------------------------------
113 #---------------------------------------------------------------------------
110
114
111 def call_handlers(self, msg):
115 def call_handlers(self, msg):
112 """ Reimplemented to emit signals instead of making callbacks.
116 """ Reimplemented to emit signals instead of making callbacks.
113 """
117 """
114 # Emit the generic signal.
118 # Emit the generic signal.
115 self.message_received.emit(msg)
119 self.message_received.emit(msg)
116 # Emit signals for specialized message types.
120 # Emit signals for specialized message types.
117 msg_type = msg['header']['msg_type']
121 msg_type = msg['header']['msg_type']
118 signal = getattr(self, msg_type + '_received', None)
122 signal = getattr(self, msg_type + '_received', None)
119 if signal:
123 if signal:
120 signal.emit(msg)
124 signal.emit(msg)
121 elif msg_type in ('stdout', 'stderr'):
125 elif msg_type in ('stdout', 'stderr'):
122 self.stream_received.emit(msg)
126 self.stream_received.emit(msg)
123
127
124 def flush(self):
128 def flush(self):
125 """ Reimplemented to ensure that signals are dispatched immediately.
129 """ Reimplemented to ensure that signals are dispatched immediately.
126 """
130 """
127 super(QtIOPubChannelMixin, self).flush()
131 super(QtIOPubChannelMixin, self).flush()
128 QtCore.QCoreApplication.instance().processEvents()
132 QtCore.QCoreApplication.instance().processEvents()
129
133
130
134
131 class QtStdInChannelMixin(ChannelQObject):
135 class QtStdInChannelMixin(ChannelQObject):
132
136
133 # Emitted when any message is received.
137 # Emitted when any message is received.
134 message_received = QtCore.Signal(object)
138 message_received = QtCore.Signal(object)
135
139
136 # Emitted when an input request is received.
140 # Emitted when an input request is received.
137 input_requested = QtCore.Signal(object)
141 input_requested = QtCore.Signal(object)
138
142
139 #---------------------------------------------------------------------------
143 #---------------------------------------------------------------------------
140 # 'StdInChannel' interface
144 # 'StdInChannel' interface
141 #---------------------------------------------------------------------------
145 #---------------------------------------------------------------------------
142
146
143 def call_handlers(self, msg):
147 def call_handlers(self, msg):
144 """ Reimplemented to emit signals instead of making callbacks.
148 """ Reimplemented to emit signals instead of making callbacks.
145 """
149 """
146 # Emit the generic signal.
150 # Emit the generic signal.
147 self.message_received.emit(msg)
151 self.message_received.emit(msg)
148
152
149 # Emit signals for specialized message types.
153 # Emit signals for specialized message types.
150 msg_type = msg['header']['msg_type']
154 msg_type = msg['header']['msg_type']
151 if msg_type == 'input_request':
155 if msg_type == 'input_request':
152 self.input_requested.emit(msg)
156 self.input_requested.emit(msg)
153
157
154
158
155 class QtHBChannelMixin(ChannelQObject):
159 class QtHBChannelMixin(ChannelQObject):
156
160
157 # Emitted when the kernel has died.
161 # Emitted when the kernel has died.
158 kernel_died = QtCore.Signal(object)
162 kernel_died = QtCore.Signal(object)
159
163
160 #---------------------------------------------------------------------------
164 #---------------------------------------------------------------------------
161 # 'HBChannel' interface
165 # 'HBChannel' interface
162 #---------------------------------------------------------------------------
166 #---------------------------------------------------------------------------
163
167
164 def call_handlers(self, since_last_heartbeat):
168 def call_handlers(self, since_last_heartbeat):
165 """ Reimplemented to emit signals instead of making callbacks.
169 """ Reimplemented to emit signals instead of making callbacks.
166 """
170 """
167 # Emit the generic signal.
171 # Emit the generic signal.
168 self.kernel_died.emit(since_last_heartbeat)
172 self.kernel_died.emit(since_last_heartbeat)
169
173
170
174
171 class QtKernelRestarterMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
175 class QtKernelRestarterMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
172
176
173 _timer = None
177 _timer = None
174
178
175
179
176 class QtKernelManagerMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
180 class QtKernelManagerMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
177 """ A KernelClient that provides signals and slots.
181 """ A KernelClient that provides signals and slots.
178 """
182 """
179
183
180 kernel_restarted = QtCore.Signal()
184 kernel_restarted = QtCore.Signal()
181
185
182
186
183 class QtKernelClientMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
187 class QtKernelClientMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
184 """ A KernelClient that provides signals and slots.
188 """ A KernelClient that provides signals and slots.
185 """
189 """
186
190
187 # Emitted when the kernel client has started listening.
191 # Emitted when the kernel client has started listening.
188 started_channels = QtCore.Signal()
192 started_channels = QtCore.Signal()
189
193
190 # Emitted when the kernel client has stopped listening.
194 # Emitted when the kernel client has stopped listening.
191 stopped_channels = QtCore.Signal()
195 stopped_channels = QtCore.Signal()
192
196
193 # Use Qt-specific channel classes that emit signals.
197 # Use Qt-specific channel classes that emit signals.
194 iopub_channel_class = Type(QtIOPubChannelMixin)
198 iopub_channel_class = Type(QtIOPubChannelMixin)
195 shell_channel_class = Type(QtShellChannelMixin)
199 shell_channel_class = Type(QtShellChannelMixin)
196 stdin_channel_class = Type(QtStdInChannelMixin)
200 stdin_channel_class = Type(QtStdInChannelMixin)
197 hb_channel_class = Type(QtHBChannelMixin)
201 hb_channel_class = Type(QtHBChannelMixin)
198
202
199 #---------------------------------------------------------------------------
203 #---------------------------------------------------------------------------
200 # 'KernelClient' interface
204 # 'KernelClient' interface
201 #---------------------------------------------------------------------------
205 #---------------------------------------------------------------------------
202
206
203 #------ Channel management -------------------------------------------------
207 #------ Channel management -------------------------------------------------
204
208
205 def start_channels(self, *args, **kw):
209 def start_channels(self, *args, **kw):
206 """ Reimplemented to emit signal.
210 """ Reimplemented to emit signal.
207 """
211 """
208 super(QtKernelClientMixin, self).start_channels(*args, **kw)
212 super(QtKernelClientMixin, self).start_channels(*args, **kw)
209 self.started_channels.emit()
213 self.started_channels.emit()
210
214
211 def stop_channels(self):
215 def stop_channels(self):
212 """ Reimplemented to emit signal.
216 """ Reimplemented to emit signal.
213 """
217 """
214 super(QtKernelClientMixin, self).stop_channels()
218 super(QtKernelClientMixin, self).stop_channels()
215 self.stopped_channels.emit()
219 self.stopped_channels.emit()
General Comments 0
You need to be logged in to leave comments. Login now