##// END OF EJS Templates
Share an IOLoop among Qt channels, rather than one each
Thomas Kluyver -
Show More
@@ -1,143 +1,143 b''
1 """Blocking channels
1 """Blocking channels
2
2
3 Useful for test suites and blocking terminal interfaces.
3 Useful for test suites and blocking terminal interfaces.
4 """
4 """
5
5
6 # Copyright (c) IPython Development Team.
6 # Copyright (c) IPython Development Team.
7 # Distributed under the terms of the Modified BSD License.
7 # Distributed under the terms of the Modified BSD License.
8
8
9 try:
9 try:
10 from queue import Queue, Empty # Py 3
10 from queue import Queue, Empty # Py 3
11 except ImportError:
11 except ImportError:
12 from Queue import Queue, Empty # Py 2
12 from Queue import Queue, Empty # Py 2
13
13
14 from IPython.kernel.channelsabc import ShellChannelABC, IOPubChannelABC, \
14 from IPython.kernel.channelsabc import ShellChannelABC, IOPubChannelABC, \
15 StdInChannelABC
15 StdInChannelABC
16 from IPython.kernel.channels import HBChannel,\
16 from IPython.kernel.channels import HBChannel,\
17 make_iopub_socket, make_shell_socket, make_stdin_socket,\
17 make_iopub_socket, make_shell_socket, make_stdin_socket,\
18 InvalidPortNumber, major_protocol_version
18 InvalidPortNumber, major_protocol_version
19 from IPython.utils.py3compat import string_types, iteritems
19 from IPython.utils.py3compat import string_types, iteritems
20
20
21 # some utilities to validate message structure, these might get moved elsewhere
21 # some utilities to validate message structure, these might get moved elsewhere
22 # if they prove to have more generic utility
22 # if they prove to have more generic utility
23
23
24 def validate_string_list(lst):
24 def validate_string_list(lst):
25 """Validate that the input is a list of strings.
25 """Validate that the input is a list of strings.
26
26
27 Raises ValueError if not."""
27 Raises ValueError if not."""
28 if not isinstance(lst, list):
28 if not isinstance(lst, list):
29 raise ValueError('input %r must be a list' % lst)
29 raise ValueError('input %r must be a list' % lst)
30 for x in lst:
30 for x in lst:
31 if not isinstance(x, string_types):
31 if not isinstance(x, string_types):
32 raise ValueError('element %r in list must be a string' % x)
32 raise ValueError('element %r in list must be a string' % x)
33
33
34
34
35 def validate_string_dict(dct):
35 def validate_string_dict(dct):
36 """Validate that the input is a dict with string keys and values.
36 """Validate that the input is a dict with string keys and values.
37
37
38 Raises ValueError if not."""
38 Raises ValueError if not."""
39 for k,v in iteritems(dct):
39 for k,v in iteritems(dct):
40 if not isinstance(k, string_types):
40 if not isinstance(k, string_types):
41 raise ValueError('key %r in dict must be a string' % k)
41 raise ValueError('key %r in dict must be a string' % k)
42 if not isinstance(v, string_types):
42 if not isinstance(v, string_types):
43 raise ValueError('value %r in dict must be a string' % v)
43 raise ValueError('value %r in dict must be a string' % v)
44
44
45
45
46 class ZMQSocketChannel(object):
46 class ZMQSocketChannel(object):
47 """The base class for the channels that use ZMQ sockets."""
47 """The base class for the channels that use ZMQ sockets."""
48 session = None
48 session = None
49 socket = None
49 socket = None
50 stream = None
50 stream = None
51 _exiting = False
51 _exiting = False
52 proxy_methods = []
52 proxy_methods = []
53
53
54 def __init__(self, socket, session):
54 def __init__(self, socket, session, loop=None):
55 """Create a channel.
55 """Create a channel.
56
56
57 Parameters
57 Parameters
58 ----------
58 ----------
59 context : :class:`zmq.Context`
59 socket : :class:`zmq.Socket`
60 The ZMQ context to use.
60 The ZMQ socket to use.
61 session : :class:`session.Session`
61 session : :class:`session.Session`
62 The session to use.
62 The session to use.
63 address : zmq url
63 loop
64 Standard (ip, port) tuple that the kernel is listening on.
64 Unused here, for other implementations
65 """
65 """
66 super(ZMQSocketChannel, self).__init__()
66 super(ZMQSocketChannel, self).__init__()
67
67
68 self.socket = socket
68 self.socket = socket
69 self.session = session
69 self.session = session
70
70
71 def _recv(self, **kwargs):
71 def _recv(self, **kwargs):
72 msg = self.socket.recv_multipart(**kwargs)
72 msg = self.socket.recv_multipart(**kwargs)
73 ident,smsg = self.session.feed_identities(msg)
73 ident,smsg = self.session.feed_identities(msg)
74 return self.session.deserialize(smsg)
74 return self.session.deserialize(smsg)
75
75
76 def get_msg(self, block=True, timeout=None):
76 def get_msg(self, block=True, timeout=None):
77 """ Gets a message if there is one that is ready. """
77 """ Gets a message if there is one that is ready. """
78 if block:
78 if block:
79 if timeout is not None:
79 if timeout is not None:
80 timeout *= 1000 # seconds to ms
80 timeout *= 1000 # seconds to ms
81 ready = self.socket.poll(timeout)
81 ready = self.socket.poll(timeout)
82 else:
82 else:
83 ready = self.socket.poll(timeout=0)
83 ready = self.socket.poll(timeout=0)
84
84
85 if ready:
85 if ready:
86 return self._recv()
86 return self._recv()
87 else:
87 else:
88 raise Empty
88 raise Empty
89
89
90 def get_msgs(self):
90 def get_msgs(self):
91 """ Get all messages that are currently ready. """
91 """ Get all messages that are currently ready. """
92 msgs = []
92 msgs = []
93 while True:
93 while True:
94 try:
94 try:
95 msgs.append(self.get_msg(block=False))
95 msgs.append(self.get_msg(block=False))
96 except Empty:
96 except Empty:
97 break
97 break
98 return msgs
98 return msgs
99
99
100 def msg_ready(self):
100 def msg_ready(self):
101 """ Is there a message that has been received? """
101 """ Is there a message that has been received? """
102 return bool(self.socket.poll(timeout=0))
102 return bool(self.socket.poll(timeout=0))
103
103
104 def close(self):
104 def close(self):
105 if self.socket is not None:
105 if self.socket is not None:
106 try:
106 try:
107 self.socket.close(linger=0)
107 self.socket.close(linger=0)
108 except Exception:
108 except Exception:
109 pass
109 pass
110 self.socket = None
110 self.socket = None
111 stop = close
111 stop = close
112
112
113 def is_alive(self):
113 def is_alive(self):
114 return (self.socket is not None)
114 return (self.socket is not None)
115
115
116 @property
116 @property
117 def address(self):
117 def address(self):
118 """Get the channel's address as a zmq url string.
118 """Get the channel's address as a zmq url string.
119
119
120 These URLS have the form: 'tcp://127.0.0.1:5555'.
120 These URLS have the form: 'tcp://127.0.0.1:5555'.
121 """
121 """
122 return self._address
122 return self._address
123
123
124 def _queue_send(self, msg):
124 def _queue_send(self, msg):
125 """Pass a message to the ZMQ socket to send
125 """Pass a message to the ZMQ socket to send
126 """
126 """
127 self.session.send(self.socket, msg)
127 self.session.send(self.socket, msg)
128
128
129 def start(self):
129 def start(self):
130 pass
130 pass
131
131
132
132
133
133
134 class BlockingHBChannel(HBChannel):
134 class BlockingHBChannel(HBChannel):
135
135
136 # This kernel needs quicker monitoring, shorten to 1 sec.
136 # This kernel needs quicker monitoring, shorten to 1 sec.
137 # less than 0.5s is unreliable, and will get occasional
137 # less than 0.5s is unreliable, and will get occasional
138 # false reports of missed beats.
138 # false reports of missed beats.
139 time_to_dead = 1.
139 time_to_dead = 1.
140
140
141 def call_handlers(self, since_last_heartbeat):
141 def call_handlers(self, since_last_heartbeat):
142 """ Pause beating on missed heartbeat. """
142 """ Pause beating on missed heartbeat. """
143 pass
143 pass
@@ -1,375 +1,377 b''
1 """Base class to manage the interaction with a running kernel"""
1 """Base class to manage the interaction with a running kernel"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import absolute_import
6 from __future__ import absolute_import
7 from IPython.kernel.channels import validate_string_dict, major_protocol_version
7 from IPython.kernel.channels import validate_string_dict, major_protocol_version
8 from IPython.utils.py3compat import string_types
8 from IPython.utils.py3compat import string_types
9
9
10 import zmq
10 import zmq
11
11
12 from IPython.utils.traitlets import (
12 from IPython.utils.traitlets import (
13 Any, Instance, Type,
13 Any, Instance, Type,
14 )
14 )
15
15
16 from .channelsabc import (
16 from .channelsabc import (
17 ShellChannelABC, IOPubChannelABC, HBChannelABC, StdInChannelABC
17 ShellChannelABC, IOPubChannelABC, HBChannelABC, StdInChannelABC
18 )
18 )
19 from .channels import (
19 from .channels import (
20 make_shell_socket, make_stdin_socket, make_iopub_socket
20 make_shell_socket, make_stdin_socket, make_iopub_socket
21 )
21 )
22 from .clientabc import KernelClientABC
22 from .clientabc import KernelClientABC
23 from .connect import ConnectionFileMixin
23 from .connect import ConnectionFileMixin
24
24
25
25
26 class KernelClient(ConnectionFileMixin):
26 class KernelClient(ConnectionFileMixin):
27 """Communicates with a single kernel on any host via zmq channels.
27 """Communicates with a single kernel on any host via zmq channels.
28
28
29 There are four channels associated with each kernel:
29 There are four channels associated with each kernel:
30
30
31 * shell: for request/reply calls to the kernel.
31 * shell: for request/reply calls to the kernel.
32 * iopub: for the kernel to publish results to frontends.
32 * iopub: for the kernel to publish results to frontends.
33 * hb: for monitoring the kernel's heartbeat.
33 * hb: for monitoring the kernel's heartbeat.
34 * stdin: for frontends to reply to raw_input calls in the kernel.
34 * stdin: for frontends to reply to raw_input calls in the kernel.
35
35
36 The methods of the channels are exposed as methods of the client itself
36 The methods of the channels are exposed as methods of the client itself
37 (KernelClient.execute, complete, history, etc.).
37 (KernelClient.execute, complete, history, etc.).
38 See the channels themselves for documentation of these methods.
38 See the channels themselves for documentation of these methods.
39
39
40 """
40 """
41
41
42 # The PyZMQ Context to use for communication with the kernel.
42 # The PyZMQ Context to use for communication with the kernel.
43 context = Instance(zmq.Context)
43 context = Instance(zmq.Context)
44 def _context_default(self):
44 def _context_default(self):
45 return zmq.Context.instance()
45 return zmq.Context.instance()
46
46
47 # The classes to use for the various channels
47 # The classes to use for the various channels
48 shell_channel_class = Type(ShellChannelABC)
48 shell_channel_class = Type(ShellChannelABC)
49 iopub_channel_class = Type(IOPubChannelABC)
49 iopub_channel_class = Type(IOPubChannelABC)
50 stdin_channel_class = Type(StdInChannelABC)
50 stdin_channel_class = Type(StdInChannelABC)
51 hb_channel_class = Type(HBChannelABC)
51 hb_channel_class = Type(HBChannelABC)
52
52
53 # Protected traits
53 # Protected traits
54 _shell_channel = Any
54 _shell_channel = Any
55 _iopub_channel = Any
55 _iopub_channel = Any
56 _stdin_channel = Any
56 _stdin_channel = Any
57 _hb_channel = Any
57 _hb_channel = Any
58
58
59 # flag for whether execute requests should be allowed to call raw_input:
59 # flag for whether execute requests should be allowed to call raw_input:
60 allow_stdin = True
60 allow_stdin = True
61
61
62 #--------------------------------------------------------------------------
62 #--------------------------------------------------------------------------
63 # Channel proxy methods
63 # Channel proxy methods
64 #--------------------------------------------------------------------------
64 #--------------------------------------------------------------------------
65
65
66 def _get_msg(channel, *args, **kwargs):
66 def _get_msg(channel, *args, **kwargs):
67 return channel.get_msg(*args, **kwargs)
67 return channel.get_msg(*args, **kwargs)
68
68
69 def get_shell_msg(self, *args, **kwargs):
69 def get_shell_msg(self, *args, **kwargs):
70 """Get a message from the shell channel"""
70 """Get a message from the shell channel"""
71 return self.shell_channel.get_msg(*args, **kwargs)
71 return self.shell_channel.get_msg(*args, **kwargs)
72
72
73 def get_iopub_msg(self, *args, **kwargs):
73 def get_iopub_msg(self, *args, **kwargs):
74 """Get a message from the iopub channel"""
74 """Get a message from the iopub channel"""
75 return self.iopub_channel.get_msg(*args, **kwargs)
75 return self.iopub_channel.get_msg(*args, **kwargs)
76
76
77 def get_stdin_msg(self, *args, **kwargs):
77 def get_stdin_msg(self, *args, **kwargs):
78 """Get a message from the stdin channel"""
78 """Get a message from the stdin channel"""
79 return self.stdin_channel.get_msg(*args, **kwargs)
79 return self.stdin_channel.get_msg(*args, **kwargs)
80
80
81 #--------------------------------------------------------------------------
81 #--------------------------------------------------------------------------
82 # Channel management methods
82 # Channel management methods
83 #--------------------------------------------------------------------------
83 #--------------------------------------------------------------------------
84
84
85 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
85 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
86 """Starts the channels for this kernel.
86 """Starts the channels for this kernel.
87
87
88 This will create the channels if they do not exist and then start
88 This will create the channels if they do not exist and then start
89 them (their activity runs in a thread). If port numbers of 0 are
89 them (their activity runs in a thread). If port numbers of 0 are
90 being used (random ports) then you must first call
90 being used (random ports) then you must first call
91 :meth:`start_kernel`. If the channels have been stopped and you
91 :meth:`start_kernel`. If the channels have been stopped and you
92 call this, :class:`RuntimeError` will be raised.
92 call this, :class:`RuntimeError` will be raised.
93 """
93 """
94 if shell:
94 if shell:
95 self.shell_channel.start()
95 self.shell_channel.start()
96 self.kernel_info()
96 self.kernel_info()
97 if iopub:
97 if iopub:
98 self.iopub_channel.start()
98 self.iopub_channel.start()
99 if stdin:
99 if stdin:
100 self.stdin_channel.start()
100 self.stdin_channel.start()
101 self.allow_stdin = True
101 self.allow_stdin = True
102 else:
102 else:
103 self.allow_stdin = False
103 self.allow_stdin = False
104 if hb:
104 if hb:
105 self.hb_channel.start()
105 self.hb_channel.start()
106
106
107 def stop_channels(self):
107 def stop_channels(self):
108 """Stops all the running channels for this kernel.
108 """Stops all the running channels for this kernel.
109
109
110 This stops their event loops and joins their threads.
110 This stops their event loops and joins their threads.
111 """
111 """
112 if self.shell_channel.is_alive():
112 if self.shell_channel.is_alive():
113 self.shell_channel.stop()
113 self.shell_channel.stop()
114 if self.iopub_channel.is_alive():
114 if self.iopub_channel.is_alive():
115 self.iopub_channel.stop()
115 self.iopub_channel.stop()
116 if self.stdin_channel.is_alive():
116 if self.stdin_channel.is_alive():
117 self.stdin_channel.stop()
117 self.stdin_channel.stop()
118 if self.hb_channel.is_alive():
118 if self.hb_channel.is_alive():
119 self.hb_channel.stop()
119 self.hb_channel.stop()
120
120
121 @property
121 @property
122 def channels_running(self):
122 def channels_running(self):
123 """Are any of the channels created and running?"""
123 """Are any of the channels created and running?"""
124 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
124 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
125 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
125 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
126
126
127 ioloop = None # Overridden in subclasses that use pyzmq event loop
128
127 @property
129 @property
128 def shell_channel(self):
130 def shell_channel(self):
129 """Get the shell channel object for this kernel."""
131 """Get the shell channel object for this kernel."""
130 if self._shell_channel is None:
132 if self._shell_channel is None:
131 url = self._make_url('shell')
133 url = self._make_url('shell')
132 self.log.debug("connecting shell channel to %s", url)
134 self.log.debug("connecting shell channel to %s", url)
133 socket = make_shell_socket(self.context, self.session.bsession, url)
135 socket = make_shell_socket(self.context, self.session.bsession, url)
134 self._shell_channel = self.shell_channel_class(
136 self._shell_channel = self.shell_channel_class(
135 socket, self.session
137 socket, self.session, self.ioloop
136 )
138 )
137 return self._shell_channel
139 return self._shell_channel
138
140
139 @property
141 @property
140 def iopub_channel(self):
142 def iopub_channel(self):
141 """Get the iopub channel object for this kernel."""
143 """Get the iopub channel object for this kernel."""
142 if self._iopub_channel is None:
144 if self._iopub_channel is None:
143 url = self._make_url('iopub')
145 url = self._make_url('iopub')
144 self.log.debug("connecting iopub channel to %s", url)
146 self.log.debug("connecting iopub channel to %s", url)
145 socket = make_iopub_socket(self.context, self.session.bsession, url)
147 socket = make_iopub_socket(self.context, self.session.bsession, url)
146 self._iopub_channel = self.iopub_channel_class(
148 self._iopub_channel = self.iopub_channel_class(
147 socket, self.session
149 socket, self.session, self.ioloop
148 )
150 )
149 return self._iopub_channel
151 return self._iopub_channel
150
152
151 @property
153 @property
152 def stdin_channel(self):
154 def stdin_channel(self):
153 """Get the stdin channel object for this kernel."""
155 """Get the stdin channel object for this kernel."""
154 if self._stdin_channel is None:
156 if self._stdin_channel is None:
155 url = self._make_url('stdin')
157 url = self._make_url('stdin')
156 self.log.debug("connecting stdin channel to %s", url)
158 self.log.debug("connecting stdin channel to %s", url)
157 socket = make_stdin_socket(self.context, self.session.bsession, url)
159 socket = make_stdin_socket(self.context, self.session.bsession, url)
158 self._stdin_channel = self.stdin_channel_class(
160 self._stdin_channel = self.stdin_channel_class(
159 socket, self.session
161 socket, self.session, self.ioloop
160 )
162 )
161 return self._stdin_channel
163 return self._stdin_channel
162
164
163 @property
165 @property
164 def hb_channel(self):
166 def hb_channel(self):
165 """Get the hb channel object for this kernel."""
167 """Get the hb channel object for this kernel."""
166 if self._hb_channel is None:
168 if self._hb_channel is None:
167 url = self._make_url('hb')
169 url = self._make_url('hb')
168 self.log.debug("connecting heartbeat channel to %s", url)
170 self.log.debug("connecting heartbeat channel to %s", url)
169 self._hb_channel = self.hb_channel_class(
171 self._hb_channel = self.hb_channel_class(
170 self.context, self.session, url
172 self.context, self.session, url
171 )
173 )
172 return self._hb_channel
174 return self._hb_channel
173
175
174 def is_alive(self):
176 def is_alive(self):
175 """Is the kernel process still running?"""
177 """Is the kernel process still running?"""
176 if self._hb_channel is not None:
178 if self._hb_channel is not None:
177 # We didn't start the kernel with this KernelManager so we
179 # We didn't start the kernel with this KernelManager so we
178 # use the heartbeat.
180 # use the heartbeat.
179 return self._hb_channel.is_beating()
181 return self._hb_channel.is_beating()
180 else:
182 else:
181 # no heartbeat and not local, we can't tell if it's running,
183 # no heartbeat and not local, we can't tell if it's running,
182 # so naively return True
184 # so naively return True
183 return True
185 return True
184
186
185
187
186 # Methods to send specific messages on channels
188 # Methods to send specific messages on channels
187 def execute(self, code, silent=False, store_history=True,
189 def execute(self, code, silent=False, store_history=True,
188 user_expressions=None, allow_stdin=None):
190 user_expressions=None, allow_stdin=None):
189 """Execute code in the kernel.
191 """Execute code in the kernel.
190
192
191 Parameters
193 Parameters
192 ----------
194 ----------
193 code : str
195 code : str
194 A string of Python code.
196 A string of Python code.
195
197
196 silent : bool, optional (default False)
198 silent : bool, optional (default False)
197 If set, the kernel will execute the code as quietly possible, and
199 If set, the kernel will execute the code as quietly possible, and
198 will force store_history to be False.
200 will force store_history to be False.
199
201
200 store_history : bool, optional (default True)
202 store_history : bool, optional (default True)
201 If set, the kernel will store command history. This is forced
203 If set, the kernel will store command history. This is forced
202 to be False if silent is True.
204 to be False if silent is True.
203
205
204 user_expressions : dict, optional
206 user_expressions : dict, optional
205 A dict mapping names to expressions to be evaluated in the user's
207 A dict mapping names to expressions to be evaluated in the user's
206 dict. The expression values are returned as strings formatted using
208 dict. The expression values are returned as strings formatted using
207 :func:`repr`.
209 :func:`repr`.
208
210
209 allow_stdin : bool, optional (default self.allow_stdin)
211 allow_stdin : bool, optional (default self.allow_stdin)
210 Flag for whether the kernel can send stdin requests to frontends.
212 Flag for whether the kernel can send stdin requests to frontends.
211
213
212 Some frontends (e.g. the Notebook) do not support stdin requests.
214 Some frontends (e.g. the Notebook) do not support stdin requests.
213 If raw_input is called from code executed from such a frontend, a
215 If raw_input is called from code executed from such a frontend, a
214 StdinNotImplementedError will be raised.
216 StdinNotImplementedError will be raised.
215
217
216 Returns
218 Returns
217 -------
219 -------
218 The msg_id of the message sent.
220 The msg_id of the message sent.
219 """
221 """
220 if user_expressions is None:
222 if user_expressions is None:
221 user_expressions = {}
223 user_expressions = {}
222 if allow_stdin is None:
224 if allow_stdin is None:
223 allow_stdin = self.allow_stdin
225 allow_stdin = self.allow_stdin
224
226
225
227
226 # Don't waste network traffic if inputs are invalid
228 # Don't waste network traffic if inputs are invalid
227 if not isinstance(code, string_types):
229 if not isinstance(code, string_types):
228 raise ValueError('code %r must be a string' % code)
230 raise ValueError('code %r must be a string' % code)
229 validate_string_dict(user_expressions)
231 validate_string_dict(user_expressions)
230
232
231 # Create class for content/msg creation. Related to, but possibly
233 # Create class for content/msg creation. Related to, but possibly
232 # not in Session.
234 # not in Session.
233 content = dict(code=code, silent=silent, store_history=store_history,
235 content = dict(code=code, silent=silent, store_history=store_history,
234 user_expressions=user_expressions,
236 user_expressions=user_expressions,
235 allow_stdin=allow_stdin,
237 allow_stdin=allow_stdin,
236 )
238 )
237 msg = self.session.msg('execute_request', content)
239 msg = self.session.msg('execute_request', content)
238 self.shell_channel._queue_send(msg)
240 self.shell_channel._queue_send(msg)
239 return msg['header']['msg_id']
241 return msg['header']['msg_id']
240
242
241 def complete(self, code, cursor_pos=None):
243 def complete(self, code, cursor_pos=None):
242 """Tab complete text in the kernel's namespace.
244 """Tab complete text in the kernel's namespace.
243
245
244 Parameters
246 Parameters
245 ----------
247 ----------
246 code : str
248 code : str
247 The context in which completion is requested.
249 The context in which completion is requested.
248 Can be anything between a variable name and an entire cell.
250 Can be anything between a variable name and an entire cell.
249 cursor_pos : int, optional
251 cursor_pos : int, optional
250 The position of the cursor in the block of code where the completion was requested.
252 The position of the cursor in the block of code where the completion was requested.
251 Default: ``len(code)``
253 Default: ``len(code)``
252
254
253 Returns
255 Returns
254 -------
256 -------
255 The msg_id of the message sent.
257 The msg_id of the message sent.
256 """
258 """
257 if cursor_pos is None:
259 if cursor_pos is None:
258 cursor_pos = len(code)
260 cursor_pos = len(code)
259 content = dict(code=code, cursor_pos=cursor_pos)
261 content = dict(code=code, cursor_pos=cursor_pos)
260 msg = self.session.msg('complete_request', content)
262 msg = self.session.msg('complete_request', content)
261 self.shell_channel._queue_send(msg)
263 self.shell_channel._queue_send(msg)
262 return msg['header']['msg_id']
264 return msg['header']['msg_id']
263
265
264 def inspect(self, code, cursor_pos=None, detail_level=0):
266 def inspect(self, code, cursor_pos=None, detail_level=0):
265 """Get metadata information about an object in the kernel's namespace.
267 """Get metadata information about an object in the kernel's namespace.
266
268
267 It is up to the kernel to determine the appropriate object to inspect.
269 It is up to the kernel to determine the appropriate object to inspect.
268
270
269 Parameters
271 Parameters
270 ----------
272 ----------
271 code : str
273 code : str
272 The context in which info is requested.
274 The context in which info is requested.
273 Can be anything between a variable name and an entire cell.
275 Can be anything between a variable name and an entire cell.
274 cursor_pos : int, optional
276 cursor_pos : int, optional
275 The position of the cursor in the block of code where the info was requested.
277 The position of the cursor in the block of code where the info was requested.
276 Default: ``len(code)``
278 Default: ``len(code)``
277 detail_level : int, optional
279 detail_level : int, optional
278 The level of detail for the introspection (0-2)
280 The level of detail for the introspection (0-2)
279
281
280 Returns
282 Returns
281 -------
283 -------
282 The msg_id of the message sent.
284 The msg_id of the message sent.
283 """
285 """
284 if cursor_pos is None:
286 if cursor_pos is None:
285 cursor_pos = len(code)
287 cursor_pos = len(code)
286 content = dict(code=code, cursor_pos=cursor_pos,
288 content = dict(code=code, cursor_pos=cursor_pos,
287 detail_level=detail_level,
289 detail_level=detail_level,
288 )
290 )
289 msg = self.session.msg('inspect_request', content)
291 msg = self.session.msg('inspect_request', content)
290 self.shell_channel._queue_send(msg)
292 self.shell_channel._queue_send(msg)
291 return msg['header']['msg_id']
293 return msg['header']['msg_id']
292
294
293 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
295 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
294 """Get entries from the kernel's history list.
296 """Get entries from the kernel's history list.
295
297
296 Parameters
298 Parameters
297 ----------
299 ----------
298 raw : bool
300 raw : bool
299 If True, return the raw input.
301 If True, return the raw input.
300 output : bool
302 output : bool
301 If True, then return the output as well.
303 If True, then return the output as well.
302 hist_access_type : str
304 hist_access_type : str
303 'range' (fill in session, start and stop params), 'tail' (fill in n)
305 'range' (fill in session, start and stop params), 'tail' (fill in n)
304 or 'search' (fill in pattern param).
306 or 'search' (fill in pattern param).
305
307
306 session : int
308 session : int
307 For a range request, the session from which to get lines. Session
309 For a range request, the session from which to get lines. Session
308 numbers are positive integers; negative ones count back from the
310 numbers are positive integers; negative ones count back from the
309 current session.
311 current session.
310 start : int
312 start : int
311 The first line number of a history range.
313 The first line number of a history range.
312 stop : int
314 stop : int
313 The final (excluded) line number of a history range.
315 The final (excluded) line number of a history range.
314
316
315 n : int
317 n : int
316 The number of lines of history to get for a tail request.
318 The number of lines of history to get for a tail request.
317
319
318 pattern : str
320 pattern : str
319 The glob-syntax pattern for a search request.
321 The glob-syntax pattern for a search request.
320
322
321 Returns
323 Returns
322 -------
324 -------
323 The msg_id of the message sent.
325 The msg_id of the message sent.
324 """
326 """
325 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
327 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
326 **kwargs)
328 **kwargs)
327 msg = self.session.msg('history_request', content)
329 msg = self.session.msg('history_request', content)
328 self.shell_channel._queue_send(msg)
330 self.shell_channel._queue_send(msg)
329 return msg['header']['msg_id']
331 return msg['header']['msg_id']
330
332
331 def kernel_info(self):
333 def kernel_info(self):
332 """Request kernel info."""
334 """Request kernel info."""
333 msg = self.session.msg('kernel_info_request')
335 msg = self.session.msg('kernel_info_request')
334 self.shell_channel._queue_send(msg)
336 self.shell_channel._queue_send(msg)
335 return msg['header']['msg_id']
337 return msg['header']['msg_id']
336
338
337 def _handle_kernel_info_reply(self, msg):
339 def _handle_kernel_info_reply(self, msg):
338 """handle kernel info reply
340 """handle kernel info reply
339
341
340 sets protocol adaptation version
342 sets protocol adaptation version
341 """
343 """
342 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
344 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
343 if adapt_version != major_protocol_version:
345 if adapt_version != major_protocol_version:
344 self.session.adapt_version = adapt_version
346 self.session.adapt_version = adapt_version
345
347
346 def shutdown(self, restart=False):
348 def shutdown(self, restart=False):
347 """Request an immediate kernel shutdown.
349 """Request an immediate kernel shutdown.
348
350
349 Upon receipt of the (empty) reply, client code can safely assume that
351 Upon receipt of the (empty) reply, client code can safely assume that
350 the kernel has shut down and it's safe to forcefully terminate it if
352 the kernel has shut down and it's safe to forcefully terminate it if
351 it's still alive.
353 it's still alive.
352
354
353 The kernel will send the reply via a function registered with Python's
355 The kernel will send the reply via a function registered with Python's
354 atexit module, ensuring it's truly done as the kernel is done with all
356 atexit module, ensuring it's truly done as the kernel is done with all
355 normal operation.
357 normal operation.
356 """
358 """
357 # Send quit message to kernel. Once we implement kernel-side setattr,
359 # Send quit message to kernel. Once we implement kernel-side setattr,
358 # this should probably be done that way, but for now this will do.
360 # this should probably be done that way, but for now this will do.
359 msg = self.session.msg('shutdown_request', {'restart':restart})
361 msg = self.session.msg('shutdown_request', {'restart':restart})
360 self.shell_channel._queue_send(msg)
362 self.shell_channel._queue_send(msg)
361 return msg['header']['msg_id']
363 return msg['header']['msg_id']
362
364
363 def is_complete(self, code):
365 def is_complete(self, code):
364 msg = self.session.msg('is_complete_request', {'code': code})
366 msg = self.session.msg('is_complete_request', {'code': code})
365 self.shell_channel._queue_send(msg)
367 self.shell_channel._queue_send(msg)
366 return msg['header']['msg_id']
368 return msg['header']['msg_id']
367
369
368 def input(self, string):
370 def input(self, string):
369 """Send a string of raw input to the kernel."""
371 """Send a string of raw input to the kernel."""
370 content = dict(value=string)
372 content = dict(value=string)
371 msg = self.session.msg('input_reply', content)
373 msg = self.session.msg('input_reply', content)
372 self.stdin_channel._queue_send(msg)
374 self.stdin_channel._queue_send(msg)
373
375
374
376
375 KernelClientABC.register(KernelClient)
377 KernelClientABC.register(KernelClient)
@@ -1,257 +1,278 b''
1 """ Defines a KernelClient that provides signals and slots.
1 """ Defines a KernelClient that provides signals and slots.
2 """
2 """
3 import atexit
3 import atexit
4 import errno
4 import errno
5 from threading import Thread
5 from threading import Thread
6 import time
6 import time
7
7
8 import zmq
8 import zmq
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 # during garbage collection of threads at exit:
10 # during garbage collection of threads at exit:
11 from zmq import ZMQError
11 from zmq import ZMQError
12 from zmq.eventloop import ioloop, zmqstream
12 from zmq.eventloop import ioloop, zmqstream
13
13
14 from IPython.external.qt import QtCore
14 from IPython.external.qt import QtCore
15
15
16 # Local imports
16 # Local imports
17 from IPython.utils.traitlets import Type
17 from IPython.utils.traitlets import Type, Instance
18 from IPython.kernel.channels import HBChannel,\
18 from IPython.kernel.channels import HBChannel,\
19 make_shell_socket, make_iopub_socket, make_stdin_socket
19 make_shell_socket, make_iopub_socket, make_stdin_socket
20 from IPython.kernel import KernelClient
20 from IPython.kernel import KernelClient
21
21
22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
22 from .kernel_mixins import (QtHBChannelMixin, QtKernelClientMixin)
23 from .util import SuperQObject
23 from .util import SuperQObject
24
24
25 class QtHBChannel(QtHBChannelMixin, HBChannel):
25 class QtHBChannel(QtHBChannelMixin, HBChannel):
26 pass
26 pass
27
27
28 from IPython.core.release import kernel_protocol_version_info
28 from IPython.core.release import kernel_protocol_version_info
29
29
30 from IPython.kernel.channelsabc import (
30 from IPython.kernel.channelsabc import (
31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
31 ShellChannelABC, IOPubChannelABC, StdInChannelABC,
32 )
32 )
33 from IPython.utils.py3compat import string_types, iteritems
33 from IPython.utils.py3compat import string_types, iteritems
34
34
35 major_protocol_version = kernel_protocol_version_info[0]
35 major_protocol_version = kernel_protocol_version_info[0]
36
36
37 class InvalidPortNumber(Exception):
37 class InvalidPortNumber(Exception):
38 pass
38 pass
39
39
40 # some utilities to validate message structure, these might get moved elsewhere
40 # some utilities to validate message structure, these might get moved elsewhere
41 # if they prove to have more generic utility
41 # if they prove to have more generic utility
42
42
43
43
44 def validate_string_dict(dct):
44 def validate_string_dict(dct):
45 """Validate that the input is a dict with string keys and values.
45 """Validate that the input is a dict with string keys and values.
46
46
47 Raises ValueError if not."""
47 Raises ValueError if not."""
48 for k,v in iteritems(dct):
48 for k,v in iteritems(dct):
49 if not isinstance(k, string_types):
49 if not isinstance(k, string_types):
50 raise ValueError('key %r in dict must be a string' % k)
50 raise ValueError('key %r in dict must be a string' % k)
51 if not isinstance(v, string_types):
51 if not isinstance(v, string_types):
52 raise ValueError('value %r in dict must be a string' % v)
52 raise ValueError('value %r in dict must be a string' % v)
53
53
54
54
55
55
56 class QtZMQSocketChannel(SuperQObject, Thread):
56 class QtZMQSocketChannel(SuperQObject):
57 """The base class for the channels that use ZMQ sockets."""
57 """The base class for the channels that use ZMQ sockets."""
58 session = None
58 session = None
59 socket = None
59 socket = None
60 ioloop = None
60 ioloop = None
61 stream = None
61 stream = None
62 _exiting = False
63 proxy_methods = []
64
65 # Emitted when the channel is started.
66 started = QtCore.Signal()
67
68 # Emitted when the channel is stopped.
69 stopped = QtCore.Signal()
70
62
71 message_received = QtCore.Signal(object)
63 message_received = QtCore.Signal(object)
72
64
73 #---------------------------------------------------------------------------
65 #---------------------------------------------------------------------------
74 # InProcessChannel interface
66 # InProcessChannel interface
75 #---------------------------------------------------------------------------
67 #---------------------------------------------------------------------------
76
68
77 def call_handlers_later(self, *args, **kwds):
69 def call_handlers_later(self, *args, **kwds):
78 """ Call the message handlers later.
70 """ Call the message handlers later.
79 """
71 """
80 do_later = lambda: self.call_handlers(*args, **kwds)
72 do_later = lambda: self.call_handlers(*args, **kwds)
81 QtCore.QTimer.singleShot(0, do_later)
73 QtCore.QTimer.singleShot(0, do_later)
82
74
83 def process_events(self):
75 def process_events(self):
84 """ Process any pending GUI events.
76 """ Process any pending GUI events.
85 """
77 """
86 QtCore.QCoreApplication.instance().processEvents()
78 QtCore.QCoreApplication.instance().processEvents()
87
79
88 def __init__(self, socket, session):
80 def __init__(self, socket, session, loop):
89 """Create a channel.
81 """Create a channel.
90
82
91 Parameters
83 Parameters
92 ----------
84 ----------
93 context : :class:`zmq.Context`
85 socket : :class:`zmq.Socket`
94 The ZMQ context to use.
86 The ZMQ socket to use.
95 session : :class:`session.Session`
87 session : :class:`session.Session`
96 The session to use.
88 The session to use.
97 address : zmq url
89 loop
98 Standard (ip, port) tuple that the kernel is listening on.
90 A pyzmq ioloop to connect the socket to using a ZMQStream
99 """
91 """
100 super(QtZMQSocketChannel, self).__init__()
92 super(QtZMQSocketChannel, self).__init__()
101 self.daemon = True
102
93
103 self.socket = socket
94 self.socket = socket
104 self.session = session
95 self.session = session
105 atexit.register(self._notice_exit)
96 self.ioloop = loop
106 self.ioloop = ioloop.IOLoop()
107
97
108 def _notice_exit(self):
98 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
109 self._exiting = True
99 self.stream.on_recv(self._handle_recv)
110
100
111 def _run_loop(self):
101 _is_alive = False
112 """Run my loop, ignoring EINTR events in the poller"""
102 def is_alive(self):
113 while True:
103 return self._is_alive
114 try:
115 self.ioloop.start()
116 except ZMQError as e:
117 if e.errno == errno.EINTR:
118 continue
119 else:
120 raise
121 except Exception:
122 if self._exiting:
123 break
124 else:
125 raise
126 else:
127 break
128
104
129 def start(self):
105 def start(self):
130 """ Reimplemented to emit signal.
106 self._is_alive = True
131 """
132 super(QtZMQSocketChannel, self).start()
133 self.started.emit()
134
135 def run(self):
136 """The thread's main activity. Call start() instead."""
137 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
138 self.stream.on_recv(self._handle_recv)
139 self._run_loop()
140
107
141 def stop(self):
108 def stop(self):
142 """Stop the channel's event loop and join its thread.
109 self._is_alive = False
143
144 This calls :meth:`~threading.Thread.join` and returns when the thread
145 terminates. :class:`RuntimeError` will be raised if
146 :meth:`~threading.Thread.start` is called again.
147 """
148 if self.ioloop is not None:
149 self.ioloop.stop()
150 self.join()
151 self.close()
152 self.stopped.emit()
153
110
154 def close(self):
111 def close(self):
155 if self.ioloop is not None:
156 try:
157 self.ioloop.close(all_fds=True)
158 except Exception:
159 pass
160 if self.socket is not None:
112 if self.socket is not None:
161 try:
113 try:
162 self.socket.close(linger=0)
114 self.socket.close(linger=0)
163 except Exception:
115 except Exception:
164 pass
116 pass
165 self.socket = None
117 self.socket = None
166
118
167 @property
119 @property
168 def address(self):
120 def address(self):
169 """Get the channel's address as a zmq url string.
121 """Get the channel's address as a zmq url string.
170
122
171 These URLS have the form: 'tcp://127.0.0.1:5555'.
123 These URLS have the form: 'tcp://127.0.0.1:5555'.
172 """
124 """
173 return self._address
125 return self._address
174
126
175 def _queue_send(self, msg):
127 def _queue_send(self, msg):
176 """Queue a message to be sent from the IOLoop's thread.
128 """Queue a message to be sent from the IOLoop's thread.
177
129
178 Parameters
130 Parameters
179 ----------
131 ----------
180 msg : message to send
132 msg : message to send
181
133
182 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
134 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
183 thread control of the action.
135 thread control of the action.
184 """
136 """
185 def thread_send():
137 def thread_send():
186 self.session.send(self.stream, msg)
138 self.session.send(self.stream, msg)
187 self.ioloop.add_callback(thread_send)
139 self.ioloop.add_callback(thread_send)
188
140
189 def _handle_recv(self, msg):
141 def _handle_recv(self, msg):
190 """Callback for stream.on_recv.
142 """Callback for stream.on_recv.
191
143
192 Unpacks message, and calls handlers with it.
144 Unpacks message, and calls handlers with it.
193 """
145 """
194 ident,smsg = self.session.feed_identities(msg)
146 ident,smsg = self.session.feed_identities(msg)
195 msg = self.session.deserialize(smsg)
147 msg = self.session.deserialize(smsg)
196 self.call_handlers(msg)
148 self.call_handlers(msg)
197
149
198 def call_handlers(self, msg):
150 def call_handlers(self, msg):
199 """This method is called in the ioloop thread when a message arrives.
151 """This method is called in the ioloop thread when a message arrives.
200
152
201 Subclasses should override this method to handle incoming messages.
153 Subclasses should override this method to handle incoming messages.
202 It is important to remember that this method is called in the thread
154 It is important to remember that this method is called in the thread
203 so that some logic must be done to ensure that the application level
155 so that some logic must be done to ensure that the application level
204 handlers are called in the application thread.
156 handlers are called in the application thread.
205 """
157 """
206 # Emit the generic signal.
158 # Emit the generic signal.
207 self.message_received.emit(msg)
159 self.message_received.emit(msg)
208
160
209 def flush(self, timeout=1.0):
161 def flush(self, timeout=1.0):
210 """Immediately processes all pending messages on this channel.
162 """Immediately processes all pending messages on this channel.
211
163
212 This is only used for the IOPub channel.
164 This is only used for the IOPub channel.
213
165
214 Callers should use this method to ensure that :meth:`call_handlers`
166 Callers should use this method to ensure that :meth:`call_handlers`
215 has been called for all messages that have been received on the
167 has been called for all messages that have been received on the
216 0MQ SUB socket of this channel.
168 0MQ SUB socket of this channel.
217
169
218 This method is thread safe.
170 This method is thread safe.
219
171
220 Parameters
172 Parameters
221 ----------
173 ----------
222 timeout : float, optional
174 timeout : float, optional
223 The maximum amount of time to spend flushing, in seconds. The
175 The maximum amount of time to spend flushing, in seconds. The
224 default is one second.
176 default is one second.
225 """
177 """
226 # We do the IOLoop callback process twice to ensure that the IOLoop
178 # We do the IOLoop callback process twice to ensure that the IOLoop
227 # gets to perform at least one full poll.
179 # gets to perform at least one full poll.
228 stop_time = time.time() + timeout
180 stop_time = time.time() + timeout
229 for i in range(2):
181 for i in range(2):
230 self._flushed = False
182 self._flushed = False
231 self.ioloop.add_callback(self._flush)
183 self.ioloop.add_callback(self._flush)
232 while not self._flushed and time.time() < stop_time:
184 while not self._flushed and time.time() < stop_time:
233 time.sleep(0.01)
185 time.sleep(0.01)
234
186
235 def _flush(self):
187 def _flush(self):
236 """Callback for :method:`self.flush`."""
188 """Callback for :method:`self.flush`."""
237 self.stream.flush()
189 self.stream.flush()
238 self._flushed = True
190 self._flushed = True
239
191
240
192
193 class IOLoopThread(Thread):
194 """Run a pyzmq ioloop in a thread to send and receive messages
195 """
196 def __init__(self, loop):
197 super(IOLoopThread, self).__init__()
198 self.daemon = True
199 atexit.register(self._notice_exit)
200 self.ioloop = loop or ioloop.IOLoop()
201
202 def _notice_exit(self):
203 self._exiting = True
204
205 def run(self):
206 """Run my loop, ignoring EINTR events in the poller"""
207 while True:
208 try:
209 self.ioloop.start()
210 except ZMQError as e:
211 if e.errno == errno.EINTR:
212 continue
213 else:
214 raise
215 except Exception:
216 if self._exiting:
217 break
218 else:
219 raise
220 else:
221 break
222
223 def stop(self):
224 """Stop the channel's event loop and join its thread.
225
226 This calls :meth:`~threading.Thread.join` and returns when the thread
227 terminates. :class:`RuntimeError` will be raised if
228 :meth:`~threading.Thread.start` is called again.
229 """
230 if self.ioloop is not None:
231 self.ioloop.stop()
232 self.join()
233 self.close()
234
235 def close(self):
236 if self.ioloop is not None:
237 try:
238 self.ioloop.close(all_fds=True)
239 except Exception:
240 pass
241
242
241 class QtKernelClient(QtKernelClientMixin, KernelClient):
243 class QtKernelClient(QtKernelClientMixin, KernelClient):
242 """ A KernelClient that provides signals and slots.
244 """ A KernelClient that provides signals and slots.
243 """
245 """
246
247 _ioloop = None
248 @property
249 def ioloop(self):
250 if self._ioloop is None:
251 self._ioloop = ioloop.IOLoop()
252 return self._ioloop
253
254 ioloop_thread = Instance(IOLoopThread)
255
244 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
256 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
245 if shell:
257 if shell:
246 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
258 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
259
260 self.channel_listener_thread = IOLoopThread(self.ioloop)
261 self.channel_listener_thread.start()
262
247 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
263 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
248
264
249 def _check_kernel_info_reply(self, msg):
265 def _check_kernel_info_reply(self, msg):
250 if msg['msg_type'] == 'kernel_info_reply':
266 if msg['msg_type'] == 'kernel_info_reply':
251 self._handle_kernel_info_reply(msg)
267 self._handle_kernel_info_reply(msg)
252 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
268 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
253
269
270 def stop_channels(self):
271 super(QtKernelClient, self).stop_channels()
272 if self.ioloop_thread.is_alive():
273 self.ioloop_thread.stop()
274
254 iopub_channel_class = Type(QtZMQSocketChannel)
275 iopub_channel_class = Type(QtZMQSocketChannel)
255 shell_channel_class = Type(QtZMQSocketChannel)
276 shell_channel_class = Type(QtZMQSocketChannel)
256 stdin_channel_class = Type(QtZMQSocketChannel)
277 stdin_channel_class = Type(QtZMQSocketChannel)
257 hb_channel_class = Type(QtHBChannel)
278 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now