##// END OF EJS Templates
Simplify HBChannel inheritance
Thomas Kluyver -
Show More
@@ -1,135 +1,118 b''
1 """Blocking channels
1 """Blocking channels
2
2
3 Useful for test suites and blocking terminal interfaces.
3 Useful for test suites and blocking terminal interfaces.
4 """
4 """
5
5
6 # Copyright (c) IPython Development Team.
6 # Copyright (c) IPython Development Team.
7 # Distributed under the terms of the Modified BSD License.
7 # Distributed under the terms of the Modified BSD License.
8
8
9 try:
9 try:
10 from queue import Queue, Empty # Py 3
10 from queue import Queue, Empty # Py 3
11 except ImportError:
11 except ImportError:
12 from Queue import Queue, Empty # Py 2
12 from Queue import Queue, Empty # Py 2
13
13
14 from IPython.kernel.channelsabc import ShellChannelABC, IOPubChannelABC, \
15 StdInChannelABC
16 from IPython.kernel.channels import HBChannel,\
17 make_iopub_socket, make_shell_socket, make_stdin_socket,\
18 InvalidPortNumber, major_protocol_version
19 from IPython.utils.py3compat import string_types, iteritems
14 from IPython.utils.py3compat import string_types, iteritems
20
15
21 # some utilities to validate message structure, these might get moved elsewhere
16 # some utilities to validate message structure, these might get moved elsewhere
22 # if they prove to have more generic utility
17 # if they prove to have more generic utility
23
18
24 def validate_string_list(lst):
19 def validate_string_list(lst):
25 """Validate that the input is a list of strings.
20 """Validate that the input is a list of strings.
26
21
27 Raises ValueError if not."""
22 Raises ValueError if not."""
28 if not isinstance(lst, list):
23 if not isinstance(lst, list):
29 raise ValueError('input %r must be a list' % lst)
24 raise ValueError('input %r must be a list' % lst)
30 for x in lst:
25 for x in lst:
31 if not isinstance(x, string_types):
26 if not isinstance(x, string_types):
32 raise ValueError('element %r in list must be a string' % x)
27 raise ValueError('element %r in list must be a string' % x)
33
28
34
29
35 def validate_string_dict(dct):
30 def validate_string_dict(dct):
36 """Validate that the input is a dict with string keys and values.
31 """Validate that the input is a dict with string keys and values.
37
32
38 Raises ValueError if not."""
33 Raises ValueError if not."""
39 for k,v in iteritems(dct):
34 for k,v in iteritems(dct):
40 if not isinstance(k, string_types):
35 if not isinstance(k, string_types):
41 raise ValueError('key %r in dict must be a string' % k)
36 raise ValueError('key %r in dict must be a string' % k)
42 if not isinstance(v, string_types):
37 if not isinstance(v, string_types):
43 raise ValueError('value %r in dict must be a string' % v)
38 raise ValueError('value %r in dict must be a string' % v)
44
39
45
40
46 class ZMQSocketChannel(object):
41 class ZMQSocketChannel(object):
47 """A ZMQ socket in a simple blocking API"""
42 """A ZMQ socket in a simple blocking API"""
48 session = None
43 session = None
49 socket = None
44 socket = None
50 stream = None
45 stream = None
51 _exiting = False
46 _exiting = False
52 proxy_methods = []
47 proxy_methods = []
53
48
54 def __init__(self, socket, session, loop=None):
49 def __init__(self, socket, session, loop=None):
55 """Create a channel.
50 """Create a channel.
56
51
57 Parameters
52 Parameters
58 ----------
53 ----------
59 socket : :class:`zmq.Socket`
54 socket : :class:`zmq.Socket`
60 The ZMQ socket to use.
55 The ZMQ socket to use.
61 session : :class:`session.Session`
56 session : :class:`session.Session`
62 The session to use.
57 The session to use.
63 loop
58 loop
64 Unused here, for other implementations
59 Unused here, for other implementations
65 """
60 """
66 super(ZMQSocketChannel, self).__init__()
61 super(ZMQSocketChannel, self).__init__()
67
62
68 self.socket = socket
63 self.socket = socket
69 self.session = session
64 self.session = session
70
65
71 def _recv(self, **kwargs):
66 def _recv(self, **kwargs):
72 msg = self.socket.recv_multipart(**kwargs)
67 msg = self.socket.recv_multipart(**kwargs)
73 ident,smsg = self.session.feed_identities(msg)
68 ident,smsg = self.session.feed_identities(msg)
74 return self.session.deserialize(smsg)
69 return self.session.deserialize(smsg)
75
70
76 def get_msg(self, block=True, timeout=None):
71 def get_msg(self, block=True, timeout=None):
77 """ Gets a message if there is one that is ready. """
72 """ Gets a message if there is one that is ready. """
78 if block:
73 if block:
79 if timeout is not None:
74 if timeout is not None:
80 timeout *= 1000 # seconds to ms
75 timeout *= 1000 # seconds to ms
81 ready = self.socket.poll(timeout)
76 ready = self.socket.poll(timeout)
82 else:
77 else:
83 ready = self.socket.poll(timeout=0)
78 ready = self.socket.poll(timeout=0)
84
79
85 if ready:
80 if ready:
86 return self._recv()
81 return self._recv()
87 else:
82 else:
88 raise Empty
83 raise Empty
89
84
90 def get_msgs(self):
85 def get_msgs(self):
91 """ Get all messages that are currently ready. """
86 """ Get all messages that are currently ready. """
92 msgs = []
87 msgs = []
93 while True:
88 while True:
94 try:
89 try:
95 msgs.append(self.get_msg(block=False))
90 msgs.append(self.get_msg(block=False))
96 except Empty:
91 except Empty:
97 break
92 break
98 return msgs
93 return msgs
99
94
100 def msg_ready(self):
95 def msg_ready(self):
101 """ Is there a message that has been received? """
96 """ Is there a message that has been received? """
102 return bool(self.socket.poll(timeout=0))
97 return bool(self.socket.poll(timeout=0))
103
98
104 def close(self):
99 def close(self):
105 if self.socket is not None:
100 if self.socket is not None:
106 try:
101 try:
107 self.socket.close(linger=0)
102 self.socket.close(linger=0)
108 except Exception:
103 except Exception:
109 pass
104 pass
110 self.socket = None
105 self.socket = None
111 stop = close
106 stop = close
112
107
113 def is_alive(self):
108 def is_alive(self):
114 return (self.socket is not None)
109 return (self.socket is not None)
115
110
116 def _queue_send(self, msg):
111 def _queue_send(self, msg):
117 """Pass a message to the ZMQ socket to send
112 """Pass a message to the ZMQ socket to send
118 """
113 """
119 self.session.send(self.socket, msg)
114 self.session.send(self.socket, msg)
120
115
121 def start(self):
116 def start(self):
122 pass
117 pass
123
118
124
125
126 class BlockingHBChannel(HBChannel):
127
128 # This kernel needs quicker monitoring, shorten to 1 sec.
129 # less than 0.5s is unreliable, and will get occasional
130 # false reports of missed beats.
131 time_to_dead = 1.
132
133 def call_handlers(self, since_last_heartbeat):
134 """ Pause beating on missed heartbeat. """
135 pass
@@ -1,38 +1,39 b''
1 """Implements a fully blocking kernel client.
1 """Implements a fully blocking kernel client.
2
2
3 Useful for test suites and blocking terminal interfaces.
3 Useful for test suites and blocking terminal interfaces.
4 """
4 """
5 # Copyright (c) IPython Development Team.
5 # Copyright (c) IPython Development Team.
6 # Distributed under the terms of the Modified BSD License.
6 # Distributed under the terms of the Modified BSD License.
7
7
8 try:
8 try:
9 from queue import Empty # Python 3
9 from queue import Empty # Python 3
10 except ImportError:
10 except ImportError:
11 from Queue import Empty # Python 2
11 from Queue import Empty # Python 2
12
12
13 from IPython.utils.traitlets import Type
13 from IPython.utils.traitlets import Type
14 from IPython.kernel.channels import HBChannel
14 from IPython.kernel.client import KernelClient
15 from IPython.kernel.client import KernelClient
15 from .channels import ZMQSocketChannel, BlockingHBChannel
16 from .channels import ZMQSocketChannel
16
17
17 class BlockingKernelClient(KernelClient):
18 class BlockingKernelClient(KernelClient):
18 def wait_for_ready(self):
19 def wait_for_ready(self):
19 # Wait for kernel info reply on shell channel
20 # Wait for kernel info reply on shell channel
20 while True:
21 while True:
21 msg = self.shell_channel.get_msg(block=True)
22 msg = self.shell_channel.get_msg(block=True)
22 if msg['msg_type'] == 'kernel_info_reply':
23 if msg['msg_type'] == 'kernel_info_reply':
23 self._handle_kernel_info_reply(msg)
24 self._handle_kernel_info_reply(msg)
24 break
25 break
25
26
26 # Flush IOPub channel
27 # Flush IOPub channel
27 while True:
28 while True:
28 try:
29 try:
29 msg = self.iopub_channel.get_msg(block=True, timeout=0.2)
30 msg = self.iopub_channel.get_msg(block=True, timeout=0.2)
30 print(msg['msg_type'])
31 print(msg['msg_type'])
31 except Empty:
32 except Empty:
32 break
33 break
33
34
34 # The classes to use for the various channels
35 # The classes to use for the various channels
35 shell_channel_class = Type(ZMQSocketChannel)
36 shell_channel_class = Type(ZMQSocketChannel)
36 iopub_channel_class = Type(ZMQSocketChannel)
37 iopub_channel_class = Type(ZMQSocketChannel)
37 stdin_channel_class = Type(ZMQSocketChannel)
38 stdin_channel_class = Type(ZMQSocketChannel)
38 hb_channel_class = Type(BlockingHBChannel)
39 hb_channel_class = Type(HBChannel)
@@ -1,338 +1,338 b''
1 """Base classes to manage a Client's interaction with a running kernel"""
1 """Base classes to manage a Client's interaction with a running kernel"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import absolute_import
6 from __future__ import absolute_import
7
7
8 import atexit
8 import atexit
9 import errno
9 import errno
10 from threading import Thread
10 from threading import Thread
11 import time
11 import time
12
12
13 import zmq
13 import zmq
14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
15 # during garbage collection of threads at exit:
15 # during garbage collection of threads at exit:
16 from zmq import ZMQError
16 from zmq import ZMQError
17 from zmq.eventloop import ioloop, zmqstream
17 from zmq.eventloop import ioloop, zmqstream
18
18
19 from IPython.core.release import kernel_protocol_version_info
19 from IPython.core.release import kernel_protocol_version_info
20
20
21 from .channelsabc import (
21 from .channelsabc import (
22 ShellChannelABC, IOPubChannelABC,
22 ShellChannelABC, IOPubChannelABC,
23 HBChannelABC, StdInChannelABC,
23 HBChannelABC, StdInChannelABC,
24 )
24 )
25 from IPython.utils.py3compat import string_types, iteritems
25 from IPython.utils.py3compat import string_types, iteritems
26
26
27 #-----------------------------------------------------------------------------
27 #-----------------------------------------------------------------------------
28 # Constants and exceptions
28 # Constants and exceptions
29 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
30
30
31 major_protocol_version = kernel_protocol_version_info[0]
31 major_protocol_version = kernel_protocol_version_info[0]
32
32
33 class InvalidPortNumber(Exception):
33 class InvalidPortNumber(Exception):
34 pass
34 pass
35
35
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37 # Utility functions
37 # Utility functions
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39
39
40 # some utilities to validate message structure, these might get moved elsewhere
40 # some utilities to validate message structure, these might get moved elsewhere
41 # if they prove to have more generic utility
41 # if they prove to have more generic utility
42
42
43 def validate_string_list(lst):
43 def validate_string_list(lst):
44 """Validate that the input is a list of strings.
44 """Validate that the input is a list of strings.
45
45
46 Raises ValueError if not."""
46 Raises ValueError if not."""
47 if not isinstance(lst, list):
47 if not isinstance(lst, list):
48 raise ValueError('input %r must be a list' % lst)
48 raise ValueError('input %r must be a list' % lst)
49 for x in lst:
49 for x in lst:
50 if not isinstance(x, string_types):
50 if not isinstance(x, string_types):
51 raise ValueError('element %r in list must be a string' % x)
51 raise ValueError('element %r in list must be a string' % x)
52
52
53
53
54 def validate_string_dict(dct):
54 def validate_string_dict(dct):
55 """Validate that the input is a dict with string keys and values.
55 """Validate that the input is a dict with string keys and values.
56
56
57 Raises ValueError if not."""
57 Raises ValueError if not."""
58 for k,v in iteritems(dct):
58 for k,v in iteritems(dct):
59 if not isinstance(k, string_types):
59 if not isinstance(k, string_types):
60 raise ValueError('key %r in dict must be a string' % k)
60 raise ValueError('key %r in dict must be a string' % k)
61 if not isinstance(v, string_types):
61 if not isinstance(v, string_types):
62 raise ValueError('value %r in dict must be a string' % v)
62 raise ValueError('value %r in dict must be a string' % v)
63
63
64
64
65 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
66 # ZMQ Socket Channel classes
66 # ZMQ Socket Channel classes
67 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
68
68
69 class ZMQSocketChannel(Thread):
69 class ZMQSocketChannel(Thread):
70 """The base class for the channels that use ZMQ sockets."""
70 """The base class for the channels that use ZMQ sockets."""
71 context = None
71 context = None
72 session = None
72 session = None
73 socket = None
73 socket = None
74 ioloop = None
74 ioloop = None
75 stream = None
75 stream = None
76 _address = None
76 _address = None
77 _exiting = False
77 _exiting = False
78 proxy_methods = []
78 proxy_methods = []
79
79
80 def __init__(self, context, session, address):
80 def __init__(self, context, session, address):
81 """Create a channel.
81 """Create a channel.
82
82
83 Parameters
83 Parameters
84 ----------
84 ----------
85 context : :class:`zmq.Context`
85 context : :class:`zmq.Context`
86 The ZMQ context to use.
86 The ZMQ context to use.
87 session : :class:`session.Session`
87 session : :class:`session.Session`
88 The session to use.
88 The session to use.
89 address : zmq url
89 address : zmq url
90 Standard (ip, port) tuple that the kernel is listening on.
90 Standard (ip, port) tuple that the kernel is listening on.
91 """
91 """
92 super(ZMQSocketChannel, self).__init__()
92 super(ZMQSocketChannel, self).__init__()
93 self.daemon = True
93 self.daemon = True
94
94
95 self.context = context
95 self.context = context
96 self.session = session
96 self.session = session
97 if isinstance(address, tuple):
97 if isinstance(address, tuple):
98 if address[1] == 0:
98 if address[1] == 0:
99 message = 'The port number for a channel cannot be 0.'
99 message = 'The port number for a channel cannot be 0.'
100 raise InvalidPortNumber(message)
100 raise InvalidPortNumber(message)
101 address = "tcp://%s:%i" % address
101 address = "tcp://%s:%i" % address
102 self._address = address
102 self._address = address
103 atexit.register(self._notice_exit)
103 atexit.register(self._notice_exit)
104
104
105 def _notice_exit(self):
105 def _notice_exit(self):
106 self._exiting = True
106 self._exiting = True
107
107
108 def _run_loop(self):
108 def _run_loop(self):
109 """Run my loop, ignoring EINTR events in the poller"""
109 """Run my loop, ignoring EINTR events in the poller"""
110 while True:
110 while True:
111 try:
111 try:
112 self.ioloop.start()
112 self.ioloop.start()
113 except ZMQError as e:
113 except ZMQError as e:
114 if e.errno == errno.EINTR:
114 if e.errno == errno.EINTR:
115 continue
115 continue
116 else:
116 else:
117 raise
117 raise
118 except Exception:
118 except Exception:
119 if self._exiting:
119 if self._exiting:
120 break
120 break
121 else:
121 else:
122 raise
122 raise
123 else:
123 else:
124 break
124 break
125
125
126 def stop(self):
126 def stop(self):
127 """Stop the channel's event loop and join its thread.
127 """Stop the channel's event loop and join its thread.
128
128
129 This calls :meth:`~threading.Thread.join` and returns when the thread
129 This calls :meth:`~threading.Thread.join` and returns when the thread
130 terminates. :class:`RuntimeError` will be raised if
130 terminates. :class:`RuntimeError` will be raised if
131 :meth:`~threading.Thread.start` is called again.
131 :meth:`~threading.Thread.start` is called again.
132 """
132 """
133 if self.ioloop is not None:
133 if self.ioloop is not None:
134 self.ioloop.stop()
134 self.ioloop.stop()
135 self.join()
135 self.join()
136 self.close()
136 self.close()
137
137
138 def close(self):
138 def close(self):
139 if self.ioloop is not None:
139 if self.ioloop is not None:
140 try:
140 try:
141 self.ioloop.close(all_fds=True)
141 self.ioloop.close(all_fds=True)
142 except Exception:
142 except Exception:
143 pass
143 pass
144 if self.socket is not None:
144 if self.socket is not None:
145 try:
145 try:
146 self.socket.close(linger=0)
146 self.socket.close(linger=0)
147 except Exception:
147 except Exception:
148 pass
148 pass
149 self.socket = None
149 self.socket = None
150
150
151 @property
151 @property
152 def address(self):
152 def address(self):
153 """Get the channel's address as a zmq url string.
153 """Get the channel's address as a zmq url string.
154
154
155 These URLS have the form: 'tcp://127.0.0.1:5555'.
155 These URLS have the form: 'tcp://127.0.0.1:5555'.
156 """
156 """
157 return self._address
157 return self._address
158
158
159 def _queue_send(self, msg):
159 def _queue_send(self, msg):
160 """Queue a message to be sent from the IOLoop's thread.
160 """Queue a message to be sent from the IOLoop's thread.
161
161
162 Parameters
162 Parameters
163 ----------
163 ----------
164 msg : message to send
164 msg : message to send
165
165
166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
167 thread control of the action.
167 thread control of the action.
168 """
168 """
169 def thread_send():
169 def thread_send():
170 self.session.send(self.stream, msg)
170 self.session.send(self.stream, msg)
171 self.ioloop.add_callback(thread_send)
171 self.ioloop.add_callback(thread_send)
172
172
173 def _handle_recv(self, msg):
173 def _handle_recv(self, msg):
174 """Callback for stream.on_recv.
174 """Callback for stream.on_recv.
175
175
176 Unpacks message, and calls handlers with it.
176 Unpacks message, and calls handlers with it.
177 """
177 """
178 ident,smsg = self.session.feed_identities(msg)
178 ident,smsg = self.session.feed_identities(msg)
179 msg = self.session.deserialize(smsg)
179 msg = self.session.deserialize(smsg)
180 self.call_handlers(msg)
180 self.call_handlers(msg)
181
181
182
182
183 def make_shell_socket(context, identity, address):
183 def make_shell_socket(context, identity, address):
184 socket = context.socket(zmq.DEALER)
184 socket = context.socket(zmq.DEALER)
185 socket.linger = 1000
185 socket.linger = 1000
186 socket.setsockopt(zmq.IDENTITY, identity)
186 socket.setsockopt(zmq.IDENTITY, identity)
187 socket.connect(address)
187 socket.connect(address)
188 return socket
188 return socket
189
189
190 def make_iopub_socket(context, identity, address):
190 def make_iopub_socket(context, identity, address):
191 socket = context.socket(zmq.SUB)
191 socket = context.socket(zmq.SUB)
192 socket.linger = 1000
192 socket.linger = 1000
193 socket.setsockopt(zmq.SUBSCRIBE,b'')
193 socket.setsockopt(zmq.SUBSCRIBE,b'')
194 socket.setsockopt(zmq.IDENTITY, identity)
194 socket.setsockopt(zmq.IDENTITY, identity)
195 socket.connect(address)
195 socket.connect(address)
196 return socket
196 return socket
197
197
198 def make_stdin_socket(context, identity, address):
198 def make_stdin_socket(context, identity, address):
199 socket = context.socket(zmq.DEALER)
199 socket = context.socket(zmq.DEALER)
200 socket.linger = 1000
200 socket.linger = 1000
201 socket.setsockopt(zmq.IDENTITY, identity)
201 socket.setsockopt(zmq.IDENTITY, identity)
202 socket.connect(address)
202 socket.connect(address)
203 return socket
203 return socket
204
204
205 class HBChannel(ZMQSocketChannel):
205 class HBChannel(ZMQSocketChannel):
206 """The heartbeat channel which monitors the kernel heartbeat.
206 """The heartbeat channel which monitors the kernel heartbeat.
207
207
208 Note that the heartbeat channel is paused by default. As long as you start
208 Note that the heartbeat channel is paused by default. As long as you start
209 this channel, the kernel manager will ensure that it is paused and un-paused
209 this channel, the kernel manager will ensure that it is paused and un-paused
210 as appropriate.
210 as appropriate.
211 """
211 """
212
212
213 time_to_dead = 3.0
213 time_to_dead = 1.
214 socket = None
214 socket = None
215 poller = None
215 poller = None
216 _running = None
216 _running = None
217 _pause = None
217 _pause = None
218 _beating = None
218 _beating = None
219
219
220 def __init__(self, context, session, address):
220 def __init__(self, context, session, address):
221 super(HBChannel, self).__init__(context, session, address)
221 super(HBChannel, self).__init__(context, session, address)
222 self._running = False
222 self._running = False
223 self._pause =True
223 self._pause =True
224 self.poller = zmq.Poller()
224 self.poller = zmq.Poller()
225
225
226 def _create_socket(self):
226 def _create_socket(self):
227 if self.socket is not None:
227 if self.socket is not None:
228 # close previous socket, before opening a new one
228 # close previous socket, before opening a new one
229 self.poller.unregister(self.socket)
229 self.poller.unregister(self.socket)
230 self.socket.close()
230 self.socket.close()
231 self.socket = self.context.socket(zmq.REQ)
231 self.socket = self.context.socket(zmq.REQ)
232 self.socket.linger = 1000
232 self.socket.linger = 1000
233 self.socket.connect(self.address)
233 self.socket.connect(self.address)
234
234
235 self.poller.register(self.socket, zmq.POLLIN)
235 self.poller.register(self.socket, zmq.POLLIN)
236
236
237 def _poll(self, start_time):
237 def _poll(self, start_time):
238 """poll for heartbeat replies until we reach self.time_to_dead.
238 """poll for heartbeat replies until we reach self.time_to_dead.
239
239
240 Ignores interrupts, and returns the result of poll(), which
240 Ignores interrupts, and returns the result of poll(), which
241 will be an empty list if no messages arrived before the timeout,
241 will be an empty list if no messages arrived before the timeout,
242 or the event tuple if there is a message to receive.
242 or the event tuple if there is a message to receive.
243 """
243 """
244
244
245 until_dead = self.time_to_dead - (time.time() - start_time)
245 until_dead = self.time_to_dead - (time.time() - start_time)
246 # ensure poll at least once
246 # ensure poll at least once
247 until_dead = max(until_dead, 1e-3)
247 until_dead = max(until_dead, 1e-3)
248 events = []
248 events = []
249 while True:
249 while True:
250 try:
250 try:
251 events = self.poller.poll(1000 * until_dead)
251 events = self.poller.poll(1000 * until_dead)
252 except ZMQError as e:
252 except ZMQError as e:
253 if e.errno == errno.EINTR:
253 if e.errno == errno.EINTR:
254 # ignore interrupts during heartbeat
254 # ignore interrupts during heartbeat
255 # this may never actually happen
255 # this may never actually happen
256 until_dead = self.time_to_dead - (time.time() - start_time)
256 until_dead = self.time_to_dead - (time.time() - start_time)
257 until_dead = max(until_dead, 1e-3)
257 until_dead = max(until_dead, 1e-3)
258 pass
258 pass
259 else:
259 else:
260 raise
260 raise
261 except Exception:
261 except Exception:
262 if self._exiting:
262 if self._exiting:
263 break
263 break
264 else:
264 else:
265 raise
265 raise
266 else:
266 else:
267 break
267 break
268 return events
268 return events
269
269
270 def run(self):
270 def run(self):
271 """The thread's main activity. Call start() instead."""
271 """The thread's main activity. Call start() instead."""
272 self._create_socket()
272 self._create_socket()
273 self._running = True
273 self._running = True
274 self._beating = True
274 self._beating = True
275
275
276 while self._running:
276 while self._running:
277 if self._pause:
277 if self._pause:
278 # just sleep, and skip the rest of the loop
278 # just sleep, and skip the rest of the loop
279 time.sleep(self.time_to_dead)
279 time.sleep(self.time_to_dead)
280 continue
280 continue
281
281
282 since_last_heartbeat = 0.0
282 since_last_heartbeat = 0.0
283 # io.rprint('Ping from HB channel') # dbg
283 # io.rprint('Ping from HB channel') # dbg
284 # no need to catch EFSM here, because the previous event was
284 # no need to catch EFSM here, because the previous event was
285 # either a recv or connect, which cannot be followed by EFSM
285 # either a recv or connect, which cannot be followed by EFSM
286 self.socket.send(b'ping')
286 self.socket.send(b'ping')
287 request_time = time.time()
287 request_time = time.time()
288 ready = self._poll(request_time)
288 ready = self._poll(request_time)
289 if ready:
289 if ready:
290 self._beating = True
290 self._beating = True
291 # the poll above guarantees we have something to recv
291 # the poll above guarantees we have something to recv
292 self.socket.recv()
292 self.socket.recv()
293 # sleep the remainder of the cycle
293 # sleep the remainder of the cycle
294 remainder = self.time_to_dead - (time.time() - request_time)
294 remainder = self.time_to_dead - (time.time() - request_time)
295 if remainder > 0:
295 if remainder > 0:
296 time.sleep(remainder)
296 time.sleep(remainder)
297 continue
297 continue
298 else:
298 else:
299 # nothing was received within the time limit, signal heart failure
299 # nothing was received within the time limit, signal heart failure
300 self._beating = False
300 self._beating = False
301 since_last_heartbeat = time.time() - request_time
301 since_last_heartbeat = time.time() - request_time
302 self.call_handlers(since_last_heartbeat)
302 self.call_handlers(since_last_heartbeat)
303 # and close/reopen the socket, because the REQ/REP cycle has been broken
303 # and close/reopen the socket, because the REQ/REP cycle has been broken
304 self._create_socket()
304 self._create_socket()
305 continue
305 continue
306
306
307 def pause(self):
307 def pause(self):
308 """Pause the heartbeat."""
308 """Pause the heartbeat."""
309 self._pause = True
309 self._pause = True
310
310
311 def unpause(self):
311 def unpause(self):
312 """Unpause the heartbeat."""
312 """Unpause the heartbeat."""
313 self._pause = False
313 self._pause = False
314
314
315 def is_beating(self):
315 def is_beating(self):
316 """Is the heartbeat running and responsive (and not paused)."""
316 """Is the heartbeat running and responsive (and not paused)."""
317 if self.is_alive() and not self._pause and self._beating:
317 if self.is_alive() and not self._pause and self._beating:
318 return True
318 return True
319 else:
319 else:
320 return False
320 return False
321
321
322 def stop(self):
322 def stop(self):
323 """Stop the channel's event loop and join its thread."""
323 """Stop the channel's event loop and join its thread."""
324 self._running = False
324 self._running = False
325 super(HBChannel, self).stop()
325 super(HBChannel, self).stop()
326
326
327 def call_handlers(self, since_last_heartbeat):
327 def call_handlers(self, since_last_heartbeat):
328 """This method is called in the ioloop thread when a message arrives.
328 """This method is called in the ioloop thread when a message arrives.
329
329
330 Subclasses should override this method to handle incoming messages.
330 Subclasses should override this method to handle incoming messages.
331 It is important to remember that this method is called in the thread
331 It is important to remember that this method is called in the thread
332 so that some logic must be done to ensure that the application level
332 so that some logic must be done to ensure that the application level
333 handlers are called in the application thread.
333 handlers are called in the application thread.
334 """
334 """
335 raise NotImplementedError('call_handlers must be defined in a subclass.')
335 pass
336
336
337
337
338 HBChannelABC.register(HBChannel)
338 HBChannelABC.register(HBChannel)
@@ -1,84 +1,95 b''
1 """A kernel client for in-process kernels."""
1 """A kernel client for in-process kernels."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from IPython.kernel.channelsabc import (
6 from IPython.kernel.channelsabc import (
7 ShellChannelABC, IOPubChannelABC,
7 ShellChannelABC, IOPubChannelABC,
8 HBChannelABC, StdInChannelABC,
8 HBChannelABC, StdInChannelABC,
9 )
9 )
10
10
11 from .socket import DummySocket
11 from .socket import DummySocket
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Channel classes
14 # Channel classes
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 class InProcessChannel(object):
17 class InProcessChannel(object):
18 """Base class for in-process channels."""
18 """Base class for in-process channels."""
19 proxy_methods = []
19 proxy_methods = []
20
20
21 def __init__(self, client=None):
21 def __init__(self, client=None):
22 super(InProcessChannel, self).__init__()
22 super(InProcessChannel, self).__init__()
23 self.client = client
23 self.client = client
24 self._is_alive = False
24 self._is_alive = False
25
25
26 def is_alive(self):
26 def is_alive(self):
27 return self._is_alive
27 return self._is_alive
28
28
29 def start(self):
29 def start(self):
30 self._is_alive = True
30 self._is_alive = True
31
31
32 def stop(self):
32 def stop(self):
33 self._is_alive = False
33 self._is_alive = False
34
34
35 def call_handlers(self, msg):
35 def call_handlers(self, msg):
36 """ This method is called in the main thread when a message arrives.
36 """ This method is called in the main thread when a message arrives.
37
37
38 Subclasses should override this method to handle incoming messages.
38 Subclasses should override this method to handle incoming messages.
39 """
39 """
40 raise NotImplementedError('call_handlers must be defined in a subclass.')
40 raise NotImplementedError('call_handlers must be defined in a subclass.')
41
41
42 def flush(self, timeout=1.0):
42 def flush(self, timeout=1.0):
43 pass
43 pass
44
44
45
45
46 def call_handlers_later(self, *args, **kwds):
46 def call_handlers_later(self, *args, **kwds):
47 """ Call the message handlers later.
47 """ Call the message handlers later.
48
48
49 The default implementation just calls the handlers immediately, but this
49 The default implementation just calls the handlers immediately, but this
50 method exists so that GUI toolkits can defer calling the handlers until
50 method exists so that GUI toolkits can defer calling the handlers until
51 after the event loop has run, as expected by GUI frontends.
51 after the event loop has run, as expected by GUI frontends.
52 """
52 """
53 self.call_handlers(*args, **kwds)
53 self.call_handlers(*args, **kwds)
54
54
55 def process_events(self):
55 def process_events(self):
56 """ Process any pending GUI events.
56 """ Process any pending GUI events.
57
57
58 This method will be never be called from a frontend without an event
58 This method will be never be called from a frontend without an event
59 loop (e.g., a terminal frontend).
59 loop (e.g., a terminal frontend).
60 """
60 """
61 raise NotImplementedError
61 raise NotImplementedError
62
62
63
63
64
64
65 class InProcessHBChannel(InProcessChannel):
65 class InProcessHBChannel(object):
66 """See `IPython.kernel.channels.HBChannel` for docstrings."""
66 """See `IPython.kernel.channels.HBChannel` for docstrings."""
67
67
68 time_to_dead = 3.0
68 time_to_dead = 3.0
69
69
70 def __init__(self, *args, **kwds):
70 def __init__(self, client=None):
71 super(InProcessHBChannel, self).__init__(*args, **kwds)
71 super(InProcessHBChannel, self).__init__()
72 self.client = client
73 self._is_alive = False
72 self._pause = True
74 self._pause = True
73
75
76 def is_alive(self):
77 return self._is_alive
78
79 def start(self):
80 self._is_alive = True
81
82 def stop(self):
83 self._is_alive = False
84
74 def pause(self):
85 def pause(self):
75 self._pause = True
86 self._pause = True
76
87
77 def unpause(self):
88 def unpause(self):
78 self._pause = False
89 self._pause = False
79
90
80 def is_beating(self):
91 def is_beating(self):
81 return not self._pause
92 return not self._pause
82
93
83
94
84 HBChannelABC.register(InProcessHBChannel)
95 HBChannelABC.register(InProcessHBChannel)
@@ -1,270 +1,280 b''
1 """ Defines a KernelClient that provides signals and slots.
1 """ Defines a KernelClient that provides signals and slots.
2 """
2 """
3 import atexit
3 import atexit
4 import errno
4 import errno
5 from threading import Thread
5 from threading import Thread
6 import time
6 import time
7
7
8 import zmq
8 import zmq
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 # during garbage collection of threads at exit:
10 # during garbage collection of threads at exit:
11 from zmq import ZMQError
11 from zmq import ZMQError
12 from zmq.eventloop import ioloop, zmqstream
12 from zmq.eventloop import ioloop, zmqstream
13
13
14 from IPython.external.qt import QtCore
14 from IPython.external.qt import QtCore
15
15
16 # Local imports
16 # Local imports
17 from IPython.utils.traitlets import Type, Instance
17 from IPython.utils.traitlets import Type, Instance
18 from IPython.kernel.channels import HBChannel,\
18 from IPython.kernel.channels import HBChannel,\
19 make_shell_socket, make_iopub_socket, make_stdin_socket
19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 from IPython.kernel import KernelClient
20 from IPython.kernel import KernelClient
21
21
22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
22 from .kernel_mixins import QtKernelClientMixin
23 from .util import SuperQObject
23 from .util import SuperQObject
24
24
25 class QtHBChannel(QtHBChannelMixin, HBChannel):
25 class QtHBChannel(SuperQObject, HBChannel):
26 pass
26 # A longer timeout than the base class
27 time_to_dead = 3.0
28
29 # Emitted when the kernel has died.
30 kernel_died = QtCore.Signal(object)
31
32 def call_handlers(self, since_last_heartbeat):
33 """ Reimplemented to emit signals instead of making callbacks.
34 """
35 # Emit the generic signal.
36 self.kernel_died.emit(since_last_heartbeat)
27
37
28 from IPython.core.release import kernel_protocol_version_info
38 from IPython.core.release import kernel_protocol_version_info
29
39
30 from IPython.kernel.channelsabc import (
40 from IPython.kernel.channelsabc import (
31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
41 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
32 )
42 )
33 from IPython.utils.py3compat import string_types, iteritems
43 from IPython.utils.py3compat import string_types, iteritems
34
44
35 major_protocol_version = kernel_protocol_version_info[0]
45 major_protocol_version = kernel_protocol_version_info[0]
36
46
37 class InvalidPortNumber(Exception):
47 class InvalidPortNumber(Exception):
38 pass
48 pass
39
49
40 # some utilities to validate message structure, these might get moved elsewhere
50 # some utilities to validate message structure, these might get moved elsewhere
41 # if they prove to have more generic utility
51 # if they prove to have more generic utility
42
52
43
53
44 def validate_string_dict(dct):
54 def validate_string_dict(dct):
45 """Validate that the input is a dict with string keys and values.
55 """Validate that the input is a dict with string keys and values.
46
56
47 Raises ValueError if not."""
57 Raises ValueError if not."""
48 for k,v in iteritems(dct):
58 for k,v in iteritems(dct):
49 if not isinstance(k, string_types):
59 if not isinstance(k, string_types):
50 raise ValueError('key %r in dict must be a string' % k)
60 raise ValueError('key %r in dict must be a string' % k)
51 if not isinstance(v, string_types):
61 if not isinstance(v, string_types):
52 raise ValueError('value %r in dict must be a string' % v)
62 raise ValueError('value %r in dict must be a string' % v)
53
63
54
64
55
65
56 class QtZMQSocketChannel(SuperQObject):
66 class QtZMQSocketChannel(SuperQObject):
57 """A ZMQ socket emitting a Qt signal when a message is received."""
67 """A ZMQ socket emitting a Qt signal when a message is received."""
58 session = None
68 session = None
59 socket = None
69 socket = None
60 ioloop = None
70 ioloop = None
61 stream = None
71 stream = None
62
72
63 message_received = QtCore.Signal(object)
73 message_received = QtCore.Signal(object)
64
74
65 #---------------------------------------------------------------------------
75 #---------------------------------------------------------------------------
66 # InProcessChannel interface
76 # InProcessChannel interface
67 #---------------------------------------------------------------------------
77 #---------------------------------------------------------------------------
68
78
69 def call_handlers_later(self, *args, **kwds):
79 def call_handlers_later(self, *args, **kwds):
70 """ Call the message handlers later.
80 """ Call the message handlers later.
71 """
81 """
72 do_later = lambda: self.call_handlers(*args, **kwds)
82 do_later = lambda: self.call_handlers(*args, **kwds)
73 QtCore.QTimer.singleShot(0, do_later)
83 QtCore.QTimer.singleShot(0, do_later)
74
84
75 def process_events(self):
85 def process_events(self):
76 """ Process any pending GUI events.
86 """ Process any pending GUI events.
77 """
87 """
78 QtCore.QCoreApplication.instance().processEvents()
88 QtCore.QCoreApplication.instance().processEvents()
79
89
80 def __init__(self, socket, session, loop):
90 def __init__(self, socket, session, loop):
81 """Create a channel.
91 """Create a channel.
82
92
83 Parameters
93 Parameters
84 ----------
94 ----------
85 socket : :class:`zmq.Socket`
95 socket : :class:`zmq.Socket`
86 The ZMQ socket to use.
96 The ZMQ socket to use.
87 session : :class:`session.Session`
97 session : :class:`session.Session`
88 The session to use.
98 The session to use.
89 loop
99 loop
90 A pyzmq ioloop to connect the socket to using a ZMQStream
100 A pyzmq ioloop to connect the socket to using a ZMQStream
91 """
101 """
92 super(QtZMQSocketChannel, self).__init__()
102 super(QtZMQSocketChannel, self).__init__()
93
103
94 self.socket = socket
104 self.socket = socket
95 self.session = session
105 self.session = session
96 self.ioloop = loop
106 self.ioloop = loop
97
107
98 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
108 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
99 self.stream.on_recv(self._handle_recv)
109 self.stream.on_recv(self._handle_recv)
100
110
101 _is_alive = False
111 _is_alive = False
102 def is_alive(self):
112 def is_alive(self):
103 return self._is_alive
113 return self._is_alive
104
114
105 def start(self):
115 def start(self):
106 self._is_alive = True
116 self._is_alive = True
107
117
108 def stop(self):
118 def stop(self):
109 self._is_alive = False
119 self._is_alive = False
110
120
111 def close(self):
121 def close(self):
112 if self.socket is not None:
122 if self.socket is not None:
113 try:
123 try:
114 self.socket.close(linger=0)
124 self.socket.close(linger=0)
115 except Exception:
125 except Exception:
116 pass
126 pass
117 self.socket = None
127 self.socket = None
118
128
119 def _queue_send(self, msg):
129 def _queue_send(self, msg):
120 """Queue a message to be sent from the IOLoop's thread.
130 """Queue a message to be sent from the IOLoop's thread.
121
131
122 Parameters
132 Parameters
123 ----------
133 ----------
124 msg : message to send
134 msg : message to send
125
135
126 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
136 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
127 thread control of the action.
137 thread control of the action.
128 """
138 """
129 def thread_send():
139 def thread_send():
130 self.session.send(self.stream, msg)
140 self.session.send(self.stream, msg)
131 self.ioloop.add_callback(thread_send)
141 self.ioloop.add_callback(thread_send)
132
142
133 def _handle_recv(self, msg):
143 def _handle_recv(self, msg):
134 """Callback for stream.on_recv.
144 """Callback for stream.on_recv.
135
145
136 Unpacks message, and calls handlers with it.
146 Unpacks message, and calls handlers with it.
137 """
147 """
138 ident,smsg = self.session.feed_identities(msg)
148 ident,smsg = self.session.feed_identities(msg)
139 msg = self.session.deserialize(smsg)
149 msg = self.session.deserialize(smsg)
140 self.call_handlers(msg)
150 self.call_handlers(msg)
141
151
142 def call_handlers(self, msg):
152 def call_handlers(self, msg):
143 """This method is called in the ioloop thread when a message arrives.
153 """This method is called in the ioloop thread when a message arrives.
144
154
145 Subclasses should override this method to handle incoming messages.
155 Subclasses should override this method to handle incoming messages.
146 It is important to remember that this method is called in the thread
156 It is important to remember that this method is called in the thread
147 so that some logic must be done to ensure that the application level
157 so that some logic must be done to ensure that the application level
148 handlers are called in the application thread.
158 handlers are called in the application thread.
149 """
159 """
150 # Emit the generic signal.
160 # Emit the generic signal.
151 self.message_received.emit(msg)
161 self.message_received.emit(msg)
152
162
153 def flush(self, timeout=1.0):
163 def flush(self, timeout=1.0):
154 """Immediately processes all pending messages on this channel.
164 """Immediately processes all pending messages on this channel.
155
165
156 This is only used for the IOPub channel.
166 This is only used for the IOPub channel.
157
167
158 Callers should use this method to ensure that :meth:`call_handlers`
168 Callers should use this method to ensure that :meth:`call_handlers`
159 has been called for all messages that have been received on the
169 has been called for all messages that have been received on the
160 0MQ SUB socket of this channel.
170 0MQ SUB socket of this channel.
161
171
162 This method is thread safe.
172 This method is thread safe.
163
173
164 Parameters
174 Parameters
165 ----------
175 ----------
166 timeout : float, optional
176 timeout : float, optional
167 The maximum amount of time to spend flushing, in seconds. The
177 The maximum amount of time to spend flushing, in seconds. The
168 default is one second.
178 default is one second.
169 """
179 """
170 # We do the IOLoop callback process twice to ensure that the IOLoop
180 # We do the IOLoop callback process twice to ensure that the IOLoop
171 # gets to perform at least one full poll.
181 # gets to perform at least one full poll.
172 stop_time = time.time() + timeout
182 stop_time = time.time() + timeout
173 for i in range(2):
183 for i in range(2):
174 self._flushed = False
184 self._flushed = False
175 self.ioloop.add_callback(self._flush)
185 self.ioloop.add_callback(self._flush)
176 while not self._flushed and time.time() < stop_time:
186 while not self._flushed and time.time() < stop_time:
177 time.sleep(0.01)
187 time.sleep(0.01)
178
188
179 def _flush(self):
189 def _flush(self):
180 """Callback for :method:`self.flush`."""
190 """Callback for :method:`self.flush`."""
181 self.stream.flush()
191 self.stream.flush()
182 self._flushed = True
192 self._flushed = True
183
193
184
194
185 class IOLoopThread(Thread):
195 class IOLoopThread(Thread):
186 """Run a pyzmq ioloop in a thread to send and receive messages
196 """Run a pyzmq ioloop in a thread to send and receive messages
187 """
197 """
188 def __init__(self, loop):
198 def __init__(self, loop):
189 super(IOLoopThread, self).__init__()
199 super(IOLoopThread, self).__init__()
190 self.daemon = True
200 self.daemon = True
191 atexit.register(self._notice_exit)
201 atexit.register(self._notice_exit)
192 self.ioloop = loop or ioloop.IOLoop()
202 self.ioloop = loop or ioloop.IOLoop()
193
203
194 def _notice_exit(self):
204 def _notice_exit(self):
195 self._exiting = True
205 self._exiting = True
196
206
197 def run(self):
207 def run(self):
198 """Run my loop, ignoring EINTR events in the poller"""
208 """Run my loop, ignoring EINTR events in the poller"""
199 while True:
209 while True:
200 try:
210 try:
201 self.ioloop.start()
211 self.ioloop.start()
202 except ZMQError as e:
212 except ZMQError as e:
203 if e.errno == errno.EINTR:
213 if e.errno == errno.EINTR:
204 continue
214 continue
205 else:
215 else:
206 raise
216 raise
207 except Exception:
217 except Exception:
208 if self._exiting:
218 if self._exiting:
209 break
219 break
210 else:
220 else:
211 raise
221 raise
212 else:
222 else:
213 break
223 break
214
224
215 def stop(self):
225 def stop(self):
216 """Stop the channel's event loop and join its thread.
226 """Stop the channel's event loop and join its thread.
217
227
218 This calls :meth:`~threading.Thread.join` and returns when the thread
228 This calls :meth:`~threading.Thread.join` and returns when the thread
219 terminates. :class:`RuntimeError` will be raised if
229 terminates. :class:`RuntimeError` will be raised if
220 :meth:`~threading.Thread.start` is called again.
230 :meth:`~threading.Thread.start` is called again.
221 """
231 """
222 if self.ioloop is not None:
232 if self.ioloop is not None:
223 self.ioloop.stop()
233 self.ioloop.stop()
224 self.join()
234 self.join()
225 self.close()
235 self.close()
226
236
227 def close(self):
237 def close(self):
228 if self.ioloop is not None:
238 if self.ioloop is not None:
229 try:
239 try:
230 self.ioloop.close(all_fds=True)
240 self.ioloop.close(all_fds=True)
231 except Exception:
241 except Exception:
232 pass
242 pass
233
243
234
244
235 class QtKernelClient(QtKernelClientMixin, KernelClient):
245 class QtKernelClient(QtKernelClientMixin, KernelClient):
236 """ A KernelClient that provides signals and slots.
246 """ A KernelClient that provides signals and slots.
237 """
247 """
238
248
239 _ioloop = None
249 _ioloop = None
240 @property
250 @property
241 def ioloop(self):
251 def ioloop(self):
242 if self._ioloop is None:
252 if self._ioloop is None:
243 self._ioloop = ioloop.IOLoop()
253 self._ioloop = ioloop.IOLoop()
244 return self._ioloop
254 return self._ioloop
245
255
246 ioloop_thread = Instance(IOLoopThread)
256 ioloop_thread = Instance(IOLoopThread)
247
257
248 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
258 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
249 if shell:
259 if shell:
250 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
260 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
251
261
252 self.ioloop_thread = IOLoopThread(self.ioloop)
262 self.ioloop_thread = IOLoopThread(self.ioloop)
253 self.ioloop_thread.start()
263 self.ioloop_thread.start()
254
264
255 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
265 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
256
266
257 def _check_kernel_info_reply(self, msg):
267 def _check_kernel_info_reply(self, msg):
258 if msg['msg_type'] == 'kernel_info_reply':
268 if msg['msg_type'] == 'kernel_info_reply':
259 self._handle_kernel_info_reply(msg)
269 self._handle_kernel_info_reply(msg)
260 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
270 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
261
271
262 def stop_channels(self):
272 def stop_channels(self):
263 super(QtKernelClient, self).stop_channels()
273 super(QtKernelClient, self).stop_channels()
264 if self.ioloop_thread.is_alive():
274 if self.ioloop_thread.is_alive():
265 self.ioloop_thread.stop()
275 self.ioloop_thread.stop()
266
276
267 iopub_channel_class = Type(QtZMQSocketChannel)
277 iopub_channel_class = Type(QtZMQSocketChannel)
268 shell_channel_class = Type(QtZMQSocketChannel)
278 shell_channel_class = Type(QtZMQSocketChannel)
269 stdin_channel_class = Type(QtZMQSocketChannel)
279 stdin_channel_class = Type(QtZMQSocketChannel)
270 hb_channel_class = Type(QtHBChannel)
280 hb_channel_class = Type(QtHBChannel)
@@ -1,33 +1,30 b''
1 """ Defines an in-process KernelManager with signals and slots.
1 """ Defines an in-process KernelManager with signals and slots.
2 """
2 """
3
3
4 # Local imports.
4 # Local imports.
5 from IPython.external.qt import QtCore
5 from IPython.external.qt import QtCore
6 from IPython.kernel.inprocess import (
6 from IPython.kernel.inprocess import (
7 InProcessHBChannel, InProcessKernelClient, InProcessKernelManager,
7 InProcessHBChannel, InProcessKernelClient, InProcessKernelManager,
8 )
8 )
9 from IPython.kernel.inprocess.channels import InProcessChannel
9 from IPython.kernel.inprocess.channels import InProcessChannel
10
10
11 from IPython.utils.traitlets import Type
11 from IPython.utils.traitlets import Type
12 from .kernel_mixins import ( ChannelQObject,
12 from .kernel_mixins import ( ChannelQObject,
13 QtHBChannelMixin, QtKernelClientMixin,
13 QtKernelClientMixin,
14 QtKernelManagerMixin,
14 QtKernelManagerMixin,
15 )
15 )
16
16
17 class QtInProcessChannel(ChannelQObject, InProcessChannel):
17 class QtInProcessChannel(ChannelQObject, InProcessChannel):
18 pass
18 pass
19
19
20 class QtInProcessHBChannel(QtHBChannelMixin, InProcessHBChannel):
21 pass
22
23 class QtInProcessKernelClient(QtKernelClientMixin, InProcessKernelClient):
20 class QtInProcessKernelClient(QtKernelClientMixin, InProcessKernelClient):
24 """ An in-process KernelManager with signals and slots.
21 """ An in-process KernelManager with signals and slots.
25 """
22 """
26
23
27 iopub_channel_class = Type(QtInProcessChannel)
24 iopub_channel_class = Type(QtInProcessChannel)
28 shell_channel_class = Type(QtInProcessChannel)
25 shell_channel_class = Type(QtInProcessChannel)
29 stdin_channel_class = Type(QtInProcessChannel)
26 stdin_channel_class = Type(QtInProcessChannel)
30 hb_channel_class = Type(QtInProcessHBChannel)
27 hb_channel_class = Type(InProcessHBChannel)
31
28
32 class QtInProcessKernelManager(QtKernelManagerMixin, InProcessKernelManager):
29 class QtInProcessKernelManager(QtKernelManagerMixin, InProcessKernelManager):
33 client_class = __module__ + '.QtInProcessKernelClient'
30 client_class = __module__ + '.QtInProcessKernelClient'
@@ -1,105 +1,94 b''
1 """Defines a KernelManager that provides signals and slots."""
1 """Defines a KernelManager that provides signals and slots."""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from IPython.external.qt import QtCore
6 from IPython.external.qt import QtCore
7
7
8 from IPython.utils.traitlets import HasTraits, Type
8 from IPython.utils.traitlets import HasTraits, Type
9 from .util import MetaQObjectHasTraits, SuperQObject
9 from .util import MetaQObjectHasTraits, SuperQObject
10
10
11
11
12 class ChannelQObject(SuperQObject):
12 class ChannelQObject(SuperQObject):
13
13
14 # Emitted when the channel is started.
14 # Emitted when the channel is started.
15 started = QtCore.Signal()
15 started = QtCore.Signal()
16
16
17 # Emitted when the channel is stopped.
17 # Emitted when the channel is stopped.
18 stopped = QtCore.Signal()
18 stopped = QtCore.Signal()
19
19
20 # Emitted when any message is received.
20 # Emitted when any message is received.
21 message_received = QtCore.Signal(object)
21 message_received = QtCore.Signal(object)
22
22
23 def start(self):
23 def start(self):
24 """ Reimplemented to emit signal.
24 """ Reimplemented to emit signal.
25 """
25 """
26 super(ChannelQObject, self).start()
26 super(ChannelQObject, self).start()
27 self.started.emit()
27 self.started.emit()
28
28
29 def stop(self):
29 def stop(self):
30 """ Reimplemented to emit signal.
30 """ Reimplemented to emit signal.
31 """
31 """
32 super(ChannelQObject, self).stop()
32 super(ChannelQObject, self).stop()
33 self.stopped.emit()
33 self.stopped.emit()
34
34
35 def call_handlers_later(self, *args, **kwds):
35 def call_handlers_later(self, *args, **kwds):
36 """ Call the message handlers later.
36 """ Call the message handlers later.
37 """
37 """
38 do_later = lambda: self.call_handlers(*args, **kwds)
38 do_later = lambda: self.call_handlers(*args, **kwds)
39 QtCore.QTimer.singleShot(0, do_later)
39 QtCore.QTimer.singleShot(0, do_later)
40
40
41 def call_handlers(self, msg):
41 def call_handlers(self, msg):
42 self.message_received.emit(msg)
42 self.message_received.emit(msg)
43
43
44 def process_events(self):
44 def process_events(self):
45 """ Process any pending GUI events.
45 """ Process any pending GUI events.
46 """
46 """
47 QtCore.QCoreApplication.instance().processEvents()
47 QtCore.QCoreApplication.instance().processEvents()
48
48
49 def flush(self):
49 def flush(self):
50 """ Reimplemented to ensure that signals are dispatched immediately.
50 """ Reimplemented to ensure that signals are dispatched immediately.
51 """
51 """
52 super(ChannelQObject, self).flush()
52 super(ChannelQObject, self).flush()
53 self.process_events()
53 self.process_events()
54
54
55
55
56 class QtHBChannelMixin(ChannelQObject):
57
58 # Emitted when the kernel has died.
59 kernel_died = QtCore.Signal(object)
60
61 def call_handlers(self, since_last_heartbeat):
62 """ Reimplemented to emit signals instead of making callbacks.
63 """
64 self.kernel_died.emit(since_last_heartbeat)
65
66
67 class QtKernelRestarterMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
56 class QtKernelRestarterMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
68
57
69 _timer = None
58 _timer = None
70
59
71
60
72 class QtKernelManagerMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
61 class QtKernelManagerMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
73 """ A KernelClient that provides signals and slots.
62 """ A KernelClient that provides signals and slots.
74 """
63 """
75
64
76 kernel_restarted = QtCore.Signal()
65 kernel_restarted = QtCore.Signal()
77
66
78
67
79 class QtKernelClientMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
68 class QtKernelClientMixin(MetaQObjectHasTraits('NewBase', (HasTraits, SuperQObject), {})):
80 """ A KernelClient that provides signals and slots.
69 """ A KernelClient that provides signals and slots.
81 """
70 """
82
71
83 # Emitted when the kernel client has started listening.
72 # Emitted when the kernel client has started listening.
84 started_channels = QtCore.Signal()
73 started_channels = QtCore.Signal()
85
74
86 # Emitted when the kernel client has stopped listening.
75 # Emitted when the kernel client has stopped listening.
87 stopped_channels = QtCore.Signal()
76 stopped_channels = QtCore.Signal()
88
77
89 #---------------------------------------------------------------------------
78 #---------------------------------------------------------------------------
90 # 'KernelClient' interface
79 # 'KernelClient' interface
91 #---------------------------------------------------------------------------
80 #---------------------------------------------------------------------------
92
81
93 #------ Channel management -------------------------------------------------
82 #------ Channel management -------------------------------------------------
94
83
95 def start_channels(self, *args, **kw):
84 def start_channels(self, *args, **kw):
96 """ Reimplemented to emit signal.
85 """ Reimplemented to emit signal.
97 """
86 """
98 super(QtKernelClientMixin, self).start_channels(*args, **kw)
87 super(QtKernelClientMixin, self).start_channels(*args, **kw)
99 self.started_channels.emit()
88 self.started_channels.emit()
100
89
101 def stop_channels(self):
90 def stop_channels(self):
102 """ Reimplemented to emit signal.
91 """ Reimplemented to emit signal.
103 """
92 """
104 super(QtKernelClientMixin, self).stop_channels()
93 super(QtKernelClientMixin, self).stop_channels()
105 self.stopped_channels.emit()
94 self.stopped_channels.emit()
General Comments 0
You need to be logged in to leave comments. Login now