##// END OF EJS Templates
wireprotov2: add support for more response types...
Gregory Szorc -
r37746:564a3eec default
parent child Browse files
Show More
@@ -386,6 +386,56 b' def createcommandresponseframesfrombytes'
386 386 if done:
387 387 break
388 388
389 def createbytesresponseframesfromgen(stream, requestid, gen,
390 maxframesize=DEFAULT_MAX_FRAME_SIZE):
391 overall = cbor.dumps({b'status': b'ok'}, canonical=True)
392
393 yield stream.makeframe(requestid=requestid,
394 typeid=FRAME_TYPE_COMMAND_RESPONSE,
395 flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
396 payload=overall)
397
398 cb = util.chunkbuffer(gen)
399
400 flags = 0
401
402 while True:
403 chunk = cb.read(maxframesize)
404 if not chunk:
405 break
406
407 yield stream.makeframe(requestid=requestid,
408 typeid=FRAME_TYPE_COMMAND_RESPONSE,
409 flags=flags,
410 payload=chunk)
411
412 flags |= FLAG_COMMAND_RESPONSE_CONTINUATION
413
414 flags ^= FLAG_COMMAND_RESPONSE_CONTINUATION
415 flags |= FLAG_COMMAND_RESPONSE_EOS
416 yield stream.makeframe(requestid=requestid,
417 typeid=FRAME_TYPE_COMMAND_RESPONSE,
418 flags=flags,
419 payload=b'')
420
421 def createcommanderrorresponse(stream, requestid, message, args=None):
422 m = {
423 b'status': b'error',
424 b'error': {
425 b'message': message,
426 }
427 }
428
429 if args:
430 m[b'error'][b'args'] = args
431
432 overall = cbor.dumps(m, canonical=True)
433
434 yield stream.makeframe(requestid=requestid,
435 typeid=FRAME_TYPE_COMMAND_RESPONSE,
436 flags=FLAG_COMMAND_RESPONSE_EOS,
437 payload=overall)
438
389 439 def createerrorframe(stream, requestid, msg, errtype):
390 440 # TODO properly handle frame size limits.
391 441 assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
@@ -634,6 +684,19 b' class serverreactor(object):'
634 684 'framegen': result,
635 685 }
636 686
687 def oncommandresponsereadygen(self, stream, requestid, gen):
688 """Signal that a bytes response is ready, with data as a generator."""
689 ensureserverstream(stream)
690
691 def sendframes():
692 for frame in createbytesresponseframesfromgen(stream, requestid,
693 gen):
694 yield frame
695
696 self._activecommands.remove(requestid)
697
698 return self._handlesendframes(sendframes())
699
637 700 def oninputeof(self):
638 701 """Signals that end of input has been received.
639 702
@@ -655,13 +718,39 b' class serverreactor(object):'
655 718 'framegen': makegen(),
656 719 }
657 720
721 def _handlesendframes(self, framegen):
722 if self._deferoutput:
723 self._bufferedframegens.append(framegen)
724 return 'noop', {}
725 else:
726 return 'sendframes', {
727 'framegen': framegen,
728 }
729
658 730 def onservererror(self, stream, requestid, msg):
659 731 ensureserverstream(stream)
660 732
661 return 'sendframes', {
662 'framegen': createerrorframe(stream, requestid, msg,
663 errtype='server'),
664 }
733 def sendframes():
734 for frame in createerrorframe(stream, requestid, msg,
735 errtype='server'):
736 yield frame
737
738 self._activecommands.remove(requestid)
739
740 return self._handlesendframes(sendframes())
741
742 def oncommanderror(self, stream, requestid, message, args=None):
743 """Called when a command encountered an error before sending output."""
744 ensureserverstream(stream)
745
746 def sendframes():
747 for frame in createcommanderrorresponse(stream, requestid, message,
748 args):
749 yield frame
750
751 self._activecommands.remove(requestid)
752
753 return self._handlesendframes(sendframes())
665 754
666 755 def makeoutputstream(self):
667 756 """Create a stream to be used for sending data to the client."""
@@ -106,6 +106,22 b' class cborresponse(object):'
106 106 def __init__(self, v):
107 107 self.value = v
108 108
109 class v2errorresponse(object):
110 """Represents a command error for version 2 transports."""
111 def __init__(self, message, args=None):
112 self.message = message
113 self.args = args
114
115 class v2streamingresponse(object):
116 """A response whose data is supplied by a generator.
117
118 The generator can either consist of data structures to CBOR
119 encode or a stream of already-encoded bytes.
120 """
121 def __init__(self, gen, compressible=True):
122 self.gen = gen
123 self.compressible = compressible
124
109 125 # list of nodes encoding / decoding
110 126 def decodelist(l, sep=' '):
111 127 if l:
@@ -306,6 +306,15 b' def _httpv2runcommand(ui, repo, req, res'
306 306 action, meta = reactor.oncommandresponseready(outstream,
307 307 command['requestid'],
308 308 encoded)
309 elif isinstance(rsp, wireprototypes.v2streamingresponse):
310 action, meta = reactor.oncommandresponsereadygen(outstream,
311 command['requestid'],
312 rsp.gen)
313 elif isinstance(rsp, wireprototypes.v2errorresponse):
314 action, meta = reactor.oncommanderror(outstream,
315 command['requestid'],
316 rsp.message,
317 rsp.args)
309 318 else:
310 319 action, meta = reactor.onservererror(
311 320 _('unhandled response type from wire proto command'))
General Comments 0
You need to be logged in to leave comments. Login now