##// END OF EJS Templates
Factor out code to set up ZMQ sockets
Thomas Kluyver -
Show More
@@ -1,391 +1,381 b''
1 """Blocking channels
1 """Blocking channels
2
2
3 Useful for test suites and blocking terminal interfaces.
3 Useful for test suites and blocking terminal interfaces.
4 """
4 """
5
5
6 # Copyright (c) IPython Development Team.
6 # Copyright (c) IPython Development Team.
7 # Distributed under the terms of the Modified BSD License.
7 # Distributed under the terms of the Modified BSD License.
8
8
9 import atexit
9 import atexit
10 import zmq
10 import zmq
11
11
12 try:
12 try:
13 from queue import Queue, Empty # Py 3
13 from queue import Queue, Empty # Py 3
14 except ImportError:
14 except ImportError:
15 from Queue import Queue, Empty # Py 2
15 from Queue import Queue, Empty # Py 2
16
16
17 from IPython.kernel.channels import IOPubChannel, HBChannel, \
17 from IPython.kernel.channels import HBChannel,\
18 ShellChannel, StdInChannel, InvalidPortNumber, major_protocol_version
18 make_iopub_socket, make_shell_socket, make_stdin_socket,\
19 InvalidPortNumber, major_protocol_version
19 from IPython.utils.py3compat import string_types, iteritems
20 from IPython.utils.py3compat import string_types, iteritems
20
21
21 # some utilities to validate message structure, these might get moved elsewhere
22 # some utilities to validate message structure, these might get moved elsewhere
22 # if they prove to have more generic utility
23 # if they prove to have more generic utility
23
24
24 def validate_string_list(lst):
25 def validate_string_list(lst):
25 """Validate that the input is a list of strings.
26 """Validate that the input is a list of strings.
26
27
27 Raises ValueError if not."""
28 Raises ValueError if not."""
28 if not isinstance(lst, list):
29 if not isinstance(lst, list):
29 raise ValueError('input %r must be a list' % lst)
30 raise ValueError('input %r must be a list' % lst)
30 for x in lst:
31 for x in lst:
31 if not isinstance(x, string_types):
32 if not isinstance(x, string_types):
32 raise ValueError('element %r in list must be a string' % x)
33 raise ValueError('element %r in list must be a string' % x)
33
34
34
35
35 def validate_string_dict(dct):
36 def validate_string_dict(dct):
36 """Validate that the input is a dict with string keys and values.
37 """Validate that the input is a dict with string keys and values.
37
38
38 Raises ValueError if not."""
39 Raises ValueError if not."""
39 for k,v in iteritems(dct):
40 for k,v in iteritems(dct):
40 if not isinstance(k, string_types):
41 if not isinstance(k, string_types):
41 raise ValueError('key %r in dict must be a string' % k)
42 raise ValueError('key %r in dict must be a string' % k)
42 if not isinstance(v, string_types):
43 if not isinstance(v, string_types):
43 raise ValueError('value %r in dict must be a string' % v)
44 raise ValueError('value %r in dict must be a string' % v)
44
45
45
46
46 class ZMQSocketChannel(object):
47 class ZMQSocketChannel(object):
47 """The base class for the channels that use ZMQ sockets."""
48 """The base class for the channels that use ZMQ sockets."""
48 context = None
49 context = None
49 session = None
50 session = None
50 socket = None
51 socket = None
51 ioloop = None
52 ioloop = None
52 stream = None
53 stream = None
53 _address = None
54 _address = None
54 _exiting = False
55 _exiting = False
55 proxy_methods = []
56 proxy_methods = []
56
57
57 def __init__(self, context, session, address):
58 def __init__(self, context, session, address):
58 """Create a channel.
59 """Create a channel.
59
60
60 Parameters
61 Parameters
61 ----------
62 ----------
62 context : :class:`zmq.Context`
63 context : :class:`zmq.Context`
63 The ZMQ context to use.
64 The ZMQ context to use.
64 session : :class:`session.Session`
65 session : :class:`session.Session`
65 The session to use.
66 The session to use.
66 address : zmq url
67 address : zmq url
67 Standard (ip, port) tuple that the kernel is listening on.
68 Standard (ip, port) tuple that the kernel is listening on.
68 """
69 """
69 super(ZMQSocketChannel, self).__init__()
70 super(ZMQSocketChannel, self).__init__()
70 self.daemon = True
71 self.daemon = True
71
72
72 self.context = context
73 self.context = context
73 self.session = session
74 self.session = session
74 if isinstance(address, tuple):
75 if isinstance(address, tuple):
75 if address[1] == 0:
76 if address[1] == 0:
76 message = 'The port number for a channel cannot be 0.'
77 message = 'The port number for a channel cannot be 0.'
77 raise InvalidPortNumber(message)
78 raise InvalidPortNumber(message)
78 address = "tcp://%s:%i" % address
79 address = "tcp://%s:%i" % address
79 self._address = address
80 self._address = address
80
81
81 def _recv(self, **kwargs):
82 def _recv(self, **kwargs):
82 msg = self.socket.recv_multipart(**kwargs)
83 msg = self.socket.recv_multipart(**kwargs)
83 ident,smsg = self.session.feed_identities(msg)
84 ident,smsg = self.session.feed_identities(msg)
84 return self.session.deserialize(smsg)
85 return self.session.deserialize(smsg)
85
86
86 def get_msg(self, block=True, timeout=None):
87 def get_msg(self, block=True, timeout=None):
87 """ Gets a message if there is one that is ready. """
88 """ Gets a message if there is one that is ready. """
88 if block:
89 if block:
89 if timeout is not None:
90 if timeout is not None:
90 timeout *= 1000 # seconds to ms
91 timeout *= 1000 # seconds to ms
91 ready = self.socket.poll(timeout)
92 ready = self.socket.poll(timeout)
92 else:
93 else:
93 ready = self.socket.poll(timeout=0)
94 ready = self.socket.poll(timeout=0)
94
95
95 if ready:
96 if ready:
96 return self._recv()
97 return self._recv()
97 else:
98 else:
98 raise Empty
99 raise Empty
99
100
100 def get_msgs(self):
101 def get_msgs(self):
101 """ Get all messages that are currently ready. """
102 """ Get all messages that are currently ready. """
102 msgs = []
103 msgs = []
103 while True:
104 while True:
104 try:
105 try:
105 msgs.append(self.get_msg(block=False))
106 msgs.append(self.get_msg(block=False))
106 except Empty:
107 except Empty:
107 break
108 break
108 return msgs
109 return msgs
109
110
110 def msg_ready(self):
111 def msg_ready(self):
111 """ Is there a message that has been received? """
112 """ Is there a message that has been received? """
112 return bool(self.socket.poll(timeout=0))
113 return bool(self.socket.poll(timeout=0))
113
114
114 def close(self):
115 def close(self):
115 if self.socket is not None:
116 if self.socket is not None:
116 try:
117 try:
117 self.socket.close(linger=0)
118 self.socket.close(linger=0)
118 except Exception:
119 except Exception:
119 pass
120 pass
120 self.socket = None
121 self.socket = None
121 stop = close
122 stop = close
122
123
123 def is_alive(self):
124 def is_alive(self):
124 return (self.socket is not None)
125 return (self.socket is not None)
125
126
126 @property
127 @property
127 def address(self):
128 def address(self):
128 """Get the channel's address as a zmq url string.
129 """Get the channel's address as a zmq url string.
129
130
130 These URLS have the form: 'tcp://127.0.0.1:5555'.
131 These URLS have the form: 'tcp://127.0.0.1:5555'.
131 """
132 """
132 return self._address
133 return self._address
133
134
134 def _queue_send(self, msg):
135 def _queue_send(self, msg):
135 """Pass a message to the ZMQ socket to send
136 """Pass a message to the ZMQ socket to send
136 """
137 """
137 self.session.send(self.socket, msg)
138 self.session.send(self.socket, msg)
138
139
139
140
140 class BlockingShellChannel(ZMQSocketChannel):
141 class BlockingShellChannel(ZMQSocketChannel):
141 """The shell channel for issuing request/replies to the kernel."""
142 """The shell channel for issuing request/replies to the kernel."""
142
143
143 command_queue = None
144 command_queue = None
144 # flag for whether execute requests should be allowed to call raw_input:
145 # flag for whether execute requests should be allowed to call raw_input:
145 allow_stdin = True
146 allow_stdin = True
146 proxy_methods = [
147 proxy_methods = [
147 'execute',
148 'execute',
148 'complete',
149 'complete',
149 'inspect',
150 'inspect',
150 'history',
151 'history',
151 'kernel_info',
152 'kernel_info',
152 'shutdown',
153 'shutdown',
153 'is_complete',
154 'is_complete',
154 ]
155 ]
155
156
156 def start(self):
157 def start(self):
157 self.socket = self.context.socket(zmq.DEALER)
158 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
158 self.socket.linger = 1000
159 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
160 self.socket.connect(self.address)
161
159
162 def execute(self, code, silent=False, store_history=True,
160 def execute(self, code, silent=False, store_history=True,
163 user_expressions=None, allow_stdin=None):
161 user_expressions=None, allow_stdin=None):
164 """Execute code in the kernel.
162 """Execute code in the kernel.
165
163
166 Parameters
164 Parameters
167 ----------
165 ----------
168 code : str
166 code : str
169 A string of Python code.
167 A string of Python code.
170
168
171 silent : bool, optional (default False)
169 silent : bool, optional (default False)
172 If set, the kernel will execute the code as quietly possible, and
170 If set, the kernel will execute the code as quietly possible, and
173 will force store_history to be False.
171 will force store_history to be False.
174
172
175 store_history : bool, optional (default True)
173 store_history : bool, optional (default True)
176 If set, the kernel will store command history. This is forced
174 If set, the kernel will store command history. This is forced
177 to be False if silent is True.
175 to be False if silent is True.
178
176
179 user_expressions : dict, optional
177 user_expressions : dict, optional
180 A dict mapping names to expressions to be evaluated in the user's
178 A dict mapping names to expressions to be evaluated in the user's
181 dict. The expression values are returned as strings formatted using
179 dict. The expression values are returned as strings formatted using
182 :func:`repr`.
180 :func:`repr`.
183
181
184 allow_stdin : bool, optional (default self.allow_stdin)
182 allow_stdin : bool, optional (default self.allow_stdin)
185 Flag for whether the kernel can send stdin requests to frontends.
183 Flag for whether the kernel can send stdin requests to frontends.
186
184
187 Some frontends (e.g. the Notebook) do not support stdin requests.
185 Some frontends (e.g. the Notebook) do not support stdin requests.
188 If raw_input is called from code executed from such a frontend, a
186 If raw_input is called from code executed from such a frontend, a
189 StdinNotImplementedError will be raised.
187 StdinNotImplementedError will be raised.
190
188
191 Returns
189 Returns
192 -------
190 -------
193 The msg_id of the message sent.
191 The msg_id of the message sent.
194 """
192 """
195 if user_expressions is None:
193 if user_expressions is None:
196 user_expressions = {}
194 user_expressions = {}
197 if allow_stdin is None:
195 if allow_stdin is None:
198 allow_stdin = self.allow_stdin
196 allow_stdin = self.allow_stdin
199
197
200
198
201 # Don't waste network traffic if inputs are invalid
199 # Don't waste network traffic if inputs are invalid
202 if not isinstance(code, string_types):
200 if not isinstance(code, string_types):
203 raise ValueError('code %r must be a string' % code)
201 raise ValueError('code %r must be a string' % code)
204 validate_string_dict(user_expressions)
202 validate_string_dict(user_expressions)
205
203
206 # Create class for content/msg creation. Related to, but possibly
204 # Create class for content/msg creation. Related to, but possibly
207 # not in Session.
205 # not in Session.
208 content = dict(code=code, silent=silent, store_history=store_history,
206 content = dict(code=code, silent=silent, store_history=store_history,
209 user_expressions=user_expressions,
207 user_expressions=user_expressions,
210 allow_stdin=allow_stdin,
208 allow_stdin=allow_stdin,
211 )
209 )
212 msg = self.session.msg('execute_request', content)
210 msg = self.session.msg('execute_request', content)
213 self._queue_send(msg)
211 self._queue_send(msg)
214 return msg['header']['msg_id']
212 return msg['header']['msg_id']
215
213
216 def complete(self, code, cursor_pos=None):
214 def complete(self, code, cursor_pos=None):
217 """Tab complete text in the kernel's namespace.
215 """Tab complete text in the kernel's namespace.
218
216
219 Parameters
217 Parameters
220 ----------
218 ----------
221 code : str
219 code : str
222 The context in which completion is requested.
220 The context in which completion is requested.
223 Can be anything between a variable name and an entire cell.
221 Can be anything between a variable name and an entire cell.
224 cursor_pos : int, optional
222 cursor_pos : int, optional
225 The position of the cursor in the block of code where the completion was requested.
223 The position of the cursor in the block of code where the completion was requested.
226 Default: ``len(code)``
224 Default: ``len(code)``
227
225
228 Returns
226 Returns
229 -------
227 -------
230 The msg_id of the message sent.
228 The msg_id of the message sent.
231 """
229 """
232 if cursor_pos is None:
230 if cursor_pos is None:
233 cursor_pos = len(code)
231 cursor_pos = len(code)
234 content = dict(code=code, cursor_pos=cursor_pos)
232 content = dict(code=code, cursor_pos=cursor_pos)
235 msg = self.session.msg('complete_request', content)
233 msg = self.session.msg('complete_request', content)
236 self._queue_send(msg)
234 self._queue_send(msg)
237 return msg['header']['msg_id']
235 return msg['header']['msg_id']
238
236
239 def inspect(self, code, cursor_pos=None, detail_level=0):
237 def inspect(self, code, cursor_pos=None, detail_level=0):
240 """Get metadata information about an object in the kernel's namespace.
238 """Get metadata information about an object in the kernel's namespace.
241
239
242 It is up to the kernel to determine the appropriate object to inspect.
240 It is up to the kernel to determine the appropriate object to inspect.
243
241
244 Parameters
242 Parameters
245 ----------
243 ----------
246 code : str
244 code : str
247 The context in which info is requested.
245 The context in which info is requested.
248 Can be anything between a variable name and an entire cell.
246 Can be anything between a variable name and an entire cell.
249 cursor_pos : int, optional
247 cursor_pos : int, optional
250 The position of the cursor in the block of code where the info was requested.
248 The position of the cursor in the block of code where the info was requested.
251 Default: ``len(code)``
249 Default: ``len(code)``
252 detail_level : int, optional
250 detail_level : int, optional
253 The level of detail for the introspection (0-2)
251 The level of detail for the introspection (0-2)
254
252
255 Returns
253 Returns
256 -------
254 -------
257 The msg_id of the message sent.
255 The msg_id of the message sent.
258 """
256 """
259 if cursor_pos is None:
257 if cursor_pos is None:
260 cursor_pos = len(code)
258 cursor_pos = len(code)
261 content = dict(code=code, cursor_pos=cursor_pos,
259 content = dict(code=code, cursor_pos=cursor_pos,
262 detail_level=detail_level,
260 detail_level=detail_level,
263 )
261 )
264 msg = self.session.msg('inspect_request', content)
262 msg = self.session.msg('inspect_request', content)
265 self._queue_send(msg)
263 self._queue_send(msg)
266 return msg['header']['msg_id']
264 return msg['header']['msg_id']
267
265
268 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
266 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
269 """Get entries from the kernel's history list.
267 """Get entries from the kernel's history list.
270
268
271 Parameters
269 Parameters
272 ----------
270 ----------
273 raw : bool
271 raw : bool
274 If True, return the raw input.
272 If True, return the raw input.
275 output : bool
273 output : bool
276 If True, then return the output as well.
274 If True, then return the output as well.
277 hist_access_type : str
275 hist_access_type : str
278 'range' (fill in session, start and stop params), 'tail' (fill in n)
276 'range' (fill in session, start and stop params), 'tail' (fill in n)
279 or 'search' (fill in pattern param).
277 or 'search' (fill in pattern param).
280
278
281 session : int
279 session : int
282 For a range request, the session from which to get lines. Session
280 For a range request, the session from which to get lines. Session
283 numbers are positive integers; negative ones count back from the
281 numbers are positive integers; negative ones count back from the
284 current session.
282 current session.
285 start : int
283 start : int
286 The first line number of a history range.
284 The first line number of a history range.
287 stop : int
285 stop : int
288 The final (excluded) line number of a history range.
286 The final (excluded) line number of a history range.
289
287
290 n : int
288 n : int
291 The number of lines of history to get for a tail request.
289 The number of lines of history to get for a tail request.
292
290
293 pattern : str
291 pattern : str
294 The glob-syntax pattern for a search request.
292 The glob-syntax pattern for a search request.
295
293
296 Returns
294 Returns
297 -------
295 -------
298 The msg_id of the message sent.
296 The msg_id of the message sent.
299 """
297 """
300 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
298 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
301 **kwargs)
299 **kwargs)
302 msg = self.session.msg('history_request', content)
300 msg = self.session.msg('history_request', content)
303 self._queue_send(msg)
301 self._queue_send(msg)
304 return msg['header']['msg_id']
302 return msg['header']['msg_id']
305
303
306 def kernel_info(self):
304 def kernel_info(self):
307 """Request kernel info."""
305 """Request kernel info."""
308 msg = self.session.msg('kernel_info_request')
306 msg = self.session.msg('kernel_info_request')
309 self._queue_send(msg)
307 self._queue_send(msg)
310 return msg['header']['msg_id']
308 return msg['header']['msg_id']
311
309
312 def _handle_kernel_info_reply(self, msg):
310 def _handle_kernel_info_reply(self, msg):
313 """handle kernel info reply
311 """handle kernel info reply
314
312
315 sets protocol adaptation version
313 sets protocol adaptation version
316 """
314 """
317 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
315 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
318 if adapt_version != major_protocol_version:
316 if adapt_version != major_protocol_version:
319 self.session.adapt_version = adapt_version
317 self.session.adapt_version = adapt_version
320
318
321 def shutdown(self, restart=False):
319 def shutdown(self, restart=False):
322 """Request an immediate kernel shutdown.
320 """Request an immediate kernel shutdown.
323
321
324 Upon receipt of the (empty) reply, client code can safely assume that
322 Upon receipt of the (empty) reply, client code can safely assume that
325 the kernel has shut down and it's safe to forcefully terminate it if
323 the kernel has shut down and it's safe to forcefully terminate it if
326 it's still alive.
324 it's still alive.
327
325
328 The kernel will send the reply via a function registered with Python's
326 The kernel will send the reply via a function registered with Python's
329 atexit module, ensuring it's truly done as the kernel is done with all
327 atexit module, ensuring it's truly done as the kernel is done with all
330 normal operation.
328 normal operation.
331 """
329 """
332 # Send quit message to kernel. Once we implement kernel-side setattr,
330 # Send quit message to kernel. Once we implement kernel-side setattr,
333 # this should probably be done that way, but for now this will do.
331 # this should probably be done that way, but for now this will do.
334 msg = self.session.msg('shutdown_request', {'restart':restart})
332 msg = self.session.msg('shutdown_request', {'restart':restart})
335 self._queue_send(msg)
333 self._queue_send(msg)
336 return msg['header']['msg_id']
334 return msg['header']['msg_id']
337
335
338 def is_complete(self, code):
336 def is_complete(self, code):
339 msg = self.session.msg('is_complete_request', {'code': code})
337 msg = self.session.msg('is_complete_request', {'code': code})
340 self._queue_send(msg)
338 self._queue_send(msg)
341 return msg['header']['msg_id']
339 return msg['header']['msg_id']
342
340
343 def _recv(self, **kwargs):
341 def _recv(self, **kwargs):
344 # Listen for kernel_info_reply message to do protocol adaptation
342 # Listen for kernel_info_reply message to do protocol adaptation
345 msg = ZMQSocketChannel._recv(self, **kwargs)
343 msg = ZMQSocketChannel._recv(self, **kwargs)
346 if msg['msg_type'] == 'kernel_info_reply':
344 if msg['msg_type'] == 'kernel_info_reply':
347 self._handle_kernel_info_reply(msg)
345 self._handle_kernel_info_reply(msg)
348 return msg
346 return msg
349
347
350
348
351 class BlockingIOPubChannel(ZMQSocketChannel):
349 class BlockingIOPubChannel(ZMQSocketChannel):
352 """The iopub channel which listens for messages that the kernel publishes.
350 """The iopub channel which listens for messages that the kernel publishes.
353
351
354 This channel is where all output is published to frontends.
352 This channel is where all output is published to frontends.
355 """
353 """
356 def start(self):
354 def start(self):
357 self.socket = self.context.socket(zmq.SUB)
355 self.socket = make_iopub_socket(self.context, self.session.bsession, self.address)
358 self.socket.linger = 1000
359 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
360 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
361 self.socket.connect(self.address)
362
363
356
364 class BlockingStdInChannel(ZMQSocketChannel):
357 class BlockingStdInChannel(ZMQSocketChannel):
365 """The stdin channel to handle raw_input requests that the kernel makes."""
358 """The stdin channel to handle raw_input requests that the kernel makes."""
366 msg_queue = None
359 msg_queue = None
367 proxy_methods = ['input']
360 proxy_methods = ['input']
368
361
369 def start(self):
362 def start(self):
370 self.socket = self.context.socket(zmq.DEALER)
363 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
371 self.socket.linger = 1000
372 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
373 self.socket.connect(self.address)
374
364
375 def input(self, string):
365 def input(self, string):
376 """Send a string of raw input to the kernel."""
366 """Send a string of raw input to the kernel."""
377 content = dict(value=string)
367 content = dict(value=string)
378 msg = self.session.msg('input_reply', content)
368 msg = self.session.msg('input_reply', content)
379 self._queue_send(msg)
369 self._queue_send(msg)
380
370
381
371
382 class BlockingHBChannel(HBChannel):
372 class BlockingHBChannel(HBChannel):
383
373
384 # This kernel needs quicker monitoring, shorten to 1 sec.
374 # This kernel needs quicker monitoring, shorten to 1 sec.
385 # less than 0.5s is unreliable, and will get occasional
375 # less than 0.5s is unreliable, and will get occasional
386 # false reports of missed beats.
376 # false reports of missed beats.
387 time_to_dead = 1.
377 time_to_dead = 1.
388
378
389 def call_handlers(self, since_last_heartbeat):
379 def call_handlers(self, since_last_heartbeat):
390 """ Pause beating on missed heartbeat. """
380 """ Pause beating on missed heartbeat. """
391 pass
381 pass
@@ -1,644 +1,665 b''
1 """Base classes to manage a Client's interaction with a running kernel"""
1 """Base classes to manage a Client's interaction with a running kernel"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import absolute_import
6 from __future__ import absolute_import
7
7
8 import atexit
8 import atexit
9 import errno
9 import errno
10 from threading import Thread
10 from threading import Thread
11 import time
11 import time
12
12
13 import zmq
13 import zmq
14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
15 # during garbage collection of threads at exit:
15 # during garbage collection of threads at exit:
16 from zmq import ZMQError
16 from zmq import ZMQError
17 from zmq.eventloop import ioloop, zmqstream
17 from zmq.eventloop import ioloop, zmqstream
18
18
19 from IPython.core.release import kernel_protocol_version_info
19 from IPython.core.release import kernel_protocol_version_info
20
20
21 from .channelsabc import (
21 from .channelsabc import (
22 ShellChannelABC, IOPubChannelABC,
22 ShellChannelABC, IOPubChannelABC,
23 HBChannelABC, StdInChannelABC,
23 HBChannelABC, StdInChannelABC,
24 )
24 )
25 from IPython.utils.py3compat import string_types, iteritems
25 from IPython.utils.py3compat import string_types, iteritems
26
26
27 #-----------------------------------------------------------------------------
27 #-----------------------------------------------------------------------------
28 # Constants and exceptions
28 # Constants and exceptions
29 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
30
30
31 major_protocol_version = kernel_protocol_version_info[0]
31 major_protocol_version = kernel_protocol_version_info[0]
32
32
33 class InvalidPortNumber(Exception):
33 class InvalidPortNumber(Exception):
34 pass
34 pass
35
35
36 #-----------------------------------------------------------------------------
36 #-----------------------------------------------------------------------------
37 # Utility functions
37 # Utility functions
38 #-----------------------------------------------------------------------------
38 #-----------------------------------------------------------------------------
39
39
40 # some utilities to validate message structure, these might get moved elsewhere
40 # some utilities to validate message structure, these might get moved elsewhere
41 # if they prove to have more generic utility
41 # if they prove to have more generic utility
42
42
43 def validate_string_list(lst):
43 def validate_string_list(lst):
44 """Validate that the input is a list of strings.
44 """Validate that the input is a list of strings.
45
45
46 Raises ValueError if not."""
46 Raises ValueError if not."""
47 if not isinstance(lst, list):
47 if not isinstance(lst, list):
48 raise ValueError('input %r must be a list' % lst)
48 raise ValueError('input %r must be a list' % lst)
49 for x in lst:
49 for x in lst:
50 if not isinstance(x, string_types):
50 if not isinstance(x, string_types):
51 raise ValueError('element %r in list must be a string' % x)
51 raise ValueError('element %r in list must be a string' % x)
52
52
53
53
54 def validate_string_dict(dct):
54 def validate_string_dict(dct):
55 """Validate that the input is a dict with string keys and values.
55 """Validate that the input is a dict with string keys and values.
56
56
57 Raises ValueError if not."""
57 Raises ValueError if not."""
58 for k,v in iteritems(dct):
58 for k,v in iteritems(dct):
59 if not isinstance(k, string_types):
59 if not isinstance(k, string_types):
60 raise ValueError('key %r in dict must be a string' % k)
60 raise ValueError('key %r in dict must be a string' % k)
61 if not isinstance(v, string_types):
61 if not isinstance(v, string_types):
62 raise ValueError('value %r in dict must be a string' % v)
62 raise ValueError('value %r in dict must be a string' % v)
63
63
64
64
65 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
66 # ZMQ Socket Channel classes
66 # ZMQ Socket Channel classes
67 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
68
68
69 class ZMQSocketChannel(Thread):
69 class ZMQSocketChannel(Thread):
70 """The base class for the channels that use ZMQ sockets."""
70 """The base class for the channels that use ZMQ sockets."""
71 context = None
71 context = None
72 session = None
72 session = None
73 socket = None
73 socket = None
74 ioloop = None
74 ioloop = None
75 stream = None
75 stream = None
76 _address = None
76 _address = None
77 _exiting = False
77 _exiting = False
78 proxy_methods = []
78 proxy_methods = []
79
79
80 def __init__(self, context, session, address):
80 def __init__(self, context, session, address):
81 """Create a channel.
81 """Create a channel.
82
82
83 Parameters
83 Parameters
84 ----------
84 ----------
85 context : :class:`zmq.Context`
85 context : :class:`zmq.Context`
86 The ZMQ context to use.
86 The ZMQ context to use.
87 session : :class:`session.Session`
87 session : :class:`session.Session`
88 The session to use.
88 The session to use.
89 address : zmq url
89 address : zmq url
90 Standard (ip, port) tuple that the kernel is listening on.
90 Standard (ip, port) tuple that the kernel is listening on.
91 """
91 """
92 super(ZMQSocketChannel, self).__init__()
92 super(ZMQSocketChannel, self).__init__()
93 self.daemon = True
93 self.daemon = True
94
94
95 self.context = context
95 self.context = context
96 self.session = session
96 self.session = session
97 if isinstance(address, tuple):
97 if isinstance(address, tuple):
98 if address[1] == 0:
98 if address[1] == 0:
99 message = 'The port number for a channel cannot be 0.'
99 message = 'The port number for a channel cannot be 0.'
100 raise InvalidPortNumber(message)
100 raise InvalidPortNumber(message)
101 address = "tcp://%s:%i" % address
101 address = "tcp://%s:%i" % address
102 self._address = address
102 self._address = address
103 atexit.register(self._notice_exit)
103 atexit.register(self._notice_exit)
104
104
105 def _notice_exit(self):
105 def _notice_exit(self):
106 self._exiting = True
106 self._exiting = True
107
107
108 def _run_loop(self):
108 def _run_loop(self):
109 """Run my loop, ignoring EINTR events in the poller"""
109 """Run my loop, ignoring EINTR events in the poller"""
110 while True:
110 while True:
111 try:
111 try:
112 self.ioloop.start()
112 self.ioloop.start()
113 except ZMQError as e:
113 except ZMQError as e:
114 if e.errno == errno.EINTR:
114 if e.errno == errno.EINTR:
115 continue
115 continue
116 else:
116 else:
117 raise
117 raise
118 except Exception:
118 except Exception:
119 if self._exiting:
119 if self._exiting:
120 break
120 break
121 else:
121 else:
122 raise
122 raise
123 else:
123 else:
124 break
124 break
125
125
126 def stop(self):
126 def stop(self):
127 """Stop the channel's event loop and join its thread.
127 """Stop the channel's event loop and join its thread.
128
128
129 This calls :meth:`~threading.Thread.join` and returns when the thread
129 This calls :meth:`~threading.Thread.join` and returns when the thread
130 terminates. :class:`RuntimeError` will be raised if
130 terminates. :class:`RuntimeError` will be raised if
131 :meth:`~threading.Thread.start` is called again.
131 :meth:`~threading.Thread.start` is called again.
132 """
132 """
133 if self.ioloop is not None:
133 if self.ioloop is not None:
134 self.ioloop.stop()
134 self.ioloop.stop()
135 self.join()
135 self.join()
136 self.close()
136 self.close()
137
137
138 def close(self):
138 def close(self):
139 if self.ioloop is not None:
139 if self.ioloop is not None:
140 try:
140 try:
141 self.ioloop.close(all_fds=True)
141 self.ioloop.close(all_fds=True)
142 except Exception:
142 except Exception:
143 pass
143 pass
144 if self.socket is not None:
144 if self.socket is not None:
145 try:
145 try:
146 self.socket.close(linger=0)
146 self.socket.close(linger=0)
147 except Exception:
147 except Exception:
148 pass
148 pass
149 self.socket = None
149 self.socket = None
150
150
151 @property
151 @property
152 def address(self):
152 def address(self):
153 """Get the channel's address as a zmq url string.
153 """Get the channel's address as a zmq url string.
154
154
155 These URLS have the form: 'tcp://127.0.0.1:5555'.
155 These URLS have the form: 'tcp://127.0.0.1:5555'.
156 """
156 """
157 return self._address
157 return self._address
158
158
159 def _queue_send(self, msg):
159 def _queue_send(self, msg):
160 """Queue a message to be sent from the IOLoop's thread.
160 """Queue a message to be sent from the IOLoop's thread.
161
161
162 Parameters
162 Parameters
163 ----------
163 ----------
164 msg : message to send
164 msg : message to send
165
165
166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
166 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
167 thread control of the action.
167 thread control of the action.
168 """
168 """
169 def thread_send():
169 def thread_send():
170 self.session.send(self.stream, msg)
170 self.session.send(self.stream, msg)
171 self.ioloop.add_callback(thread_send)
171 self.ioloop.add_callback(thread_send)
172
172
173 def _handle_recv(self, msg):
173 def _handle_recv(self, msg):
174 """Callback for stream.on_recv.
174 """Callback for stream.on_recv.
175
175
176 Unpacks message, and calls handlers with it.
176 Unpacks message, and calls handlers with it.
177 """
177 """
178 ident,smsg = self.session.feed_identities(msg)
178 ident,smsg = self.session.feed_identities(msg)
179 msg = self.session.deserialize(smsg)
179 msg = self.session.deserialize(smsg)
180 self.call_handlers(msg)
180 self.call_handlers(msg)
181
181
182
182
183
183
184 class ShellChannel(ZMQSocketChannel):
184 class ShellChannel(ZMQSocketChannel):
185 """The shell channel for issuing request/replies to the kernel."""
185 """The shell channel for issuing request/replies to the kernel."""
186
186
187 command_queue = None
187 command_queue = None
188 # flag for whether execute requests should be allowed to call raw_input:
188 # flag for whether execute requests should be allowed to call raw_input:
189 allow_stdin = True
189 allow_stdin = True
190 proxy_methods = [
190 proxy_methods = [
191 'execute',
191 'execute',
192 'complete',
192 'complete',
193 'inspect',
193 'inspect',
194 'history',
194 'history',
195 'kernel_info',
195 'kernel_info',
196 'shutdown',
196 'shutdown',
197 'is_complete',
197 'is_complete',
198 ]
198 ]
199
199
200 def __init__(self, context, session, address):
200 def __init__(self, context, session, address):
201 super(ShellChannel, self).__init__(context, session, address)
201 super(ShellChannel, self).__init__(context, session, address)
202 self.ioloop = ioloop.IOLoop()
202 self.ioloop = ioloop.IOLoop()
203
203
204 def run(self):
204 def run(self):
205 """The thread's main activity. Call start() instead."""
205 """The thread's main activity. Call start() instead."""
206 self.socket = self.context.socket(zmq.DEALER)
206 self.socket = self.context.socket(zmq.DEALER)
207 self.socket.linger = 1000
207 self.socket.linger = 1000
208 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
208 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
209 self.socket.connect(self.address)
209 self.socket.connect(self.address)
210 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
210 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
211 self.stream.on_recv(self._handle_recv)
211 self.stream.on_recv(self._handle_recv)
212 self._run_loop()
212 self._run_loop()
213
213
214 def call_handlers(self, msg):
214 def call_handlers(self, msg):
215 """This method is called in the ioloop thread when a message arrives.
215 """This method is called in the ioloop thread when a message arrives.
216
216
217 Subclasses should override this method to handle incoming messages.
217 Subclasses should override this method to handle incoming messages.
218 It is important to remember that this method is called in the thread
218 It is important to remember that this method is called in the thread
219 so that some logic must be done to ensure that the application level
219 so that some logic must be done to ensure that the application level
220 handlers are called in the application thread.
220 handlers are called in the application thread.
221 """
221 """
222 raise NotImplementedError('call_handlers must be defined in a subclass.')
222 raise NotImplementedError('call_handlers must be defined in a subclass.')
223
223
224 def execute(self, code, silent=False, store_history=True,
224 def execute(self, code, silent=False, store_history=True,
225 user_expressions=None, allow_stdin=None):
225 user_expressions=None, allow_stdin=None):
226 """Execute code in the kernel.
226 """Execute code in the kernel.
227
227
228 Parameters
228 Parameters
229 ----------
229 ----------
230 code : str
230 code : str
231 A string of Python code.
231 A string of Python code.
232
232
233 silent : bool, optional (default False)
233 silent : bool, optional (default False)
234 If set, the kernel will execute the code as quietly possible, and
234 If set, the kernel will execute the code as quietly possible, and
235 will force store_history to be False.
235 will force store_history to be False.
236
236
237 store_history : bool, optional (default True)
237 store_history : bool, optional (default True)
238 If set, the kernel will store command history. This is forced
238 If set, the kernel will store command history. This is forced
239 to be False if silent is True.
239 to be False if silent is True.
240
240
241 user_expressions : dict, optional
241 user_expressions : dict, optional
242 A dict mapping names to expressions to be evaluated in the user's
242 A dict mapping names to expressions to be evaluated in the user's
243 dict. The expression values are returned as strings formatted using
243 dict. The expression values are returned as strings formatted using
244 :func:`repr`.
244 :func:`repr`.
245
245
246 allow_stdin : bool, optional (default self.allow_stdin)
246 allow_stdin : bool, optional (default self.allow_stdin)
247 Flag for whether the kernel can send stdin requests to frontends.
247 Flag for whether the kernel can send stdin requests to frontends.
248
248
249 Some frontends (e.g. the Notebook) do not support stdin requests.
249 Some frontends (e.g. the Notebook) do not support stdin requests.
250 If raw_input is called from code executed from such a frontend, a
250 If raw_input is called from code executed from such a frontend, a
251 StdinNotImplementedError will be raised.
251 StdinNotImplementedError will be raised.
252
252
253 Returns
253 Returns
254 -------
254 -------
255 The msg_id of the message sent.
255 The msg_id of the message sent.
256 """
256 """
257 if user_expressions is None:
257 if user_expressions is None:
258 user_expressions = {}
258 user_expressions = {}
259 if allow_stdin is None:
259 if allow_stdin is None:
260 allow_stdin = self.allow_stdin
260 allow_stdin = self.allow_stdin
261
261
262
262
263 # Don't waste network traffic if inputs are invalid
263 # Don't waste network traffic if inputs are invalid
264 if not isinstance(code, string_types):
264 if not isinstance(code, string_types):
265 raise ValueError('code %r must be a string' % code)
265 raise ValueError('code %r must be a string' % code)
266 validate_string_dict(user_expressions)
266 validate_string_dict(user_expressions)
267
267
268 # Create class for content/msg creation. Related to, but possibly
268 # Create class for content/msg creation. Related to, but possibly
269 # not in Session.
269 # not in Session.
270 content = dict(code=code, silent=silent, store_history=store_history,
270 content = dict(code=code, silent=silent, store_history=store_history,
271 user_expressions=user_expressions,
271 user_expressions=user_expressions,
272 allow_stdin=allow_stdin,
272 allow_stdin=allow_stdin,
273 )
273 )
274 msg = self.session.msg('execute_request', content)
274 msg = self.session.msg('execute_request', content)
275 self._queue_send(msg)
275 self._queue_send(msg)
276 return msg['header']['msg_id']
276 return msg['header']['msg_id']
277
277
278 def complete(self, code, cursor_pos=None):
278 def complete(self, code, cursor_pos=None):
279 """Tab complete text in the kernel's namespace.
279 """Tab complete text in the kernel's namespace.
280
280
281 Parameters
281 Parameters
282 ----------
282 ----------
283 code : str
283 code : str
284 The context in which completion is requested.
284 The context in which completion is requested.
285 Can be anything between a variable name and an entire cell.
285 Can be anything between a variable name and an entire cell.
286 cursor_pos : int, optional
286 cursor_pos : int, optional
287 The position of the cursor in the block of code where the completion was requested.
287 The position of the cursor in the block of code where the completion was requested.
288 Default: ``len(code)``
288 Default: ``len(code)``
289
289
290 Returns
290 Returns
291 -------
291 -------
292 The msg_id of the message sent.
292 The msg_id of the message sent.
293 """
293 """
294 if cursor_pos is None:
294 if cursor_pos is None:
295 cursor_pos = len(code)
295 cursor_pos = len(code)
296 content = dict(code=code, cursor_pos=cursor_pos)
296 content = dict(code=code, cursor_pos=cursor_pos)
297 msg = self.session.msg('complete_request', content)
297 msg = self.session.msg('complete_request', content)
298 self._queue_send(msg)
298 self._queue_send(msg)
299 return msg['header']['msg_id']
299 return msg['header']['msg_id']
300
300
301 def inspect(self, code, cursor_pos=None, detail_level=0):
301 def inspect(self, code, cursor_pos=None, detail_level=0):
302 """Get metadata information about an object in the kernel's namespace.
302 """Get metadata information about an object in the kernel's namespace.
303
303
304 It is up to the kernel to determine the appropriate object to inspect.
304 It is up to the kernel to determine the appropriate object to inspect.
305
305
306 Parameters
306 Parameters
307 ----------
307 ----------
308 code : str
308 code : str
309 The context in which info is requested.
309 The context in which info is requested.
310 Can be anything between a variable name and an entire cell.
310 Can be anything between a variable name and an entire cell.
311 cursor_pos : int, optional
311 cursor_pos : int, optional
312 The position of the cursor in the block of code where the info was requested.
312 The position of the cursor in the block of code where the info was requested.
313 Default: ``len(code)``
313 Default: ``len(code)``
314 detail_level : int, optional
314 detail_level : int, optional
315 The level of detail for the introspection (0-2)
315 The level of detail for the introspection (0-2)
316
316
317 Returns
317 Returns
318 -------
318 -------
319 The msg_id of the message sent.
319 The msg_id of the message sent.
320 """
320 """
321 if cursor_pos is None:
321 if cursor_pos is None:
322 cursor_pos = len(code)
322 cursor_pos = len(code)
323 content = dict(code=code, cursor_pos=cursor_pos,
323 content = dict(code=code, cursor_pos=cursor_pos,
324 detail_level=detail_level,
324 detail_level=detail_level,
325 )
325 )
326 msg = self.session.msg('inspect_request', content)
326 msg = self.session.msg('inspect_request', content)
327 self._queue_send(msg)
327 self._queue_send(msg)
328 return msg['header']['msg_id']
328 return msg['header']['msg_id']
329
329
330 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
330 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
331 """Get entries from the kernel's history list.
331 """Get entries from the kernel's history list.
332
332
333 Parameters
333 Parameters
334 ----------
334 ----------
335 raw : bool
335 raw : bool
336 If True, return the raw input.
336 If True, return the raw input.
337 output : bool
337 output : bool
338 If True, then return the output as well.
338 If True, then return the output as well.
339 hist_access_type : str
339 hist_access_type : str
340 'range' (fill in session, start and stop params), 'tail' (fill in n)
340 'range' (fill in session, start and stop params), 'tail' (fill in n)
341 or 'search' (fill in pattern param).
341 or 'search' (fill in pattern param).
342
342
343 session : int
343 session : int
344 For a range request, the session from which to get lines. Session
344 For a range request, the session from which to get lines. Session
345 numbers are positive integers; negative ones count back from the
345 numbers are positive integers; negative ones count back from the
346 current session.
346 current session.
347 start : int
347 start : int
348 The first line number of a history range.
348 The first line number of a history range.
349 stop : int
349 stop : int
350 The final (excluded) line number of a history range.
350 The final (excluded) line number of a history range.
351
351
352 n : int
352 n : int
353 The number of lines of history to get for a tail request.
353 The number of lines of history to get for a tail request.
354
354
355 pattern : str
355 pattern : str
356 The glob-syntax pattern for a search request.
356 The glob-syntax pattern for a search request.
357
357
358 Returns
358 Returns
359 -------
359 -------
360 The msg_id of the message sent.
360 The msg_id of the message sent.
361 """
361 """
362 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
362 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
363 **kwargs)
363 **kwargs)
364 msg = self.session.msg('history_request', content)
364 msg = self.session.msg('history_request', content)
365 self._queue_send(msg)
365 self._queue_send(msg)
366 return msg['header']['msg_id']
366 return msg['header']['msg_id']
367
367
368 def kernel_info(self):
368 def kernel_info(self):
369 """Request kernel info."""
369 """Request kernel info."""
370 msg = self.session.msg('kernel_info_request')
370 msg = self.session.msg('kernel_info_request')
371 self._queue_send(msg)
371 self._queue_send(msg)
372 return msg['header']['msg_id']
372 return msg['header']['msg_id']
373
373
374 def _handle_kernel_info_reply(self, msg):
374 def _handle_kernel_info_reply(self, msg):
375 """handle kernel info reply
375 """handle kernel info reply
376
376
377 sets protocol adaptation version
377 sets protocol adaptation version
378 """
378 """
379 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
379 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
380 if adapt_version != major_protocol_version:
380 if adapt_version != major_protocol_version:
381 self.session.adapt_version = adapt_version
381 self.session.adapt_version = adapt_version
382
382
383 def shutdown(self, restart=False):
383 def shutdown(self, restart=False):
384 """Request an immediate kernel shutdown.
384 """Request an immediate kernel shutdown.
385
385
386 Upon receipt of the (empty) reply, client code can safely assume that
386 Upon receipt of the (empty) reply, client code can safely assume that
387 the kernel has shut down and it's safe to forcefully terminate it if
387 the kernel has shut down and it's safe to forcefully terminate it if
388 it's still alive.
388 it's still alive.
389
389
390 The kernel will send the reply via a function registered with Python's
390 The kernel will send the reply via a function registered with Python's
391 atexit module, ensuring it's truly done as the kernel is done with all
391 atexit module, ensuring it's truly done as the kernel is done with all
392 normal operation.
392 normal operation.
393 """
393 """
394 # Send quit message to kernel. Once we implement kernel-side setattr,
394 # Send quit message to kernel. Once we implement kernel-side setattr,
395 # this should probably be done that way, but for now this will do.
395 # this should probably be done that way, but for now this will do.
396 msg = self.session.msg('shutdown_request', {'restart':restart})
396 msg = self.session.msg('shutdown_request', {'restart':restart})
397 self._queue_send(msg)
397 self._queue_send(msg)
398 return msg['header']['msg_id']
398 return msg['header']['msg_id']
399
399
400 def is_complete(self, code):
400 def is_complete(self, code):
401 msg = self.session.msg('is_complete_request', {'code': code})
401 msg = self.session.msg('is_complete_request', {'code': code})
402 self._queue_send(msg)
402 self._queue_send(msg)
403 return msg['header']['msg_id']
403 return msg['header']['msg_id']
404
404
405
405
406 class IOPubChannel(ZMQSocketChannel):
406 class IOPubChannel(ZMQSocketChannel):
407 """The iopub channel which listens for messages that the kernel publishes.
407 """The iopub channel which listens for messages that the kernel publishes.
408
408
409 This channel is where all output is published to frontends.
409 This channel is where all output is published to frontends.
410 """
410 """
411
411
412 def __init__(self, context, session, address):
412 def __init__(self, context, session, address):
413 super(IOPubChannel, self).__init__(context, session, address)
413 super(IOPubChannel, self).__init__(context, session, address)
414 self.ioloop = ioloop.IOLoop()
414 self.ioloop = ioloop.IOLoop()
415
415
416 def run(self):
416 def run(self):
417 """The thread's main activity. Call start() instead."""
417 """The thread's main activity. Call start() instead."""
418 self.socket = self.context.socket(zmq.SUB)
418 self.socket = self.context.socket(zmq.SUB)
419 self.socket.linger = 1000
419 self.socket.linger = 1000
420 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
420 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
421 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
421 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
422 self.socket.connect(self.address)
422 self.socket.connect(self.address)
423 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
423 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
424 self.stream.on_recv(self._handle_recv)
424 self.stream.on_recv(self._handle_recv)
425 self._run_loop()
425 self._run_loop()
426
426
427 def call_handlers(self, msg):
427 def call_handlers(self, msg):
428 """This method is called in the ioloop thread when a message arrives.
428 """This method is called in the ioloop thread when a message arrives.
429
429
430 Subclasses should override this method to handle incoming messages.
430 Subclasses should override this method to handle incoming messages.
431 It is important to remember that this method is called in the thread
431 It is important to remember that this method is called in the thread
432 so that some logic must be done to ensure that the application leve
432 so that some logic must be done to ensure that the application leve
433 handlers are called in the application thread.
433 handlers are called in the application thread.
434 """
434 """
435 raise NotImplementedError('call_handlers must be defined in a subclass.')
435 raise NotImplementedError('call_handlers must be defined in a subclass.')
436
436
437 def flush(self, timeout=1.0):
437 def flush(self, timeout=1.0):
438 """Immediately processes all pending messages on the iopub channel.
438 """Immediately processes all pending messages on the iopub channel.
439
439
440 Callers should use this method to ensure that :meth:`call_handlers`
440 Callers should use this method to ensure that :meth:`call_handlers`
441 has been called for all messages that have been received on the
441 has been called for all messages that have been received on the
442 0MQ SUB socket of this channel.
442 0MQ SUB socket of this channel.
443
443
444 This method is thread safe.
444 This method is thread safe.
445
445
446 Parameters
446 Parameters
447 ----------
447 ----------
448 timeout : float, optional
448 timeout : float, optional
449 The maximum amount of time to spend flushing, in seconds. The
449 The maximum amount of time to spend flushing, in seconds. The
450 default is one second.
450 default is one second.
451 """
451 """
452 # We do the IOLoop callback process twice to ensure that the IOLoop
452 # We do the IOLoop callback process twice to ensure that the IOLoop
453 # gets to perform at least one full poll.
453 # gets to perform at least one full poll.
454 stop_time = time.time() + timeout
454 stop_time = time.time() + timeout
455 for i in range(2):
455 for i in range(2):
456 self._flushed = False
456 self._flushed = False
457 self.ioloop.add_callback(self._flush)
457 self.ioloop.add_callback(self._flush)
458 while not self._flushed and time.time() < stop_time:
458 while not self._flushed and time.time() < stop_time:
459 time.sleep(0.01)
459 time.sleep(0.01)
460
460
461 def _flush(self):
461 def _flush(self):
462 """Callback for :method:`self.flush`."""
462 """Callback for :method:`self.flush`."""
463 self.stream.flush()
463 self.stream.flush()
464 self._flushed = True
464 self._flushed = True
465
465
466
466
467 class StdInChannel(ZMQSocketChannel):
467 class StdInChannel(ZMQSocketChannel):
468 """The stdin channel to handle raw_input requests that the kernel makes."""
468 """The stdin channel to handle raw_input requests that the kernel makes."""
469
469
470 msg_queue = None
470 msg_queue = None
471 proxy_methods = ['input']
471 proxy_methods = ['input']
472
472
473 def __init__(self, context, session, address):
473 def __init__(self, context, session, address):
474 super(StdInChannel, self).__init__(context, session, address)
474 super(StdInChannel, self).__init__(context, session, address)
475 self.ioloop = ioloop.IOLoop()
475 self.ioloop = ioloop.IOLoop()
476
476
477 def run(self):
477 def run(self):
478 """The thread's main activity. Call start() instead."""
478 """The thread's main activity. Call start() instead."""
479 self.socket = self.context.socket(zmq.DEALER)
479 self.socket = self.context.socket(zmq.DEALER)
480 self.socket.linger = 1000
480 self.socket.linger = 1000
481 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
481 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
482 self.socket.connect(self.address)
482 self.socket.connect(self.address)
483 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
483 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
484 self.stream.on_recv(self._handle_recv)
484 self.stream.on_recv(self._handle_recv)
485 self._run_loop()
485 self._run_loop()
486
486
487 def call_handlers(self, msg):
487 def call_handlers(self, msg):
488 """This method is called in the ioloop thread when a message arrives.
488 """This method is called in the ioloop thread when a message arrives.
489
489
490 Subclasses should override this method to handle incoming messages.
490 Subclasses should override this method to handle incoming messages.
491 It is important to remember that this method is called in the thread
491 It is important to remember that this method is called in the thread
492 so that some logic must be done to ensure that the application leve
492 so that some logic must be done to ensure that the application leve
493 handlers are called in the application thread.
493 handlers are called in the application thread.
494 """
494 """
495 raise NotImplementedError('call_handlers must be defined in a subclass.')
495 raise NotImplementedError('call_handlers must be defined in a subclass.')
496
496
497 def input(self, string):
497 def input(self, string):
498 """Send a string of raw input to the kernel."""
498 """Send a string of raw input to the kernel."""
499 content = dict(value=string)
499 content = dict(value=string)
500 msg = self.session.msg('input_reply', content)
500 msg = self.session.msg('input_reply', content)
501 self._queue_send(msg)
501 self._queue_send(msg)
502
502
503 def make_shell_socket(context, identity, address):
504 socket = context.socket(zmq.DEALER)
505 socket.linger = 1000
506 socket.setsockopt(zmq.IDENTITY, identity)
507 socket.connect(address)
508 return socket
509
510 def make_iopub_socket(context, identity, address):
511 socket = context.socket(zmq.SUB)
512 socket.linger = 1000
513 socket.setsockopt(zmq.SUBSCRIBE,b'')
514 socket.setsockopt(zmq.IDENTITY, identity)
515 socket.connect(address)
516 return socket
517
518 def make_stdin_socket(context, identity, address):
519 socket = context.socket(zmq.DEALER)
520 socket.linger = 1000
521 socket.setsockopt(zmq.IDENTITY, identity)
522 socket.connect(address)
523 return socket
503
524
504 class HBChannel(ZMQSocketChannel):
525 class HBChannel(ZMQSocketChannel):
505 """The heartbeat channel which monitors the kernel heartbeat.
526 """The heartbeat channel which monitors the kernel heartbeat.
506
527
507 Note that the heartbeat channel is paused by default. As long as you start
528 Note that the heartbeat channel is paused by default. As long as you start
508 this channel, the kernel manager will ensure that it is paused and un-paused
529 this channel, the kernel manager will ensure that it is paused and un-paused
509 as appropriate.
530 as appropriate.
510 """
531 """
511
532
512 time_to_dead = 3.0
533 time_to_dead = 3.0
513 socket = None
534 socket = None
514 poller = None
535 poller = None
515 _running = None
536 _running = None
516 _pause = None
537 _pause = None
517 _beating = None
538 _beating = None
518
539
519 def __init__(self, context, session, address):
540 def __init__(self, context, session, address):
520 super(HBChannel, self).__init__(context, session, address)
541 super(HBChannel, self).__init__(context, session, address)
521 self._running = False
542 self._running = False
522 self._pause =True
543 self._pause =True
523 self.poller = zmq.Poller()
544 self.poller = zmq.Poller()
524
545
525 def _create_socket(self):
546 def _create_socket(self):
526 if self.socket is not None:
547 if self.socket is not None:
527 # close previous socket, before opening a new one
548 # close previous socket, before opening a new one
528 self.poller.unregister(self.socket)
549 self.poller.unregister(self.socket)
529 self.socket.close()
550 self.socket.close()
530 self.socket = self.context.socket(zmq.REQ)
551 self.socket = self.context.socket(zmq.REQ)
531 self.socket.linger = 1000
552 self.socket.linger = 1000
532 self.socket.connect(self.address)
553 self.socket.connect(self.address)
533
554
534 self.poller.register(self.socket, zmq.POLLIN)
555 self.poller.register(self.socket, zmq.POLLIN)
535
556
536 def _poll(self, start_time):
557 def _poll(self, start_time):
537 """poll for heartbeat replies until we reach self.time_to_dead.
558 """poll for heartbeat replies until we reach self.time_to_dead.
538
559
539 Ignores interrupts, and returns the result of poll(), which
560 Ignores interrupts, and returns the result of poll(), which
540 will be an empty list if no messages arrived before the timeout,
561 will be an empty list if no messages arrived before the timeout,
541 or the event tuple if there is a message to receive.
562 or the event tuple if there is a message to receive.
542 """
563 """
543
564
544 until_dead = self.time_to_dead - (time.time() - start_time)
565 until_dead = self.time_to_dead - (time.time() - start_time)
545 # ensure poll at least once
566 # ensure poll at least once
546 until_dead = max(until_dead, 1e-3)
567 until_dead = max(until_dead, 1e-3)
547 events = []
568 events = []
548 while True:
569 while True:
549 try:
570 try:
550 events = self.poller.poll(1000 * until_dead)
571 events = self.poller.poll(1000 * until_dead)
551 except ZMQError as e:
572 except ZMQError as e:
552 if e.errno == errno.EINTR:
573 if e.errno == errno.EINTR:
553 # ignore interrupts during heartbeat
574 # ignore interrupts during heartbeat
554 # this may never actually happen
575 # this may never actually happen
555 until_dead = self.time_to_dead - (time.time() - start_time)
576 until_dead = self.time_to_dead - (time.time() - start_time)
556 until_dead = max(until_dead, 1e-3)
577 until_dead = max(until_dead, 1e-3)
557 pass
578 pass
558 else:
579 else:
559 raise
580 raise
560 except Exception:
581 except Exception:
561 if self._exiting:
582 if self._exiting:
562 break
583 break
563 else:
584 else:
564 raise
585 raise
565 else:
586 else:
566 break
587 break
567 return events
588 return events
568
589
569 def run(self):
590 def run(self):
570 """The thread's main activity. Call start() instead."""
591 """The thread's main activity. Call start() instead."""
571 self._create_socket()
592 self._create_socket()
572 self._running = True
593 self._running = True
573 self._beating = True
594 self._beating = True
574
595
575 while self._running:
596 while self._running:
576 if self._pause:
597 if self._pause:
577 # just sleep, and skip the rest of the loop
598 # just sleep, and skip the rest of the loop
578 time.sleep(self.time_to_dead)
599 time.sleep(self.time_to_dead)
579 continue
600 continue
580
601
581 since_last_heartbeat = 0.0
602 since_last_heartbeat = 0.0
582 # io.rprint('Ping from HB channel') # dbg
603 # io.rprint('Ping from HB channel') # dbg
583 # no need to catch EFSM here, because the previous event was
604 # no need to catch EFSM here, because the previous event was
584 # either a recv or connect, which cannot be followed by EFSM
605 # either a recv or connect, which cannot be followed by EFSM
585 self.socket.send(b'ping')
606 self.socket.send(b'ping')
586 request_time = time.time()
607 request_time = time.time()
587 ready = self._poll(request_time)
608 ready = self._poll(request_time)
588 if ready:
609 if ready:
589 self._beating = True
610 self._beating = True
590 # the poll above guarantees we have something to recv
611 # the poll above guarantees we have something to recv
591 self.socket.recv()
612 self.socket.recv()
592 # sleep the remainder of the cycle
613 # sleep the remainder of the cycle
593 remainder = self.time_to_dead - (time.time() - request_time)
614 remainder = self.time_to_dead - (time.time() - request_time)
594 if remainder > 0:
615 if remainder > 0:
595 time.sleep(remainder)
616 time.sleep(remainder)
596 continue
617 continue
597 else:
618 else:
598 # nothing was received within the time limit, signal heart failure
619 # nothing was received within the time limit, signal heart failure
599 self._beating = False
620 self._beating = False
600 since_last_heartbeat = time.time() - request_time
621 since_last_heartbeat = time.time() - request_time
601 self.call_handlers(since_last_heartbeat)
622 self.call_handlers(since_last_heartbeat)
602 # and close/reopen the socket, because the REQ/REP cycle has been broken
623 # and close/reopen the socket, because the REQ/REP cycle has been broken
603 self._create_socket()
624 self._create_socket()
604 continue
625 continue
605
626
606 def pause(self):
627 def pause(self):
607 """Pause the heartbeat."""
628 """Pause the heartbeat."""
608 self._pause = True
629 self._pause = True
609
630
610 def unpause(self):
631 def unpause(self):
611 """Unpause the heartbeat."""
632 """Unpause the heartbeat."""
612 self._pause = False
633 self._pause = False
613
634
614 def is_beating(self):
635 def is_beating(self):
615 """Is the heartbeat running and responsive (and not paused)."""
636 """Is the heartbeat running and responsive (and not paused)."""
616 if self.is_alive() and not self._pause and self._beating:
637 if self.is_alive() and not self._pause and self._beating:
617 return True
638 return True
618 else:
639 else:
619 return False
640 return False
620
641
621 def stop(self):
642 def stop(self):
622 """Stop the channel's event loop and join its thread."""
643 """Stop the channel's event loop and join its thread."""
623 self._running = False
644 self._running = False
624 super(HBChannel, self).stop()
645 super(HBChannel, self).stop()
625
646
626 def call_handlers(self, since_last_heartbeat):
647 def call_handlers(self, since_last_heartbeat):
627 """This method is called in the ioloop thread when a message arrives.
648 """This method is called in the ioloop thread when a message arrives.
628
649
629 Subclasses should override this method to handle incoming messages.
650 Subclasses should override this method to handle incoming messages.
630 It is important to remember that this method is called in the thread
651 It is important to remember that this method is called in the thread
631 so that some logic must be done to ensure that the application level
652 so that some logic must be done to ensure that the application level
632 handlers are called in the application thread.
653 handlers are called in the application thread.
633 """
654 """
634 raise NotImplementedError('call_handlers must be defined in a subclass.')
655 raise NotImplementedError('call_handlers must be defined in a subclass.')
635
656
636
657
637 #---------------------------------------------------------------------#-----------------------------------------------------------------------------
658 #---------------------------------------------------------------------#-----------------------------------------------------------------------------
638 # ABC Registration
659 # ABC Registration
639 #-----------------------------------------------------------------------------
660 #-----------------------------------------------------------------------------
640
661
641 ShellChannelABC.register(ShellChannel)
662 ShellChannelABC.register(ShellChannel)
642 IOPubChannelABC.register(IOPubChannel)
663 IOPubChannelABC.register(IOPubChannel)
643 HBChannelABC.register(HBChannel)
664 HBChannelABC.register(HBChannel)
644 StdInChannelABC.register(StdInChannel)
665 StdInChannelABC.register(StdInChannel)
@@ -1,582 +1,563 b''
1 """ Defines a KernelClient that provides signals and slots.
1 """ Defines a KernelClient that provides signals and slots.
2 """
2 """
3 import atexit
3 import atexit
4 import errno
4 import errno
5 from threading import Thread
5 from threading import Thread
6 import time
6 import time
7
7
8 import zmq
8 import zmq
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 # during garbage collection of threads at exit:
10 # during garbage collection of threads at exit:
11 from zmq import ZMQError
11 from zmq import ZMQError
12 from zmq.eventloop import ioloop, zmqstream
12 from zmq.eventloop import ioloop, zmqstream
13
13
14 from IPython.external.qt import QtCore
14 from IPython.external.qt import QtCore
15
15
16 # Local imports
16 # Local imports
17 from IPython.utils.traitlets import Type
17 from IPython.utils.traitlets import Type
18 from IPython.kernel.channels import HBChannel
18 from IPython.kernel.channels import HBChannel,\
19 make_shell_socket, make_iopub_socket, make_stdin_socket
19 from IPython.kernel import KernelClient
20 from IPython.kernel import KernelClient
20
21
21 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
22 from .util import SuperQObject
23 from .util import SuperQObject
23
24
24 class QtHBChannel(QtHBChannelMixin, HBChannel):
25 class QtHBChannel(QtHBChannelMixin, HBChannel):
25 pass
26 pass
26
27
27 from IPython.core.release import kernel_protocol_version_info
28 from IPython.core.release import kernel_protocol_version_info
28
29
29 from IPython.kernel.channelsabc import (
30 from IPython.kernel.channelsabc import (
30 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
31 )
32 )
32 from IPython.utils.py3compat import string_types, iteritems
33 from IPython.utils.py3compat import string_types, iteritems
33
34
34 major_protocol_version = kernel_protocol_version_info[0]
35 major_protocol_version = kernel_protocol_version_info[0]
35
36
36 class InvalidPortNumber(Exception):
37 class InvalidPortNumber(Exception):
37 pass
38 pass
38
39
39 # some utilities to validate message structure, these might get moved elsewhere
40 # some utilities to validate message structure, these might get moved elsewhere
40 # if they prove to have more generic utility
41 # if they prove to have more generic utility
41
42
42 def validate_string_list(lst):
43 """Validate that the input is a list of strings.
44
45 Raises ValueError if not."""
46 if not isinstance(lst, list):
47 raise ValueError('input %r must be a list' % lst)
48 for x in lst:
49 if not isinstance(x, string_types):
50 raise ValueError('element %r in list must be a string' % x)
51
52
43
53 def validate_string_dict(dct):
44 def validate_string_dict(dct):
54 """Validate that the input is a dict with string keys and values.
45 """Validate that the input is a dict with string keys and values.
55
46
56 Raises ValueError if not."""
47 Raises ValueError if not."""
57 for k,v in iteritems(dct):
48 for k,v in iteritems(dct):
58 if not isinstance(k, string_types):
49 if not isinstance(k, string_types):
59 raise ValueError('key %r in dict must be a string' % k)
50 raise ValueError('key %r in dict must be a string' % k)
60 if not isinstance(v, string_types):
51 if not isinstance(v, string_types):
61 raise ValueError('value %r in dict must be a string' % v)
52 raise ValueError('value %r in dict must be a string' % v)
62
53
63
54
64
55
65 class QtZMQSocketChannel(SuperQObject, Thread):
56 class QtZMQSocketChannel(SuperQObject, Thread):
66 """The base class for the channels that use ZMQ sockets."""
57 """The base class for the channels that use ZMQ sockets."""
67 context = None
58 context = None
68 session = None
59 session = None
69 socket = None
60 socket = None
70 ioloop = None
61 ioloop = None
71 stream = None
62 stream = None
72 _address = None
63 _address = None
73 _exiting = False
64 _exiting = False
74 proxy_methods = []
65 proxy_methods = []
75
66
76 # Emitted when the channel is started.
67 # Emitted when the channel is started.
77 started = QtCore.Signal()
68 started = QtCore.Signal()
78
69
79 # Emitted when the channel is stopped.
70 # Emitted when the channel is stopped.
80 stopped = QtCore.Signal()
71 stopped = QtCore.Signal()
81
72
82 message_received = QtCore.Signal(object)
73 message_received = QtCore.Signal(object)
83
74
84 #---------------------------------------------------------------------------
75 #---------------------------------------------------------------------------
85 # InProcessChannel interface
76 # InProcessChannel interface
86 #---------------------------------------------------------------------------
77 #---------------------------------------------------------------------------
87
78
88 def call_handlers_later(self, *args, **kwds):
79 def call_handlers_later(self, *args, **kwds):
89 """ Call the message handlers later.
80 """ Call the message handlers later.
90 """
81 """
91 do_later = lambda: self.call_handlers(*args, **kwds)
82 do_later = lambda: self.call_handlers(*args, **kwds)
92 QtCore.QTimer.singleShot(0, do_later)
83 QtCore.QTimer.singleShot(0, do_later)
93
84
94 def process_events(self):
85 def process_events(self):
95 """ Process any pending GUI events.
86 """ Process any pending GUI events.
96 """
87 """
97 QtCore.QCoreApplication.instance().processEvents()
88 QtCore.QCoreApplication.instance().processEvents()
98
89
99 def __init__(self, context, session, address):
90 def __init__(self, context, session, address):
100 """Create a channel.
91 """Create a channel.
101
92
102 Parameters
93 Parameters
103 ----------
94 ----------
104 context : :class:`zmq.Context`
95 context : :class:`zmq.Context`
105 The ZMQ context to use.
96 The ZMQ context to use.
106 session : :class:`session.Session`
97 session : :class:`session.Session`
107 The session to use.
98 The session to use.
108 address : zmq url
99 address : zmq url
109 Standard (ip, port) tuple that the kernel is listening on.
100 Standard (ip, port) tuple that the kernel is listening on.
110 """
101 """
111 super(QtZMQSocketChannel, self).__init__()
102 super(QtZMQSocketChannel, self).__init__()
112 self.daemon = True
103 self.daemon = True
113
104
114 self.context = context
105 self.context = context
115 self.session = session
106 self.session = session
116 if isinstance(address, tuple):
107 if isinstance(address, tuple):
117 if address[1] == 0:
108 if address[1] == 0:
118 message = 'The port number for a channel cannot be 0.'
109 message = 'The port number for a channel cannot be 0.'
119 raise InvalidPortNumber(message)
110 raise InvalidPortNumber(message)
120 address = "tcp://%s:%i" % address
111 address = "tcp://%s:%i" % address
121 self._address = address
112 self._address = address
122 atexit.register(self._notice_exit)
113 atexit.register(self._notice_exit)
123
114
124 def _notice_exit(self):
115 def _notice_exit(self):
125 self._exiting = True
116 self._exiting = True
126
117
127 def _run_loop(self):
118 def _run_loop(self):
128 """Run my loop, ignoring EINTR events in the poller"""
119 """Run my loop, ignoring EINTR events in the poller"""
129 while True:
120 while True:
130 try:
121 try:
131 self.ioloop.start()
122 self.ioloop.start()
132 except ZMQError as e:
123 except ZMQError as e:
133 if e.errno == errno.EINTR:
124 if e.errno == errno.EINTR:
134 continue
125 continue
135 else:
126 else:
136 raise
127 raise
137 except Exception:
128 except Exception:
138 if self._exiting:
129 if self._exiting:
139 break
130 break
140 else:
131 else:
141 raise
132 raise
142 else:
133 else:
143 break
134 break
144
135
145 def start(self):
136 def start(self):
146 """ Reimplemented to emit signal.
137 """ Reimplemented to emit signal.
147 """
138 """
148 super(QtZMQSocketChannel, self).start()
139 super(QtZMQSocketChannel, self).start()
149 self.started.emit()
140 self.started.emit()
150
141
151 def stop(self):
142 def stop(self):
152 """Stop the channel's event loop and join its thread.
143 """Stop the channel's event loop and join its thread.
153
144
154 This calls :meth:`~threading.Thread.join` and returns when the thread
145 This calls :meth:`~threading.Thread.join` and returns when the thread
155 terminates. :class:`RuntimeError` will be raised if
146 terminates. :class:`RuntimeError` will be raised if
156 :meth:`~threading.Thread.start` is called again.
147 :meth:`~threading.Thread.start` is called again.
157 """
148 """
158 if self.ioloop is not None:
149 if self.ioloop is not None:
159 self.ioloop.stop()
150 self.ioloop.stop()
160 self.join()
151 self.join()
161 self.close()
152 self.close()
162 self.stopped.emit()
153 self.stopped.emit()
163
154
164 def close(self):
155 def close(self):
165 if self.ioloop is not None:
156 if self.ioloop is not None:
166 try:
157 try:
167 self.ioloop.close(all_fds=True)
158 self.ioloop.close(all_fds=True)
168 except Exception:
159 except Exception:
169 pass
160 pass
170 if self.socket is not None:
161 if self.socket is not None:
171 try:
162 try:
172 self.socket.close(linger=0)
163 self.socket.close(linger=0)
173 except Exception:
164 except Exception:
174 pass
165 pass
175 self.socket = None
166 self.socket = None
176
167
177 @property
168 @property
178 def address(self):
169 def address(self):
179 """Get the channel's address as a zmq url string.
170 """Get the channel's address as a zmq url string.
180
171
181 These URLS have the form: 'tcp://127.0.0.1:5555'.
172 These URLS have the form: 'tcp://127.0.0.1:5555'.
182 """
173 """
183 return self._address
174 return self._address
184
175
185 def _queue_send(self, msg):
176 def _queue_send(self, msg):
186 """Queue a message to be sent from the IOLoop's thread.
177 """Queue a message to be sent from the IOLoop's thread.
187
178
188 Parameters
179 Parameters
189 ----------
180 ----------
190 msg : message to send
181 msg : message to send
191
182
192 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
183 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
193 thread control of the action.
184 thread control of the action.
194 """
185 """
195 def thread_send():
186 def thread_send():
196 self.session.send(self.stream, msg)
187 self.session.send(self.stream, msg)
197 self.ioloop.add_callback(thread_send)
188 self.ioloop.add_callback(thread_send)
198
189
199 def _handle_recv(self, msg):
190 def _handle_recv(self, msg):
200 """Callback for stream.on_recv.
191 """Callback for stream.on_recv.
201
192
202 Unpacks message, and calls handlers with it.
193 Unpacks message, and calls handlers with it.
203 """
194 """
204 ident,smsg = self.session.feed_identities(msg)
195 ident,smsg = self.session.feed_identities(msg)
205 msg = self.session.deserialize(smsg)
196 msg = self.session.deserialize(smsg)
206 self.call_handlers(msg)
197 self.call_handlers(msg)
207
198
208 def call_handlers(self, msg):
199 def call_handlers(self, msg):
209 """This method is called in the ioloop thread when a message arrives.
200 """This method is called in the ioloop thread when a message arrives.
210
201
211 Subclasses should override this method to handle incoming messages.
202 Subclasses should override this method to handle incoming messages.
212 It is important to remember that this method is called in the thread
203 It is important to remember that this method is called in the thread
213 so that some logic must be done to ensure that the application level
204 so that some logic must be done to ensure that the application level
214 handlers are called in the application thread.
205 handlers are called in the application thread.
215 """
206 """
216 # Emit the generic signal.
207 # Emit the generic signal.
217 self.message_received.emit(msg)
208 self.message_received.emit(msg)
218
209
219
210
220 class QtShellChannel(QtZMQSocketChannel):
211 class QtShellChannel(QtZMQSocketChannel):
221 """The shell channel for issuing request/replies to the kernel."""
212 """The shell channel for issuing request/replies to the kernel."""
222
213
223 command_queue = None
214 command_queue = None
224 # flag for whether execute requests should be allowed to call raw_input:
215 # flag for whether execute requests should be allowed to call raw_input:
225 allow_stdin = True
216 allow_stdin = True
226 proxy_methods = [
217 proxy_methods = [
227 'execute',
218 'execute',
228 'complete',
219 'complete',
229 'inspect',
220 'inspect',
230 'history',
221 'history',
231 'kernel_info',
222 'kernel_info',
232 'shutdown',
223 'shutdown',
233 'is_complete',
224 'is_complete',
234 ]
225 ]
235
226
236 # Emitted when a reply has been received for the corresponding request type.
227 # Emitted when a reply has been received for the corresponding request type.
237 execute_reply = QtCore.Signal(object)
228 execute_reply = QtCore.Signal(object)
238 complete_reply = QtCore.Signal(object)
229 complete_reply = QtCore.Signal(object)
239 inspect_reply = QtCore.Signal(object)
230 inspect_reply = QtCore.Signal(object)
240 history_reply = QtCore.Signal(object)
231 history_reply = QtCore.Signal(object)
241 kernel_info_reply = QtCore.Signal(object)
232 kernel_info_reply = QtCore.Signal(object)
242
233
243 def __init__(self, context, session, address):
234 def __init__(self, context, session, address):
244 super(QtShellChannel, self).__init__(context, session, address)
235 super(QtShellChannel, self).__init__(context, session, address)
245 self.ioloop = ioloop.IOLoop()
236 self.ioloop = ioloop.IOLoop()
246
237
247 def run(self):
238 def run(self):
248 """The thread's main activity. Call start() instead."""
239 """The thread's main activity. Call start() instead."""
249 self.socket = self.context.socket(zmq.DEALER)
240 self.socket = make_shell_socket(self.context, self.session.bsession, self.address)
250 self.socket.linger = 1000
251 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
252 self.socket.connect(self.address)
253 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
241 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
254 self.stream.on_recv(self._handle_recv)
242 self.stream.on_recv(self._handle_recv)
255 self._run_loop()
243 self._run_loop()
256
244
257 def call_handlers(self, msg):
245 def call_handlers(self, msg):
258 super(QtShellChannel, self).call_handlers(msg)
246 super(QtShellChannel, self).call_handlers(msg)
259
247
260 # Catch kernel_info_reply for message spec adaptation
248 # Catch kernel_info_reply for message spec adaptation
261 msg_type = msg['header']['msg_type']
249 msg_type = msg['header']['msg_type']
262 if msg_type == 'kernel_info_reply':
250 if msg_type == 'kernel_info_reply':
263 self._handle_kernel_info_reply(msg)
251 self._handle_kernel_info_reply(msg)
264
252
265 # Emit specific signals
253 # Emit specific signals
266 signal = getattr(self, msg_type, None)
254 signal = getattr(self, msg_type, None)
267 if signal:
255 if signal:
268 signal.emit(msg)
256 signal.emit(msg)
269
257
270 def execute(self, code, silent=False, store_history=True,
258 def execute(self, code, silent=False, store_history=True,
271 user_expressions=None, allow_stdin=None):
259 user_expressions=None, allow_stdin=None):
272 """Execute code in the kernel.
260 """Execute code in the kernel.
273
261
274 Parameters
262 Parameters
275 ----------
263 ----------
276 code : str
264 code : str
277 A string of Python code.
265 A string of Python code.
278
266
279 silent : bool, optional (default False)
267 silent : bool, optional (default False)
280 If set, the kernel will execute the code as quietly possible, and
268 If set, the kernel will execute the code as quietly possible, and
281 will force store_history to be False.
269 will force store_history to be False.
282
270
283 store_history : bool, optional (default True)
271 store_history : bool, optional (default True)
284 If set, the kernel will store command history. This is forced
272 If set, the kernel will store command history. This is forced
285 to be False if silent is True.
273 to be False if silent is True.
286
274
287 user_expressions : dict, optional
275 user_expressions : dict, optional
288 A dict mapping names to expressions to be evaluated in the user's
276 A dict mapping names to expressions to be evaluated in the user's
289 dict. The expression values are returned as strings formatted using
277 dict. The expression values are returned as strings formatted using
290 :func:`repr`.
278 :func:`repr`.
291
279
292 allow_stdin : bool, optional (default self.allow_stdin)
280 allow_stdin : bool, optional (default self.allow_stdin)
293 Flag for whether the kernel can send stdin requests to frontends.
281 Flag for whether the kernel can send stdin requests to frontends.
294
282
295 Some frontends (e.g. the Notebook) do not support stdin requests.
283 Some frontends (e.g. the Notebook) do not support stdin requests.
296 If raw_input is called from code executed from such a frontend, a
284 If raw_input is called from code executed from such a frontend, a
297 StdinNotImplementedError will be raised.
285 StdinNotImplementedError will be raised.
298
286
299 Returns
287 Returns
300 -------
288 -------
301 The msg_id of the message sent.
289 The msg_id of the message sent.
302 """
290 """
303 if user_expressions is None:
291 if user_expressions is None:
304 user_expressions = {}
292 user_expressions = {}
305 if allow_stdin is None:
293 if allow_stdin is None:
306 allow_stdin = self.allow_stdin
294 allow_stdin = self.allow_stdin
307
295
308
296
309 # Don't waste network traffic if inputs are invalid
297 # Don't waste network traffic if inputs are invalid
310 if not isinstance(code, string_types):
298 if not isinstance(code, string_types):
311 raise ValueError('code %r must be a string' % code)
299 raise ValueError('code %r must be a string' % code)
312 validate_string_dict(user_expressions)
300 validate_string_dict(user_expressions)
313
301
314 # Create class for content/msg creation. Related to, but possibly
302 # Create class for content/msg creation. Related to, but possibly
315 # not in Session.
303 # not in Session.
316 content = dict(code=code, silent=silent, store_history=store_history,
304 content = dict(code=code, silent=silent, store_history=store_history,
317 user_expressions=user_expressions,
305 user_expressions=user_expressions,
318 allow_stdin=allow_stdin,
306 allow_stdin=allow_stdin,
319 )
307 )
320 msg = self.session.msg('execute_request', content)
308 msg = self.session.msg('execute_request', content)
321 self._queue_send(msg)
309 self._queue_send(msg)
322 return msg['header']['msg_id']
310 return msg['header']['msg_id']
323
311
324 def complete(self, code, cursor_pos=None):
312 def complete(self, code, cursor_pos=None):
325 """Tab complete text in the kernel's namespace.
313 """Tab complete text in the kernel's namespace.
326
314
327 Parameters
315 Parameters
328 ----------
316 ----------
329 code : str
317 code : str
330 The context in which completion is requested.
318 The context in which completion is requested.
331 Can be anything between a variable name and an entire cell.
319 Can be anything between a variable name and an entire cell.
332 cursor_pos : int, optional
320 cursor_pos : int, optional
333 The position of the cursor in the block of code where the completion was requested.
321 The position of the cursor in the block of code where the completion was requested.
334 Default: ``len(code)``
322 Default: ``len(code)``
335
323
336 Returns
324 Returns
337 -------
325 -------
338 The msg_id of the message sent.
326 The msg_id of the message sent.
339 """
327 """
340 if cursor_pos is None:
328 if cursor_pos is None:
341 cursor_pos = len(code)
329 cursor_pos = len(code)
342 content = dict(code=code, cursor_pos=cursor_pos)
330 content = dict(code=code, cursor_pos=cursor_pos)
343 msg = self.session.msg('complete_request', content)
331 msg = self.session.msg('complete_request', content)
344 self._queue_send(msg)
332 self._queue_send(msg)
345 return msg['header']['msg_id']
333 return msg['header']['msg_id']
346
334
347 def inspect(self, code, cursor_pos=None, detail_level=0):
335 def inspect(self, code, cursor_pos=None, detail_level=0):
348 """Get metadata information about an object in the kernel's namespace.
336 """Get metadata information about an object in the kernel's namespace.
349
337
350 It is up to the kernel to determine the appropriate object to inspect.
338 It is up to the kernel to determine the appropriate object to inspect.
351
339
352 Parameters
340 Parameters
353 ----------
341 ----------
354 code : str
342 code : str
355 The context in which info is requested.
343 The context in which info is requested.
356 Can be anything between a variable name and an entire cell.
344 Can be anything between a variable name and an entire cell.
357 cursor_pos : int, optional
345 cursor_pos : int, optional
358 The position of the cursor in the block of code where the info was requested.
346 The position of the cursor in the block of code where the info was requested.
359 Default: ``len(code)``
347 Default: ``len(code)``
360 detail_level : int, optional
348 detail_level : int, optional
361 The level of detail for the introspection (0-2)
349 The level of detail for the introspection (0-2)
362
350
363 Returns
351 Returns
364 -------
352 -------
365 The msg_id of the message sent.
353 The msg_id of the message sent.
366 """
354 """
367 if cursor_pos is None:
355 if cursor_pos is None:
368 cursor_pos = len(code)
356 cursor_pos = len(code)
369 content = dict(code=code, cursor_pos=cursor_pos,
357 content = dict(code=code, cursor_pos=cursor_pos,
370 detail_level=detail_level,
358 detail_level=detail_level,
371 )
359 )
372 msg = self.session.msg('inspect_request', content)
360 msg = self.session.msg('inspect_request', content)
373 self._queue_send(msg)
361 self._queue_send(msg)
374 return msg['header']['msg_id']
362 return msg['header']['msg_id']
375
363
376 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
364 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
377 """Get entries from the kernel's history list.
365 """Get entries from the kernel's history list.
378
366
379 Parameters
367 Parameters
380 ----------
368 ----------
381 raw : bool
369 raw : bool
382 If True, return the raw input.
370 If True, return the raw input.
383 output : bool
371 output : bool
384 If True, then return the output as well.
372 If True, then return the output as well.
385 hist_access_type : str
373 hist_access_type : str
386 'range' (fill in session, start and stop params), 'tail' (fill in n)
374 'range' (fill in session, start and stop params), 'tail' (fill in n)
387 or 'search' (fill in pattern param).
375 or 'search' (fill in pattern param).
388
376
389 session : int
377 session : int
390 For a range request, the session from which to get lines. Session
378 For a range request, the session from which to get lines. Session
391 numbers are positive integers; negative ones count back from the
379 numbers are positive integers; negative ones count back from the
392 current session.
380 current session.
393 start : int
381 start : int
394 The first line number of a history range.
382 The first line number of a history range.
395 stop : int
383 stop : int
396 The final (excluded) line number of a history range.
384 The final (excluded) line number of a history range.
397
385
398 n : int
386 n : int
399 The number of lines of history to get for a tail request.
387 The number of lines of history to get for a tail request.
400
388
401 pattern : str
389 pattern : str
402 The glob-syntax pattern for a search request.
390 The glob-syntax pattern for a search request.
403
391
404 Returns
392 Returns
405 -------
393 -------
406 The msg_id of the message sent.
394 The msg_id of the message sent.
407 """
395 """
408 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
396 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
409 **kwargs)
397 **kwargs)
410 msg = self.session.msg('history_request', content)
398 msg = self.session.msg('history_request', content)
411 self._queue_send(msg)
399 self._queue_send(msg)
412 return msg['header']['msg_id']
400 return msg['header']['msg_id']
413
401
414 def kernel_info(self):
402 def kernel_info(self):
415 """Request kernel info."""
403 """Request kernel info."""
416 msg = self.session.msg('kernel_info_request')
404 msg = self.session.msg('kernel_info_request')
417 self._queue_send(msg)
405 self._queue_send(msg)
418 return msg['header']['msg_id']
406 return msg['header']['msg_id']
419
407
420 def _handle_kernel_info_reply(self, msg):
408 def _handle_kernel_info_reply(self, msg):
421 """handle kernel info reply
409 """handle kernel info reply
422
410
423 sets protocol adaptation version
411 sets protocol adaptation version
424 """
412 """
425 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
413 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
426 if adapt_version != major_protocol_version:
414 if adapt_version != major_protocol_version:
427 self.session.adapt_version = adapt_version
415 self.session.adapt_version = adapt_version
428
416
429 def shutdown(self, restart=False):
417 def shutdown(self, restart=False):
430 """Request an immediate kernel shutdown.
418 """Request an immediate kernel shutdown.
431
419
432 Upon receipt of the (empty) reply, client code can safely assume that
420 Upon receipt of the (empty) reply, client code can safely assume that
433 the kernel has shut down and it's safe to forcefully terminate it if
421 the kernel has shut down and it's safe to forcefully terminate it if
434 it's still alive.
422 it's still alive.
435
423
436 The kernel will send the reply via a function registered with Python's
424 The kernel will send the reply via a function registered with Python's
437 atexit module, ensuring it's truly done as the kernel is done with all
425 atexit module, ensuring it's truly done as the kernel is done with all
438 normal operation.
426 normal operation.
439 """
427 """
440 # Send quit message to kernel. Once we implement kernel-side setattr,
428 # Send quit message to kernel. Once we implement kernel-side setattr,
441 # this should probably be done that way, but for now this will do.
429 # this should probably be done that way, but for now this will do.
442 msg = self.session.msg('shutdown_request', {'restart':restart})
430 msg = self.session.msg('shutdown_request', {'restart':restart})
443 self._queue_send(msg)
431 self._queue_send(msg)
444 return msg['header']['msg_id']
432 return msg['header']['msg_id']
445
433
446 def is_complete(self, code):
434 def is_complete(self, code):
447 msg = self.session.msg('is_complete_request', {'code': code})
435 msg = self.session.msg('is_complete_request', {'code': code})
448 self._queue_send(msg)
436 self._queue_send(msg)
449 return msg['header']['msg_id']
437 return msg['header']['msg_id']
450
438
451
439
452 class QtIOPubChannel(QtZMQSocketChannel):
440 class QtIOPubChannel(QtZMQSocketChannel):
453 """The iopub channel which listens for messages that the kernel publishes.
441 """The iopub channel which listens for messages that the kernel publishes.
454
442
455 This channel is where all output is published to frontends.
443 This channel is where all output is published to frontends.
456 """
444 """
457 # Emitted when a message of type 'stream' is received.
445 # Emitted when a message of type 'stream' is received.
458 stream_received = QtCore.Signal(object)
446 stream_received = QtCore.Signal(object)
459
447
460 # Emitted when a message of type 'execute_input' is received.
448 # Emitted when a message of type 'execute_input' is received.
461 execute_input_received = QtCore.Signal(object)
449 execute_input_received = QtCore.Signal(object)
462
450
463 # Emitted when a message of type 'execute_result' is received.
451 # Emitted when a message of type 'execute_result' is received.
464 execute_result_received = QtCore.Signal(object)
452 execute_result_received = QtCore.Signal(object)
465
453
466 # Emitted when a message of type 'error' is received.
454 # Emitted when a message of type 'error' is received.
467 error_received = QtCore.Signal(object)
455 error_received = QtCore.Signal(object)
468
456
469 # Emitted when a message of type 'display_data' is received
457 # Emitted when a message of type 'display_data' is received
470 display_data_received = QtCore.Signal(object)
458 display_data_received = QtCore.Signal(object)
471
459
472 # Emitted when a crash report message is received from the kernel's
460 # Emitted when a crash report message is received from the kernel's
473 # last-resort sys.excepthook.
461 # last-resort sys.excepthook.
474 crash_received = QtCore.Signal(object)
462 crash_received = QtCore.Signal(object)
475
463
476 # Emitted when a shutdown is noticed.
464 # Emitted when a shutdown is noticed.
477 shutdown_reply_received = QtCore.Signal(object)
465 shutdown_reply_received = QtCore.Signal(object)
478
466
479 def __init__(self, context, session, address):
467 def __init__(self, context, session, address):
480 super(QtIOPubChannel, self).__init__(context, session, address)
468 super(QtIOPubChannel, self).__init__(context, session, address)
481 self.ioloop = ioloop.IOLoop()
469 self.ioloop = ioloop.IOLoop()
482
470
483 def run(self):
471 def run(self):
484 """The thread's main activity. Call start() instead."""
472 """The thread's main activity. Call start() instead."""
485 self.socket = self.context.socket(zmq.SUB)
473 self.socket = make_iopub_socket(self.context, self.session.bsession, self.address)
486 self.socket.linger = 1000
487 self.socket.setsockopt(zmq.SUBSCRIBE,b'')
488 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
489 self.socket.connect(self.address)
490 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
474 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
491 self.stream.on_recv(self._handle_recv)
475 self.stream.on_recv(self._handle_recv)
492 self._run_loop()
476 self._run_loop()
493
477
494 def call_handlers(self, msg):
478 def call_handlers(self, msg):
495 super(QtIOPubChannel, self).call_handlers(msg)
479 super(QtIOPubChannel, self).call_handlers(msg)
496
480
497 # Emit signals for specialized message types.
481 # Emit signals for specialized message types.
498 msg_type = msg['header']['msg_type']
482 msg_type = msg['header']['msg_type']
499 signal = getattr(self, msg_type + '_received', None)
483 signal = getattr(self, msg_type + '_received', None)
500 if signal:
484 if signal:
501 signal.emit(msg)
485 signal.emit(msg)
502
486
503 def flush(self, timeout=1.0):
487 def flush(self, timeout=1.0):
504 """Immediately processes all pending messages on the iopub channel.
488 """Immediately processes all pending messages on the iopub channel.
505
489
506 Callers should use this method to ensure that :meth:`call_handlers`
490 Callers should use this method to ensure that :meth:`call_handlers`
507 has been called for all messages that have been received on the
491 has been called for all messages that have been received on the
508 0MQ SUB socket of this channel.
492 0MQ SUB socket of this channel.
509
493
510 This method is thread safe.
494 This method is thread safe.
511
495
512 Parameters
496 Parameters
513 ----------
497 ----------
514 timeout : float, optional
498 timeout : float, optional
515 The maximum amount of time to spend flushing, in seconds. The
499 The maximum amount of time to spend flushing, in seconds. The
516 default is one second.
500 default is one second.
517 """
501 """
518 # We do the IOLoop callback process twice to ensure that the IOLoop
502 # We do the IOLoop callback process twice to ensure that the IOLoop
519 # gets to perform at least one full poll.
503 # gets to perform at least one full poll.
520 stop_time = time.time() + timeout
504 stop_time = time.time() + timeout
521 for i in range(2):
505 for i in range(2):
522 self._flushed = False
506 self._flushed = False
523 self.ioloop.add_callback(self._flush)
507 self.ioloop.add_callback(self._flush)
524 while not self._flushed and time.time() < stop_time:
508 while not self._flushed and time.time() < stop_time:
525 time.sleep(0.01)
509 time.sleep(0.01)
526
510
527 def _flush(self):
511 def _flush(self):
528 """Callback for :method:`self.flush`."""
512 """Callback for :method:`self.flush`."""
529 self.stream.flush()
513 self.stream.flush()
530 self._flushed = True
514 self._flushed = True
531
515
532
516
533 class QtStdInChannel(QtZMQSocketChannel):
517 class QtStdInChannel(QtZMQSocketChannel):
534 """The stdin channel to handle raw_input requests that the kernel makes."""
518 """The stdin channel to handle raw_input requests that the kernel makes."""
535
519
536 msg_queue = None
520 msg_queue = None
537 proxy_methods = ['input']
521 proxy_methods = ['input']
538
522
539 # Emitted when an input request is received.
523 # Emitted when an input request is received.
540 input_requested = QtCore.Signal(object)
524 input_requested = QtCore.Signal(object)
541
525
542 def __init__(self, context, session, address):
526 def __init__(self, context, session, address):
543 super(QtStdInChannel, self).__init__(context, session, address)
527 super(QtStdInChannel, self).__init__(context, session, address)
544 self.ioloop = ioloop.IOLoop()
528 self.ioloop = ioloop.IOLoop()
545
529
546 def run(self):
530 def run(self):
547 """The thread's main activity. Call start() instead."""
531 """The thread's main activity. Call start() instead."""
548 self.socket = self.context.socket(zmq.DEALER)
532 self.socket = make_stdin_socket(self.context, self.session.bsession, self.address)
549 self.socket.linger = 1000
550 self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
551 self.socket.connect(self.address)
552 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
533 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
553 self.stream.on_recv(self._handle_recv)
534 self.stream.on_recv(self._handle_recv)
554 self._run_loop()
535 self._run_loop()
555
536
556 def call_handlers(self, msg):
537 def call_handlers(self, msg):
557 super(QtStdInChannel, self).call_handlers(msg)
538 super(QtStdInChannel, self).call_handlers(msg)
558
539
559 # Emit signals for specialized message types.
540 # Emit signals for specialized message types.
560 msg_type = msg['header']['msg_type']
541 msg_type = msg['header']['msg_type']
561 if msg_type == 'input_request':
542 if msg_type == 'input_request':
562 self.input_requested.emit(msg)
543 self.input_requested.emit(msg)
563
544
564 def input(self, string):
545 def input(self, string):
565 """Send a string of raw input to the kernel."""
546 """Send a string of raw input to the kernel."""
566 content = dict(value=string)
547 content = dict(value=string)
567 msg = self.session.msg('input_reply', content)
548 msg = self.session.msg('input_reply', content)
568 self._queue_send(msg)
549 self._queue_send(msg)
569
550
570 ShellChannelABC.register(QtShellChannel)
551 ShellChannelABC.register(QtShellChannel)
571 IOPubChannelABC.register(QtIOPubChannel)
552 IOPubChannelABC.register(QtIOPubChannel)
572 StdInChannelABC.register(QtStdInChannel)
553 StdInChannelABC.register(QtStdInChannel)
573
554
574
555
575 class QtKernelClient(QtKernelClientMixin, KernelClient):
556 class QtKernelClient(QtKernelClientMixin, KernelClient):
576 """ A KernelClient that provides signals and slots.
557 """ A KernelClient that provides signals and slots.
577 """
558 """
578
559
579 iopub_channel_class = Type(QtIOPubChannel)
560 iopub_channel_class = Type(QtIOPubChannel)
580 shell_channel_class = Type(QtShellChannel)
561 shell_channel_class = Type(QtShellChannel)
581 stdin_channel_class = Type(QtStdInChannel)
562 stdin_channel_class = Type(QtStdInChannel)
582 hb_channel_class = Type(QtHBChannel)
563 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now