Show More
@@ -76,7 +76,7 b' def getlfile(repo, proto, sha):' | |||
|
76 | 76 | yield '%d\n' % length |
|
77 | 77 | for chunk in util.filechunkiter(f): |
|
78 | 78 | yield chunk |
|
79 | return wireproto.streamres(generator()) | |
|
79 | return wireproto.streamres(gen=generator()) | |
|
80 | 80 | |
|
81 | 81 | def statlfile(repo, proto, sha): |
|
82 | 82 | '''Server command for checking if a largefile is present - returns '2\n' if |
@@ -73,16 +73,6 b' class webproto(wireproto.abstractserverp' | |||
|
73 | 73 | self.ui.ferr, self.ui.fout = self.oldio |
|
74 | 74 | return val |
|
75 | 75 | |
|
76 | def groupchunks(self, fh): | |
|
77 | def getchunks(): | |
|
78 | while True: | |
|
79 | chunk = fh.read(32768) | |
|
80 | if not chunk: | |
|
81 | break | |
|
82 | yield chunk | |
|
83 | ||
|
84 | return self.compresschunks(getchunks()) | |
|
85 | ||
|
86 | 76 | def compresschunks(self, chunks): |
|
87 | 77 | # Don't allow untrusted settings because disabling compression or |
|
88 | 78 | # setting a very high compression level could lead to flooding |
@@ -106,8 +96,16 b' def call(repo, req, cmd):' | |||
|
106 | 96 | req.respond(HTTP_OK, HGTYPE, body=rsp) |
|
107 | 97 | return [] |
|
108 | 98 | elif isinstance(rsp, wireproto.streamres): |
|
99 | if rsp.reader: | |
|
100 | gen = iter(lambda: rsp.reader.read(32768), '') | |
|
101 | else: | |
|
102 | gen = rsp.gen | |
|
103 | ||
|
104 | if rsp.v1compressible: | |
|
105 | gen = p.compresschunks(gen) | |
|
106 | ||
|
109 | 107 | req.respond(HTTP_OK, HGTYPE) |
|
110 |
return |
|
|
108 | return gen | |
|
111 | 109 | elif isinstance(rsp, wireproto.pushres): |
|
112 | 110 | val = p.restore() |
|
113 | 111 | rsp = '%d\n%s' % (rsp.res, val) |
@@ -68,13 +68,6 b' class sshserver(wireproto.abstractserver' | |||
|
68 | 68 | def redirect(self): |
|
69 | 69 | pass |
|
70 | 70 | |
|
71 | def groupchunks(self, fh): | |
|
72 | return iter(lambda: fh.read(4096), '') | |
|
73 | ||
|
74 | def compresschunks(self, chunks): | |
|
75 | for chunk in chunks: | |
|
76 | yield chunk | |
|
77 | ||
|
78 | 71 | def sendresponse(self, v): |
|
79 | 72 | self.fout.write("%d\n" % len(v)) |
|
80 | 73 | self.fout.write(v) |
@@ -82,7 +75,13 b' class sshserver(wireproto.abstractserver' | |||
|
82 | 75 | |
|
83 | 76 | def sendstream(self, source): |
|
84 | 77 | write = self.fout.write |
|
85 | for chunk in source.gen: | |
|
78 | ||
|
79 | if source.reader: | |
|
80 | gen = iter(lambda: source.reader.read(4096), '') | |
|
81 | else: | |
|
82 | gen = source.gen | |
|
83 | ||
|
84 | for chunk in gen: | |
|
86 | 85 | write(chunk) |
|
87 | 86 | self.fout.flush() |
|
88 | 87 |
@@ -78,21 +78,6 b' class abstractserverproto(object):' | |||
|
78 | 78 | # """ |
|
79 | 79 | # raise NotImplementedError() |
|
80 | 80 | |
|
81 | def groupchunks(self, fh): | |
|
82 | """Generator of chunks to send to the client. | |
|
83 | ||
|
84 | Some protocols may have compressed the contents. | |
|
85 | """ | |
|
86 | raise NotImplementedError() | |
|
87 | ||
|
88 | def compresschunks(self, chunks): | |
|
89 | """Generator of possible compressed chunks to send to the client. | |
|
90 | ||
|
91 | This is like ``groupchunks()`` except it accepts a generator as | |
|
92 | its argument. | |
|
93 | """ | |
|
94 | raise NotImplementedError() | |
|
95 | ||
|
96 | 81 | class remotebatch(peer.batcher): |
|
97 | 82 | '''batches the queued calls; uses as few roundtrips as possible''' |
|
98 | 83 | def __init__(self, remote): |
@@ -529,10 +514,19 b' class streamres(object):' | |||
|
529 | 514 | """wireproto reply: binary stream |
|
530 | 515 | |
|
531 | 516 | The call was successful and the result is a stream. |
|
532 | Iterate on the `self.gen` attribute to retrieve chunks. | |
|
517 | ||
|
518 | Accepts either a generator or an object with a ``read(size)`` method. | |
|
519 | ||
|
520 | ``v1compressible`` indicates whether this data can be compressed to | |
|
521 | "version 1" clients (technically: HTTP peers using | |
|
522 | application/mercurial-0.1 media type). This flag should NOT be used on | |
|
523 | new commands because new clients should support a more modern compression | |
|
524 | mechanism. | |
|
533 | 525 | """ |
|
534 | def __init__(self, gen): | |
|
526 | def __init__(self, gen=None, reader=None, v1compressible=False): | |
|
535 | 527 | self.gen = gen |
|
528 | self.reader = reader | |
|
529 | self.v1compressible = v1compressible | |
|
536 | 530 | |
|
537 | 531 | class pushres(object): |
|
538 | 532 | """wireproto reply: success with simple integer return |
@@ -739,14 +733,14 b' def capabilities(repo, proto):' | |||
|
739 | 733 | def changegroup(repo, proto, roots): |
|
740 | 734 | nodes = decodelist(roots) |
|
741 | 735 | cg = changegroupmod.changegroup(repo, nodes, 'serve') |
|
742 | return streamres(proto.groupchunks(cg)) | |
|
736 | return streamres(reader=cg, v1compressible=True) | |
|
743 | 737 | |
|
744 | 738 | @wireprotocommand('changegroupsubset', 'bases heads') |
|
745 | 739 | def changegroupsubset(repo, proto, bases, heads): |
|
746 | 740 | bases = decodelist(bases) |
|
747 | 741 | heads = decodelist(heads) |
|
748 | 742 | cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve') |
|
749 | return streamres(proto.groupchunks(cg)) | |
|
743 | return streamres(reader=cg, v1compressible=True) | |
|
750 | 744 | |
|
751 | 745 | @wireprotocommand('debugwireargs', 'one two *') |
|
752 | 746 | def debugwireargs(repo, proto, one, two, others): |
@@ -781,7 +775,7 b' def getbundle(repo, proto, others):' | |||
|
781 | 775 | return ooberror(bundle2required) |
|
782 | 776 | |
|
783 | 777 | chunks = exchange.getbundlechunks(repo, 'serve', **opts) |
|
784 |
return streamres( |
|
|
778 | return streamres(gen=chunks, v1compressible=True) | |
|
785 | 779 | |
|
786 | 780 | @wireprotocommand('heads') |
|
787 | 781 | def heads(repo, proto): |
@@ -870,7 +864,7 b' def stream(repo, proto):' | |||
|
870 | 864 | # LockError may be raised before the first result is yielded. Don't |
|
871 | 865 | # emit output until we're sure we got the lock successfully. |
|
872 | 866 | it = streamclone.generatev1wireproto(repo) |
|
873 | return streamres(getstream(it)) | |
|
867 | return streamres(gen=getstream(it)) | |
|
874 | 868 | except error.LockError: |
|
875 | 869 | return '2\n' |
|
876 | 870 | |
@@ -900,7 +894,7 b' def unbundle(repo, proto, heads):' | |||
|
900 | 894 | if util.safehasattr(r, 'addpart'): |
|
901 | 895 | # The return looks streamable, we are in the bundle2 case and |
|
902 | 896 | # should return a stream. |
|
903 | return streamres(r.getchunks()) | |
|
897 | return streamres(gen=r.getchunks()) | |
|
904 | 898 | return pushres(r) |
|
905 | 899 | |
|
906 | 900 | finally: |
@@ -962,4 +956,4 b' def unbundle(repo, proto, heads):' | |||
|
962 | 956 | manargs, advargs)) |
|
963 | 957 | except error.PushRaced as exc: |
|
964 | 958 | bundler.newpart('error:pushraced', [('message', str(exc))]) |
|
965 | return streamres(bundler.getchunks()) | |
|
959 | return streamres(gen=bundler.getchunks()) |
General Comments 0
You need to be logged in to leave comments.
Login now