diff --git a/mercurial/wireprotov2peer.py b/mercurial/wireprotov2peer.py --- a/mercurial/wireprotov2peer.py +++ b/mercurial/wireprotov2peer.py @@ -135,6 +135,7 @@ class commandresponse(object): self._serviceable = threading.Event() self._pendingevents = [] + self._pendingerror = None self._decoder = cborutil.bufferingdecoder() self._seeninitial = False self._redirect = None @@ -169,6 +170,12 @@ class commandresponse(object): self._serviceable.set() + def _onerror(self, e): + self._pendingerror = e + + with self._lock: + self._serviceable.set() + def _handleinitial(self, o): self._seeninitial = True if o[b'status'] == b'ok': @@ -212,6 +219,9 @@ class commandresponse(object): # our state. self._serviceable.wait(1.0) + if self._pendingerror: + raise self._pendingerror + with self._lock: self._serviceable.clear() @@ -342,9 +352,18 @@ class clienthandler(object): try: self._processresponsedata(frame, meta, response) except BaseException as e: - self._futures[frame.requestid].set_exception(e) - del self._futures[frame.requestid] - response._oninputcomplete() + # If an exception occurs before the future is resolved, + # fail the future. Otherwise, we stuff the exception on + # the response object so it can be raised during objects() + # iteration. If nothing is consuming objects(), we could + # silently swallow this exception. That's a risk we'll have to + # take. + if frame.requestid in self._futures: + self._futures[frame.requestid].set_exception(e) + del self._futures[frame.requestid] + response._oninputcomplete() + else: + response._onerror(e) else: raise error.ProgrammingError( 'unhandled action from clientreactor: %s' % action)