##// END OF EJS Templates
wireproto: explicit API to create outgoing streams...
Gregory Szorc -
r37305:5fadc63a default
parent child Browse files
Show More
@@ -533,9 +533,11 b' class serverreactor(object):'
533 533 """
534 534 self._deferoutput = deferoutput
535 535 self._state = 'idle'
536 self._nextoutgoingstreamid = 2
536 537 self._bufferedframegens = []
537 538 # stream id -> stream instance for all active streams from the client.
538 539 self._incomingstreams = {}
540 self._outgoingstreams = {}
539 541 # request id -> dict of commands that are actively being received.
540 542 self._receivingcommands = {}
541 543 # Request IDs that have been received and are actively being processed.
@@ -638,6 +640,16 b' class serverreactor(object):'
638 640 application=True),
639 641 }
640 642
643 def makeoutputstream(self):
644 """Create a stream to be used for sending data to the client."""
645 streamid = self._nextoutgoingstreamid
646 self._nextoutgoingstreamid += 2
647
648 s = stream(streamid)
649 self._outgoingstreams[streamid] = s
650
651 return s
652
641 653 def _makeerrorresult(self, msg):
642 654 return 'error', {
643 655 'message': msg,
@@ -432,6 +432,8 b' def _processhttpv2request(ui, repo, req,'
432 432 reactor = wireprotoframing.serverreactor(deferoutput=True)
433 433 seencommand = False
434 434
435 outstream = reactor.makeoutputstream()
436
435 437 while True:
436 438 frame = wireprotoframing.readframe(req.bodyfh)
437 439 if not frame:
@@ -444,8 +446,8 b' def _processhttpv2request(ui, repo, req,'
444 446 continue
445 447 elif action == 'runcommand':
446 448 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
447 reqcommand, reactor, meta,
448 issubsequent=seencommand)
449 reqcommand, reactor, outstream,
450 meta, issubsequent=seencommand)
449 451
450 452 if sentoutput:
451 453 return
@@ -476,7 +478,7 b' def _processhttpv2request(ui, repo, req,'
476 478 % action)
477 479
478 480 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
479 command, issubsequent):
481 outstream, command, issubsequent):
480 482 """Dispatch a wire protocol command made from HTTPv2 requests.
481 483
482 484 The authenticated permission (``authedperm``) along with the original
@@ -546,10 +548,9 b' def _httpv2runcommand(ui, repo, req, res'
546 548
547 549 res.status = b'200 OK'
548 550 res.headers[b'Content-Type'] = FRAMINGTYPE
549 stream = wireprotoframing.stream(2)
550 551
551 552 if isinstance(rsp, wireprototypes.bytesresponse):
552 action, meta = reactor.onbytesresponseready(stream,
553 action, meta = reactor.onbytesresponseready(outstream,
553 554 command['requestid'],
554 555 rsp.data)
555 556 else:
@@ -472,7 +472,7 b' Multiple requests to "multirequest" URL '
472 472 s> \x1d\x00\x00\x01\x00\x02\x01Bcustomreadonly bytes response
473 473 s> \r\n
474 474 s> 25\r\n
475 s> \x1d\x00\x00\x03\x00\x02\x01Bcustomreadonly bytes response
475 s> \x1d\x00\x00\x03\x00\x02\x00Bcustomreadonly bytes response
476 476 s> \r\n
477 477 s> 0\r\n
478 478 s> \r\n
@@ -511,7 +511,7 b' Interleaved requests to "multirequest" a'
511 511 s> \x00\x00\x00\x03\x00\x02\x01B
512 512 s> \r\n
513 513 s> 26\r\n
514 s> \x1e\x00\x00\x01\x00\x02\x01Bbookmarks \n
514 s> \x1e\x00\x00\x01\x00\x02\x00Bbookmarks \n
515 515 s> namespaces \n
516 516 s> phases
517 517 s> \r\n
@@ -375,7 +375,7 b' class ServerReactorTests(unittest.TestCa'
375 375 """Multiple fully serviced commands with same request ID is allowed."""
376 376 reactor = makereactor()
377 377 results = []
378 outstream = framing.stream(2)
378 outstream = reactor.makeoutputstream()
379 379 results.append(self._sendsingleframe(
380 380 reactor, ffs(b'1 1 stream-begin command-name eos command')))
381 381 result = reactor.onbytesresponseready(outstream, 1, b'response1')
@@ -530,7 +530,7 b' class ServerReactorTests(unittest.TestCa'
530 530 instream = framing.stream(1)
531 531 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
532 532
533 outstream = framing.stream(2)
533 outstream = reactor.makeoutputstream()
534 534 result = reactor.onbytesresponseready(outstream, 1, b'response')
535 535 self.assertaction(result, 'sendframes')
536 536 self.assertframesequal(result[1]['framegen'], [
@@ -546,7 +546,7 b' class ServerReactorTests(unittest.TestCa'
546 546 instream = framing.stream(1)
547 547 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
548 548
549 outstream = framing.stream(2)
549 outstream = reactor.makeoutputstream()
550 550 result = reactor.onbytesresponseready(outstream, 1, first + second)
551 551 self.assertaction(result, 'sendframes')
552 552 self.assertframesequal(result[1]['framegen'], [
@@ -559,7 +559,7 b' class ServerReactorTests(unittest.TestCa'
559 559 instream = framing.stream(1)
560 560 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
561 561
562 outstream = framing.stream(2)
562 outstream = reactor.makeoutputstream()
563 563 result = reactor.onapplicationerror(outstream, 1, b'some message')
564 564 self.assertaction(result, 'sendframes')
565 565 self.assertframesequal(result[1]['framegen'], [
@@ -575,7 +575,7 b' class ServerReactorTests(unittest.TestCa'
575 575 self.assertEqual(len(results), 1)
576 576 self.assertaction(results[0], 'runcommand')
577 577
578 outstream = framing.stream(2)
578 outstream = reactor.makeoutputstream()
579 579 result = reactor.onbytesresponseready(outstream, 1, b'response')
580 580 self.assertaction(result, 'noop')
581 581 result = reactor.oninputeof()
@@ -590,7 +590,7 b' class ServerReactorTests(unittest.TestCa'
590 590 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
591 591 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
592 592
593 outstream = framing.stream(2)
593 outstream = reactor.makeoutputstream()
594 594 result = reactor.onbytesresponseready(outstream, 1, b'response1')
595 595 self.assertaction(result, 'noop')
596 596 result = reactor.onbytesresponseready(outstream, 3, b'response2')
@@ -610,7 +610,7 b' class ServerReactorTests(unittest.TestCa'
610 610 list(sendcommandframes(reactor, instream, 5, b'command3', {}))
611 611
612 612 # Register results for commands out of order.
613 outstream = framing.stream(2)
613 outstream = reactor.makeoutputstream()
614 614 reactor.onbytesresponseready(outstream, 3, b'response3')
615 615 reactor.onbytesresponseready(outstream, 1, b'response1')
616 616 reactor.onbytesresponseready(outstream, 5, b'response5')
@@ -640,7 +640,7 b' class ServerReactorTests(unittest.TestCa'
640 640 reactor = makereactor()
641 641 instream = framing.stream(1)
642 642 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
643 outstream = framing.stream(2)
643 outstream = reactor.makeoutputstream()
644 644 reactor.onbytesresponseready(outstream, 1, b'response')
645 645
646 646 # We've registered the response but haven't sent it. From the
@@ -672,7 +672,7 b' class ServerReactorTests(unittest.TestCa'
672 672 reactor = makereactor()
673 673 instream = framing.stream(1)
674 674 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
675 outstream = framing.stream(2)
675 outstream = reactor.makeoutputstream()
676 676 res = reactor.onbytesresponseready(outstream, 1, b'response')
677 677 list(res[1]['framegen'])
678 678
General Comments 0
You need to be logged in to leave comments. Login now