Show More
@@ -76,11 +76,7 b' class ZmqSocketChannel(Thread):' | |||||
76 |
|
76 | |||
77 | class SubSocketChannel(ZmqSocketChannel): |
|
77 | class SubSocketChannel(ZmqSocketChannel): | |
78 |
|
78 | |||
79 | handlers = None |
|
|||
80 | _overriden_call_handler = None |
|
|||
81 |
|
||||
82 | def __init__(self, context, session, address=None): |
|
79 | def __init__(self, context, session, address=None): | |
83 | self.handlers = {} |
|
|||
84 | super(SubSocketChannel, self).__init__(context, session, address) |
|
80 | super(SubSocketChannel, self).__init__(context, session, address) | |
85 |
|
81 | |||
86 | def run(self): |
|
82 | def run(self): | |
@@ -105,53 +101,22 b' class SubSocketChannel(ZmqSocketChannel):' | |||||
105 | self._handle_recv() |
|
101 | self._handle_recv() | |
106 |
|
102 | |||
107 | def _handle_err(self): |
|
103 | def _handle_err(self): | |
|
104 | # We don't want to let this go silently, so eventually we should log. | |||
108 | raise zmq.ZmqError() |
|
105 | raise zmq.ZmqError() | |
109 |
|
106 | |||
110 | def _handle_recv(self): |
|
107 | def _handle_recv(self): | |
111 | msg = self.socket.recv_json() |
|
108 | msg = self.socket.recv_json() | |
112 | self.call_handlers(msg) |
|
109 | self.call_handlers(msg) | |
113 |
|
110 | |||
114 | def override_call_handler(self, func): |
|
|||
115 | """Permanently override the call_handler. |
|
|||
116 |
|
||||
117 | The function func will be called as:: |
|
|||
118 |
|
||||
119 | func(handler, msg) |
|
|||
120 |
|
||||
121 | And must call:: |
|
|||
122 |
|
||||
123 | handler(msg) |
|
|||
124 |
|
||||
125 | in the main thread. |
|
|||
126 | """ |
|
|||
127 | assert callable(func), "not a callable: %r" % func |
|
|||
128 | self._overriden_call_handler = func |
|
|||
129 |
|
||||
130 | def call_handlers(self, msg): |
|
111 | def call_handlers(self, msg): | |
131 | handler = self.handlers.get(msg['msg_type'], None) |
|
112 | """This method is called in the ioloop thread when a message arrives. | |
132 | if handler is not None: |
|
|||
133 | try: |
|
|||
134 | self.call_handler(handler, msg) |
|
|||
135 | except: |
|
|||
136 | # XXX: This should be logged at least |
|
|||
137 | traceback.print_last() |
|
|||
138 |
|
|
113 | ||
139 | def call_handler(self, handler, msg): |
|
114 | Subclasses should override this method to handle incoming messages. | |
140 | if self._overriden_call_handler is not None: |
|
115 | It is important to remember that this method is called in the thread | |
141 | self._overriden_call_handler(handler, msg) |
|
116 | so that some logic must be done to ensure that the application leve | |
142 | elif hasattr(self, '_call_handler'): |
|
117 | handlers are called in the application thread. | |
143 | call_handler = getattr(self, '_call_handler') |
|
118 | """ | |
144 | call_handler(handler, msg) |
|
119 | raise NotImplementedError('call_handlers must be defined in a subclass.') | |
145 | else: |
|
|||
146 | raise RuntimeError('no handler!') |
|
|||
147 |
|
||||
148 | def add_handler(self, callback, msg_type): |
|
|||
149 | """Register a callback for msg type.""" |
|
|||
150 | self.handlers[msg_type] = callback |
|
|||
151 |
|
||||
152 | def remove_handler(self, msg_type): |
|
|||
153 | """Remove the callback for msg type.""" |
|
|||
154 | self.handlers.pop(msg_type, None) |
|
|||
155 |
|
120 | |||
156 | def flush(self, timeout=1.0): |
|
121 | def flush(self, timeout=1.0): | |
157 | """Immediately processes all pending messages on the SUB channel. |
|
122 | """Immediately processes all pending messages on the SUB channel. | |
@@ -228,6 +193,7 b' class XReqSocketChannel(ZmqSocketChannel):' | |||||
228 | self.socket.send_json(msg) |
|
193 | self.socket.send_json(msg) | |
229 |
|
194 | |||
230 | def _handle_err(self): |
|
195 | def _handle_err(self): | |
|
196 | # We don't want to let this go silently, so eventually we should log. | |||
231 | raise zmq.ZmqError() |
|
197 | raise zmq.ZmqError() | |
232 |
|
198 | |||
233 | def _queue_request(self, msg, callback): |
|
199 | def _queue_request(self, msg, callback): |
General Comments 0
You need to be logged in to leave comments.
Login now