##// END OF EJS Templates
Cleanup naming and organization of channels....
Brian Granger -
Show More
@@ -30,7 +30,7 b' class BaseFrontendMixin(object):'
30 old_manager.stopped_channels.disconnect(self._stopped_channels)
30 old_manager.stopped_channels.disconnect(self._stopped_channels)
31
31
32 # Disconnect the old kernel manager's channels.
32 # Disconnect the old kernel manager's channels.
33 old_manager.sub_channel.message_received.disconnect(self._dispatch)
33 old_manager.iopub_channel.message_received.disconnect(self._dispatch)
34 old_manager.shell_channel.message_received.disconnect(self._dispatch)
34 old_manager.shell_channel.message_received.disconnect(self._dispatch)
35 old_manager.stdin_channel.message_received.disconnect(self._dispatch)
35 old_manager.stdin_channel.message_received.disconnect(self._dispatch)
36 old_manager.hb_channel.kernel_died.disconnect(
36 old_manager.hb_channel.kernel_died.disconnect(
@@ -51,7 +51,7 b' class BaseFrontendMixin(object):'
51 kernel_manager.stopped_channels.connect(self._stopped_channels)
51 kernel_manager.stopped_channels.connect(self._stopped_channels)
52
52
53 # Connect the new kernel manager's channels.
53 # Connect the new kernel manager's channels.
54 kernel_manager.sub_channel.message_received.connect(self._dispatch)
54 kernel_manager.iopub_channel.message_received.connect(self._dispatch)
55 kernel_manager.shell_channel.message_received.connect(self._dispatch)
55 kernel_manager.shell_channel.message_received.connect(self._dispatch)
56 kernel_manager.stdin_channel.message_received.connect(self._dispatch)
56 kernel_manager.stdin_channel.message_received.connect(self._dispatch)
57 kernel_manager.hb_channel.kernel_died.connect(self._handle_kernel_died)
57 kernel_manager.hb_channel.kernel_died.connect(self._handle_kernel_died)
@@ -69,7 +69,7 b' class QtShellChannelMixin(ChannelQObject):'
69 _handlers_called = False
69 _handlers_called = False
70
70
71 #---------------------------------------------------------------------------
71 #---------------------------------------------------------------------------
72 # 'ShellSocketChannel' interface
72 # 'ShellChannel' interface
73 #---------------------------------------------------------------------------
73 #---------------------------------------------------------------------------
74
74
75 def call_handlers(self, msg):
75 def call_handlers(self, msg):
@@ -98,7 +98,7 b' class QtShellChannelMixin(ChannelQObject):'
98 self._handlers_called = False
98 self._handlers_called = False
99
99
100
100
101 class QtSubChannelMixin(ChannelQObject):
101 class QtIOPubChannelMixin(ChannelQObject):
102
102
103 # Emitted when any message is received.
103 # Emitted when any message is received.
104 message_received = QtCore.Signal(object)
104 message_received = QtCore.Signal(object)
@@ -126,7 +126,7 b' class QtSubChannelMixin(ChannelQObject):'
126 shutdown_reply_received = QtCore.Signal(object)
126 shutdown_reply_received = QtCore.Signal(object)
127
127
128 #---------------------------------------------------------------------------
128 #---------------------------------------------------------------------------
129 # 'SubSocketChannel' interface
129 # 'IOPubChannel' interface
130 #---------------------------------------------------------------------------
130 #---------------------------------------------------------------------------
131
131
132 def call_handlers(self, msg):
132 def call_handlers(self, msg):
@@ -145,7 +145,7 b' class QtSubChannelMixin(ChannelQObject):'
145 def flush(self):
145 def flush(self):
146 """ Reimplemented to ensure that signals are dispatched immediately.
146 """ Reimplemented to ensure that signals are dispatched immediately.
147 """
147 """
148 super(QtSubChannelMixin, self).flush()
148 super(QtIOPubChannelMixin, self).flush()
149 QtCore.QCoreApplication.instance().processEvents()
149 QtCore.QCoreApplication.instance().processEvents()
150
150
151
151
@@ -158,7 +158,7 b' class QtStdInChannelMixin(ChannelQObject):'
158 input_requested = QtCore.Signal(object)
158 input_requested = QtCore.Signal(object)
159
159
160 #---------------------------------------------------------------------------
160 #---------------------------------------------------------------------------
161 # 'StdInSocketChannel' interface
161 # 'StdInChannel' interface
162 #---------------------------------------------------------------------------
162 #---------------------------------------------------------------------------
163
163
164 def call_handlers(self, msg):
164 def call_handlers(self, msg):
@@ -179,7 +179,7 b' class QtHBChannelMixin(ChannelQObject):'
179 kernel_died = QtCore.Signal(object)
179 kernel_died = QtCore.Signal(object)
180
180
181 #---------------------------------------------------------------------------
181 #---------------------------------------------------------------------------
182 # 'HBSocketChannel' interface
182 # 'HBChannel' interface
183 #---------------------------------------------------------------------------
183 #---------------------------------------------------------------------------
184
184
185 def call_handlers(self, since_last_heartbeat):
185 def call_handlers(self, since_last_heartbeat):
@@ -205,7 +205,7 b' class QtKernelManagerMixin(HasTraits, SuperQObject):'
205 stopped_channels = QtCore.Signal()
205 stopped_channels = QtCore.Signal()
206
206
207 # Use Qt-specific channel classes that emit signals.
207 # Use Qt-specific channel classes that emit signals.
208 sub_channel_class = Type(QtSubChannelMixin)
208 iopub_channel_class = Type(QtIOPubChannelMixin)
209 shell_channel_class = Type(QtShellChannelMixin)
209 shell_channel_class = Type(QtShellChannelMixin)
210 stdin_channel_class = Type(QtStdInChannelMixin)
210 stdin_channel_class = Type(QtStdInChannelMixin)
211 hb_channel_class = Type(QtHBChannelMixin)
211 hb_channel_class = Type(QtHBChannelMixin)
@@ -396,7 +396,7 b' class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin):'
396 if info and info.kind == 'user' and not self._hidden:
396 if info and info.kind == 'user' and not self._hidden:
397 # Make sure that all output from the SUB channel has been processed
397 # Make sure that all output from the SUB channel has been processed
398 # before writing a new prompt.
398 # before writing a new prompt.
399 self.kernel_manager.sub_channel.flush()
399 self.kernel_manager.iopub_channel.flush()
400
400
401 # Reset the ANSI style information to prevent bad text in stdout
401 # Reset the ANSI style information to prevent bad text in stdout
402 # from messing up our colors. We're not a true terminal so we're
402 # from messing up our colors. We're not a true terminal so we're
@@ -431,7 +431,7 b' class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin):'
431
431
432 # Make sure that all output from the SUB channel has been processed
432 # Make sure that all output from the SUB channel has been processed
433 # before entering readline mode.
433 # before entering readline mode.
434 self.kernel_manager.sub_channel.flush()
434 self.kernel_manager.iopub_channel.flush()
435
435
436 def callback(line):
436 def callback(line):
437 self.kernel_manager.stdin_channel.input(line)
437 self.kernel_manager.stdin_channel.input(line)
@@ -3,23 +3,23 b''
3
3
4 # Local imports.
4 # Local imports.
5 from IPython.inprocess.kernelmanager import \
5 from IPython.inprocess.kernelmanager import \
6 ShellInProcessChannel, SubInProcessChannel, StdInInProcessChannel, \
6 InProcessShellChannel, InProcessIOPubChannel, InProcessStdInChannel, \
7 HBInProcessChannel, InProcessKernelManager
7 InProcessHBChannel, InProcessKernelManager
8 from IPython.utils.traitlets import Type
8 from IPython.utils.traitlets import Type
9 from base_kernelmanager import QtShellChannelMixin, QtSubChannelMixin, \
9 from base_kernelmanager import QtShellChannelMixin, QtIOPubChannelMixin, \
10 QtStdInChannelMixin, QtHBChannelMixin, QtKernelManagerMixin
10 QtStdInChannelMixin, QtHBChannelMixin, QtKernelManagerMixin
11
11
12
12
13 class QtShellInProcessChannel(QtShellChannelMixin, ShellInProcessChannel):
13 class QtInProcessShellChannel(QtShellChannelMixin, InProcessShellChannel):
14 pass
14 pass
15
15
16 class QtSubInProcessChannel(QtSubChannelMixin, SubInProcessChannel):
16 class QtInProcessIOPubChannel(QtIOPubChannelMixin, InProcessIOPubChannel):
17 pass
17 pass
18
18
19 class QtStdInInProcessChannel(QtStdInChannelMixin, StdInInProcessChannel):
19 class QtInProcessStdInChannel(QtStdInChannelMixin, InProcessStdInChannel):
20 pass
20 pass
21
21
22 class QtHBInProcessChannel(QtHBChannelMixin, HBInProcessChannel):
22 class QtInProcessHBChannel(QtHBChannelMixin, InProcessHBChannel):
23 pass
23 pass
24
24
25
25
@@ -27,7 +27,7 b' class QtInProcessKernelManager(QtKernelManagerMixin, InProcessKernelManager):'
27 """ An in-process KernelManager with signals and slots.
27 """ An in-process KernelManager with signals and slots.
28 """
28 """
29
29
30 sub_channel_class = Type(QtSubInProcessChannel)
30 iopub_channel_class = Type(QtInProcessIOPubChannel)
31 shell_channel_class = Type(QtShellInProcessChannel)
31 shell_channel_class = Type(QtInProcessShellChannel)
32 stdin_channel_class = Type(QtStdInInProcessChannel)
32 stdin_channel_class = Type(QtInProcessStdInChannel)
33 hb_channel_class = Type(QtHBInProcessChannel)
33 hb_channel_class = Type(QtInProcessHBChannel)
@@ -3,22 +3,22 b''
3
3
4 # Local imports.
4 # Local imports.
5 from IPython.utils.traitlets import Type
5 from IPython.utils.traitlets import Type
6 from IPython.zmq.kernelmanager import ShellSocketChannel, SubSocketChannel, \
6 from IPython.zmq.kernelmanager import ShellChannel, IOPubChannel, \
7 StdInSocketChannel, HBSocketChannel, KernelManager
7 StdInChannel, HBChannel, KernelManager
8 from base_kernelmanager import QtShellChannelMixin, QtSubChannelMixin, \
8 from base_kernelmanager import QtShellChannelMixin, QtIOPubChannelMixin, \
9 QtStdInChannelMixin, QtHBChannelMixin, QtKernelManagerMixin
9 QtStdInChannelMixin, QtHBChannelMixin, QtKernelManagerMixin
10
10
11
11
12 class QtShellSocketChannel(QtShellChannelMixin, ShellSocketChannel):
12 class QtShellChannel(QtShellChannelMixin, ShellChannel):
13 pass
13 pass
14
14
15 class QtSubSocketChannel(QtSubChannelMixin, SubSocketChannel):
15 class QtIOPubChannel(QtIOPubChannelMixin, IOPubChannel):
16 pass
16 pass
17
17
18 class QtStdInSocketChannel(QtStdInChannelMixin, StdInSocketChannel):
18 class QtStdInChannel(QtStdInChannelMixin, StdInChannel):
19 pass
19 pass
20
20
21 class QtHBSocketChannel(QtHBChannelMixin, HBSocketChannel):
21 class QtHBChannel(QtHBChannelMixin, HBChannel):
22 pass
22 pass
23
23
24
24
@@ -26,7 +26,7 b' class QtKernelManager(QtKernelManagerMixin, KernelManager):'
26 """ A KernelManager that provides signals and slots.
26 """ A KernelManager that provides signals and slots.
27 """
27 """
28
28
29 sub_channel_class = Type(QtSubSocketChannel)
29 iopub_channel_class = Type(QtIOPubChannel)
30 shell_channel_class = Type(QtShellSocketChannel)
30 shell_channel_class = Type(QtShellChannel)
31 stdin_channel_class = Type(QtStdInSocketChannel)
31 stdin_channel_class = Type(QtStdInChannel)
32 hb_channel_class = Type(QtHBSocketChannel)
32 hb_channel_class = Type(QtHBChannel)
@@ -211,8 +211,8 b' class ZMQTerminalInteractiveShell(TerminalInteractiveShell):'
211 sub_msg: message receive from kernel in the sub socket channel
211 sub_msg: message receive from kernel in the sub socket channel
212 capture by kernel manager.
212 capture by kernel manager.
213 """
213 """
214 while self.km.sub_channel.msg_ready():
214 while self.km.iopub_channel.msg_ready():
215 sub_msg = self.km.sub_channel.get_msg()
215 sub_msg = self.km.iopub_channel.get_msg()
216 msg_type = sub_msg['header']['msg_type']
216 msg_type = sub_msg['header']['msg_type']
217 parent = sub_msg["parent_header"]
217 parent = sub_msg["parent_header"]
218 if (not parent) or self.session_id == parent['session']:
218 if (not parent) or self.session_id == parent['session']:
@@ -21,51 +21,22 b' from threading import Event'
21 # Local imports.
21 # Local imports.
22 from IPython.utils.io import raw_print
22 from IPython.utils.io import raw_print
23 from IPython.utils.traitlets import Type
23 from IPython.utils.traitlets import Type
24 from kernelmanager import InProcessKernelManager, ShellInProcessChannel, \
24 from kernelmanager import InProcessKernelManager, InProcessShellChannel, \
25 SubInProcessChannel, StdInInProcessChannel
25 InProcessIOPubChannel, InProcessStdInChannel
26 from IPython.zmq.blockingkernelmanager import BlockingChannelMixin
26
27
27 #-----------------------------------------------------------------------------
28 # Utility classes
29 #-----------------------------------------------------------------------------
30
31 class BlockingChannelMixin(object):
32
33 def __init__(self, *args, **kwds):
34 super(BlockingChannelMixin, self).__init__(*args, **kwds)
35 self._in_queue = Queue.Queue()
36
37 def call_handlers(self, msg):
38 self._in_queue.put(msg)
39
40 def get_msg(self, block=True, timeout=None):
41 """ Gets a message if there is one that is ready. """
42 return self._in_queue.get(block, timeout)
43
44 def get_msgs(self):
45 """ Get all messages that are currently ready. """
46 msgs = []
47 while True:
48 try:
49 msgs.append(self.get_msg(block=False))
50 except Queue.Empty:
51 break
52 return msgs
53
54 def msg_ready(self):
55 """ Is there a message that has been received? """
56 return not self._in_queue.empty()
57
28
58 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
59 # Blocking kernel manager
30 # Blocking kernel manager
60 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
61
32
62 class BlockingShellInProcessChannel(BlockingChannelMixin, ShellInProcessChannel):
33 class BlockingInProcessShellChannel(BlockingChannelMixin, InProcessShellChannel):
63 pass
34 pass
64
35
65 class BlockingSubInProcessChannel(BlockingChannelMixin, SubInProcessChannel):
36 class BlockingInProcessIOPubChannel(BlockingChannelMixin, InProcessIOPubChannel):
66 pass
37 pass
67
38
68 class BlockingStdInInProcessChannel(BlockingChannelMixin, StdInInProcessChannel):
39 class BlockingInProcessStdInChannel(BlockingChannelMixin, InProcessStdInChannel):
69
40
70 def call_handlers(self, msg):
41 def call_handlers(self, msg):
71 """ Overridden for the in-process channel.
42 """ Overridden for the in-process channel.
@@ -82,6 +53,6 b' class BlockingStdInInProcessChannel(BlockingChannelMixin, StdInInProcessChannel)'
82 class BlockingInProcessKernelManager(InProcessKernelManager):
53 class BlockingInProcessKernelManager(InProcessKernelManager):
83
54
84 # The classes to use for the various channels.
55 # The classes to use for the various channels.
85 shell_channel_class = Type(BlockingShellInProcessChannel)
56 shell_channel_class = Type(BlockingInProcessShellChannel)
86 sub_channel_class = Type(BlockingSubInProcessChannel)
57 iopub_channel_class = Type(BlockingInProcessIOPubChannel)
87 stdin_channel_class = Type(BlockingStdInInProcessChannel)
58 stdin_channel_class = Type(BlockingInProcessStdInChannel)
@@ -123,7 +123,7 b' class InProcessKernel(Kernel):'
123 """
123 """
124 ident, msg = self.session.recv(self.iopub_socket, copy=False)
124 ident, msg = self.session.recv(self.iopub_socket, copy=False)
125 for frontend in self.frontends:
125 for frontend in self.frontends:
126 frontend.sub_channel.call_handlers(msg)
126 frontend.iopub_channel.call_handlers(msg)
127
127
128 #------ Trait initializers -----------------------------------------------
128 #------ Trait initializers -----------------------------------------------
129
129
@@ -71,7 +71,7 b' class InProcessChannel(object):'
71 raise NotImplementedError
71 raise NotImplementedError
72
72
73
73
74 class ShellInProcessChannel(InProcessChannel):
74 class InProcessShellChannel(InProcessChannel):
75 """The DEALER channel for issues request/replies to the kernel.
75 """The DEALER channel for issues request/replies to the kernel.
76 """
76 """
77
77
@@ -240,7 +240,7 b' class ShellInProcessChannel(InProcessChannel):'
240 self.call_handlers_later(reply_msg)
240 self.call_handlers_later(reply_msg)
241
241
242
242
243 class SubInProcessChannel(InProcessChannel):
243 class InProcessIOPubChannel(InProcessChannel):
244 """The SUB channel which listens for messages that the kernel publishes.
244 """The SUB channel which listens for messages that the kernel publishes.
245 """
245 """
246
246
@@ -252,7 +252,7 b' class SubInProcessChannel(InProcessChannel):'
252 pass
252 pass
253
253
254
254
255 class StdInInProcessChannel(InProcessChannel):
255 class InProcessStdInChannel(InProcessChannel):
256 """ A reply channel to handle raw_input requests that the kernel makes. """
256 """ A reply channel to handle raw_input requests that the kernel makes. """
257
257
258 def input(self, string):
258 def input(self, string):
@@ -264,13 +264,13 b' class StdInInProcessChannel(InProcessChannel):'
264 kernel.raw_input_str = string
264 kernel.raw_input_str = string
265
265
266
266
267 class HBInProcessChannel(InProcessChannel):
267 class InProcessHBChannel(InProcessChannel):
268 """ A dummy heartbeat channel. """
268 """ A dummy heartbeat channel. """
269
269
270 time_to_dead = 3.0
270 time_to_dead = 3.0
271
271
272 def __init__(self, *args, **kwds):
272 def __init__(self, *args, **kwds):
273 super(HBInProcessChannel, self).__init__(*args, **kwds)
273 super(InProcessHBChannel, self).__init__(*args, **kwds)
274 self._pause = True
274 self._pause = True
275
275
276 def pause(self):
276 def pause(self):
@@ -310,14 +310,14 b' class InProcessKernelManager(HasTraits):'
310 kernel = Instance('IPython.inprocess.ipkernel.InProcessKernel')
310 kernel = Instance('IPython.inprocess.ipkernel.InProcessKernel')
311
311
312 # The classes to use for the various channels.
312 # The classes to use for the various channels.
313 shell_channel_class = Type(ShellInProcessChannel)
313 shell_channel_class = Type(InProcessShellChannel)
314 sub_channel_class = Type(SubInProcessChannel)
314 iopub_channel_class = Type(InProcessIOPubChannel)
315 stdin_channel_class = Type(StdInInProcessChannel)
315 stdin_channel_class = Type(InProcessStdInChannel)
316 hb_channel_class = Type(HBInProcessChannel)
316 hb_channel_class = Type(InProcessHBChannel)
317
317
318 # Protected traits.
318 # Protected traits.
319 _shell_channel = Any
319 _shell_channel = Any
320 _sub_channel = Any
320 _iopub_channel = Any
321 _stdin_channel = Any
321 _stdin_channel = Any
322 _hb_channel = Any
322 _hb_channel = Any
323
323
@@ -331,7 +331,7 b' class InProcessKernelManager(HasTraits):'
331 if shell:
331 if shell:
332 self.shell_channel.start()
332 self.shell_channel.start()
333 if sub:
333 if sub:
334 self.sub_channel.start()
334 self.iopub_channel.start()
335 if stdin:
335 if stdin:
336 self.stdin_channel.start()
336 self.stdin_channel.start()
337 self.shell_channel.allow_stdin = True
337 self.shell_channel.allow_stdin = True
@@ -345,8 +345,8 b' class InProcessKernelManager(HasTraits):'
345 """
345 """
346 if self.shell_channel.is_alive():
346 if self.shell_channel.is_alive():
347 self.shell_channel.stop()
347 self.shell_channel.stop()
348 if self.sub_channel.is_alive():
348 if self.iopub_channel.is_alive():
349 self.sub_channel.stop()
349 self.iopub_channel.stop()
350 if self.stdin_channel.is_alive():
350 if self.stdin_channel.is_alive():
351 self.stdin_channel.stop()
351 self.stdin_channel.stop()
352 if self.hb_channel.is_alive():
352 if self.hb_channel.is_alive():
@@ -355,7 +355,7 b' class InProcessKernelManager(HasTraits):'
355 @property
355 @property
356 def channels_running(self):
356 def channels_running(self):
357 """ Are any of the channels created and running? """
357 """ Are any of the channels created and running? """
358 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
358 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
359 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
359 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
360
360
361 #--------------------------------------------------------------------------
361 #--------------------------------------------------------------------------
@@ -421,11 +421,11 b' class InProcessKernelManager(HasTraits):'
421 return self._shell_channel
421 return self._shell_channel
422
422
423 @property
423 @property
424 def sub_channel(self):
424 def iopub_channel(self):
425 """Get the SUB socket channel object."""
425 """Get the SUB socket channel object."""
426 if self._sub_channel is None:
426 if self._iopub_channel is None:
427 self._sub_channel = self.sub_channel_class(self)
427 self._iopub_channel = self.iopub_channel_class(self)
428 return self._sub_channel
428 return self._iopub_channel
429
429
430 @property
430 @property
431 def stdin_channel(self):
431 def stdin_channel(self):
@@ -80,7 +80,7 b' def get_stream_message(kernel_manager, timeout=5):'
80 """ Gets a single stream message synchronously from the sub channel.
80 """ Gets a single stream message synchronously from the sub channel.
81 """
81 """
82 while True:
82 while True:
83 msg = kernel_manager.sub_channel.get_msg(timeout=timeout)
83 msg = kernel_manager.iopub_channel.get_msg(timeout=timeout)
84 if msg['header']['msg_type'] == 'stream':
84 if msg['header']['msg_type'] == 'stream':
85 return msg
85 return msg
86
86
@@ -13,26 +13,56 b' Useful for test suites and blocking terminal interfaces.'
13 # Imports
13 # Imports
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 # Local imports.
17 from IPython.inprocess.blockingkernelmanager import BlockingChannelMixin
18 from IPython.utils.traitlets import Type
16 from IPython.utils.traitlets import Type
19 from kernelmanager import KernelManager, SubSocketChannel, HBSocketChannel, \
17 from kernelmanager import KernelManager, IOPubChannel, HBChannel, \
20 ShellSocketChannel, StdInSocketChannel
18 ShellChannel, StdInChannel
21
19
22 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
23 # Blocking kernel manager
21 # Blocking kernel manager
24 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
25
23
26 class BlockingSubSocketChannel(BlockingChannelMixin, SubSocketChannel):
24
25 class BlockingChannelMixin(object):
26
27 def __init__(self, *args, **kwds):
28 super(BlockingChannelMixin, self).__init__(*args, **kwds)
29 self._in_queue = Queue.Queue()
30
31 def call_handlers(self, msg):
32 self._in_queue.put(msg)
33
34 def get_msg(self, block=True, timeout=None):
35 """ Gets a message if there is one that is ready. """
36 return self._in_queue.get(block, timeout)
37
38 def get_msgs(self):
39 """ Get all messages that are currently ready. """
40 msgs = []
41 while True:
42 try:
43 msgs.append(self.get_msg(block=False))
44 except Queue.Empty:
45 break
46 return msgs
47
48 def msg_ready(self):
49 """ Is there a message that has been received? """
50 return not self._in_queue.empty()
51
52
53 class BlockingIOPubChannel(BlockingChannelMixin, IOPubChannel):
27 pass
54 pass
28
55
29 class BlockingShellSocketChannel(BlockingChannelMixin, ShellSocketChannel):
56
57 class BlockingShellChannel(BlockingChannelMixin, ShellChannel):
30 pass
58 pass
31
59
32 class BlockingStdInSocketChannel(BlockingChannelMixin, StdInSocketChannel):
60
61 class BlockingStdInChannel(BlockingChannelMixin, StdInChannel):
33 pass
62 pass
34
63
35 class BlockingHBSocketChannel(HBSocketChannel):
64
65 class BlockingHBChannel(HBChannel):
36
66
37 # This kernel needs quicker monitoring, shorten to 1 sec.
67 # This kernel needs quicker monitoring, shorten to 1 sec.
38 # less than 0.5s is unreliable, and will get occasional
68 # less than 0.5s is unreliable, and will get occasional
@@ -43,11 +73,12 b' class BlockingHBSocketChannel(HBSocketChannel):'
43 """ Pause beating on missed heartbeat. """
73 """ Pause beating on missed heartbeat. """
44 pass
74 pass
45
75
76
46 class BlockingKernelManager(KernelManager):
77 class BlockingKernelManager(KernelManager):
47
78
48 # The classes to use for the various channels.
79 # The classes to use for the various channels.
49 shell_channel_class = Type(BlockingShellSocketChannel)
80 shell_channel_class = Type(BlockingShellChannel)
50 sub_channel_class = Type(BlockingSubSocketChannel)
81 iopub_channel_class = Type(BlockingIOPubChannel)
51 stdin_channel_class = Type(BlockingStdInSocketChannel)
82 stdin_channel_class = Type(BlockingStdInChannel)
52 hb_channel_class = Type(BlockingHBSocketChannel)
83 hb_channel_class = Type(BlockingHBChannel)
53
84
@@ -180,7 +180,7 b' class ZMQSocketChannel(Thread):'
180
180
181
181
182
182
183 class ShellSocketChannel(ZMQSocketChannel):
183 class ShellChannel(ZMQSocketChannel):
184 """The DEALER channel for issues request/replies to the kernel.
184 """The DEALER channel for issues request/replies to the kernel.
185 """
185 """
186
186
@@ -189,7 +189,7 b' class ShellSocketChannel(ZMQSocketChannel):'
189 allow_stdin = True
189 allow_stdin = True
190
190
191 def __init__(self, context, session, address):
191 def __init__(self, context, session, address):
192 super(ShellSocketChannel, self).__init__(context, session, address)
192 super(ShellChannel, self).__init__(context, session, address)
193 self.ioloop = ioloop.IOLoop()
193 self.ioloop = ioloop.IOLoop()
194
194
195 def run(self):
195 def run(self):
@@ -207,7 +207,7 b' class ShellSocketChannel(ZMQSocketChannel):'
207
207
208 def stop(self):
208 def stop(self):
209 self.ioloop.stop()
209 self.ioloop.stop()
210 super(ShellSocketChannel, self).stop()
210 super(ShellChannel, self).stop()
211
211
212 def call_handlers(self, msg):
212 def call_handlers(self, msg):
213 """This method is called in the ioloop thread when a message arrives.
213 """This method is called in the ioloop thread when a message arrives.
@@ -389,12 +389,12 b' class ShellSocketChannel(ZMQSocketChannel):'
389
389
390
390
391
391
392 class SubSocketChannel(ZMQSocketChannel):
392 class IOPubChannel(ZMQSocketChannel):
393 """The SUB channel which listens for messages that the kernel publishes.
393 """The SUB channel which listens for messages that the kernel publishes.
394 """
394 """
395
395
396 def __init__(self, context, session, address):
396 def __init__(self, context, session, address):
397 super(SubSocketChannel, self).__init__(context, session, address)
397 super(IOPubChannel, self).__init__(context, session, address)
398 self.ioloop = ioloop.IOLoop()
398 self.ioloop = ioloop.IOLoop()
399
399
400 def run(self):
400 def run(self):
@@ -413,7 +413,7 b' class SubSocketChannel(ZMQSocketChannel):'
413
413
414 def stop(self):
414 def stop(self):
415 self.ioloop.stop()
415 self.ioloop.stop()
416 super(SubSocketChannel, self).stop()
416 super(IOPubChannel, self).stop()
417
417
418 def call_handlers(self, msg):
418 def call_handlers(self, msg):
419 """This method is called in the ioloop thread when a message arrives.
419 """This method is called in the ioloop thread when a message arrives.
@@ -455,13 +455,13 b' class SubSocketChannel(ZMQSocketChannel):'
455 self._flushed = True
455 self._flushed = True
456
456
457
457
458 class StdInSocketChannel(ZMQSocketChannel):
458 class StdInChannel(ZMQSocketChannel):
459 """A reply channel to handle raw_input requests that the kernel makes."""
459 """A reply channel to handle raw_input requests that the kernel makes."""
460
460
461 msg_queue = None
461 msg_queue = None
462
462
463 def __init__(self, context, session, address):
463 def __init__(self, context, session, address):
464 super(StdInSocketChannel, self).__init__(context, session, address)
464 super(StdInChannel, self).__init__(context, session, address)
465 self.ioloop = ioloop.IOLoop()
465 self.ioloop = ioloop.IOLoop()
466
466
467 def run(self):
467 def run(self):
@@ -480,7 +480,7 b' class StdInSocketChannel(ZMQSocketChannel):'
480
480
481 def stop(self):
481 def stop(self):
482 self.ioloop.stop()
482 self.ioloop.stop()
483 super(StdInSocketChannel, self).stop()
483 super(StdInChannel, self).stop()
484
484
485 def call_handlers(self, msg):
485 def call_handlers(self, msg):
486 """This method is called in the ioloop thread when a message arrives.
486 """This method is called in the ioloop thread when a message arrives.
@@ -499,7 +499,7 b' class StdInSocketChannel(ZMQSocketChannel):'
499 self._queue_send(msg)
499 self._queue_send(msg)
500
500
501
501
502 class HBSocketChannel(ZMQSocketChannel):
502 class HBChannel(ZMQSocketChannel):
503 """The heartbeat channel which monitors the kernel heartbeat.
503 """The heartbeat channel which monitors the kernel heartbeat.
504
504
505 Note that the heartbeat channel is paused by default. As long as you start
505 Note that the heartbeat channel is paused by default. As long as you start
@@ -515,7 +515,7 b' class HBSocketChannel(ZMQSocketChannel):'
515 _beating = None
515 _beating = None
516
516
517 def __init__(self, context, session, address):
517 def __init__(self, context, session, address):
518 super(HBSocketChannel, self).__init__(context, session, address)
518 super(HBChannel, self).__init__(context, session, address)
519 self._running = False
519 self._running = False
520 self._pause =True
520 self._pause =True
521 self.poller = zmq.Poller()
521 self.poller = zmq.Poller()
@@ -622,7 +622,7 b' class HBSocketChannel(ZMQSocketChannel):'
622
622
623 def stop(self):
623 def stop(self):
624 self._running = False
624 self._running = False
625 super(HBSocketChannel, self).stop()
625 super(HBChannel, self).stop()
626
626
627 def call_handlers(self, since_last_heartbeat):
627 def call_handlers(self, since_last_heartbeat):
628 """This method is called in the ioloop thread when a message arrives.
628 """This method is called in the ioloop thread when a message arrives.
@@ -678,15 +678,15 b' class KernelManager(Configurable):'
678 hb_port = Integer(0)
678 hb_port = Integer(0)
679
679
680 # The classes to use for the various channels.
680 # The classes to use for the various channels.
681 shell_channel_class = Type(ShellSocketChannel)
681 shell_channel_class = Type(ShellChannel)
682 sub_channel_class = Type(SubSocketChannel)
682 iopub_channel_class = Type(IOPubChannel)
683 stdin_channel_class = Type(StdInSocketChannel)
683 stdin_channel_class = Type(StdInChannel)
684 hb_channel_class = Type(HBSocketChannel)
684 hb_channel_class = Type(HBChannel)
685
685
686 # Protected traits.
686 # Protected traits.
687 _launch_args = Any
687 _launch_args = Any
688 _shell_channel = Any
688 _shell_channel = Any
689 _sub_channel = Any
689 _iopub_channel = Any
690 _stdin_channel = Any
690 _stdin_channel = Any
691 _hb_channel = Any
691 _hb_channel = Any
692 _connection_file_written=Bool(False)
692 _connection_file_written=Bool(False)
@@ -709,7 +709,7 b' class KernelManager(Configurable):'
709 if shell:
709 if shell:
710 self.shell_channel.start()
710 self.shell_channel.start()
711 if sub:
711 if sub:
712 self.sub_channel.start()
712 self.iopub_channel.start()
713 if stdin:
713 if stdin:
714 self.stdin_channel.start()
714 self.stdin_channel.start()
715 self.shell_channel.allow_stdin = True
715 self.shell_channel.allow_stdin = True
@@ -723,8 +723,8 b' class KernelManager(Configurable):'
723 """
723 """
724 if self.shell_channel.is_alive():
724 if self.shell_channel.is_alive():
725 self.shell_channel.stop()
725 self.shell_channel.stop()
726 if self.sub_channel.is_alive():
726 if self.iopub_channel.is_alive():
727 self.sub_channel.stop()
727 self.iopub_channel.stop()
728 if self.stdin_channel.is_alive():
728 if self.stdin_channel.is_alive():
729 self.stdin_channel.stop()
729 self.stdin_channel.stop()
730 if self.hb_channel.is_alive():
730 if self.hb_channel.is_alive():
@@ -733,7 +733,7 b' class KernelManager(Configurable):'
733 @property
733 @property
734 def channels_running(self):
734 def channels_running(self):
735 """Are any of the channels created and running?"""
735 """Are any of the channels created and running?"""
736 return (self.shell_channel.is_alive() or self.sub_channel.is_alive() or
736 return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
737 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
737 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
738
738
739 #--------------------------------------------------------------------------
739 #--------------------------------------------------------------------------
@@ -1007,14 +1007,14 b' class KernelManager(Configurable):'
1007 return self._shell_channel
1007 return self._shell_channel
1008
1008
1009 @property
1009 @property
1010 def sub_channel(self):
1010 def iopub_channel(self):
1011 """Get the SUB socket channel object."""
1011 """Get the SUB socket channel object."""
1012 if self._sub_channel is None:
1012 if self._iopub_channel is None:
1013 self._sub_channel = self.sub_channel_class(self.context,
1013 self._iopub_channel = self.iopub_channel_class(self.context,
1014 self.session,
1014 self.session,
1015 self._make_url(self.iopub_port),
1015 self._make_url(self.iopub_port),
1016 )
1016 )
1017 return self._sub_channel
1017 return self._iopub_channel
1018
1018
1019 @property
1019 @property
1020 def stdin_channel(self):
1020 def stdin_channel(self):
@@ -48,7 +48,7 b' def teardown():'
48
48
49 def flush_channels():
49 def flush_channels():
50 """flush any messages waiting on the queue"""
50 """flush any messages waiting on the queue"""
51 for channel in (KM.shell_channel, KM.sub_channel):
51 for channel in (KM.shell_channel, KM.iopub_channel):
52 while True:
52 while True:
53 try:
53 try:
54 msg = channel.get_msg(block=True, timeout=0.1)
54 msg = channel.get_msg(block=True, timeout=0.1)
@@ -61,7 +61,7 b' def flush_channels():'
61 def execute(code='', **kwargs):
61 def execute(code='', **kwargs):
62 """wrapper for doing common steps for validating an execution request"""
62 """wrapper for doing common steps for validating an execution request"""
63 shell = KM.shell_channel
63 shell = KM.shell_channel
64 sub = KM.sub_channel
64 sub = KM.iopub_channel
65
65
66 msg_id = shell.execute(code=code, **kwargs)
66 msg_id = shell.execute(code=code, **kwargs)
67 reply = shell.get_msg(timeout=2)
67 reply = shell.get_msg(timeout=2)
@@ -310,23 +310,23 b' def test_execute_silent():'
310 msg_id, reply = execute(code='x=1', silent=True)
310 msg_id, reply = execute(code='x=1', silent=True)
311
311
312 # flush status=idle
312 # flush status=idle
313 status = KM.sub_channel.get_msg(timeout=2)
313 status = KM.iopub_channel.get_msg(timeout=2)
314 for tst in validate_message(status, 'status', msg_id):
314 for tst in validate_message(status, 'status', msg_id):
315 yield tst
315 yield tst
316 nt.assert_equal(status['content']['execution_state'], 'idle')
316 nt.assert_equal(status['content']['execution_state'], 'idle')
317
317
318 yield nt.assert_raises(Empty, KM.sub_channel.get_msg, timeout=0.1)
318 yield nt.assert_raises(Empty, KM.iopub_channel.get_msg, timeout=0.1)
319 count = reply['execution_count']
319 count = reply['execution_count']
320
320
321 msg_id, reply = execute(code='x=2', silent=True)
321 msg_id, reply = execute(code='x=2', silent=True)
322
322
323 # flush status=idle
323 # flush status=idle
324 status = KM.sub_channel.get_msg(timeout=2)
324 status = KM.iopub_channel.get_msg(timeout=2)
325 for tst in validate_message(status, 'status', msg_id):
325 for tst in validate_message(status, 'status', msg_id):
326 yield tst
326 yield tst
327 yield nt.assert_equal(status['content']['execution_state'], 'idle')
327 yield nt.assert_equal(status['content']['execution_state'], 'idle')
328
328
329 yield nt.assert_raises(Empty, KM.sub_channel.get_msg, timeout=0.1)
329 yield nt.assert_raises(Empty, KM.iopub_channel.get_msg, timeout=0.1)
330 count_2 = reply['execution_count']
330 count_2 = reply['execution_count']
331 yield nt.assert_equal(count_2, count)
331 yield nt.assert_equal(count_2, count)
332
332
@@ -339,7 +339,7 b' def test_execute_error():'
339 yield nt.assert_equal(reply['status'], 'error')
339 yield nt.assert_equal(reply['status'], 'error')
340 yield nt.assert_equal(reply['ename'], 'ZeroDivisionError')
340 yield nt.assert_equal(reply['ename'], 'ZeroDivisionError')
341
341
342 pyerr = KM.sub_channel.get_msg(timeout=2)
342 pyerr = KM.iopub_channel.get_msg(timeout=2)
343 for tst in validate_message(pyerr, 'pyerr', msg_id):
343 for tst in validate_message(pyerr, 'pyerr', msg_id):
344 yield tst
344 yield tst
345
345
@@ -475,7 +475,7 b' def test_stream():'
475
475
476 msg_id, reply = execute("print('hi')")
476 msg_id, reply = execute("print('hi')")
477
477
478 stdout = KM.sub_channel.get_msg(timeout=2)
478 stdout = KM.iopub_channel.get_msg(timeout=2)
479 for tst in validate_message(stdout, 'stream', msg_id):
479 for tst in validate_message(stdout, 'stream', msg_id):
480 yield tst
480 yield tst
481 content = stdout['content']
481 content = stdout['content']
@@ -489,7 +489,7 b' def test_display_data():'
489
489
490 msg_id, reply = execute("from IPython.core.display import display; display(1)")
490 msg_id, reply = execute("from IPython.core.display import display; display(1)")
491
491
492 display = KM.sub_channel.get_msg(timeout=2)
492 display = KM.iopub_channel.get_msg(timeout=2)
493 for tst in validate_message(display, 'display_data', parent=msg_id):
493 for tst in validate_message(display, 'display_data', parent=msg_id):
494 yield tst
494 yield tst
495 data = display['content']['data']
495 data = display['content']['data']
General Comments 0
You need to be logged in to leave comments. Login now