Show More
@@ -533,9 +533,11 class serverreactor(object): | |||||
533 | """ |
|
533 | """ | |
534 | self._deferoutput = deferoutput |
|
534 | self._deferoutput = deferoutput | |
535 | self._state = 'idle' |
|
535 | self._state = 'idle' | |
|
536 | self._nextoutgoingstreamid = 2 | |||
536 | self._bufferedframegens = [] |
|
537 | self._bufferedframegens = [] | |
537 | # stream id -> stream instance for all active streams from the client. |
|
538 | # stream id -> stream instance for all active streams from the client. | |
538 | self._incomingstreams = {} |
|
539 | self._incomingstreams = {} | |
|
540 | self._outgoingstreams = {} | |||
539 | # request id -> dict of commands that are actively being received. |
|
541 | # request id -> dict of commands that are actively being received. | |
540 | self._receivingcommands = {} |
|
542 | self._receivingcommands = {} | |
541 | # Request IDs that have been received and are actively being processed. |
|
543 | # Request IDs that have been received and are actively being processed. | |
@@ -638,6 +640,16 class serverreactor(object): | |||||
638 | application=True), |
|
640 | application=True), | |
639 | } |
|
641 | } | |
640 |
|
642 | |||
|
643 | def makeoutputstream(self): | |||
|
644 | """Create a stream to be used for sending data to the client.""" | |||
|
645 | streamid = self._nextoutgoingstreamid | |||
|
646 | self._nextoutgoingstreamid += 2 | |||
|
647 | ||||
|
648 | s = stream(streamid) | |||
|
649 | self._outgoingstreams[streamid] = s | |||
|
650 | ||||
|
651 | return s | |||
|
652 | ||||
641 | def _makeerrorresult(self, msg): |
|
653 | def _makeerrorresult(self, msg): | |
642 | return 'error', { |
|
654 | return 'error', { | |
643 | 'message': msg, |
|
655 | 'message': msg, |
@@ -432,6 +432,8 def _processhttpv2request(ui, repo, req, | |||||
432 | reactor = wireprotoframing.serverreactor(deferoutput=True) |
|
432 | reactor = wireprotoframing.serverreactor(deferoutput=True) | |
433 | seencommand = False |
|
433 | seencommand = False | |
434 |
|
434 | |||
|
435 | outstream = reactor.makeoutputstream() | |||
|
436 | ||||
435 | while True: |
|
437 | while True: | |
436 | frame = wireprotoframing.readframe(req.bodyfh) |
|
438 | frame = wireprotoframing.readframe(req.bodyfh) | |
437 | if not frame: |
|
439 | if not frame: | |
@@ -444,8 +446,8 def _processhttpv2request(ui, repo, req, | |||||
444 | continue |
|
446 | continue | |
445 | elif action == 'runcommand': |
|
447 | elif action == 'runcommand': | |
446 | sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm, |
|
448 | sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm, | |
447 |
reqcommand, reactor, |
|
449 | reqcommand, reactor, outstream, | |
448 | issubsequent=seencommand) |
|
450 | meta, issubsequent=seencommand) | |
449 |
|
451 | |||
450 | if sentoutput: |
|
452 | if sentoutput: | |
451 | return |
|
453 | return | |
@@ -476,7 +478,7 def _processhttpv2request(ui, repo, req, | |||||
476 | % action) |
|
478 | % action) | |
477 |
|
479 | |||
478 | def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor, |
|
480 | def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor, | |
479 | command, issubsequent): |
|
481 | outstream, command, issubsequent): | |
480 | """Dispatch a wire protocol command made from HTTPv2 requests. |
|
482 | """Dispatch a wire protocol command made from HTTPv2 requests. | |
481 |
|
483 | |||
482 | The authenticated permission (``authedperm``) along with the original |
|
484 | The authenticated permission (``authedperm``) along with the original | |
@@ -546,10 +548,9 def _httpv2runcommand(ui, repo, req, res | |||||
546 |
|
548 | |||
547 | res.status = b'200 OK' |
|
549 | res.status = b'200 OK' | |
548 | res.headers[b'Content-Type'] = FRAMINGTYPE |
|
550 | res.headers[b'Content-Type'] = FRAMINGTYPE | |
549 | stream = wireprotoframing.stream(2) |
|
|||
550 |
|
551 | |||
551 | if isinstance(rsp, wireprototypes.bytesresponse): |
|
552 | if isinstance(rsp, wireprototypes.bytesresponse): | |
552 | action, meta = reactor.onbytesresponseready(stream, |
|
553 | action, meta = reactor.onbytesresponseready(outstream, | |
553 | command['requestid'], |
|
554 | command['requestid'], | |
554 | rsp.data) |
|
555 | rsp.data) | |
555 | else: |
|
556 | else: |
@@ -472,7 +472,7 Multiple requests to "multirequest" URL | |||||
472 | s> \x1d\x00\x00\x01\x00\x02\x01Bcustomreadonly bytes response |
|
472 | s> \x1d\x00\x00\x01\x00\x02\x01Bcustomreadonly bytes response | |
473 | s> \r\n |
|
473 | s> \r\n | |
474 | s> 25\r\n |
|
474 | s> 25\r\n | |
475 |
s> \x1d\x00\x00\x03\x00\x02\x0 |
|
475 | s> \x1d\x00\x00\x03\x00\x02\x00Bcustomreadonly bytes response | |
476 | s> \r\n |
|
476 | s> \r\n | |
477 | s> 0\r\n |
|
477 | s> 0\r\n | |
478 | s> \r\n |
|
478 | s> \r\n | |
@@ -511,7 +511,7 Interleaved requests to "multirequest" a | |||||
511 | s> \x00\x00\x00\x03\x00\x02\x01B |
|
511 | s> \x00\x00\x00\x03\x00\x02\x01B | |
512 | s> \r\n |
|
512 | s> \r\n | |
513 | s> 26\r\n |
|
513 | s> 26\r\n | |
514 |
s> \x1e\x00\x00\x01\x00\x02\x0 |
|
514 | s> \x1e\x00\x00\x01\x00\x02\x00Bbookmarks \n | |
515 | s> namespaces \n |
|
515 | s> namespaces \n | |
516 | s> phases |
|
516 | s> phases | |
517 | s> \r\n |
|
517 | s> \r\n |
@@ -375,7 +375,7 class ServerReactorTests(unittest.TestCa | |||||
375 | """Multiple fully serviced commands with same request ID is allowed.""" |
|
375 | """Multiple fully serviced commands with same request ID is allowed.""" | |
376 | reactor = makereactor() |
|
376 | reactor = makereactor() | |
377 | results = [] |
|
377 | results = [] | |
378 |
outstream = |
|
378 | outstream = reactor.makeoutputstream() | |
379 | results.append(self._sendsingleframe( |
|
379 | results.append(self._sendsingleframe( | |
380 | reactor, ffs(b'1 1 stream-begin command-name eos command'))) |
|
380 | reactor, ffs(b'1 1 stream-begin command-name eos command'))) | |
381 | result = reactor.onbytesresponseready(outstream, 1, b'response1') |
|
381 | result = reactor.onbytesresponseready(outstream, 1, b'response1') | |
@@ -530,7 +530,7 class ServerReactorTests(unittest.TestCa | |||||
530 | instream = framing.stream(1) |
|
530 | instream = framing.stream(1) | |
531 | list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) |
|
531 | list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) | |
532 |
|
532 | |||
533 |
outstream = |
|
533 | outstream = reactor.makeoutputstream() | |
534 | result = reactor.onbytesresponseready(outstream, 1, b'response') |
|
534 | result = reactor.onbytesresponseready(outstream, 1, b'response') | |
535 | self.assertaction(result, 'sendframes') |
|
535 | self.assertaction(result, 'sendframes') | |
536 | self.assertframesequal(result[1]['framegen'], [ |
|
536 | self.assertframesequal(result[1]['framegen'], [ | |
@@ -546,7 +546,7 class ServerReactorTests(unittest.TestCa | |||||
546 | instream = framing.stream(1) |
|
546 | instream = framing.stream(1) | |
547 | list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) |
|
547 | list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) | |
548 |
|
548 | |||
549 |
outstream = |
|
549 | outstream = reactor.makeoutputstream() | |
550 | result = reactor.onbytesresponseready(outstream, 1, first + second) |
|
550 | result = reactor.onbytesresponseready(outstream, 1, first + second) | |
551 | self.assertaction(result, 'sendframes') |
|
551 | self.assertaction(result, 'sendframes') | |
552 | self.assertframesequal(result[1]['framegen'], [ |
|
552 | self.assertframesequal(result[1]['framegen'], [ | |
@@ -559,7 +559,7 class ServerReactorTests(unittest.TestCa | |||||
559 | instream = framing.stream(1) |
|
559 | instream = framing.stream(1) | |
560 | list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) |
|
560 | list(sendcommandframes(reactor, instream, 1, b'mycommand', {})) | |
561 |
|
561 | |||
562 |
outstream = |
|
562 | outstream = reactor.makeoutputstream() | |
563 | result = reactor.onapplicationerror(outstream, 1, b'some message') |
|
563 | result = reactor.onapplicationerror(outstream, 1, b'some message') | |
564 | self.assertaction(result, 'sendframes') |
|
564 | self.assertaction(result, 'sendframes') | |
565 | self.assertframesequal(result[1]['framegen'], [ |
|
565 | self.assertframesequal(result[1]['framegen'], [ | |
@@ -575,7 +575,7 class ServerReactorTests(unittest.TestCa | |||||
575 | self.assertEqual(len(results), 1) |
|
575 | self.assertEqual(len(results), 1) | |
576 | self.assertaction(results[0], 'runcommand') |
|
576 | self.assertaction(results[0], 'runcommand') | |
577 |
|
577 | |||
578 |
outstream = |
|
578 | outstream = reactor.makeoutputstream() | |
579 | result = reactor.onbytesresponseready(outstream, 1, b'response') |
|
579 | result = reactor.onbytesresponseready(outstream, 1, b'response') | |
580 | self.assertaction(result, 'noop') |
|
580 | self.assertaction(result, 'noop') | |
581 | result = reactor.oninputeof() |
|
581 | result = reactor.oninputeof() | |
@@ -590,7 +590,7 class ServerReactorTests(unittest.TestCa | |||||
590 | list(sendcommandframes(reactor, instream, 1, b'command1', {})) |
|
590 | list(sendcommandframes(reactor, instream, 1, b'command1', {})) | |
591 | list(sendcommandframes(reactor, instream, 3, b'command2', {})) |
|
591 | list(sendcommandframes(reactor, instream, 3, b'command2', {})) | |
592 |
|
592 | |||
593 |
outstream = |
|
593 | outstream = reactor.makeoutputstream() | |
594 | result = reactor.onbytesresponseready(outstream, 1, b'response1') |
|
594 | result = reactor.onbytesresponseready(outstream, 1, b'response1') | |
595 | self.assertaction(result, 'noop') |
|
595 | self.assertaction(result, 'noop') | |
596 | result = reactor.onbytesresponseready(outstream, 3, b'response2') |
|
596 | result = reactor.onbytesresponseready(outstream, 3, b'response2') | |
@@ -610,7 +610,7 class ServerReactorTests(unittest.TestCa | |||||
610 | list(sendcommandframes(reactor, instream, 5, b'command3', {})) |
|
610 | list(sendcommandframes(reactor, instream, 5, b'command3', {})) | |
611 |
|
611 | |||
612 | # Register results for commands out of order. |
|
612 | # Register results for commands out of order. | |
613 |
outstream = |
|
613 | outstream = reactor.makeoutputstream() | |
614 | reactor.onbytesresponseready(outstream, 3, b'response3') |
|
614 | reactor.onbytesresponseready(outstream, 3, b'response3') | |
615 | reactor.onbytesresponseready(outstream, 1, b'response1') |
|
615 | reactor.onbytesresponseready(outstream, 1, b'response1') | |
616 | reactor.onbytesresponseready(outstream, 5, b'response5') |
|
616 | reactor.onbytesresponseready(outstream, 5, b'response5') | |
@@ -640,7 +640,7 class ServerReactorTests(unittest.TestCa | |||||
640 | reactor = makereactor() |
|
640 | reactor = makereactor() | |
641 | instream = framing.stream(1) |
|
641 | instream = framing.stream(1) | |
642 | list(sendcommandframes(reactor, instream, 1, b'command1', {})) |
|
642 | list(sendcommandframes(reactor, instream, 1, b'command1', {})) | |
643 |
outstream = |
|
643 | outstream = reactor.makeoutputstream() | |
644 | reactor.onbytesresponseready(outstream, 1, b'response') |
|
644 | reactor.onbytesresponseready(outstream, 1, b'response') | |
645 |
|
645 | |||
646 | # We've registered the response but haven't sent it. From the |
|
646 | # We've registered the response but haven't sent it. From the | |
@@ -672,7 +672,7 class ServerReactorTests(unittest.TestCa | |||||
672 | reactor = makereactor() |
|
672 | reactor = makereactor() | |
673 | instream = framing.stream(1) |
|
673 | instream = framing.stream(1) | |
674 | list(sendcommandframes(reactor, instream, 1, b'command1', {})) |
|
674 | list(sendcommandframes(reactor, instream, 1, b'command1', {})) | |
675 |
outstream = |
|
675 | outstream = reactor.makeoutputstream() | |
676 | res = reactor.onbytesresponseready(outstream, 1, b'response') |
|
676 | res = reactor.onbytesresponseready(outstream, 1, b'response') | |
677 | list(res[1]['framegen']) |
|
677 | list(res[1]['framegen']) | |
678 |
|
678 |
General Comments 0
You need to be logged in to leave comments.
Login now