##// END OF EJS Templates
KernelClient: factor out generic threaded client from Qt client
Björn Linse -
Show More
@@ -0,0 +1,229 b''
1 """ Defines a KernelClient that provides thread-safe sockets with async callbacks on message replies.
2 """
3 import atexit
4 import errno
5 from threading import Thread
6 import time
7
8 import zmq
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 # during garbage collection of threads at exit:
11 from zmq import ZMQError
12 from zmq.eventloop import ioloop, zmqstream
13
14 # Local imports
15 from IPython.utils.traitlets import Type, Instance
16 from IPython.kernel.channels import HBChannel
17 from IPython.kernel import KernelClient
18 from IPython.kernel.channels import HBChannel
19
20 class ThreadedZMQSocketChannel(object):
21 """A ZMQ socket invoking a callback in the ioloop"""
22 session = None
23 socket = None
24 ioloop = None
25 stream = None
26 _inspect = None
27
28 def __init__(self, socket, session, loop):
29 """Create a channel.
30
31 Parameters
32 ----------
33 socket : :class:`zmq.Socket`
34 The ZMQ socket to use.
35 session : :class:`session.Session`
36 The session to use.
37 loop
38 A pyzmq ioloop to connect the socket to using a ZMQStream
39 """
40 super(ThreadedZMQSocketChannel, self).__init__()
41
42 self.socket = socket
43 self.session = session
44 self.ioloop = loop
45
46 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
47 self.stream.on_recv(self._handle_recv)
48
49 _is_alive = False
50 def is_alive(self):
51 return self._is_alive
52
53 def start(self):
54 self._is_alive = True
55
56 def stop(self):
57 self._is_alive = False
58
59 def close(self):
60 if self.socket is not None:
61 try:
62 self.socket.close(linger=0)
63 except Exception:
64 pass
65 self.socket = None
66
67 def send(self, msg):
68 """Queue a message to be sent from the IOLoop's thread.
69
70 Parameters
71 ----------
72 msg : message to send
73
74 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
75 thread control of the action.
76 """
77 def thread_send():
78 self.session.send(self.stream, msg)
79 self.ioloop.add_callback(thread_send)
80
81 def _handle_recv(self, msg):
82 """Callback for stream.on_recv.
83
84 Unpacks message, and calls handlers with it.
85 """
86 ident,smsg = self.session.feed_identities(msg)
87 msg = self.session.deserialize(smsg)
88 # let client inspect messages
89 if self._inspect:
90 self._inspect(msg)
91 self.call_handlers(msg)
92
93 def call_handlers(self, msg):
94 """This method is called in the ioloop thread when a message arrives.
95
96 Subclasses should override this method to handle incoming messages.
97 It is important to remember that this method is called in the thread
98 so that some logic must be done to ensure that the application level
99 handlers are called in the application thread.
100 """
101 pass
102
103 def process_events(self):
104 """Subclasses should override this with a method
105 processing any pending GUI events.
106 """
107 pass
108
109
110 def flush(self, timeout=1.0):
111 """Immediately processes all pending messages on this channel.
112
113 This is only used for the IOPub channel.
114
115 Callers should use this method to ensure that :meth:`call_handlers`
116 has been called for all messages that have been received on the
117 0MQ SUB socket of this channel.
118
119 This method is thread safe.
120
121 Parameters
122 ----------
123 timeout : float, optional
124 The maximum amount of time to spend flushing, in seconds. The
125 default is one second.
126 """
127 # We do the IOLoop callback process twice to ensure that the IOLoop
128 # gets to perform at least one full poll.
129 stop_time = time.time() + timeout
130 for i in range(2):
131 self._flushed = False
132 self.ioloop.add_callback(self._flush)
133 while not self._flushed and time.time() < stop_time:
134 time.sleep(0.01)
135
136 def _flush(self):
137 """Callback for :method:`self.flush`."""
138 self.stream.flush()
139 self._flushed = True
140
141
142 class IOLoopThread(Thread):
143 """Run a pyzmq ioloop in a thread to send and receive messages
144 """
145 def __init__(self, loop):
146 super(IOLoopThread, self).__init__()
147 self.daemon = True
148 atexit.register(self._notice_exit)
149 self.ioloop = loop or ioloop.IOLoop()
150
151 def _notice_exit(self):
152 self._exiting = True
153
154 def run(self):
155 """Run my loop, ignoring EINTR events in the poller"""
156 while True:
157 try:
158 self.ioloop.start()
159 except ZMQError as e:
160 if e.errno == errno.EINTR:
161 continue
162 else:
163 raise
164 except Exception:
165 if self._exiting:
166 break
167 else:
168 raise
169 else:
170 break
171
172 def stop(self):
173 """Stop the channel's event loop and join its thread.
174
175 This calls :meth:`~threading.Thread.join` and returns when the thread
176 terminates. :class:`RuntimeError` will be raised if
177 :meth:`~threading.Thread.start` is called again.
178 """
179 if self.ioloop is not None:
180 self.ioloop.stop()
181 self.join()
182 self.close()
183
184 def close(self):
185 if self.ioloop is not None:
186 try:
187 self.ioloop.close(all_fds=True)
188 except Exception:
189 pass
190
191
192 class ThreadedKernelClient(KernelClient):
193 """ A KernelClient that provides thread-safe sockets with async callbacks on message replies.
194 """
195
196 _ioloop = None
197 @property
198 def ioloop(self):
199 if self._ioloop is None:
200 self._ioloop = ioloop.IOLoop()
201 return self._ioloop
202
203 ioloop_thread = Instance(IOLoopThread)
204
205 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
206 if shell:
207 self.shell_channel._inspect = self._check_kernel_info_reply
208
209 self.ioloop_thread = IOLoopThread(self.ioloop)
210 self.ioloop_thread.start()
211
212 super(ThreadedKernelClient, self).start_channels(shell, iopub, stdin, hb)
213
214 def _check_kernel_info_reply(self, msg):
215 """This is run in the ioloop thread when the kernel info reply is recieved
216 """
217 if msg['msg_type'] == 'kernel_info_reply':
218 self._handle_kernel_info_reply(msg)
219 self.shell_channel._inspect = None
220
221 def stop_channels(self):
222 super(ThreadedKernelClient, self).stop_channels()
223 if self.ioloop_thread.is_alive():
224 self.ioloop_thread.stop()
225
226 iopub_channel_class = Type(ThreadedZMQSocketChannel)
227 shell_channel_class = Type(ThreadedZMQSocketChannel)
228 stdin_channel_class = Type(ThreadedZMQSocketChannel)
229 hb_channel_class = Type(HBChannel)
@@ -1,386 +1,387 b''
1 """Base class to manage the interaction with a running kernel"""
1 """Base class to manage the interaction with a running kernel"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 from __future__ import absolute_import
6 from __future__ import absolute_import
7 from IPython.kernel.channels import major_protocol_version
7 from IPython.kernel.channels import major_protocol_version
8 from IPython.utils.py3compat import string_types, iteritems
8 from IPython.utils.py3compat import string_types, iteritems
9
9
10 import zmq
10 import zmq
11
11
12 from IPython.utils.traitlets import (
12 from IPython.utils.traitlets import (
13 Any, Instance, Type,
13 Any, Instance, Type,
14 )
14 )
15
15
16 from .channelsabc import (ChannelABC, HBChannelABC)
16 from .channelsabc import (ChannelABC, HBChannelABC)
17 from .clientabc import KernelClientABC
17 from .clientabc import KernelClientABC
18 from .connect import ConnectionFileMixin
18 from .connect import ConnectionFileMixin
19
19
20
20
21 # some utilities to validate message structure, these might get moved elsewhere
21 # some utilities to validate message structure, these might get moved elsewhere
22 # if they prove to have more generic utility
22 # if they prove to have more generic utility
23
23
24 def validate_string_dict(dct):
24 def validate_string_dict(dct):
25 """Validate that the input is a dict with string keys and values.
25 """Validate that the input is a dict with string keys and values.
26
26
27 Raises ValueError if not."""
27 Raises ValueError if not."""
28 for k,v in iteritems(dct):
28 for k,v in iteritems(dct):
29 if not isinstance(k, string_types):
29 if not isinstance(k, string_types):
30 raise ValueError('key %r in dict must be a string' % k)
30 raise ValueError('key %r in dict must be a string' % k)
31 if not isinstance(v, string_types):
31 if not isinstance(v, string_types):
32 raise ValueError('value %r in dict must be a string' % v)
32 raise ValueError('value %r in dict must be a string' % v)
33
33
34
34
35 class KernelClient(ConnectionFileMixin):
35 class KernelClient(ConnectionFileMixin):
36 """Communicates with a single kernel on any host via zmq channels.
36 """Communicates with a single kernel on any host via zmq channels.
37
37
38 There are four channels associated with each kernel:
38 There are four channels associated with each kernel:
39
39
40 * shell: for request/reply calls to the kernel.
40 * shell: for request/reply calls to the kernel.
41 * iopub: for the kernel to publish results to frontends.
41 * iopub: for the kernel to publish results to frontends.
42 * hb: for monitoring the kernel's heartbeat.
42 * hb: for monitoring the kernel's heartbeat.
43 * stdin: for frontends to reply to raw_input calls in the kernel.
43 * stdin: for frontends to reply to raw_input calls in the kernel.
44
44
45 The methods of the channels are exposed as methods of the client itself
45 The methods of the channels are exposed as methods of the client itself
46 (KernelClient.execute, complete, history, etc.).
46 (KernelClient.execute, complete, history, etc.).
47 See the channels themselves for documentation of these methods.
47 See the channels themselves for documentation of these methods.
48
48
49 """
49 """
50
50
51 # The PyZMQ Context to use for communication with the kernel.
51 # The PyZMQ Context to use for communication with the kernel.
52 context = Instance(zmq.Context)
52 context = Instance(zmq.Context)
53 def _context_default(self):
53 def _context_default(self):
54 return zmq.Context.instance()
54 return zmq.Context.instance()
55
55
56 # The classes to use for the various channels
56 # The classes to use for the various channels
57 shell_channel_class = Type(ChannelABC)
57 shell_channel_class = Type(ChannelABC)
58 iopub_channel_class = Type(ChannelABC)
58 iopub_channel_class = Type(ChannelABC)
59 stdin_channel_class = Type(ChannelABC)
59 stdin_channel_class = Type(ChannelABC)
60 hb_channel_class = Type(HBChannelABC)
60 hb_channel_class = Type(HBChannelABC)
61
61
62 # Protected traits
62 # Protected traits
63 _shell_channel = Any
63 _shell_channel = Any
64 _iopub_channel = Any
64 _iopub_channel = Any
65 _stdin_channel = Any
65 _stdin_channel = Any
66 _hb_channel = Any
66 _hb_channel = Any
67
67
68 # flag for whether execute requests should be allowed to call raw_input:
68 # flag for whether execute requests should be allowed to call raw_input:
69 allow_stdin = True
69 allow_stdin = True
70
70
71 #--------------------------------------------------------------------------
71 #--------------------------------------------------------------------------
72 # Channel proxy methods
72 # Channel proxy methods
73 #--------------------------------------------------------------------------
73 #--------------------------------------------------------------------------
74
74
75 def _get_msg(channel, *args, **kwargs):
75 def _get_msg(channel, *args, **kwargs):
76 return channel.get_msg(*args, **kwargs)
76 return channel.get_msg(*args, **kwargs)
77
77
78 def get_shell_msg(self, *args, **kwargs):
78 def get_shell_msg(self, *args, **kwargs):
79 """Get a message from the shell channel"""
79 """Get a message from the shell channel"""
80 return self.shell_channel.get_msg(*args, **kwargs)
80 return self.shell_channel.get_msg(*args, **kwargs)
81
81
82 def get_iopub_msg(self, *args, **kwargs):
82 def get_iopub_msg(self, *args, **kwargs):
83 """Get a message from the iopub channel"""
83 """Get a message from the iopub channel"""
84 return self.iopub_channel.get_msg(*args, **kwargs)
84 return self.iopub_channel.get_msg(*args, **kwargs)
85
85
86 def get_stdin_msg(self, *args, **kwargs):
86 def get_stdin_msg(self, *args, **kwargs):
87 """Get a message from the stdin channel"""
87 """Get a message from the stdin channel"""
88 return self.stdin_channel.get_msg(*args, **kwargs)
88 return self.stdin_channel.get_msg(*args, **kwargs)
89
89
90 #--------------------------------------------------------------------------
90 #--------------------------------------------------------------------------
91 # Channel management methods
91 # Channel management methods
92 #--------------------------------------------------------------------------
92 #--------------------------------------------------------------------------
93
93
94 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
94 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
95 """Starts the channels for this kernel.
95 """Starts the channels for this kernel.
96
96
97 This will create the channels if they do not exist and then start
97 This will create the channels if they do not exist and then start
98 them (their activity runs in a thread). If port numbers of 0 are
98 them (their activity runs in a thread). If port numbers of 0 are
99 being used (random ports) then you must first call
99 being used (random ports) then you must first call
100 :meth:`start_kernel`. If the channels have been stopped and you
100 :meth:`start_kernel`. If the channels have been stopped and you
101 call this, :class:`RuntimeError` will be raised.
101 call this, :class:`RuntimeError` will be raised.
102 """
102 """
103 if shell:
103 if shell:
104 self.shell_channel.start()
104 self.shell_channel.start()
105 self.kernel_info()
105 self.kernel_info()
106 if iopub:
106 if iopub:
107 self.iopub_channel.start()
107 self.iopub_channel.start()
108 if stdin:
108 if stdin:
109 self.stdin_channel.start()
109 self.stdin_channel.start()
110 self.allow_stdin = True
110 self.allow_stdin = True
111 else:
111 else:
112 self.allow_stdin = False
112 self.allow_stdin = False
113 if hb:
113 if hb:
114 self.hb_channel.start()
114 self.hb_channel.start()
115
115
116 def stop_channels(self):
116 def stop_channels(self):
117 """Stops all the running channels for this kernel.
117 """Stops all the running channels for this kernel.
118
118
119 This stops their event loops and joins their threads.
119 This stops their event loops and joins their threads.
120 """
120 """
121 if self.shell_channel.is_alive():
121 if self.shell_channel.is_alive():
122 self.shell_channel.stop()
122 self.shell_channel.stop()
123 if self.iopub_channel.is_alive():
123 if self.iopub_channel.is_alive():
124 self.iopub_channel.stop()
124 self.iopub_channel.stop()
125 if self.stdin_channel.is_alive():
125 if self.stdin_channel.is_alive():
126 self.stdin_channel.stop()
126 self.stdin_channel.stop()
127 if self.hb_channel.is_alive():
127 if self.hb_channel.is_alive():
128 self.hb_channel.stop()
128 self.hb_channel.stop()
129
129
130 @property
130 @property
131 def channels_running(self):
131 def channels_running(self):
132 """Are any of the channels created and running?"""
132 """Are any of the channels created and running?"""
133 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
133 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
134 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
134 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
135
135
136 ioloop = None # Overridden in subclasses that use pyzmq event loop
136 ioloop = None # Overridden in subclasses that use pyzmq event loop
137
137
138 @property
138 @property
139 def shell_channel(self):
139 def shell_channel(self):
140 """Get the shell channel object for this kernel."""
140 """Get the shell channel object for this kernel."""
141 if self._shell_channel is None:
141 if self._shell_channel is None:
142 url = self._make_url('shell')
142 url = self._make_url('shell')
143 self.log.debug("connecting shell channel to %s", url)
143 self.log.debug("connecting shell channel to %s", url)
144 socket = self.connect_shell(identity=self.session.bsession)
144 socket = self.connect_shell(identity=self.session.bsession)
145 self._shell_channel = self.shell_channel_class(
145 self._shell_channel = self.shell_channel_class(
146 socket, self.session, self.ioloop
146 socket, self.session, self.ioloop
147 )
147 )
148 return self._shell_channel
148 return self._shell_channel
149
149
150 @property
150 @property
151 def iopub_channel(self):
151 def iopub_channel(self):
152 """Get the iopub channel object for this kernel."""
152 """Get the iopub channel object for this kernel."""
153 if self._iopub_channel is None:
153 if self._iopub_channel is None:
154 url = self._make_url('iopub')
154 url = self._make_url('iopub')
155 self.log.debug("connecting iopub channel to %s", url)
155 self.log.debug("connecting iopub channel to %s", url)
156 socket = self.connect_iopub()
156 socket = self.connect_iopub()
157 self._iopub_channel = self.iopub_channel_class(
157 self._iopub_channel = self.iopub_channel_class(
158 socket, self.session, self.ioloop
158 socket, self.session, self.ioloop
159 )
159 )
160 return self._iopub_channel
160 return self._iopub_channel
161
161
162 @property
162 @property
163 def stdin_channel(self):
163 def stdin_channel(self):
164 """Get the stdin channel object for this kernel."""
164 """Get the stdin channel object for this kernel."""
165 if self._stdin_channel is None:
165 if self._stdin_channel is None:
166 url = self._make_url('stdin')
166 url = self._make_url('stdin')
167 self.log.debug("connecting stdin channel to %s", url)
167 self.log.debug("connecting stdin channel to %s", url)
168 socket = self.connect_stdin(identity=self.session.bsession)
168 socket = self.connect_stdin(identity=self.session.bsession)
169 self._stdin_channel = self.stdin_channel_class(
169 self._stdin_channel = self.stdin_channel_class(
170 socket, self.session, self.ioloop
170 socket, self.session, self.ioloop
171 )
171 )
172 return self._stdin_channel
172 return self._stdin_channel
173
173
174 @property
174 @property
175 def hb_channel(self):
175 def hb_channel(self):
176 """Get the hb channel object for this kernel."""
176 """Get the hb channel object for this kernel."""
177 if self._hb_channel is None:
177 if self._hb_channel is None:
178 url = self._make_url('hb')
178 url = self._make_url('hb')
179 self.log.debug("connecting heartbeat channel to %s", url)
179 self.log.debug("connecting heartbeat channel to %s", url)
180 self._hb_channel = self.hb_channel_class(
180 self._hb_channel = self.hb_channel_class(
181 self.context, self.session, url
181 self.context, self.session, url
182 )
182 )
183 return self._hb_channel
183 return self._hb_channel
184
184
185 def is_alive(self):
185 def is_alive(self):
186 """Is the kernel process still running?"""
186 """Is the kernel process still running?"""
187 if self._hb_channel is not None:
187 if self._hb_channel is not None:
188 # We didn't start the kernel with this KernelManager so we
188 # We didn't start the kernel with this KernelManager so we
189 # use the heartbeat.
189 # use the heartbeat.
190 return self._hb_channel.is_beating()
190 return self._hb_channel.is_beating()
191 else:
191 else:
192 # no heartbeat and not local, we can't tell if it's running,
192 # no heartbeat and not local, we can't tell if it's running,
193 # so naively return True
193 # so naively return True
194 return True
194 return True
195
195
196
196
197 # Methods to send specific messages on channels
197 # Methods to send specific messages on channels
198 def execute(self, code, silent=False, store_history=True,
198 def execute(self, code, silent=False, store_history=True,
199 user_expressions=None, allow_stdin=None):
199 user_expressions=None, allow_stdin=None):
200 """Execute code in the kernel.
200 """Execute code in the kernel.
201
201
202 Parameters
202 Parameters
203 ----------
203 ----------
204 code : str
204 code : str
205 A string of Python code.
205 A string of Python code.
206
206
207 silent : bool, optional (default False)
207 silent : bool, optional (default False)
208 If set, the kernel will execute the code as quietly possible, and
208 If set, the kernel will execute the code as quietly possible, and
209 will force store_history to be False.
209 will force store_history to be False.
210
210
211 store_history : bool, optional (default True)
211 store_history : bool, optional (default True)
212 If set, the kernel will store command history. This is forced
212 If set, the kernel will store command history. This is forced
213 to be False if silent is True.
213 to be False if silent is True.
214
214
215 user_expressions : dict, optional
215 user_expressions : dict, optional
216 A dict mapping names to expressions to be evaluated in the user's
216 A dict mapping names to expressions to be evaluated in the user's
217 dict. The expression values are returned as strings formatted using
217 dict. The expression values are returned as strings formatted using
218 :func:`repr`.
218 :func:`repr`.
219
219
220 allow_stdin : bool, optional (default self.allow_stdin)
220 allow_stdin : bool, optional (default self.allow_stdin)
221 Flag for whether the kernel can send stdin requests to frontends.
221 Flag for whether the kernel can send stdin requests to frontends.
222
222
223 Some frontends (e.g. the Notebook) do not support stdin requests.
223 Some frontends (e.g. the Notebook) do not support stdin requests.
224 If raw_input is called from code executed from such a frontend, a
224 If raw_input is called from code executed from such a frontend, a
225 StdinNotImplementedError will be raised.
225 StdinNotImplementedError will be raised.
226
226
227 Returns
227 Returns
228 -------
228 -------
229 The msg_id of the message sent.
229 The msg_id of the message sent.
230 """
230 """
231 if user_expressions is None:
231 if user_expressions is None:
232 user_expressions = {}
232 user_expressions = {}
233 if allow_stdin is None:
233 if allow_stdin is None:
234 allow_stdin = self.allow_stdin
234 allow_stdin = self.allow_stdin
235
235
236
236
237 # Don't waste network traffic if inputs are invalid
237 # Don't waste network traffic if inputs are invalid
238 if not isinstance(code, string_types):
238 if not isinstance(code, string_types):
239 raise ValueError('code %r must be a string' % code)
239 raise ValueError('code %r must be a string' % code)
240 validate_string_dict(user_expressions)
240 validate_string_dict(user_expressions)
241
241
242 # Create class for content/msg creation. Related to, but possibly
242 # Create class for content/msg creation. Related to, but possibly
243 # not in Session.
243 # not in Session.
244 content = dict(code=code, silent=silent, store_history=store_history,
244 content = dict(code=code, silent=silent, store_history=store_history,
245 user_expressions=user_expressions,
245 user_expressions=user_expressions,
246 allow_stdin=allow_stdin,
246 allow_stdin=allow_stdin,
247 )
247 )
248 msg = self.session.msg('execute_request', content)
248 msg = self.session.msg('execute_request', content)
249 self.shell_channel.send(msg)
249 self.shell_channel.send(msg)
250 return msg['header']['msg_id']
250 return msg['header']['msg_id']
251
251
252 def complete(self, code, cursor_pos=None):
252 def complete(self, code, cursor_pos=None):
253 """Tab complete text in the kernel's namespace.
253 """Tab complete text in the kernel's namespace.
254
254
255 Parameters
255 Parameters
256 ----------
256 ----------
257 code : str
257 code : str
258 The context in which completion is requested.
258 The context in which completion is requested.
259 Can be anything between a variable name and an entire cell.
259 Can be anything between a variable name and an entire cell.
260 cursor_pos : int, optional
260 cursor_pos : int, optional
261 The position of the cursor in the block of code where the completion was requested.
261 The position of the cursor in the block of code where the completion was requested.
262 Default: ``len(code)``
262 Default: ``len(code)``
263
263
264 Returns
264 Returns
265 -------
265 -------
266 The msg_id of the message sent.
266 The msg_id of the message sent.
267 """
267 """
268 if cursor_pos is None:
268 if cursor_pos is None:
269 cursor_pos = len(code)
269 cursor_pos = len(code)
270 content = dict(code=code, cursor_pos=cursor_pos)
270 content = dict(code=code, cursor_pos=cursor_pos)
271 msg = self.session.msg('complete_request', content)
271 msg = self.session.msg('complete_request', content)
272 self.shell_channel.send(msg)
272 self.shell_channel.send(msg)
273 return msg['header']['msg_id']
273 return msg['header']['msg_id']
274
274
275 def inspect(self, code, cursor_pos=None, detail_level=0):
275 def inspect(self, code, cursor_pos=None, detail_level=0):
276 """Get metadata information about an object in the kernel's namespace.
276 """Get metadata information about an object in the kernel's namespace.
277
277
278 It is up to the kernel to determine the appropriate object to inspect.
278 It is up to the kernel to determine the appropriate object to inspect.
279
279
280 Parameters
280 Parameters
281 ----------
281 ----------
282 code : str
282 code : str
283 The context in which info is requested.
283 The context in which info is requested.
284 Can be anything between a variable name and an entire cell.
284 Can be anything between a variable name and an entire cell.
285 cursor_pos : int, optional
285 cursor_pos : int, optional
286 The position of the cursor in the block of code where the info was requested.
286 The position of the cursor in the block of code where the info was requested.
287 Default: ``len(code)``
287 Default: ``len(code)``
288 detail_level : int, optional
288 detail_level : int, optional
289 The level of detail for the introspection (0-2)
289 The level of detail for the introspection (0-2)
290
290
291 Returns
291 Returns
292 -------
292 -------
293 The msg_id of the message sent.
293 The msg_id of the message sent.
294 """
294 """
295 if cursor_pos is None:
295 if cursor_pos is None:
296 cursor_pos = len(code)
296 cursor_pos = len(code)
297 content = dict(code=code, cursor_pos=cursor_pos,
297 content = dict(code=code, cursor_pos=cursor_pos,
298 detail_level=detail_level,
298 detail_level=detail_level,
299 )
299 )
300 msg = self.session.msg('inspect_request', content)
300 msg = self.session.msg('inspect_request', content)
301 self.shell_channel.send(msg)
301 self.shell_channel.send(msg)
302 return msg['header']['msg_id']
302 return msg['header']['msg_id']
303
303
304 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
304 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
305 """Get entries from the kernel's history list.
305 """Get entries from the kernel's history list.
306
306
307 Parameters
307 Parameters
308 ----------
308 ----------
309 raw : bool
309 raw : bool
310 If True, return the raw input.
310 If True, return the raw input.
311 output : bool
311 output : bool
312 If True, then return the output as well.
312 If True, then return the output as well.
313 hist_access_type : str
313 hist_access_type : str
314 'range' (fill in session, start and stop params), 'tail' (fill in n)
314 'range' (fill in session, start and stop params), 'tail' (fill in n)
315 or 'search' (fill in pattern param).
315 or 'search' (fill in pattern param).
316
316
317 session : int
317 session : int
318 For a range request, the session from which to get lines. Session
318 For a range request, the session from which to get lines. Session
319 numbers are positive integers; negative ones count back from the
319 numbers are positive integers; negative ones count back from the
320 current session.
320 current session.
321 start : int
321 start : int
322 The first line number of a history range.
322 The first line number of a history range.
323 stop : int
323 stop : int
324 The final (excluded) line number of a history range.
324 The final (excluded) line number of a history range.
325
325
326 n : int
326 n : int
327 The number of lines of history to get for a tail request.
327 The number of lines of history to get for a tail request.
328
328
329 pattern : str
329 pattern : str
330 The glob-syntax pattern for a search request.
330 The glob-syntax pattern for a search request.
331
331
332 Returns
332 Returns
333 -------
333 -------
334 The msg_id of the message sent.
334 The msg_id of the message sent.
335 """
335 """
336 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
336 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
337 **kwargs)
337 **kwargs)
338 msg = self.session.msg('history_request', content)
338 msg = self.session.msg('history_request', content)
339 self.shell_channel.send(msg)
339 self.shell_channel.send(msg)
340 return msg['header']['msg_id']
340 return msg['header']['msg_id']
341
341
342 def kernel_info(self):
342 def kernel_info(self):
343 """Request kernel info."""
343 """Request kernel info."""
344 msg = self.session.msg('kernel_info_request')
344 msg = self.session.msg('kernel_info_request')
345 self.shell_channel.send(msg)
345 self.shell_channel.send(msg)
346 return msg['header']['msg_id']
346 return msg['header']['msg_id']
347
347
348 def _handle_kernel_info_reply(self, msg):
348 def _handle_kernel_info_reply(self, msg):
349 """handle kernel info reply
349 """handle kernel info reply
350
350
351 sets protocol adaptation version
351 sets protocol adaptation version. This might
352 be run from a separate thread.
352 """
353 """
353 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
354 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
354 if adapt_version != major_protocol_version:
355 if adapt_version != major_protocol_version:
355 self.session.adapt_version = adapt_version
356 self.session.adapt_version = adapt_version
356
357
357 def shutdown(self, restart=False):
358 def shutdown(self, restart=False):
358 """Request an immediate kernel shutdown.
359 """Request an immediate kernel shutdown.
359
360
360 Upon receipt of the (empty) reply, client code can safely assume that
361 Upon receipt of the (empty) reply, client code can safely assume that
361 the kernel has shut down and it's safe to forcefully terminate it if
362 the kernel has shut down and it's safe to forcefully terminate it if
362 it's still alive.
363 it's still alive.
363
364
364 The kernel will send the reply via a function registered with Python's
365 The kernel will send the reply via a function registered with Python's
365 atexit module, ensuring it's truly done as the kernel is done with all
366 atexit module, ensuring it's truly done as the kernel is done with all
366 normal operation.
367 normal operation.
367 """
368 """
368 # Send quit message to kernel. Once we implement kernel-side setattr,
369 # Send quit message to kernel. Once we implement kernel-side setattr,
369 # this should probably be done that way, but for now this will do.
370 # this should probably be done that way, but for now this will do.
370 msg = self.session.msg('shutdown_request', {'restart':restart})
371 msg = self.session.msg('shutdown_request', {'restart':restart})
371 self.shell_channel.send(msg)
372 self.shell_channel.send(msg)
372 return msg['header']['msg_id']
373 return msg['header']['msg_id']
373
374
374 def is_complete(self, code):
375 def is_complete(self, code):
375 msg = self.session.msg('is_complete_request', {'code': code})
376 msg = self.session.msg('is_complete_request', {'code': code})
376 self.shell_channel.send(msg)
377 self.shell_channel.send(msg)
377 return msg['header']['msg_id']
378 return msg['header']['msg_id']
378
379
379 def input(self, string):
380 def input(self, string):
380 """Send a string of raw input to the kernel."""
381 """Send a string of raw input to the kernel."""
381 content = dict(value=string)
382 content = dict(value=string)
382 msg = self.session.msg('input_reply', content)
383 msg = self.session.msg('input_reply', content)
383 self.stdin_channel.send(msg)
384 self.stdin_channel.send(msg)
384
385
385
386
386 KernelClientABC.register(KernelClient)
387 KernelClientABC.register(KernelClient)
@@ -1,249 +1,71 b''
1 """ Defines a KernelClient that provides signals and slots.
1 """ Defines a KernelClient that provides signals and slots.
2 """
2 """
3 import atexit
3 import atexit
4 import errno
4 import errno
5 from threading import Thread
5 from threading import Thread
6 import time
6 import time
7
7
8 import zmq
8 import zmq
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 # during garbage collection of threads at exit:
10 # during garbage collection of threads at exit:
11 from zmq import ZMQError
11 from zmq import ZMQError
12 from zmq.eventloop import ioloop, zmqstream
12 from zmq.eventloop import ioloop, zmqstream
13
13
14 from IPython.external.qt import QtCore
14 from IPython.external.qt import QtCore
15
15
16 # Local imports
16 # Local imports
17 from IPython.utils.traitlets import Type, Instance
17 from IPython.utils.traitlets import Type, Instance
18 from IPython.kernel.channels import HBChannel
18 from IPython.kernel.channels import HBChannel
19 from IPython.kernel import KernelClient
19 from IPython.kernel import KernelClient
20 from IPython.kernel.channels import InvalidPortNumber
21 from IPython.kernel.threaded import ThreadedKernelClient, ThreadedZMQSocketChannel
20
22
21 from .kernel_mixins import QtKernelClientMixin
23 from .kernel_mixins import QtKernelClientMixin
22 from .util import SuperQObject
24 from .util import SuperQObject
23
25
24 class QtHBChannel(SuperQObject, HBChannel):
26 class QtHBChannel(SuperQObject, HBChannel):
25 # A longer timeout than the base class
27 # A longer timeout than the base class
26 time_to_dead = 3.0
28 time_to_dead = 3.0
27
29
28 # Emitted when the kernel has died.
30 # Emitted when the kernel has died.
29 kernel_died = QtCore.Signal(object)
31 kernel_died = QtCore.Signal(object)
30
32
31 def call_handlers(self, since_last_heartbeat):
33 def call_handlers(self, since_last_heartbeat):
32 """ Reimplemented to emit signals instead of making callbacks.
34 """ Reimplemented to emit signals instead of making callbacks.
33 """
35 """
34 # Emit the generic signal.
36 # Emit the generic signal.
35 self.kernel_died.emit(since_last_heartbeat)
37 self.kernel_died.emit(since_last_heartbeat)
36
38
37 from IPython.core.release import kernel_protocol_version_info
39 from IPython.core.release import kernel_protocol_version_info
38
40
39 major_protocol_version = kernel_protocol_version_info[0]
41 major_protocol_version = kernel_protocol_version_info[0]
40
42
41 class InvalidPortNumber(Exception):
43 class QtZMQSocketChannel(ThreadedZMQSocketChannel,SuperQObject):
42 pass
43
44
45 class QtZMQSocketChannel(SuperQObject):
46 """A ZMQ socket emitting a Qt signal when a message is received."""
44 """A ZMQ socket emitting a Qt signal when a message is received."""
47 session = None
48 socket = None
49 ioloop = None
50 stream = None
51
52 message_received = QtCore.Signal(object)
45 message_received = QtCore.Signal(object)
53
46
54 def process_events(self):
47 def process_events(self):
55 """ Process any pending GUI events.
48 """ Process any pending GUI events.
56 """
49 """
57 QtCore.QCoreApplication.instance().processEvents()
50 QtCore.QCoreApplication.instance().processEvents()
58
51
59 def __init__(self, socket, session, loop):
60 """Create a channel.
61
62 Parameters
63 ----------
64 socket : :class:`zmq.Socket`
65 The ZMQ socket to use.
66 session : :class:`session.Session`
67 The session to use.
68 loop
69 A pyzmq ioloop to connect the socket to using a ZMQStream
70 """
71 super(QtZMQSocketChannel, self).__init__()
72
73 self.socket = socket
74 self.session = session
75 self.ioloop = loop
76
77 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
78 self.stream.on_recv(self._handle_recv)
79
80 _is_alive = False
81 def is_alive(self):
82 return self._is_alive
83
84 def start(self):
85 self._is_alive = True
86
87 def stop(self):
88 self._is_alive = False
89
90 def close(self):
91 if self.socket is not None:
92 try:
93 self.socket.close(linger=0)
94 except Exception:
95 pass
96 self.socket = None
97
98 def send(self, msg):
99 """Queue a message to be sent from the IOLoop's thread.
100
101 Parameters
102 ----------
103 msg : message to send
104
105 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
106 thread control of the action.
107 """
108 def thread_send():
109 self.session.send(self.stream, msg)
110 self.ioloop.add_callback(thread_send)
111
112 def _handle_recv(self, msg):
113 """Callback for stream.on_recv.
114
115 Unpacks message, and calls handlers with it.
116 """
117 ident,smsg = self.session.feed_identities(msg)
118 msg = self.session.deserialize(smsg)
119 self.call_handlers(msg)
120
52
121 def call_handlers(self, msg):
53 def call_handlers(self, msg):
122 """This method is called in the ioloop thread when a message arrives.
54 """This method is called in the ioloop thread when a message arrives.
123
55
124 Subclasses should override this method to handle incoming messages.
125 It is important to remember that this method is called in the thread
56 It is important to remember that this method is called in the thread
126 so that some logic must be done to ensure that the application level
57 so that some logic must be done to ensure that the application level
127 handlers are called in the application thread.
58 handlers are called in the application thread.
128 """
59 """
129 # Emit the generic signal.
60 # Emit the generic signal.
130 self.message_received.emit(msg)
61 self.message_received.emit(msg)
131
62
132 def flush(self, timeout=1.0):
133 """Immediately processes all pending messages on this channel.
134
63
135 This is only used for the IOPub channel.
64 class QtKernelClient(QtKernelClientMixin, ThreadedKernelClient):
136
137 Callers should use this method to ensure that :meth:`call_handlers`
138 has been called for all messages that have been received on the
139 0MQ SUB socket of this channel.
140
141 This method is thread safe.
142
143 Parameters
144 ----------
145 timeout : float, optional
146 The maximum amount of time to spend flushing, in seconds. The
147 default is one second.
148 """
149 # We do the IOLoop callback process twice to ensure that the IOLoop
150 # gets to perform at least one full poll.
151 stop_time = time.time() + timeout
152 for i in range(2):
153 self._flushed = False
154 self.ioloop.add_callback(self._flush)
155 while not self._flushed and time.time() < stop_time:
156 time.sleep(0.01)
157
158 def _flush(self):
159 """Callback for :method:`self.flush`."""
160 self.stream.flush()
161 self._flushed = True
162
163
164 class IOLoopThread(Thread):
165 """Run a pyzmq ioloop in a thread to send and receive messages
166 """
167 def __init__(self, loop):
168 super(IOLoopThread, self).__init__()
169 self.daemon = True
170 atexit.register(self._notice_exit)
171 self.ioloop = loop or ioloop.IOLoop()
172
173 def _notice_exit(self):
174 self._exiting = True
175
176 def run(self):
177 """Run my loop, ignoring EINTR events in the poller"""
178 while True:
179 try:
180 self.ioloop.start()
181 except ZMQError as e:
182 if e.errno == errno.EINTR:
183 continue
184 else:
185 raise
186 except Exception:
187 if self._exiting:
188 break
189 else:
190 raise
191 else:
192 break
193
194 def stop(self):
195 """Stop the channel's event loop and join its thread.
196
197 This calls :meth:`~threading.Thread.join` and returns when the thread
198 terminates. :class:`RuntimeError` will be raised if
199 :meth:`~threading.Thread.start` is called again.
200 """
201 if self.ioloop is not None:
202 self.ioloop.stop()
203 self.join()
204 self.close()
205
206 def close(self):
207 if self.ioloop is not None:
208 try:
209 self.ioloop.close(all_fds=True)
210 except Exception:
211 pass
212
213
214 class QtKernelClient(QtKernelClientMixin, KernelClient):
215 """ A KernelClient that provides signals and slots.
65 """ A KernelClient that provides signals and slots.
216 """
66 """
217
67
218 _ioloop = None
219 @property
220 def ioloop(self):
221 if self._ioloop is None:
222 self._ioloop = ioloop.IOLoop()
223 return self._ioloop
224
225 ioloop_thread = Instance(IOLoopThread)
226
227 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
228 if shell:
229 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
230
231 self.ioloop_thread = IOLoopThread(self.ioloop)
232 self.ioloop_thread.start()
233
234 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
235
236 def _check_kernel_info_reply(self, msg):
237 if msg['msg_type'] == 'kernel_info_reply':
238 self._handle_kernel_info_reply(msg)
239 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
240
241 def stop_channels(self):
242 super(QtKernelClient, self).stop_channels()
243 if self.ioloop_thread.is_alive():
244 self.ioloop_thread.stop()
245
246 iopub_channel_class = Type(QtZMQSocketChannel)
68 iopub_channel_class = Type(QtZMQSocketChannel)
247 shell_channel_class = Type(QtZMQSocketChannel)
69 shell_channel_class = Type(QtZMQSocketChannel)
248 stdin_channel_class = Type(QtZMQSocketChannel)
70 stdin_channel_class = Type(QtZMQSocketChannel)
249 hb_channel_class = Type(QtHBChannel)
71 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now