##// END OF EJS Templates
Simplify HBChannel inheritance
Thomas Kluyver -
Show More
@@ -1,135 +1,118 b''
1 1 """Blocking channels
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 5
6 6 # Copyright (c) IPython Development Team.
7 7 # Distributed under the terms of the Modified BSD License.
8 8
9 9 try:
10 10 from queue import Queue, Empty # Py 3
11 11 except ImportError:
12 12 from Queue import Queue, Empty # Py 2
13 13
14 from IPython.kernel.channelsabc import ShellChannelABC, IOPubChannelABC, \
15 StdInChannelABC
16 from IPython.kernel.channels import HBChannel,\
17 make_iopub_socket, make_shell_socket, make_stdin_socket,\
18 InvalidPortNumber, major_protocol_version
19 14 from IPython.utils.py3compat import string_types, iteritems
20 15
21 16 # some utilities to validate message structure, these might get moved elsewhere
22 17 # if they prove to have more generic utility
23 18
24 19 def validate_string_list(lst):
25 20 """Validate that the input is a list of strings.
26 21
27 22 Raises ValueError if not."""
28 23 if not isinstance(lst, list):
29 24 raise ValueError('input %r must be a list' % lst)
30 25 for x in lst:
31 26 if not isinstance(x, string_types):
32 27 raise ValueError('element %r in list must be a string' % x)
33 28
34 29
35 30 def validate_string_dict(dct):
36 31 """Validate that the input is a dict with string keys and values.
37 32
38 33 Raises ValueError if not."""
39 34 for k,v in iteritems(dct):
40 35 if not isinstance(k, string_types):
41 36 raise ValueError('key %r in dict must be a string' % k)
42 37 if not isinstance(v, string_types):
43 38 raise ValueError('value %r in dict must be a string' % v)
44 39
45 40
46 41 class ZMQSocketChannel(object):
47 42 """A ZMQ socket in a simple blocking API"""
48 43 session = None
49 44 socket = None
50 45 stream = None
51 46 _exiting = False
52 47 proxy_methods = []
53 48
54 49 def __init__(self, socket, session, loop=None):
55 50 """Create a channel.
56 51
57 52 Parameters
58 53 ----------
59 54 socket : :class:`zmq.Socket`
60 55 The ZMQ socket to use.
61 56 session : :class:`session.Session`
62 57 The session to use.
63 58 loop
64 59 Unused here, for other implementations
65 60 """
66 61 super(ZMQSocketChannel, self).__init__()
67 62
68 63 self.socket = socket
69 64 self.session = session
70 65
71 66 def _recv(self, **kwargs):
72 67 msg = self.socket.recv_multipart(**kwargs)
73 68 ident,smsg = self.session.feed_identities(msg)
74 69 return self.session.deserialize(smsg)
75 70
76 71 def get_msg(self, block=True, timeout=None):
77 72 """ Gets a message if there is one that is ready. """
78 73 if block:
79 74 if timeout is not None:
80 75 timeout *= 1000 # seconds to ms
81 76 ready = self.socket.poll(timeout)
82 77 else:
83 78 ready = self.socket.poll(timeout=0)
84 79
85 80 if ready:
86 81 return self._recv()
87 82 else:
88 83 raise Empty
89 84
90 85 def get_msgs(self):
91 86 """ Get all messages that are currently ready. """
92 87 msgs = []
93 88 while True:
94 89 try:
95 90 msgs.append(self.get_msg(block=False))
96 91 except Empty:
97 92 break
98 93 return msgs
99 94
100 95 def msg_ready(self):
101 96 """ Is there a message that has been received? """
102 97 return bool(self.socket.poll(timeout=0))
103 98
104 99 def close(self):
105 100 if self.socket is not None:
106 101 try:
107 102 self.socket.close(linger=0)
108 103 except Exception:
109 104 pass
110 105 self.socket = None
111 106 stop = close
112 107
113 108 def is_alive(self):
114 109 return (self.socket is not None)
115 110
116 111 def _queue_send(self, msg):
117 112 """Pass a message to the ZMQ socket to send
118 113 """
119 114 self.session.send(self.socket, msg)
120 115
121 116 def start(self):
122 117 pass
123 118
124
125
126 class BlockingHBChannel(HBChannel):
127
128 # This kernel needs quicker monitoring, shorten to 1 sec.
129 # less than 0.5s is unreliable, and will get occasional
130 # false reports of missed beats.
131 time_to_dead = 1.
132
133 def call_handlers(self, since_last_heartbeat):
134 """ Pause beating on missed heartbeat. """
135 pass
@@ -1,38 +1,39 b''
1 1 """Implements a fully blocking kernel client.
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 5 # Copyright (c) IPython Development Team.
6 6 # Distributed under the terms of the Modified BSD License.
7 7
8 8 try:
9 9 from queue import Empty # Python 3
10 10 except ImportError:
11 11 from Queue import Empty # Python 2
12 12
13 13 from IPython.utils.traitlets import Type
14 from IPython.kernel.channels import HBChannel
14 15 from IPython.kernel.client import KernelClient
15 from .channels import ZMQSocketChannel, BlockingHBChannel
16 from .channels import ZMQSocketChannel
16 17
17 18 class BlockingKernelClient(KernelClient):
18 19 def wait_for_ready(self):
19 20 # Wait for kernel info reply on shell channel
20 21 while True:
21 22 msg = self.shell_channel.get_msg(block=True)
22 23 if msg['msg_type'] == 'kernel_info_reply':
23 24 self._handle_kernel_info_reply(msg)
24 25 break
25 26
26 27 # Flush IOPub channel
27 28 while True:
28 29 try:
29 30 msg = self.iopub_channel.get_msg(block=True, timeout=0.2)
30 31 print(msg['msg_type'])
31 32 except Empty:
32 33 break
33 34
34 35 # The classes to use for the various channels
35 36 shell_channel_class = Type(ZMQSocketChannel)
36 37 iopub_channel_class = Type(ZMQSocketChannel)
37 38 stdin_channel_class = Type(ZMQSocketChannel)
38 hb_channel_class = Type(BlockingHBChannel)
39 hb_channel_class = Type(HBChannel)
@@ -1,338 +1,338 b''
1 1 """Base classes to manage a Client's interaction with a running kernel"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import absolute_import
7 7
8 8 import atexit
9 9 import errno
10 10 from threading import Thread
11 11 import time
12 12
13 13 import zmq
14 14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
15 15 # during garbage collection of threads at exit:
16 16 from zmq import ZMQError
17 17 from zmq.eventloop import ioloop, zmqstream
18 18
19 19 from IPython.core.release import kernel_protocol_version_info
20 20
21 21 from .channelsabc import (
22 22 ShellChannelABC, IOPubChannelABC,
23 23 HBChannelABC, StdInChannelABC,
24 24 )
25 25 from IPython.utils.py3compat import string_types, iteritems
26 26
27 27 #-----------------------------------------------------------------------------
28 28 # Constants and exceptions
29 29 #-----------------------------------------------------------------------------
30 30
31 31 major_protocol_version = kernel_protocol_version_info[0]
32 32
33 33 class InvalidPortNumber(Exception):
34 34 pass
35 35
36 36 #-----------------------------------------------------------------------------
37 37 # Utility functions
38 38 #-----------------------------------------------------------------------------
39 39
40 40 # some utilities to validate message structure, these might get moved elsewhere
41 41 # if they prove to have more generic utility
42 42
43 43 def validate_string_list(lst):
44 44 """Validate that the input is a list of strings.
45 45
46 46 Raises ValueError if not."""
47 47 if not isinstance(lst, list):
48 48 raise ValueError('input %r must be a list' % lst)
49 49 for x in lst:
50 50 if not isinstance(x, string_types):
51 51 raise ValueError('element %r in list must be a string' % x)
52 52
53 53
54 54 def validate_string_dict(dct):
55 55 """Validate that the input is a dict with string keys and values.
56 56
57 57 Raises ValueError if not."""
58 58 for k,v in iteritems(dct):
59 59 if not isinstance(k, string_types):
60 60 raise ValueError('key %r in dict must be a string' % k)
61 61 if not isinstance(v, string_types):
62 62 raise ValueError('value %r in dict must be a string' % v)
63 63
64 64
65 65 #-----------------------------------------------------------------------------
66 66 # ZMQ Socket Channel classes
67 67 #-----------------------------------------------------------------------------
68 68
69 69 class ZMQSocketChannel(Thread):
70 70 """The base class for the channels that use ZMQ sockets."""
71 71 context = None
72 72 session = None
73 73 socket = None
74 74 ioloop = None
75 75 stream = None
76 76 _address = None
77 77 _exiting = False
78 78 proxy_methods = []
79 79
80 80 def __init__(self, context, session, address):
81 81 """Create a channel.
82 82
83 83 Parameters
84 84 ----------
85 85 context : :class:`zmq.Context`
86 86 The ZMQ context to use.
87 87 session : :class:`session.Session`
88 88 The session to use.
89 89 address : zmq url
90 90 Standard (ip, port) tuple that the kernel is listening on.
91 91 """
92 92 super(ZMQSocketChannel, self).__init__()
93 93 self.daemon = True
94 94
95 95 self.context = context
96 96 self.session = session
97 97 if isinstance(address, tuple):
98 98 if address[1] == 0:
99 99 message = 'The port number for a channel cannot be 0.'
100 100 raise InvalidPortNumber(message)
101 101 address = "tcp://%s:%i" % address
102 102 self._address = address
103 103 atexit.register(self._notice_exit)
104 104
105 105 def _notice_exit(self):
106 106 self._exiting = True
107 107
108 108 def _run_loop(self):
109 109 """Run my loop, ignoring EINTR events in the poller"""
110 110 while True:
111 111 try:
112 112 self.ioloop.start()
113 113 except ZMQError as e:
114 114 if e.errno == errno.EINTR:
115 115 continue
116 116 else:
117 117 raise
118 118 except Exception:
119 119 if self._exiting:
120 120 break
121 121 else:
122 122 raise
123 123 else:
124 124 break
125 125
126 126 def stop(self):
127 127 """Stop the channel's event loop and join its thread.
128 128
129 129 This calls :meth:`~threading.Thread.join` and returns when the thread
130 130 terminates. :class:`RuntimeError` will be raised if
131 131 :meth:`~threading.Thread.start` is called again.
132 132 """
133 133 if self.ioloop is not None:
134 134 self.ioloop.stop()
135 135 self.join()
136 136 self.close()
137 137
138 138 def close(self):
139 139 if self.ioloop is not None:
140 140 try:
141 141 self.ioloop.close(all_fds=True)
142 142 except Exception:
143 143 pass
144 144 if self.socket is not None:
145 145 try:
146 146 self.socket.close(linger=0)
147 147 except Exception:
148 148 pass
149 149 self.socket = None
150 150
151 151 @property
152 152 def address(self):
153 153 """Get the channel's address as a zmq url string.
154 154
155 155 These URLS have the form: 'tcp://127.0.0.1:5555'.
156 156 """
157 157 return self._address
158 158
159 159 def _queue_send(self, msg):
160 160 """Queue a message to be sent from the IOLoop's thread.
161 161
162 162 Parameters
163 163 ----------
164 164 msg : message to send
165 165
166 166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
167 167 thread control of the action.
168 168 """
169 169 def thread_send():
170 170 self.session.send(self.stream, msg)
171 171 self.ioloop.add_callback(thread_send)
172 172
173 173 def _handle_recv(self, msg):
174 174 """Callback for stream.on_recv.
175 175
176 176 Unpacks message, and calls handlers with it.
177 177 """
178 178 ident,smsg = self.session.feed_identities(msg)
179 179 msg = self.session.deserialize(smsg)
180 180 self.call_handlers(msg)
181 181
182 182
183 183 def make_shell_socket(context, identity, address):
184 184 socket = context.socket(zmq.DEALER)
185 185 socket.linger = 1000
186 186 socket.setsockopt(zmq.IDENTITY, identity)
187 187 socket.connect(address)
188 188 return socket
189 189
190 190 def make_iopub_socket(context, identity, address):
191 191 socket = context.socket(zmq.SUB)
192 192 socket.linger = 1000
193 193 socket.setsockopt(zmq.SUBSCRIBE,b'')
194 194 socket.setsockopt(zmq.IDENTITY, identity)
195 195 socket.connect(address)
196 196 return socket
197 197
198 198 def make_stdin_socket(context, identity, address):
199 199 socket = context.socket(zmq.DEALER)
200 200 socket.linger = 1000
201 201 socket.setsockopt(zmq.IDENTITY, identity)
202 202 socket.connect(address)
203 203 return socket
204 204
205 205 class HBChannel(ZMQSocketChannel):
206 206 """The heartbeat channel which monitors the kernel heartbeat.
207 207
208 208 Note that the heartbeat channel is paused by default. As long as you start
209 209 this channel, the kernel manager will ensure that it is paused and un-paused
210 210 as appropriate.
211 211 """
212 212
213 time_to_dead = 3.0
213 time_to_dead = 1.
214 214 socket = None
215 215 poller = None
216 216 _running = None
217 217 _pause = None
218 218 _beating = None
219 219
220 220 def __init__(self, context, session, address):
221 221 super(HBChannel, self).__init__(context, session, address)
222 222 self._running = False
223 223 self._pause =True
224 224 self.poller = zmq.Poller()
225 225
226 226 def _create_socket(self):
227 227 if self.socket is not None:
228 228 # close previous socket, before opening a new one
229 229 self.poller.unregister(self.socket)
230 230 self.socket.close()
231 231 self.socket = self.context.socket(zmq.REQ)
232 232 self.socket.linger = 1000
233 233 self.socket.connect(self.address)
234 234
235 235 self.poller.register(self.socket, zmq.POLLIN)
236 236
237 237 def _poll(self, start_time):
238 238 """poll for heartbeat replies until we reach self.time_to_dead.
239 239
240 240 Ignores interrupts, and returns the result of poll(), which
241 241 will be an empty list if no messages arrived before the timeout,
242 242 or the event tuple if there is a message to receive.
243 243 """
244 244
245 245 until_dead = self.time_to_dead - (time.time() - start_time)
246 246 # ensure poll at least once
247 247 until_dead = max(until_dead, 1e-3)
248 248 events = []
249 249 while True:
250 250 try:
251 251 events = self.poller.poll(1000 * until_dead)
252 252 except ZMQError as e:
253 253 if e.errno == errno.EINTR:
254 254 # ignore interrupts during heartbeat
255 255 # this may never actually happen
256 256 until_dead = self.time_to_dead - (time.time() - start_time)
257 257 until_dead = max(until_dead, 1e-3)
258 258 pass
259 259 else:
260 260 raise
261 261 except Exception:
262 262 if self._exiting:
263 263 break
264 264 else:
265 265 raise
266 266 else:
267 267 break
268 268 return events
269 269
270 270 def run(self):
271 271 """The thread's main activity. Call start() instead."""
272 272 self._create_socket()
273 273 self._running = True
274 274 self._beating = True
275 275
276 276 while self._running:
277 277 if self._pause:
278 278 # just sleep, and skip the rest of the loop
279 279 time.sleep(self.time_to_dead)
280 280 continue
281 281
282 282 since_last_heartbeat = 0.0
283 283 # io.rprint('Ping from HB channel') # dbg
284 284 # no need to catch EFSM here, because the previous event was
285 285 # either a recv or connect, which cannot be followed by EFSM
286 286 self.socket.send(b'ping')
287 287 request_time = time.time()
288 288 ready = self._poll(request_time)
289 289 if ready:
290 290 self._beating = True
291 291 # the poll above guarantees we have something to recv
292 292 self.socket.recv()
293 293 # sleep the remainder of the cycle
294 294 remainder = self.time_to_dead - (time.time() - request_time)
295 295 if remainder > 0:
296 296 time.sleep(remainder)
297 297 continue
298 298 else:
299 299 # nothing was received within the time limit, signal heart failure
300 300 self._beating = False
301 301 since_last_heartbeat = time.time() - request_time
302 302 self.call_handlers(since_last_heartbeat)
303 303 # and close/reopen the socket, because the REQ/REP cycle has been broken
304 304 self._create_socket()
305 305 continue
306 306
307 307 def pause(self):
308 308 """Pause the heartbeat."""
309 309 self._pause = True
310 310
311 311 def unpause(self):
312 312 """Unpause the heartbeat."""
313 313 self._pause = False
314 314
315 315 def is_beating(self):
316 316 """Is the heartbeat running and responsive (and not paused)."""
317 317 if self.is_alive() and not self._pause and self._beating:
318 318 return True
319 319 else:
320 320 return False
321 321
322 322 def stop(self):
323 323 """Stop the channel's event loop and join its thread."""
324 324 self._running = False
325 325 super(HBChannel, self).stop()
326 326
327 327 def call_handlers(self, since_last_heartbeat):
328 328 """This method is called in the ioloop thread when a message arrives.
329 329
330 330 Subclasses should override this method to handle incoming messages.
331 331 It is important to remember that this method is called in the thread
332 332 so that some logic must be done to ensure that the application level
333 333 handlers are called in the application thread.
334 334 """
335 raise NotImplementedError('call_handlers must be defined in a subclass.')
335 pass
336 336
337 337
338 338 HBChannelABC.register(HBChannel)
@@ -1,84 +1,95 b''
1 1 """A kernel client for in-process kernels."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from IPython.kernel.channelsabc import (
7 7 ShellChannelABC, IOPubChannelABC,
8 8 HBChannelABC, StdInChannelABC,
9 9 )
10 10
11 11 from .socket import DummySocket
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Channel classes
15 15 #-----------------------------------------------------------------------------
16 16
17 17 class InProcessChannel(object):
18 18 """Base class for in-process channels."""
19 19 proxy_methods = []
20 20
21 21 def __init__(self, client=None):
22 22 super(InProcessChannel, self).__init__()
23 23 self.client = client
24 24 self._is_alive = False
25 25
26 26 def is_alive(self):
27 27 return self._is_alive
28 28
29 29 def start(self):
30 30 self._is_alive = True
31 31
32 32 def stop(self):
33 33 self._is_alive = False
34 34
35 35 def call_handlers(self, msg):
36 36 """ This method is called in the main thread when a message arrives.
37 37
38 38 Subclasses should override this method to handle incoming messages.
39 39 """
40 40 raise NotImplementedError('call_handlers must be defined in a subclass.')
41 41
42 42 def flush(self, timeout=1.0):
43 43 pass
44 44
45 45
46 46 def call_handlers_later(self, *args, **kwds):
47 47 """ Call the message handlers later.
48 48
49 49 The default implementation just calls the handlers immediately, but this
50 50 method exists so that GUI toolkits can defer calling the handlers until
51 51 after the event loop has run, as expected by GUI frontends.
52 52 """
53 53 self.call_handlers(*args, **kwds)
54 54
55 55 def process_events(self):
56 56 """ Process any pending GUI events.
57 57
58 58 This method will be never be called from a frontend without an event
59 59 loop (e.g., a terminal frontend).
60 60 """
61 61 raise NotImplementedError
62 62
63 63
64 64
65 class InProcessHBChannel(InProcessChannel):
65 class InProcessHBChannel(object):
66 66 """See `IPython.kernel.channels.HBChannel` for docstrings."""
67 67
68 68 time_to_dead = 3.0
69 69
70 def __init__(self, *args, **kwds):
71 super(InProcessHBChannel, self).__init__(*args, **kwds)
70 def __init__(self, client=None):
71 super(InProcessHBChannel, self).__init__()
72 self.client = client
73 self._is_alive = False
72 74 self._pause = True
73 75
76 def is_alive(self):
77 return self._is_alive
78
79 def start(self):
80 self._is_alive = True
81
82 def stop(self):
83 self._is_alive = False
84
74 85 def pause(self):
75 86 self._pause = True
76 87
77 88 def unpause(self):
78 89 self._pause = False
79 90
80 91 def is_beating(self):
81 92 return not self._pause
82 93
83 94
84 95 HBChannelABC.register(InProcessHBChannel)
@@ -1,270 +1,280 b''
1 1 """ Defines a KernelClient that provides signals and slots.
2 2 """
3 3 import atexit
4 4 import errno
5 5 from threading import Thread
6 6 import time
7 7
8 8 import zmq
9 9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 10 # during garbage collection of threads at exit:
11 11 from zmq import ZMQError
12 12 from zmq.eventloop import ioloop, zmqstream
13 13
14 14 from IPython.external.qt import QtCore
15 15
16 16 # Local imports
17 17 from IPython.utils.traitlets import Type, Instance
18 18 from IPython.kernel.channels import HBChannel,\
19 19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 20 from IPython.kernel import KernelClient
21 21
22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
22 from .kernel_mixins import QtKernelClientMixin
23 23 from .util import SuperQObject
24 24
25 class QtHBChannel(QtHBChannelMixin, HBChannel):
26 pass
25 class QtHBChannel(SuperQObject, HBChannel):
26 # A longer timeout than the base class
27 time_to_dead = 3.0
28
29 # Emitted when the kernel has died.
30 kernel_died = QtCore.Signal(object)
31
32 def call_handlers(self, since_last_heartbeat):
33 """ Reimplemented to emit signals instead of making callbacks.
34 """
35 # Emit the generic signal.
36 self.kernel_died.emit(since_last_heartbeat)
27 37
28 38 from IPython.core.release import kernel_protocol_version_info
29 39
30 40 from IPython.kernel.channelsabc import (
31 41 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
32 42 )
33 43 from IPython.utils.py3compat import string_types, iteritems
34 44
35 45 major_protocol_version = kernel_protocol_version_info[0]
36 46
37 47 class InvalidPortNumber(Exception):
38 48 pass
39 49
40 50 # some utilities to validate message structure, these might get moved elsewhere
41 51 # if they prove to have more generic utility
42 52
43 53
44 54 def validate_string_dict(dct):
45 55 """Validate that the input is a dict with string keys and values.
46 56
47 57 Raises ValueError if not."""
48 58 for k,v in iteritems(dct):
49 59 if not isinstance(k, string_types):
50 60 raise ValueError('key %r in dict must be a string' % k)
51 61 if not isinstance(v, string_types):
52 62 raise ValueError('value %r in dict must be a string' % v)
53 63
54 64
55 65
56 66 class QtZMQSocketChannel(SuperQObject):
57 67 """A ZMQ socket emitting a Qt signal when a message is received."""
58 68 session = None
59 69 socket = None
60 70 ioloop = None
61 71 stream = None
62 72
63 73 message_received = QtCore.Signal(object)
64 74
65 75 #---------------------------------------------------------------------------
66 76 # InProcessChannel interface
67 77 #---------------------------------------------------------------------------
68 78
69 79 def call_handlers_later(self, *args, **kwds):
70 80 """ Call the message handlers later.
71 81 """
72 82 do_later = lambda: self.call_handlers(*args, **kwds)
73 83 QtCore.QTimer.singleShot(0, do_later)
74 84
75 85 def process_events(self):
76 86 """ Process any pending GUI events.
77 87 """
78 88 QtCore.QCoreApplication.instance().processEvents()
79 89
80 90 def __init__(self, socket, session, loop):
81 91 """Create a channel.
82 92
83 93 Parameters
84 94 ----------
85 95 socket : :class:`zmq.Socket`
86 96 The ZMQ socket to use.
87 97 session : :class:`session.Session`
88 98 The session to use.
89 99 loop
90 100 A pyzmq ioloop to connect the socket to using a ZMQStream
91 101 """
92 102 super(QtZMQSocketChannel, self).__init__()
93 103
94 104 self.socket = socket
95 105 self.session = session
96 106 self.ioloop = loop
97 107
98 108 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
99 109 self.stream.on_recv(self._handle_recv)
100 110
101 111 _is_alive = False
102 112 def is_alive(self):
103 113 return self._is_alive
104 114
105 115 def start(self):
106 116 self._is_alive = True
107 117
108 118 def stop(self):
109 119 self._is_alive = False
110 120
111 121 def close(self):
112 122 if self.socket is not None:
113 123 try:
114 124 self.socket.close(linger=0)
115 125 except Exception:
116 126 pass
117 127 self.socket = None
118 128
119 129 def _queue_send(self, msg):
120 130 """Queue a message to be sent from the IOLoop's thread.
121 131
122 132 Parameters
123 133 ----------
124 134 msg : message to send
125 135
126 136 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
127 137 thread control of the action.
128 138 """
129 139 def thread_send():
130 140 self.session.send(self.stream, msg)
131 141 self.ioloop.add_callback(thread_send)
132 142
133 143 def _handle_recv(self, msg):
134 144 """Callback for stream.on_recv.
135 145
136 146 Unpacks message, and calls handlers with it.
137 147 """
138 148 ident,smsg = self.session.feed_identities(msg)
139 149 msg = self.session.deserialize(smsg)
140 150 self.call_handlers(msg)
141 151
142 152 def call_handlers(self, msg):
143 153 """This method is called in the ioloop thread when a message arrives.
144 154
145 155 Subclasses should override this method to handle incoming messages.
146 156 It is important to remember that this method is called in the thread
147 157 so that some logic must be done to ensure that the application level
148 158 handlers are called in the application thread.
149 159 """
150 160 # Emit the generic signal.
151 161 self.message_received.emit(msg)
152 162
153 163 def flush(self, timeout=1.0):
154 164 """Immediately processes all pending messages on this channel.
155 165
156 166 This is only used for the IOPub channel.
157 167
158 168 Callers should use this method to ensure that :meth:`call_handlers`
159 169 has been called for all messages that have been received on the
160 170 0MQ SUB socket of this channel.
161 171
162 172 This method is thread safe.
163 173
164 174 Parameters
165 175 ----------
166 176 timeout : float, optional
167 177 The maximum amount of time to spend flushing, in seconds. The
168 178 default is one second.
169 179 """
170 180 # We do the IOLoop callback process twice to ensure that the IOLoop
171 181 # gets to perform at least one full poll.
172 182 stop_time = time.time() + timeout
173 183 for i in range(2):
174 184 self._flushed = False
175 185 self.ioloop.add_callback(self._flush)
176 186 while not self._flushed and time.time() < stop_time:
177 187 time.sleep(0.01)
178 188
179 189 def _flush(self):
180 190 """Callback for :method:`self.flush`."""
181 191 self.stream.flush()
182 192 self._flushed = True
183 193
184 194
185 195 class IOLoopThread(Thread):
186 196 """Run a pyzmq ioloop in a thread to send and receive messages
187 197 """
188 198 def __init__(self, loop):
189 199 super(IOLoopThread, self).__init__()
190 200 self.daemon = True
191 201 atexit.register(self._notice_exit)
192 202 self.ioloop = loop or ioloop.IOLoop()
193 203
194 204 def _notice_exit(self):
195 205 self._exiting = True
196 206
197 207 def run(self):
198 208 """Run my loop, ignoring EINTR events in the poller"""
199 209 while True:
200 210 try:
201 211 self.ioloop.start()
202 212 except ZMQError as e:
203 213 if e.errno == errno.EINTR:
204 214 continue
205 215 else:
206 216 raise
207 217 except Exception:
208 218 if self._exiting:
209 219 break
210 220 else:
211 221 raise
212 222 else:
213 223 break
214 224
215 225 def stop(self):
216 226 """Stop the channel's event loop and join its thread.
217 227
218 228 This calls :meth:`~threading.Thread.join` and returns when the thread
219 229 terminates. :class:`RuntimeError` will be raised if
220 230 :meth:`~threading.Thread.start` is called again.
221 231 """
222 232 if self.ioloop is not None:
223 233 self.ioloop.stop()
224 234 self.join()
225 235 self.close()
226 236
227 237 def close(self):
228 238 if self.ioloop is not None:
229 239 try:
230 240 self.ioloop.close(all_fds=True)
231 241 except Exception:
232 242 pass
233 243
234 244
235 245 class QtKernelClient(QtKernelClientMixin, KernelClient):
236 246 """ A KernelClient that provides signals and slots.
237 247 """
238 248
239 249 _ioloop = None
240 250 @property
241 251 def ioloop(self):
242 252 if self._ioloop is None:
243 253 self._ioloop = ioloop.IOLoop()
244 254 return self._ioloop
245 255
246 256 ioloop_thread = Instance(IOLoopThread)
247 257
248 258 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
249 259 if shell:
250 260 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
251 261
252 262 self.ioloop_thread = IOLoopThread(self.ioloop)
253 263 self.ioloop_thread.start()
254 264
255 265 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
256 266
257 267 def _check_kernel_info_reply(self, msg):
258 268 if msg['msg_type'] == 'kernel_info_reply':
259 269 self._handle_kernel_info_reply(msg)
260 270 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
261 271
262 272 def stop_channels(self):
263 273 super(QtKernelClient, self).stop_channels()
264 274 if self.ioloop_thread.is_alive():
265 275 self.ioloop_thread.stop()
266 276
267 277 iopub_channel_class = Type(QtZMQSocketChannel)
268 278 shell_channel_class = Type(QtZMQSocketChannel)
269 279 stdin_channel_class = Type(QtZMQSocketChannel)
270 280 hb_channel_class = Type(QtHBChannel)
@@ -1,33 +1,30 b''
1 1 """ Defines an in-process KernelManager with signals and slots.
2 2 """
3 3
4 4 # Local imports.
5 5 from IPython.external.qt import QtCore
6 6 from IPython.kernel.inprocess import (
7 7 InProcessHBChannel, InProcessKernelClient, InProcessKernelManager,
8 8 )
9 9 from IPython.kernel.inprocess.channels import InProcessChannel
10 10
11 11 from IPython.utils.traitlets import Type
12 12 from .kernel_mixins import ( ChannelQObject,
13 QtHBChannelMixin, QtKernelClientMixin,
13 QtKernelClientMixin,
14 14 QtKernelManagerMixin,
15 15 )
16 16
17 17 class QtInProcessChannel(ChannelQObject, InProcessChannel):
18 18 pass
19 19
20 class QtInProcessHBChannel(QtHBChannelMixin, InProcessHBChannel):
21 pass
22
23 20 class QtInProcessKernelClient(QtKernelClientMixin, InProcessKernelClient):
24 21 """ An in-process KernelManager with signals and slots.
25 22 """
26 23
27 24 iopub_channel_class = Type(QtInProcessChannel)
28 25 shell_channel_class = Type(QtInProcessChannel)
29 26 stdin_channel_class = Type(QtInProcessChannel)
30 hb_channel_class = Type(QtInProcessHBChannel)
27 hb_channel_class = Type(InProcessHBChannel)
31 28
32 29 class QtInProcessKernelManager(QtKernelManagerMixin, InProcessKernelManager):
33 30 client_class = __module__ + '.QtInProcessKernelClient'
@@ -1,105 +1,94 b''
1 1 """Defines a KernelManager that provides signals and slots."""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from IPython.external.qt import QtCore
7 7
8 8 from IPython.utils.traitlets import HasTraits, Type
9 9 from .util import MetaQObjectHasTraits, SuperQObject
10 10
11 11
12 12 class ChannelQObject(SuperQObject):
13 13
14 14 # Emitted when the channel is started.
15 15 started = QtCore.Signal()
16 16
17 17 # Emitted when the channel is stopped.
18 18 stopped = QtCore.Signal()
19 19
20 20 # Emitted when any message is received.
21 21 message_received = QtCore.Signal(object)
22 22
23 23 def start(self):
24 24 """ Reimplemented to emit signal.
25 25 """
26 26 super(ChannelQObject, self).start()
27 27 self.started.emit()
28 28
29 29 def stop(self):
30 30 """ Reimplemented to emit signal.
31 31 """
32 32 super(ChannelQObject, self).stop()
33 33 self.stopped.emit()
34 34
35 35 def call_handlers_later(self, *args, **kwds):
36 36 """ Call the message handlers later.
37 37 """
38 38 do_later = lambda: self.call_handlers(*args, **kwds)
39 39 QtCore.QTimer.singleShot(0, do_later)
40 40
41 41 def call_handlers(self, msg):
42 42 self.message_received.emit(msg)
43 43
44 44 def process_events(self):
45 45 """ Process any pending GUI events.
46 46 """
47 47 QtCore.QCoreApplication.instance().processEvents()
48 48
49 49 def flush(self):
50 50 """ Reimplemented to ensure that signals are dispatched immediately.
51 51 """
52 52 super(ChannelQObject, self).flush()
53 53 self.process_events()
54 54
55 55
56 class QtHBChannelMixin(ChannelQObject):
57
58 # Emitted when the kernel has died.
59 kernel_died = QtCore.Signal(object)
60
61 def call_handlers(self, since_last_heartbeat):
62 """ Reimplemented to emit signals instead of making callbacks.
63 """
64 self.kernel_died.emit(since_last_heartbeat)
65
66
67 56 class QtKernelRestarterMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
68 57
69 58 _timer = None
70 59
71 60
72 61 class QtKernelManagerMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
73 62 """ A KernelClient that provides signals and slots.
74 63 """
75 64
76 65 kernel_restarted = QtCore.Signal()
77 66
78 67
79 68 class QtKernelClientMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
80 69 """ A KernelClient that provides signals and slots.
81 70 """
82 71
83 72 # Emitted when the kernel client has started listening.
84 73 started_channels = QtCore.Signal()
85 74
86 75 # Emitted when the kernel client has stopped listening.
87 76 stopped_channels = QtCore.Signal()
88 77
89 78 #---------------------------------------------------------------------------
90 79 # 'KernelClient' interface
91 80 #---------------------------------------------------------------------------
92 81
93 82 #------ Channel management -------------------------------------------------
94 83
95 84 def start_channels(self, *args, **kw):
96 85 """ Reimplemented to emit signal.
97 86 """
98 87 super(QtKernelClientMixin, self).start_channels(*args, **kw)
99 88 self.started_channels.emit()
100 89
101 90 def stop_channels(self):
102 91 """ Reimplemented to emit signal.
103 92 """
104 93 super(QtKernelClientMixin, self).stop_channels()
105 94 self.stopped_channels.emit()
General Comments 0
You need to be logged in to leave comments. Login now