##// END OF EJS Templates
Add missing msg queue + comment out debug printing in BlockingKernelManager.
epatters -
Show More
@@ -1,114 +1,121 b''
1 """Implement a fully blocking kernel manager.
1 """Implement a fully blocking kernel manager.
2
2
3 Useful for test suites and blocking terminal interfaces.
3 Useful for test suites and blocking terminal interfaces.
4 """
4 """
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2010 The IPython Development Team
6 # Copyright (C) 2010 The IPython Development Team
7 #
7 #
8 # Distributed under the terms of the BSD License. The full license is in
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING.txt, distributed as part of this software.
9 # the file COPYING.txt, distributed as part of this software.
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Imports
13 # Imports
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 from __future__ import print_function
15 from __future__ import print_function
16
16
17 # Stdlib
17 # Stdlib
18 from Queue import Queue, Empty
18 from Queue import Queue, Empty
19
19
20 # Our own
20 # Our own
21 from IPython.utils import io
21 from IPython.utils import io
22 from IPython.utils.traitlets import Type
22 from IPython.utils.traitlets import Type
23
23
24 from .kernelmanager import (KernelManager, SubSocketChannel,
24 from .kernelmanager import (KernelManager, SubSocketChannel,
25 XReqSocketChannel, RepSocketChannel, HBSocketChannel)
25 XReqSocketChannel, RepSocketChannel, HBSocketChannel)
26
26
27 #-----------------------------------------------------------------------------
27 #-----------------------------------------------------------------------------
28 # Functions and classes
28 # Functions and classes
29 #-----------------------------------------------------------------------------
29 #-----------------------------------------------------------------------------
30
30
31 class BlockingSubSocketChannel(SubSocketChannel):
31 class BlockingSubSocketChannel(SubSocketChannel):
32
32
33 def __init__(self, context, session, address=None):
33 def __init__(self, context, session, address=None):
34 super(BlockingSubSocketChannel, self).__init__(context, session, address)
34 super(BlockingSubSocketChannel, self).__init__(context, session,
35 address)
35 self._in_queue = Queue()
36 self._in_queue = Queue()
36
37
37 def call_handlers(self, msg):
38 def call_handlers(self, msg):
38 io.rprint('[[Sub]]', msg) # dbg
39 #io.rprint('[[Sub]]', msg) # dbg
39 self._in_queue.put(msg)
40 self._in_queue.put(msg)
40
41
41 def msg_ready(self):
42 def msg_ready(self):
42 """Is there a message that has been received?"""
43 """Is there a message that has been received?"""
43 if self._in_queue.qsize() == 0:
44 if self._in_queue.qsize() == 0:
44 return False
45 return False
45 else:
46 else:
46 return True
47 return True
47
48
48 def get_msg(self, block=True, timeout=None):
49 def get_msg(self, block=True, timeout=None):
49 """Get a message if there is one that is ready."""
50 """Get a message if there is one that is ready."""
50 return self._in_queue.get(block, timeout)
51 return self._in_queue.get(block, timeout)
51
52
52 def get_msgs(self):
53 def get_msgs(self):
53 """Get all messages that are currently ready."""
54 """Get all messages that are currently ready."""
54 msgs = []
55 msgs = []
55 while True:
56 while True:
56 try:
57 try:
57 msgs.append(self.get_msg(block=False))
58 msgs.append(self.get_msg(block=False))
58 except Empty:
59 except Empty:
59 break
60 break
60 return msgs
61 return msgs
61
62
62
63
63
64 class BlockingXReqSocketChannel(XReqSocketChannel):
64 class BlockingXReqSocketChannel(XReqSocketChannel):
65
65
66 def __init__(self, context, session, address=None):
66 def __init__(self, context, session, address=None):
67 super(BlockingXReqSocketChannel, self).__init__(context, session, address)
67 super(BlockingXReqSocketChannel, self).__init__(context, session,
68 address)
68 self._in_queue = Queue()
69 self._in_queue = Queue()
69
70
70 def call_handlers(self, msg):
71 def call_handlers(self, msg):
71 io.rprint('[[XReq]]', msg) # dbg
72 #io.rprint('[[XReq]]', msg) # dbg
73 self._in_queue.put(msg)
72
74
73 def msg_ready(self):
75 def msg_ready(self):
74 """Is there a message that has been received?"""
76 """Is there a message that has been received?"""
75 if self._in_queue.qsize() == 0:
77 if self._in_queue.qsize() == 0:
76 return False
78 return False
77 else:
79 else:
78 return True
80 return True
79
81
80 def get_msg(self, block=True, timeout=None):
82 def get_msg(self, block=True, timeout=None):
81 """Get a message if there is one that is ready."""
83 """Get a message if there is one that is ready."""
82 return self._in_queue.get(block, timeout)
84 return self._in_queue.get(block, timeout)
83
85
84 def get_msgs(self):
86 def get_msgs(self):
85 """Get all messages that are currently ready."""
87 """Get all messages that are currently ready."""
86 msgs = []
88 msgs = []
87 while True:
89 while True:
88 try:
90 try:
89 msgs.append(self.get_msg(block=False))
91 msgs.append(self.get_msg(block=False))
90 except Empty:
92 except Empty:
91 break
93 break
92 return msgs
94 return msgs
95
93
96
94 class BlockingRepSocketChannel(RepSocketChannel):
97 class BlockingRepSocketChannel(RepSocketChannel):
98
95 def call_handlers(self, msg):
99 def call_handlers(self, msg):
96 io.rprint('[[Rep]]', msg) # dbg
100 #io.rprint('[[Rep]]', msg) # dbg
101 pass
97
102
98
103
99 class BlockingHBSocketChannel(HBSocketChannel):
104 class BlockingHBSocketChannel(HBSocketChannel):
105
100 # This kernel needs rapid monitoring capabilities
106 # This kernel needs rapid monitoring capabilities
101 time_to_dead = 0.2
107 time_to_dead = 0.2
102
108
103 def call_handlers(self, since_last_heartbeat):
109 def call_handlers(self, since_last_heartbeat):
104 io.rprint('[[Heart]]', since_last_heartbeat) # dbg
110 #io.rprint('[[Heart]]', since_last_heartbeat) # dbg
105
111 pass
112
106
113
107 class BlockingKernelManager(KernelManager):
114 class BlockingKernelManager(KernelManager):
108
115
109 # The classes to use for the various channels.
116 # The classes to use for the various channels.
110 xreq_channel_class = Type(BlockingXReqSocketChannel)
117 xreq_channel_class = Type(BlockingXReqSocketChannel)
111 sub_channel_class = Type(BlockingSubSocketChannel)
118 sub_channel_class = Type(BlockingSubSocketChannel)
112 rep_channel_class = Type(BlockingRepSocketChannel)
119 rep_channel_class = Type(BlockingRepSocketChannel)
113 hb_channel_class = Type(BlockingHBSocketChannel)
120 hb_channel_class = Type(BlockingHBSocketChannel)
114
121
General Comments 0
You need to be logged in to leave comments. Login now