##// END OF EJS Templates
Fixed high CPU usage of XREQ channel....
Brian Granger -
Show More
@@ -3,6 +3,7 b''
3
3
4 # System library imports.
4 # System library imports.
5 from PyQt4 import QtCore
5 from PyQt4 import QtCore
6 import zmq
6
7
7 # IPython imports.
8 # IPython imports.
8 from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \
9 from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \
@@ -10,6 +11,7 b' from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \\'
10 from util import MetaQObjectHasTraits
11 from util import MetaQObjectHasTraits
11
12
12
13
14
13 class QtSubSocketChannel(SubSocketChannel, QtCore.QObject):
15 class QtSubSocketChannel(SubSocketChannel, QtCore.QObject):
14
16
15 # Emitted when any message is received.
17 # Emitted when any message is received.
@@ -95,6 +97,7 b' class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):'
95 """ Reimplemented to skip callback handling.
97 """ Reimplemented to skip callback handling.
96 """
98 """
97 self.command_queue.put(msg)
99 self.command_queue.put(msg)
100 self.add_io_state(zmq.POLLOUT)
98
101
99
102
100 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
103 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
@@ -34,6 +34,12 b' class ZmqSocketChannel(Thread):'
34 """ The base class for the channels that use ZMQ sockets.
34 """ The base class for the channels that use ZMQ sockets.
35 """
35 """
36
36
37 context = None
38 session = None
39 socket = None
40 ioloop = None
41 iostate = None
42
37 def __init__(self, context, session, address=None):
43 def __init__(self, context, session, address=None):
38 super(ZmqSocketChannel, self).__init__()
44 super(ZmqSocketChannel, self).__init__()
39 self.daemon = True
45 self.daemon = True
@@ -41,7 +47,6 b' class ZmqSocketChannel(Thread):'
41 self.context = context
47 self.context = context
42 self.session = session
48 self.session = session
43 self.address = address
49 self.address = address
44 self.socket = None
45
50
46 def stop(self):
51 def stop(self):
47 """Stop the thread's activity. Returns when the thread terminates.
52 """Stop the thread's activity. Returns when the thread terminates.
@@ -73,6 +78,28 b' class ZmqSocketChannel(Thread):'
73
78
74 address = property(get_address, set_adresss)
79 address = property(get_address, set_adresss)
75
80
81 def add_io_state(self, state):
82 """Add IO state to the eventloop.
83
84 This is thread safe as it uses the thread safe IOLoop.add_callback.
85 """
86 def add_io_state_callback():
87 if not self.iostate & state:
88 self.iostate = self.iostate | state
89 self.ioloop.update_handler(self.socket, self.iostate)
90 self.ioloop.add_callback(add_io_state_callback)
91
92 def drop_io_state(self, state):
93 """Drop IO state from the eventloop.
94
95 This is thread safe as it uses the thread safe IOLoop.add_callback.
96 """
97 def drop_io_state_callback():
98 if self.iostate & state:
99 self.iostate = self.iostate & (~state)
100 self.ioloop.update_handler(self.socket, self.iostate)
101 self.ioloop.add_callback(drop_io_state_callback)
102
76
103
77 class SubSocketChannel(ZmqSocketChannel):
104 class SubSocketChannel(ZmqSocketChannel):
78
105
@@ -85,8 +112,9 b' class SubSocketChannel(ZmqSocketChannel):'
85 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
112 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
86 self.socket.connect('tcp://%s:%i' % self.address)
113 self.socket.connect('tcp://%s:%i' % self.address)
87 self.ioloop = ioloop.IOLoop()
114 self.ioloop = ioloop.IOLoop()
115 self.iostate = POLLIN|POLLERR
88 self.ioloop.add_handler(self.socket, self._handle_events,
116 self.ioloop.add_handler(self.socket, self._handle_events,
89 POLLIN|POLLERR)
117 self.iostate)
90 self.ioloop.start()
118 self.ioloop.start()
91
119
92 def stop(self):
120 def stop(self):
@@ -171,8 +199,9 b' class XReqSocketChannel(ZmqSocketChannel):'
171 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
199 self.socket.setsockopt(zmq.IDENTITY, self.session.session)
172 self.socket.connect('tcp://%s:%i' % self.address)
200 self.socket.connect('tcp://%s:%i' % self.address)
173 self.ioloop = ioloop.IOLoop()
201 self.ioloop = ioloop.IOLoop()
202 self.iostate = POLLERR|POLLIN
174 self.ioloop.add_handler(self.socket, self._handle_events,
203 self.ioloop.add_handler(self.socket, self._handle_events,
175 POLLIN|POLLOUT|POLLERR)
204 self.iostate)
176 self.ioloop.start()
205 self.ioloop.start()
177
206
178 def stop(self):
207 def stop(self):
@@ -180,7 +209,6 b' class XReqSocketChannel(ZmqSocketChannel):'
180 super(XReqSocketChannel, self).stop()
209 super(XReqSocketChannel, self).stop()
181
210
182 def _handle_events(self, socket, events):
211 def _handle_events(self, socket, events):
183 # Turn on and off POLLOUT depending on if we have made a request
184 if events & POLLERR:
212 if events & POLLERR:
185 self._handle_err()
213 self._handle_err()
186 if events & POLLOUT:
214 if events & POLLOUT:
@@ -199,6 +227,8 b' class XReqSocketChannel(ZmqSocketChannel):'
199 pass
227 pass
200 else:
228 else:
201 self.socket.send_json(msg)
229 self.socket.send_json(msg)
230 if self.command_queue.empty():
231 self.drop_io_state(POLLOUT)
202
232
203 def _handle_err(self):
233 def _handle_err(self):
204 # We don't want to let this go silently, so eventually we should log.
234 # We don't want to let this go silently, so eventually we should log.
@@ -208,6 +238,7 b' class XReqSocketChannel(ZmqSocketChannel):'
208 handler = self._find_handler(msg['msg_type'], callback)
238 handler = self._find_handler(msg['msg_type'], callback)
209 self.handler_queue.put(handler)
239 self.handler_queue.put(handler)
210 self.command_queue.put(msg)
240 self.command_queue.put(msg)
241 self.add_io_state(POLLOUT)
211
242
212 def execute(self, code, callback=None):
243 def execute(self, code, callback=None):
213 # Create class for content/msg creation. Related to, but possibly
244 # Create class for content/msg creation. Related to, but possibly
General Comments 0
You need to be logged in to leave comments. Login now