##// END OF EJS Templates
Get rid of address property on channels
Thomas Kluyver -
Show More
@@ -1,143 +1,135
1 1 """Blocking channels
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 5
6 6 # Copyright (c) IPython Development Team.
7 7 # Distributed under the terms of the Modified BSD License.
8 8
9 9 try:
10 10 from queue import Queue, Empty # Py 3
11 11 except ImportError:
12 12 from Queue import Queue, Empty # Py 2
13 13
14 14 from IPython.kernel.channelsabc import ShellChannelABC, IOPubChannelABC, \
15 15 StdInChannelABC
16 16 from IPython.kernel.channels import HBChannel,\
17 17 make_iopub_socket, make_shell_socket, make_stdin_socket,\
18 18 InvalidPortNumber, major_protocol_version
19 19 from IPython.utils.py3compat import string_types, iteritems
20 20
21 21 # some utilities to validate message structure, these might get moved elsewhere
22 22 # if they prove to have more generic utility
23 23
24 24 def validate_string_list(lst):
25 25 """Validate that the input is a list of strings.
26 26
27 27 Raises ValueError if not."""
28 28 if not isinstance(lst, list):
29 29 raise ValueError('input %r must be a list' % lst)
30 30 for x in lst:
31 31 if not isinstance(x, string_types):
32 32 raise ValueError('element %r in list must be a string' % x)
33 33
34 34
35 35 def validate_string_dict(dct):
36 36 """Validate that the input is a dict with string keys and values.
37 37
38 38 Raises ValueError if not."""
39 39 for k,v in iteritems(dct):
40 40 if not isinstance(k, string_types):
41 41 raise ValueError('key %r in dict must be a string' % k)
42 42 if not isinstance(v, string_types):
43 43 raise ValueError('value %r in dict must be a string' % v)
44 44
45 45
46 46 class ZMQSocketChannel(object):
47 """The base class for the channels that use ZMQ sockets."""
47 """A ZMQ socket in a simple blocking API"""
48 48 session = None
49 49 socket = None
50 50 stream = None
51 51 _exiting = False
52 52 proxy_methods = []
53 53
54 54 def __init__(self, socket, session, loop=None):
55 55 """Create a channel.
56 56
57 57 Parameters
58 58 ----------
59 59 socket : :class:`zmq.Socket`
60 60 The ZMQ socket to use.
61 61 session : :class:`session.Session`
62 62 The session to use.
63 63 loop
64 64 Unused here, for other implementations
65 65 """
66 66 super(ZMQSocketChannel, self).__init__()
67 67
68 68 self.socket = socket
69 69 self.session = session
70 70
71 71 def _recv(self, **kwargs):
72 72 msg = self.socket.recv_multipart(**kwargs)
73 73 ident,smsg = self.session.feed_identities(msg)
74 74 return self.session.deserialize(smsg)
75 75
76 76 def get_msg(self, block=True, timeout=None):
77 77 """ Gets a message if there is one that is ready. """
78 78 if block:
79 79 if timeout is not None:
80 80 timeout *= 1000 # seconds to ms
81 81 ready = self.socket.poll(timeout)
82 82 else:
83 83 ready = self.socket.poll(timeout=0)
84 84
85 85 if ready:
86 86 return self._recv()
87 87 else:
88 88 raise Empty
89 89
90 90 def get_msgs(self):
91 91 """ Get all messages that are currently ready. """
92 92 msgs = []
93 93 while True:
94 94 try:
95 95 msgs.append(self.get_msg(block=False))
96 96 except Empty:
97 97 break
98 98 return msgs
99 99
100 100 def msg_ready(self):
101 101 """ Is there a message that has been received? """
102 102 return bool(self.socket.poll(timeout=0))
103 103
104 104 def close(self):
105 105 if self.socket is not None:
106 106 try:
107 107 self.socket.close(linger=0)
108 108 except Exception:
109 109 pass
110 110 self.socket = None
111 111 stop = close
112 112
113 113 def is_alive(self):
114 114 return (self.socket is not None)
115 115
116 @property
117 def address(self):
118 """Get the channel's address as a zmq url string.
119
120 These URLS have the form: 'tcp://127.0.0.1:5555'.
121 """
122 return self._address
123
124 116 def _queue_send(self, msg):
125 117 """Pass a message to the ZMQ socket to send
126 118 """
127 119 self.session.send(self.socket, msg)
128 120
129 121 def start(self):
130 122 pass
131 123
132 124
133 125
134 126 class BlockingHBChannel(HBChannel):
135 127
136 128 # This kernel needs quicker monitoring, shorten to 1 sec.
137 129 # less than 0.5s is unreliable, and will get occasional
138 130 # false reports of missed beats.
139 131 time_to_dead = 1.
140 132
141 133 def call_handlers(self, since_last_heartbeat):
142 134 """ Pause beating on missed heartbeat. """
143 135 pass
@@ -1,278 +1,270
1 1 """ Defines a KernelClient that provides signals and slots.
2 2 """
3 3 import atexit
4 4 import errno
5 5 from threading import Thread
6 6 import time
7 7
8 8 import zmq
9 9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 10 # during garbage collection of threads at exit:
11 11 from zmq import ZMQError
12 12 from zmq.eventloop import ioloop, zmqstream
13 13
14 14 from IPython.external.qt import QtCore
15 15
16 16 # Local imports
17 17 from IPython.utils.traitlets import Type, Instance
18 18 from IPython.kernel.channels import HBChannel,\
19 19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 20 from IPython.kernel import KernelClient
21 21
22 22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
23 23 from .util import SuperQObject
24 24
25 25 class QtHBChannel(QtHBChannelMixin, HBChannel):
26 26 pass
27 27
28 28 from IPython.core.release import kernel_protocol_version_info
29 29
30 30 from IPython.kernel.channelsabc import (
31 31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
32 32 )
33 33 from IPython.utils.py3compat import string_types, iteritems
34 34
35 35 major_protocol_version = kernel_protocol_version_info[0]
36 36
37 37 class InvalidPortNumber(Exception):
38 38 pass
39 39
40 40 # some utilities to validate message structure, these might get moved elsewhere
41 41 # if they prove to have more generic utility
42 42
43 43
44 44 def validate_string_dict(dct):
45 45 """Validate that the input is a dict with string keys and values.
46 46
47 47 Raises ValueError if not."""
48 48 for k,v in iteritems(dct):
49 49 if not isinstance(k, string_types):
50 50 raise ValueError('key %r in dict must be a string' % k)
51 51 if not isinstance(v, string_types):
52 52 raise ValueError('value %r in dict must be a string' % v)
53 53
54 54
55 55
56 56 class QtZMQSocketChannel(SuperQObject):
57 """The base class for the channels that use ZMQ sockets."""
57 """A ZMQ socket emitting a Qt signal when a message is received."""
58 58 session = None
59 59 socket = None
60 60 ioloop = None
61 61 stream = None
62 62
63 63 message_received = QtCore.Signal(object)
64 64
65 65 #---------------------------------------------------------------------------
66 66 # InProcessChannel interface
67 67 #---------------------------------------------------------------------------
68 68
69 69 def call_handlers_later(self, *args, **kwds):
70 70 """ Call the message handlers later.
71 71 """
72 72 do_later = lambda: self.call_handlers(*args, **kwds)
73 73 QtCore.QTimer.singleShot(0, do_later)
74 74
75 75 def process_events(self):
76 76 """ Process any pending GUI events.
77 77 """
78 78 QtCore.QCoreApplication.instance().processEvents()
79 79
80 80 def __init__(self, socket, session, loop):
81 81 """Create a channel.
82 82
83 83 Parameters
84 84 ----------
85 85 socket : :class:`zmq.Socket`
86 86 The ZMQ socket to use.
87 87 session : :class:`session.Session`
88 88 The session to use.
89 89 loop
90 90 A pyzmq ioloop to connect the socket to using a ZMQStream
91 91 """
92 92 super(QtZMQSocketChannel, self).__init__()
93 93
94 94 self.socket = socket
95 95 self.session = session
96 96 self.ioloop = loop
97 97
98 98 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
99 99 self.stream.on_recv(self._handle_recv)
100 100
101 101 _is_alive = False
102 102 def is_alive(self):
103 103 return self._is_alive
104 104
105 105 def start(self):
106 106 self._is_alive = True
107 107
108 108 def stop(self):
109 109 self._is_alive = False
110 110
111 111 def close(self):
112 112 if self.socket is not None:
113 113 try:
114 114 self.socket.close(linger=0)
115 115 except Exception:
116 116 pass
117 117 self.socket = None
118 118
119 @property
120 def address(self):
121 """Get the channel's address as a zmq url string.
122
123 These URLS have the form: 'tcp://127.0.0.1:5555'.
124 """
125 return self._address
126
127 119 def _queue_send(self, msg):
128 120 """Queue a message to be sent from the IOLoop's thread.
129 121
130 122 Parameters
131 123 ----------
132 124 msg : message to send
133 125
134 126 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
135 127 thread control of the action.
136 128 """
137 129 def thread_send():
138 130 self.session.send(self.stream, msg)
139 131 self.ioloop.add_callback(thread_send)
140 132
141 133 def _handle_recv(self, msg):
142 134 """Callback for stream.on_recv.
143 135
144 136 Unpacks message, and calls handlers with it.
145 137 """
146 138 ident,smsg = self.session.feed_identities(msg)
147 139 msg = self.session.deserialize(smsg)
148 140 self.call_handlers(msg)
149 141
150 142 def call_handlers(self, msg):
151 143 """This method is called in the ioloop thread when a message arrives.
152 144
153 145 Subclasses should override this method to handle incoming messages.
154 146 It is important to remember that this method is called in the thread
155 147 so that some logic must be done to ensure that the application level
156 148 handlers are called in the application thread.
157 149 """
158 150 # Emit the generic signal.
159 151 self.message_received.emit(msg)
160 152
161 153 def flush(self, timeout=1.0):
162 154 """Immediately processes all pending messages on this channel.
163 155
164 156 This is only used for the IOPub channel.
165 157
166 158 Callers should use this method to ensure that :meth:`call_handlers`
167 159 has been called for all messages that have been received on the
168 160 0MQ SUB socket of this channel.
169 161
170 162 This method is thread safe.
171 163
172 164 Parameters
173 165 ----------
174 166 timeout : float, optional
175 167 The maximum amount of time to spend flushing, in seconds. The
176 168 default is one second.
177 169 """
178 170 # We do the IOLoop callback process twice to ensure that the IOLoop
179 171 # gets to perform at least one full poll.
180 172 stop_time = time.time() + timeout
181 173 for i in range(2):
182 174 self._flushed = False
183 175 self.ioloop.add_callback(self._flush)
184 176 while not self._flushed and time.time() < stop_time:
185 177 time.sleep(0.01)
186 178
187 179 def _flush(self):
188 180 """Callback for :method:`self.flush`."""
189 181 self.stream.flush()
190 182 self._flushed = True
191 183
192 184
193 185 class IOLoopThread(Thread):
194 186 """Run a pyzmq ioloop in a thread to send and receive messages
195 187 """
196 188 def __init__(self, loop):
197 189 super(IOLoopThread, self).__init__()
198 190 self.daemon = True
199 191 atexit.register(self._notice_exit)
200 192 self.ioloop = loop or ioloop.IOLoop()
201 193
202 194 def _notice_exit(self):
203 195 self._exiting = True
204 196
205 197 def run(self):
206 198 """Run my loop, ignoring EINTR events in the poller"""
207 199 while True:
208 200 try:
209 201 self.ioloop.start()
210 202 except ZMQError as e:
211 203 if e.errno == errno.EINTR:
212 204 continue
213 205 else:
214 206 raise
215 207 except Exception:
216 208 if self._exiting:
217 209 break
218 210 else:
219 211 raise
220 212 else:
221 213 break
222 214
223 215 def stop(self):
224 216 """Stop the channel's event loop and join its thread.
225 217
226 218 This calls :meth:`~threading.Thread.join` and returns when the thread
227 219 terminates. :class:`RuntimeError` will be raised if
228 220 :meth:`~threading.Thread.start` is called again.
229 221 """
230 222 if self.ioloop is not None:
231 223 self.ioloop.stop()
232 224 self.join()
233 225 self.close()
234 226
235 227 def close(self):
236 228 if self.ioloop is not None:
237 229 try:
238 230 self.ioloop.close(all_fds=True)
239 231 except Exception:
240 232 pass
241 233
242 234
243 235 class QtKernelClient(QtKernelClientMixin, KernelClient):
244 236 """ A KernelClient that provides signals and slots.
245 237 """
246 238
247 239 _ioloop = None
248 240 @property
249 241 def ioloop(self):
250 242 if self._ioloop is None:
251 243 self._ioloop = ioloop.IOLoop()
252 244 return self._ioloop
253 245
254 246 ioloop_thread = Instance(IOLoopThread)
255 247
256 248 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
257 249 if shell:
258 250 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
259 251
260 252 self.channel_listener_thread = IOLoopThread(self.ioloop)
261 253 self.channel_listener_thread.start()
262 254
263 255 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
264 256
265 257 def _check_kernel_info_reply(self, msg):
266 258 if msg['msg_type'] == 'kernel_info_reply':
267 259 self._handle_kernel_info_reply(msg)
268 260 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
269 261
270 262 def stop_channels(self):
271 263 super(QtKernelClient, self).stop_channels()
272 264 if self.ioloop_thread.is_alive():
273 265 self.ioloop_thread.stop()
274 266
275 267 iopub_channel_class = Type(QtZMQSocketChannel)
276 268 shell_channel_class = Type(QtZMQSocketChannel)
277 269 stdin_channel_class = Type(QtZMQSocketChannel)
278 270 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now