##// END OF EJS Templates
Fixed high CPU usage of XREQ channel....
Brian Granger -
Show More
@@ -3,6 +3,7 b''
3 3
4 4 # System library imports.
5 5 from PyQt4 import QtCore
6 import zmq
6 7
7 8 # IPython imports.
8 9 from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \
@@ -10,6 +11,7 b' from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \\'
10 11 from util import MetaQObjectHasTraits
11 12
12 13
14
13 15 class QtSubSocketChannel(SubSocketChannel, QtCore.QObject):
14 16
15 17 # Emitted when any message is received.
@@ -95,6 +97,7 b' class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):'
95 97 """ Reimplemented to skip callback handling.
96 98 """
97 99 self.command_queue.put(msg)
100 self.add_io_state(zmq.POLLOUT)
98 101
99 102
100 103 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
@@ -34,6 +34,12 b' class ZmqSocketChannel(Thread):'
34 34 """ The base class for the channels that use ZMQ sockets.
35 35 """
36 36
37 context = None
38 session = None
39 socket = None
40 ioloop = None
41 iostate = None
42
37 43 def __init__(self, context, session, address=None):
38 44 super(ZmqSocketChannel, self).__init__()
39 45 self.daemon = True
@@ -41,7 +47,6 b' class ZmqSocketChannel(Thread):'
41 47 self.context = context
42 48 self.session = session
43 49 self.address = address
44 self.socket = None
45 50
46 51 def stop(self):
47 52 """Stop the thread's activity. Returns when the thread terminates.
@@ -73,6 +78,28 b' class ZmqSocketChannel(Thread):'
73 78
74 79 address = property(get_address, set_adresss)
75 80
81 def add_io_state(self, state):
82 """Add IO state to the eventloop.
83
84 This is thread safe as it uses the thread safe IOLoop.add_callback.
85 """
86 def add_io_state_callback():
87 if not self.iostate & state:
88 self.iostate = self.iostate | state
89 self.ioloop.update_handler(self.socket, self.iostate)
90 self.ioloop.add_callback(add_io_state_callback)
91
92 def drop_io_state(self, state):
93 """Drop IO state from the eventloop.
94
95 This is thread safe as it uses the thread safe IOLoop.add_callback.
96 """
97 def drop_io_state_callback():
98 if self.iostate & state:
99 self.iostate = self.iostate & (~state)
100 self.ioloop.update_handler(self.socket, self.iostate)
101 self.ioloop.add_callback(drop_io_state_callback)
102
76 103
77 104 class SubSocketChannel(ZmqSocketChannel):
78 105
@@ -85,8 +112,9 b' class SubSocketChannel(ZmqSocketChannel):'
85 112 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
86 113 self.socket.connect('tcp://%s:%i' % self.address)
87 114 self.ioloop = ioloop.IOLoop()
115 self.iostate = POLLIN|POLLERR
88 116 self.ioloop.add_handler(self.socket, self._handle_events,
89 POLLIN|POLLERR)
117 self.iostate)
90 118 self.ioloop.start()
91 119
92 120 def stop(self):
@@ -171,8 +199,9 b' class XReqSocketChannel(ZmqSocketChannel):'
171 199 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
172 200 self.socket.connect('tcp://%s:%i' % self.address)
173 201 self.ioloop = ioloop.IOLoop()
202 self.iostate = POLLERR|POLLIN
174 203 self.ioloop.add_handler(self.socket, self._handle_events,
175 POLLIN|POLLOUT|POLLERR)
204 self.iostate)
176 205 self.ioloop.start()
177 206
178 207 def stop(self):
@@ -180,7 +209,6 b' class XReqSocketChannel(ZmqSocketChannel):'
180 209 super(XReqSocketChannel, self).stop()
181 210
182 211 def _handle_events(self, socket, events):
183 # Turn on and off POLLOUT depending on if we have made a request
184 212 if events & POLLERR:
185 213 self._handle_err()
186 214 if events & POLLOUT:
@@ -199,6 +227,8 b' class XReqSocketChannel(ZmqSocketChannel):'
199 227 pass
200 228 else:
201 229 self.socket.send_json(msg)
230 if self.command_queue.empty():
231 self.drop_io_state(POLLOUT)
202 232
203 233 def _handle_err(self):
204 234 # We don't want to let this go silently, so eventually we should log.
@@ -208,6 +238,7 b' class XReqSocketChannel(ZmqSocketChannel):'
208 238 handler = self._find_handler(msg['msg_type'], callback)
209 239 self.handler_queue.put(handler)
210 240 self.command_queue.put(msg)
241 self.add_io_state(POLLOUT)
211 242
212 243 def execute(self, code, callback=None):
213 244 # Create class for content/msg creation. Related to, but possibly
General Comments 0
You need to be logged in to leave comments. Login now