##// END OF EJS Templates
Get rid of address property on channels
Thomas Kluyver -
Show More
@@ -1,143 +1,135 b''
1 """Blocking channels
1 """Blocking channels
2
2
3 Useful for test suites and blocking terminal interfaces.
3 Useful for test suites and blocking terminal interfaces.
4 """
4 """
5
5
6 # Copyright (c) IPython Development Team.
6 # Copyright (c) IPython Development Team.
7 # Distributed under the terms of the Modified BSD License.
7 # Distributed under the terms of the Modified BSD License.
8
8
9 try:
9 try:
10 from queue import Queue, Empty # Py 3
10 from queue import Queue, Empty # Py 3
11 except ImportError:
11 except ImportError:
12 from Queue import Queue, Empty # Py 2
12 from Queue import Queue, Empty # Py 2
13
13
14 from IPython.kernel.channelsabc import ShellChannelABC, IOPubChannelABC, \
14 from IPython.kernel.channelsabc import ShellChannelABC, IOPubChannelABC, \
15 StdInChannelABC
15 StdInChannelABC
16 from IPython.kernel.channels import HBChannel,\
16 from IPython.kernel.channels import HBChannel,\
17 make_iopub_socket, make_shell_socket, make_stdin_socket,\
17 make_iopub_socket, make_shell_socket, make_stdin_socket,\
18 InvalidPortNumber, major_protocol_version
18 InvalidPortNumber, major_protocol_version
19 from IPython.utils.py3compat import string_types, iteritems
19 from IPython.utils.py3compat import string_types, iteritems
20
20
21 # some utilities to validate message structure, these might get moved elsewhere
21 # some utilities to validate message structure, these might get moved elsewhere
22 # if they prove to have more generic utility
22 # if they prove to have more generic utility
23
23
24 def validate_string_list(lst):
24 def validate_string_list(lst):
25 """Validate that the input is a list of strings.
25 """Validate that the input is a list of strings.
26
26
27 Raises ValueError if not."""
27 Raises ValueError if not."""
28 if not isinstance(lst, list):
28 if not isinstance(lst, list):
29 raise ValueError('input %r must be a list' % lst)
29 raise ValueError('input %r must be a list' % lst)
30 for x in lst:
30 for x in lst:
31 if not isinstance(x, string_types):
31 if not isinstance(x, string_types):
32 raise ValueError('element %r in list must be a string' % x)
32 raise ValueError('element %r in list must be a string' % x)
33
33
34
34
35 def validate_string_dict(dct):
35 def validate_string_dict(dct):
36 """Validate that the input is a dict with string keys and values.
36 """Validate that the input is a dict with string keys and values.
37
37
38 Raises ValueError if not."""
38 Raises ValueError if not."""
39 for k,v in iteritems(dct):
39 for k,v in iteritems(dct):
40 if not isinstance(k, string_types):
40 if not isinstance(k, string_types):
41 raise ValueError('key %r in dict must be a string' % k)
41 raise ValueError('key %r in dict must be a string' % k)
42 if not isinstance(v, string_types):
42 if not isinstance(v, string_types):
43 raise ValueError('value %r in dict must be a string' % v)
43 raise ValueError('value %r in dict must be a string' % v)
44
44
45
45
46 class ZMQSocketChannel(object):
46 class ZMQSocketChannel(object):
47 """The base class for the channels that use ZMQ sockets."""
47 """A ZMQ socket in a simple blocking API"""
48 session = None
48 session = None
49 socket = None
49 socket = None
50 stream = None
50 stream = None
51 _exiting = False
51 _exiting = False
52 proxy_methods = []
52 proxy_methods = []
53
53
54 def __init__(self, socket, session, loop=None):
54 def __init__(self, socket, session, loop=None):
55 """Create a channel.
55 """Create a channel.
56
56
57 Parameters
57 Parameters
58 ----------
58 ----------
59 socket : :class:`zmq.Socket`
59 socket : :class:`zmq.Socket`
60 The ZMQ socket to use.
60 The ZMQ socket to use.
61 session : :class:`session.Session`
61 session : :class:`session.Session`
62 The session to use.
62 The session to use.
63 loop
63 loop
64 Unused here, for other implementations
64 Unused here, for other implementations
65 """
65 """
66 super(ZMQSocketChannel, self).__init__()
66 super(ZMQSocketChannel, self).__init__()
67
67
68 self.socket = socket
68 self.socket = socket
69 self.session = session
69 self.session = session
70
70
71 def _recv(self, **kwargs):
71 def _recv(self, **kwargs):
72 msg = self.socket.recv_multipart(**kwargs)
72 msg = self.socket.recv_multipart(**kwargs)
73 ident,smsg = self.session.feed_identities(msg)
73 ident,smsg = self.session.feed_identities(msg)
74 return self.session.deserialize(smsg)
74 return self.session.deserialize(smsg)
75
75
76 def get_msg(self, block=True, timeout=None):
76 def get_msg(self, block=True, timeout=None):
77 """ Gets a message if there is one that is ready. """
77 """ Gets a message if there is one that is ready. """
78 if block:
78 if block:
79 if timeout is not None:
79 if timeout is not None:
80 timeout *= 1000 # seconds to ms
80 timeout *= 1000 # seconds to ms
81 ready = self.socket.poll(timeout)
81 ready = self.socket.poll(timeout)
82 else:
82 else:
83 ready = self.socket.poll(timeout=0)
83 ready = self.socket.poll(timeout=0)
84
84
85 if ready:
85 if ready:
86 return self._recv()
86 return self._recv()
87 else:
87 else:
88 raise Empty
88 raise Empty
89
89
90 def get_msgs(self):
90 def get_msgs(self):
91 """ Get all messages that are currently ready. """
91 """ Get all messages that are currently ready. """
92 msgs = []
92 msgs = []
93 while True:
93 while True:
94 try:
94 try:
95 msgs.append(self.get_msg(block=False))
95 msgs.append(self.get_msg(block=False))
96 except Empty:
96 except Empty:
97 break
97 break
98 return msgs
98 return msgs
99
99
100 def msg_ready(self):
100 def msg_ready(self):
101 """ Is there a message that has been received? """
101 """ Is there a message that has been received? """
102 return bool(self.socket.poll(timeout=0))
102 return bool(self.socket.poll(timeout=0))
103
103
104 def close(self):
104 def close(self):
105 if self.socket is not None:
105 if self.socket is not None:
106 try:
106 try:
107 self.socket.close(linger=0)
107 self.socket.close(linger=0)
108 except Exception:
108 except Exception:
109 pass
109 pass
110 self.socket = None
110 self.socket = None
111 stop = close
111 stop = close
112
112
113 def is_alive(self):
113 def is_alive(self):
114 return (self.socket is not None)
114 return (self.socket is not None)
115
115
116 @property
117 def address(self):
118 """Get the channel's address as a zmq url string.
119
120 These URLS have the form: 'tcp://127.0.0.1:5555'.
121 """
122 return self._address
123
124 def _queue_send(self, msg):
116 def _queue_send(self, msg):
125 """Pass a message to the ZMQ socket to send
117 """Pass a message to the ZMQ socket to send
126 """
118 """
127 self.session.send(self.socket, msg)
119 self.session.send(self.socket, msg)
128
120
129 def start(self):
121 def start(self):
130 pass
122 pass
131
123
132
124
133
125
134 class BlockingHBChannel(HBChannel):
126 class BlockingHBChannel(HBChannel):
135
127
136 # This kernel needs quicker monitoring, shorten to 1 sec.
128 # This kernel needs quicker monitoring, shorten to 1 sec.
137 # less than 0.5s is unreliable, and will get occasional
129 # less than 0.5s is unreliable, and will get occasional
138 # false reports of missed beats.
130 # false reports of missed beats.
139 time_to_dead = 1.
131 time_to_dead = 1.
140
132
141 def call_handlers(self, since_last_heartbeat):
133 def call_handlers(self, since_last_heartbeat):
142 """ Pause beating on missed heartbeat. """
134 """ Pause beating on missed heartbeat. """
143 pass
135 pass
@@ -1,278 +1,270 b''
1 """ Defines a KernelClient that provides signals and slots.
1 """ Defines a KernelClient that provides signals and slots.
2 """
2 """
3 import atexit
3 import atexit
4 import errno
4 import errno
5 from threading import Thread
5 from threading import Thread
6 import time
6 import time
7
7
8 import zmq
8 import zmq
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 # during garbage collection of threads at exit:
10 # during garbage collection of threads at exit:
11 from zmq import ZMQError
11 from zmq import ZMQError
12 from zmq.eventloop import ioloop, zmqstream
12 from zmq.eventloop import ioloop, zmqstream
13
13
14 from IPython.external.qt import QtCore
14 from IPython.external.qt import QtCore
15
15
16 # Local imports
16 # Local imports
17 from IPython.utils.traitlets import Type, Instance
17 from IPython.utils.traitlets import Type, Instance
18 from IPython.kernel.channels import HBChannel,\
18 from IPython.kernel.channels import HBChannel,\
19 make_shell_socket, make_iopub_socket, make_stdin_socket
19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 from IPython.kernel import KernelClient
20 from IPython.kernel import KernelClient
21
21
22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
23 from .util import SuperQObject
23 from .util import SuperQObject
24
24
25 class QtHBChannel(QtHBChannelMixin, HBChannel):
25 class QtHBChannel(QtHBChannelMixin, HBChannel):
26 pass
26 pass
27
27
28 from IPython.core.release import kernel_protocol_version_info
28 from IPython.core.release import kernel_protocol_version_info
29
29
30 from IPython.kernel.channelsabc import (
30 from IPython.kernel.channelsabc import (
31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
32 )
32 )
33 from IPython.utils.py3compat import string_types, iteritems
33 from IPython.utils.py3compat import string_types, iteritems
34
34
35 major_protocol_version = kernel_protocol_version_info[0]
35 major_protocol_version = kernel_protocol_version_info[0]
36
36
37 class InvalidPortNumber(Exception):
37 class InvalidPortNumber(Exception):
38 pass
38 pass
39
39
40 # some utilities to validate message structure, these might get moved elsewhere
40 # some utilities to validate message structure, these might get moved elsewhere
41 # if they prove to have more generic utility
41 # if they prove to have more generic utility
42
42
43
43
44 def validate_string_dict(dct):
44 def validate_string_dict(dct):
45 """Validate that the input is a dict with string keys and values.
45 """Validate that the input is a dict with string keys and values.
46
46
47 Raises ValueError if not."""
47 Raises ValueError if not."""
48 for k,v in iteritems(dct):
48 for k,v in iteritems(dct):
49 if not isinstance(k, string_types):
49 if not isinstance(k, string_types):
50 raise ValueError('key %r in dict must be a string' % k)
50 raise ValueError('key %r in dict must be a string' % k)
51 if not isinstance(v, string_types):
51 if not isinstance(v, string_types):
52 raise ValueError('value %r in dict must be a string' % v)
52 raise ValueError('value %r in dict must be a string' % v)
53
53
54
54
55
55
56 class QtZMQSocketChannel(SuperQObject):
56 class QtZMQSocketChannel(SuperQObject):
57 """The base class for the channels that use ZMQ sockets."""
57 """A ZMQ socket emitting a Qt signal when a message is received."""
58 session = None
58 session = None
59 socket = None
59 socket = None
60 ioloop = None
60 ioloop = None
61 stream = None
61 stream = None
62
62
63 message_received = QtCore.Signal(object)
63 message_received = QtCore.Signal(object)
64
64
65 #---------------------------------------------------------------------------
65 #---------------------------------------------------------------------------
66 # InProcessChannel interface
66 # InProcessChannel interface
67 #---------------------------------------------------------------------------
67 #---------------------------------------------------------------------------
68
68
69 def call_handlers_later(self, *args, **kwds):
69 def call_handlers_later(self, *args, **kwds):
70 """ Call the message handlers later.
70 """ Call the message handlers later.
71 """
71 """
72 do_later = lambda: self.call_handlers(*args, **kwds)
72 do_later = lambda: self.call_handlers(*args, **kwds)
73 QtCore.QTimer.singleShot(0, do_later)
73 QtCore.QTimer.singleShot(0, do_later)
74
74
75 def process_events(self):
75 def process_events(self):
76 """ Process any pending GUI events.
76 """ Process any pending GUI events.
77 """
77 """
78 QtCore.QCoreApplication.instance().processEvents()
78 QtCore.QCoreApplication.instance().processEvents()
79
79
80 def __init__(self, socket, session, loop):
80 def __init__(self, socket, session, loop):
81 """Create a channel.
81 """Create a channel.
82
82
83 Parameters
83 Parameters
84 ----------
84 ----------
85 socket : :class:`zmq.Socket`
85 socket : :class:`zmq.Socket`
86 The ZMQ socket to use.
86 The ZMQ socket to use.
87 session : :class:`session.Session`
87 session : :class:`session.Session`
88 The session to use.
88 The session to use.
89 loop
89 loop
90 A pyzmq ioloop to connect the socket to using a ZMQStream
90 A pyzmq ioloop to connect the socket to using a ZMQStream
91 """
91 """
92 super(QtZMQSocketChannel, self).__init__()
92 super(QtZMQSocketChannel, self).__init__()
93
93
94 self.socket = socket
94 self.socket = socket
95 self.session = session
95 self.session = session
96 self.ioloop = loop
96 self.ioloop = loop
97
97
98 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
98 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
99 self.stream.on_recv(self._handle_recv)
99 self.stream.on_recv(self._handle_recv)
100
100
101 _is_alive = False
101 _is_alive = False
102 def is_alive(self):
102 def is_alive(self):
103 return self._is_alive
103 return self._is_alive
104
104
105 def start(self):
105 def start(self):
106 self._is_alive = True
106 self._is_alive = True
107
107
108 def stop(self):
108 def stop(self):
109 self._is_alive = False
109 self._is_alive = False
110
110
111 def close(self):
111 def close(self):
112 if self.socket is not None:
112 if self.socket is not None:
113 try:
113 try:
114 self.socket.close(linger=0)
114 self.socket.close(linger=0)
115 except Exception:
115 except Exception:
116 pass
116 pass
117 self.socket = None
117 self.socket = None
118
118
119 @property
120 def address(self):
121 """Get the channel's address as a zmq url string.
122
123 These URLS have the form: 'tcp://127.0.0.1:5555'.
124 """
125 return self._address
126
127 def _queue_send(self, msg):
119 def _queue_send(self, msg):
128 """Queue a message to be sent from the IOLoop's thread.
120 """Queue a message to be sent from the IOLoop's thread.
129
121
130 Parameters
122 Parameters
131 ----------
123 ----------
132 msg : message to send
124 msg : message to send
133
125
134 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
126 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
135 thread control of the action.
127 thread control of the action.
136 """
128 """
137 def thread_send():
129 def thread_send():
138 self.session.send(self.stream, msg)
130 self.session.send(self.stream, msg)
139 self.ioloop.add_callback(thread_send)
131 self.ioloop.add_callback(thread_send)
140
132
141 def _handle_recv(self, msg):
133 def _handle_recv(self, msg):
142 """Callback for stream.on_recv.
134 """Callback for stream.on_recv.
143
135
144 Unpacks message, and calls handlers with it.
136 Unpacks message, and calls handlers with it.
145 """
137 """
146 ident,smsg = self.session.feed_identities(msg)
138 ident,smsg = self.session.feed_identities(msg)
147 msg = self.session.deserialize(smsg)
139 msg = self.session.deserialize(smsg)
148 self.call_handlers(msg)
140 self.call_handlers(msg)
149
141
150 def call_handlers(self, msg):
142 def call_handlers(self, msg):
151 """This method is called in the ioloop thread when a message arrives.
143 """This method is called in the ioloop thread when a message arrives.
152
144
153 Subclasses should override this method to handle incoming messages.
145 Subclasses should override this method to handle incoming messages.
154 It is important to remember that this method is called in the thread
146 It is important to remember that this method is called in the thread
155 so that some logic must be done to ensure that the application level
147 so that some logic must be done to ensure that the application level
156 handlers are called in the application thread.
148 handlers are called in the application thread.
157 """
149 """
158 # Emit the generic signal.
150 # Emit the generic signal.
159 self.message_received.emit(msg)
151 self.message_received.emit(msg)
160
152
161 def flush(self, timeout=1.0):
153 def flush(self, timeout=1.0):
162 """Immediately processes all pending messages on this channel.
154 """Immediately processes all pending messages on this channel.
163
155
164 This is only used for the IOPub channel.
156 This is only used for the IOPub channel.
165
157
166 Callers should use this method to ensure that :meth:`call_handlers`
158 Callers should use this method to ensure that :meth:`call_handlers`
167 has been called for all messages that have been received on the
159 has been called for all messages that have been received on the
168 0MQ SUB socket of this channel.
160 0MQ SUB socket of this channel.
169
161
170 This method is thread safe.
162 This method is thread safe.
171
163
172 Parameters
164 Parameters
173 ----------
165 ----------
174 timeout : float, optional
166 timeout : float, optional
175 The maximum amount of time to spend flushing, in seconds. The
167 The maximum amount of time to spend flushing, in seconds. The
176 default is one second.
168 default is one second.
177 """
169 """
178 # We do the IOLoop callback process twice to ensure that the IOLoop
170 # We do the IOLoop callback process twice to ensure that the IOLoop
179 # gets to perform at least one full poll.
171 # gets to perform at least one full poll.
180 stop_time = time.time() + timeout
172 stop_time = time.time() + timeout
181 for i in range(2):
173 for i in range(2):
182 self._flushed = False
174 self._flushed = False
183 self.ioloop.add_callback(self._flush)
175 self.ioloop.add_callback(self._flush)
184 while not self._flushed and time.time() < stop_time:
176 while not self._flushed and time.time() < stop_time:
185 time.sleep(0.01)
177 time.sleep(0.01)
186
178
187 def _flush(self):
179 def _flush(self):
188 """Callback for :method:`self.flush`."""
180 """Callback for :method:`self.flush`."""
189 self.stream.flush()
181 self.stream.flush()
190 self._flushed = True
182 self._flushed = True
191
183
192
184
193 class IOLoopThread(Thread):
185 class IOLoopThread(Thread):
194 """Run a pyzmq ioloop in a thread to send and receive messages
186 """Run a pyzmq ioloop in a thread to send and receive messages
195 """
187 """
196 def __init__(self, loop):
188 def __init__(self, loop):
197 super(IOLoopThread, self).__init__()
189 super(IOLoopThread, self).__init__()
198 self.daemon = True
190 self.daemon = True
199 atexit.register(self._notice_exit)
191 atexit.register(self._notice_exit)
200 self.ioloop = loop or ioloop.IOLoop()
192 self.ioloop = loop or ioloop.IOLoop()
201
193
202 def _notice_exit(self):
194 def _notice_exit(self):
203 self._exiting = True
195 self._exiting = True
204
196
205 def run(self):
197 def run(self):
206 """Run my loop, ignoring EINTR events in the poller"""
198 """Run my loop, ignoring EINTR events in the poller"""
207 while True:
199 while True:
208 try:
200 try:
209 self.ioloop.start()
201 self.ioloop.start()
210 except ZMQError as e:
202 except ZMQError as e:
211 if e.errno == errno.EINTR:
203 if e.errno == errno.EINTR:
212 continue
204 continue
213 else:
205 else:
214 raise
206 raise
215 except Exception:
207 except Exception:
216 if self._exiting:
208 if self._exiting:
217 break
209 break
218 else:
210 else:
219 raise
211 raise
220 else:
212 else:
221 break
213 break
222
214
223 def stop(self):
215 def stop(self):
224 """Stop the channel's event loop and join its thread.
216 """Stop the channel's event loop and join its thread.
225
217
226 This calls :meth:`~threading.Thread.join` and returns when the thread
218 This calls :meth:`~threading.Thread.join` and returns when the thread
227 terminates. :class:`RuntimeError` will be raised if
219 terminates. :class:`RuntimeError` will be raised if
228 :meth:`~threading.Thread.start` is called again.
220 :meth:`~threading.Thread.start` is called again.
229 """
221 """
230 if self.ioloop is not None:
222 if self.ioloop is not None:
231 self.ioloop.stop()
223 self.ioloop.stop()
232 self.join()
224 self.join()
233 self.close()
225 self.close()
234
226
235 def close(self):
227 def close(self):
236 if self.ioloop is not None:
228 if self.ioloop is not None:
237 try:
229 try:
238 self.ioloop.close(all_fds=True)
230 self.ioloop.close(all_fds=True)
239 except Exception:
231 except Exception:
240 pass
232 pass
241
233
242
234
243 class QtKernelClient(QtKernelClientMixin, KernelClient):
235 class QtKernelClient(QtKernelClientMixin, KernelClient):
244 """ A KernelClient that provides signals and slots.
236 """ A KernelClient that provides signals and slots.
245 """
237 """
246
238
247 _ioloop = None
239 _ioloop = None
248 @property
240 @property
249 def ioloop(self):
241 def ioloop(self):
250 if self._ioloop is None:
242 if self._ioloop is None:
251 self._ioloop = ioloop.IOLoop()
243 self._ioloop = ioloop.IOLoop()
252 return self._ioloop
244 return self._ioloop
253
245
254 ioloop_thread = Instance(IOLoopThread)
246 ioloop_thread = Instance(IOLoopThread)
255
247
256 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
248 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
257 if shell:
249 if shell:
258 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
250 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
259
251
260 self.channel_listener_thread = IOLoopThread(self.ioloop)
252 self.channel_listener_thread = IOLoopThread(self.ioloop)
261 self.channel_listener_thread.start()
253 self.channel_listener_thread.start()
262
254
263 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
255 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
264
256
265 def _check_kernel_info_reply(self, msg):
257 def _check_kernel_info_reply(self, msg):
266 if msg['msg_type'] == 'kernel_info_reply':
258 if msg['msg_type'] == 'kernel_info_reply':
267 self._handle_kernel_info_reply(msg)
259 self._handle_kernel_info_reply(msg)
268 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
260 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
269
261
270 def stop_channels(self):
262 def stop_channels(self):
271 super(QtKernelClient, self).stop_channels()
263 super(QtKernelClient, self).stop_channels()
272 if self.ioloop_thread.is_alive():
264 if self.ioloop_thread.is_alive():
273 self.ioloop_thread.stop()
265 self.ioloop_thread.stop()
274
266
275 iopub_channel_class = Type(QtZMQSocketChannel)
267 iopub_channel_class = Type(QtZMQSocketChannel)
276 shell_channel_class = Type(QtZMQSocketChannel)
268 shell_channel_class = Type(QtZMQSocketChannel)
277 stdin_channel_class = Type(QtZMQSocketChannel)
269 stdin_channel_class = Type(QtZMQSocketChannel)
278 hb_channel_class = Type(QtHBChannel)
270 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now