##// END OF EJS Templates
Remove redundant make_*_socket functions
Thomas Kluyver -
Show More
@@ -1,226 +1,203 b''
1 1 """Base classes to manage a Client's interaction with a running kernel"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import absolute_import
7 7
8 8 import atexit
9 9 import errno
10 10 from threading import Thread
11 11 import time
12 12
13 13 import zmq
14 14 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
15 15 # during garbage collection of threads at exit:
16 16 from zmq import ZMQError
17 17
18 18 from IPython.core.release import kernel_protocol_version_info
19 19
20 20 from .channelsabc import HBChannelABC
21 21
22 22 #-----------------------------------------------------------------------------
23 23 # Constants and exceptions
24 24 #-----------------------------------------------------------------------------
25 25
26 26 major_protocol_version = kernel_protocol_version_info[0]
27 27
28 28 class InvalidPortNumber(Exception):
29 29 pass
30 30
31
32 def make_shell_socket(context, identity, address):
33 socket = context.socket(zmq.DEALER)
34 socket.linger = 1000
35 socket.identity = identity
36 socket.connect(address)
37 return socket
38
39 def make_iopub_socket(context, identity, address):
40 socket = context.socket(zmq.SUB)
41 socket.linger = 1000
42 socket.subscribe = b''
43 socket.identity = identity
44 socket.connect(address)
45 return socket
46
47 def make_stdin_socket(context, identity, address):
48 socket = context.socket(zmq.DEALER)
49 socket.linger = 1000
50 socket.identity = identity
51 socket.connect(address)
52 return socket
53
54 31 class HBChannel(Thread):
55 32 """The heartbeat channel which monitors the kernel heartbeat.
56 33
57 34 Note that the heartbeat channel is paused by default. As long as you start
58 35 this channel, the kernel manager will ensure that it is paused and un-paused
59 36 as appropriate.
60 37 """
61 38 context = None
62 39 session = None
63 40 socket = None
64 41 address = None
65 42 _exiting = False
66 43
67 44 time_to_dead = 1.
68 45 poller = None
69 46 _running = None
70 47 _pause = None
71 48 _beating = None
72 49
73 50 def __init__(self, context, session, address):
74 51 """Create the heartbeat monitor thread.
75 52
76 53 Parameters
77 54 ----------
78 55 context : :class:`zmq.Context`
79 56 The ZMQ context to use.
80 57 session : :class:`session.Session`
81 58 The session to use.
82 59 address : zmq url
83 60 Standard (ip, port) tuple that the kernel is listening on.
84 61 """
85 62 super(HBChannel, self).__init__()
86 63 self.daemon = True
87 64
88 65 self.context = context
89 66 self.session = session
90 67 if isinstance(address, tuple):
91 68 if address[1] == 0:
92 69 message = 'The port number for a channel cannot be 0.'
93 70 raise InvalidPortNumber(message)
94 71 address = "tcp://%s:%i" % address
95 72 self.address = address
96 73 atexit.register(self._notice_exit)
97 74
98 75 self._running = False
99 76 self._pause = True
100 77 self.poller = zmq.Poller()
101 78
102 79 def _notice_exit(self):
103 80 self._exiting = True
104 81
105 82 def _create_socket(self):
106 83 if self.socket is not None:
107 84 # close previous socket, before opening a new one
108 85 self.poller.unregister(self.socket)
109 86 self.socket.close()
110 87 self.socket = self.context.socket(zmq.REQ)
111 88 self.socket.linger = 1000
112 89 self.socket.connect(self.address)
113 90
114 91 self.poller.register(self.socket, zmq.POLLIN)
115 92
116 93 def _poll(self, start_time):
117 94 """poll for heartbeat replies until we reach self.time_to_dead.
118 95
119 96 Ignores interrupts, and returns the result of poll(), which
120 97 will be an empty list if no messages arrived before the timeout,
121 98 or the event tuple if there is a message to receive.
122 99 """
123 100
124 101 until_dead = self.time_to_dead - (time.time() - start_time)
125 102 # ensure poll at least once
126 103 until_dead = max(until_dead, 1e-3)
127 104 events = []
128 105 while True:
129 106 try:
130 107 events = self.poller.poll(1000 * until_dead)
131 108 except ZMQError as e:
132 109 if e.errno == errno.EINTR:
133 110 # ignore interrupts during heartbeat
134 111 # this may never actually happen
135 112 until_dead = self.time_to_dead - (time.time() - start_time)
136 113 until_dead = max(until_dead, 1e-3)
137 114 pass
138 115 else:
139 116 raise
140 117 except Exception:
141 118 if self._exiting:
142 119 break
143 120 else:
144 121 raise
145 122 else:
146 123 break
147 124 return events
148 125
149 126 def run(self):
150 127 """The thread's main activity. Call start() instead."""
151 128 self._create_socket()
152 129 self._running = True
153 130 self._beating = True
154 131
155 132 while self._running:
156 133 if self._pause:
157 134 # just sleep, and skip the rest of the loop
158 135 time.sleep(self.time_to_dead)
159 136 continue
160 137
161 138 since_last_heartbeat = 0.0
162 139 # io.rprint('Ping from HB channel') # dbg
163 140 # no need to catch EFSM here, because the previous event was
164 141 # either a recv or connect, which cannot be followed by EFSM
165 142 self.socket.send(b'ping')
166 143 request_time = time.time()
167 144 ready = self._poll(request_time)
168 145 if ready:
169 146 self._beating = True
170 147 # the poll above guarantees we have something to recv
171 148 self.socket.recv()
172 149 # sleep the remainder of the cycle
173 150 remainder = self.time_to_dead - (time.time() - request_time)
174 151 if remainder > 0:
175 152 time.sleep(remainder)
176 153 continue
177 154 else:
178 155 # nothing was received within the time limit, signal heart failure
179 156 self._beating = False
180 157 since_last_heartbeat = time.time() - request_time
181 158 self.call_handlers(since_last_heartbeat)
182 159 # and close/reopen the socket, because the REQ/REP cycle has been broken
183 160 self._create_socket()
184 161 continue
185 162
186 163 def pause(self):
187 164 """Pause the heartbeat."""
188 165 self._pause = True
189 166
190 167 def unpause(self):
191 168 """Unpause the heartbeat."""
192 169 self._pause = False
193 170
194 171 def is_beating(self):
195 172 """Is the heartbeat running and responsive (and not paused)."""
196 173 if self.is_alive() and not self._pause and self._beating:
197 174 return True
198 175 else:
199 176 return False
200 177
201 178 def stop(self):
202 179 """Stop the channel's event loop and join its thread."""
203 180 self._running = False
204 181 self.join()
205 182 self.close()
206 183
207 184 def close(self):
208 185 if self.socket is not None:
209 186 try:
210 187 self.socket.close(linger=0)
211 188 except Exception:
212 189 pass
213 190 self.socket = None
214 191
215 192 def call_handlers(self, since_last_heartbeat):
216 193 """This method is called in the ioloop thread when a message arrives.
217 194
218 195 Subclasses should override this method to handle incoming messages.
219 196 It is important to remember that this method is called in the thread
220 197 so that some logic must be done to ensure that the application level
221 198 handlers are called in the application thread.
222 199 """
223 200 pass
224 201
225 202
226 203 HBChannelABC.register(HBChannel)
@@ -1,389 +1,386 b''
1 1 """Base class to manage the interaction with a running kernel"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 from __future__ import absolute_import
7 7 from IPython.kernel.channels import major_protocol_version
8 8 from IPython.utils.py3compat import string_types, iteritems
9 9
10 10 import zmq
11 11
12 12 from IPython.utils.traitlets import (
13 13 Any, Instance, Type,
14 14 )
15 15
16 16 from .channelsabc import (ChannelABC, HBChannelABC)
17 from .channels import (
18 make_shell_socket, make_stdin_socket, make_iopub_socket
19 )
20 17 from .clientabc import KernelClientABC
21 18 from .connect import ConnectionFileMixin
22 19
23 20
24 21 # some utilities to validate message structure, these might get moved elsewhere
25 22 # if they prove to have more generic utility
26 23
27 24 def validate_string_dict(dct):
28 25 """Validate that the input is a dict with string keys and values.
29 26
30 27 Raises ValueError if not."""
31 28 for k,v in iteritems(dct):
32 29 if not isinstance(k, string_types):
33 30 raise ValueError('key %r in dict must be a string' % k)
34 31 if not isinstance(v, string_types):
35 32 raise ValueError('value %r in dict must be a string' % v)
36 33
37 34
38 35 class KernelClient(ConnectionFileMixin):
39 36 """Communicates with a single kernel on any host via zmq channels.
40 37
41 38 There are four channels associated with each kernel:
42 39
43 40 * shell: for request/reply calls to the kernel.
44 41 * iopub: for the kernel to publish results to frontends.
45 42 * hb: for monitoring the kernel's heartbeat.
46 43 * stdin: for frontends to reply to raw_input calls in the kernel.
47 44
48 45 The methods of the channels are exposed as methods of the client itself
49 46 (KernelClient.execute, complete, history, etc.).
50 47 See the channels themselves for documentation of these methods.
51 48
52 49 """
53 50
54 51 # The PyZMQ Context to use for communication with the kernel.
55 52 context = Instance(zmq.Context)
56 53 def _context_default(self):
57 54 return zmq.Context.instance()
58 55
59 56 # The classes to use for the various channels
60 57 shell_channel_class = Type(ChannelABC)
61 58 iopub_channel_class = Type(ChannelABC)
62 59 stdin_channel_class = Type(ChannelABC)
63 60 hb_channel_class = Type(HBChannelABC)
64 61
65 62 # Protected traits
66 63 _shell_channel = Any
67 64 _iopub_channel = Any
68 65 _stdin_channel = Any
69 66 _hb_channel = Any
70 67
71 68 # flag for whether execute requests should be allowed to call raw_input:
72 69 allow_stdin = True
73 70
74 71 #--------------------------------------------------------------------------
75 72 # Channel proxy methods
76 73 #--------------------------------------------------------------------------
77 74
78 75 def _get_msg(channel, *args, **kwargs):
79 76 return channel.get_msg(*args, **kwargs)
80 77
81 78 def get_shell_msg(self, *args, **kwargs):
82 79 """Get a message from the shell channel"""
83 80 return self.shell_channel.get_msg(*args, **kwargs)
84 81
85 82 def get_iopub_msg(self, *args, **kwargs):
86 83 """Get a message from the iopub channel"""
87 84 return self.iopub_channel.get_msg(*args, **kwargs)
88 85
89 86 def get_stdin_msg(self, *args, **kwargs):
90 87 """Get a message from the stdin channel"""
91 88 return self.stdin_channel.get_msg(*args, **kwargs)
92 89
93 90 #--------------------------------------------------------------------------
94 91 # Channel management methods
95 92 #--------------------------------------------------------------------------
96 93
97 94 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
98 95 """Starts the channels for this kernel.
99 96
100 97 This will create the channels if they do not exist and then start
101 98 them (their activity runs in a thread). If port numbers of 0 are
102 99 being used (random ports) then you must first call
103 100 :meth:`start_kernel`. If the channels have been stopped and you
104 101 call this, :class:`RuntimeError` will be raised.
105 102 """
106 103 if shell:
107 104 self.shell_channel.start()
108 105 self.kernel_info()
109 106 if iopub:
110 107 self.iopub_channel.start()
111 108 if stdin:
112 109 self.stdin_channel.start()
113 110 self.allow_stdin = True
114 111 else:
115 112 self.allow_stdin = False
116 113 if hb:
117 114 self.hb_channel.start()
118 115
119 116 def stop_channels(self):
120 117 """Stops all the running channels for this kernel.
121 118
122 119 This stops their event loops and joins their threads.
123 120 """
124 121 if self.shell_channel.is_alive():
125 122 self.shell_channel.stop()
126 123 if self.iopub_channel.is_alive():
127 124 self.iopub_channel.stop()
128 125 if self.stdin_channel.is_alive():
129 126 self.stdin_channel.stop()
130 127 if self.hb_channel.is_alive():
131 128 self.hb_channel.stop()
132 129
133 130 @property
134 131 def channels_running(self):
135 132 """Are any of the channels created and running?"""
136 133 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
137 134 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
138 135
139 136 ioloop = None # Overridden in subclasses that use pyzmq event loop
140 137
141 138 @property
142 139 def shell_channel(self):
143 140 """Get the shell channel object for this kernel."""
144 141 if self._shell_channel is None:
145 142 url = self._make_url('shell')
146 143 self.log.debug("connecting shell channel to %s", url)
147 socket = make_shell_socket(self.context, self.session.bsession, url)
144 socket = self.connect_shell(identity=self.session.bsession)
148 145 self._shell_channel = self.shell_channel_class(
149 146 socket, self.session, self.ioloop
150 147 )
151 148 return self._shell_channel
152 149
153 150 @property
154 151 def iopub_channel(self):
155 152 """Get the iopub channel object for this kernel."""
156 153 if self._iopub_channel is None:
157 154 url = self._make_url('iopub')
158 155 self.log.debug("connecting iopub channel to %s", url)
159 socket = make_iopub_socket(self.context, self.session.bsession, url)
156 socket = self.connect_iopub()
160 157 self._iopub_channel = self.iopub_channel_class(
161 158 socket, self.session, self.ioloop
162 159 )
163 160 return self._iopub_channel
164 161
165 162 @property
166 163 def stdin_channel(self):
167 164 """Get the stdin channel object for this kernel."""
168 165 if self._stdin_channel is None:
169 166 url = self._make_url('stdin')
170 167 self.log.debug("connecting stdin channel to %s", url)
171 socket = make_stdin_socket(self.context, self.session.bsession, url)
168 socket = self.connect_stdin(identity=self.session.bsession)
172 169 self._stdin_channel = self.stdin_channel_class(
173 170 socket, self.session, self.ioloop
174 171 )
175 172 return self._stdin_channel
176 173
177 174 @property
178 175 def hb_channel(self):
179 176 """Get the hb channel object for this kernel."""
180 177 if self._hb_channel is None:
181 178 url = self._make_url('hb')
182 179 self.log.debug("connecting heartbeat channel to %s", url)
183 180 self._hb_channel = self.hb_channel_class(
184 181 self.context, self.session, url
185 182 )
186 183 return self._hb_channel
187 184
188 185 def is_alive(self):
189 186 """Is the kernel process still running?"""
190 187 if self._hb_channel is not None:
191 188 # We didn't start the kernel with this KernelManager so we
192 189 # use the heartbeat.
193 190 return self._hb_channel.is_beating()
194 191 else:
195 192 # no heartbeat and not local, we can't tell if it's running,
196 193 # so naively return True
197 194 return True
198 195
199 196
200 197 # Methods to send specific messages on channels
201 198 def execute(self, code, silent=False, store_history=True,
202 199 user_expressions=None, allow_stdin=None):
203 200 """Execute code in the kernel.
204 201
205 202 Parameters
206 203 ----------
207 204 code : str
208 205 A string of Python code.
209 206
210 207 silent : bool, optional (default False)
211 208 If set, the kernel will execute the code as quietly possible, and
212 209 will force store_history to be False.
213 210
214 211 store_history : bool, optional (default True)
215 212 If set, the kernel will store command history. This is forced
216 213 to be False if silent is True.
217 214
218 215 user_expressions : dict, optional
219 216 A dict mapping names to expressions to be evaluated in the user's
220 217 dict. The expression values are returned as strings formatted using
221 218 :func:`repr`.
222 219
223 220 allow_stdin : bool, optional (default self.allow_stdin)
224 221 Flag for whether the kernel can send stdin requests to frontends.
225 222
226 223 Some frontends (e.g. the Notebook) do not support stdin requests.
227 224 If raw_input is called from code executed from such a frontend, a
228 225 StdinNotImplementedError will be raised.
229 226
230 227 Returns
231 228 -------
232 229 The msg_id of the message sent.
233 230 """
234 231 if user_expressions is None:
235 232 user_expressions = {}
236 233 if allow_stdin is None:
237 234 allow_stdin = self.allow_stdin
238 235
239 236
240 237 # Don't waste network traffic if inputs are invalid
241 238 if not isinstance(code, string_types):
242 239 raise ValueError('code %r must be a string' % code)
243 240 validate_string_dict(user_expressions)
244 241
245 242 # Create class for content/msg creation. Related to, but possibly
246 243 # not in Session.
247 244 content = dict(code=code, silent=silent, store_history=store_history,
248 245 user_expressions=user_expressions,
249 246 allow_stdin=allow_stdin,
250 247 )
251 248 msg = self.session.msg('execute_request', content)
252 249 self.shell_channel.send(msg)
253 250 return msg['header']['msg_id']
254 251
255 252 def complete(self, code, cursor_pos=None):
256 253 """Tab complete text in the kernel's namespace.
257 254
258 255 Parameters
259 256 ----------
260 257 code : str
261 258 The context in which completion is requested.
262 259 Can be anything between a variable name and an entire cell.
263 260 cursor_pos : int, optional
264 261 The position of the cursor in the block of code where the completion was requested.
265 262 Default: ``len(code)``
266 263
267 264 Returns
268 265 -------
269 266 The msg_id of the message sent.
270 267 """
271 268 if cursor_pos is None:
272 269 cursor_pos = len(code)
273 270 content = dict(code=code, cursor_pos=cursor_pos)
274 271 msg = self.session.msg('complete_request', content)
275 272 self.shell_channel.send(msg)
276 273 return msg['header']['msg_id']
277 274
278 275 def inspect(self, code, cursor_pos=None, detail_level=0):
279 276 """Get metadata information about an object in the kernel's namespace.
280 277
281 278 It is up to the kernel to determine the appropriate object to inspect.
282 279
283 280 Parameters
284 281 ----------
285 282 code : str
286 283 The context in which info is requested.
287 284 Can be anything between a variable name and an entire cell.
288 285 cursor_pos : int, optional
289 286 The position of the cursor in the block of code where the info was requested.
290 287 Default: ``len(code)``
291 288 detail_level : int, optional
292 289 The level of detail for the introspection (0-2)
293 290
294 291 Returns
295 292 -------
296 293 The msg_id of the message sent.
297 294 """
298 295 if cursor_pos is None:
299 296 cursor_pos = len(code)
300 297 content = dict(code=code, cursor_pos=cursor_pos,
301 298 detail_level=detail_level,
302 299 )
303 300 msg = self.session.msg('inspect_request', content)
304 301 self.shell_channel.send(msg)
305 302 return msg['header']['msg_id']
306 303
307 304 def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
308 305 """Get entries from the kernel's history list.
309 306
310 307 Parameters
311 308 ----------
312 309 raw : bool
313 310 If True, return the raw input.
314 311 output : bool
315 312 If True, then return the output as well.
316 313 hist_access_type : str
317 314 'range' (fill in session, start and stop params), 'tail' (fill in n)
318 315 or 'search' (fill in pattern param).
319 316
320 317 session : int
321 318 For a range request, the session from which to get lines. Session
322 319 numbers are positive integers; negative ones count back from the
323 320 current session.
324 321 start : int
325 322 The first line number of a history range.
326 323 stop : int
327 324 The final (excluded) line number of a history range.
328 325
329 326 n : int
330 327 The number of lines of history to get for a tail request.
331 328
332 329 pattern : str
333 330 The glob-syntax pattern for a search request.
334 331
335 332 Returns
336 333 -------
337 334 The msg_id of the message sent.
338 335 """
339 336 content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
340 337 **kwargs)
341 338 msg = self.session.msg('history_request', content)
342 339 self.shell_channel.send(msg)
343 340 return msg['header']['msg_id']
344 341
345 342 def kernel_info(self):
346 343 """Request kernel info."""
347 344 msg = self.session.msg('kernel_info_request')
348 345 self.shell_channel.send(msg)
349 346 return msg['header']['msg_id']
350 347
351 348 def _handle_kernel_info_reply(self, msg):
352 349 """handle kernel info reply
353 350
354 351 sets protocol adaptation version
355 352 """
356 353 adapt_version = int(msg['content']['protocol_version'].split('.')[0])
357 354 if adapt_version != major_protocol_version:
358 355 self.session.adapt_version = adapt_version
359 356
360 357 def shutdown(self, restart=False):
361 358 """Request an immediate kernel shutdown.
362 359
363 360 Upon receipt of the (empty) reply, client code can safely assume that
364 361 the kernel has shut down and it's safe to forcefully terminate it if
365 362 it's still alive.
366 363
367 364 The kernel will send the reply via a function registered with Python's
368 365 atexit module, ensuring it's truly done as the kernel is done with all
369 366 normal operation.
370 367 """
371 368 # Send quit message to kernel. Once we implement kernel-side setattr,
372 369 # this should probably be done that way, but for now this will do.
373 370 msg = self.session.msg('shutdown_request', {'restart':restart})
374 371 self.shell_channel.send(msg)
375 372 return msg['header']['msg_id']
376 373
377 374 def is_complete(self, code):
378 375 msg = self.session.msg('is_complete_request', {'code': code})
379 376 self.shell_channel.send(msg)
380 377 return msg['header']['msg_id']
381 378
382 379 def input(self, string):
383 380 """Send a string of raw input to the kernel."""
384 381 content = dict(value=string)
385 382 msg = self.session.msg('input_reply', content)
386 383 self.stdin_channel.send(msg)
387 384
388 385
389 386 KernelClientABC.register(KernelClient)
@@ -1,250 +1,249 b''
1 1 """ Defines a KernelClient that provides signals and slots.
2 2 """
3 3 import atexit
4 4 import errno
5 5 from threading import Thread
6 6 import time
7 7
8 8 import zmq
9 9 # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
10 10 # during garbage collection of threads at exit:
11 11 from zmq import ZMQError
12 12 from zmq.eventloop import ioloop, zmqstream
13 13
14 14 from IPython.external.qt import QtCore
15 15
16 16 # Local imports
17 17 from IPython.utils.traitlets import Type, Instance
18 from IPython.kernel.channels import HBChannel,\
19 make_shell_socket, make_iopub_socket, make_stdin_socket
18 from IPython.kernel.channels import HBChannel
20 19 from IPython.kernel import KernelClient
21 20
22 21 from .kernel_mixins import QtKernelClientMixin
23 22 from .util import SuperQObject
24 23
25 24 class QtHBChannel(SuperQObject, HBChannel):
26 25 # A longer timeout than the base class
27 26 time_to_dead = 3.0
28 27
29 28 # Emitted when the kernel has died.
30 29 kernel_died = QtCore.Signal(object)
31 30
32 31 def call_handlers(self, since_last_heartbeat):
33 32 """ Reimplemented to emit signals instead of making callbacks.
34 33 """
35 34 # Emit the generic signal.
36 35 self.kernel_died.emit(since_last_heartbeat)
37 36
38 37 from IPython.core.release import kernel_protocol_version_info
39 38
40 39 major_protocol_version = kernel_protocol_version_info[0]
41 40
42 41 class InvalidPortNumber(Exception):
43 42 pass
44 43
45 44
46 45 class QtZMQSocketChannel(SuperQObject):
47 46 """A ZMQ socket emitting a Qt signal when a message is received."""
48 47 session = None
49 48 socket = None
50 49 ioloop = None
51 50 stream = None
52 51
53 52 message_received = QtCore.Signal(object)
54 53
55 54 def process_events(self):
56 55 """ Process any pending GUI events.
57 56 """
58 57 QtCore.QCoreApplication.instance().processEvents()
59 58
60 59 def __init__(self, socket, session, loop):
61 60 """Create a channel.
62 61
63 62 Parameters
64 63 ----------
65 64 socket : :class:`zmq.Socket`
66 65 The ZMQ socket to use.
67 66 session : :class:`session.Session`
68 67 The session to use.
69 68 loop
70 69 A pyzmq ioloop to connect the socket to using a ZMQStream
71 70 """
72 71 super(QtZMQSocketChannel, self).__init__()
73 72
74 73 self.socket = socket
75 74 self.session = session
76 75 self.ioloop = loop
77 76
78 77 self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
79 78 self.stream.on_recv(self._handle_recv)
80 79
81 80 _is_alive = False
82 81 def is_alive(self):
83 82 return self._is_alive
84 83
85 84 def start(self):
86 85 self._is_alive = True
87 86
88 87 def stop(self):
89 88 self._is_alive = False
90 89
91 90 def close(self):
92 91 if self.socket is not None:
93 92 try:
94 93 self.socket.close(linger=0)
95 94 except Exception:
96 95 pass
97 96 self.socket = None
98 97
99 98 def send(self, msg):
100 99 """Queue a message to be sent from the IOLoop's thread.
101 100
102 101 Parameters
103 102 ----------
104 103 msg : message to send
105 104
106 105 This is threadsafe, as it uses IOLoop.add_callback to give the loop's
107 106 thread control of the action.
108 107 """
109 108 def thread_send():
110 109 self.session.send(self.stream, msg)
111 110 self.ioloop.add_callback(thread_send)
112 111
113 112 def _handle_recv(self, msg):
114 113 """Callback for stream.on_recv.
115 114
116 115 Unpacks message, and calls handlers with it.
117 116 """
118 117 ident,smsg = self.session.feed_identities(msg)
119 118 msg = self.session.deserialize(smsg)
120 119 self.call_handlers(msg)
121 120
122 121 def call_handlers(self, msg):
123 122 """This method is called in the ioloop thread when a message arrives.
124 123
125 124 Subclasses should override this method to handle incoming messages.
126 125 It is important to remember that this method is called in the thread
127 126 so that some logic must be done to ensure that the application level
128 127 handlers are called in the application thread.
129 128 """
130 129 # Emit the generic signal.
131 130 self.message_received.emit(msg)
132 131
133 132 def flush(self, timeout=1.0):
134 133 """Immediately processes all pending messages on this channel.
135 134
136 135 This is only used for the IOPub channel.
137 136
138 137 Callers should use this method to ensure that :meth:`call_handlers`
139 138 has been called for all messages that have been received on the
140 139 0MQ SUB socket of this channel.
141 140
142 141 This method is thread safe.
143 142
144 143 Parameters
145 144 ----------
146 145 timeout : float, optional
147 146 The maximum amount of time to spend flushing, in seconds. The
148 147 default is one second.
149 148 """
150 149 # We do the IOLoop callback process twice to ensure that the IOLoop
151 150 # gets to perform at least one full poll.
152 151 stop_time = time.time() + timeout
153 152 for i in range(2):
154 153 self._flushed = False
155 154 self.ioloop.add_callback(self._flush)
156 155 while not self._flushed and time.time() < stop_time:
157 156 time.sleep(0.01)
158 157
159 158 def _flush(self):
160 159 """Callback for :method:`self.flush`."""
161 160 self.stream.flush()
162 161 self._flushed = True
163 162
164 163
165 164 class IOLoopThread(Thread):
166 165 """Run a pyzmq ioloop in a thread to send and receive messages
167 166 """
168 167 def __init__(self, loop):
169 168 super(IOLoopThread, self).__init__()
170 169 self.daemon = True
171 170 atexit.register(self._notice_exit)
172 171 self.ioloop = loop or ioloop.IOLoop()
173 172
174 173 def _notice_exit(self):
175 174 self._exiting = True
176 175
177 176 def run(self):
178 177 """Run my loop, ignoring EINTR events in the poller"""
179 178 while True:
180 179 try:
181 180 self.ioloop.start()
182 181 except ZMQError as e:
183 182 if e.errno == errno.EINTR:
184 183 continue
185 184 else:
186 185 raise
187 186 except Exception:
188 187 if self._exiting:
189 188 break
190 189 else:
191 190 raise
192 191 else:
193 192 break
194 193
195 194 def stop(self):
196 195 """Stop the channel's event loop and join its thread.
197 196
198 197 This calls :meth:`~threading.Thread.join` and returns when the thread
199 198 terminates. :class:`RuntimeError` will be raised if
200 199 :meth:`~threading.Thread.start` is called again.
201 200 """
202 201 if self.ioloop is not None:
203 202 self.ioloop.stop()
204 203 self.join()
205 204 self.close()
206 205
207 206 def close(self):
208 207 if self.ioloop is not None:
209 208 try:
210 209 self.ioloop.close(all_fds=True)
211 210 except Exception:
212 211 pass
213 212
214 213
215 214 class QtKernelClient(QtKernelClientMixin, KernelClient):
216 215 """ A KernelClient that provides signals and slots.
217 216 """
218 217
219 218 _ioloop = None
220 219 @property
221 220 def ioloop(self):
222 221 if self._ioloop is None:
223 222 self._ioloop = ioloop.IOLoop()
224 223 return self._ioloop
225 224
226 225 ioloop_thread = Instance(IOLoopThread)
227 226
228 227 def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
229 228 if shell:
230 229 self.shell_channel.message_received.connect(self._check_kernel_info_reply)
231 230
232 231 self.ioloop_thread = IOLoopThread(self.ioloop)
233 232 self.ioloop_thread.start()
234 233
235 234 super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
236 235
237 236 def _check_kernel_info_reply(self, msg):
238 237 if msg['msg_type'] == 'kernel_info_reply':
239 238 self._handle_kernel_info_reply(msg)
240 239 self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
241 240
242 241 def stop_channels(self):
243 242 super(QtKernelClient, self).stop_channels()
244 243 if self.ioloop_thread.is_alive():
245 244 self.ioloop_thread.stop()
246 245
247 246 iopub_channel_class = Type(QtZMQSocketChannel)
248 247 shell_channel_class = Type(QtZMQSocketChannel)
249 248 stdin_channel_class = Type(QtZMQSocketChannel)
250 249 hb_channel_class = Type(QtHBChannel)
General Comments 0
You need to be logged in to leave comments. Login now