##// END OF EJS Templates
Implement blocking channels without Python threads
Thomas Kluyver -
Show More
@@ -2,16 +2,12 b''
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2013 The IPython Development Team
7 #
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING.txt, distributed as part of this software.
10 #-----------------------------------------------------------------------------
11 5
12 #-----------------------------------------------------------------------------
13 # Imports
14 #-----------------------------------------------------------------------------
6 # Copyright (c) IPython Development Team.
7 # Distributed under the terms of the Modified BSD License.
8
9 import atexit
10 import zmq
15 11
16 12 try:
17 13 from queue import Queue, Empty # Py 3
@@ -19,29 +15,87 b' except ImportError:'
19 15 from Queue import Queue, Empty # Py 2
20 16
21 17 from IPython.kernel.channels import IOPubChannel, HBChannel, \
22 ShellChannel, StdInChannel
18 ShellChannel, StdInChannel, InvalidPortNumber, major_protocol_version
19 from IPython.utils.py3compat import string_types, iteritems
20
21 # some utilities to validate message structure, these might get moved elsewhere
22 # if they prove to have more generic utility
23
24 def validate_string_list(lst):
25 """Validate that the input is a list of strings.
26
27 Raises ValueError if not."""
28 if not isinstance(lst, list):
29 raise ValueError('input %r must be a list' % lst)
30 for x in lst:
31 if not isinstance(x, string_types):
32 raise ValueError('element %r in list must be a string' % x)
33
34
35 def validate_string_dict(dct):
36 """Validate that the input is a dict with string keys and values.
37
38 Raises ValueError if not."""
39 for k,v in iteritems(dct):
40 if not isinstance(k, string_types):
41 raise ValueError('key %r in dict must be a string' % k)
42 if not isinstance(v, string_types):
43 raise ValueError('value %r in dict must be a string' % v)
23 44
24 #-----------------------------------------------------------------------------
25 # Blocking kernel manager
26 #-----------------------------------------------------------------------------
27 45
46 class ZMQSocketChannel(object):
47 """The base class for the channels that use ZMQ sockets."""
48 context = None
49 session = None
50 socket = None
51 ioloop = None
52 stream = None
53 _address = None
54 _exiting = False
55 proxy_methods = []
28 56
29 class BlockingChannelMixin(object):
57 def __init__(self, context, session, address):
58 """Create a channel.
30 59
31 def __init__(self, *args, **kwds):
32 super(BlockingChannelMixin, self).__init__(*args, **kwds)
33 self._in_queue = Queue()
60 Parameters
61 ----------
62 context : :class:`zmq.Context`
63 The ZMQ context to use.
64 session : :class:`session.Session`
65 The session to use.
66 address : zmq url
67 Standard (ip, port) tuple that the kernel is listening on.
68 """
69 super(ZMQSocketChannel, self).__init__()
70 self.daemon = True
34 71
35 def call_handlers(self, msg):
36 self._in_queue.put(msg)
72 self.context = context
73 self.session = session
74 if isinstance(address, tuple):
75 if address[1] == 0:
76 message = 'The port number for a channel cannot be 0.'
77 raise InvalidPortNumber(message)
78 address = "tcp://%s:%i" % address
79 self._address = address
80
81 def _recv(self, **kwargs):
82 msg = self.socket.recv_multipart(**kwargs)
83 ident,smsg = self.session.feed_identities(msg)
84 return self.session.deserialize(smsg)
37 85
38 86 def get_msg(self, block=True, timeout=None):
39 87 """ Gets a message if there is one that is ready. """
40 if timeout is None:
41 # Queue.get(timeout=None) has stupid uninteruptible
42 # behavior, so wait for a week instead
43 timeout = 604800
44 return self._in_queue.get(block, timeout)
88 if block:
89 if timeout is not None:
90 timeout *= 1000 # seconds to ms
91 ready = self.socket.poll(timeout)
92 else:
93 ready = self.socket.poll(timeout=0)
94
95 if ready:
96 return self._recv()
97 else:
98 raise Empty
45 99
46 100 def get_msgs(self):
47 101 """ Get all messages that are currently ready. """
@@ -55,22 +109,274 b' class BlockingChannelMixin(object):'
55 109
56 110 def msg_ready(self):
57 111 """ Is there a message that has been received? """
58 return not self._in_queue.empty()
112 return bool(self.socket.poll(timeout=0))
113
114 def close(self):
115 if self.socket is not None:
116 try:
117 self.socket.close(linger=0)
118 except Exception:
119 pass
120 self.socket = None
121 stop = close
122
123 def is_alive(self):
124 return (self.socket is not None)
125
126 @property
127 def address(self):
128 """Get the channel's address as a zmq url string.
129
130 These URLS have the form: 'tcp://127.0.0.1:5555'.
131 """
132 return self._address
133
134 def _queue_send(self, msg):
135 """Pass a message to the ZMQ socket to send
136 """
137 self.session.send(self.socket, msg)
138
139
140 class BlockingShellChannel(ZMQSocketChannel):
141 """The shell channel for issuing request/replies to the kernel."""
142
143 command_queue = None
144 # flag for whether execute requests should be allowed to call raw_input:
145 allow_stdin = True
146 proxy_methods = [
147 'execute',
148 'complete',
149 'inspect',
150 'history',
151 'kernel_info',
152 'shutdown',
153 'is_complete',
154 ]
155
156 def start(self):
157 self.socket = self.context.socket(zmq.DEALER)
158 self.socket.linger = 1000
159 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
160 self.socket.connect(self.address)
161
162 def execute(self, code, silent=False, store_history=True,
163 user_expressions=None, allow_stdin=None):
164 """Execute code in the kernel.
165
166 Parameters
167 ----------
168 code : str
169 A string of Python code.
170
171 silent : bool, optional (default False)
172 If set, the kernel will execute the code as quietly possible, and
173 will force store_history to be False.
174
175 store_history : bool, optional (default True)
176 If set, the kernel will store command history. This is forced
177 to be False if silent is True.
178
179 user_expressions : dict, optional
180 A dict mapping names to expressions to be evaluated in the user's
181 dict. The expression values are returned as strings formatted using
182 :func:`repr`.
183
184 allow_stdin : bool, optional (default self.allow_stdin)
185 Flag for whether the kernel can send stdin requests to frontends.
186
187 Some frontends (e.g. the Notebook) do not support stdin requests.
188 If raw_input is called from code executed from such a frontend, a
189 StdinNotImplementedError will be raised.
59 190
191 Returns
192 -------
193 The msg_id of the message sent.
194 """
195 if user_expressions is None:
196 user_expressions = {}
197 if allow_stdin is None:
198 allow_stdin = self.allow_stdin
60 199
61 class BlockingIOPubChannel(BlockingChannelMixin, IOPubChannel):
62 pass
63 200
201 # Don't waste network traffic if inputs are invalid
202 if not isinstance(code, string_types):
203 raise ValueError('code %r must be a string' % code)
204 validate_string_dict(user_expressions)
64 205
65 class BlockingShellChannel(BlockingChannelMixin, ShellChannel):
66 def call_handlers(self, msg):
206 # Create class for content/msg creation. Related to, but possibly
207 # not in Session.
208 content = dict(code=code, silent=silent, store_history=store_history,
209 user_expressions=user_expressions,
210 allow_stdin=allow_stdin,
211 )
212 msg = self.session.msg('execute_request', content)
213 self._queue_send(msg)
214 return msg['header']['msg_id']
215
216 def complete(self, code, cursor_pos=None):
217 """Tab complete text in the kernel's namespace.
218
219 Parameters
220 ----------
221 code : str
222 The context in which completion is requested.
223 Can be anything between a variable name and an entire cell.
224 cursor_pos : int, optional
225 The position of the cursor in the block of code where the completion was requested.
226 Default: ``len(code)``
227
228 Returns
229 -------
230 The msg_id of the message sent.
231 """
232 if cursor_pos is None:
233 cursor_pos = len(code)
234 content = dict(code=code, cursor_pos=cursor_pos)
235 msg = self.session.msg('complete_request', content)
236 self._queue_send(msg)
237 return msg['header']['msg_id']
238
239 def inspect(self, code, cursor_pos=None, detail_level=0):
240 """Get metadata information about an object in the kernel's namespace.
241
242 It is up to the kernel to determine the appropriate object to inspect.
243
244 Parameters
245 ----------
246 code : str
247 The context in which info is requested.
248 Can be anything between a variable name and an entire cell.
249 cursor_pos : int, optional
250 The position of the cursor in the block of code where the info was requested.
251 Default: ``len(code)``
252 detail_level : int, optional
253 The level of detail for the introspection (0-2)
254
255 Returns
256 -------
257 The msg_id of the message sent.
258 """
259 if cursor_pos is None:
260 cursor_pos = len(code)
261 content = dict(code=code, cursor_pos=cursor_pos,
262 detail_level=detail_level,
263 )
264 msg = self.session.msg('inspect_request', content)
265 self._queue_send(msg)
266 return msg['header']['msg_id']
267
268 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
269 """Get entries from the kernel's history list.
270
271 Parameters
272 ----------
273 raw : bool
274 If True, return the raw input.
275 output : bool
276 If True, then return the output as well.
277 hist_access_type : str
278 'range' (fill in session, start and stop params), 'tail' (fill in n)
279 or 'search' (fill in pattern param).
280
281 session : int
282 For a range request, the session from which to get lines. Session
283 numbers are positive integers; negative ones count back from the
284 current session.
285 start : int
286 The first line number of a history range.
287 stop : int
288 The final (excluded) line number of a history range.
289
290 n : int
291 The number of lines of history to get for a tail request.
292
293 pattern : str
294 The glob-syntax pattern for a search request.
295
296 Returns
297 -------
298 The msg_id of the message sent.
299 """
300 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
301 **kwargs)
302 msg = self.session.msg('history_request', content)
303 self._queue_send(msg)
304 return msg['header']['msg_id']
305
306 def kernel_info(self):
307 """Request kernel info."""
308 msg = self.session.msg('kernel_info_request')
309 self._queue_send(msg)
310 return msg['header']['msg_id']
311
312 def _handle_kernel_info_reply(self, msg):
313 """handle kernel info reply
314
315 sets protocol adaptation version
316 """
317 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
318 if adapt_version != major_protocol_version:
319 self.session.adapt_version = adapt_version
320
321 def shutdown(self, restart=False):
322 """Request an immediate kernel shutdown.
323
324 Upon receipt of the (empty) reply, client code can safely assume that
325 the kernel has shut down and it's safe to forcefully terminate it if
326 it's still alive.
327
328 The kernel will send the reply via a function registered with Python's
329 atexit module, ensuring it's truly done as the kernel is done with all
330 normal operation.
331 """
332 # Send quit message to kernel. Once we implement kernel-side setattr,
333 # this should probably be done that way, but for now this will do.
334 msg = self.session.msg('shutdown_request', {'restart':restart})
335 self._queue_send(msg)
336 return msg['header']['msg_id']
337
338 def is_complete(self, code):
339 msg = self.session.msg('is_complete_request', {'code': code})
340 self._queue_send(msg)
341 return msg['header']['msg_id']
342
343 def _recv(self, **kwargs):
344 # Listen for kernel_info_reply message to do protocol adaptation
345 msg = ZMQSocketChannel._recv(self, **kwargs)
67 346 if msg['msg_type'] == 'kernel_info_reply':
68 347 self._handle_kernel_info_reply(msg)
69 return super(BlockingShellChannel, self).call_handlers(msg)
348 return msg
349
350
351 class BlockingIOPubChannel(ZMQSocketChannel):
352 """The iopub channel which listens for messages that the kernel publishes.
353
354 This channel is where all output is published to frontends.
355 """
356 def start(self):
357 self.socket = self.context.socket(zmq.SUB)
358 self.socket.linger = 1000
359 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
360 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
361 self.socket.connect(self.address)
362
363
364 class BlockingStdInChannel(ZMQSocketChannel):
365 """The stdin channel to handle raw_input requests that the kernel makes."""
366 msg_queue = None
367 proxy_methods = ['input']
70 368
369 def start(self):
370 self.socket = self.context.socket(zmq.DEALER)
371 self.socket.linger = 1000
372 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
373 self.socket.connect(self.address)
71 374
72 class BlockingStdInChannel(BlockingChannelMixin, StdInChannel):
73 pass
375 def input(self, string):
376 """Send a string of raw input to the kernel."""
377 content = dict(value=string)
378 msg = self.session.msg('input_reply', content)
379 self._queue_send(msg)
74 380
75 381
76 382 class BlockingHBChannel(HBChannel):
@@ -9,14 +9,15 b' Useful for test suites and blocking terminal interfaces.'
9 9 # the file COPYING.txt, distributed as part of this software.
10 10 #-----------------------------------------------------------------------------
11 11
12 #-----------------------------------------------------------------------------
13 # Imports
14 #-----------------------------------------------------------------------------
12 try:
13 from queue import Queue, Empty # Py 3
14 except ImportError:
15 from Queue import Queue, Empty # Py 2
15 16
16 17 # IPython imports
17 18 from IPython.utils.io import raw_print
18 19 from IPython.utils.traitlets import Type
19 from IPython.kernel.blocking.channels import BlockingChannelMixin
20 #from IPython.kernel.blocking.channels import BlockingChannelMixin
20 21
21 22 # Local imports
22 23 from .channels import (
@@ -26,9 +27,36 b' from .channels import ('
26 27 )
27 28 from .client import InProcessKernelClient
28 29
29 #-----------------------------------------------------------------------------
30 # Blocking kernel manager
31 #-----------------------------------------------------------------------------
30 class BlockingChannelMixin(object):
31
32 def __init__(self, *args, **kwds):
33 super(BlockingChannelMixin, self).__init__(*args, **kwds)
34 self._in_queue = Queue()
35
36 def call_handlers(self, msg):
37 self._in_queue.put(msg)
38
39 def get_msg(self, block=True, timeout=None):
40 """ Gets a message if there is one that is ready. """
41 if timeout is None:
42 # Queue.get(timeout=None) has stupid uninteruptible
43 # behavior, so wait for a week instead
44 timeout = 604800
45 return self._in_queue.get(block, timeout)
46
47 def get_msgs(self):
48 """ Get all messages that are currently ready. """
49 msgs = []
50 while True:
51 try:
52 msgs.append(self.get_msg(block=False))
53 except Empty:
54 break
55 return msgs
56
57 def msg_ready(self):
58 """ Is there a message that has been received? """
59 return not self._in_queue.empty()
32 60
33 61 class BlockingInProcessShellChannel(BlockingChannelMixin, InProcessShellChannel):
34 62 pass
General Comments 0
You need to be logged in to leave comments. Login now