Show More
@@ -76,7 +76,7 b' def getlfile(repo, proto, sha):' | |||||
76 | yield '%d\n' % length |
|
76 | yield '%d\n' % length | |
77 | for chunk in util.filechunkiter(f): |
|
77 | for chunk in util.filechunkiter(f): | |
78 | yield chunk |
|
78 | yield chunk | |
79 | return wireproto.streamres(generator()) |
|
79 | return wireproto.streamres(gen=generator()) | |
80 |
|
80 | |||
81 | def statlfile(repo, proto, sha): |
|
81 | def statlfile(repo, proto, sha): | |
82 | '''Server command for checking if a largefile is present - returns '2\n' if |
|
82 | '''Server command for checking if a largefile is present - returns '2\n' if |
@@ -73,16 +73,6 b' class webproto(wireproto.abstractserverp' | |||||
73 | self.ui.ferr, self.ui.fout = self.oldio |
|
73 | self.ui.ferr, self.ui.fout = self.oldio | |
74 | return val |
|
74 | return val | |
75 |
|
75 | |||
76 | def groupchunks(self, fh): |
|
|||
77 | def getchunks(): |
|
|||
78 | while True: |
|
|||
79 | chunk = fh.read(32768) |
|
|||
80 | if not chunk: |
|
|||
81 | break |
|
|||
82 | yield chunk |
|
|||
83 |
|
||||
84 | return self.compresschunks(getchunks()) |
|
|||
85 |
|
||||
86 | def compresschunks(self, chunks): |
|
76 | def compresschunks(self, chunks): | |
87 | # Don't allow untrusted settings because disabling compression or |
|
77 | # Don't allow untrusted settings because disabling compression or | |
88 | # setting a very high compression level could lead to flooding |
|
78 | # setting a very high compression level could lead to flooding | |
@@ -106,8 +96,16 b' def call(repo, req, cmd):' | |||||
106 | req.respond(HTTP_OK, HGTYPE, body=rsp) |
|
96 | req.respond(HTTP_OK, HGTYPE, body=rsp) | |
107 | return [] |
|
97 | return [] | |
108 | elif isinstance(rsp, wireproto.streamres): |
|
98 | elif isinstance(rsp, wireproto.streamres): | |
|
99 | if rsp.reader: | |||
|
100 | gen = iter(lambda: rsp.reader.read(32768), '') | |||
|
101 | else: | |||
|
102 | gen = rsp.gen | |||
|
103 | ||||
|
104 | if rsp.v1compressible: | |||
|
105 | gen = p.compresschunks(gen) | |||
|
106 | ||||
109 | req.respond(HTTP_OK, HGTYPE) |
|
107 | req.respond(HTTP_OK, HGTYPE) | |
110 |
return |
|
108 | return gen | |
111 | elif isinstance(rsp, wireproto.pushres): |
|
109 | elif isinstance(rsp, wireproto.pushres): | |
112 | val = p.restore() |
|
110 | val = p.restore() | |
113 | rsp = '%d\n%s' % (rsp.res, val) |
|
111 | rsp = '%d\n%s' % (rsp.res, val) |
@@ -68,13 +68,6 b' class sshserver(wireproto.abstractserver' | |||||
68 | def redirect(self): |
|
68 | def redirect(self): | |
69 | pass |
|
69 | pass | |
70 |
|
70 | |||
71 | def groupchunks(self, fh): |
|
|||
72 | return iter(lambda: fh.read(4096), '') |
|
|||
73 |
|
||||
74 | def compresschunks(self, chunks): |
|
|||
75 | for chunk in chunks: |
|
|||
76 | yield chunk |
|
|||
77 |
|
||||
78 | def sendresponse(self, v): |
|
71 | def sendresponse(self, v): | |
79 | self.fout.write("%d\n" % len(v)) |
|
72 | self.fout.write("%d\n" % len(v)) | |
80 | self.fout.write(v) |
|
73 | self.fout.write(v) | |
@@ -82,7 +75,13 b' class sshserver(wireproto.abstractserver' | |||||
82 |
|
75 | |||
83 | def sendstream(self, source): |
|
76 | def sendstream(self, source): | |
84 | write = self.fout.write |
|
77 | write = self.fout.write | |
85 | for chunk in source.gen: |
|
78 | ||
|
79 | if source.reader: | |||
|
80 | gen = iter(lambda: source.reader.read(4096), '') | |||
|
81 | else: | |||
|
82 | gen = source.gen | |||
|
83 | ||||
|
84 | for chunk in gen: | |||
86 | write(chunk) |
|
85 | write(chunk) | |
87 | self.fout.flush() |
|
86 | self.fout.flush() | |
88 |
|
87 |
@@ -78,21 +78,6 b' class abstractserverproto(object):' | |||||
78 | # """ |
|
78 | # """ | |
79 | # raise NotImplementedError() |
|
79 | # raise NotImplementedError() | |
80 |
|
80 | |||
81 | def groupchunks(self, fh): |
|
|||
82 | """Generator of chunks to send to the client. |
|
|||
83 |
|
||||
84 | Some protocols may have compressed the contents. |
|
|||
85 | """ |
|
|||
86 | raise NotImplementedError() |
|
|||
87 |
|
||||
88 | def compresschunks(self, chunks): |
|
|||
89 | """Generator of possible compressed chunks to send to the client. |
|
|||
90 |
|
||||
91 | This is like ``groupchunks()`` except it accepts a generator as |
|
|||
92 | its argument. |
|
|||
93 | """ |
|
|||
94 | raise NotImplementedError() |
|
|||
95 |
|
||||
96 | class remotebatch(peer.batcher): |
|
81 | class remotebatch(peer.batcher): | |
97 | '''batches the queued calls; uses as few roundtrips as possible''' |
|
82 | '''batches the queued calls; uses as few roundtrips as possible''' | |
98 | def __init__(self, remote): |
|
83 | def __init__(self, remote): | |
@@ -529,10 +514,19 b' class streamres(object):' | |||||
529 | """wireproto reply: binary stream |
|
514 | """wireproto reply: binary stream | |
530 |
|
515 | |||
531 | The call was successful and the result is a stream. |
|
516 | The call was successful and the result is a stream. | |
532 | Iterate on the `self.gen` attribute to retrieve chunks. |
|
517 | ||
|
518 | Accepts either a generator or an object with a ``read(size)`` method. | |||
|
519 | ||||
|
520 | ``v1compressible`` indicates whether this data can be compressed to | |||
|
521 | "version 1" clients (technically: HTTP peers using | |||
|
522 | application/mercurial-0.1 media type). This flag should NOT be used on | |||
|
523 | new commands because new clients should support a more modern compression | |||
|
524 | mechanism. | |||
533 | """ |
|
525 | """ | |
534 | def __init__(self, gen): |
|
526 | def __init__(self, gen=None, reader=None, v1compressible=False): | |
535 | self.gen = gen |
|
527 | self.gen = gen | |
|
528 | self.reader = reader | |||
|
529 | self.v1compressible = v1compressible | |||
536 |
|
530 | |||
537 | class pushres(object): |
|
531 | class pushres(object): | |
538 | """wireproto reply: success with simple integer return |
|
532 | """wireproto reply: success with simple integer return | |
@@ -739,14 +733,14 b' def capabilities(repo, proto):' | |||||
739 | def changegroup(repo, proto, roots): |
|
733 | def changegroup(repo, proto, roots): | |
740 | nodes = decodelist(roots) |
|
734 | nodes = decodelist(roots) | |
741 | cg = changegroupmod.changegroup(repo, nodes, 'serve') |
|
735 | cg = changegroupmod.changegroup(repo, nodes, 'serve') | |
742 | return streamres(proto.groupchunks(cg)) |
|
736 | return streamres(reader=cg, v1compressible=True) | |
743 |
|
737 | |||
744 | @wireprotocommand('changegroupsubset', 'bases heads') |
|
738 | @wireprotocommand('changegroupsubset', 'bases heads') | |
745 | def changegroupsubset(repo, proto, bases, heads): |
|
739 | def changegroupsubset(repo, proto, bases, heads): | |
746 | bases = decodelist(bases) |
|
740 | bases = decodelist(bases) | |
747 | heads = decodelist(heads) |
|
741 | heads = decodelist(heads) | |
748 | cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve') |
|
742 | cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve') | |
749 | return streamres(proto.groupchunks(cg)) |
|
743 | return streamres(reader=cg, v1compressible=True) | |
750 |
|
744 | |||
751 | @wireprotocommand('debugwireargs', 'one two *') |
|
745 | @wireprotocommand('debugwireargs', 'one two *') | |
752 | def debugwireargs(repo, proto, one, two, others): |
|
746 | def debugwireargs(repo, proto, one, two, others): | |
@@ -781,7 +775,7 b' def getbundle(repo, proto, others):' | |||||
781 | return ooberror(bundle2required) |
|
775 | return ooberror(bundle2required) | |
782 |
|
776 | |||
783 | chunks = exchange.getbundlechunks(repo, 'serve', **opts) |
|
777 | chunks = exchange.getbundlechunks(repo, 'serve', **opts) | |
784 |
return streamres( |
|
778 | return streamres(gen=chunks, v1compressible=True) | |
785 |
|
779 | |||
786 | @wireprotocommand('heads') |
|
780 | @wireprotocommand('heads') | |
787 | def heads(repo, proto): |
|
781 | def heads(repo, proto): | |
@@ -870,7 +864,7 b' def stream(repo, proto):' | |||||
870 | # LockError may be raised before the first result is yielded. Don't |
|
864 | # LockError may be raised before the first result is yielded. Don't | |
871 | # emit output until we're sure we got the lock successfully. |
|
865 | # emit output until we're sure we got the lock successfully. | |
872 | it = streamclone.generatev1wireproto(repo) |
|
866 | it = streamclone.generatev1wireproto(repo) | |
873 | return streamres(getstream(it)) |
|
867 | return streamres(gen=getstream(it)) | |
874 | except error.LockError: |
|
868 | except error.LockError: | |
875 | return '2\n' |
|
869 | return '2\n' | |
876 |
|
870 | |||
@@ -900,7 +894,7 b' def unbundle(repo, proto, heads):' | |||||
900 | if util.safehasattr(r, 'addpart'): |
|
894 | if util.safehasattr(r, 'addpart'): | |
901 | # The return looks streamable, we are in the bundle2 case and |
|
895 | # The return looks streamable, we are in the bundle2 case and | |
902 | # should return a stream. |
|
896 | # should return a stream. | |
903 | return streamres(r.getchunks()) |
|
897 | return streamres(gen=r.getchunks()) | |
904 | return pushres(r) |
|
898 | return pushres(r) | |
905 |
|
899 | |||
906 | finally: |
|
900 | finally: | |
@@ -962,4 +956,4 b' def unbundle(repo, proto, heads):' | |||||
962 | manargs, advargs)) |
|
956 | manargs, advargs)) | |
963 | except error.PushRaced as exc: |
|
957 | except error.PushRaced as exc: | |
964 | bundler.newpart('error:pushraced', [('message', str(exc))]) |
|
958 | bundler.newpart('error:pushraced', [('message', str(exc))]) | |
965 | return streamres(bundler.getchunks()) |
|
959 | return streamres(gen=bundler.getchunks()) |
General Comments 0
You need to be logged in to leave comments.
Login now