Show More
@@ -3,6 +3,7 b'' | |||||
3 |
|
3 | |||
4 | # System library imports. |
|
4 | # System library imports. | |
5 | from PyQt4 import QtCore |
|
5 | from PyQt4 import QtCore | |
|
6 | import zmq | |||
6 |
|
7 | |||
7 | # IPython imports. |
|
8 | # IPython imports. | |
8 | from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \ |
|
9 | from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \ | |
@@ -10,6 +11,7 b' from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \\' | |||||
10 | from util import MetaQObjectHasTraits |
|
11 | from util import MetaQObjectHasTraits | |
11 |
|
12 | |||
12 |
|
13 | |||
|
14 | ||||
13 | class QtSubSocketChannel(SubSocketChannel, QtCore.QObject): |
|
15 | class QtSubSocketChannel(SubSocketChannel, QtCore.QObject): | |
14 |
|
16 | |||
15 | # Emitted when any message is received. |
|
17 | # Emitted when any message is received. | |
@@ -95,6 +97,7 b' class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject):' | |||||
95 | """ Reimplemented to skip callback handling. |
|
97 | """ Reimplemented to skip callback handling. | |
96 | """ |
|
98 | """ | |
97 | self.command_queue.put(msg) |
|
99 | self.command_queue.put(msg) | |
|
100 | self.add_io_state(zmq.POLLOUT) | |||
98 |
|
101 | |||
99 |
|
102 | |||
100 | class QtRepSocketChannel(RepSocketChannel, QtCore.QObject): |
|
103 | class QtRepSocketChannel(RepSocketChannel, QtCore.QObject): |
@@ -34,6 +34,12 b' class ZmqSocketChannel(Thread):' | |||||
34 | """ The base class for the channels that use ZMQ sockets. |
|
34 | """ The base class for the channels that use ZMQ sockets. | |
35 | """ |
|
35 | """ | |
36 |
|
36 | |||
|
37 | context = None | |||
|
38 | session = None | |||
|
39 | socket = None | |||
|
40 | ioloop = None | |||
|
41 | iostate = None | |||
|
42 | ||||
37 | def __init__(self, context, session, address=None): |
|
43 | def __init__(self, context, session, address=None): | |
38 | super(ZmqSocketChannel, self).__init__() |
|
44 | super(ZmqSocketChannel, self).__init__() | |
39 | self.daemon = True |
|
45 | self.daemon = True | |
@@ -41,7 +47,6 b' class ZmqSocketChannel(Thread):' | |||||
41 | self.context = context |
|
47 | self.context = context | |
42 | self.session = session |
|
48 | self.session = session | |
43 | self.address = address |
|
49 | self.address = address | |
44 | self.socket = None |
|
|||
45 |
|
50 | |||
46 | def stop(self): |
|
51 | def stop(self): | |
47 | """Stop the thread's activity. Returns when the thread terminates. |
|
52 | """Stop the thread's activity. Returns when the thread terminates. | |
@@ -73,6 +78,28 b' class ZmqSocketChannel(Thread):' | |||||
73 |
|
78 | |||
74 | address = property(get_address, set_adresss) |
|
79 | address = property(get_address, set_adresss) | |
75 |
|
80 | |||
|
81 | def add_io_state(self, state): | |||
|
82 | """Add IO state to the eventloop. | |||
|
83 | ||||
|
84 | This is thread safe as it uses the thread safe IOLoop.add_callback. | |||
|
85 | """ | |||
|
86 | def add_io_state_callback(): | |||
|
87 | if not self.iostate & state: | |||
|
88 | self.iostate = self.iostate | state | |||
|
89 | self.ioloop.update_handler(self.socket, self.iostate) | |||
|
90 | self.ioloop.add_callback(add_io_state_callback) | |||
|
91 | ||||
|
92 | def drop_io_state(self, state): | |||
|
93 | """Drop IO state from the eventloop. | |||
|
94 | ||||
|
95 | This is thread safe as it uses the thread safe IOLoop.add_callback. | |||
|
96 | """ | |||
|
97 | def drop_io_state_callback(): | |||
|
98 | if self.iostate & state: | |||
|
99 | self.iostate = self.iostate & (~state) | |||
|
100 | self.ioloop.update_handler(self.socket, self.iostate) | |||
|
101 | self.ioloop.add_callback(drop_io_state_callback) | |||
|
102 | ||||
76 |
|
103 | |||
77 | class SubSocketChannel(ZmqSocketChannel): |
|
104 | class SubSocketChannel(ZmqSocketChannel): | |
78 |
|
105 | |||
@@ -85,8 +112,9 b' class SubSocketChannel(ZmqSocketChannel):' | |||||
85 | self.socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
112 | self.socket.setsockopt(zmq.IDENTITY, self.session.session) | |
86 | self.socket.connect('tcp://%s:%i' % self.address) |
|
113 | self.socket.connect('tcp://%s:%i' % self.address) | |
87 | self.ioloop = ioloop.IOLoop() |
|
114 | self.ioloop = ioloop.IOLoop() | |
|
115 | self.iostate = POLLIN|POLLERR | |||
88 | self.ioloop.add_handler(self.socket, self._handle_events, |
|
116 | self.ioloop.add_handler(self.socket, self._handle_events, | |
89 |
|
|
117 | self.iostate) | |
90 | self.ioloop.start() |
|
118 | self.ioloop.start() | |
91 |
|
119 | |||
92 | def stop(self): |
|
120 | def stop(self): | |
@@ -171,8 +199,9 b' class XReqSocketChannel(ZmqSocketChannel):' | |||||
171 | self.socket.setsockopt(zmq.IDENTITY, self.session.session) |
|
199 | self.socket.setsockopt(zmq.IDENTITY, self.session.session) | |
172 | self.socket.connect('tcp://%s:%i' % self.address) |
|
200 | self.socket.connect('tcp://%s:%i' % self.address) | |
173 | self.ioloop = ioloop.IOLoop() |
|
201 | self.ioloop = ioloop.IOLoop() | |
|
202 | self.iostate = POLLERR|POLLIN | |||
174 | self.ioloop.add_handler(self.socket, self._handle_events, |
|
203 | self.ioloop.add_handler(self.socket, self._handle_events, | |
175 |
|
|
204 | self.iostate) | |
176 | self.ioloop.start() |
|
205 | self.ioloop.start() | |
177 |
|
206 | |||
178 | def stop(self): |
|
207 | def stop(self): | |
@@ -180,7 +209,6 b' class XReqSocketChannel(ZmqSocketChannel):' | |||||
180 | super(XReqSocketChannel, self).stop() |
|
209 | super(XReqSocketChannel, self).stop() | |
181 |
|
210 | |||
182 | def _handle_events(self, socket, events): |
|
211 | def _handle_events(self, socket, events): | |
183 | # Turn on and off POLLOUT depending on if we have made a request |
|
|||
184 | if events & POLLERR: |
|
212 | if events & POLLERR: | |
185 | self._handle_err() |
|
213 | self._handle_err() | |
186 | if events & POLLOUT: |
|
214 | if events & POLLOUT: | |
@@ -199,6 +227,8 b' class XReqSocketChannel(ZmqSocketChannel):' | |||||
199 | pass |
|
227 | pass | |
200 | else: |
|
228 | else: | |
201 | self.socket.send_json(msg) |
|
229 | self.socket.send_json(msg) | |
|
230 | if self.command_queue.empty(): | |||
|
231 | self.drop_io_state(POLLOUT) | |||
202 |
|
232 | |||
203 | def _handle_err(self): |
|
233 | def _handle_err(self): | |
204 | # We don't want to let this go silently, so eventually we should log. |
|
234 | # We don't want to let this go silently, so eventually we should log. | |
@@ -208,6 +238,7 b' class XReqSocketChannel(ZmqSocketChannel):' | |||||
208 | handler = self._find_handler(msg['msg_type'], callback) |
|
238 | handler = self._find_handler(msg['msg_type'], callback) | |
209 | self.handler_queue.put(handler) |
|
239 | self.handler_queue.put(handler) | |
210 | self.command_queue.put(msg) |
|
240 | self.command_queue.put(msg) | |
|
241 | self.add_io_state(POLLOUT) | |||
211 |
|
242 | |||
212 | def execute(self, code, callback=None): |
|
243 | def execute(self, code, callback=None): | |
213 | # Create class for content/msg creation. Related to, but possibly |
|
244 | # Create class for content/msg creation. Related to, but possibly |
General Comments 0
You need to be logged in to leave comments.
Login now