Show More
@@ -135,6 +135,7 b' class commandresponse(object):' | |||||
135 | self._serviceable = threading.Event() |
|
135 | self._serviceable = threading.Event() | |
136 |
|
136 | |||
137 | self._pendingevents = [] |
|
137 | self._pendingevents = [] | |
|
138 | self._pendingerror = None | |||
138 | self._decoder = cborutil.bufferingdecoder() |
|
139 | self._decoder = cborutil.bufferingdecoder() | |
139 | self._seeninitial = False |
|
140 | self._seeninitial = False | |
140 | self._redirect = None |
|
141 | self._redirect = None | |
@@ -169,6 +170,12 b' class commandresponse(object):' | |||||
169 |
|
170 | |||
170 | self._serviceable.set() |
|
171 | self._serviceable.set() | |
171 |
|
172 | |||
|
173 | def _onerror(self, e): | |||
|
174 | self._pendingerror = e | |||
|
175 | ||||
|
176 | with self._lock: | |||
|
177 | self._serviceable.set() | |||
|
178 | ||||
172 | def _handleinitial(self, o): |
|
179 | def _handleinitial(self, o): | |
173 | self._seeninitial = True |
|
180 | self._seeninitial = True | |
174 | if o[b'status'] == b'ok': |
|
181 | if o[b'status'] == b'ok': | |
@@ -212,6 +219,9 b' class commandresponse(object):' | |||||
212 | # our state. |
|
219 | # our state. | |
213 | self._serviceable.wait(1.0) |
|
220 | self._serviceable.wait(1.0) | |
214 |
|
221 | |||
|
222 | if self._pendingerror: | |||
|
223 | raise self._pendingerror | |||
|
224 | ||||
215 | with self._lock: |
|
225 | with self._lock: | |
216 | self._serviceable.clear() |
|
226 | self._serviceable.clear() | |
217 |
|
227 | |||
@@ -342,9 +352,18 b' class clienthandler(object):' | |||||
342 | try: |
|
352 | try: | |
343 | self._processresponsedata(frame, meta, response) |
|
353 | self._processresponsedata(frame, meta, response) | |
344 | except BaseException as e: |
|
354 | except BaseException as e: | |
345 | self._futures[frame.requestid].set_exception(e) |
|
355 | # If an exception occurs before the future is resolved, | |
346 | del self._futures[frame.requestid] |
|
356 | # fail the future. Otherwise, we stuff the exception on | |
347 | response._oninputcomplete() |
|
357 | # the response object so it can be raised during objects() | |
|
358 | # iteration. If nothing is consuming objects(), we could | |||
|
359 | # silently swallow this exception. That's a risk we'll have to | |||
|
360 | # take. | |||
|
361 | if frame.requestid in self._futures: | |||
|
362 | self._futures[frame.requestid].set_exception(e) | |||
|
363 | del self._futures[frame.requestid] | |||
|
364 | response._oninputcomplete() | |||
|
365 | else: | |||
|
366 | response._onerror(e) | |||
348 | else: |
|
367 | else: | |
349 | raise error.ProgrammingError( |
|
368 | raise error.ProgrammingError( | |
350 | 'unhandled action from clientreactor: %s' % action) |
|
369 | 'unhandled action from clientreactor: %s' % action) |
General Comments 0
You need to be logged in to leave comments.
Login now