diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py --- a/mercurial/wireprotoframing.py +++ b/mercurial/wireprotoframing.py @@ -533,9 +533,11 @@ class serverreactor(object): """ self._deferoutput = deferoutput self._state = 'idle' + self._nextoutgoingstreamid = 2 self._bufferedframegens = [] # stream id -> stream instance for all active streams from the client. self._incomingstreams = {} + self._outgoingstreams = {} # request id -> dict of commands that are actively being received. self._receivingcommands = {} # Request IDs that have been received and are actively being processed. @@ -638,6 +640,16 @@ class serverreactor(object): application=True), } + def makeoutputstream(self): + """Create a stream to be used for sending data to the client.""" + streamid = self._nextoutgoingstreamid + self._nextoutgoingstreamid += 2 + + s = stream(streamid) + self._outgoingstreams[streamid] = s + + return s + def _makeerrorresult(self, msg): return 'error', { 'message': msg, diff --git a/mercurial/wireprotoserver.py b/mercurial/wireprotoserver.py --- a/mercurial/wireprotoserver.py +++ b/mercurial/wireprotoserver.py @@ -432,6 +432,8 @@ def _processhttpv2request(ui, repo, req, reactor = wireprotoframing.serverreactor(deferoutput=True) seencommand = False + outstream = reactor.makeoutputstream() + while True: frame = wireprotoframing.readframe(req.bodyfh) if not frame: @@ -444,8 +446,8 @@ def _processhttpv2request(ui, repo, req, continue elif action == 'runcommand': sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm, - reqcommand, reactor, meta, - issubsequent=seencommand) + reqcommand, reactor, outstream, + meta, issubsequent=seencommand) if sentoutput: return @@ -476,7 +478,7 @@ def _processhttpv2request(ui, repo, req, % action) def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor, - command, issubsequent): + outstream, command, issubsequent): """Dispatch a wire protocol command made from HTTPv2 requests. The authenticated permission (``authedperm``) along with the original @@ -546,10 +548,9 @@ def _httpv2runcommand(ui, repo, req, res res.status = b'200 OK' res.headers[b'Content-Type'] = FRAMINGTYPE - stream = wireprotoframing.stream(2) if isinstance(rsp, wireprototypes.bytesresponse): - action, meta = reactor.onbytesresponseready(stream, + action, meta = reactor.onbytesresponseready(outstream, command['requestid'], rsp.data) else: diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t --- a/tests/test-http-api-httpv2.t +++ b/tests/test-http-api-httpv2.t @@ -472,7 +472,7 @@ Multiple requests to "multirequest" URL s> \x1d\x00\x00\x01\x00\x02\x01Bcustomreadonly bytes response s> \r\n s> 25\r\n - s> \x1d\x00\x00\x03\x00\x02\x01Bcustomreadonly bytes response + s> \x1d\x00\x00\x03\x00\x02\x00Bcustomreadonly bytes response s> \r\n s> 0\r\n s> \r\n @@ -511,7 +511,7 @@ Interleaved requests to "multirequest" a s> \x00\x00\x00\x03\x00\x02\x01B s> \r\n s> 26\r\n - s> \x1e\x00\x00\x01\x00\x02\x01Bbookmarks \n + s> \x1e\x00\x00\x01\x00\x02\x00Bbookmarks \n s> namespaces \n s> phases s> \r\n diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py --- a/tests/test-wireproto-serverreactor.py +++ b/tests/test-wireproto-serverreactor.py @@ -375,7 +375,7 @@ class ServerReactorTests(unittest.TestCa """Multiple fully serviced commands with same request ID is allowed.""" reactor = makereactor() results = [] - outstream = framing.stream(2) + outstream = reactor.makeoutputstream() results.append(self._sendsingleframe( reactor, ffs(b'1 1 stream-begin command-name eos command'))) result = reactor.onbytesresponseready(outstream, 1, b'response1') @@ -530,7 +530,7 @@ class ServerReactorTests(unittest.TestCa instream = framing.stream(1) list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) - outstream = framing.stream(2) + outstream = reactor.makeoutputstream() result = reactor.onbytesresponseready(outstream, 1, b'response') self.assertaction(result, 'sendframes') self.assertframesequal(result[1]['framegen'], [ @@ -546,7 +546,7 @@ class ServerReactorTests(unittest.TestCa instream = framing.stream(1) list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) - outstream = framing.stream(2) + outstream = reactor.makeoutputstream() result = reactor.onbytesresponseready(outstream, 1, first + second) self.assertaction(result, 'sendframes') self.assertframesequal(result[1]['framegen'], [ @@ -559,7 +559,7 @@ class ServerReactorTests(unittest.TestCa instream = framing.stream(1) list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) - outstream = framing.stream(2) + outstream = reactor.makeoutputstream() result = reactor.onapplicationerror(outstream, 1, b'some message') self.assertaction(result, 'sendframes') self.assertframesequal(result[1]['framegen'], [ @@ -575,7 +575,7 @@ class ServerReactorTests(unittest.TestCa self.assertEqual(len(results), 1) self.assertaction(results[0], 'runcommand') - outstream = framing.stream(2) + outstream = reactor.makeoutputstream() result = reactor.onbytesresponseready(outstream, 1, b'response') self.assertaction(result, 'noop') result = reactor.oninputeof() @@ -590,7 +590,7 @@ class ServerReactorTests(unittest.TestCa list(sendcommandframes(reactor, instream, 1, b'command1', {})) list(sendcommandframes(reactor, instream, 3, b'command2', {})) - outstream = framing.stream(2) + outstream = reactor.makeoutputstream() result = reactor.onbytesresponseready(outstream, 1, b'response1') self.assertaction(result, 'noop') result = reactor.onbytesresponseready(outstream, 3, b'response2') @@ -610,7 +610,7 @@ class ServerReactorTests(unittest.TestCa list(sendcommandframes(reactor, instream, 5, b'command3', {})) # Register results for commands out of order. - outstream = framing.stream(2) + outstream = reactor.makeoutputstream() reactor.onbytesresponseready(outstream, 3, b'response3') reactor.onbytesresponseready(outstream, 1, b'response1') reactor.onbytesresponseready(outstream, 5, b'response5') @@ -640,7 +640,7 @@ class ServerReactorTests(unittest.TestCa reactor = makereactor() instream = framing.stream(1) list(sendcommandframes(reactor, instream, 1, b'command1', {})) - outstream = framing.stream(2) + outstream = reactor.makeoutputstream() reactor.onbytesresponseready(outstream, 1, b'response') # We've registered the response but haven't sent it. From the @@ -672,7 +672,7 @@ class ServerReactorTests(unittest.TestCa reactor = makereactor() instream = framing.stream(1) list(sendcommandframes(reactor, instream, 1, b'command1', {})) - outstream = framing.stream(2) + outstream = reactor.makeoutputstream() res = reactor.onbytesresponseready(outstream, 1, b'response') list(res[1]['framegen'])