##// END OF EJS Templates
Remove unused copies of validate_string_dict
Thomas Kluyver -
Show More
@@ -1,107 +1,92
1 1 """Blocking channels
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 5
6 6 # Copyright (c) IPython Development Team.
7 7 # Distributed under the terms of the Modified BSD License.
8 8
9 9 try:
10 10 from queue import Queue, Empty # Py 3
11 11 except ImportError:
12 12 from Queue import Queue, Empty # Py 2
13 13
14 from IPython.utils.py3compat import string_types, iteritems
15
16 # some utilities to validate message structure, these might get moved elsewhere
17 # if they prove to have more generic utility
18
19 def validate_string_dict(dct):
20 """Validate that the input is a dict with string keys and values.
21
22 Raises ValueError if not."""
23 for k,v in iteritems(dct):
24 if not isinstance(k, string_types):
25 raise ValueError('key %r in dict must be a string' % k)
26 if not isinstance(v, string_types):
27 raise ValueError('value %r in dict must be a string' % v)
28
29 14
30 15 class ZMQSocketChannel(object):
31 16 """A ZMQ socket in a simple blocking API"""
32 17 session = None
33 18 socket = None
34 19 stream = None
35 20 _exiting = False
36 21 proxy_methods = []
37 22
38 23 def __init__(self, socket, session, loop=None):
39 24 """Create a channel.
40 25
41 26 Parameters
42 27 ----------
43 28 socket : :class:`zmq.Socket`
44 29 The ZMQ socket to use.
45 30 session : :class:`session.Session`
46 31 The session to use.
47 32 loop
48 33 Unused here, for other implementations
49 34 """
50 35 super(ZMQSocketChannel, self).__init__()
51 36
52 37 self.socket = socket
53 38 self.session = session
54 39
55 40 def _recv(self, **kwargs):
56 41 msg = self.socket.recv_multipart(**kwargs)
57 42 ident,smsg = self.session.feed_identities(msg)
58 43 return self.session.deserialize(smsg)
59 44
60 45 def get_msg(self, block=True, timeout=None):
61 46 """ Gets a message if there is one that is ready. """
62 47 if block:
63 48 if timeout is not None:
64 49 timeout *= 1000 # seconds to ms
65 50 ready = self.socket.poll(timeout)
66 51 else:
67 52 ready = self.socket.poll(timeout=0)
68 53
69 54 if ready:
70 55 return self._recv()
71 56 else:
72 57 raise Empty
73 58
74 59 def get_msgs(self):
75 60 """ Get all messages that are currently ready. """
76 61 msgs = []
77 62 while True:
78 63 try:
79 64 msgs.append(self.get_msg(block=False))
80 65 except Empty:
81 66 break
82 67 return msgs
83 68
84 69 def msg_ready(self):
85 70 """ Is there a message that has been received? """
86 71 return bool(self.socket.poll(timeout=0))
87 72
88 73 def close(self):
89 74 if self.socket is not None:
90 75 try:
91 76 self.socket.close(linger=0)
92 77 except Exception:
93 78 pass
94 79 self.socket = None
95 80 stop = close
96 81
97 82 def is_alive(self):
98 83 return (self.socket is not None)
99 84
100 85 def _queue_send(self, msg):
101 86 """Pass a message to the ZMQ socket to send
102 87 """
103 88 self.session.send(self.socket, msg)
104 89
105 90 def start(self):
106 91 pass
107 92
@@ -1,245 +1,226
1 1 """Base classes to manage a Client's interaction with a running kernel"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import absolute_import
7 7
8 8 import atexit
9 9 import errno
10 10 from threading import Thread
11 11 import time
12 12
13 13 import zmq
14 14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
15 15 # during garbage collection of threads at exit:
16 16 from zmq import ZMQError
17 from zmq.eventloop import ioloop, zmqstream
18 17
19 18 from IPython.core.release import kernel_protocol_version_info
20 19
21 20 from .channelsabc import HBChannelABC
22 from IPython.utils.py3compat import string_types, iteritems
23 21
24 22 #-----------------------------------------------------------------------------
25 23 # Constants and exceptions
26 24 #-----------------------------------------------------------------------------
27 25
28 26 major_protocol_version = kernel_protocol_version_info[0]
29 27
30 28 class InvalidPortNumber(Exception):
31 29 pass
32 30
33 #-----------------------------------------------------------------------------
34 # Utility functions
35 #-----------------------------------------------------------------------------
36
37 # some utilities to validate message structure, these might get moved elsewhere
38 # if they prove to have more generic utility
39
40 def validate_string_dict(dct):
41 """Validate that the input is a dict with string keys and values.
42
43 Raises ValueError if not."""
44 for k,v in iteritems(dct):
45 if not isinstance(k, string_types):
46 raise ValueError('key %r in dict must be a string' % k)
47 if not isinstance(v, string_types):
48 raise ValueError('value %r in dict must be a string' % v)
49
50 31
51 32 def make_shell_socket(context, identity, address):
52 33 socket = context.socket(zmq.DEALER)
53 34 socket.linger = 1000
54 35 socket.setsockopt(zmq.IDENTITY, identity)
55 36 socket.connect(address)
56 37 return socket
57 38
58 39 def make_iopub_socket(context, identity, address):
59 40 socket = context.socket(zmq.SUB)
60 41 socket.linger = 1000
61 42 socket.setsockopt(zmq.SUBSCRIBE,b'')
62 43 socket.setsockopt(zmq.IDENTITY, identity)
63 44 socket.connect(address)
64 45 return socket
65 46
66 47 def make_stdin_socket(context, identity, address):
67 48 socket = context.socket(zmq.DEALER)
68 49 socket.linger = 1000
69 50 socket.setsockopt(zmq.IDENTITY, identity)
70 51 socket.connect(address)
71 52 return socket
72 53
73 54 class HBChannel(Thread):
74 55 """The heartbeat channel which monitors the kernel heartbeat.
75 56
76 57 Note that the heartbeat channel is paused by default. As long as you start
77 58 this channel, the kernel manager will ensure that it is paused and un-paused
78 59 as appropriate.
79 60 """
80 61 context = None
81 62 session = None
82 63 socket = None
83 64 address = None
84 65 _exiting = False
85 66
86 67 time_to_dead = 1.
87 68 poller = None
88 69 _running = None
89 70 _pause = None
90 71 _beating = None
91 72
92 73 def __init__(self, context, session, address):
93 74 """Create the heartbeat monitor thread.
94 75
95 76 Parameters
96 77 ----------
97 78 context : :class:`zmq.Context`
98 79 The ZMQ context to use.
99 80 session : :class:`session.Session`
100 81 The session to use.
101 82 address : zmq url
102 83 Standard (ip, port) tuple that the kernel is listening on.
103 84 """
104 85 super(HBChannel, self).__init__()
105 86 self.daemon = True
106 87
107 88 self.context = context
108 89 self.session = session
109 90 if isinstance(address, tuple):
110 91 if address[1] == 0:
111 92 message = 'The port number for a channel cannot be 0.'
112 93 raise InvalidPortNumber(message)
113 94 address = "tcp://%s:%i" % address
114 95 self.address = address
115 96 atexit.register(self._notice_exit)
116 97
117 98 self._running = False
118 99 self._pause = True
119 100 self.poller = zmq.Poller()
120 101
121 102 def _notice_exit(self):
122 103 self._exiting = True
123 104
124 105 def _create_socket(self):
125 106 if self.socket is not None:
126 107 # close previous socket, before opening a new one
127 108 self.poller.unregister(self.socket)
128 109 self.socket.close()
129 110 self.socket = self.context.socket(zmq.REQ)
130 111 self.socket.linger = 1000
131 112 self.socket.connect(self.address)
132 113
133 114 self.poller.register(self.socket, zmq.POLLIN)
134 115
135 116 def _poll(self, start_time):
136 117 """poll for heartbeat replies until we reach self.time_to_dead.
137 118
138 119 Ignores interrupts, and returns the result of poll(), which
139 120 will be an empty list if no messages arrived before the timeout,
140 121 or the event tuple if there is a message to receive.
141 122 """
142 123
143 124 until_dead = self.time_to_dead - (time.time() - start_time)
144 125 # ensure poll at least once
145 126 until_dead = max(until_dead, 1e-3)
146 127 events = []
147 128 while True:
148 129 try:
149 130 events = self.poller.poll(1000 * until_dead)
150 131 except ZMQError as e:
151 132 if e.errno == errno.EINTR:
152 133 # ignore interrupts during heartbeat
153 134 # this may never actually happen
154 135 until_dead = self.time_to_dead - (time.time() - start_time)
155 136 until_dead = max(until_dead, 1e-3)
156 137 pass
157 138 else:
158 139 raise
159 140 except Exception:
160 141 if self._exiting:
161 142 break
162 143 else:
163 144 raise
164 145 else:
165 146 break
166 147 return events
167 148
168 149 def run(self):
169 150 """The thread's main activity. Call start() instead."""
170 151 self._create_socket()
171 152 self._running = True
172 153 self._beating = True
173 154
174 155 while self._running:
175 156 if self._pause:
176 157 # just sleep, and skip the rest of the loop
177 158 time.sleep(self.time_to_dead)
178 159 continue
179 160
180 161 since_last_heartbeat = 0.0
181 162 # io.rprint('Ping from HB channel') # dbg
182 163 # no need to catch EFSM here, because the previous event was
183 164 # either a recv or connect, which cannot be followed by EFSM
184 165 self.socket.send(b'ping')
185 166 request_time = time.time()
186 167 ready = self._poll(request_time)
187 168 if ready:
188 169 self._beating = True
189 170 # the poll above guarantees we have something to recv
190 171 self.socket.recv()
191 172 # sleep the remainder of the cycle
192 173 remainder = self.time_to_dead - (time.time() - request_time)
193 174 if remainder > 0:
194 175 time.sleep(remainder)
195 176 continue
196 177 else:
197 178 # nothing was received within the time limit, signal heart failure
198 179 self._beating = False
199 180 since_last_heartbeat = time.time() - request_time
200 181 self.call_handlers(since_last_heartbeat)
201 182 # and close/reopen the socket, because the REQ/REP cycle has been broken
202 183 self._create_socket()
203 184 continue
204 185
205 186 def pause(self):
206 187 """Pause the heartbeat."""
207 188 self._pause = True
208 189
209 190 def unpause(self):
210 191 """Unpause the heartbeat."""
211 192 self._pause = False
212 193
213 194 def is_beating(self):
214 195 """Is the heartbeat running and responsive (and not paused)."""
215 196 if self.is_alive() and not self._pause and self._beating:
216 197 return True
217 198 else:
218 199 return False
219 200
220 201 def stop(self):
221 202 """Stop the channel's event loop and join its thread."""
222 203 self._running = False
223 204 self.join()
224 205 self.close()
225 206
226 207 def close(self):
227 208 if self.socket is not None:
228 209 try:
229 210 self.socket.close(linger=0)
230 211 except Exception:
231 212 pass
232 213 self.socket = None
233 214
234 215 def call_handlers(self, since_last_heartbeat):
235 216 """This method is called in the ioloop thread when a message arrives.
236 217
237 218 Subclasses should override this method to handle incoming messages.
238 219 It is important to remember that this method is called in the thread
239 220 so that some logic must be done to ensure that the application level
240 221 handlers are called in the application thread.
241 222 """
242 223 pass
243 224
244 225
245 226 HBChannelABC.register(HBChannel)
@@ -1,375 +1,389
1 1 """Base class to manage the interaction with a running kernel"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import absolute_import
7 from IPython.kernel.channels import validate_string_dict, major_protocol_version
8 from IPython.utils.py3compat import string_types
7 from IPython.kernel.channels import major_protocol_version
8 from IPython.utils.py3compat import string_types, iteritems
9 9
10 10 import zmq
11 11
12 12 from IPython.utils.traitlets import (
13 13 Any, Instance, Type,
14 14 )
15 15
16 16 from .channelsabc import (ChannelABC, HBChannelABC)
17 17 from .channels import (
18 18 make_shell_socket, make_stdin_socket, make_iopub_socket
19 19 )
20 20 from .clientabc import KernelClientABC
21 21 from .connect import ConnectionFileMixin
22 22
23 23
24 # some utilities to validate message structure, these might get moved elsewhere
25 # if they prove to have more generic utility
26
27 def validate_string_dict(dct):
28 """Validate that the input is a dict with string keys and values.
29
30 Raises ValueError if not."""
31 for k,v in iteritems(dct):
32 if not isinstance(k, string_types):
33 raise ValueError('key %r in dict must be a string' % k)
34 if not isinstance(v, string_types):
35 raise ValueError('value %r in dict must be a string' % v)
36
37
24 38 class KernelClient(ConnectionFileMixin):
25 39 """Communicates with a single kernel on any host via zmq channels.
26 40
27 41 There are four channels associated with each kernel:
28 42
29 43 * shell: for request/reply calls to the kernel.
30 44 * iopub: for the kernel to publish results to frontends.
31 45 * hb: for monitoring the kernel's heartbeat.
32 46 * stdin: for frontends to reply to raw_input calls in the kernel.
33 47
34 48 The methods of the channels are exposed as methods of the client itself
35 49 (KernelClient.execute, complete, history, etc.).
36 50 See the channels themselves for documentation of these methods.
37 51
38 52 """
39 53
40 54 # The PyZMQ Context to use for communication with the kernel.
41 55 context = Instance(zmq.Context)
42 56 def _context_default(self):
43 57 return zmq.Context.instance()
44 58
45 59 # The classes to use for the various channels
46 60 shell_channel_class = Type(ChannelABC)
47 61 iopub_channel_class = Type(ChannelABC)
48 62 stdin_channel_class = Type(ChannelABC)
49 63 hb_channel_class = Type(HBChannelABC)
50 64
51 65 # Protected traits
52 66 _shell_channel = Any
53 67 _iopub_channel = Any
54 68 _stdin_channel = Any
55 69 _hb_channel = Any
56 70
57 71 # flag for whether execute requests should be allowed to call raw_input:
58 72 allow_stdin = True
59 73
60 74 #--------------------------------------------------------------------------
61 75 # Channel proxy methods
62 76 #--------------------------------------------------------------------------
63 77
64 78 def _get_msg(channel, *args, **kwargs):
65 79 return channel.get_msg(*args, **kwargs)
66 80
67 81 def get_shell_msg(self, *args, **kwargs):
68 82 """Get a message from the shell channel"""
69 83 return self.shell_channel.get_msg(*args, **kwargs)
70 84
71 85 def get_iopub_msg(self, *args, **kwargs):
72 86 """Get a message from the iopub channel"""
73 87 return self.iopub_channel.get_msg(*args, **kwargs)
74 88
75 89 def get_stdin_msg(self, *args, **kwargs):
76 90 """Get a message from the stdin channel"""
77 91 return self.stdin_channel.get_msg(*args, **kwargs)
78 92
79 93 #--------------------------------------------------------------------------
80 94 # Channel management methods
81 95 #--------------------------------------------------------------------------
82 96
83 97 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
84 98 """Starts the channels for this kernel.
85 99
86 100 This will create the channels if they do not exist and then start
87 101 them (their activity runs in a thread). If port numbers of 0 are
88 102 being used (random ports) then you must first call
89 103 :meth:`start_kernel`. If the channels have been stopped and you
90 104 call this, :class:`RuntimeError` will be raised.
91 105 """
92 106 if shell:
93 107 self.shell_channel.start()
94 108 self.kernel_info()
95 109 if iopub:
96 110 self.iopub_channel.start()
97 111 if stdin:
98 112 self.stdin_channel.start()
99 113 self.allow_stdin = True
100 114 else:
101 115 self.allow_stdin = False
102 116 if hb:
103 117 self.hb_channel.start()
104 118
105 119 def stop_channels(self):
106 120 """Stops all the running channels for this kernel.
107 121
108 122 This stops their event loops and joins their threads.
109 123 """
110 124 if self.shell_channel.is_alive():
111 125 self.shell_channel.stop()
112 126 if self.iopub_channel.is_alive():
113 127 self.iopub_channel.stop()
114 128 if self.stdin_channel.is_alive():
115 129 self.stdin_channel.stop()
116 130 if self.hb_channel.is_alive():
117 131 self.hb_channel.stop()
118 132
119 133 @property
120 134 def channels_running(self):
121 135 """Are any of the channels created and running?"""
122 136 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
123 137 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
124 138
125 139 ioloop = None # Overridden in subclasses that use pyzmq event loop
126 140
127 141 @property
128 142 def shell_channel(self):
129 143 """Get the shell channel object for this kernel."""
130 144 if self._shell_channel is None:
131 145 url = self._make_url('shell')
132 146 self.log.debug("connecting shell channel to %s", url)
133 147 socket = make_shell_socket(self.context, self.session.bsession, url)
134 148 self._shell_channel = self.shell_channel_class(
135 149 socket, self.session, self.ioloop
136 150 )
137 151 return self._shell_channel
138 152
139 153 @property
140 154 def iopub_channel(self):
141 155 """Get the iopub channel object for this kernel."""
142 156 if self._iopub_channel is None:
143 157 url = self._make_url('iopub')
144 158 self.log.debug("connecting iopub channel to %s", url)
145 159 socket = make_iopub_socket(self.context, self.session.bsession, url)
146 160 self._iopub_channel = self.iopub_channel_class(
147 161 socket, self.session, self.ioloop
148 162 )
149 163 return self._iopub_channel
150 164
151 165 @property
152 166 def stdin_channel(self):
153 167 """Get the stdin channel object for this kernel."""
154 168 if self._stdin_channel is None:
155 169 url = self._make_url('stdin')
156 170 self.log.debug("connecting stdin channel to %s", url)
157 171 socket = make_stdin_socket(self.context, self.session.bsession, url)
158 172 self._stdin_channel = self.stdin_channel_class(
159 173 socket, self.session, self.ioloop
160 174 )
161 175 return self._stdin_channel
162 176
163 177 @property
164 178 def hb_channel(self):
165 179 """Get the hb channel object for this kernel."""
166 180 if self._hb_channel is None:
167 181 url = self._make_url('hb')
168 182 self.log.debug("connecting heartbeat channel to %s", url)
169 183 self._hb_channel = self.hb_channel_class(
170 184 self.context, self.session, url
171 185 )
172 186 return self._hb_channel
173 187
174 188 def is_alive(self):
175 189 """Is the kernel process still running?"""
176 190 if self._hb_channel is not None:
177 191 # We didn't start the kernel with this KernelManager so we
178 192 # use the heartbeat.
179 193 return self._hb_channel.is_beating()
180 194 else:
181 195 # no heartbeat and not local, we can't tell if it's running,
182 196 # so naively return True
183 197 return True
184 198
185 199
186 200 # Methods to send specific messages on channels
187 201 def execute(self, code, silent=False, store_history=True,
188 202 user_expressions=None, allow_stdin=None):
189 203 """Execute code in the kernel.
190 204
191 205 Parameters
192 206 ----------
193 207 code : str
194 208 A string of Python code.
195 209
196 210 silent : bool, optional (default False)
197 211 If set, the kernel will execute the code as quietly possible, and
198 212 will force store_history to be False.
199 213
200 214 store_history : bool, optional (default True)
201 215 If set, the kernel will store command history. This is forced
202 216 to be False if silent is True.
203 217
204 218 user_expressions : dict, optional
205 219 A dict mapping names to expressions to be evaluated in the user's
206 220 dict. The expression values are returned as strings formatted using
207 221 :func:`repr`.
208 222
209 223 allow_stdin : bool, optional (default self.allow_stdin)
210 224 Flag for whether the kernel can send stdin requests to frontends.
211 225
212 226 Some frontends (e.g. the Notebook) do not support stdin requests.
213 227 If raw_input is called from code executed from such a frontend, a
214 228 StdinNotImplementedError will be raised.
215 229
216 230 Returns
217 231 -------
218 232 The msg_id of the message sent.
219 233 """
220 234 if user_expressions is None:
221 235 user_expressions = {}
222 236 if allow_stdin is None:
223 237 allow_stdin = self.allow_stdin
224 238
225 239
226 240 # Don't waste network traffic if inputs are invalid
227 241 if not isinstance(code, string_types):
228 242 raise ValueError('code %r must be a string' % code)
229 243 validate_string_dict(user_expressions)
230 244
231 245 # Create class for content/msg creation. Related to, but possibly
232 246 # not in Session.
233 247 content = dict(code=code, silent=silent, store_history=store_history,
234 248 user_expressions=user_expressions,
235 249 allow_stdin=allow_stdin,
236 250 )
237 251 msg = self.session.msg('execute_request', content)
238 252 self.shell_channel._queue_send(msg)
239 253 return msg['header']['msg_id']
240 254
241 255 def complete(self, code, cursor_pos=None):
242 256 """Tab complete text in the kernel's namespace.
243 257
244 258 Parameters
245 259 ----------
246 260 code : str
247 261 The context in which completion is requested.
248 262 Can be anything between a variable name and an entire cell.
249 263 cursor_pos : int, optional
250 264 The position of the cursor in the block of code where the completion was requested.
251 265 Default: ``len(code)``
252 266
253 267 Returns
254 268 -------
255 269 The msg_id of the message sent.
256 270 """
257 271 if cursor_pos is None:
258 272 cursor_pos = len(code)
259 273 content = dict(code=code, cursor_pos=cursor_pos)
260 274 msg = self.session.msg('complete_request', content)
261 275 self.shell_channel._queue_send(msg)
262 276 return msg['header']['msg_id']
263 277
264 278 def inspect(self, code, cursor_pos=None, detail_level=0):
265 279 """Get metadata information about an object in the kernel's namespace.
266 280
267 281 It is up to the kernel to determine the appropriate object to inspect.
268 282
269 283 Parameters
270 284 ----------
271 285 code : str
272 286 The context in which info is requested.
273 287 Can be anything between a variable name and an entire cell.
274 288 cursor_pos : int, optional
275 289 The position of the cursor in the block of code where the info was requested.
276 290 Default: ``len(code)``
277 291 detail_level : int, optional
278 292 The level of detail for the introspection (0-2)
279 293
280 294 Returns
281 295 -------
282 296 The msg_id of the message sent.
283 297 """
284 298 if cursor_pos is None:
285 299 cursor_pos = len(code)
286 300 content = dict(code=code, cursor_pos=cursor_pos,
287 301 detail_level=detail_level,
288 302 )
289 303 msg = self.session.msg('inspect_request', content)
290 304 self.shell_channel._queue_send(msg)
291 305 return msg['header']['msg_id']
292 306
293 307 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
294 308 """Get entries from the kernel's history list.
295 309
296 310 Parameters
297 311 ----------
298 312 raw : bool
299 313 If True, return the raw input.
300 314 output : bool
301 315 If True, then return the output as well.
302 316 hist_access_type : str
303 317 'range' (fill in session, start and stop params), 'tail' (fill in n)
304 318 or 'search' (fill in pattern param).
305 319
306 320 session : int
307 321 For a range request, the session from which to get lines. Session
308 322 numbers are positive integers; negative ones count back from the
309 323 current session.
310 324 start : int
311 325 The first line number of a history range.
312 326 stop : int
313 327 The final (excluded) line number of a history range.
314 328
315 329 n : int
316 330 The number of lines of history to get for a tail request.
317 331
318 332 pattern : str
319 333 The glob-syntax pattern for a search request.
320 334
321 335 Returns
322 336 -------
323 337 The msg_id of the message sent.
324 338 """
325 339 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
326 340 **kwargs)
327 341 msg = self.session.msg('history_request', content)
328 342 self.shell_channel._queue_send(msg)
329 343 return msg['header']['msg_id']
330 344
331 345 def kernel_info(self):
332 346 """Request kernel info."""
333 347 msg = self.session.msg('kernel_info_request')
334 348 self.shell_channel._queue_send(msg)
335 349 return msg['header']['msg_id']
336 350
337 351 def _handle_kernel_info_reply(self, msg):
338 352 """handle kernel info reply
339 353
340 354 sets protocol adaptation version
341 355 """
342 356 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
343 357 if adapt_version != major_protocol_version:
344 358 self.session.adapt_version = adapt_version
345 359
346 360 def shutdown(self, restart=False):
347 361 """Request an immediate kernel shutdown.
348 362
349 363 Upon receipt of the (empty) reply, client code can safely assume that
350 364 the kernel has shut down and it's safe to forcefully terminate it if
351 365 it's still alive.
352 366
353 367 The kernel will send the reply via a function registered with Python's
354 368 atexit module, ensuring it's truly done as the kernel is done with all
355 369 normal operation.
356 370 """
357 371 # Send quit message to kernel. Once we implement kernel-side setattr,
358 372 # this should probably be done that way, but for now this will do.
359 373 msg = self.session.msg('shutdown_request', {'restart':restart})
360 374 self.shell_channel._queue_send(msg)
361 375 return msg['header']['msg_id']
362 376
363 377 def is_complete(self, code):
364 378 msg = self.session.msg('is_complete_request', {'code': code})
365 379 self.shell_channel._queue_send(msg)
366 380 return msg['header']['msg_id']
367 381
368 382 def input(self, string):
369 383 """Send a string of raw input to the kernel."""
370 384 content = dict(value=string)
371 385 msg = self.session.msg('input_reply', content)
372 386 self.stdin_channel._queue_send(msg)
373 387
374 388
375 389 KernelClientABC.register(KernelClient)
@@ -1,267 +1,250
1 1 """ Defines a KernelClient that provides signals and slots.
2 2 """
3 3 import atexit
4 4 import errno
5 5 from threading import Thread
6 6 import time
7 7
8 8 import zmq
9 9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 10 # during garbage collection of threads at exit:
11 11 from zmq import ZMQError
12 12 from zmq.eventloop import ioloop, zmqstream
13 13
14 14 from IPython.external.qt import QtCore
15 15
16 16 # Local imports
17 17 from IPython.utils.traitlets import Type, Instance
18 18 from IPython.kernel.channels import HBChannel,\
19 19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 20 from IPython.kernel import KernelClient
21 21
22 22 from .kernel_mixins import QtKernelClientMixin
23 23 from .util import SuperQObject
24 24
25 25 class QtHBChannel(SuperQObject, HBChannel):
26 26 # A longer timeout than the base class
27 27 time_to_dead = 3.0
28 28
29 29 # Emitted when the kernel has died.
30 30 kernel_died = QtCore.Signal(object)
31 31
32 32 def call_handlers(self, since_last_heartbeat):
33 33 """ Reimplemented to emit signals instead of making callbacks.
34 34 """
35 35 # Emit the generic signal.
36 36 self.kernel_died.emit(since_last_heartbeat)
37 37
38 38 from IPython.core.release import kernel_protocol_version_info
39 39
40 from IPython.utils.py3compat import string_types, iteritems
41
42 40 major_protocol_version = kernel_protocol_version_info[0]
43 41
44 42 class InvalidPortNumber(Exception):
45 43 pass
46 44
47 # some utilities to validate message structure, these might get moved elsewhere
48 # if they prove to have more generic utility
49
50
51 def validate_string_dict(dct):
52 """Validate that the input is a dict with string keys and values.
53
54 Raises ValueError if not."""
55 for k,v in iteritems(dct):
56 if not isinstance(k, string_types):
57 raise ValueError('key %r in dict must be a string' % k)
58 if not isinstance(v, string_types):
59 raise ValueError('value %r in dict must be a string' % v)
60
61
62 45
63 46 class QtZMQSocketChannel(SuperQObject):
64 47 """A ZMQ socket emitting a Qt signal when a message is received."""
65 48 session = None
66 49 socket = None
67 50 ioloop = None
68 51 stream = None
69 52
70 53 message_received = QtCore.Signal(object)
71 54
72 55 def process_events(self):
73 56 """ Process any pending GUI events.
74 57 """
75 58 QtCore.QCoreApplication.instance().processEvents()
76 59
77 60 def __init__(self, socket, session, loop):
78 61 """Create a channel.
79 62
80 63 Parameters
81 64 ----------
82 65 socket : :class:`zmq.Socket`
83 66 The ZMQ socket to use.
84 67 session : :class:`session.Session`
85 68 The session to use.
86 69 loop
87 70 A pyzmq ioloop to connect the socket to using a ZMQStream
88 71 """
89 72 super(QtZMQSocketChannel, self).__init__()
90 73
91 74 self.socket = socket
92 75 self.session = session
93 76 self.ioloop = loop
94 77
95 78 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
96 79 self.stream.on_recv(self._handle_recv)
97 80
98 81 _is_alive = False
99 82 def is_alive(self):
100 83 return self._is_alive
101 84
102 85 def start(self):
103 86 self._is_alive = True
104 87
105 88 def stop(self):
106 89 self._is_alive = False
107 90
108 91 def close(self):
109 92 if self.socket is not None:
110 93 try:
111 94 self.socket.close(linger=0)
112 95 except Exception:
113 96 pass
114 97 self.socket = None
115 98
116 99 def _queue_send(self, msg):
117 100 """Queue a message to be sent from the IOLoop's thread.
118 101
119 102 Parameters
120 103 ----------
121 104 msg : message to send
122 105
123 106 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
124 107 thread control of the action.
125 108 """
126 109 def thread_send():
127 110 self.session.send(self.stream, msg)
128 111 self.ioloop.add_callback(thread_send)
129 112
130 113 def _handle_recv(self, msg):
131 114 """Callback for stream.on_recv.
132 115
133 116 Unpacks message, and calls handlers with it.
134 117 """
135 118 ident,smsg = self.session.feed_identities(msg)
136 119 msg = self.session.deserialize(smsg)
137 120 self.call_handlers(msg)
138 121
139 122 def call_handlers(self, msg):
140 123 """This method is called in the ioloop thread when a message arrives.
141 124
142 125 Subclasses should override this method to handle incoming messages.
143 126 It is important to remember that this method is called in the thread
144 127 so that some logic must be done to ensure that the application level
145 128 handlers are called in the application thread.
146 129 """
147 130 # Emit the generic signal.
148 131 self.message_received.emit(msg)
149 132
150 133 def flush(self, timeout=1.0):
151 134 """Immediately processes all pending messages on this channel.
152 135
153 136 This is only used for the IOPub channel.
154 137
155 138 Callers should use this method to ensure that :meth:`call_handlers`
156 139 has been called for all messages that have been received on the
157 140 0MQ SUB socket of this channel.
158 141
159 142 This method is thread safe.
160 143
161 144 Parameters
162 145 ----------
163 146 timeout : float, optional
164 147 The maximum amount of time to spend flushing, in seconds. The
165 148 default is one second.
166 149 """
167 150 # We do the IOLoop callback process twice to ensure that the IOLoop
168 151 # gets to perform at least one full poll.
169 152 stop_time = time.time() + timeout
170 153 for i in range(2):
171 154 self._flushed = False
172 155 self.ioloop.add_callback(self._flush)
173 156 while not self._flushed and time.time() < stop_time:
174 157 time.sleep(0.01)
175 158
176 159 def _flush(self):
177 160 """Callback for :method:`self.flush`."""
178 161 self.stream.flush()
179 162 self._flushed = True
180 163
181 164
182 165 class IOLoopThread(Thread):
183 166 """Run a pyzmq ioloop in a thread to send and receive messages
184 167 """
185 168 def __init__(self, loop):
186 169 super(IOLoopThread, self).__init__()
187 170 self.daemon = True
188 171 atexit.register(self._notice_exit)
189 172 self.ioloop = loop or ioloop.IOLoop()
190 173
191 174 def _notice_exit(self):
192 175 self._exiting = True
193 176
194 177 def run(self):
195 178 """Run my loop, ignoring EINTR events in the poller"""
196 179 while True:
197 180 try:
198 181 self.ioloop.start()
199 182 except ZMQError as e:
200 183 if e.errno == errno.EINTR:
201 184 continue
202 185 else:
203 186 raise
204 187 except Exception:
205 188 if self._exiting:
206 189 break
207 190 else:
208 191 raise
209 192 else:
210 193 break
211 194
212 195 def stop(self):
213 196 """Stop the channel's event loop and join its thread.
214 197
215 198 This calls :meth:`~threading.Thread.join` and returns when the thread
216 199 terminates. :class:`RuntimeError` will be raised if
217 200 :meth:`~threading.Thread.start` is called again.
218 201 """
219 202 if self.ioloop is not None:
220 203 self.ioloop.stop()
221 204 self.join()
222 205 self.close()
223 206
224 207 def close(self):
225 208 if self.ioloop is not None:
226 209 try:
227 210 self.ioloop.close(all_fds=True)
228 211 except Exception:
229 212 pass
230 213
231 214
232 215 class QtKernelClient(QtKernelClientMixin, KernelClient):
233 216 """ A KernelClient that provides signals and slots.
234 217 """
235 218
236 219 _ioloop = None
237 220 @property
238 221 def ioloop(self):
239 222 if self._ioloop is None:
240 223 self._ioloop = ioloop.IOLoop()
241 224 return self._ioloop
242 225
243 226 ioloop_thread = Instance(IOLoopThread)
244 227
245 228 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
246 229 if shell:
247 230 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
248 231
249 232 self.ioloop_thread = IOLoopThread(self.ioloop)
250 233 self.ioloop_thread.start()
251 234
252 235 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
253 236
254 237 def _check_kernel_info_reply(self, msg):
255 238 if msg['msg_type'] == 'kernel_info_reply':
256 239 self._handle_kernel_info_reply(msg)
257 240 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
258 241
259 242 def stop_channels(self):
260 243 super(QtKernelClient, self).stop_channels()
261 244 if self.ioloop_thread.is_alive():
262 245 self.ioloop_thread.stop()
263 246
264 247 iopub_channel_class = Type(QtZMQSocketChannel)
265 248 shell_channel_class = Type(QtZMQSocketChannel)
266 249 stdin_channel_class = Type(QtZMQSocketChannel)
267 250 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now