##// END OF EJS Templates
Move ZMQ socket creation out of channels
Thomas Kluyver -
Show More
@@ -1,174 +1,171 b''
1 1 """Blocking channels
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 5
6 6 # Copyright (c) IPython Development Team.
7 7 # Distributed under the terms of the Modified BSD License.
8 8
9 9 try:
10 10 from queue import Queue, Empty # Py 3
11 11 except ImportError:
12 12 from Queue import Queue, Empty # Py 2
13 13
14 14 from IPython.kernel.channelsabc import ShellChannelABC, IOPubChannelABC, \
15 15 StdInChannelABC
16 16 from IPython.kernel.channels import HBChannel,\
17 17 make_iopub_socket, make_shell_socket, make_stdin_socket,\
18 18 InvalidPortNumber, major_protocol_version
19 19 from IPython.utils.py3compat import string_types, iteritems
20 20
21 21 # some utilities to validate message structure, these might get moved elsewhere
22 22 # if they prove to have more generic utility
23 23
24 24 def validate_string_list(lst):
25 25 """Validate that the input is a list of strings.
26 26
27 27 Raises ValueError if not."""
28 28 if not isinstance(lst, list):
29 29 raise ValueError('input %r must be a list' % lst)
30 30 for x in lst:
31 31 if not isinstance(x, string_types):
32 32 raise ValueError('element %r in list must be a string' % x)
33 33
34 34
35 35 def validate_string_dict(dct):
36 36 """Validate that the input is a dict with string keys and values.
37 37
38 38 Raises ValueError if not."""
39 39 for k,v in iteritems(dct):
40 40 if not isinstance(k, string_types):
41 41 raise ValueError('key %r in dict must be a string' % k)
42 42 if not isinstance(v, string_types):
43 43 raise ValueError('value %r in dict must be a string' % v)
44 44
45 45
46 46 class ZMQSocketChannel(object):
47 47 """The base class for the channels that use ZMQ sockets."""
48 48 context = None
49 49 session = None
50 50 socket = None
51 51 ioloop = None
52 52 stream = None
53 53 _address = None
54 54 _exiting = False
55 55 proxy_methods = []
56 56
57 def __init__(self, context, session, address):
57 def __init__(self, socket, session):
58 58 """Create a channel.
59 59
60 60 Parameters
61 61 ----------
62 62 context : :class:`zmq.Context`
63 63 The ZMQ context to use.
64 64 session : :class:`session.Session`
65 65 The session to use.
66 66 address : zmq url
67 67 Standard (ip, port) tuple that the kernel is listening on.
68 68 """
69 69 super(ZMQSocketChannel, self).__init__()
70 70 self.daemon = True
71 71
72 self.context = context
72 self.socket = socket
73 73 self.session = session
74 if isinstance(address, tuple):
75 if address[1] == 0:
76 message = 'The port number for a channel cannot be 0.'
77 raise InvalidPortNumber(message)
78 address = "tcp://%s:%i" % address
79 self._address = address
80 74
81 75 def _recv(self, **kwargs):
82 76 msg = self.socket.recv_multipart(**kwargs)
83 77 ident,smsg = self.session.feed_identities(msg)
84 78 return self.session.deserialize(smsg)
85 79
86 80 def get_msg(self, block=True, timeout=None):
87 81 """ Gets a message if there is one that is ready. """
88 82 if block:
89 83 if timeout is not None:
90 84 timeout *= 1000 # seconds to ms
91 85 ready = self.socket.poll(timeout)
92 86 else:
93 87 ready = self.socket.poll(timeout=0)
94 88
95 89 if ready:
96 90 return self._recv()
97 91 else:
98 92 raise Empty
99 93
100 94 def get_msgs(self):
101 95 """ Get all messages that are currently ready. """
102 96 msgs = []
103 97 while True:
104 98 try:
105 99 msgs.append(self.get_msg(block=False))
106 100 except Empty:
107 101 break
108 102 return msgs
109 103
110 104 def msg_ready(self):
111 105 """ Is there a message that has been received? """
112 106 return bool(self.socket.poll(timeout=0))
113 107
114 108 def close(self):
115 109 if self.socket is not None:
116 110 try:
117 111 self.socket.close(linger=0)
118 112 except Exception:
119 113 pass
120 114 self.socket = None
121 115 stop = close
122 116
123 117 def is_alive(self):
124 118 return (self.socket is not None)
125 119
126 120 @property
127 121 def address(self):
128 122 """Get the channel's address as a zmq url string.
129 123
130 124 These URLS have the form: 'tcp://127.0.0.1:5555'.
131 125 """
132 126 return self._address
133 127
134 128 def _queue_send(self, msg):
135 129 """Pass a message to the ZMQ socket to send
136 130 """
137 131 self.session.send(self.socket, msg)
138 132
133 def start(self):
134 pass
135
139 136
140 137 class BlockingShellChannel(ZMQSocketChannel):
141 138 """The shell channel for issuing request/replies to the kernel."""
142 139
143 140 def start(self):
144 141 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
145 142
146 143
147 144 class BlockingIOPubChannel(ZMQSocketChannel):
148 145 """The iopub channel which listens for messages that the kernel publishes.
149 146
150 147 This channel is where all output is published to frontends.
151 148 """
152 149 def start(self):
153 150 self.socket = make_iopub_socket(self.context, self.session.bsession, self.address)
154 151
155 152 class BlockingStdInChannel(ZMQSocketChannel):
156 153 """The stdin channel to handle raw_input requests that the kernel makes."""
157 154 def start(self):
158 155 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
159 156
160 157 ShellChannelABC.register(BlockingShellChannel)
161 158 IOPubChannelABC.register(BlockingIOPubChannel)
162 159 StdInChannelABC.register(BlockingStdInChannel)
163 160
164 161
165 162 class BlockingHBChannel(HBChannel):
166 163
167 164 # This kernel needs quicker monitoring, shorten to 1 sec.
168 165 # less than 0.5s is unreliable, and will get occasional
169 166 # false reports of missed beats.
170 167 time_to_dead = 1.
171 168
172 169 def call_handlers(self, since_last_heartbeat):
173 170 """ Pause beating on missed heartbeat. """
174 171 pass
@@ -1,41 +1,38 b''
1 1 """Implements a fully blocking kernel client.
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 5 # Copyright (c) IPython Development Team.
6 6 # Distributed under the terms of the Modified BSD License.
7 7
8 8 try:
9 9 from queue import Empty # Python 3
10 10 except ImportError:
11 11 from Queue import Empty # Python 2
12 12
13 13 from IPython.utils.traitlets import Type
14 14 from IPython.kernel.client import KernelClient
15 from .channels import (
16 BlockingIOPubChannel, BlockingHBChannel,
17 BlockingShellChannel, BlockingStdInChannel
18 )
15 from .channels import ZMQSocketChannel, BlockingHBChannel
19 16
20 17 class BlockingKernelClient(KernelClient):
21 18 def wait_for_ready(self):
22 19 # Wait for kernel info reply on shell channel
23 20 while True:
24 21 msg = self.shell_channel.get_msg(block=True)
25 22 if msg['msg_type'] == 'kernel_info_reply':
26 23 self._handle_kernel_info_reply(msg)
27 24 break
28 25
29 26 # Flush IOPub channel
30 27 while True:
31 28 try:
32 29 msg = self.iopub_channel.get_msg(block=True, timeout=0.2)
33 30 print(msg['msg_type'])
34 31 except Empty:
35 32 break
36 33
37 34 # The classes to use for the various channels
38 shell_channel_class = Type(BlockingShellChannel)
39 iopub_channel_class = Type(BlockingIOPubChannel)
40 stdin_channel_class = Type(BlockingStdInChannel)
35 shell_channel_class = Type(ZMQSocketChannel)
36 iopub_channel_class = Type(ZMQSocketChannel)
37 stdin_channel_class = Type(ZMQSocketChannel)
41 38 hb_channel_class = Type(BlockingHBChannel)
@@ -1,369 +1,375 b''
1 1 """Base class to manage the interaction with a running kernel"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import absolute_import
7 7 from IPython.kernel.channels import validate_string_dict, major_protocol_version
8 8 from IPython.utils.py3compat import string_types
9 9
10 10 import zmq
11 11
12 12 from IPython.utils.traitlets import (
13 13 Any, Instance, Type,
14 14 )
15 15
16 16 from .channelsabc import (
17 17 ShellChannelABC, IOPubChannelABC, HBChannelABC, StdInChannelABC
18 18 )
19 from .channels import (
20 make_shell_socket, make_stdin_socket, make_iopub_socket
21 )
19 22 from .clientabc import KernelClientABC
20 23 from .connect import ConnectionFileMixin
21 24
22 25
23 26 class KernelClient(ConnectionFileMixin):
24 27 """Communicates with a single kernel on any host via zmq channels.
25 28
26 29 There are four channels associated with each kernel:
27 30
28 31 * shell: for request/reply calls to the kernel.
29 32 * iopub: for the kernel to publish results to frontends.
30 33 * hb: for monitoring the kernel's heartbeat.
31 34 * stdin: for frontends to reply to raw_input calls in the kernel.
32 35
33 36 The methods of the channels are exposed as methods of the client itself
34 37 (KernelClient.execute, complete, history, etc.).
35 38 See the channels themselves for documentation of these methods.
36 39
37 40 """
38 41
39 42 # The PyZMQ Context to use for communication with the kernel.
40 43 context = Instance(zmq.Context)
41 44 def _context_default(self):
42 45 return zmq.Context.instance()
43 46
44 47 # The classes to use for the various channels
45 48 shell_channel_class = Type(ShellChannelABC)
46 49 iopub_channel_class = Type(IOPubChannelABC)
47 50 stdin_channel_class = Type(StdInChannelABC)
48 51 hb_channel_class = Type(HBChannelABC)
49 52
50 53 # Protected traits
51 54 _shell_channel = Any
52 55 _iopub_channel = Any
53 56 _stdin_channel = Any
54 57 _hb_channel = Any
55 58
56 59 # flag for whether execute requests should be allowed to call raw_input:
57 60 allow_stdin = True
58 61
59 62 #--------------------------------------------------------------------------
60 63 # Channel proxy methods
61 64 #--------------------------------------------------------------------------
62 65
63 66 def _get_msg(channel, *args, **kwargs):
64 67 return channel.get_msg(*args, **kwargs)
65 68
66 69 def get_shell_msg(self, *args, **kwargs):
67 70 """Get a message from the shell channel"""
68 71 return self.shell_channel.get_msg(*args, **kwargs)
69 72
70 73 def get_iopub_msg(self, *args, **kwargs):
71 74 """Get a message from the iopub channel"""
72 75 return self.iopub_channel.get_msg(*args, **kwargs)
73 76
74 77 def get_stdin_msg(self, *args, **kwargs):
75 78 """Get a message from the stdin channel"""
76 79 return self.stdin_channel.get_msg(*args, **kwargs)
77 80
78 81 #--------------------------------------------------------------------------
79 82 # Channel management methods
80 83 #--------------------------------------------------------------------------
81 84
82 85 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
83 86 """Starts the channels for this kernel.
84 87
85 88 This will create the channels if they do not exist and then start
86 89 them (their activity runs in a thread). If port numbers of 0 are
87 90 being used (random ports) then you must first call
88 91 :meth:`start_kernel`. If the channels have been stopped and you
89 92 call this, :class:`RuntimeError` will be raised.
90 93 """
91 94 if shell:
92 95 self.shell_channel.start()
93 96 self.kernel_info()
94 97 if iopub:
95 98 self.iopub_channel.start()
96 99 if stdin:
97 100 self.stdin_channel.start()
98 101 self.allow_stdin = True
99 102 else:
100 103 self.allow_stdin = False
101 104 if hb:
102 105 self.hb_channel.start()
103 106
104 107 def stop_channels(self):
105 108 """Stops all the running channels for this kernel.
106 109
107 110 This stops their event loops and joins their threads.
108 111 """
109 112 if self.shell_channel.is_alive():
110 113 self.shell_channel.stop()
111 114 if self.iopub_channel.is_alive():
112 115 self.iopub_channel.stop()
113 116 if self.stdin_channel.is_alive():
114 117 self.stdin_channel.stop()
115 118 if self.hb_channel.is_alive():
116 119 self.hb_channel.stop()
117 120
118 121 @property
119 122 def channels_running(self):
120 123 """Are any of the channels created and running?"""
121 124 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
122 125 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
123 126
124 127 @property
125 128 def shell_channel(self):
126 129 """Get the shell channel object for this kernel."""
127 130 if self._shell_channel is None:
128 131 url = self._make_url('shell')
129 132 self.log.debug("connecting shell channel to %s", url)
133 socket = make_shell_socket(self.context, self.session.bsession, url)
130 134 self._shell_channel = self.shell_channel_class(
131 self.context, self.session, url
135 socket, self.session
132 136 )
133 137 return self._shell_channel
134 138
135 139 @property
136 140 def iopub_channel(self):
137 141 """Get the iopub channel object for this kernel."""
138 142 if self._iopub_channel is None:
139 143 url = self._make_url('iopub')
140 144 self.log.debug("connecting iopub channel to %s", url)
145 socket = make_iopub_socket(self.context, self.session.bsession, url)
141 146 self._iopub_channel = self.iopub_channel_class(
142 self.context, self.session, url
147 socket, self.session
143 148 )
144 149 return self._iopub_channel
145 150
146 151 @property
147 152 def stdin_channel(self):
148 153 """Get the stdin channel object for this kernel."""
149 154 if self._stdin_channel is None:
150 155 url = self._make_url('stdin')
151 156 self.log.debug("connecting stdin channel to %s", url)
157 socket = make_stdin_socket(self.context, self.session.bsession, url)
152 158 self._stdin_channel = self.stdin_channel_class(
153 self.context, self.session, url
159 socket, self.session
154 160 )
155 161 return self._stdin_channel
156 162
157 163 @property
158 164 def hb_channel(self):
159 165 """Get the hb channel object for this kernel."""
160 166 if self._hb_channel is None:
161 167 url = self._make_url('hb')
162 168 self.log.debug("connecting heartbeat channel to %s", url)
163 169 self._hb_channel = self.hb_channel_class(
164 170 self.context, self.session, url
165 171 )
166 172 return self._hb_channel
167 173
168 174 def is_alive(self):
169 175 """Is the kernel process still running?"""
170 176 if self._hb_channel is not None:
171 177 # We didn't start the kernel with this KernelManager so we
172 178 # use the heartbeat.
173 179 return self._hb_channel.is_beating()
174 180 else:
175 181 # no heartbeat and not local, we can't tell if it's running,
176 182 # so naively return True
177 183 return True
178 184
179 185
180 186 # Methods to send specific messages on channels
181 187 def execute(self, code, silent=False, store_history=True,
182 188 user_expressions=None, allow_stdin=None):
183 189 """Execute code in the kernel.
184 190
185 191 Parameters
186 192 ----------
187 193 code : str
188 194 A string of Python code.
189 195
190 196 silent : bool, optional (default False)
191 197 If set, the kernel will execute the code as quietly possible, and
192 198 will force store_history to be False.
193 199
194 200 store_history : bool, optional (default True)
195 201 If set, the kernel will store command history. This is forced
196 202 to be False if silent is True.
197 203
198 204 user_expressions : dict, optional
199 205 A dict mapping names to expressions to be evaluated in the user's
200 206 dict. The expression values are returned as strings formatted using
201 207 :func:`repr`.
202 208
203 209 allow_stdin : bool, optional (default self.allow_stdin)
204 210 Flag for whether the kernel can send stdin requests to frontends.
205 211
206 212 Some frontends (e.g. the Notebook) do not support stdin requests.
207 213 If raw_input is called from code executed from such a frontend, a
208 214 StdinNotImplementedError will be raised.
209 215
210 216 Returns
211 217 -------
212 218 The msg_id of the message sent.
213 219 """
214 220 if user_expressions is None:
215 221 user_expressions = {}
216 222 if allow_stdin is None:
217 223 allow_stdin = self.allow_stdin
218 224
219 225
220 226 # Don't waste network traffic if inputs are invalid
221 227 if not isinstance(code, string_types):
222 228 raise ValueError('code %r must be a string' % code)
223 229 validate_string_dict(user_expressions)
224 230
225 231 # Create class for content/msg creation. Related to, but possibly
226 232 # not in Session.
227 233 content = dict(code=code, silent=silent, store_history=store_history,
228 234 user_expressions=user_expressions,
229 235 allow_stdin=allow_stdin,
230 236 )
231 237 msg = self.session.msg('execute_request', content)
232 238 self.shell_channel._queue_send(msg)
233 239 return msg['header']['msg_id']
234 240
235 241 def complete(self, code, cursor_pos=None):
236 242 """Tab complete text in the kernel's namespace.
237 243
238 244 Parameters
239 245 ----------
240 246 code : str
241 247 The context in which completion is requested.
242 248 Can be anything between a variable name and an entire cell.
243 249 cursor_pos : int, optional
244 250 The position of the cursor in the block of code where the completion was requested.
245 251 Default: ``len(code)``
246 252
247 253 Returns
248 254 -------
249 255 The msg_id of the message sent.
250 256 """
251 257 if cursor_pos is None:
252 258 cursor_pos = len(code)
253 259 content = dict(code=code, cursor_pos=cursor_pos)
254 260 msg = self.session.msg('complete_request', content)
255 261 self.shell_channel._queue_send(msg)
256 262 return msg['header']['msg_id']
257 263
258 264 def inspect(self, code, cursor_pos=None, detail_level=0):
259 265 """Get metadata information about an object in the kernel's namespace.
260 266
261 267 It is up to the kernel to determine the appropriate object to inspect.
262 268
263 269 Parameters
264 270 ----------
265 271 code : str
266 272 The context in which info is requested.
267 273 Can be anything between a variable name and an entire cell.
268 274 cursor_pos : int, optional
269 275 The position of the cursor in the block of code where the info was requested.
270 276 Default: ``len(code)``
271 277 detail_level : int, optional
272 278 The level of detail for the introspection (0-2)
273 279
274 280 Returns
275 281 -------
276 282 The msg_id of the message sent.
277 283 """
278 284 if cursor_pos is None:
279 285 cursor_pos = len(code)
280 286 content = dict(code=code, cursor_pos=cursor_pos,
281 287 detail_level=detail_level,
282 288 )
283 289 msg = self.session.msg('inspect_request', content)
284 290 self.shell_channel._queue_send(msg)
285 291 return msg['header']['msg_id']
286 292
287 293 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
288 294 """Get entries from the kernel's history list.
289 295
290 296 Parameters
291 297 ----------
292 298 raw : bool
293 299 If True, return the raw input.
294 300 output : bool
295 301 If True, then return the output as well.
296 302 hist_access_type : str
297 303 'range' (fill in session, start and stop params), 'tail' (fill in n)
298 304 or 'search' (fill in pattern param).
299 305
300 306 session : int
301 307 For a range request, the session from which to get lines. Session
302 308 numbers are positive integers; negative ones count back from the
303 309 current session.
304 310 start : int
305 311 The first line number of a history range.
306 312 stop : int
307 313 The final (excluded) line number of a history range.
308 314
309 315 n : int
310 316 The number of lines of history to get for a tail request.
311 317
312 318 pattern : str
313 319 The glob-syntax pattern for a search request.
314 320
315 321 Returns
316 322 -------
317 323 The msg_id of the message sent.
318 324 """
319 325 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
320 326 **kwargs)
321 327 msg = self.session.msg('history_request', content)
322 328 self.shell_channel._queue_send(msg)
323 329 return msg['header']['msg_id']
324 330
325 331 def kernel_info(self):
326 332 """Request kernel info."""
327 333 msg = self.session.msg('kernel_info_request')
328 334 self.shell_channel._queue_send(msg)
329 335 return msg['header']['msg_id']
330 336
331 337 def _handle_kernel_info_reply(self, msg):
332 338 """handle kernel info reply
333 339
334 340 sets protocol adaptation version
335 341 """
336 342 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
337 343 if adapt_version != major_protocol_version:
338 344 self.session.adapt_version = adapt_version
339 345
340 346 def shutdown(self, restart=False):
341 347 """Request an immediate kernel shutdown.
342 348
343 349 Upon receipt of the (empty) reply, client code can safely assume that
344 350 the kernel has shut down and it's safe to forcefully terminate it if
345 351 it's still alive.
346 352
347 353 The kernel will send the reply via a function registered with Python's
348 354 atexit module, ensuring it's truly done as the kernel is done with all
349 355 normal operation.
350 356 """
351 357 # Send quit message to kernel. Once we implement kernel-side setattr,
352 358 # this should probably be done that way, but for now this will do.
353 359 msg = self.session.msg('shutdown_request', {'restart':restart})
354 360 self.shell_channel._queue_send(msg)
355 361 return msg['header']['msg_id']
356 362
357 363 def is_complete(self, code):
358 364 msg = self.session.msg('is_complete_request', {'code': code})
359 365 self.shell_channel._queue_send(msg)
360 366 return msg['header']['msg_id']
361 367
362 368 def input(self, string):
363 369 """Send a string of raw input to the kernel."""
364 370 content = dict(value=string)
365 371 msg = self.session.msg('input_reply', content)
366 372 self.stdin_channel._queue_send(msg)
367 373
368 374
369 375 KernelClientABC.register(KernelClient)
@@ -1,381 +1,370 b''
1 1 """ Defines a KernelClient that provides signals and slots.
2 2 """
3 3 import atexit
4 4 import errno
5 5 from threading import Thread
6 6 import time
7 7
8 8 import zmq
9 9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 10 # during garbage collection of threads at exit:
11 11 from zmq import ZMQError
12 12 from zmq.eventloop import ioloop, zmqstream
13 13
14 14 from IPython.external.qt import QtCore
15 15
16 16 # Local imports
17 17 from IPython.utils.traitlets import Type
18 18 from IPython.kernel.channels import HBChannel,\
19 19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 20 from IPython.kernel import KernelClient
21 21
22 22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
23 23 from .util import SuperQObject
24 24
25 25 class QtHBChannel(QtHBChannelMixin, HBChannel):
26 26 pass
27 27
28 28 from IPython.core.release import kernel_protocol_version_info
29 29
30 30 from IPython.kernel.channelsabc import (
31 31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
32 32 )
33 33 from IPython.utils.py3compat import string_types, iteritems
34 34
35 35 major_protocol_version = kernel_protocol_version_info[0]
36 36
37 37 class InvalidPortNumber(Exception):
38 38 pass
39 39
40 40 # some utilities to validate message structure, these might get moved elsewhere
41 41 # if they prove to have more generic utility
42 42
43 43
44 44 def validate_string_dict(dct):
45 45 """Validate that the input is a dict with string keys and values.
46 46
47 47 Raises ValueError if not."""
48 48 for k,v in iteritems(dct):
49 49 if not isinstance(k, string_types):
50 50 raise ValueError('key %r in dict must be a string' % k)
51 51 if not isinstance(v, string_types):
52 52 raise ValueError('value %r in dict must be a string' % v)
53 53
54 54
55 55
56 56 class QtZMQSocketChannel(SuperQObject, Thread):
57 57 """The base class for the channels that use ZMQ sockets."""
58 context = None
59 58 session = None
60 59 socket = None
61 60 ioloop = None
62 61 stream = None
63 _address = None
64 62 _exiting = False
65 63 proxy_methods = []
66 64
67 65 # Emitted when the channel is started.
68 66 started = QtCore.Signal()
69 67
70 68 # Emitted when the channel is stopped.
71 69 stopped = QtCore.Signal()
72 70
73 71 message_received = QtCore.Signal(object)
74 72
75 73 #---------------------------------------------------------------------------
76 74 # InProcessChannel interface
77 75 #---------------------------------------------------------------------------
78 76
79 77 def call_handlers_later(self, *args, **kwds):
80 78 """ Call the message handlers later.
81 79 """
82 80 do_later = lambda: self.call_handlers(*args, **kwds)
83 81 QtCore.QTimer.singleShot(0, do_later)
84 82
85 83 def process_events(self):
86 84 """ Process any pending GUI events.
87 85 """
88 86 QtCore.QCoreApplication.instance().processEvents()
89 87
90 def __init__(self, context, session, address):
88 def __init__(self, socket, session):
91 89 """Create a channel.
92 90
93 91 Parameters
94 92 ----------
95 93 context : :class:`zmq.Context`
96 94 The ZMQ context to use.
97 95 session : :class:`session.Session`
98 96 The session to use.
99 97 address : zmq url
100 98 Standard (ip, port) tuple that the kernel is listening on.
101 99 """
102 100 super(QtZMQSocketChannel, self).__init__()
103 101 self.daemon = True
104 102
105 self.context = context
103 self.socket = socket
106 104 self.session = session
107 if isinstance(address, tuple):
108 if address[1] == 0:
109 message = 'The port number for a channel cannot be 0.'
110 raise InvalidPortNumber(message)
111 address = "tcp://%s:%i" % address
112 self._address = address
113 105 atexit.register(self._notice_exit)
114 106
115 107 def _notice_exit(self):
116 108 self._exiting = True
117 109
118 110 def _run_loop(self):
119 111 """Run my loop, ignoring EINTR events in the poller"""
120 112 while True:
121 113 try:
122 114 self.ioloop.start()
123 115 except ZMQError as e:
124 116 if e.errno == errno.EINTR:
125 117 continue
126 118 else:
127 119 raise
128 120 except Exception:
129 121 if self._exiting:
130 122 break
131 123 else:
132 124 raise
133 125 else:
134 126 break
135 127
136 128 def start(self):
137 129 """ Reimplemented to emit signal.
138 130 """
139 131 super(QtZMQSocketChannel, self).start()
140 132 self.started.emit()
141 133
142 134 def stop(self):
143 135 """Stop the channel's event loop and join its thread.
144 136
145 137 This calls :meth:`~threading.Thread.join` and returns when the thread
146 138 terminates. :class:`RuntimeError` will be raised if
147 139 :meth:`~threading.Thread.start` is called again.
148 140 """
149 141 if self.ioloop is not None:
150 142 self.ioloop.stop()
151 143 self.join()
152 144 self.close()
153 145 self.stopped.emit()
154 146
155 147 def close(self):
156 148 if self.ioloop is not None:
157 149 try:
158 150 self.ioloop.close(all_fds=True)
159 151 except Exception:
160 152 pass
161 153 if self.socket is not None:
162 154 try:
163 155 self.socket.close(linger=0)
164 156 except Exception:
165 157 pass
166 158 self.socket = None
167 159
168 160 @property
169 161 def address(self):
170 162 """Get the channel's address as a zmq url string.
171 163
172 164 These URLS have the form: 'tcp://127.0.0.1:5555'.
173 165 """
174 166 return self._address
175 167
176 168 def _queue_send(self, msg):
177 169 """Queue a message to be sent from the IOLoop's thread.
178 170
179 171 Parameters
180 172 ----------
181 173 msg : message to send
182 174
183 175 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
184 176 thread control of the action.
185 177 """
186 178 def thread_send():
187 179 self.session.send(self.stream, msg)
188 180 self.ioloop.add_callback(thread_send)
189 181
190 182 def _handle_recv(self, msg):
191 183 """Callback for stream.on_recv.
192 184
193 185 Unpacks message, and calls handlers with it.
194 186 """
195 187 ident,smsg = self.session.feed_identities(msg)
196 188 msg = self.session.deserialize(smsg)
197 189 self.call_handlers(msg)
198 190
199 191 def call_handlers(self, msg):
200 192 """This method is called in the ioloop thread when a message arrives.
201 193
202 194 Subclasses should override this method to handle incoming messages.
203 195 It is important to remember that this method is called in the thread
204 196 so that some logic must be done to ensure that the application level
205 197 handlers are called in the application thread.
206 198 """
207 199 # Emit the generic signal.
208 200 self.message_received.emit(msg)
209 201
210 202
211 203 class QtShellChannel(QtZMQSocketChannel):
212 204 """The shell channel for issuing request/replies to the kernel."""
213 205
214 206 # Emitted when a reply has been received for the corresponding request type.
215 207 execute_reply = QtCore.Signal(object)
216 208 complete_reply = QtCore.Signal(object)
217 209 inspect_reply = QtCore.Signal(object)
218 210 history_reply = QtCore.Signal(object)
219 211 kernel_info_reply = QtCore.Signal(object)
220 212
221 def __init__(self, context, session, address):
222 super(QtShellChannel, self).__init__(context, session, address)
213 def __init__(self, socket, session):
214 super(QtShellChannel, self).__init__(socket, session)
223 215 self.ioloop = ioloop.IOLoop()
224 216
225 217 def run(self):
226 218 """The thread's main activity. Call start() instead."""
227 self.socket = make_shell_socket(self.context, self.session.bsession, self.address)
228 219 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
229 220 self.stream.on_recv(self._handle_recv)
230 221 self._run_loop()
231 222
232 223 def call_handlers(self, msg):
233 224 super(QtShellChannel, self).call_handlers(msg)
234 225
235 226 # Catch kernel_info_reply for message spec adaptation
236 227 msg_type = msg['header']['msg_type']
237 228 if msg_type == 'kernel_info_reply':
238 229 self._handle_kernel_info_reply(msg)
239 230
240 231 # Emit specific signals
241 232 signal = getattr(self, msg_type, None)
242 233 if signal:
243 234 signal.emit(msg)
244 235
245 236 def _handle_kernel_info_reply(self, msg):
246 237 """handle kernel info reply
247 238
248 239 sets protocol adaptation version
249 240 """
250 241 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
251 242 if adapt_version != major_protocol_version:
252 243 self.session.adapt_version = adapt_version
253 244
254 245
255 246 class QtIOPubChannel(QtZMQSocketChannel):
256 247 """The iopub channel which listens for messages that the kernel publishes.
257 248
258 249 This channel is where all output is published to frontends.
259 250 """
260 251 # Emitted when a message of type 'stream' is received.
261 252 stream_received = QtCore.Signal(object)
262 253
263 254 # Emitted when a message of type 'execute_input' is received.
264 255 execute_input_received = QtCore.Signal(object)
265 256
266 257 # Emitted when a message of type 'execute_result' is received.
267 258 execute_result_received = QtCore.Signal(object)
268 259
269 260 # Emitted when a message of type 'error' is received.
270 261 error_received = QtCore.Signal(object)
271 262
272 263 # Emitted when a message of type 'display_data' is received
273 264 display_data_received = QtCore.Signal(object)
274 265
275 266 # Emitted when a crash report message is received from the kernel's
276 267 # last-resort sys.excepthook.
277 268 crash_received = QtCore.Signal(object)
278 269
279 270 # Emitted when a shutdown is noticed.
280 271 shutdown_reply_received = QtCore.Signal(object)
281 272
282 def __init__(self, context, session, address):
283 super(QtIOPubChannel, self).__init__(context, session, address)
273 def __init__(self, socket, session):
274 super(QtIOPubChannel, self).__init__(socket, session)
284 275 self.ioloop = ioloop.IOLoop()
285 276
286 277 def run(self):
287 278 """The thread's main activity. Call start() instead."""
288 self.socket = make_iopub_socket(self.context, self.session.bsession, self.address)
289 279 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
290 280 self.stream.on_recv(self._handle_recv)
291 281 self._run_loop()
292 282
293 283 def call_handlers(self, msg):
294 284 super(QtIOPubChannel, self).call_handlers(msg)
295 285
296 286 # Emit signals for specialized message types.
297 287 msg_type = msg['header']['msg_type']
298 288 signal = getattr(self, msg_type + '_received', None)
299 289 if signal:
300 290 signal.emit(msg)
301 291
302 292 def flush(self, timeout=1.0):
303 293 """Immediately processes all pending messages on the iopub channel.
304 294
305 295 Callers should use this method to ensure that :meth:`call_handlers`
306 296 has been called for all messages that have been received on the
307 297 0MQ SUB socket of this channel.
308 298
309 299 This method is thread safe.
310 300
311 301 Parameters
312 302 ----------
313 303 timeout : float, optional
314 304 The maximum amount of time to spend flushing, in seconds. The
315 305 default is one second.
316 306 """
317 307 # We do the IOLoop callback process twice to ensure that the IOLoop
318 308 # gets to perform at least one full poll.
319 309 stop_time = time.time() + timeout
320 310 for i in range(2):
321 311 self._flushed = False
322 312 self.ioloop.add_callback(self._flush)
323 313 while not self._flushed and time.time() < stop_time:
324 314 time.sleep(0.01)
325 315
326 316 def _flush(self):
327 317 """Callback for :method:`self.flush`."""
328 318 self.stream.flush()
329 319 self._flushed = True
330 320
331 321
332 322 class QtStdInChannel(QtZMQSocketChannel):
333 323 """The stdin channel to handle raw_input requests that the kernel makes."""
334 324
335 325 msg_queue = None
336 326 proxy_methods = ['input']
337 327
338 328 # Emitted when an input request is received.
339 329 input_requested = QtCore.Signal(object)
340 330
341 def __init__(self, context, session, address):
342 super(QtStdInChannel, self).__init__(context, session, address)
331 def __init__(self, socket, session):
332 super(QtStdInChannel, self).__init__(socket, session)
343 333 self.ioloop = ioloop.IOLoop()
344 334
345 335 def run(self):
346 336 """The thread's main activity. Call start() instead."""
347 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
348 337 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
349 338 self.stream.on_recv(self._handle_recv)
350 339 self._run_loop()
351 340
352 341 def call_handlers(self, msg):
353 342 super(QtStdInChannel, self).call_handlers(msg)
354 343
355 344 # Emit signals for specialized message types.
356 345 msg_type = msg['header']['msg_type']
357 346 if msg_type == 'input_request':
358 347 self.input_requested.emit(msg)
359 348
360 349
361 350 ShellChannelABC.register(QtShellChannel)
362 351 IOPubChannelABC.register(QtIOPubChannel)
363 352 StdInChannelABC.register(QtStdInChannel)
364 353
365 354
366 355 class QtKernelClient(QtKernelClientMixin, KernelClient):
367 356 """ A KernelClient that provides signals and slots.
368 357 """
369 358 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
370 359 if shell:
371 360 self.shell_channel.kernel_info_reply.connect(self._handle_kernel_info_reply)
372 361 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
373 362
374 363 def _handle_kernel_info_reply(self, msg):
375 364 super(QtKernelClient, self)._handle_kernel_info_reply(msg)
376 365 self.shell_channel.kernel_info_reply.disconnect(self._handle_kernel_info_reply)
377 366
378 367 iopub_channel_class = Type(QtIOPubChannel)
379 368 shell_channel_class = Type(QtShellChannel)
380 369 stdin_channel_class = Type(QtStdInChannel)
381 370 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now