##// END OF EJS Templates
changegroup: decompress GZ algorithm in larger chunks for better performance
Michael Tjørnemark -
r16557:9dba5536 stable
parent child Browse files
Show More
@@ -1,256 +1,256
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import nullrev
10 10 import mdiff, util
11 11 import struct, os, bz2, zlib, tempfile
12 12
13 13 _BUNDLE10_DELTA_HEADER = "20s20s20s20s"
14 14
15 15 def readexactly(stream, n):
16 16 '''read n bytes from stream.read and abort if less was available'''
17 17 s = stream.read(n)
18 18 if len(s) < n:
19 19 raise util.Abort(_("stream ended unexpectedly"
20 20 " (got %d bytes, expected %d)")
21 21 % (len(s), n))
22 22 return s
23 23
24 24 def getchunk(stream):
25 25 """return the next chunk from stream as a string"""
26 26 d = readexactly(stream, 4)
27 27 l = struct.unpack(">l", d)[0]
28 28 if l <= 4:
29 29 if l:
30 30 raise util.Abort(_("invalid chunk length %d") % l)
31 31 return ""
32 32 return readexactly(stream, l - 4)
33 33
34 34 def chunkheader(length):
35 35 """return a changegroup chunk header (string)"""
36 36 return struct.pack(">l", length + 4)
37 37
38 38 def closechunk():
39 39 """return a changegroup chunk header (string) for a zero-length chunk"""
40 40 return struct.pack(">l", 0)
41 41
42 42 class nocompress(object):
43 43 def compress(self, x):
44 44 return x
45 45 def flush(self):
46 46 return ""
47 47
48 48 bundletypes = {
49 49 "": ("", nocompress), # only when using unbundle on ssh and old http servers
50 50 # since the unification ssh accepts a header but there
51 51 # is no capability signaling it.
52 52 "HG10UN": ("HG10UN", nocompress),
53 53 "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
54 54 "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
55 55 }
56 56
57 57 # hgweb uses this list to communicate its preferred type
58 58 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
59 59
60 60 def writebundle(cg, filename, bundletype):
61 61 """Write a bundle file and return its filename.
62 62
63 63 Existing files will not be overwritten.
64 64 If no filename is specified, a temporary file is created.
65 65 bz2 compression can be turned off.
66 66 The bundle file will be deleted in case of errors.
67 67 """
68 68
69 69 fh = None
70 70 cleanup = None
71 71 try:
72 72 if filename:
73 73 fh = open(filename, "wb")
74 74 else:
75 75 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
76 76 fh = os.fdopen(fd, "wb")
77 77 cleanup = filename
78 78
79 79 header, compressor = bundletypes[bundletype]
80 80 fh.write(header)
81 81 z = compressor()
82 82
83 83 # parse the changegroup data, otherwise we will block
84 84 # in case of sshrepo because we don't know the end of the stream
85 85
86 86 # an empty chunkgroup is the end of the changegroup
87 87 # a changegroup has at least 2 chunkgroups (changelog and manifest).
88 88 # after that, an empty chunkgroup is the end of the changegroup
89 89 empty = False
90 90 count = 0
91 91 while not empty or count <= 2:
92 92 empty = True
93 93 count += 1
94 94 while True:
95 95 chunk = getchunk(cg)
96 96 if not chunk:
97 97 break
98 98 empty = False
99 99 fh.write(z.compress(chunkheader(len(chunk))))
100 100 pos = 0
101 101 while pos < len(chunk):
102 102 next = pos + 2**20
103 103 fh.write(z.compress(chunk[pos:next]))
104 104 pos = next
105 105 fh.write(z.compress(closechunk()))
106 106 fh.write(z.flush())
107 107 cleanup = None
108 108 return filename
109 109 finally:
110 110 if fh is not None:
111 111 fh.close()
112 112 if cleanup is not None:
113 113 os.unlink(cleanup)
114 114
115 115 def decompressor(fh, alg):
116 116 if alg == 'UN':
117 117 return fh
118 118 elif alg == 'GZ':
119 119 def generator(f):
120 120 zd = zlib.decompressobj()
121 for chunk in f:
121 for chunk in util.filechunkiter(f):
122 122 yield zd.decompress(chunk)
123 123 elif alg == 'BZ':
124 124 def generator(f):
125 125 zd = bz2.BZ2Decompressor()
126 126 zd.decompress("BZ")
127 127 for chunk in util.filechunkiter(f, 4096):
128 128 yield zd.decompress(chunk)
129 129 else:
130 130 raise util.Abort("unknown bundle compression '%s'" % alg)
131 131 return util.chunkbuffer(generator(fh))
132 132
133 133 class unbundle10(object):
134 134 deltaheader = _BUNDLE10_DELTA_HEADER
135 135 deltaheadersize = struct.calcsize(deltaheader)
136 136 def __init__(self, fh, alg):
137 137 self._stream = decompressor(fh, alg)
138 138 self._type = alg
139 139 self.callback = None
140 140 def compressed(self):
141 141 return self._type != 'UN'
142 142 def read(self, l):
143 143 return self._stream.read(l)
144 144 def seek(self, pos):
145 145 return self._stream.seek(pos)
146 146 def tell(self):
147 147 return self._stream.tell()
148 148 def close(self):
149 149 return self._stream.close()
150 150
151 151 def chunklength(self):
152 152 d = readexactly(self._stream, 4)
153 153 l = struct.unpack(">l", d)[0]
154 154 if l <= 4:
155 155 if l:
156 156 raise util.Abort(_("invalid chunk length %d") % l)
157 157 return 0
158 158 if self.callback:
159 159 self.callback()
160 160 return l - 4
161 161
162 162 def changelogheader(self):
163 163 """v10 does not have a changelog header chunk"""
164 164 return {}
165 165
166 166 def manifestheader(self):
167 167 """v10 does not have a manifest header chunk"""
168 168 return {}
169 169
170 170 def filelogheader(self):
171 171 """return the header of the filelogs chunk, v10 only has the filename"""
172 172 l = self.chunklength()
173 173 if not l:
174 174 return {}
175 175 fname = readexactly(self._stream, l)
176 176 return dict(filename=fname)
177 177
178 178 def _deltaheader(self, headertuple, prevnode):
179 179 node, p1, p2, cs = headertuple
180 180 if prevnode is None:
181 181 deltabase = p1
182 182 else:
183 183 deltabase = prevnode
184 184 return node, p1, p2, deltabase, cs
185 185
186 186 def deltachunk(self, prevnode):
187 187 l = self.chunklength()
188 188 if not l:
189 189 return {}
190 190 headerdata = readexactly(self._stream, self.deltaheadersize)
191 191 header = struct.unpack(self.deltaheader, headerdata)
192 192 delta = readexactly(self._stream, l - self.deltaheadersize)
193 193 node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
194 194 return dict(node=node, p1=p1, p2=p2, cs=cs,
195 195 deltabase=deltabase, delta=delta)
196 196
197 197 class headerlessfixup(object):
198 198 def __init__(self, fh, h):
199 199 self._h = h
200 200 self._fh = fh
201 201 def read(self, n):
202 202 if self._h:
203 203 d, self._h = self._h[:n], self._h[n:]
204 204 if len(d) < n:
205 205 d += readexactly(self._fh, n - len(d))
206 206 return d
207 207 return readexactly(self._fh, n)
208 208
209 209 def readbundle(fh, fname):
210 210 header = readexactly(fh, 6)
211 211
212 212 if not fname:
213 213 fname = "stream"
214 214 if not header.startswith('HG') and header.startswith('\0'):
215 215 fh = headerlessfixup(fh, header)
216 216 header = "HG10UN"
217 217
218 218 magic, version, alg = header[0:2], header[2:4], header[4:6]
219 219
220 220 if magic != 'HG':
221 221 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
222 222 if version != '10':
223 223 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
224 224 return unbundle10(fh, alg)
225 225
226 226 class bundle10(object):
227 227 deltaheader = _BUNDLE10_DELTA_HEADER
228 228 def __init__(self, lookup):
229 229 self._lookup = lookup
230 230 def close(self):
231 231 return closechunk()
232 232 def fileheader(self, fname):
233 233 return chunkheader(len(fname)) + fname
234 234 def revchunk(self, revlog, rev, prev):
235 235 node = revlog.node(rev)
236 236 p1, p2 = revlog.parentrevs(rev)
237 237 base = prev
238 238
239 239 prefix = ''
240 240 if base == nullrev:
241 241 delta = revlog.revision(node)
242 242 prefix = mdiff.trivialdiffheader(len(delta))
243 243 else:
244 244 delta = revlog.revdiff(base, rev)
245 245 linknode = self._lookup(revlog, node)
246 246 p1n, p2n = revlog.parents(node)
247 247 basenode = revlog.node(base)
248 248 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
249 249 meta += prefix
250 250 l = len(meta) + len(delta)
251 251 yield chunkheader(l)
252 252 yield meta
253 253 yield delta
254 254 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
255 255 # do nothing with basenode, it is implicitly the previous one in HG10
256 256 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
General Comments 0
You need to be logged in to leave comments. Login now