Show More
@@ -386,6 +386,56 b' def createcommandresponseframesfrombytes' | |||
|
386 | 386 | if done: |
|
387 | 387 | break |
|
388 | 388 | |
|
389 | def createbytesresponseframesfromgen(stream, requestid, gen, | |
|
390 | maxframesize=DEFAULT_MAX_FRAME_SIZE): | |
|
391 | overall = cbor.dumps({b'status': b'ok'}, canonical=True) | |
|
392 | ||
|
393 | yield stream.makeframe(requestid=requestid, | |
|
394 | typeid=FRAME_TYPE_COMMAND_RESPONSE, | |
|
395 | flags=FLAG_COMMAND_RESPONSE_CONTINUATION, | |
|
396 | payload=overall) | |
|
397 | ||
|
398 | cb = util.chunkbuffer(gen) | |
|
399 | ||
|
400 | flags = 0 | |
|
401 | ||
|
402 | while True: | |
|
403 | chunk = cb.read(maxframesize) | |
|
404 | if not chunk: | |
|
405 | break | |
|
406 | ||
|
407 | yield stream.makeframe(requestid=requestid, | |
|
408 | typeid=FRAME_TYPE_COMMAND_RESPONSE, | |
|
409 | flags=flags, | |
|
410 | payload=chunk) | |
|
411 | ||
|
412 | flags |= FLAG_COMMAND_RESPONSE_CONTINUATION | |
|
413 | ||
|
414 | flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION | |
|
415 | flags |= FLAG_COMMAND_RESPONSE_EOS | |
|
416 | yield stream.makeframe(requestid=requestid, | |
|
417 | typeid=FRAME_TYPE_COMMAND_RESPONSE, | |
|
418 | flags=flags, | |
|
419 | payload=b'') | |
|
420 | ||
|
421 | def createcommanderrorresponse(stream, requestid, message, args=None): | |
|
422 | m = { | |
|
423 | b'status': b'error', | |
|
424 | b'error': { | |
|
425 | b'message': message, | |
|
426 | } | |
|
427 | } | |
|
428 | ||
|
429 | if args: | |
|
430 | m[b'error'][b'args'] = args | |
|
431 | ||
|
432 | overall = cbor.dumps(m, canonical=True) | |
|
433 | ||
|
434 | yield stream.makeframe(requestid=requestid, | |
|
435 | typeid=FRAME_TYPE_COMMAND_RESPONSE, | |
|
436 | flags=FLAG_COMMAND_RESPONSE_EOS, | |
|
437 | payload=overall) | |
|
438 | ||
|
389 | 439 | def createerrorframe(stream, requestid, msg, errtype): |
|
390 | 440 | # TODO properly handle frame size limits. |
|
391 | 441 | assert len(msg) <= DEFAULT_MAX_FRAME_SIZE |
@@ -634,6 +684,19 b' class serverreactor(object):' | |||
|
634 | 684 | 'framegen': result, |
|
635 | 685 | } |
|
636 | 686 | |
|
687 | def oncommandresponsereadygen(self, stream, requestid, gen): | |
|
688 | """Signal that a bytes response is ready, with data as a generator.""" | |
|
689 | ensureserverstream(stream) | |
|
690 | ||
|
691 | def sendframes(): | |
|
692 | for frame in createbytesresponseframesfromgen(stream, requestid, | |
|
693 | gen): | |
|
694 | yield frame | |
|
695 | ||
|
696 | self._activecommands.remove(requestid) | |
|
697 | ||
|
698 | return self._handlesendframes(sendframes()) | |
|
699 | ||
|
637 | 700 | def oninputeof(self): |
|
638 | 701 | """Signals that end of input has been received. |
|
639 | 702 | |
@@ -655,13 +718,39 b' class serverreactor(object):' | |||
|
655 | 718 | 'framegen': makegen(), |
|
656 | 719 | } |
|
657 | 720 | |
|
721 | def _handlesendframes(self, framegen): | |
|
722 | if self._deferoutput: | |
|
723 | self._bufferedframegens.append(framegen) | |
|
724 | return 'noop', {} | |
|
725 | else: | |
|
726 | return 'sendframes', { | |
|
727 | 'framegen': framegen, | |
|
728 | } | |
|
729 | ||
|
658 | 730 | def onservererror(self, stream, requestid, msg): |
|
659 | 731 | ensureserverstream(stream) |
|
660 | 732 | |
|
661 |
|
|
|
662 |
|
|
|
663 |
errtype='server') |
|
|
664 | } | |
|
733 | def sendframes(): | |
|
734 | for frame in createerrorframe(stream, requestid, msg, | |
|
735 | errtype='server'): | |
|
736 | yield frame | |
|
737 | ||
|
738 | self._activecommands.remove(requestid) | |
|
739 | ||
|
740 | return self._handlesendframes(sendframes()) | |
|
741 | ||
|
742 | def oncommanderror(self, stream, requestid, message, args=None): | |
|
743 | """Called when a command encountered an error before sending output.""" | |
|
744 | ensureserverstream(stream) | |
|
745 | ||
|
746 | def sendframes(): | |
|
747 | for frame in createcommanderrorresponse(stream, requestid, message, | |
|
748 | args): | |
|
749 | yield frame | |
|
750 | ||
|
751 | self._activecommands.remove(requestid) | |
|
752 | ||
|
753 | return self._handlesendframes(sendframes()) | |
|
665 | 754 | |
|
666 | 755 | def makeoutputstream(self): |
|
667 | 756 | """Create a stream to be used for sending data to the client.""" |
@@ -106,6 +106,22 b' class cborresponse(object):' | |||
|
106 | 106 | def __init__(self, v): |
|
107 | 107 | self.value = v |
|
108 | 108 | |
|
109 | class v2errorresponse(object): | |
|
110 | """Represents a command error for version 2 transports.""" | |
|
111 | def __init__(self, message, args=None): | |
|
112 | self.message = message | |
|
113 | self.args = args | |
|
114 | ||
|
115 | class v2streamingresponse(object): | |
|
116 | """A response whose data is supplied by a generator. | |
|
117 | ||
|
118 | The generator can either consist of data structures to CBOR | |
|
119 | encode or a stream of already-encoded bytes. | |
|
120 | """ | |
|
121 | def __init__(self, gen, compressible=True): | |
|
122 | self.gen = gen | |
|
123 | self.compressible = compressible | |
|
124 | ||
|
109 | 125 | # list of nodes encoding / decoding |
|
110 | 126 | def decodelist(l, sep=' '): |
|
111 | 127 | if l: |
@@ -306,6 +306,15 b' def _httpv2runcommand(ui, repo, req, res' | |||
|
306 | 306 | action, meta = reactor.oncommandresponseready(outstream, |
|
307 | 307 | command['requestid'], |
|
308 | 308 | encoded) |
|
309 | elif isinstance(rsp, wireprototypes.v2streamingresponse): | |
|
310 | action, meta = reactor.oncommandresponsereadygen(outstream, | |
|
311 | command['requestid'], | |
|
312 | rsp.gen) | |
|
313 | elif isinstance(rsp, wireprototypes.v2errorresponse): | |
|
314 | action, meta = reactor.oncommanderror(outstream, | |
|
315 | command['requestid'], | |
|
316 | rsp.message, | |
|
317 | rsp.args) | |
|
309 | 318 | else: |
|
310 | 319 | action, meta = reactor.onservererror( |
|
311 | 320 | _('unhandled response type from wire proto command')) |
General Comments 0
You need to be logged in to leave comments.
Login now