##// END OF EJS Templates
Cleanup naming and organization of channels....
Brian Granger -
Show More
@@ -30,7 +30,7 b' class BaseFrontendMixin(object):'
30 30 old_manager.stopped_channels.disconnect(self._stopped_channels)
31 31
32 32 # Disconnect the old kernel manager's channels.
33 old_manager.sub_channel.message_received.disconnect(self._dispatch)
33 old_manager.iopub_channel.message_received.disconnect(self._dispatch)
34 34 old_manager.shell_channel.message_received.disconnect(self._dispatch)
35 35 old_manager.stdin_channel.message_received.disconnect(self._dispatch)
36 36 old_manager.hb_channel.kernel_died.disconnect(
@@ -51,7 +51,7 b' class BaseFrontendMixin(object):'
51 51 kernel_manager.stopped_channels.connect(self._stopped_channels)
52 52
53 53 # Connect the new kernel manager's channels.
54 kernel_manager.sub_channel.message_received.connect(self._dispatch)
54 kernel_manager.iopub_channel.message_received.connect(self._dispatch)
55 55 kernel_manager.shell_channel.message_received.connect(self._dispatch)
56 56 kernel_manager.stdin_channel.message_received.connect(self._dispatch)
57 57 kernel_manager.hb_channel.kernel_died.connect(self._handle_kernel_died)
@@ -69,7 +69,7 b' class QtShellChannelMixin(ChannelQObject):'
69 69 _handlers_called = False
70 70
71 71 #---------------------------------------------------------------------------
72 # 'ShellSocketChannel' interface
72 # 'ShellChannel' interface
73 73 #---------------------------------------------------------------------------
74 74
75 75 def call_handlers(self, msg):
@@ -98,7 +98,7 b' class QtShellChannelMixin(ChannelQObject):'
98 98 self._handlers_called = False
99 99
100 100
101 class QtSubChannelMixin(ChannelQObject):
101 class QtIOPubChannelMixin(ChannelQObject):
102 102
103 103 # Emitted when any message is received.
104 104 message_received = QtCore.Signal(object)
@@ -126,7 +126,7 b' class QtSubChannelMixin(ChannelQObject):'
126 126 shutdown_reply_received = QtCore.Signal(object)
127 127
128 128 #---------------------------------------------------------------------------
129 # 'SubSocketChannel' interface
129 # 'IOPubChannel' interface
130 130 #---------------------------------------------------------------------------
131 131
132 132 def call_handlers(self, msg):
@@ -145,7 +145,7 b' class QtSubChannelMixin(ChannelQObject):'
145 145 def flush(self):
146 146 """ Reimplemented to ensure that signals are dispatched immediately.
147 147 """
148 super(QtSubChannelMixin, self).flush()
148 super(QtIOPubChannelMixin, self).flush()
149 149 QtCore.QCoreApplication.instance().processEvents()
150 150
151 151
@@ -158,7 +158,7 b' class QtStdInChannelMixin(ChannelQObject):'
158 158 input_requested = QtCore.Signal(object)
159 159
160 160 #---------------------------------------------------------------------------
161 # 'StdInSocketChannel' interface
161 # 'StdInChannel' interface
162 162 #---------------------------------------------------------------------------
163 163
164 164 def call_handlers(self, msg):
@@ -179,7 +179,7 b' class QtHBChannelMixin(ChannelQObject):'
179 179 kernel_died = QtCore.Signal(object)
180 180
181 181 #---------------------------------------------------------------------------
182 # 'HBSocketChannel' interface
182 # 'HBChannel' interface
183 183 #---------------------------------------------------------------------------
184 184
185 185 def call_handlers(self, since_last_heartbeat):
@@ -205,7 +205,7 b' class QtKernelManagerMixin(HasTraits, SuperQObject):'
205 205 stopped_channels = QtCore.Signal()
206 206
207 207 # Use Qt-specific channel classes that emit signals.
208 sub_channel_class = Type(QtSubChannelMixin)
208 iopub_channel_class = Type(QtIOPubChannelMixin)
209 209 shell_channel_class = Type(QtShellChannelMixin)
210 210 stdin_channel_class = Type(QtStdInChannelMixin)
211 211 hb_channel_class = Type(QtHBChannelMixin)
@@ -396,7 +396,7 b' class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin):'
396 396 if info and info.kind == 'user' and not self._hidden:
397 397 # Make sure that all output from the SUB channel has been processed
398 398 # before writing a new prompt.
399 self.kernel_manager.sub_channel.flush()
399 self.kernel_manager.iopub_channel.flush()
400 400
401 401 # Reset the ANSI style information to prevent bad text in stdout
402 402 # from messing up our colors. We're not a true terminal so we're
@@ -431,7 +431,7 b' class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin):'
431 431
432 432 # Make sure that all output from the SUB channel has been processed
433 433 # before entering readline mode.
434 self.kernel_manager.sub_channel.flush()
434 self.kernel_manager.iopub_channel.flush()
435 435
436 436 def callback(line):
437 437 self.kernel_manager.stdin_channel.input(line)
@@ -3,23 +3,23 b''
3 3
4 4 # Local imports.
5 5 from IPython.inprocess.kernelmanager import \
6 ShellInProcessChannel, SubInProcessChannel, StdInInProcessChannel, \
7 HBInProcessChannel, InProcessKernelManager
6 InProcessShellChannel, InProcessIOPubChannel, InProcessStdInChannel, \
7 InProcessHBChannel, InProcessKernelManager
8 8 from IPython.utils.traitlets import Type
9 from base_kernelmanager import QtShellChannelMixin, QtSubChannelMixin, \
9 from base_kernelmanager import QtShellChannelMixin, QtIOPubChannelMixin, \
10 10 QtStdInChannelMixin, QtHBChannelMixin, QtKernelManagerMixin
11 11
12 12
13 class QtShellInProcessChannel(QtShellChannelMixin, ShellInProcessChannel):
13 class QtInProcessShellChannel(QtShellChannelMixin, InProcessShellChannel):
14 14 pass
15 15
16 class QtSubInProcessChannel(QtSubChannelMixin, SubInProcessChannel):
16 class QtInProcessIOPubChannel(QtIOPubChannelMixin, InProcessIOPubChannel):
17 17 pass
18 18
19 class QtStdInInProcessChannel(QtStdInChannelMixin, StdInInProcessChannel):
19 class QtInProcessStdInChannel(QtStdInChannelMixin, InProcessStdInChannel):
20 20 pass
21 21
22 class QtHBInProcessChannel(QtHBChannelMixin, HBInProcessChannel):
22 class QtInProcessHBChannel(QtHBChannelMixin, InProcessHBChannel):
23 23 pass
24 24
25 25
@@ -27,7 +27,7 b' class QtInProcessKernelManager(QtKernelManagerMixin, InProcessKernelManager):'
27 27 """ An in-process KernelManager with signals and slots.
28 28 """
29 29
30 sub_channel_class = Type(QtSubInProcessChannel)
31 shell_channel_class = Type(QtShellInProcessChannel)
32 stdin_channel_class = Type(QtStdInInProcessChannel)
33 hb_channel_class = Type(QtHBInProcessChannel)
30 iopub_channel_class = Type(QtInProcessIOPubChannel)
31 shell_channel_class = Type(QtInProcessShellChannel)
32 stdin_channel_class = Type(QtInProcessStdInChannel)
33 hb_channel_class = Type(QtInProcessHBChannel)
@@ -3,22 +3,22 b''
3 3
4 4 # Local imports.
5 5 from IPython.utils.traitlets import Type
6 from IPython.zmq.kernelmanager import ShellSocketChannel, SubSocketChannel, \
7 StdInSocketChannel, HBSocketChannel, KernelManager
8 from base_kernelmanager import QtShellChannelMixin, QtSubChannelMixin, \
6 from IPython.zmq.kernelmanager import ShellChannel, IOPubChannel, \
7 StdInChannel, HBChannel, KernelManager
8 from base_kernelmanager import QtShellChannelMixin, QtIOPubChannelMixin, \
9 9 QtStdInChannelMixin, QtHBChannelMixin, QtKernelManagerMixin
10 10
11 11
12 class QtShellSocketChannel(QtShellChannelMixin, ShellSocketChannel):
12 class QtShellChannel(QtShellChannelMixin, ShellChannel):
13 13 pass
14 14
15 class QtSubSocketChannel(QtSubChannelMixin, SubSocketChannel):
15 class QtIOPubChannel(QtIOPubChannelMixin, IOPubChannel):
16 16 pass
17 17
18 class QtStdInSocketChannel(QtStdInChannelMixin, StdInSocketChannel):
18 class QtStdInChannel(QtStdInChannelMixin, StdInChannel):
19 19 pass
20 20
21 class QtHBSocketChannel(QtHBChannelMixin, HBSocketChannel):
21 class QtHBChannel(QtHBChannelMixin, HBChannel):
22 22 pass
23 23
24 24
@@ -26,7 +26,7 b' class QtKernelManager(QtKernelManagerMixin, KernelManager):'
26 26 """ A KernelManager that provides signals and slots.
27 27 """
28 28
29 sub_channel_class = Type(QtSubSocketChannel)
30 shell_channel_class = Type(QtShellSocketChannel)
31 stdin_channel_class = Type(QtStdInSocketChannel)
32 hb_channel_class = Type(QtHBSocketChannel)
29 iopub_channel_class = Type(QtIOPubChannel)
30 shell_channel_class = Type(QtShellChannel)
31 stdin_channel_class = Type(QtStdInChannel)
32 hb_channel_class = Type(QtHBChannel)
@@ -211,8 +211,8 b' class ZMQTerminalInteractiveShell(TerminalInteractiveShell):'
211 211 sub_msg: message receive from kernel in the sub socket channel
212 212 capture by kernel manager.
213 213 """
214 while self.km.sub_channel.msg_ready():
215 sub_msg = self.km.sub_channel.get_msg()
214 while self.km.iopub_channel.msg_ready():
215 sub_msg = self.km.iopub_channel.get_msg()
216 216 msg_type = sub_msg['header']['msg_type']
217 217 parent = sub_msg["parent_header"]
218 218 if (not parent) or self.session_id == parent['session']:
@@ -21,51 +21,22 b' from threading import Event'
21 21 # Local imports.
22 22 from IPython.utils.io import raw_print
23 23 from IPython.utils.traitlets import Type
24 from kernelmanager import InProcessKernelManager, ShellInProcessChannel, \
25 SubInProcessChannel, StdInInProcessChannel
24 from kernelmanager import InProcessKernelManager, InProcessShellChannel, \
25 InProcessIOPubChannel, InProcessStdInChannel
26 from IPython.zmq.blockingkernelmanager import BlockingChannelMixin
26 27
27 #-----------------------------------------------------------------------------
28 # Utility classes
29 #-----------------------------------------------------------------------------
30
31 class BlockingChannelMixin(object):
32
33 def __init__(self, *args, **kwds):
34 super(BlockingChannelMixin, self).__init__(*args, **kwds)
35 self._in_queue = Queue.Queue()
36
37 def call_handlers(self, msg):
38 self._in_queue.put(msg)
39
40 def get_msg(self, block=True, timeout=None):
41 """ Gets a message if there is one that is ready. """
42 return self._in_queue.get(block, timeout)
43
44 def get_msgs(self):
45 """ Get all messages that are currently ready. """
46 msgs = []
47 while True:
48 try:
49 msgs.append(self.get_msg(block=False))
50 except Queue.Empty:
51 break
52 return msgs
53
54 def msg_ready(self):
55 """ Is there a message that has been received? """
56 return not self._in_queue.empty()
57 28
58 29 #-----------------------------------------------------------------------------
59 30 # Blocking kernel manager
60 31 #-----------------------------------------------------------------------------
61 32
62 class BlockingShellInProcessChannel(BlockingChannelMixin, ShellInProcessChannel):
33 class BlockingInProcessShellChannel(BlockingChannelMixin, InProcessShellChannel):
63 34 pass
64 35
65 class BlockingSubInProcessChannel(BlockingChannelMixin, SubInProcessChannel):
36 class BlockingInProcessIOPubChannel(BlockingChannelMixin, InProcessIOPubChannel):
66 37 pass
67 38
68 class BlockingStdInInProcessChannel(BlockingChannelMixin, StdInInProcessChannel):
39 class BlockingInProcessStdInChannel(BlockingChannelMixin, InProcessStdInChannel):
69 40
70 41 def call_handlers(self, msg):
71 42 """ Overridden for the in-process channel.
@@ -82,6 +53,6 b' class BlockingStdInInProcessChannel(BlockingChannelMixin, StdInInProcessChannel)'
82 53 class BlockingInProcessKernelManager(InProcessKernelManager):
83 54
84 55 # The classes to use for the various channels.
85 shell_channel_class = Type(BlockingShellInProcessChannel)
86 sub_channel_class = Type(BlockingSubInProcessChannel)
87 stdin_channel_class = Type(BlockingStdInInProcessChannel)
56 shell_channel_class = Type(BlockingInProcessShellChannel)
57 iopub_channel_class = Type(BlockingInProcessIOPubChannel)
58 stdin_channel_class = Type(BlockingInProcessStdInChannel)
@@ -123,7 +123,7 b' class InProcessKernel(Kernel):'
123 123 """
124 124 ident, msg = self.session.recv(self.iopub_socket, copy=False)
125 125 for frontend in self.frontends:
126 frontend.sub_channel.call_handlers(msg)
126 frontend.iopub_channel.call_handlers(msg)
127 127
128 128 #------ Trait initializers -----------------------------------------------
129 129
@@ -71,7 +71,7 b' class InProcessChannel(object):'
71 71 raise NotImplementedError
72 72
73 73
74 class ShellInProcessChannel(InProcessChannel):
74 class InProcessShellChannel(InProcessChannel):
75 75 """The DEALER channel for issues request/replies to the kernel.
76 76 """
77 77
@@ -240,7 +240,7 b' class ShellInProcessChannel(InProcessChannel):'
240 240 self.call_handlers_later(reply_msg)
241 241
242 242
243 class SubInProcessChannel(InProcessChannel):
243 class InProcessIOPubChannel(InProcessChannel):
244 244 """The SUB channel which listens for messages that the kernel publishes.
245 245 """
246 246
@@ -252,7 +252,7 b' class SubInProcessChannel(InProcessChannel):'
252 252 pass
253 253
254 254
255 class StdInInProcessChannel(InProcessChannel):
255 class InProcessStdInChannel(InProcessChannel):
256 256 """ A reply channel to handle raw_input requests that the kernel makes. """
257 257
258 258 def input(self, string):
@@ -264,13 +264,13 b' class StdInInProcessChannel(InProcessChannel):'
264 264 kernel.raw_input_str = string
265 265
266 266
267 class HBInProcessChannel(InProcessChannel):
267 class InProcessHBChannel(InProcessChannel):
268 268 """ A dummy heartbeat channel. """
269 269
270 270 time_to_dead = 3.0
271 271
272 272 def __init__(self, *args, **kwds):
273 super(HBInProcessChannel, self).__init__(*args, **kwds)
273 super(InProcessHBChannel, self).__init__(*args, **kwds)
274 274 self._pause = True
275 275
276 276 def pause(self):
@@ -310,14 +310,14 b' class InProcessKernelManager(HasTraits):'
310 310 kernel = Instance('IPython.inprocess.ipkernel.InProcessKernel')
311 311
312 312 # The classes to use for the various channels.
313 shell_channel_class = Type(ShellInProcessChannel)
314 sub_channel_class = Type(SubInProcessChannel)
315 stdin_channel_class = Type(StdInInProcessChannel)
316 hb_channel_class = Type(HBInProcessChannel)
313 shell_channel_class = Type(InProcessShellChannel)
314 iopub_channel_class = Type(InProcessIOPubChannel)
315 stdin_channel_class = Type(InProcessStdInChannel)
316 hb_channel_class = Type(InProcessHBChannel)
317 317
318 318 # Protected traits.
319 319 _shell_channel = Any
320 _sub_channel = Any
320 _iopub_channel = Any
321 321 _stdin_channel = Any
322 322 _hb_channel = Any
323 323
@@ -331,7 +331,7 b' class InProcessKernelManager(HasTraits):'
331 331 if shell:
332 332 self.shell_channel.start()
333 333 if sub:
334 self.sub_channel.start()
334 self.iopub_channel.start()
335 335 if stdin:
336 336 self.stdin_channel.start()
337 337 self.shell_channel.allow_stdin = True
@@ -345,8 +345,8 b' class InProcessKernelManager(HasTraits):'
345 345 """
346 346 if self.shell_channel.is_alive():
347 347 self.shell_channel.stop()
348 if self.sub_channel.is_alive():
349 self.sub_channel.stop()
348 if self.iopub_channel.is_alive():
349 self.iopub_channel.stop()
350 350 if self.stdin_channel.is_alive():
351 351 self.stdin_channel.stop()
352 352 if self.hb_channel.is_alive():
@@ -355,7 +355,7 b' class InProcessKernelManager(HasTraits):'
355 355 @property
356 356 def channels_running(self):
357 357 """ Are any of the channels created and running? """
358 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
358 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
359 359 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
360 360
361 361 #--------------------------------------------------------------------------
@@ -421,11 +421,11 b' class InProcessKernelManager(HasTraits):'
421 421 return self._shell_channel
422 422
423 423 @property
424 def sub_channel(self):
424 def iopub_channel(self):
425 425 """Get the SUB socket channel object."""
426 if self._sub_channel is None:
427 self._sub_channel = self.sub_channel_class(self)
428 return self._sub_channel
426 if self._iopub_channel is None:
427 self._iopub_channel = self.iopub_channel_class(self)
428 return self._iopub_channel
429 429
430 430 @property
431 431 def stdin_channel(self):
@@ -80,7 +80,7 b' def get_stream_message(kernel_manager, timeout=5):'
80 80 """ Gets a single stream message synchronously from the sub channel.
81 81 """
82 82 while True:
83 msg = kernel_manager.sub_channel.get_msg(timeout=timeout)
83 msg = kernel_manager.iopub_channel.get_msg(timeout=timeout)
84 84 if msg['header']['msg_type'] == 'stream':
85 85 return msg
86 86
@@ -13,26 +13,56 b' Useful for test suites and blocking terminal interfaces.'
13 13 # Imports
14 14 #-----------------------------------------------------------------------------
15 15
16 # Local imports.
17 from IPython.inprocess.blockingkernelmanager import BlockingChannelMixin
18 16 from IPython.utils.traitlets import Type
19 from kernelmanager import KernelManager, SubSocketChannel, HBSocketChannel, \
20 ShellSocketChannel, StdInSocketChannel
17 from kernelmanager import KernelManager, IOPubChannel, HBChannel, \
18 ShellChannel, StdInChannel
21 19
22 20 #-----------------------------------------------------------------------------
23 21 # Blocking kernel manager
24 22 #-----------------------------------------------------------------------------
25 23
26 class BlockingSubSocketChannel(BlockingChannelMixin, SubSocketChannel):
24
25 class BlockingChannelMixin(object):
26
27 def __init__(self, *args, **kwds):
28 super(BlockingChannelMixin, self).__init__(*args, **kwds)
29 self._in_queue = Queue.Queue()
30
31 def call_handlers(self, msg):
32 self._in_queue.put(msg)
33
34 def get_msg(self, block=True, timeout=None):
35 """ Gets a message if there is one that is ready. """
36 return self._in_queue.get(block, timeout)
37
38 def get_msgs(self):
39 """ Get all messages that are currently ready. """
40 msgs = []
41 while True:
42 try:
43 msgs.append(self.get_msg(block=False))
44 except Queue.Empty:
45 break
46 return msgs
47
48 def msg_ready(self):
49 """ Is there a message that has been received? """
50 return not self._in_queue.empty()
51
52
53 class BlockingIOPubChannel(BlockingChannelMixin, IOPubChannel):
27 54 pass
28 55
29 class BlockingShellSocketChannel(BlockingChannelMixin, ShellSocketChannel):
56
57 class BlockingShellChannel(BlockingChannelMixin, ShellChannel):
30 58 pass
31 59
32 class BlockingStdInSocketChannel(BlockingChannelMixin, StdInSocketChannel):
60
61 class BlockingStdInChannel(BlockingChannelMixin, StdInChannel):
33 62 pass
34 63
35 class BlockingHBSocketChannel(HBSocketChannel):
64
65 class BlockingHBChannel(HBChannel):
36 66
37 67 # This kernel needs quicker monitoring, shorten to 1 sec.
38 68 # less than 0.5s is unreliable, and will get occasional
@@ -43,11 +73,12 b' class BlockingHBSocketChannel(HBSocketChannel):'
43 73 """ Pause beating on missed heartbeat. """
44 74 pass
45 75
76
46 77 class BlockingKernelManager(KernelManager):
47 78
48 79 # The classes to use for the various channels.
49 shell_channel_class = Type(BlockingShellSocketChannel)
50 sub_channel_class = Type(BlockingSubSocketChannel)
51 stdin_channel_class = Type(BlockingStdInSocketChannel)
52 hb_channel_class = Type(BlockingHBSocketChannel)
80 shell_channel_class = Type(BlockingShellChannel)
81 iopub_channel_class = Type(BlockingIOPubChannel)
82 stdin_channel_class = Type(BlockingStdInChannel)
83 hb_channel_class = Type(BlockingHBChannel)
53 84
@@ -180,7 +180,7 b' class ZMQSocketChannel(Thread):'
180 180
181 181
182 182
183 class ShellSocketChannel(ZMQSocketChannel):
183 class ShellChannel(ZMQSocketChannel):
184 184 """The DEALER channel for issues request/replies to the kernel.
185 185 """
186 186
@@ -189,7 +189,7 b' class ShellSocketChannel(ZMQSocketChannel):'
189 189 allow_stdin = True
190 190
191 191 def __init__(self, context, session, address):
192 super(ShellSocketChannel, self).__init__(context, session, address)
192 super(ShellChannel, self).__init__(context, session, address)
193 193 self.ioloop = ioloop.IOLoop()
194 194
195 195 def run(self):
@@ -207,7 +207,7 b' class ShellSocketChannel(ZMQSocketChannel):'
207 207
208 208 def stop(self):
209 209 self.ioloop.stop()
210 super(ShellSocketChannel, self).stop()
210 super(ShellChannel, self).stop()
211 211
212 212 def call_handlers(self, msg):
213 213 """This method is called in the ioloop thread when a message arrives.
@@ -389,12 +389,12 b' class ShellSocketChannel(ZMQSocketChannel):'
389 389
390 390
391 391
392 class SubSocketChannel(ZMQSocketChannel):
392 class IOPubChannel(ZMQSocketChannel):
393 393 """The SUB channel which listens for messages that the kernel publishes.
394 394 """
395 395
396 396 def __init__(self, context, session, address):
397 super(SubSocketChannel, self).__init__(context, session, address)
397 super(IOPubChannel, self).__init__(context, session, address)
398 398 self.ioloop = ioloop.IOLoop()
399 399
400 400 def run(self):
@@ -413,7 +413,7 b' class SubSocketChannel(ZMQSocketChannel):'
413 413
414 414 def stop(self):
415 415 self.ioloop.stop()
416 super(SubSocketChannel, self).stop()
416 super(IOPubChannel, self).stop()
417 417
418 418 def call_handlers(self, msg):
419 419 """This method is called in the ioloop thread when a message arrives.
@@ -455,13 +455,13 b' class SubSocketChannel(ZMQSocketChannel):'
455 455 self._flushed = True
456 456
457 457
458 class StdInSocketChannel(ZMQSocketChannel):
458 class StdInChannel(ZMQSocketChannel):
459 459 """A reply channel to handle raw_input requests that the kernel makes."""
460 460
461 461 msg_queue = None
462 462
463 463 def __init__(self, context, session, address):
464 super(StdInSocketChannel, self).__init__(context, session, address)
464 super(StdInChannel, self).__init__(context, session, address)
465 465 self.ioloop = ioloop.IOLoop()
466 466
467 467 def run(self):
@@ -480,7 +480,7 b' class StdInSocketChannel(ZMQSocketChannel):'
480 480
481 481 def stop(self):
482 482 self.ioloop.stop()
483 super(StdInSocketChannel, self).stop()
483 super(StdInChannel, self).stop()
484 484
485 485 def call_handlers(self, msg):
486 486 """This method is called in the ioloop thread when a message arrives.
@@ -499,7 +499,7 b' class StdInSocketChannel(ZMQSocketChannel):'
499 499 self._queue_send(msg)
500 500
501 501
502 class HBSocketChannel(ZMQSocketChannel):
502 class HBChannel(ZMQSocketChannel):
503 503 """The heartbeat channel which monitors the kernel heartbeat.
504 504
505 505 Note that the heartbeat channel is paused by default. As long as you start
@@ -515,7 +515,7 b' class HBSocketChannel(ZMQSocketChannel):'
515 515 _beating = None
516 516
517 517 def __init__(self, context, session, address):
518 super(HBSocketChannel, self).__init__(context, session, address)
518 super(HBChannel, self).__init__(context, session, address)
519 519 self._running = False
520 520 self._pause =True
521 521 self.poller = zmq.Poller()
@@ -622,7 +622,7 b' class HBSocketChannel(ZMQSocketChannel):'
622 622
623 623 def stop(self):
624 624 self._running = False
625 super(HBSocketChannel, self).stop()
625 super(HBChannel, self).stop()
626 626
627 627 def call_handlers(self, since_last_heartbeat):
628 628 """This method is called in the ioloop thread when a message arrives.
@@ -678,15 +678,15 b' class KernelManager(Configurable):'
678 678 hb_port = Integer(0)
679 679
680 680 # The classes to use for the various channels.
681 shell_channel_class = Type(ShellSocketChannel)
682 sub_channel_class = Type(SubSocketChannel)
683 stdin_channel_class = Type(StdInSocketChannel)
684 hb_channel_class = Type(HBSocketChannel)
681 shell_channel_class = Type(ShellChannel)
682 iopub_channel_class = Type(IOPubChannel)
683 stdin_channel_class = Type(StdInChannel)
684 hb_channel_class = Type(HBChannel)
685 685
686 686 # Protected traits.
687 687 _launch_args = Any
688 688 _shell_channel = Any
689 _sub_channel = Any
689 _iopub_channel = Any
690 690 _stdin_channel = Any
691 691 _hb_channel = Any
692 692 _connection_file_written=Bool(False)
@@ -709,7 +709,7 b' class KernelManager(Configurable):'
709 709 if shell:
710 710 self.shell_channel.start()
711 711 if sub:
712 self.sub_channel.start()
712 self.iopub_channel.start()
713 713 if stdin:
714 714 self.stdin_channel.start()
715 715 self.shell_channel.allow_stdin = True
@@ -723,8 +723,8 b' class KernelManager(Configurable):'
723 723 """
724 724 if self.shell_channel.is_alive():
725 725 self.shell_channel.stop()
726 if self.sub_channel.is_alive():
727 self.sub_channel.stop()
726 if self.iopub_channel.is_alive():
727 self.iopub_channel.stop()
728 728 if self.stdin_channel.is_alive():
729 729 self.stdin_channel.stop()
730 730 if self.hb_channel.is_alive():
@@ -733,7 +733,7 b' class KernelManager(Configurable):'
733 733 @property
734 734 def channels_running(self):
735 735 """Are any of the channels created and running?"""
736 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
736 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
737 737 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
738 738
739 739 #--------------------------------------------------------------------------
@@ -1007,14 +1007,14 b' class KernelManager(Configurable):'
1007 1007 return self._shell_channel
1008 1008
1009 1009 @property
1010 def sub_channel(self):
1010 def iopub_channel(self):
1011 1011 """Get the SUB socket channel object."""
1012 if self._sub_channel is None:
1013 self._sub_channel = self.sub_channel_class(self.context,
1012 if self._iopub_channel is None:
1013 self._iopub_channel = self.iopub_channel_class(self.context,
1014 1014 self.session,
1015 1015 self._make_url(self.iopub_port),
1016 1016 )
1017 return self._sub_channel
1017 return self._iopub_channel
1018 1018
1019 1019 @property
1020 1020 def stdin_channel(self):
@@ -48,7 +48,7 b' def teardown():'
48 48
49 49 def flush_channels():
50 50 """flush any messages waiting on the queue"""
51 for channel in (KM.shell_channel, KM.sub_channel):
51 for channel in (KM.shell_channel, KM.iopub_channel):
52 52 while True:
53 53 try:
54 54 msg = channel.get_msg(block=True, timeout=0.1)
@@ -61,7 +61,7 b' def flush_channels():'
61 61 def execute(code='', **kwargs):
62 62 """wrapper for doing common steps for validating an execution request"""
63 63 shell = KM.shell_channel
64 sub = KM.sub_channel
64 sub = KM.iopub_channel
65 65
66 66 msg_id = shell.execute(code=code, **kwargs)
67 67 reply = shell.get_msg(timeout=2)
@@ -310,23 +310,23 b' def test_execute_silent():'
310 310 msg_id, reply = execute(code='x=1', silent=True)
311 311
312 312 # flush status=idle
313 status = KM.sub_channel.get_msg(timeout=2)
313 status = KM.iopub_channel.get_msg(timeout=2)
314 314 for tst in validate_message(status, 'status', msg_id):
315 315 yield tst
316 316 nt.assert_equal(status['content']['execution_state'], 'idle')
317 317
318 yield nt.assert_raises(Empty, KM.sub_channel.get_msg, timeout=0.1)
318 yield nt.assert_raises(Empty, KM.iopub_channel.get_msg, timeout=0.1)
319 319 count = reply['execution_count']
320 320
321 321 msg_id, reply = execute(code='x=2', silent=True)
322 322
323 323 # flush status=idle
324 status = KM.sub_channel.get_msg(timeout=2)
324 status = KM.iopub_channel.get_msg(timeout=2)
325 325 for tst in validate_message(status, 'status', msg_id):
326 326 yield tst
327 327 yield nt.assert_equal(status['content']['execution_state'], 'idle')
328 328
329 yield nt.assert_raises(Empty, KM.sub_channel.get_msg, timeout=0.1)
329 yield nt.assert_raises(Empty, KM.iopub_channel.get_msg, timeout=0.1)
330 330 count_2 = reply['execution_count']
331 331 yield nt.assert_equal(count_2, count)
332 332
@@ -339,7 +339,7 b' def test_execute_error():'
339 339 yield nt.assert_equal(reply['status'], 'error')
340 340 yield nt.assert_equal(reply['ename'], 'ZeroDivisionError')
341 341
342 pyerr = KM.sub_channel.get_msg(timeout=2)
342 pyerr = KM.iopub_channel.get_msg(timeout=2)
343 343 for tst in validate_message(pyerr, 'pyerr', msg_id):
344 344 yield tst
345 345
@@ -475,7 +475,7 b' def test_stream():'
475 475
476 476 msg_id, reply = execute("print('hi')")
477 477
478 stdout = KM.sub_channel.get_msg(timeout=2)
478 stdout = KM.iopub_channel.get_msg(timeout=2)
479 479 for tst in validate_message(stdout, 'stream', msg_id):
480 480 yield tst
481 481 content = stdout['content']
@@ -489,7 +489,7 b' def test_display_data():'
489 489
490 490 msg_id, reply = execute("from IPython.core.display import display; display(1)")
491 491
492 display = KM.sub_channel.get_msg(timeout=2)
492 display = KM.iopub_channel.get_msg(timeout=2)
493 493 for tst in validate_message(display, 'display_data', parent=msg_id):
494 494 yield tst
495 495 data = display['content']['data']
General Comments 0
You need to be logged in to leave comments. Login now