##// END OF EJS Templates
General cleanup of kernelmanager.py....
Brian Granger -
Show More
@@ -76,11 +76,7 b' class ZmqSocketChannel(Thread):'
76
76
77 class SubSocketChannel(ZmqSocketChannel):
77 class SubSocketChannel(ZmqSocketChannel):
78
78
79 handlers = None
80 _overriden_call_handler = None
81
82 def __init__(self, context, session, address=None):
79 def __init__(self, context, session, address=None):
83 self.handlers = {}
84 super(SubSocketChannel, self).__init__(context, session, address)
80 super(SubSocketChannel, self).__init__(context, session, address)
85
81
86 def run(self):
82 def run(self):
@@ -105,53 +101,22 b' class SubSocketChannel(ZmqSocketChannel):'
105 self._handle_recv()
101 self._handle_recv()
106
102
107 def _handle_err(self):
103 def _handle_err(self):
104 # We don't want to let this go silently, so eventually we should log.
108 raise zmq.ZmqError()
105 raise zmq.ZmqError()
109
106
110 def _handle_recv(self):
107 def _handle_recv(self):
111 msg = self.socket.recv_json()
108 msg = self.socket.recv_json()
112 self.call_handlers(msg)
109 self.call_handlers(msg)
113
110
114 def override_call_handler(self, func):
115 """Permanently override the call_handler.
116
117 The function func will be called as::
118
119 func(handler, msg)
120
121 And must call::
122
123 handler(msg)
124
125 in the main thread.
126 """
127 assert callable(func), "not a callable: %r" % func
128 self._overriden_call_handler = func
129
130 def call_handlers(self, msg):
111 def call_handlers(self, msg):
131 handler = self.handlers.get(msg['msg_type'], None)
112 """This method is called in the ioloop thread when a message arrives.
132 if handler is not None:
133 try:
134 self.call_handler(handler, msg)
135 except:
136 # XXX: This should be logged at least
137 traceback.print_last()
138
113
139 def call_handler(self, handler, msg):
114 Subclasses should override this method to handle incoming messages.
140 if self._overriden_call_handler is not None:
115 It is important to remember that this method is called in the thread
141 self._overriden_call_handler(handler, msg)
116 so that some logic must be done to ensure that the application leve
142 elif hasattr(self, '_call_handler'):
117 handlers are called in the application thread.
143 call_handler = getattr(self, '_call_handler')
118 """
144 call_handler(handler, msg)
119 raise NotImplementedError('call_handlers must be defined in a subclass.')
145 else:
146 raise RuntimeError('no handler!')
147
148 def add_handler(self, callback, msg_type):
149 """Register a callback for msg type."""
150 self.handlers[msg_type] = callback
151
152 def remove_handler(self, msg_type):
153 """Remove the callback for msg type."""
154 self.handlers.pop(msg_type, None)
155
120
156 def flush(self, timeout=1.0):
121 def flush(self, timeout=1.0):
157 """Immediately processes all pending messages on the SUB channel.
122 """Immediately processes all pending messages on the SUB channel.
@@ -228,6 +193,7 b' class XReqSocketChannel(ZmqSocketChannel):'
228 self.socket.send_json(msg)
193 self.socket.send_json(msg)
229
194
230 def _handle_err(self):
195 def _handle_err(self):
196 # We don't want to let this go silently, so eventually we should log.
231 raise zmq.ZmqError()
197 raise zmq.ZmqError()
232
198
233 def _queue_request(self, msg, callback):
199 def _queue_request(self, msg, callback):
General Comments 0
You need to be logged in to leave comments. Login now