##// END OF EJS Templates
Rename _queue_send to send
Thomas Kluyver -
Show More
@@ -1,92 +1,92 b''
1 """Blocking channels
1 """Blocking channels
2
2
3 Useful for test suites and blocking terminal interfaces.
3 Useful for test suites and blocking terminal interfaces.
4 """
4 """
5
5
6 # Copyright (c) IPython Development Team.
6 # Copyright (c) IPython Development Team.
7 # Distributed under the terms of the Modified BSD License.
7 # Distributed under the terms of the Modified BSD License.
8
8
9 try:
9 try:
10 from queue import Queue, Empty # Py 3
10 from queue import Queue, Empty # Py 3
11 except ImportError:
11 except ImportError:
12 from Queue import Queue, Empty # Py 2
12 from Queue import Queue, Empty # Py 2
13
13
14
14
15 class ZMQSocketChannel(object):
15 class ZMQSocketChannel(object):
16 """A ZMQ socket in a simple blocking API"""
16 """A ZMQ socket in a simple blocking API"""
17 session = None
17 session = None
18 socket = None
18 socket = None
19 stream = None
19 stream = None
20 _exiting = False
20 _exiting = False
21 proxy_methods = []
21 proxy_methods = []
22
22
23 def __init__(self, socket, session, loop=None):
23 def __init__(self, socket, session, loop=None):
24 """Create a channel.
24 """Create a channel.
25
25
26 Parameters
26 Parameters
27 ----------
27 ----------
28 socket : :class:`zmq.Socket`
28 socket : :class:`zmq.Socket`
29 The ZMQ socket to use.
29 The ZMQ socket to use.
30 session : :class:`session.Session`
30 session : :class:`session.Session`
31 The session to use.
31 The session to use.
32 loop
32 loop
33 Unused here, for other implementations
33 Unused here, for other implementations
34 """
34 """
35 super(ZMQSocketChannel, self).__init__()
35 super(ZMQSocketChannel, self).__init__()
36
36
37 self.socket = socket
37 self.socket = socket
38 self.session = session
38 self.session = session
39
39
40 def _recv(self, **kwargs):
40 def _recv(self, **kwargs):
41 msg = self.socket.recv_multipart(**kwargs)
41 msg = self.socket.recv_multipart(**kwargs)
42 ident,smsg = self.session.feed_identities(msg)
42 ident,smsg = self.session.feed_identities(msg)
43 return self.session.deserialize(smsg)
43 return self.session.deserialize(smsg)
44
44
45 def get_msg(self, block=True, timeout=None):
45 def get_msg(self, block=True, timeout=None):
46 """ Gets a message if there is one that is ready. """
46 """ Gets a message if there is one that is ready. """
47 if block:
47 if block:
48 if timeout is not None:
48 if timeout is not None:
49 timeout *= 1000 # seconds to ms
49 timeout *= 1000 # seconds to ms
50 ready = self.socket.poll(timeout)
50 ready = self.socket.poll(timeout)
51 else:
51 else:
52 ready = self.socket.poll(timeout=0)
52 ready = self.socket.poll(timeout=0)
53
53
54 if ready:
54 if ready:
55 return self._recv()
55 return self._recv()
56 else:
56 else:
57 raise Empty
57 raise Empty
58
58
59 def get_msgs(self):
59 def get_msgs(self):
60 """ Get all messages that are currently ready. """
60 """ Get all messages that are currently ready. """
61 msgs = []
61 msgs = []
62 while True:
62 while True:
63 try:
63 try:
64 msgs.append(self.get_msg(block=False))
64 msgs.append(self.get_msg(block=False))
65 except Empty:
65 except Empty:
66 break
66 break
67 return msgs
67 return msgs
68
68
69 def msg_ready(self):
69 def msg_ready(self):
70 """ Is there a message that has been received? """
70 """ Is there a message that has been received? """
71 return bool(self.socket.poll(timeout=0))
71 return bool(self.socket.poll(timeout=0))
72
72
73 def close(self):
73 def close(self):
74 if self.socket is not None:
74 if self.socket is not None:
75 try:
75 try:
76 self.socket.close(linger=0)
76 self.socket.close(linger=0)
77 except Exception:
77 except Exception:
78 pass
78 pass
79 self.socket = None
79 self.socket = None
80 stop = close
80 stop = close
81
81
82 def is_alive(self):
82 def is_alive(self):
83 return (self.socket is not None)
83 return (self.socket is not None)
84
84
85 def _queue_send(self, msg):
85 def send(self, msg):
86 """Pass a message to the ZMQ socket to send
86 """Pass a message to the ZMQ socket to send
87 """
87 """
88 self.session.send(self.socket, msg)
88 self.session.send(self.socket, msg)
89
89
90 def start(self):
90 def start(self):
91 pass
91 pass
92
92
@@ -1,389 +1,389 b''
1 """Base class to manage the interaction with a running kernel"""
1 """Base class to manage the interaction with a running kernel"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import absolute_import
6 from __future__ import absolute_import
7 from IPython.kernel.channels import major_protocol_version
7 from IPython.kernel.channels import major_protocol_version
8 from IPython.utils.py3compat import string_types, iteritems
8 from IPython.utils.py3compat import string_types, iteritems
9
9
10 import zmq
10 import zmq
11
11
12 from IPython.utils.traitlets import (
12 from IPython.utils.traitlets import (
13 Any, Instance, Type,
13 Any, Instance, Type,
14 )
14 )
15
15
16 from .channelsabc import (ChannelABC, HBChannelABC)
16 from .channelsabc import (ChannelABC, HBChannelABC)
17 from .channels import (
17 from .channels import (
18 make_shell_socket, make_stdin_socket, make_iopub_socket
18 make_shell_socket, make_stdin_socket, make_iopub_socket
19 )
19 )
20 from .clientabc import KernelClientABC
20 from .clientabc import KernelClientABC
21 from .connect import ConnectionFileMixin
21 from .connect import ConnectionFileMixin
22
22
23
23
24 # some utilities to validate message structure, these might get moved elsewhere
24 # some utilities to validate message structure, these might get moved elsewhere
25 # if they prove to have more generic utility
25 # if they prove to have more generic utility
26
26
27 def validate_string_dict(dct):
27 def validate_string_dict(dct):
28 """Validate that the input is a dict with string keys and values.
28 """Validate that the input is a dict with string keys and values.
29
29
30 Raises ValueError if not."""
30 Raises ValueError if not."""
31 for k,v in iteritems(dct):
31 for k,v in iteritems(dct):
32 if not isinstance(k, string_types):
32 if not isinstance(k, string_types):
33 raise ValueError('key %r in dict must be a string' % k)
33 raise ValueError('key %r in dict must be a string' % k)
34 if not isinstance(v, string_types):
34 if not isinstance(v, string_types):
35 raise ValueError('value %r in dict must be a string' % v)
35 raise ValueError('value %r in dict must be a string' % v)
36
36
37
37
38 class KernelClient(ConnectionFileMixin):
38 class KernelClient(ConnectionFileMixin):
39 """Communicates with a single kernel on any host via zmq channels.
39 """Communicates with a single kernel on any host via zmq channels.
40
40
41 There are four channels associated with each kernel:
41 There are four channels associated with each kernel:
42
42
43 * shell: for request/reply calls to the kernel.
43 * shell: for request/reply calls to the kernel.
44 * iopub: for the kernel to publish results to frontends.
44 * iopub: for the kernel to publish results to frontends.
45 * hb: for monitoring the kernel's heartbeat.
45 * hb: for monitoring the kernel's heartbeat.
46 * stdin: for frontends to reply to raw_input calls in the kernel.
46 * stdin: for frontends to reply to raw_input calls in the kernel.
47
47
48 The methods of the channels are exposed as methods of the client itself
48 The methods of the channels are exposed as methods of the client itself
49 (KernelClient.execute, complete, history, etc.).
49 (KernelClient.execute, complete, history, etc.).
50 See the channels themselves for documentation of these methods.
50 See the channels themselves for documentation of these methods.
51
51
52 """
52 """
53
53
54 # The PyZMQ Context to use for communication with the kernel.
54 # The PyZMQ Context to use for communication with the kernel.
55 context = Instance(zmq.Context)
55 context = Instance(zmq.Context)
56 def _context_default(self):
56 def _context_default(self):
57 return zmq.Context.instance()
57 return zmq.Context.instance()
58
58
59 # The classes to use for the various channels
59 # The classes to use for the various channels
60 shell_channel_class = Type(ChannelABC)
60 shell_channel_class = Type(ChannelABC)
61 iopub_channel_class = Type(ChannelABC)
61 iopub_channel_class = Type(ChannelABC)
62 stdin_channel_class = Type(ChannelABC)
62 stdin_channel_class = Type(ChannelABC)
63 hb_channel_class = Type(HBChannelABC)
63 hb_channel_class = Type(HBChannelABC)
64
64
65 # Protected traits
65 # Protected traits
66 _shell_channel = Any
66 _shell_channel = Any
67 _iopub_channel = Any
67 _iopub_channel = Any
68 _stdin_channel = Any
68 _stdin_channel = Any
69 _hb_channel = Any
69 _hb_channel = Any
70
70
71 # flag for whether execute requests should be allowed to call raw_input:
71 # flag for whether execute requests should be allowed to call raw_input:
72 allow_stdin = True
72 allow_stdin = True
73
73
74 #--------------------------------------------------------------------------
74 #--------------------------------------------------------------------------
75 # Channel proxy methods
75 # Channel proxy methods
76 #--------------------------------------------------------------------------
76 #--------------------------------------------------------------------------
77
77
78 def _get_msg(channel, *args, **kwargs):
78 def _get_msg(channel, *args, **kwargs):
79 return channel.get_msg(*args, **kwargs)
79 return channel.get_msg(*args, **kwargs)
80
80
81 def get_shell_msg(self, *args, **kwargs):
81 def get_shell_msg(self, *args, **kwargs):
82 """Get a message from the shell channel"""
82 """Get a message from the shell channel"""
83 return self.shell_channel.get_msg(*args, **kwargs)
83 return self.shell_channel.get_msg(*args, **kwargs)
84
84
85 def get_iopub_msg(self, *args, **kwargs):
85 def get_iopub_msg(self, *args, **kwargs):
86 """Get a message from the iopub channel"""
86 """Get a message from the iopub channel"""
87 return self.iopub_channel.get_msg(*args, **kwargs)
87 return self.iopub_channel.get_msg(*args, **kwargs)
88
88
89 def get_stdin_msg(self, *args, **kwargs):
89 def get_stdin_msg(self, *args, **kwargs):
90 """Get a message from the stdin channel"""
90 """Get a message from the stdin channel"""
91 return self.stdin_channel.get_msg(*args, **kwargs)
91 return self.stdin_channel.get_msg(*args, **kwargs)
92
92
93 #--------------------------------------------------------------------------
93 #--------------------------------------------------------------------------
94 # Channel management methods
94 # Channel management methods
95 #--------------------------------------------------------------------------
95 #--------------------------------------------------------------------------
96
96
97 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
97 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
98 """Starts the channels for this kernel.
98 """Starts the channels for this kernel.
99
99
100 This will create the channels if they do not exist and then start
100 This will create the channels if they do not exist and then start
101 them (their activity runs in a thread). If port numbers of 0 are
101 them (their activity runs in a thread). If port numbers of 0 are
102 being used (random ports) then you must first call
102 being used (random ports) then you must first call
103 :meth:`start_kernel`. If the channels have been stopped and you
103 :meth:`start_kernel`. If the channels have been stopped and you
104 call this, :class:`RuntimeError` will be raised.
104 call this, :class:`RuntimeError` will be raised.
105 """
105 """
106 if shell:
106 if shell:
107 self.shell_channel.start()
107 self.shell_channel.start()
108 self.kernel_info()
108 self.kernel_info()
109 if iopub:
109 if iopub:
110 self.iopub_channel.start()
110 self.iopub_channel.start()
111 if stdin:
111 if stdin:
112 self.stdin_channel.start()
112 self.stdin_channel.start()
113 self.allow_stdin = True
113 self.allow_stdin = True
114 else:
114 else:
115 self.allow_stdin = False
115 self.allow_stdin = False
116 if hb:
116 if hb:
117 self.hb_channel.start()
117 self.hb_channel.start()
118
118
119 def stop_channels(self):
119 def stop_channels(self):
120 """Stops all the running channels for this kernel.
120 """Stops all the running channels for this kernel.
121
121
122 This stops their event loops and joins their threads.
122 This stops their event loops and joins their threads.
123 """
123 """
124 if self.shell_channel.is_alive():
124 if self.shell_channel.is_alive():
125 self.shell_channel.stop()
125 self.shell_channel.stop()
126 if self.iopub_channel.is_alive():
126 if self.iopub_channel.is_alive():
127 self.iopub_channel.stop()
127 self.iopub_channel.stop()
128 if self.stdin_channel.is_alive():
128 if self.stdin_channel.is_alive():
129 self.stdin_channel.stop()
129 self.stdin_channel.stop()
130 if self.hb_channel.is_alive():
130 if self.hb_channel.is_alive():
131 self.hb_channel.stop()
131 self.hb_channel.stop()
132
132
133 @property
133 @property
134 def channels_running(self):
134 def channels_running(self):
135 """Are any of the channels created and running?"""
135 """Are any of the channels created and running?"""
136 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
136 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
137 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
137 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
138
138
139 ioloop = None # Overridden in subclasses that use pyzmq event loop
139 ioloop = None # Overridden in subclasses that use pyzmq event loop
140
140
141 @property
141 @property
142 def shell_channel(self):
142 def shell_channel(self):
143 """Get the shell channel object for this kernel."""
143 """Get the shell channel object for this kernel."""
144 if self._shell_channel is None:
144 if self._shell_channel is None:
145 url = self._make_url('shell')
145 url = self._make_url('shell')
146 self.log.debug("connecting shell channel to %s", url)
146 self.log.debug("connecting shell channel to %s", url)
147 socket = make_shell_socket(self.context, self.session.bsession, url)
147 socket = make_shell_socket(self.context, self.session.bsession, url)
148 self._shell_channel = self.shell_channel_class(
148 self._shell_channel = self.shell_channel_class(
149 socket, self.session, self.ioloop
149 socket, self.session, self.ioloop
150 )
150 )
151 return self._shell_channel
151 return self._shell_channel
152
152
153 @property
153 @property
154 def iopub_channel(self):
154 def iopub_channel(self):
155 """Get the iopub channel object for this kernel."""
155 """Get the iopub channel object for this kernel."""
156 if self._iopub_channel is None:
156 if self._iopub_channel is None:
157 url = self._make_url('iopub')
157 url = self._make_url('iopub')
158 self.log.debug("connecting iopub channel to %s", url)
158 self.log.debug("connecting iopub channel to %s", url)
159 socket = make_iopub_socket(self.context, self.session.bsession, url)
159 socket = make_iopub_socket(self.context, self.session.bsession, url)
160 self._iopub_channel = self.iopub_channel_class(
160 self._iopub_channel = self.iopub_channel_class(
161 socket, self.session, self.ioloop
161 socket, self.session, self.ioloop
162 )
162 )
163 return self._iopub_channel
163 return self._iopub_channel
164
164
165 @property
165 @property
166 def stdin_channel(self):
166 def stdin_channel(self):
167 """Get the stdin channel object for this kernel."""
167 """Get the stdin channel object for this kernel."""
168 if self._stdin_channel is None:
168 if self._stdin_channel is None:
169 url = self._make_url('stdin')
169 url = self._make_url('stdin')
170 self.log.debug("connecting stdin channel to %s", url)
170 self.log.debug("connecting stdin channel to %s", url)
171 socket = make_stdin_socket(self.context, self.session.bsession, url)
171 socket = make_stdin_socket(self.context, self.session.bsession, url)
172 self._stdin_channel = self.stdin_channel_class(
172 self._stdin_channel = self.stdin_channel_class(
173 socket, self.session, self.ioloop
173 socket, self.session, self.ioloop
174 )
174 )
175 return self._stdin_channel
175 return self._stdin_channel
176
176
177 @property
177 @property
178 def hb_channel(self):
178 def hb_channel(self):
179 """Get the hb channel object for this kernel."""
179 """Get the hb channel object for this kernel."""
180 if self._hb_channel is None:
180 if self._hb_channel is None:
181 url = self._make_url('hb')
181 url = self._make_url('hb')
182 self.log.debug("connecting heartbeat channel to %s", url)
182 self.log.debug("connecting heartbeat channel to %s", url)
183 self._hb_channel = self.hb_channel_class(
183 self._hb_channel = self.hb_channel_class(
184 self.context, self.session, url
184 self.context, self.session, url
185 )
185 )
186 return self._hb_channel
186 return self._hb_channel
187
187
188 def is_alive(self):
188 def is_alive(self):
189 """Is the kernel process still running?"""
189 """Is the kernel process still running?"""
190 if self._hb_channel is not None:
190 if self._hb_channel is not None:
191 # We didn't start the kernel with this KernelManager so we
191 # We didn't start the kernel with this KernelManager so we
192 # use the heartbeat.
192 # use the heartbeat.
193 return self._hb_channel.is_beating()
193 return self._hb_channel.is_beating()
194 else:
194 else:
195 # no heartbeat and not local, we can't tell if it's running,
195 # no heartbeat and not local, we can't tell if it's running,
196 # so naively return True
196 # so naively return True
197 return True
197 return True
198
198
199
199
200 # Methods to send specific messages on channels
200 # Methods to send specific messages on channels
201 def execute(self, code, silent=False, store_history=True,
201 def execute(self, code, silent=False, store_history=True,
202 user_expressions=None, allow_stdin=None):
202 user_expressions=None, allow_stdin=None):
203 """Execute code in the kernel.
203 """Execute code in the kernel.
204
204
205 Parameters
205 Parameters
206 ----------
206 ----------
207 code : str
207 code : str
208 A string of Python code.
208 A string of Python code.
209
209
210 silent : bool, optional (default False)
210 silent : bool, optional (default False)
211 If set, the kernel will execute the code as quietly possible, and
211 If set, the kernel will execute the code as quietly possible, and
212 will force store_history to be False.
212 will force store_history to be False.
213
213
214 store_history : bool, optional (default True)
214 store_history : bool, optional (default True)
215 If set, the kernel will store command history. This is forced
215 If set, the kernel will store command history. This is forced
216 to be False if silent is True.
216 to be False if silent is True.
217
217
218 user_expressions : dict, optional
218 user_expressions : dict, optional
219 A dict mapping names to expressions to be evaluated in the user's
219 A dict mapping names to expressions to be evaluated in the user's
220 dict. The expression values are returned as strings formatted using
220 dict. The expression values are returned as strings formatted using
221 :func:`repr`.
221 :func:`repr`.
222
222
223 allow_stdin : bool, optional (default self.allow_stdin)
223 allow_stdin : bool, optional (default self.allow_stdin)
224 Flag for whether the kernel can send stdin requests to frontends.
224 Flag for whether the kernel can send stdin requests to frontends.
225
225
226 Some frontends (e.g. the Notebook) do not support stdin requests.
226 Some frontends (e.g. the Notebook) do not support stdin requests.
227 If raw_input is called from code executed from such a frontend, a
227 If raw_input is called from code executed from such a frontend, a
228 StdinNotImplementedError will be raised.
228 StdinNotImplementedError will be raised.
229
229
230 Returns
230 Returns
231 -------
231 -------
232 The msg_id of the message sent.
232 The msg_id of the message sent.
233 """
233 """
234 if user_expressions is None:
234 if user_expressions is None:
235 user_expressions = {}
235 user_expressions = {}
236 if allow_stdin is None:
236 if allow_stdin is None:
237 allow_stdin = self.allow_stdin
237 allow_stdin = self.allow_stdin
238
238
239
239
240 # Don't waste network traffic if inputs are invalid
240 # Don't waste network traffic if inputs are invalid
241 if not isinstance(code, string_types):
241 if not isinstance(code, string_types):
242 raise ValueError('code %r must be a string' % code)
242 raise ValueError('code %r must be a string' % code)
243 validate_string_dict(user_expressions)
243 validate_string_dict(user_expressions)
244
244
245 # Create class for content/msg creation. Related to, but possibly
245 # Create class for content/msg creation. Related to, but possibly
246 # not in Session.
246 # not in Session.
247 content = dict(code=code, silent=silent, store_history=store_history,
247 content = dict(code=code, silent=silent, store_history=store_history,
248 user_expressions=user_expressions,
248 user_expressions=user_expressions,
249 allow_stdin=allow_stdin,
249 allow_stdin=allow_stdin,
250 )
250 )
251 msg = self.session.msg('execute_request', content)
251 msg = self.session.msg('execute_request', content)
252 self.shell_channel._queue_send(msg)
252 self.shell_channel.send(msg)
253 return msg['header']['msg_id']
253 return msg['header']['msg_id']
254
254
255 def complete(self, code, cursor_pos=None):
255 def complete(self, code, cursor_pos=None):
256 """Tab complete text in the kernel's namespace.
256 """Tab complete text in the kernel's namespace.
257
257
258 Parameters
258 Parameters
259 ----------
259 ----------
260 code : str
260 code : str
261 The context in which completion is requested.
261 The context in which completion is requested.
262 Can be anything between a variable name and an entire cell.
262 Can be anything between a variable name and an entire cell.
263 cursor_pos : int, optional
263 cursor_pos : int, optional
264 The position of the cursor in the block of code where the completion was requested.
264 The position of the cursor in the block of code where the completion was requested.
265 Default: ``len(code)``
265 Default: ``len(code)``
266
266
267 Returns
267 Returns
268 -------
268 -------
269 The msg_id of the message sent.
269 The msg_id of the message sent.
270 """
270 """
271 if cursor_pos is None:
271 if cursor_pos is None:
272 cursor_pos = len(code)
272 cursor_pos = len(code)
273 content = dict(code=code, cursor_pos=cursor_pos)
273 content = dict(code=code, cursor_pos=cursor_pos)
274 msg = self.session.msg('complete_request', content)
274 msg = self.session.msg('complete_request', content)
275 self.shell_channel._queue_send(msg)
275 self.shell_channel.send(msg)
276 return msg['header']['msg_id']
276 return msg['header']['msg_id']
277
277
278 def inspect(self, code, cursor_pos=None, detail_level=0):
278 def inspect(self, code, cursor_pos=None, detail_level=0):
279 """Get metadata information about an object in the kernel's namespace.
279 """Get metadata information about an object in the kernel's namespace.
280
280
281 It is up to the kernel to determine the appropriate object to inspect.
281 It is up to the kernel to determine the appropriate object to inspect.
282
282
283 Parameters
283 Parameters
284 ----------
284 ----------
285 code : str
285 code : str
286 The context in which info is requested.
286 The context in which info is requested.
287 Can be anything between a variable name and an entire cell.
287 Can be anything between a variable name and an entire cell.
288 cursor_pos : int, optional
288 cursor_pos : int, optional
289 The position of the cursor in the block of code where the info was requested.
289 The position of the cursor in the block of code where the info was requested.
290 Default: ``len(code)``
290 Default: ``len(code)``
291 detail_level : int, optional
291 detail_level : int, optional
292 The level of detail for the introspection (0-2)
292 The level of detail for the introspection (0-2)
293
293
294 Returns
294 Returns
295 -------
295 -------
296 The msg_id of the message sent.
296 The msg_id of the message sent.
297 """
297 """
298 if cursor_pos is None:
298 if cursor_pos is None:
299 cursor_pos = len(code)
299 cursor_pos = len(code)
300 content = dict(code=code, cursor_pos=cursor_pos,
300 content = dict(code=code, cursor_pos=cursor_pos,
301 detail_level=detail_level,
301 detail_level=detail_level,
302 )
302 )
303 msg = self.session.msg('inspect_request', content)
303 msg = self.session.msg('inspect_request', content)
304 self.shell_channel._queue_send(msg)
304 self.shell_channel.send(msg)
305 return msg['header']['msg_id']
305 return msg['header']['msg_id']
306
306
307 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
307 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
308 """Get entries from the kernel's history list.
308 """Get entries from the kernel's history list.
309
309
310 Parameters
310 Parameters
311 ----------
311 ----------
312 raw : bool
312 raw : bool
313 If True, return the raw input.
313 If True, return the raw input.
314 output : bool
314 output : bool
315 If True, then return the output as well.
315 If True, then return the output as well.
316 hist_access_type : str
316 hist_access_type : str
317 'range' (fill in session, start and stop params), 'tail' (fill in n)
317 'range' (fill in session, start and stop params), 'tail' (fill in n)
318 or 'search' (fill in pattern param).
318 or 'search' (fill in pattern param).
319
319
320 session : int
320 session : int
321 For a range request, the session from which to get lines. Session
321 For a range request, the session from which to get lines. Session
322 numbers are positive integers; negative ones count back from the
322 numbers are positive integers; negative ones count back from the
323 current session.
323 current session.
324 start : int
324 start : int
325 The first line number of a history range.
325 The first line number of a history range.
326 stop : int
326 stop : int
327 The final (excluded) line number of a history range.
327 The final (excluded) line number of a history range.
328
328
329 n : int
329 n : int
330 The number of lines of history to get for a tail request.
330 The number of lines of history to get for a tail request.
331
331
332 pattern : str
332 pattern : str
333 The glob-syntax pattern for a search request.
333 The glob-syntax pattern for a search request.
334
334
335 Returns
335 Returns
336 -------
336 -------
337 The msg_id of the message sent.
337 The msg_id of the message sent.
338 """
338 """
339 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
339 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
340 **kwargs)
340 **kwargs)
341 msg = self.session.msg('history_request', content)
341 msg = self.session.msg('history_request', content)
342 self.shell_channel._queue_send(msg)
342 self.shell_channel.send(msg)
343 return msg['header']['msg_id']
343 return msg['header']['msg_id']
344
344
345 def kernel_info(self):
345 def kernel_info(self):
346 """Request kernel info."""
346 """Request kernel info."""
347 msg = self.session.msg('kernel_info_request')
347 msg = self.session.msg('kernel_info_request')
348 self.shell_channel._queue_send(msg)
348 self.shell_channel.send(msg)
349 return msg['header']['msg_id']
349 return msg['header']['msg_id']
350
350
351 def _handle_kernel_info_reply(self, msg):
351 def _handle_kernel_info_reply(self, msg):
352 """handle kernel info reply
352 """handle kernel info reply
353
353
354 sets protocol adaptation version
354 sets protocol adaptation version
355 """
355 """
356 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
356 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
357 if adapt_version != major_protocol_version:
357 if adapt_version != major_protocol_version:
358 self.session.adapt_version = adapt_version
358 self.session.adapt_version = adapt_version
359
359
360 def shutdown(self, restart=False):
360 def shutdown(self, restart=False):
361 """Request an immediate kernel shutdown.
361 """Request an immediate kernel shutdown.
362
362
363 Upon receipt of the (empty) reply, client code can safely assume that
363 Upon receipt of the (empty) reply, client code can safely assume that
364 the kernel has shut down and it's safe to forcefully terminate it if
364 the kernel has shut down and it's safe to forcefully terminate it if
365 it's still alive.
365 it's still alive.
366
366
367 The kernel will send the reply via a function registered with Python's
367 The kernel will send the reply via a function registered with Python's
368 atexit module, ensuring it's truly done as the kernel is done with all
368 atexit module, ensuring it's truly done as the kernel is done with all
369 normal operation.
369 normal operation.
370 """
370 """
371 # Send quit message to kernel. Once we implement kernel-side setattr,
371 # Send quit message to kernel. Once we implement kernel-side setattr,
372 # this should probably be done that way, but for now this will do.
372 # this should probably be done that way, but for now this will do.
373 msg = self.session.msg('shutdown_request', {'restart':restart})
373 msg = self.session.msg('shutdown_request', {'restart':restart})
374 self.shell_channel._queue_send(msg)
374 self.shell_channel.send(msg)
375 return msg['header']['msg_id']
375 return msg['header']['msg_id']
376
376
377 def is_complete(self, code):
377 def is_complete(self, code):
378 msg = self.session.msg('is_complete_request', {'code': code})
378 msg = self.session.msg('is_complete_request', {'code': code})
379 self.shell_channel._queue_send(msg)
379 self.shell_channel.send(msg)
380 return msg['header']['msg_id']
380 return msg['header']['msg_id']
381
381
382 def input(self, string):
382 def input(self, string):
383 """Send a string of raw input to the kernel."""
383 """Send a string of raw input to the kernel."""
384 content = dict(value=string)
384 content = dict(value=string)
385 msg = self.session.msg('input_reply', content)
385 msg = self.session.msg('input_reply', content)
386 self.stdin_channel._queue_send(msg)
386 self.stdin_channel.send(msg)
387
387
388
388
389 KernelClientABC.register(KernelClient)
389 KernelClientABC.register(KernelClient)
@@ -1,250 +1,250 b''
1 """ Defines a KernelClient that provides signals and slots.
1 """ Defines a KernelClient that provides signals and slots.
2 """
2 """
3 import atexit
3 import atexit
4 import errno
4 import errno
5 from threading import Thread
5 from threading import Thread
6 import time
6 import time
7
7
8 import zmq
8 import zmq
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 # during garbage collection of threads at exit:
10 # during garbage collection of threads at exit:
11 from zmq import ZMQError
11 from zmq import ZMQError
12 from zmq.eventloop import ioloop, zmqstream
12 from zmq.eventloop import ioloop, zmqstream
13
13
14 from IPython.external.qt import QtCore
14 from IPython.external.qt import QtCore
15
15
16 # Local imports
16 # Local imports
17 from IPython.utils.traitlets import Type, Instance
17 from IPython.utils.traitlets import Type, Instance
18 from IPython.kernel.channels import HBChannel,\
18 from IPython.kernel.channels import HBChannel,\
19 make_shell_socket, make_iopub_socket, make_stdin_socket
19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 from IPython.kernel import KernelClient
20 from IPython.kernel import KernelClient
21
21
22 from .kernel_mixins import QtKernelClientMixin
22 from .kernel_mixins import QtKernelClientMixin
23 from .util import SuperQObject
23 from .util import SuperQObject
24
24
25 class QtHBChannel(SuperQObject, HBChannel):
25 class QtHBChannel(SuperQObject, HBChannel):
26 # A longer timeout than the base class
26 # A longer timeout than the base class
27 time_to_dead = 3.0
27 time_to_dead = 3.0
28
28
29 # Emitted when the kernel has died.
29 # Emitted when the kernel has died.
30 kernel_died = QtCore.Signal(object)
30 kernel_died = QtCore.Signal(object)
31
31
32 def call_handlers(self, since_last_heartbeat):
32 def call_handlers(self, since_last_heartbeat):
33 """ Reimplemented to emit signals instead of making callbacks.
33 """ Reimplemented to emit signals instead of making callbacks.
34 """
34 """
35 # Emit the generic signal.
35 # Emit the generic signal.
36 self.kernel_died.emit(since_last_heartbeat)
36 self.kernel_died.emit(since_last_heartbeat)
37
37
38 from IPython.core.release import kernel_protocol_version_info
38 from IPython.core.release import kernel_protocol_version_info
39
39
40 major_protocol_version = kernel_protocol_version_info[0]
40 major_protocol_version = kernel_protocol_version_info[0]
41
41
42 class InvalidPortNumber(Exception):
42 class InvalidPortNumber(Exception):
43 pass
43 pass
44
44
45
45
46 class QtZMQSocketChannel(SuperQObject):
46 class QtZMQSocketChannel(SuperQObject):
47 """A ZMQ socket emitting a Qt signal when a message is received."""
47 """A ZMQ socket emitting a Qt signal when a message is received."""
48 session = None
48 session = None
49 socket = None
49 socket = None
50 ioloop = None
50 ioloop = None
51 stream = None
51 stream = None
52
52
53 message_received = QtCore.Signal(object)
53 message_received = QtCore.Signal(object)
54
54
55 def process_events(self):
55 def process_events(self):
56 """ Process any pending GUI events.
56 """ Process any pending GUI events.
57 """
57 """
58 QtCore.QCoreApplication.instance().processEvents()
58 QtCore.QCoreApplication.instance().processEvents()
59
59
60 def __init__(self, socket, session, loop):
60 def __init__(self, socket, session, loop):
61 """Create a channel.
61 """Create a channel.
62
62
63 Parameters
63 Parameters
64 ----------
64 ----------
65 socket : :class:`zmq.Socket`
65 socket : :class:`zmq.Socket`
66 The ZMQ socket to use.
66 The ZMQ socket to use.
67 session : :class:`session.Session`
67 session : :class:`session.Session`
68 The session to use.
68 The session to use.
69 loop
69 loop
70 A pyzmq ioloop to connect the socket to using a ZMQStream
70 A pyzmq ioloop to connect the socket to using a ZMQStream
71 """
71 """
72 super(QtZMQSocketChannel, self).__init__()
72 super(QtZMQSocketChannel, self).__init__()
73
73
74 self.socket = socket
74 self.socket = socket
75 self.session = session
75 self.session = session
76 self.ioloop = loop
76 self.ioloop = loop
77
77
78 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
78 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
79 self.stream.on_recv(self._handle_recv)
79 self.stream.on_recv(self._handle_recv)
80
80
81 _is_alive = False
81 _is_alive = False
82 def is_alive(self):
82 def is_alive(self):
83 return self._is_alive
83 return self._is_alive
84
84
85 def start(self):
85 def start(self):
86 self._is_alive = True
86 self._is_alive = True
87
87
88 def stop(self):
88 def stop(self):
89 self._is_alive = False
89 self._is_alive = False
90
90
91 def close(self):
91 def close(self):
92 if self.socket is not None:
92 if self.socket is not None:
93 try:
93 try:
94 self.socket.close(linger=0)
94 self.socket.close(linger=0)
95 except Exception:
95 except Exception:
96 pass
96 pass
97 self.socket = None
97 self.socket = None
98
98
99 def _queue_send(self, msg):
99 def send(self, msg):
100 """Queue a message to be sent from the IOLoop's thread.
100 """Queue a message to be sent from the IOLoop's thread.
101
101
102 Parameters
102 Parameters
103 ----------
103 ----------
104 msg : message to send
104 msg : message to send
105
105
106 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
106 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
107 thread control of the action.
107 thread control of the action.
108 """
108 """
109 def thread_send():
109 def thread_send():
110 self.session.send(self.stream, msg)
110 self.session.send(self.stream, msg)
111 self.ioloop.add_callback(thread_send)
111 self.ioloop.add_callback(thread_send)
112
112
113 def _handle_recv(self, msg):
113 def _handle_recv(self, msg):
114 """Callback for stream.on_recv.
114 """Callback for stream.on_recv.
115
115
116 Unpacks message, and calls handlers with it.
116 Unpacks message, and calls handlers with it.
117 """
117 """
118 ident,smsg = self.session.feed_identities(msg)
118 ident,smsg = self.session.feed_identities(msg)
119 msg = self.session.deserialize(smsg)
119 msg = self.session.deserialize(smsg)
120 self.call_handlers(msg)
120 self.call_handlers(msg)
121
121
122 def call_handlers(self, msg):
122 def call_handlers(self, msg):
123 """This method is called in the ioloop thread when a message arrives.
123 """This method is called in the ioloop thread when a message arrives.
124
124
125 Subclasses should override this method to handle incoming messages.
125 Subclasses should override this method to handle incoming messages.
126 It is important to remember that this method is called in the thread
126 It is important to remember that this method is called in the thread
127 so that some logic must be done to ensure that the application level
127 so that some logic must be done to ensure that the application level
128 handlers are called in the application thread.
128 handlers are called in the application thread.
129 """
129 """
130 # Emit the generic signal.
130 # Emit the generic signal.
131 self.message_received.emit(msg)
131 self.message_received.emit(msg)
132
132
133 def flush(self, timeout=1.0):
133 def flush(self, timeout=1.0):
134 """Immediately processes all pending messages on this channel.
134 """Immediately processes all pending messages on this channel.
135
135
136 This is only used for the IOPub channel.
136 This is only used for the IOPub channel.
137
137
138 Callers should use this method to ensure that :meth:`call_handlers`
138 Callers should use this method to ensure that :meth:`call_handlers`
139 has been called for all messages that have been received on the
139 has been called for all messages that have been received on the
140 0MQ SUB socket of this channel.
140 0MQ SUB socket of this channel.
141
141
142 This method is thread safe.
142 This method is thread safe.
143
143
144 Parameters
144 Parameters
145 ----------
145 ----------
146 timeout : float, optional
146 timeout : float, optional
147 The maximum amount of time to spend flushing, in seconds. The
147 The maximum amount of time to spend flushing, in seconds. The
148 default is one second.
148 default is one second.
149 """
149 """
150 # We do the IOLoop callback process twice to ensure that the IOLoop
150 # We do the IOLoop callback process twice to ensure that the IOLoop
151 # gets to perform at least one full poll.
151 # gets to perform at least one full poll.
152 stop_time = time.time() + timeout
152 stop_time = time.time() + timeout
153 for i in range(2):
153 for i in range(2):
154 self._flushed = False
154 self._flushed = False
155 self.ioloop.add_callback(self._flush)
155 self.ioloop.add_callback(self._flush)
156 while not self._flushed and time.time() < stop_time:
156 while not self._flushed and time.time() < stop_time:
157 time.sleep(0.01)
157 time.sleep(0.01)
158
158
159 def _flush(self):
159 def _flush(self):
160 """Callback for :method:`self.flush`."""
160 """Callback for :method:`self.flush`."""
161 self.stream.flush()
161 self.stream.flush()
162 self._flushed = True
162 self._flushed = True
163
163
164
164
165 class IOLoopThread(Thread):
165 class IOLoopThread(Thread):
166 """Run a pyzmq ioloop in a thread to send and receive messages
166 """Run a pyzmq ioloop in a thread to send and receive messages
167 """
167 """
168 def __init__(self, loop):
168 def __init__(self, loop):
169 super(IOLoopThread, self).__init__()
169 super(IOLoopThread, self).__init__()
170 self.daemon = True
170 self.daemon = True
171 atexit.register(self._notice_exit)
171 atexit.register(self._notice_exit)
172 self.ioloop = loop or ioloop.IOLoop()
172 self.ioloop = loop or ioloop.IOLoop()
173
173
174 def _notice_exit(self):
174 def _notice_exit(self):
175 self._exiting = True
175 self._exiting = True
176
176
177 def run(self):
177 def run(self):
178 """Run my loop, ignoring EINTR events in the poller"""
178 """Run my loop, ignoring EINTR events in the poller"""
179 while True:
179 while True:
180 try:
180 try:
181 self.ioloop.start()
181 self.ioloop.start()
182 except ZMQError as e:
182 except ZMQError as e:
183 if e.errno == errno.EINTR:
183 if e.errno == errno.EINTR:
184 continue
184 continue
185 else:
185 else:
186 raise
186 raise
187 except Exception:
187 except Exception:
188 if self._exiting:
188 if self._exiting:
189 break
189 break
190 else:
190 else:
191 raise
191 raise
192 else:
192 else:
193 break
193 break
194
194
195 def stop(self):
195 def stop(self):
196 """Stop the channel's event loop and join its thread.
196 """Stop the channel's event loop and join its thread.
197
197
198 This calls :meth:`~threading.Thread.join` and returns when the thread
198 This calls :meth:`~threading.Thread.join` and returns when the thread
199 terminates. :class:`RuntimeError` will be raised if
199 terminates. :class:`RuntimeError` will be raised if
200 :meth:`~threading.Thread.start` is called again.
200 :meth:`~threading.Thread.start` is called again.
201 """
201 """
202 if self.ioloop is not None:
202 if self.ioloop is not None:
203 self.ioloop.stop()
203 self.ioloop.stop()
204 self.join()
204 self.join()
205 self.close()
205 self.close()
206
206
207 def close(self):
207 def close(self):
208 if self.ioloop is not None:
208 if self.ioloop is not None:
209 try:
209 try:
210 self.ioloop.close(all_fds=True)
210 self.ioloop.close(all_fds=True)
211 except Exception:
211 except Exception:
212 pass
212 pass
213
213
214
214
215 class QtKernelClient(QtKernelClientMixin, KernelClient):
215 class QtKernelClient(QtKernelClientMixin, KernelClient):
216 """ A KernelClient that provides signals and slots.
216 """ A KernelClient that provides signals and slots.
217 """
217 """
218
218
219 _ioloop = None
219 _ioloop = None
220 @property
220 @property
221 def ioloop(self):
221 def ioloop(self):
222 if self._ioloop is None:
222 if self._ioloop is None:
223 self._ioloop = ioloop.IOLoop()
223 self._ioloop = ioloop.IOLoop()
224 return self._ioloop
224 return self._ioloop
225
225
226 ioloop_thread = Instance(IOLoopThread)
226 ioloop_thread = Instance(IOLoopThread)
227
227
228 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
228 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
229 if shell:
229 if shell:
230 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
230 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
231
231
232 self.ioloop_thread = IOLoopThread(self.ioloop)
232 self.ioloop_thread = IOLoopThread(self.ioloop)
233 self.ioloop_thread.start()
233 self.ioloop_thread.start()
234
234
235 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
235 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
236
236
237 def _check_kernel_info_reply(self, msg):
237 def _check_kernel_info_reply(self, msg):
238 if msg['msg_type'] == 'kernel_info_reply':
238 if msg['msg_type'] == 'kernel_info_reply':
239 self._handle_kernel_info_reply(msg)
239 self._handle_kernel_info_reply(msg)
240 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
240 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
241
241
242 def stop_channels(self):
242 def stop_channels(self):
243 super(QtKernelClient, self).stop_channels()
243 super(QtKernelClient, self).stop_channels()
244 if self.ioloop_thread.is_alive():
244 if self.ioloop_thread.is_alive():
245 self.ioloop_thread.stop()
245 self.ioloop_thread.stop()
246
246
247 iopub_channel_class = Type(QtZMQSocketChannel)
247 iopub_channel_class = Type(QtZMQSocketChannel)
248 shell_channel_class = Type(QtZMQSocketChannel)
248 shell_channel_class = Type(QtZMQSocketChannel)
249 stdin_channel_class = Type(QtZMQSocketChannel)
249 stdin_channel_class = Type(QtZMQSocketChannel)
250 hb_channel_class = Type(QtHBChannel)
250 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now