##// END OF EJS Templates
Add missing msg queue + comment out debug printing in BlockingKernelManager.
epatters -
Show More
@@ -1,114 +1,121 b''
1 1 """Implement a fully blocking kernel manager.
2 2
3 3 Useful for test suites and blocking terminal interfaces.
4 4 """
5 5 #-----------------------------------------------------------------------------
6 6 # Copyright (C) 2010 The IPython Development Team
7 7 #
8 8 # Distributed under the terms of the BSD License. The full license is in
9 9 # the file COPYING.txt, distributed as part of this software.
10 10 #-----------------------------------------------------------------------------
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Imports
14 14 #-----------------------------------------------------------------------------
15 15 from __future__ import print_function
16 16
17 17 # Stdlib
18 18 from Queue import Queue, Empty
19 19
20 20 # Our own
21 21 from IPython.utils import io
22 22 from IPython.utils.traitlets import Type
23 23
24 24 from .kernelmanager import (KernelManager, SubSocketChannel,
25 25 XReqSocketChannel, RepSocketChannel, HBSocketChannel)
26 26
27 27 #-----------------------------------------------------------------------------
28 28 # Functions and classes
29 29 #-----------------------------------------------------------------------------
30 30
31 31 class BlockingSubSocketChannel(SubSocketChannel):
32 32
33 33 def __init__(self, context, session, address=None):
34 super(BlockingSubSocketChannel, self).__init__(context, session, address)
34 super(BlockingSubSocketChannel, self).__init__(context, session,
35 address)
35 36 self._in_queue = Queue()
36 37
37 38 def call_handlers(self, msg):
38 io.rprint('[[Sub]]', msg) # dbg
39 #io.rprint('[[Sub]]', msg) # dbg
39 40 self._in_queue.put(msg)
40 41
41 42 def msg_ready(self):
42 43 """Is there a message that has been received?"""
43 44 if self._in_queue.qsize() == 0:
44 45 return False
45 46 else:
46 47 return True
47 48
48 49 def get_msg(self, block=True, timeout=None):
49 50 """Get a message if there is one that is ready."""
50 51 return self._in_queue.get(block, timeout)
51 52
52 53 def get_msgs(self):
53 54 """Get all messages that are currently ready."""
54 55 msgs = []
55 56 while True:
56 57 try:
57 58 msgs.append(self.get_msg(block=False))
58 59 except Empty:
59 60 break
60 61 return msgs
61 62
62
63 63
64 64 class BlockingXReqSocketChannel(XReqSocketChannel):
65 65
66 66 def __init__(self, context, session, address=None):
67 super(BlockingXReqSocketChannel, self).__init__(context, session, address)
67 super(BlockingXReqSocketChannel, self).__init__(context, session,
68 address)
68 69 self._in_queue = Queue()
69 70
70 71 def call_handlers(self, msg):
71 io.rprint('[[XReq]]', msg) # dbg
72 #io.rprint('[[XReq]]', msg) # dbg
73 self._in_queue.put(msg)
72 74
73 75 def msg_ready(self):
74 76 """Is there a message that has been received?"""
75 77 if self._in_queue.qsize() == 0:
76 78 return False
77 79 else:
78 80 return True
79 81
80 82 def get_msg(self, block=True, timeout=None):
81 83 """Get a message if there is one that is ready."""
82 84 return self._in_queue.get(block, timeout)
83 85
84 86 def get_msgs(self):
85 87 """Get all messages that are currently ready."""
86 88 msgs = []
87 89 while True:
88 90 try:
89 91 msgs.append(self.get_msg(block=False))
90 92 except Empty:
91 93 break
92 94 return msgs
95
93 96
94 97 class BlockingRepSocketChannel(RepSocketChannel):
98
95 99 def call_handlers(self, msg):
96 io.rprint('[[Rep]]', msg) # dbg
100 #io.rprint('[[Rep]]', msg) # dbg
101 pass
97 102
98 103
99 104 class BlockingHBSocketChannel(HBSocketChannel):
105
100 106 # This kernel needs rapid monitoring capabilities
101 107 time_to_dead = 0.2
102 108
103 109 def call_handlers(self, since_last_heartbeat):
104 io.rprint('[[Heart]]', since_last_heartbeat) # dbg
105
110 #io.rprint('[[Heart]]', since_last_heartbeat) # dbg
111 pass
112
106 113
107 114 class BlockingKernelManager(KernelManager):
108 115
109 116 # The classes to use for the various channels.
110 117 xreq_channel_class = Type(BlockingXReqSocketChannel)
111 118 sub_channel_class = Type(BlockingSubSocketChannel)
112 119 rep_channel_class = Type(BlockingRepSocketChannel)
113 120 hb_channel_class = Type(BlockingHBSocketChannel)
114 121
General Comments 0
You need to be logged in to leave comments. Login now