##// END OF EJS Templates
Remove unused validate_string_list function
Thomas Kluyver -
Show More
@@ -1,118 +1,107
1 """Blocking channels
1 """Blocking channels
2
2
3 Useful for test suites and blocking terminal interfaces.
3 Useful for test suites and blocking terminal interfaces.
4 """
4 """
5
5
6 # Copyright (c) IPython Development Team.
6 # Copyright (c) IPython Development Team.
7 # Distributed under the terms of the Modified BSD License.
7 # Distributed under the terms of the Modified BSD License.
8
8
9 try:
9 try:
10 from queue import Queue, Empty # Py 3
10 from queue import Queue, Empty # Py 3
11 except ImportError:
11 except ImportError:
12 from Queue import Queue, Empty # Py 2
12 from Queue import Queue, Empty # Py 2
13
13
14 from IPython.utils.py3compat import string_types, iteritems
14 from IPython.utils.py3compat import string_types, iteritems
15
15
16 # some utilities to validate message structure, these might get moved elsewhere
16 # some utilities to validate message structure, these might get moved elsewhere
17 # if they prove to have more generic utility
17 # if they prove to have more generic utility
18
18
19 def validate_string_list(lst):
20 """Validate that the input is a list of strings.
21
22 Raises ValueError if not."""
23 if not isinstance(lst, list):
24 raise ValueError('input %r must be a list' % lst)
25 for x in lst:
26 if not isinstance(x, string_types):
27 raise ValueError('element %r in list must be a string' % x)
28
29
30 def validate_string_dict(dct):
19 def validate_string_dict(dct):
31 """Validate that the input is a dict with string keys and values.
20 """Validate that the input is a dict with string keys and values.
32
21
33 Raises ValueError if not."""
22 Raises ValueError if not."""
34 for k,v in iteritems(dct):
23 for k,v in iteritems(dct):
35 if not isinstance(k, string_types):
24 if not isinstance(k, string_types):
36 raise ValueError('key %r in dict must be a string' % k)
25 raise ValueError('key %r in dict must be a string' % k)
37 if not isinstance(v, string_types):
26 if not isinstance(v, string_types):
38 raise ValueError('value %r in dict must be a string' % v)
27 raise ValueError('value %r in dict must be a string' % v)
39
28
40
29
41 class ZMQSocketChannel(object):
30 class ZMQSocketChannel(object):
42 """A ZMQ socket in a simple blocking API"""
31 """A ZMQ socket in a simple blocking API"""
43 session = None
32 session = None
44 socket = None
33 socket = None
45 stream = None
34 stream = None
46 _exiting = False
35 _exiting = False
47 proxy_methods = []
36 proxy_methods = []
48
37
49 def __init__(self, socket, session, loop=None):
38 def __init__(self, socket, session, loop=None):
50 """Create a channel.
39 """Create a channel.
51
40
52 Parameters
41 Parameters
53 ----------
42 ----------
54 socket : :class:`zmq.Socket`
43 socket : :class:`zmq.Socket`
55 The ZMQ socket to use.
44 The ZMQ socket to use.
56 session : :class:`session.Session`
45 session : :class:`session.Session`
57 The session to use.
46 The session to use.
58 loop
47 loop
59 Unused here, for other implementations
48 Unused here, for other implementations
60 """
49 """
61 super(ZMQSocketChannel, self).__init__()
50 super(ZMQSocketChannel, self).__init__()
62
51
63 self.socket = socket
52 self.socket = socket
64 self.session = session
53 self.session = session
65
54
66 def _recv(self, **kwargs):
55 def _recv(self, **kwargs):
67 msg = self.socket.recv_multipart(**kwargs)
56 msg = self.socket.recv_multipart(**kwargs)
68 ident,smsg = self.session.feed_identities(msg)
57 ident,smsg = self.session.feed_identities(msg)
69 return self.session.deserialize(smsg)
58 return self.session.deserialize(smsg)
70
59
71 def get_msg(self, block=True, timeout=None):
60 def get_msg(self, block=True, timeout=None):
72 """ Gets a message if there is one that is ready. """
61 """ Gets a message if there is one that is ready. """
73 if block:
62 if block:
74 if timeout is not None:
63 if timeout is not None:
75 timeout *= 1000 # seconds to ms
64 timeout *= 1000 # seconds to ms
76 ready = self.socket.poll(timeout)
65 ready = self.socket.poll(timeout)
77 else:
66 else:
78 ready = self.socket.poll(timeout=0)
67 ready = self.socket.poll(timeout=0)
79
68
80 if ready:
69 if ready:
81 return self._recv()
70 return self._recv()
82 else:
71 else:
83 raise Empty
72 raise Empty
84
73
85 def get_msgs(self):
74 def get_msgs(self):
86 """ Get all messages that are currently ready. """
75 """ Get all messages that are currently ready. """
87 msgs = []
76 msgs = []
88 while True:
77 while True:
89 try:
78 try:
90 msgs.append(self.get_msg(block=False))
79 msgs.append(self.get_msg(block=False))
91 except Empty:
80 except Empty:
92 break
81 break
93 return msgs
82 return msgs
94
83
95 def msg_ready(self):
84 def msg_ready(self):
96 """ Is there a message that has been received? """
85 """ Is there a message that has been received? """
97 return bool(self.socket.poll(timeout=0))
86 return bool(self.socket.poll(timeout=0))
98
87
99 def close(self):
88 def close(self):
100 if self.socket is not None:
89 if self.socket is not None:
101 try:
90 try:
102 self.socket.close(linger=0)
91 self.socket.close(linger=0)
103 except Exception:
92 except Exception:
104 pass
93 pass
105 self.socket = None
94 self.socket = None
106 stop = close
95 stop = close
107
96
108 def is_alive(self):
97 def is_alive(self):
109 return (self.socket is not None)
98 return (self.socket is not None)
110
99
111 def _queue_send(self, msg):
100 def _queue_send(self, msg):
112 """Pass a message to the ZMQ socket to send
101 """Pass a message to the ZMQ socket to send
113 """
102 """
114 self.session.send(self.socket, msg)
103 self.session.send(self.socket, msg)
115
104
116 def start(self):
105 def start(self):
117 pass
106 pass
118
107
@@ -1,259 +1,248
1 """Base classes to manage a Client's interaction with a running kernel"""
1 """Base classes to manage a Client's interaction with a running kernel"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import absolute_import
6 from __future__ import absolute_import
7
7
8 import atexit
8 import atexit
9 import errno
9 import errno
10 from threading import Thread
10 from threading import Thread
11 import time
11 import time
12
12
13 import zmq
13 import zmq
14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
15 # during garbage collection of threads at exit:
15 # during garbage collection of threads at exit:
16 from zmq import ZMQError
16 from zmq import ZMQError
17 from zmq.eventloop import ioloop, zmqstream
17 from zmq.eventloop import ioloop, zmqstream
18
18
19 from IPython.core.release import kernel_protocol_version_info
19 from IPython.core.release import kernel_protocol_version_info
20
20
21 from .channelsabc import (
21 from .channelsabc import (
22 ShellChannelABC, IOPubChannelABC,
22 ShellChannelABC, IOPubChannelABC,
23 HBChannelABC, StdInChannelABC,
23 HBChannelABC, StdInChannelABC,
24 )
24 )
25 from IPython.utils.py3compat import string_types, iteritems
25 from IPython.utils.py3compat import string_types, iteritems
26
26
27 #-----------------------------------------------------------------------------
27 #-----------------------------------------------------------------------------
28 # Constants and exceptions
28 # Constants and exceptions
29 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
30
30
31 major_protocol_version = kernel_protocol_version_info[0]
31 major_protocol_version = kernel_protocol_version_info[0]
32
32
33 class InvalidPortNumber(Exception):
33 class InvalidPortNumber(Exception):
34 pass
34 pass
35
35
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37 # Utility functions
37 # Utility functions
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39
39
40 # some utilities to validate message structure, these might get moved elsewhere
40 # some utilities to validate message structure, these might get moved elsewhere
41 # if they prove to have more generic utility
41 # if they prove to have more generic utility
42
42
43 def validate_string_list(lst):
44 """Validate that the input is a list of strings.
45
46 Raises ValueError if not."""
47 if not isinstance(lst, list):
48 raise ValueError('input %r must be a list' % lst)
49 for x in lst:
50 if not isinstance(x, string_types):
51 raise ValueError('element %r in list must be a string' % x)
52
53
54 def validate_string_dict(dct):
43 def validate_string_dict(dct):
55 """Validate that the input is a dict with string keys and values.
44 """Validate that the input is a dict with string keys and values.
56
45
57 Raises ValueError if not."""
46 Raises ValueError if not."""
58 for k,v in iteritems(dct):
47 for k,v in iteritems(dct):
59 if not isinstance(k, string_types):
48 if not isinstance(k, string_types):
60 raise ValueError('key %r in dict must be a string' % k)
49 raise ValueError('key %r in dict must be a string' % k)
61 if not isinstance(v, string_types):
50 if not isinstance(v, string_types):
62 raise ValueError('value %r in dict must be a string' % v)
51 raise ValueError('value %r in dict must be a string' % v)
63
52
64
53
65 def make_shell_socket(context, identity, address):
54 def make_shell_socket(context, identity, address):
66 socket = context.socket(zmq.DEALER)
55 socket = context.socket(zmq.DEALER)
67 socket.linger = 1000
56 socket.linger = 1000
68 socket.setsockopt(zmq.IDENTITY, identity)
57 socket.setsockopt(zmq.IDENTITY, identity)
69 socket.connect(address)
58 socket.connect(address)
70 return socket
59 return socket
71
60
72 def make_iopub_socket(context, identity, address):
61 def make_iopub_socket(context, identity, address):
73 socket = context.socket(zmq.SUB)
62 socket = context.socket(zmq.SUB)
74 socket.linger = 1000
63 socket.linger = 1000
75 socket.setsockopt(zmq.SUBSCRIBE,b'')
64 socket.setsockopt(zmq.SUBSCRIBE,b'')
76 socket.setsockopt(zmq.IDENTITY, identity)
65 socket.setsockopt(zmq.IDENTITY, identity)
77 socket.connect(address)
66 socket.connect(address)
78 return socket
67 return socket
79
68
80 def make_stdin_socket(context, identity, address):
69 def make_stdin_socket(context, identity, address):
81 socket = context.socket(zmq.DEALER)
70 socket = context.socket(zmq.DEALER)
82 socket.linger = 1000
71 socket.linger = 1000
83 socket.setsockopt(zmq.IDENTITY, identity)
72 socket.setsockopt(zmq.IDENTITY, identity)
84 socket.connect(address)
73 socket.connect(address)
85 return socket
74 return socket
86
75
87 class HBChannel(Thread):
76 class HBChannel(Thread):
88 """The heartbeat channel which monitors the kernel heartbeat.
77 """The heartbeat channel which monitors the kernel heartbeat.
89
78
90 Note that the heartbeat channel is paused by default. As long as you start
79 Note that the heartbeat channel is paused by default. As long as you start
91 this channel, the kernel manager will ensure that it is paused and un-paused
80 this channel, the kernel manager will ensure that it is paused and un-paused
92 as appropriate.
81 as appropriate.
93 """
82 """
94 context = None
83 context = None
95 session = None
84 session = None
96 socket = None
85 socket = None
97 address = None
86 address = None
98 _exiting = False
87 _exiting = False
99
88
100 time_to_dead = 1.
89 time_to_dead = 1.
101 poller = None
90 poller = None
102 _running = None
91 _running = None
103 _pause = None
92 _pause = None
104 _beating = None
93 _beating = None
105
94
106 def __init__(self, context, session, address):
95 def __init__(self, context, session, address):
107 """Create the heartbeat monitor thread.
96 """Create the heartbeat monitor thread.
108
97
109 Parameters
98 Parameters
110 ----------
99 ----------
111 context : :class:`zmq.Context`
100 context : :class:`zmq.Context`
112 The ZMQ context to use.
101 The ZMQ context to use.
113 session : :class:`session.Session`
102 session : :class:`session.Session`
114 The session to use.
103 The session to use.
115 address : zmq url
104 address : zmq url
116 Standard (ip, port) tuple that the kernel is listening on.
105 Standard (ip, port) tuple that the kernel is listening on.
117 """
106 """
118 super(HBChannel, self).__init__()
107 super(HBChannel, self).__init__()
119 self.daemon = True
108 self.daemon = True
120
109
121 self.context = context
110 self.context = context
122 self.session = session
111 self.session = session
123 if isinstance(address, tuple):
112 if isinstance(address, tuple):
124 if address[1] == 0:
113 if address[1] == 0:
125 message = 'The port number for a channel cannot be 0.'
114 message = 'The port number for a channel cannot be 0.'
126 raise InvalidPortNumber(message)
115 raise InvalidPortNumber(message)
127 address = "tcp://%s:%i" % address
116 address = "tcp://%s:%i" % address
128 self.address = address
117 self.address = address
129 atexit.register(self._notice_exit)
118 atexit.register(self._notice_exit)
130
119
131 self._running = False
120 self._running = False
132 self._pause = True
121 self._pause = True
133 self.poller = zmq.Poller()
122 self.poller = zmq.Poller()
134
123
135 def _notice_exit(self):
124 def _notice_exit(self):
136 self._exiting = True
125 self._exiting = True
137
126
138 def _create_socket(self):
127 def _create_socket(self):
139 if self.socket is not None:
128 if self.socket is not None:
140 # close previous socket, before opening a new one
129 # close previous socket, before opening a new one
141 self.poller.unregister(self.socket)
130 self.poller.unregister(self.socket)
142 self.socket.close()
131 self.socket.close()
143 self.socket = self.context.socket(zmq.REQ)
132 self.socket = self.context.socket(zmq.REQ)
144 self.socket.linger = 1000
133 self.socket.linger = 1000
145 self.socket.connect(self.address)
134 self.socket.connect(self.address)
146
135
147 self.poller.register(self.socket, zmq.POLLIN)
136 self.poller.register(self.socket, zmq.POLLIN)
148
137
149 def _poll(self, start_time):
138 def _poll(self, start_time):
150 """poll for heartbeat replies until we reach self.time_to_dead.
139 """poll for heartbeat replies until we reach self.time_to_dead.
151
140
152 Ignores interrupts, and returns the result of poll(), which
141 Ignores interrupts, and returns the result of poll(), which
153 will be an empty list if no messages arrived before the timeout,
142 will be an empty list if no messages arrived before the timeout,
154 or the event tuple if there is a message to receive.
143 or the event tuple if there is a message to receive.
155 """
144 """
156
145
157 until_dead = self.time_to_dead - (time.time() - start_time)
146 until_dead = self.time_to_dead - (time.time() - start_time)
158 # ensure poll at least once
147 # ensure poll at least once
159 until_dead = max(until_dead, 1e-3)
148 until_dead = max(until_dead, 1e-3)
160 events = []
149 events = []
161 while True:
150 while True:
162 try:
151 try:
163 events = self.poller.poll(1000 * until_dead)
152 events = self.poller.poll(1000 * until_dead)
164 except ZMQError as e:
153 except ZMQError as e:
165 if e.errno == errno.EINTR:
154 if e.errno == errno.EINTR:
166 # ignore interrupts during heartbeat
155 # ignore interrupts during heartbeat
167 # this may never actually happen
156 # this may never actually happen
168 until_dead = self.time_to_dead - (time.time() - start_time)
157 until_dead = self.time_to_dead - (time.time() - start_time)
169 until_dead = max(until_dead, 1e-3)
158 until_dead = max(until_dead, 1e-3)
170 pass
159 pass
171 else:
160 else:
172 raise
161 raise
173 except Exception:
162 except Exception:
174 if self._exiting:
163 if self._exiting:
175 break
164 break
176 else:
165 else:
177 raise
166 raise
178 else:
167 else:
179 break
168 break
180 return events
169 return events
181
170
182 def run(self):
171 def run(self):
183 """The thread's main activity. Call start() instead."""
172 """The thread's main activity. Call start() instead."""
184 self._create_socket()
173 self._create_socket()
185 self._running = True
174 self._running = True
186 self._beating = True
175 self._beating = True
187
176
188 while self._running:
177 while self._running:
189 if self._pause:
178 if self._pause:
190 # just sleep, and skip the rest of the loop
179 # just sleep, and skip the rest of the loop
191 time.sleep(self.time_to_dead)
180 time.sleep(self.time_to_dead)
192 continue
181 continue
193
182
194 since_last_heartbeat = 0.0
183 since_last_heartbeat = 0.0
195 # io.rprint('Ping from HB channel') # dbg
184 # io.rprint('Ping from HB channel') # dbg
196 # no need to catch EFSM here, because the previous event was
185 # no need to catch EFSM here, because the previous event was
197 # either a recv or connect, which cannot be followed by EFSM
186 # either a recv or connect, which cannot be followed by EFSM
198 self.socket.send(b'ping')
187 self.socket.send(b'ping')
199 request_time = time.time()
188 request_time = time.time()
200 ready = self._poll(request_time)
189 ready = self._poll(request_time)
201 if ready:
190 if ready:
202 self._beating = True
191 self._beating = True
203 # the poll above guarantees we have something to recv
192 # the poll above guarantees we have something to recv
204 self.socket.recv()
193 self.socket.recv()
205 # sleep the remainder of the cycle
194 # sleep the remainder of the cycle
206 remainder = self.time_to_dead - (time.time() - request_time)
195 remainder = self.time_to_dead - (time.time() - request_time)
207 if remainder > 0:
196 if remainder > 0:
208 time.sleep(remainder)
197 time.sleep(remainder)
209 continue
198 continue
210 else:
199 else:
211 # nothing was received within the time limit, signal heart failure
200 # nothing was received within the time limit, signal heart failure
212 self._beating = False
201 self._beating = False
213 since_last_heartbeat = time.time() - request_time
202 since_last_heartbeat = time.time() - request_time
214 self.call_handlers(since_last_heartbeat)
203 self.call_handlers(since_last_heartbeat)
215 # and close/reopen the socket, because the REQ/REP cycle has been broken
204 # and close/reopen the socket, because the REQ/REP cycle has been broken
216 self._create_socket()
205 self._create_socket()
217 continue
206 continue
218
207
219 def pause(self):
208 def pause(self):
220 """Pause the heartbeat."""
209 """Pause the heartbeat."""
221 self._pause = True
210 self._pause = True
222
211
223 def unpause(self):
212 def unpause(self):
224 """Unpause the heartbeat."""
213 """Unpause the heartbeat."""
225 self._pause = False
214 self._pause = False
226
215
227 def is_beating(self):
216 def is_beating(self):
228 """Is the heartbeat running and responsive (and not paused)."""
217 """Is the heartbeat running and responsive (and not paused)."""
229 if self.is_alive() and not self._pause and self._beating:
218 if self.is_alive() and not self._pause and self._beating:
230 return True
219 return True
231 else:
220 else:
232 return False
221 return False
233
222
234 def stop(self):
223 def stop(self):
235 """Stop the channel's event loop and join its thread."""
224 """Stop the channel's event loop and join its thread."""
236 self._running = False
225 self._running = False
237 self.join()
226 self.join()
238 self.close()
227 self.close()
239
228
240 def close(self):
229 def close(self):
241 if self.socket is not None:
230 if self.socket is not None:
242 try:
231 try:
243 self.socket.close(linger=0)
232 self.socket.close(linger=0)
244 except Exception:
233 except Exception:
245 pass
234 pass
246 self.socket = None
235 self.socket = None
247
236
248 def call_handlers(self, since_last_heartbeat):
237 def call_handlers(self, since_last_heartbeat):
249 """This method is called in the ioloop thread when a message arrives.
238 """This method is called in the ioloop thread when a message arrives.
250
239
251 Subclasses should override this method to handle incoming messages.
240 Subclasses should override this method to handle incoming messages.
252 It is important to remember that this method is called in the thread
241 It is important to remember that this method is called in the thread
253 so that some logic must be done to ensure that the application level
242 so that some logic must be done to ensure that the application level
254 handlers are called in the application thread.
243 handlers are called in the application thread.
255 """
244 """
256 pass
245 pass
257
246
258
247
259 HBChannelABC.register(HBChannel)
248 HBChannelABC.register(HBChannel)
General Comments 0
You need to be logged in to leave comments. Login now