Show More
@@ -386,6 +386,56 def createcommandresponseframesfrombytes | |||||
386 | if done: |
|
386 | if done: | |
387 | break |
|
387 | break | |
388 |
|
388 | |||
|
389 | def createbytesresponseframesfromgen(stream, requestid, gen, | |||
|
390 | maxframesize=DEFAULT_MAX_FRAME_SIZE): | |||
|
391 | overall = cbor.dumps({b'status': b'ok'}, canonical=True) | |||
|
392 | ||||
|
393 | yield stream.makeframe(requestid=requestid, | |||
|
394 | typeid=FRAME_TYPE_COMMAND_RESPONSE, | |||
|
395 | flags=FLAG_COMMAND_RESPONSE_CONTINUATION, | |||
|
396 | payload=overall) | |||
|
397 | ||||
|
398 | cb = util.chunkbuffer(gen) | |||
|
399 | ||||
|
400 | flags = 0 | |||
|
401 | ||||
|
402 | while True: | |||
|
403 | chunk = cb.read(maxframesize) | |||
|
404 | if not chunk: | |||
|
405 | break | |||
|
406 | ||||
|
407 | yield stream.makeframe(requestid=requestid, | |||
|
408 | typeid=FRAME_TYPE_COMMAND_RESPONSE, | |||
|
409 | flags=flags, | |||
|
410 | payload=chunk) | |||
|
411 | ||||
|
412 | flags |= FLAG_COMMAND_RESPONSE_CONTINUATION | |||
|
413 | ||||
|
414 | flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION | |||
|
415 | flags |= FLAG_COMMAND_RESPONSE_EOS | |||
|
416 | yield stream.makeframe(requestid=requestid, | |||
|
417 | typeid=FRAME_TYPE_COMMAND_RESPONSE, | |||
|
418 | flags=flags, | |||
|
419 | payload=b'') | |||
|
420 | ||||
|
421 | def createcommanderrorresponse(stream, requestid, message, args=None): | |||
|
422 | m = { | |||
|
423 | b'status': b'error', | |||
|
424 | b'error': { | |||
|
425 | b'message': message, | |||
|
426 | } | |||
|
427 | } | |||
|
428 | ||||
|
429 | if args: | |||
|
430 | m[b'error'][b'args'] = args | |||
|
431 | ||||
|
432 | overall = cbor.dumps(m, canonical=True) | |||
|
433 | ||||
|
434 | yield stream.makeframe(requestid=requestid, | |||
|
435 | typeid=FRAME_TYPE_COMMAND_RESPONSE, | |||
|
436 | flags=FLAG_COMMAND_RESPONSE_EOS, | |||
|
437 | payload=overall) | |||
|
438 | ||||
389 | def createerrorframe(stream, requestid, msg, errtype): |
|
439 | def createerrorframe(stream, requestid, msg, errtype): | |
390 | # TODO properly handle frame size limits. |
|
440 | # TODO properly handle frame size limits. | |
391 | assert len(msg) <= DEFAULT_MAX_FRAME_SIZE |
|
441 | assert len(msg) <= DEFAULT_MAX_FRAME_SIZE | |
@@ -634,6 +684,19 class serverreactor(object): | |||||
634 | 'framegen': result, |
|
684 | 'framegen': result, | |
635 | } |
|
685 | } | |
636 |
|
686 | |||
|
687 | def oncommandresponsereadygen(self, stream, requestid, gen): | |||
|
688 | """Signal that a bytes response is ready, with data as a generator.""" | |||
|
689 | ensureserverstream(stream) | |||
|
690 | ||||
|
691 | def sendframes(): | |||
|
692 | for frame in createbytesresponseframesfromgen(stream, requestid, | |||
|
693 | gen): | |||
|
694 | yield frame | |||
|
695 | ||||
|
696 | self._activecommands.remove(requestid) | |||
|
697 | ||||
|
698 | return self._handlesendframes(sendframes()) | |||
|
699 | ||||
637 | def oninputeof(self): |
|
700 | def oninputeof(self): | |
638 | """Signals that end of input has been received. |
|
701 | """Signals that end of input has been received. | |
639 |
|
702 | |||
@@ -655,13 +718,39 class serverreactor(object): | |||||
655 | 'framegen': makegen(), |
|
718 | 'framegen': makegen(), | |
656 | } |
|
719 | } | |
657 |
|
720 | |||
|
721 | def _handlesendframes(self, framegen): | |||
|
722 | if self._deferoutput: | |||
|
723 | self._bufferedframegens.append(framegen) | |||
|
724 | return 'noop', {} | |||
|
725 | else: | |||
|
726 | return 'sendframes', { | |||
|
727 | 'framegen': framegen, | |||
|
728 | } | |||
|
729 | ||||
658 | def onservererror(self, stream, requestid, msg): |
|
730 | def onservererror(self, stream, requestid, msg): | |
659 | ensureserverstream(stream) |
|
731 | ensureserverstream(stream) | |
660 |
|
732 | |||
661 |
|
|
733 | def sendframes(): | |
662 |
|
|
734 | for frame in createerrorframe(stream, requestid, msg, | |
663 |
errtype='server') |
|
735 | errtype='server'): | |
664 | } |
|
736 | yield frame | |
|
737 | ||||
|
738 | self._activecommands.remove(requestid) | |||
|
739 | ||||
|
740 | return self._handlesendframes(sendframes()) | |||
|
741 | ||||
|
742 | def oncommanderror(self, stream, requestid, message, args=None): | |||
|
743 | """Called when a command encountered an error before sending output.""" | |||
|
744 | ensureserverstream(stream) | |||
|
745 | ||||
|
746 | def sendframes(): | |||
|
747 | for frame in createcommanderrorresponse(stream, requestid, message, | |||
|
748 | args): | |||
|
749 | yield frame | |||
|
750 | ||||
|
751 | self._activecommands.remove(requestid) | |||
|
752 | ||||
|
753 | return self._handlesendframes(sendframes()) | |||
665 |
|
754 | |||
666 | def makeoutputstream(self): |
|
755 | def makeoutputstream(self): | |
667 | """Create a stream to be used for sending data to the client.""" |
|
756 | """Create a stream to be used for sending data to the client.""" |
@@ -106,6 +106,22 class cborresponse(object): | |||||
106 | def __init__(self, v): |
|
106 | def __init__(self, v): | |
107 | self.value = v |
|
107 | self.value = v | |
108 |
|
108 | |||
|
109 | class v2errorresponse(object): | |||
|
110 | """Represents a command error for version 2 transports.""" | |||
|
111 | def __init__(self, message, args=None): | |||
|
112 | self.message = message | |||
|
113 | self.args = args | |||
|
114 | ||||
|
115 | class v2streamingresponse(object): | |||
|
116 | """A response whose data is supplied by a generator. | |||
|
117 | ||||
|
118 | The generator can either consist of data structures to CBOR | |||
|
119 | encode or a stream of already-encoded bytes. | |||
|
120 | """ | |||
|
121 | def __init__(self, gen, compressible=True): | |||
|
122 | self.gen = gen | |||
|
123 | self.compressible = compressible | |||
|
124 | ||||
109 | # list of nodes encoding / decoding |
|
125 | # list of nodes encoding / decoding | |
110 | def decodelist(l, sep=' '): |
|
126 | def decodelist(l, sep=' '): | |
111 | if l: |
|
127 | if l: |
@@ -306,6 +306,15 def _httpv2runcommand(ui, repo, req, res | |||||
306 | action, meta = reactor.oncommandresponseready(outstream, |
|
306 | action, meta = reactor.oncommandresponseready(outstream, | |
307 | command['requestid'], |
|
307 | command['requestid'], | |
308 | encoded) |
|
308 | encoded) | |
|
309 | elif isinstance(rsp, wireprototypes.v2streamingresponse): | |||
|
310 | action, meta = reactor.oncommandresponsereadygen(outstream, | |||
|
311 | command['requestid'], | |||
|
312 | rsp.gen) | |||
|
313 | elif isinstance(rsp, wireprototypes.v2errorresponse): | |||
|
314 | action, meta = reactor.oncommanderror(outstream, | |||
|
315 | command['requestid'], | |||
|
316 | rsp.message, | |||
|
317 | rsp.args) | |||
309 | else: |
|
318 | else: | |
310 | action, meta = reactor.onservererror( |
|
319 | action, meta = reactor.onservererror( | |
311 | _('unhandled response type from wire proto command')) |
|
320 | _('unhandled response type from wire proto command')) |
General Comments 0
You need to be logged in to leave comments.
Login now