##// END OF EJS Templates
Remove unused validate_string_list function
Thomas Kluyver -
Show More
@@ -1,118 +1,107 b''
1 1 """Blocking channels
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 5
6 6 # Copyright (c) IPython Development Team.
7 7 # Distributed under the terms of the Modified BSD License.
8 8
9 9 try:
10 10 from queue import Queue, Empty # Py 3
11 11 except ImportError:
12 12 from Queue import Queue, Empty # Py 2
13 13
14 14 from IPython.utils.py3compat import string_types, iteritems
15 15
16 16 # some utilities to validate message structure, these might get moved elsewhere
17 17 # if they prove to have more generic utility
18 18
19 def validate_string_list(lst):
20 """Validate that the input is a list of strings.
21
22 Raises ValueError if not."""
23 if not isinstance(lst, list):
24 raise ValueError('input %r must be a list' % lst)
25 for x in lst:
26 if not isinstance(x, string_types):
27 raise ValueError('element %r in list must be a string' % x)
28
29
30 19 def validate_string_dict(dct):
31 20 """Validate that the input is a dict with string keys and values.
32 21
33 22 Raises ValueError if not."""
34 23 for k,v in iteritems(dct):
35 24 if not isinstance(k, string_types):
36 25 raise ValueError('key %r in dict must be a string' % k)
37 26 if not isinstance(v, string_types):
38 27 raise ValueError('value %r in dict must be a string' % v)
39 28
40 29
41 30 class ZMQSocketChannel(object):
42 31 """A ZMQ socket in a simple blocking API"""
43 32 session = None
44 33 socket = None
45 34 stream = None
46 35 _exiting = False
47 36 proxy_methods = []
48 37
49 38 def __init__(self, socket, session, loop=None):
50 39 """Create a channel.
51 40
52 41 Parameters
53 42 ----------
54 43 socket : :class:`zmq.Socket`
55 44 The ZMQ socket to use.
56 45 session : :class:`session.Session`
57 46 The session to use.
58 47 loop
59 48 Unused here, for other implementations
60 49 """
61 50 super(ZMQSocketChannel, self).__init__()
62 51
63 52 self.socket = socket
64 53 self.session = session
65 54
66 55 def _recv(self, **kwargs):
67 56 msg = self.socket.recv_multipart(**kwargs)
68 57 ident,smsg = self.session.feed_identities(msg)
69 58 return self.session.deserialize(smsg)
70 59
71 60 def get_msg(self, block=True, timeout=None):
72 61 """ Gets a message if there is one that is ready. """
73 62 if block:
74 63 if timeout is not None:
75 64 timeout *= 1000 # seconds to ms
76 65 ready = self.socket.poll(timeout)
77 66 else:
78 67 ready = self.socket.poll(timeout=0)
79 68
80 69 if ready:
81 70 return self._recv()
82 71 else:
83 72 raise Empty
84 73
85 74 def get_msgs(self):
86 75 """ Get all messages that are currently ready. """
87 76 msgs = []
88 77 while True:
89 78 try:
90 79 msgs.append(self.get_msg(block=False))
91 80 except Empty:
92 81 break
93 82 return msgs
94 83
95 84 def msg_ready(self):
96 85 """ Is there a message that has been received? """
97 86 return bool(self.socket.poll(timeout=0))
98 87
99 88 def close(self):
100 89 if self.socket is not None:
101 90 try:
102 91 self.socket.close(linger=0)
103 92 except Exception:
104 93 pass
105 94 self.socket = None
106 95 stop = close
107 96
108 97 def is_alive(self):
109 98 return (self.socket is not None)
110 99
111 100 def _queue_send(self, msg):
112 101 """Pass a message to the ZMQ socket to send
113 102 """
114 103 self.session.send(self.socket, msg)
115 104
116 105 def start(self):
117 106 pass
118 107
@@ -1,259 +1,248 b''
1 1 """Base classes to manage a Client's interaction with a running kernel"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import absolute_import
7 7
8 8 import atexit
9 9 import errno
10 10 from threading import Thread
11 11 import time
12 12
13 13 import zmq
14 14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
15 15 # during garbage collection of threads at exit:
16 16 from zmq import ZMQError
17 17 from zmq.eventloop import ioloop, zmqstream
18 18
19 19 from IPython.core.release import kernel_protocol_version_info
20 20
21 21 from .channelsabc import (
22 22 ShellChannelABC, IOPubChannelABC,
23 23 HBChannelABC, StdInChannelABC,
24 24 )
25 25 from IPython.utils.py3compat import string_types, iteritems
26 26
27 27 #-----------------------------------------------------------------------------
28 28 # Constants and exceptions
29 29 #-----------------------------------------------------------------------------
30 30
31 31 major_protocol_version = kernel_protocol_version_info[0]
32 32
33 33 class InvalidPortNumber(Exception):
34 34 pass
35 35
36 36 #-----------------------------------------------------------------------------
37 37 # Utility functions
38 38 #-----------------------------------------------------------------------------
39 39
40 40 # some utilities to validate message structure, these might get moved elsewhere
41 41 # if they prove to have more generic utility
42 42
43 def validate_string_list(lst):
44 """Validate that the input is a list of strings.
45
46 Raises ValueError if not."""
47 if not isinstance(lst, list):
48 raise ValueError('input %r must be a list' % lst)
49 for x in lst:
50 if not isinstance(x, string_types):
51 raise ValueError('element %r in list must be a string' % x)
52
53
54 43 def validate_string_dict(dct):
55 44 """Validate that the input is a dict with string keys and values.
56 45
57 46 Raises ValueError if not."""
58 47 for k,v in iteritems(dct):
59 48 if not isinstance(k, string_types):
60 49 raise ValueError('key %r in dict must be a string' % k)
61 50 if not isinstance(v, string_types):
62 51 raise ValueError('value %r in dict must be a string' % v)
63 52
64 53
65 54 def make_shell_socket(context, identity, address):
66 55 socket = context.socket(zmq.DEALER)
67 56 socket.linger = 1000
68 57 socket.setsockopt(zmq.IDENTITY, identity)
69 58 socket.connect(address)
70 59 return socket
71 60
72 61 def make_iopub_socket(context, identity, address):
73 62 socket = context.socket(zmq.SUB)
74 63 socket.linger = 1000
75 64 socket.setsockopt(zmq.SUBSCRIBE,b'')
76 65 socket.setsockopt(zmq.IDENTITY, identity)
77 66 socket.connect(address)
78 67 return socket
79 68
80 69 def make_stdin_socket(context, identity, address):
81 70 socket = context.socket(zmq.DEALER)
82 71 socket.linger = 1000
83 72 socket.setsockopt(zmq.IDENTITY, identity)
84 73 socket.connect(address)
85 74 return socket
86 75
87 76 class HBChannel(Thread):
88 77 """The heartbeat channel which monitors the kernel heartbeat.
89 78
90 79 Note that the heartbeat channel is paused by default. As long as you start
91 80 this channel, the kernel manager will ensure that it is paused and un-paused
92 81 as appropriate.
93 82 """
94 83 context = None
95 84 session = None
96 85 socket = None
97 86 address = None
98 87 _exiting = False
99 88
100 89 time_to_dead = 1.
101 90 poller = None
102 91 _running = None
103 92 _pause = None
104 93 _beating = None
105 94
106 95 def __init__(self, context, session, address):
107 96 """Create the heartbeat monitor thread.
108 97
109 98 Parameters
110 99 ----------
111 100 context : :class:`zmq.Context`
112 101 The ZMQ context to use.
113 102 session : :class:`session.Session`
114 103 The session to use.
115 104 address : zmq url
116 105 Standard (ip, port) tuple that the kernel is listening on.
117 106 """
118 107 super(HBChannel, self).__init__()
119 108 self.daemon = True
120 109
121 110 self.context = context
122 111 self.session = session
123 112 if isinstance(address, tuple):
124 113 if address[1] == 0:
125 114 message = 'The port number for a channel cannot be 0.'
126 115 raise InvalidPortNumber(message)
127 116 address = "tcp://%s:%i" % address
128 117 self.address = address
129 118 atexit.register(self._notice_exit)
130 119
131 120 self._running = False
132 121 self._pause = True
133 122 self.poller = zmq.Poller()
134 123
135 124 def _notice_exit(self):
136 125 self._exiting = True
137 126
138 127 def _create_socket(self):
139 128 if self.socket is not None:
140 129 # close previous socket, before opening a new one
141 130 self.poller.unregister(self.socket)
142 131 self.socket.close()
143 132 self.socket = self.context.socket(zmq.REQ)
144 133 self.socket.linger = 1000
145 134 self.socket.connect(self.address)
146 135
147 136 self.poller.register(self.socket, zmq.POLLIN)
148 137
149 138 def _poll(self, start_time):
150 139 """poll for heartbeat replies until we reach self.time_to_dead.
151 140
152 141 Ignores interrupts, and returns the result of poll(), which
153 142 will be an empty list if no messages arrived before the timeout,
154 143 or the event tuple if there is a message to receive.
155 144 """
156 145
157 146 until_dead = self.time_to_dead - (time.time() - start_time)
158 147 # ensure poll at least once
159 148 until_dead = max(until_dead, 1e-3)
160 149 events = []
161 150 while True:
162 151 try:
163 152 events = self.poller.poll(1000 * until_dead)
164 153 except ZMQError as e:
165 154 if e.errno == errno.EINTR:
166 155 # ignore interrupts during heartbeat
167 156 # this may never actually happen
168 157 until_dead = self.time_to_dead - (time.time() - start_time)
169 158 until_dead = max(until_dead, 1e-3)
170 159 pass
171 160 else:
172 161 raise
173 162 except Exception:
174 163 if self._exiting:
175 164 break
176 165 else:
177 166 raise
178 167 else:
179 168 break
180 169 return events
181 170
182 171 def run(self):
183 172 """The thread's main activity. Call start() instead."""
184 173 self._create_socket()
185 174 self._running = True
186 175 self._beating = True
187 176
188 177 while self._running:
189 178 if self._pause:
190 179 # just sleep, and skip the rest of the loop
191 180 time.sleep(self.time_to_dead)
192 181 continue
193 182
194 183 since_last_heartbeat = 0.0
195 184 # io.rprint('Ping from HB channel') # dbg
196 185 # no need to catch EFSM here, because the previous event was
197 186 # either a recv or connect, which cannot be followed by EFSM
198 187 self.socket.send(b'ping')
199 188 request_time = time.time()
200 189 ready = self._poll(request_time)
201 190 if ready:
202 191 self._beating = True
203 192 # the poll above guarantees we have something to recv
204 193 self.socket.recv()
205 194 # sleep the remainder of the cycle
206 195 remainder = self.time_to_dead - (time.time() - request_time)
207 196 if remainder > 0:
208 197 time.sleep(remainder)
209 198 continue
210 199 else:
211 200 # nothing was received within the time limit, signal heart failure
212 201 self._beating = False
213 202 since_last_heartbeat = time.time() - request_time
214 203 self.call_handlers(since_last_heartbeat)
215 204 # and close/reopen the socket, because the REQ/REP cycle has been broken
216 205 self._create_socket()
217 206 continue
218 207
219 208 def pause(self):
220 209 """Pause the heartbeat."""
221 210 self._pause = True
222 211
223 212 def unpause(self):
224 213 """Unpause the heartbeat."""
225 214 self._pause = False
226 215
227 216 def is_beating(self):
228 217 """Is the heartbeat running and responsive (and not paused)."""
229 218 if self.is_alive() and not self._pause and self._beating:
230 219 return True
231 220 else:
232 221 return False
233 222
234 223 def stop(self):
235 224 """Stop the channel's event loop and join its thread."""
236 225 self._running = False
237 226 self.join()
238 227 self.close()
239 228
240 229 def close(self):
241 230 if self.socket is not None:
242 231 try:
243 232 self.socket.close(linger=0)
244 233 except Exception:
245 234 pass
246 235 self.socket = None
247 236
248 237 def call_handlers(self, since_last_heartbeat):
249 238 """This method is called in the ioloop thread when a message arrives.
250 239
251 240 Subclasses should override this method to handle incoming messages.
252 241 It is important to remember that this method is called in the thread
253 242 so that some logic must be done to ensure that the application level
254 243 handlers are called in the application thread.
255 244 """
256 245 pass
257 246
258 247
259 248 HBChannelABC.register(HBChannel)
General Comments 0
You need to be logged in to leave comments. Login now