##// END OF EJS Templates
wireproto: explicit API to create outgoing streams...
Gregory Szorc -
r37305:5fadc63a default
parent child Browse files
Show More
@@ -533,9 +533,11 class serverreactor(object):
533 """
533 """
534 self._deferoutput = deferoutput
534 self._deferoutput = deferoutput
535 self._state = 'idle'
535 self._state = 'idle'
536 self._nextoutgoingstreamid = 2
536 self._bufferedframegens = []
537 self._bufferedframegens = []
537 # stream id -> stream instance for all active streams from the client.
538 # stream id -> stream instance for all active streams from the client.
538 self._incomingstreams = {}
539 self._incomingstreams = {}
540 self._outgoingstreams = {}
539 # request id -> dict of commands that are actively being received.
541 # request id -> dict of commands that are actively being received.
540 self._receivingcommands = {}
542 self._receivingcommands = {}
541 # Request IDs that have been received and are actively being processed.
543 # Request IDs that have been received and are actively being processed.
@@ -638,6 +640,16 class serverreactor(object):
638 application=True),
640 application=True),
639 }
641 }
640
642
643 def makeoutputstream(self):
644 """Create a stream to be used for sending data to the client."""
645 streamid = self._nextoutgoingstreamid
646 self._nextoutgoingstreamid += 2
647
648 s = stream(streamid)
649 self._outgoingstreams[streamid] = s
650
651 return s
652
641 def _makeerrorresult(self, msg):
653 def _makeerrorresult(self, msg):
642 return 'error', {
654 return 'error', {
643 'message': msg,
655 'message': msg,
@@ -432,6 +432,8 def _processhttpv2request(ui, repo, req,
432 reactor = wireprotoframing.serverreactor(deferoutput=True)
432 reactor = wireprotoframing.serverreactor(deferoutput=True)
433 seencommand = False
433 seencommand = False
434
434
435 outstream = reactor.makeoutputstream()
436
435 while True:
437 while True:
436 frame = wireprotoframing.readframe(req.bodyfh)
438 frame = wireprotoframing.readframe(req.bodyfh)
437 if not frame:
439 if not frame:
@@ -444,8 +446,8 def _processhttpv2request(ui, repo, req,
444 continue
446 continue
445 elif action == 'runcommand':
447 elif action == 'runcommand':
446 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
448 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
447 reqcommand, reactor, meta,
449 reqcommand, reactor, outstream,
448 issubsequent=seencommand)
450 meta, issubsequent=seencommand)
449
451
450 if sentoutput:
452 if sentoutput:
451 return
453 return
@@ -476,7 +478,7 def _processhttpv2request(ui, repo, req,
476 % action)
478 % action)
477
479
478 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
480 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
479 command, issubsequent):
481 outstream, command, issubsequent):
480 """Dispatch a wire protocol command made from HTTPv2 requests.
482 """Dispatch a wire protocol command made from HTTPv2 requests.
481
483
482 The authenticated permission (``authedperm``) along with the original
484 The authenticated permission (``authedperm``) along with the original
@@ -546,10 +548,9 def _httpv2runcommand(ui, repo, req, res
546
548
547 res.status = b'200 OK'
549 res.status = b'200 OK'
548 res.headers[b'Content-Type'] = FRAMINGTYPE
550 res.headers[b'Content-Type'] = FRAMINGTYPE
549 stream = wireprotoframing.stream(2)
550
551
551 if isinstance(rsp, wireprototypes.bytesresponse):
552 if isinstance(rsp, wireprototypes.bytesresponse):
552 action, meta = reactor.onbytesresponseready(stream,
553 action, meta = reactor.onbytesresponseready(outstream,
553 command['requestid'],
554 command['requestid'],
554 rsp.data)
555 rsp.data)
555 else:
556 else:
@@ -472,7 +472,7 Multiple requests to "multirequest" URL
472 s> \x1d\x00\x00\x01\x00\x02\x01Bcustomreadonly bytes response
472 s> \x1d\x00\x00\x01\x00\x02\x01Bcustomreadonly bytes response
473 s> \r\n
473 s> \r\n
474 s> 25\r\n
474 s> 25\r\n
475 s> \x1d\x00\x00\x03\x00\x02\x01Bcustomreadonly bytes response
475 s> \x1d\x00\x00\x03\x00\x02\x00Bcustomreadonly bytes response
476 s> \r\n
476 s> \r\n
477 s> 0\r\n
477 s> 0\r\n
478 s> \r\n
478 s> \r\n
@@ -511,7 +511,7 Interleaved requests to "multirequest" a
511 s> \x00\x00\x00\x03\x00\x02\x01B
511 s> \x00\x00\x00\x03\x00\x02\x01B
512 s> \r\n
512 s> \r\n
513 s> 26\r\n
513 s> 26\r\n
514 s> \x1e\x00\x00\x01\x00\x02\x01Bbookmarks \n
514 s> \x1e\x00\x00\x01\x00\x02\x00Bbookmarks \n
515 s> namespaces \n
515 s> namespaces \n
516 s> phases
516 s> phases
517 s> \r\n
517 s> \r\n
@@ -375,7 +375,7 class ServerReactorTests(unittest.TestCa
375 """Multiple fully serviced commands with same request ID is allowed."""
375 """Multiple fully serviced commands with same request ID is allowed."""
376 reactor = makereactor()
376 reactor = makereactor()
377 results = []
377 results = []
378 outstream = framing.stream(2)
378 outstream = reactor.makeoutputstream()
379 results.append(self._sendsingleframe(
379 results.append(self._sendsingleframe(
380 reactor, ffs(b'1 1 stream-begin command-name eos command')))
380 reactor, ffs(b'1 1 stream-begin command-name eos command')))
381 result = reactor.onbytesresponseready(outstream, 1, b'response1')
381 result = reactor.onbytesresponseready(outstream, 1, b'response1')
@@ -530,7 +530,7 class ServerReactorTests(unittest.TestCa
530 instream = framing.stream(1)
530 instream = framing.stream(1)
531 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
531 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
532
532
533 outstream = framing.stream(2)
533 outstream = reactor.makeoutputstream()
534 result = reactor.onbytesresponseready(outstream, 1, b'response')
534 result = reactor.onbytesresponseready(outstream, 1, b'response')
535 self.assertaction(result, 'sendframes')
535 self.assertaction(result, 'sendframes')
536 self.assertframesequal(result[1]['framegen'], [
536 self.assertframesequal(result[1]['framegen'], [
@@ -546,7 +546,7 class ServerReactorTests(unittest.TestCa
546 instream = framing.stream(1)
546 instream = framing.stream(1)
547 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
547 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
548
548
549 outstream = framing.stream(2)
549 outstream = reactor.makeoutputstream()
550 result = reactor.onbytesresponseready(outstream, 1, first + second)
550 result = reactor.onbytesresponseready(outstream, 1, first + second)
551 self.assertaction(result, 'sendframes')
551 self.assertaction(result, 'sendframes')
552 self.assertframesequal(result[1]['framegen'], [
552 self.assertframesequal(result[1]['framegen'], [
@@ -559,7 +559,7 class ServerReactorTests(unittest.TestCa
559 instream = framing.stream(1)
559 instream = framing.stream(1)
560 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
560 list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
561
561
562 outstream = framing.stream(2)
562 outstream = reactor.makeoutputstream()
563 result = reactor.onapplicationerror(outstream, 1, b'some message')
563 result = reactor.onapplicationerror(outstream, 1, b'some message')
564 self.assertaction(result, 'sendframes')
564 self.assertaction(result, 'sendframes')
565 self.assertframesequal(result[1]['framegen'], [
565 self.assertframesequal(result[1]['framegen'], [
@@ -575,7 +575,7 class ServerReactorTests(unittest.TestCa
575 self.assertEqual(len(results), 1)
575 self.assertEqual(len(results), 1)
576 self.assertaction(results[0], 'runcommand')
576 self.assertaction(results[0], 'runcommand')
577
577
578 outstream = framing.stream(2)
578 outstream = reactor.makeoutputstream()
579 result = reactor.onbytesresponseready(outstream, 1, b'response')
579 result = reactor.onbytesresponseready(outstream, 1, b'response')
580 self.assertaction(result, 'noop')
580 self.assertaction(result, 'noop')
581 result = reactor.oninputeof()
581 result = reactor.oninputeof()
@@ -590,7 +590,7 class ServerReactorTests(unittest.TestCa
590 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
590 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
591 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
591 list(sendcommandframes(reactor, instream, 3, b'command2', {}))
592
592
593 outstream = framing.stream(2)
593 outstream = reactor.makeoutputstream()
594 result = reactor.onbytesresponseready(outstream, 1, b'response1')
594 result = reactor.onbytesresponseready(outstream, 1, b'response1')
595 self.assertaction(result, 'noop')
595 self.assertaction(result, 'noop')
596 result = reactor.onbytesresponseready(outstream, 3, b'response2')
596 result = reactor.onbytesresponseready(outstream, 3, b'response2')
@@ -610,7 +610,7 class ServerReactorTests(unittest.TestCa
610 list(sendcommandframes(reactor, instream, 5, b'command3', {}))
610 list(sendcommandframes(reactor, instream, 5, b'command3', {}))
611
611
612 # Register results for commands out of order.
612 # Register results for commands out of order.
613 outstream = framing.stream(2)
613 outstream = reactor.makeoutputstream()
614 reactor.onbytesresponseready(outstream, 3, b'response3')
614 reactor.onbytesresponseready(outstream, 3, b'response3')
615 reactor.onbytesresponseready(outstream, 1, b'response1')
615 reactor.onbytesresponseready(outstream, 1, b'response1')
616 reactor.onbytesresponseready(outstream, 5, b'response5')
616 reactor.onbytesresponseready(outstream, 5, b'response5')
@@ -640,7 +640,7 class ServerReactorTests(unittest.TestCa
640 reactor = makereactor()
640 reactor = makereactor()
641 instream = framing.stream(1)
641 instream = framing.stream(1)
642 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
642 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
643 outstream = framing.stream(2)
643 outstream = reactor.makeoutputstream()
644 reactor.onbytesresponseready(outstream, 1, b'response')
644 reactor.onbytesresponseready(outstream, 1, b'response')
645
645
646 # We've registered the response but haven't sent it. From the
646 # We've registered the response but haven't sent it. From the
@@ -672,7 +672,7 class ServerReactorTests(unittest.TestCa
672 reactor = makereactor()
672 reactor = makereactor()
673 instream = framing.stream(1)
673 instream = framing.stream(1)
674 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
674 list(sendcommandframes(reactor, instream, 1, b'command1', {}))
675 outstream = framing.stream(2)
675 outstream = reactor.makeoutputstream()
676 res = reactor.onbytesresponseready(outstream, 1, b'response')
676 res = reactor.onbytesresponseready(outstream, 1, b'response')
677 list(res[1]['framegen'])
677 list(res[1]['framegen'])
678
678
General Comments 0
You need to be logged in to leave comments. Login now