##// END OF EJS Templates
Share an IOLoop among Qt channels, rather than one each
Thomas Kluyver -
Show More
@@ -1,143 +1,143
1 1 """Blocking channels
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 5
6 6 # Copyright (c) IPython Development Team.
7 7 # Distributed under the terms of the Modified BSD License.
8 8
9 9 try:
10 10 from queue import Queue, Empty # Py 3
11 11 except ImportError:
12 12 from Queue import Queue, Empty # Py 2
13 13
14 14 from IPython.kernel.channelsabc import ShellChannelABC, IOPubChannelABC, \
15 15 StdInChannelABC
16 16 from IPython.kernel.channels import HBChannel,\
17 17 make_iopub_socket, make_shell_socket, make_stdin_socket,\
18 18 InvalidPortNumber, major_protocol_version
19 19 from IPython.utils.py3compat import string_types, iteritems
20 20
21 21 # some utilities to validate message structure, these might get moved elsewhere
22 22 # if they prove to have more generic utility
23 23
24 24 def validate_string_list(lst):
25 25 """Validate that the input is a list of strings.
26 26
27 27 Raises ValueError if not."""
28 28 if not isinstance(lst, list):
29 29 raise ValueError('input %r must be a list' % lst)
30 30 for x in lst:
31 31 if not isinstance(x, string_types):
32 32 raise ValueError('element %r in list must be a string' % x)
33 33
34 34
35 35 def validate_string_dict(dct):
36 36 """Validate that the input is a dict with string keys and values.
37 37
38 38 Raises ValueError if not."""
39 39 for k,v in iteritems(dct):
40 40 if not isinstance(k, string_types):
41 41 raise ValueError('key %r in dict must be a string' % k)
42 42 if not isinstance(v, string_types):
43 43 raise ValueError('value %r in dict must be a string' % v)
44 44
45 45
46 46 class ZMQSocketChannel(object):
47 47 """The base class for the channels that use ZMQ sockets."""
48 48 session = None
49 49 socket = None
50 50 stream = None
51 51 _exiting = False
52 52 proxy_methods = []
53 53
54 def __init__(self, socket, session):
54 def __init__(self, socket, session, loop=None):
55 55 """Create a channel.
56 56
57 57 Parameters
58 58 ----------
59 context : :class:`zmq.Context`
60 The ZMQ context to use.
59 socket : :class:`zmq.Socket`
60 The ZMQ socket to use.
61 61 session : :class:`session.Session`
62 62 The session to use.
63 address : zmq url
64 Standard (ip, port) tuple that the kernel is listening on.
63 loop
64 Unused here, for other implementations
65 65 """
66 66 super(ZMQSocketChannel, self).__init__()
67 67
68 68 self.socket = socket
69 69 self.session = session
70 70
71 71 def _recv(self, **kwargs):
72 72 msg = self.socket.recv_multipart(**kwargs)
73 73 ident,smsg = self.session.feed_identities(msg)
74 74 return self.session.deserialize(smsg)
75 75
76 76 def get_msg(self, block=True, timeout=None):
77 77 """ Gets a message if there is one that is ready. """
78 78 if block:
79 79 if timeout is not None:
80 80 timeout *= 1000 # seconds to ms
81 81 ready = self.socket.poll(timeout)
82 82 else:
83 83 ready = self.socket.poll(timeout=0)
84 84
85 85 if ready:
86 86 return self._recv()
87 87 else:
88 88 raise Empty
89 89
90 90 def get_msgs(self):
91 91 """ Get all messages that are currently ready. """
92 92 msgs = []
93 93 while True:
94 94 try:
95 95 msgs.append(self.get_msg(block=False))
96 96 except Empty:
97 97 break
98 98 return msgs
99 99
100 100 def msg_ready(self):
101 101 """ Is there a message that has been received? """
102 102 return bool(self.socket.poll(timeout=0))
103 103
104 104 def close(self):
105 105 if self.socket is not None:
106 106 try:
107 107 self.socket.close(linger=0)
108 108 except Exception:
109 109 pass
110 110 self.socket = None
111 111 stop = close
112 112
113 113 def is_alive(self):
114 114 return (self.socket is not None)
115 115
116 116 @property
117 117 def address(self):
118 118 """Get the channel's address as a zmq url string.
119 119
120 120 These URLS have the form: 'tcp://127.0.0.1:5555'.
121 121 """
122 122 return self._address
123 123
124 124 def _queue_send(self, msg):
125 125 """Pass a message to the ZMQ socket to send
126 126 """
127 127 self.session.send(self.socket, msg)
128 128
129 129 def start(self):
130 130 pass
131 131
132 132
133 133
134 134 class BlockingHBChannel(HBChannel):
135 135
136 136 # This kernel needs quicker monitoring, shorten to 1 sec.
137 137 # less than 0.5s is unreliable, and will get occasional
138 138 # false reports of missed beats.
139 139 time_to_dead = 1.
140 140
141 141 def call_handlers(self, since_last_heartbeat):
142 142 """ Pause beating on missed heartbeat. """
143 143 pass
@@ -1,375 +1,377
1 1 """Base class to manage the interaction with a running kernel"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import absolute_import
7 7 from IPython.kernel.channels import validate_string_dict, major_protocol_version
8 8 from IPython.utils.py3compat import string_types
9 9
10 10 import zmq
11 11
12 12 from IPython.utils.traitlets import (
13 13 Any, Instance, Type,
14 14 )
15 15
16 16 from .channelsabc import (
17 17 ShellChannelABC, IOPubChannelABC, HBChannelABC, StdInChannelABC
18 18 )
19 19 from .channels import (
20 20 make_shell_socket, make_stdin_socket, make_iopub_socket
21 21 )
22 22 from .clientabc import KernelClientABC
23 23 from .connect import ConnectionFileMixin
24 24
25 25
26 26 class KernelClient(ConnectionFileMixin):
27 27 """Communicates with a single kernel on any host via zmq channels.
28 28
29 29 There are four channels associated with each kernel:
30 30
31 31 * shell: for request/reply calls to the kernel.
32 32 * iopub: for the kernel to publish results to frontends.
33 33 * hb: for monitoring the kernel's heartbeat.
34 34 * stdin: for frontends to reply to raw_input calls in the kernel.
35 35
36 36 The methods of the channels are exposed as methods of the client itself
37 37 (KernelClient.execute, complete, history, etc.).
38 38 See the channels themselves for documentation of these methods.
39 39
40 40 """
41 41
42 42 # The PyZMQ Context to use for communication with the kernel.
43 43 context = Instance(zmq.Context)
44 44 def _context_default(self):
45 45 return zmq.Context.instance()
46 46
47 47 # The classes to use for the various channels
48 48 shell_channel_class = Type(ShellChannelABC)
49 49 iopub_channel_class = Type(IOPubChannelABC)
50 50 stdin_channel_class = Type(StdInChannelABC)
51 51 hb_channel_class = Type(HBChannelABC)
52 52
53 53 # Protected traits
54 54 _shell_channel = Any
55 55 _iopub_channel = Any
56 56 _stdin_channel = Any
57 57 _hb_channel = Any
58 58
59 59 # flag for whether execute requests should be allowed to call raw_input:
60 60 allow_stdin = True
61 61
62 62 #--------------------------------------------------------------------------
63 63 # Channel proxy methods
64 64 #--------------------------------------------------------------------------
65 65
66 66 def _get_msg(channel, *args, **kwargs):
67 67 return channel.get_msg(*args, **kwargs)
68 68
69 69 def get_shell_msg(self, *args, **kwargs):
70 70 """Get a message from the shell channel"""
71 71 return self.shell_channel.get_msg(*args, **kwargs)
72 72
73 73 def get_iopub_msg(self, *args, **kwargs):
74 74 """Get a message from the iopub channel"""
75 75 return self.iopub_channel.get_msg(*args, **kwargs)
76 76
77 77 def get_stdin_msg(self, *args, **kwargs):
78 78 """Get a message from the stdin channel"""
79 79 return self.stdin_channel.get_msg(*args, **kwargs)
80 80
81 81 #--------------------------------------------------------------------------
82 82 # Channel management methods
83 83 #--------------------------------------------------------------------------
84 84
85 85 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
86 86 """Starts the channels for this kernel.
87 87
88 88 This will create the channels if they do not exist and then start
89 89 them (their activity runs in a thread). If port numbers of 0 are
90 90 being used (random ports) then you must first call
91 91 :meth:`start_kernel`. If the channels have been stopped and you
92 92 call this, :class:`RuntimeError` will be raised.
93 93 """
94 94 if shell:
95 95 self.shell_channel.start()
96 96 self.kernel_info()
97 97 if iopub:
98 98 self.iopub_channel.start()
99 99 if stdin:
100 100 self.stdin_channel.start()
101 101 self.allow_stdin = True
102 102 else:
103 103 self.allow_stdin = False
104 104 if hb:
105 105 self.hb_channel.start()
106 106
107 107 def stop_channels(self):
108 108 """Stops all the running channels for this kernel.
109 109
110 110 This stops their event loops and joins their threads.
111 111 """
112 112 if self.shell_channel.is_alive():
113 113 self.shell_channel.stop()
114 114 if self.iopub_channel.is_alive():
115 115 self.iopub_channel.stop()
116 116 if self.stdin_channel.is_alive():
117 117 self.stdin_channel.stop()
118 118 if self.hb_channel.is_alive():
119 119 self.hb_channel.stop()
120 120
121 121 @property
122 122 def channels_running(self):
123 123 """Are any of the channels created and running?"""
124 124 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
125 125 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
126 126
127 ioloop = None # Overridden in subclasses that use pyzmq event loop
128
127 129 @property
128 130 def shell_channel(self):
129 131 """Get the shell channel object for this kernel."""
130 132 if self._shell_channel is None:
131 133 url = self._make_url('shell')
132 134 self.log.debug("connecting shell channel to %s", url)
133 135 socket = make_shell_socket(self.context, self.session.bsession, url)
134 136 self._shell_channel = self.shell_channel_class(
135 socket, self.session
137 socket, self.session, self.ioloop
136 138 )
137 139 return self._shell_channel
138 140
139 141 @property
140 142 def iopub_channel(self):
141 143 """Get the iopub channel object for this kernel."""
142 144 if self._iopub_channel is None:
143 145 url = self._make_url('iopub')
144 146 self.log.debug("connecting iopub channel to %s", url)
145 147 socket = make_iopub_socket(self.context, self.session.bsession, url)
146 148 self._iopub_channel = self.iopub_channel_class(
147 socket, self.session
149 socket, self.session, self.ioloop
148 150 )
149 151 return self._iopub_channel
150 152
151 153 @property
152 154 def stdin_channel(self):
153 155 """Get the stdin channel object for this kernel."""
154 156 if self._stdin_channel is None:
155 157 url = self._make_url('stdin')
156 158 self.log.debug("connecting stdin channel to %s", url)
157 159 socket = make_stdin_socket(self.context, self.session.bsession, url)
158 160 self._stdin_channel = self.stdin_channel_class(
159 socket, self.session
161 socket, self.session, self.ioloop
160 162 )
161 163 return self._stdin_channel
162 164
163 165 @property
164 166 def hb_channel(self):
165 167 """Get the hb channel object for this kernel."""
166 168 if self._hb_channel is None:
167 169 url = self._make_url('hb')
168 170 self.log.debug("connecting heartbeat channel to %s", url)
169 171 self._hb_channel = self.hb_channel_class(
170 172 self.context, self.session, url
171 173 )
172 174 return self._hb_channel
173 175
174 176 def is_alive(self):
175 177 """Is the kernel process still running?"""
176 178 if self._hb_channel is not None:
177 179 # We didn't start the kernel with this KernelManager so we
178 180 # use the heartbeat.
179 181 return self._hb_channel.is_beating()
180 182 else:
181 183 # no heartbeat and not local, we can't tell if it's running,
182 184 # so naively return True
183 185 return True
184 186
185 187
186 188 # Methods to send specific messages on channels
187 189 def execute(self, code, silent=False, store_history=True,
188 190 user_expressions=None, allow_stdin=None):
189 191 """Execute code in the kernel.
190 192
191 193 Parameters
192 194 ----------
193 195 code : str
194 196 A string of Python code.
195 197
196 198 silent : bool, optional (default False)
197 199 If set, the kernel will execute the code as quietly possible, and
198 200 will force store_history to be False.
199 201
200 202 store_history : bool, optional (default True)
201 203 If set, the kernel will store command history. This is forced
202 204 to be False if silent is True.
203 205
204 206 user_expressions : dict, optional
205 207 A dict mapping names to expressions to be evaluated in the user's
206 208 dict. The expression values are returned as strings formatted using
207 209 :func:`repr`.
208 210
209 211 allow_stdin : bool, optional (default self.allow_stdin)
210 212 Flag for whether the kernel can send stdin requests to frontends.
211 213
212 214 Some frontends (e.g. the Notebook) do not support stdin requests.
213 215 If raw_input is called from code executed from such a frontend, a
214 216 StdinNotImplementedError will be raised.
215 217
216 218 Returns
217 219 -------
218 220 The msg_id of the message sent.
219 221 """
220 222 if user_expressions is None:
221 223 user_expressions = {}
222 224 if allow_stdin is None:
223 225 allow_stdin = self.allow_stdin
224 226
225 227
226 228 # Don't waste network traffic if inputs are invalid
227 229 if not isinstance(code, string_types):
228 230 raise ValueError('code %r must be a string' % code)
229 231 validate_string_dict(user_expressions)
230 232
231 233 # Create class for content/msg creation. Related to, but possibly
232 234 # not in Session.
233 235 content = dict(code=code, silent=silent, store_history=store_history,
234 236 user_expressions=user_expressions,
235 237 allow_stdin=allow_stdin,
236 238 )
237 239 msg = self.session.msg('execute_request', content)
238 240 self.shell_channel._queue_send(msg)
239 241 return msg['header']['msg_id']
240 242
241 243 def complete(self, code, cursor_pos=None):
242 244 """Tab complete text in the kernel's namespace.
243 245
244 246 Parameters
245 247 ----------
246 248 code : str
247 249 The context in which completion is requested.
248 250 Can be anything between a variable name and an entire cell.
249 251 cursor_pos : int, optional
250 252 The position of the cursor in the block of code where the completion was requested.
251 253 Default: ``len(code)``
252 254
253 255 Returns
254 256 -------
255 257 The msg_id of the message sent.
256 258 """
257 259 if cursor_pos is None:
258 260 cursor_pos = len(code)
259 261 content = dict(code=code, cursor_pos=cursor_pos)
260 262 msg = self.session.msg('complete_request', content)
261 263 self.shell_channel._queue_send(msg)
262 264 return msg['header']['msg_id']
263 265
264 266 def inspect(self, code, cursor_pos=None, detail_level=0):
265 267 """Get metadata information about an object in the kernel's namespace.
266 268
267 269 It is up to the kernel to determine the appropriate object to inspect.
268 270
269 271 Parameters
270 272 ----------
271 273 code : str
272 274 The context in which info is requested.
273 275 Can be anything between a variable name and an entire cell.
274 276 cursor_pos : int, optional
275 277 The position of the cursor in the block of code where the info was requested.
276 278 Default: ``len(code)``
277 279 detail_level : int, optional
278 280 The level of detail for the introspection (0-2)
279 281
280 282 Returns
281 283 -------
282 284 The msg_id of the message sent.
283 285 """
284 286 if cursor_pos is None:
285 287 cursor_pos = len(code)
286 288 content = dict(code=code, cursor_pos=cursor_pos,
287 289 detail_level=detail_level,
288 290 )
289 291 msg = self.session.msg('inspect_request', content)
290 292 self.shell_channel._queue_send(msg)
291 293 return msg['header']['msg_id']
292 294
293 295 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
294 296 """Get entries from the kernel's history list.
295 297
296 298 Parameters
297 299 ----------
298 300 raw : bool
299 301 If True, return the raw input.
300 302 output : bool
301 303 If True, then return the output as well.
302 304 hist_access_type : str
303 305 'range' (fill in session, start and stop params), 'tail' (fill in n)
304 306 or 'search' (fill in pattern param).
305 307
306 308 session : int
307 309 For a range request, the session from which to get lines. Session
308 310 numbers are positive integers; negative ones count back from the
309 311 current session.
310 312 start : int
311 313 The first line number of a history range.
312 314 stop : int
313 315 The final (excluded) line number of a history range.
314 316
315 317 n : int
316 318 The number of lines of history to get for a tail request.
317 319
318 320 pattern : str
319 321 The glob-syntax pattern for a search request.
320 322
321 323 Returns
322 324 -------
323 325 The msg_id of the message sent.
324 326 """
325 327 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
326 328 **kwargs)
327 329 msg = self.session.msg('history_request', content)
328 330 self.shell_channel._queue_send(msg)
329 331 return msg['header']['msg_id']
330 332
331 333 def kernel_info(self):
332 334 """Request kernel info."""
333 335 msg = self.session.msg('kernel_info_request')
334 336 self.shell_channel._queue_send(msg)
335 337 return msg['header']['msg_id']
336 338
337 339 def _handle_kernel_info_reply(self, msg):
338 340 """handle kernel info reply
339 341
340 342 sets protocol adaptation version
341 343 """
342 344 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
343 345 if adapt_version != major_protocol_version:
344 346 self.session.adapt_version = adapt_version
345 347
346 348 def shutdown(self, restart=False):
347 349 """Request an immediate kernel shutdown.
348 350
349 351 Upon receipt of the (empty) reply, client code can safely assume that
350 352 the kernel has shut down and it's safe to forcefully terminate it if
351 353 it's still alive.
352 354
353 355 The kernel will send the reply via a function registered with Python's
354 356 atexit module, ensuring it's truly done as the kernel is done with all
355 357 normal operation.
356 358 """
357 359 # Send quit message to kernel. Once we implement kernel-side setattr,
358 360 # this should probably be done that way, but for now this will do.
359 361 msg = self.session.msg('shutdown_request', {'restart':restart})
360 362 self.shell_channel._queue_send(msg)
361 363 return msg['header']['msg_id']
362 364
363 365 def is_complete(self, code):
364 366 msg = self.session.msg('is_complete_request', {'code': code})
365 367 self.shell_channel._queue_send(msg)
366 368 return msg['header']['msg_id']
367 369
368 370 def input(self, string):
369 371 """Send a string of raw input to the kernel."""
370 372 content = dict(value=string)
371 373 msg = self.session.msg('input_reply', content)
372 374 self.stdin_channel._queue_send(msg)
373 375
374 376
375 377 KernelClientABC.register(KernelClient)
@@ -1,257 +1,278
1 1 """ Defines a KernelClient that provides signals and slots.
2 2 """
3 3 import atexit
4 4 import errno
5 5 from threading import Thread
6 6 import time
7 7
8 8 import zmq
9 9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 10 # during garbage collection of threads at exit:
11 11 from zmq import ZMQError
12 12 from zmq.eventloop import ioloop, zmqstream
13 13
14 14 from IPython.external.qt import QtCore
15 15
16 16 # Local imports
17 from IPython.utils.traitlets import Type
17 from IPython.utils.traitlets import Type, Instance
18 18 from IPython.kernel.channels import HBChannel,\
19 19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 20 from IPython.kernel import KernelClient
21 21
22 22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
23 23 from .util import SuperQObject
24 24
25 25 class QtHBChannel(QtHBChannelMixin, HBChannel):
26 26 pass
27 27
28 28 from IPython.core.release import kernel_protocol_version_info
29 29
30 30 from IPython.kernel.channelsabc import (
31 31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
32 32 )
33 33 from IPython.utils.py3compat import string_types, iteritems
34 34
35 35 major_protocol_version = kernel_protocol_version_info[0]
36 36
37 37 class InvalidPortNumber(Exception):
38 38 pass
39 39
40 40 # some utilities to validate message structure, these might get moved elsewhere
41 41 # if they prove to have more generic utility
42 42
43 43
44 44 def validate_string_dict(dct):
45 45 """Validate that the input is a dict with string keys and values.
46 46
47 47 Raises ValueError if not."""
48 48 for k,v in iteritems(dct):
49 49 if not isinstance(k, string_types):
50 50 raise ValueError('key %r in dict must be a string' % k)
51 51 if not isinstance(v, string_types):
52 52 raise ValueError('value %r in dict must be a string' % v)
53 53
54 54
55 55
56 class QtZMQSocketChannel(SuperQObject, Thread):
56 class QtZMQSocketChannel(SuperQObject):
57 57 """The base class for the channels that use ZMQ sockets."""
58 58 session = None
59 59 socket = None
60 60 ioloop = None
61 61 stream = None
62 _exiting = False
63 proxy_methods = []
64
65 # Emitted when the channel is started.
66 started = QtCore.Signal()
67
68 # Emitted when the channel is stopped.
69 stopped = QtCore.Signal()
70 62
71 63 message_received = QtCore.Signal(object)
72 64
73 65 #---------------------------------------------------------------------------
74 66 # InProcessChannel interface
75 67 #---------------------------------------------------------------------------
76 68
77 69 def call_handlers_later(self, *args, **kwds):
78 70 """ Call the message handlers later.
79 71 """
80 72 do_later = lambda: self.call_handlers(*args, **kwds)
81 73 QtCore.QTimer.singleShot(0, do_later)
82 74
83 75 def process_events(self):
84 76 """ Process any pending GUI events.
85 77 """
86 78 QtCore.QCoreApplication.instance().processEvents()
87 79
88 def __init__(self, socket, session):
80 def __init__(self, socket, session, loop):
89 81 """Create a channel.
90 82
91 83 Parameters
92 84 ----------
93 context : :class:`zmq.Context`
94 The ZMQ context to use.
85 socket : :class:`zmq.Socket`
86 The ZMQ socket to use.
95 87 session : :class:`session.Session`
96 88 The session to use.
97 address : zmq url
98 Standard (ip, port) tuple that the kernel is listening on.
89 loop
90 A pyzmq ioloop to connect the socket to using a ZMQStream
99 91 """
100 92 super(QtZMQSocketChannel, self).__init__()
101 self.daemon = True
102 93
103 94 self.socket = socket
104 95 self.session = session
105 atexit.register(self._notice_exit)
106 self.ioloop = ioloop.IOLoop()
96 self.ioloop = loop
107 97
108 def _notice_exit(self):
109 self._exiting = True
98 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
99 self.stream.on_recv(self._handle_recv)
110 100
111 def _run_loop(self):
112 """Run my loop, ignoring EINTR events in the poller"""
113 while True:
114 try:
115 self.ioloop.start()
116 except ZMQError as e:
117 if e.errno == errno.EINTR:
118 continue
119 else:
120 raise
121 except Exception:
122 if self._exiting:
123 break
124 else:
125 raise
126 else:
127 break
101 _is_alive = False
102 def is_alive(self):
103 return self._is_alive
128 104
129 105 def start(self):
130 """ Reimplemented to emit signal.
131 """
132 super(QtZMQSocketChannel, self).start()
133 self.started.emit()
134
135 def run(self):
136 """The thread's main activity. Call start() instead."""
137 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
138 self.stream.on_recv(self._handle_recv)
139 self._run_loop()
106 self._is_alive = True
140 107
141 108 def stop(self):
142 """Stop the channel's event loop and join its thread.
143
144 This calls :meth:`~threading.Thread.join` and returns when the thread
145 terminates. :class:`RuntimeError` will be raised if
146 :meth:`~threading.Thread.start` is called again.
147 """
148 if self.ioloop is not None:
149 self.ioloop.stop()
150 self.join()
151 self.close()
152 self.stopped.emit()
109 self._is_alive = False
153 110
154 111 def close(self):
155 if self.ioloop is not None:
156 try:
157 self.ioloop.close(all_fds=True)
158 except Exception:
159 pass
160 112 if self.socket is not None:
161 113 try:
162 114 self.socket.close(linger=0)
163 115 except Exception:
164 116 pass
165 117 self.socket = None
166 118
167 119 @property
168 120 def address(self):
169 121 """Get the channel's address as a zmq url string.
170 122
171 123 These URLS have the form: 'tcp://127.0.0.1:5555'.
172 124 """
173 125 return self._address
174 126
175 127 def _queue_send(self, msg):
176 128 """Queue a message to be sent from the IOLoop's thread.
177 129
178 130 Parameters
179 131 ----------
180 132 msg : message to send
181 133
182 134 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
183 135 thread control of the action.
184 136 """
185 137 def thread_send():
186 138 self.session.send(self.stream, msg)
187 139 self.ioloop.add_callback(thread_send)
188 140
189 141 def _handle_recv(self, msg):
190 142 """Callback for stream.on_recv.
191 143
192 144 Unpacks message, and calls handlers with it.
193 145 """
194 146 ident,smsg = self.session.feed_identities(msg)
195 147 msg = self.session.deserialize(smsg)
196 148 self.call_handlers(msg)
197 149
198 150 def call_handlers(self, msg):
199 151 """This method is called in the ioloop thread when a message arrives.
200 152
201 153 Subclasses should override this method to handle incoming messages.
202 154 It is important to remember that this method is called in the thread
203 155 so that some logic must be done to ensure that the application level
204 156 handlers are called in the application thread.
205 157 """
206 158 # Emit the generic signal.
207 159 self.message_received.emit(msg)
208 160
209 161 def flush(self, timeout=1.0):
210 162 """Immediately processes all pending messages on this channel.
211 163
212 164 This is only used for the IOPub channel.
213 165
214 166 Callers should use this method to ensure that :meth:`call_handlers`
215 167 has been called for all messages that have been received on the
216 168 0MQ SUB socket of this channel.
217 169
218 170 This method is thread safe.
219 171
220 172 Parameters
221 173 ----------
222 174 timeout : float, optional
223 175 The maximum amount of time to spend flushing, in seconds. The
224 176 default is one second.
225 177 """
226 178 # We do the IOLoop callback process twice to ensure that the IOLoop
227 179 # gets to perform at least one full poll.
228 180 stop_time = time.time() + timeout
229 181 for i in range(2):
230 182 self._flushed = False
231 183 self.ioloop.add_callback(self._flush)
232 184 while not self._flushed and time.time() < stop_time:
233 185 time.sleep(0.01)
234 186
235 187 def _flush(self):
236 188 """Callback for :method:`self.flush`."""
237 189 self.stream.flush()
238 190 self._flushed = True
239 191
240 192
193 class IOLoopThread(Thread):
194 """Run a pyzmq ioloop in a thread to send and receive messages
195 """
196 def __init__(self, loop):
197 super(IOLoopThread, self).__init__()
198 self.daemon = True
199 atexit.register(self._notice_exit)
200 self.ioloop = loop or ioloop.IOLoop()
201
202 def _notice_exit(self):
203 self._exiting = True
204
205 def run(self):
206 """Run my loop, ignoring EINTR events in the poller"""
207 while True:
208 try:
209 self.ioloop.start()
210 except ZMQError as e:
211 if e.errno == errno.EINTR:
212 continue
213 else:
214 raise
215 except Exception:
216 if self._exiting:
217 break
218 else:
219 raise
220 else:
221 break
222
223 def stop(self):
224 """Stop the channel's event loop and join its thread.
225
226 This calls :meth:`~threading.Thread.join` and returns when the thread
227 terminates. :class:`RuntimeError` will be raised if
228 :meth:`~threading.Thread.start` is called again.
229 """
230 if self.ioloop is not None:
231 self.ioloop.stop()
232 self.join()
233 self.close()
234
235 def close(self):
236 if self.ioloop is not None:
237 try:
238 self.ioloop.close(all_fds=True)
239 except Exception:
240 pass
241
242
241 243 class QtKernelClient(QtKernelClientMixin, KernelClient):
242 244 """ A KernelClient that provides signals and slots.
243 245 """
246
247 _ioloop = None
248 @property
249 def ioloop(self):
250 if self._ioloop is None:
251 self._ioloop = ioloop.IOLoop()
252 return self._ioloop
253
254 ioloop_thread = Instance(IOLoopThread)
255
244 256 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
245 257 if shell:
246 258 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
259
260 self.channel_listener_thread = IOLoopThread(self.ioloop)
261 self.channel_listener_thread.start()
262
247 263 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
248 264
249 265 def _check_kernel_info_reply(self, msg):
250 266 if msg['msg_type'] == 'kernel_info_reply':
251 267 self._handle_kernel_info_reply(msg)
252 268 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
253 269
270 def stop_channels(self):
271 super(QtKernelClient, self).stop_channels()
272 if self.ioloop_thread.is_alive():
273 self.ioloop_thread.stop()
274
254 275 iopub_channel_class = Type(QtZMQSocketChannel)
255 276 shell_channel_class = Type(QtZMQSocketChannel)
256 277 stdin_channel_class = Type(QtZMQSocketChannel)
257 278 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now