##// END OF EJS Templates
bundle: fix performance regression when bundling file changes (issue4031)...
Antoine Pitrou -
r19708:fd4f612f stable
parent child Browse files
Show More
@@ -1,430 +1,430
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from i18n import _
9 9 from node import nullrev, hex
10 10 import mdiff, util, dagutil
11 11 import struct, os, bz2, zlib, tempfile
12 12
13 13 _BUNDLE10_DELTA_HEADER = "20s20s20s20s"
14 14
15 15 def readexactly(stream, n):
16 16 '''read n bytes from stream.read and abort if less was available'''
17 17 s = stream.read(n)
18 18 if len(s) < n:
19 19 raise util.Abort(_("stream ended unexpectedly"
20 20 " (got %d bytes, expected %d)")
21 21 % (len(s), n))
22 22 return s
23 23
24 24 def getchunk(stream):
25 25 """return the next chunk from stream as a string"""
26 26 d = readexactly(stream, 4)
27 27 l = struct.unpack(">l", d)[0]
28 28 if l <= 4:
29 29 if l:
30 30 raise util.Abort(_("invalid chunk length %d") % l)
31 31 return ""
32 32 return readexactly(stream, l - 4)
33 33
34 34 def chunkheader(length):
35 35 """return a changegroup chunk header (string)"""
36 36 return struct.pack(">l", length + 4)
37 37
38 38 def closechunk():
39 39 """return a changegroup chunk header (string) for a zero-length chunk"""
40 40 return struct.pack(">l", 0)
41 41
42 42 class nocompress(object):
43 43 def compress(self, x):
44 44 return x
45 45 def flush(self):
46 46 return ""
47 47
48 48 bundletypes = {
49 49 "": ("", nocompress), # only when using unbundle on ssh and old http servers
50 50 # since the unification ssh accepts a header but there
51 51 # is no capability signaling it.
52 52 "HG10UN": ("HG10UN", nocompress),
53 53 "HG10BZ": ("HG10", lambda: bz2.BZ2Compressor()),
54 54 "HG10GZ": ("HG10GZ", lambda: zlib.compressobj()),
55 55 }
56 56
57 57 # hgweb uses this list to communicate its preferred type
58 58 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
59 59
60 60 def writebundle(cg, filename, bundletype):
61 61 """Write a bundle file and return its filename.
62 62
63 63 Existing files will not be overwritten.
64 64 If no filename is specified, a temporary file is created.
65 65 bz2 compression can be turned off.
66 66 The bundle file will be deleted in case of errors.
67 67 """
68 68
69 69 fh = None
70 70 cleanup = None
71 71 try:
72 72 if filename:
73 73 fh = open(filename, "wb")
74 74 else:
75 75 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
76 76 fh = os.fdopen(fd, "wb")
77 77 cleanup = filename
78 78
79 79 header, compressor = bundletypes[bundletype]
80 80 fh.write(header)
81 81 z = compressor()
82 82
83 83 # parse the changegroup data, otherwise we will block
84 84 # in case of sshrepo because we don't know the end of the stream
85 85
86 86 # an empty chunkgroup is the end of the changegroup
87 87 # a changegroup has at least 2 chunkgroups (changelog and manifest).
88 88 # after that, an empty chunkgroup is the end of the changegroup
89 89 empty = False
90 90 count = 0
91 91 while not empty or count <= 2:
92 92 empty = True
93 93 count += 1
94 94 while True:
95 95 chunk = getchunk(cg)
96 96 if not chunk:
97 97 break
98 98 empty = False
99 99 fh.write(z.compress(chunkheader(len(chunk))))
100 100 pos = 0
101 101 while pos < len(chunk):
102 102 next = pos + 2**20
103 103 fh.write(z.compress(chunk[pos:next]))
104 104 pos = next
105 105 fh.write(z.compress(closechunk()))
106 106 fh.write(z.flush())
107 107 cleanup = None
108 108 return filename
109 109 finally:
110 110 if fh is not None:
111 111 fh.close()
112 112 if cleanup is not None:
113 113 os.unlink(cleanup)
114 114
115 115 def decompressor(fh, alg):
116 116 if alg == 'UN':
117 117 return fh
118 118 elif alg == 'GZ':
119 119 def generator(f):
120 120 zd = zlib.decompressobj()
121 121 for chunk in util.filechunkiter(f):
122 122 yield zd.decompress(chunk)
123 123 elif alg == 'BZ':
124 124 def generator(f):
125 125 zd = bz2.BZ2Decompressor()
126 126 zd.decompress("BZ")
127 127 for chunk in util.filechunkiter(f, 4096):
128 128 yield zd.decompress(chunk)
129 129 else:
130 130 raise util.Abort("unknown bundle compression '%s'" % alg)
131 131 return util.chunkbuffer(generator(fh))
132 132
133 133 class unbundle10(object):
134 134 deltaheader = _BUNDLE10_DELTA_HEADER
135 135 deltaheadersize = struct.calcsize(deltaheader)
136 136 def __init__(self, fh, alg):
137 137 self._stream = decompressor(fh, alg)
138 138 self._type = alg
139 139 self.callback = None
140 140 def compressed(self):
141 141 return self._type != 'UN'
142 142 def read(self, l):
143 143 return self._stream.read(l)
144 144 def seek(self, pos):
145 145 return self._stream.seek(pos)
146 146 def tell(self):
147 147 return self._stream.tell()
148 148 def close(self):
149 149 return self._stream.close()
150 150
151 151 def chunklength(self):
152 152 d = readexactly(self._stream, 4)
153 153 l = struct.unpack(">l", d)[0]
154 154 if l <= 4:
155 155 if l:
156 156 raise util.Abort(_("invalid chunk length %d") % l)
157 157 return 0
158 158 if self.callback:
159 159 self.callback()
160 160 return l - 4
161 161
162 162 def changelogheader(self):
163 163 """v10 does not have a changelog header chunk"""
164 164 return {}
165 165
166 166 def manifestheader(self):
167 167 """v10 does not have a manifest header chunk"""
168 168 return {}
169 169
170 170 def filelogheader(self):
171 171 """return the header of the filelogs chunk, v10 only has the filename"""
172 172 l = self.chunklength()
173 173 if not l:
174 174 return {}
175 175 fname = readexactly(self._stream, l)
176 176 return dict(filename=fname)
177 177
178 178 def _deltaheader(self, headertuple, prevnode):
179 179 node, p1, p2, cs = headertuple
180 180 if prevnode is None:
181 181 deltabase = p1
182 182 else:
183 183 deltabase = prevnode
184 184 return node, p1, p2, deltabase, cs
185 185
186 186 def deltachunk(self, prevnode):
187 187 l = self.chunklength()
188 188 if not l:
189 189 return {}
190 190 headerdata = readexactly(self._stream, self.deltaheadersize)
191 191 header = struct.unpack(self.deltaheader, headerdata)
192 192 delta = readexactly(self._stream, l - self.deltaheadersize)
193 193 node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
194 194 return dict(node=node, p1=p1, p2=p2, cs=cs,
195 195 deltabase=deltabase, delta=delta)
196 196
197 197 class headerlessfixup(object):
198 198 def __init__(self, fh, h):
199 199 self._h = h
200 200 self._fh = fh
201 201 def read(self, n):
202 202 if self._h:
203 203 d, self._h = self._h[:n], self._h[n:]
204 204 if len(d) < n:
205 205 d += readexactly(self._fh, n - len(d))
206 206 return d
207 207 return readexactly(self._fh, n)
208 208
209 209 def readbundle(fh, fname):
210 210 header = readexactly(fh, 6)
211 211
212 212 if not fname:
213 213 fname = "stream"
214 214 if not header.startswith('HG') and header.startswith('\0'):
215 215 fh = headerlessfixup(fh, header)
216 216 header = "HG10UN"
217 217
218 218 magic, version, alg = header[0:2], header[2:4], header[4:6]
219 219
220 220 if magic != 'HG':
221 221 raise util.Abort(_('%s: not a Mercurial bundle') % fname)
222 222 if version != '10':
223 223 raise util.Abort(_('%s: unknown bundle version %s') % (fname, version))
224 224 return unbundle10(fh, alg)
225 225
226 226 class bundle10(object):
227 227 deltaheader = _BUNDLE10_DELTA_HEADER
228 228 def __init__(self, repo, bundlecaps=None):
229 229 """Given a source repo, construct a bundler.
230 230
231 231 bundlecaps is optional and can be used to specify the set of
232 232 capabilities which can be used to build the bundle.
233 233 """
234 234 # Set of capabilities we can use to build the bundle.
235 235 if bundlecaps is None:
236 236 bundlecaps = set()
237 237 self._bundlecaps = bundlecaps
238 238 self._changelog = repo.changelog
239 239 self._manifest = repo.manifest
240 240 reorder = repo.ui.config('bundle', 'reorder', 'auto')
241 241 if reorder == 'auto':
242 242 reorder = None
243 243 else:
244 244 reorder = util.parsebool(reorder)
245 245 self._repo = repo
246 246 self._reorder = reorder
247 247 self._progress = repo.ui.progress
248 248 def close(self):
249 249 return closechunk()
250 250
251 251 def fileheader(self, fname):
252 252 return chunkheader(len(fname)) + fname
253 253
254 254 def group(self, nodelist, revlog, lookup, units=None, reorder=None):
255 255 """Calculate a delta group, yielding a sequence of changegroup chunks
256 256 (strings).
257 257
258 258 Given a list of changeset revs, return a set of deltas and
259 259 metadata corresponding to nodes. The first delta is
260 260 first parent(nodelist[0]) -> nodelist[0], the receiver is
261 261 guaranteed to have this parent as it has all history before
262 262 these changesets. In the case firstparent is nullrev the
263 263 changegroup starts with a full revision.
264 264
265 265 If units is not None, progress detail will be generated, units specifies
266 266 the type of revlog that is touched (changelog, manifest, etc.).
267 267 """
268 268 # if we don't have any revisions touched by these changesets, bail
269 269 if len(nodelist) == 0:
270 270 yield self.close()
271 271 return
272 272
273 273 # for generaldelta revlogs, we linearize the revs; this will both be
274 274 # much quicker and generate a much smaller bundle
275 275 if (revlog._generaldelta and reorder is not False) or reorder:
276 276 dag = dagutil.revlogdag(revlog)
277 277 revs = set(revlog.rev(n) for n in nodelist)
278 278 revs = dag.linearize(revs)
279 279 else:
280 280 revs = sorted([revlog.rev(n) for n in nodelist])
281 281
282 282 # add the parent of the first rev
283 283 p = revlog.parentrevs(revs[0])[0]
284 284 revs.insert(0, p)
285 285
286 286 # build deltas
287 287 total = len(revs) - 1
288 288 msgbundling = _('bundling')
289 289 for r in xrange(len(revs) - 1):
290 290 if units is not None:
291 291 self._progress(msgbundling, r + 1, unit=units, total=total)
292 292 prev, curr = revs[r], revs[r + 1]
293 293 linknode = lookup(revlog.node(curr))
294 294 for c in self.revchunk(revlog, curr, prev, linknode):
295 295 yield c
296 296
297 297 yield self.close()
298 298
299 299 # filter any nodes that claim to be part of the known set
300 300 def prune(self, revlog, missing, commonrevs, source):
301 301 rr, rl = revlog.rev, revlog.linkrev
302 302 return [n for n in missing if rl(rr(n)) not in commonrevs]
303 303
304 304 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
305 305 '''yield a sequence of changegroup chunks (strings)'''
306 306 repo = self._repo
307 307 cl = self._changelog
308 308 mf = self._manifest
309 309 reorder = self._reorder
310 310 progress = self._progress
311 311
312 312 # for progress output
313 313 msgbundling = _('bundling')
314 314
315 315 mfs = {} # needed manifests
316 316 fnodes = {} # needed file nodes
317 317 changedfiles = set()
318 318
319 319 # Callback for the changelog, used to collect changed files and manifest
320 320 # nodes.
321 321 # Returns the linkrev node (identity in the changelog case).
322 322 def lookupcl(x):
323 323 c = cl.read(x)
324 324 changedfiles.update(c[3])
325 325 # record the first changeset introducing this manifest version
326 326 mfs.setdefault(c[0], x)
327 327 return x
328 328
329 329 # Callback for the manifest, used to collect linkrevs for filelog
330 330 # revisions.
331 331 # Returns the linkrev node (collected in lookupcl).
332 332 def lookupmf(x):
333 333 clnode = mfs[x]
334 334 if not fastpathlinkrev:
335 335 mdata = mf.readfast(x)
336 336 for f, n in mdata.iteritems():
337 337 if f in changedfiles:
338 338 # record the first changeset introducing this filelog
339 339 # version
340 340 fnodes[f].setdefault(n, clnode)
341 341 return clnode
342 342
343 343 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets'),
344 344 reorder=reorder):
345 345 yield chunk
346 346 progress(msgbundling, None)
347 347
348 348 for f in changedfiles:
349 349 fnodes[f] = {}
350 350 mfnodes = self.prune(mf, mfs, commonrevs, source)
351 351 for chunk in self.group(mfnodes, mf, lookupmf, units=_('manifests'),
352 352 reorder=reorder):
353 353 yield chunk
354 354 progress(msgbundling, None)
355 355
356 356 mfs.clear()
357 needed = set(cl.rev(x) for x in clnodes)
357 358
358 359 def linknodes(filerevlog, fname):
359 360 if fastpathlinkrev:
360 361 ln, llr = filerevlog.node, filerevlog.linkrev
361 needed = set(cl.rev(x) for x in clnodes)
362 362 def genfilenodes():
363 363 for r in filerevlog:
364 364 linkrev = llr(r)
365 365 if linkrev in needed:
366 366 yield filerevlog.node(r), cl.node(linkrev)
367 367 fnodes[fname] = dict(genfilenodes())
368 368 return fnodes.get(fname, {})
369 369
370 370 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
371 371 source):
372 372 yield chunk
373 373
374 374 yield self.close()
375 375 progress(msgbundling, None)
376 376
377 377 if clnodes:
378 378 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
379 379
380 380 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
381 381 repo = self._repo
382 382 progress = self._progress
383 383 reorder = self._reorder
384 384 msgbundling = _('bundling')
385 385
386 386 total = len(changedfiles)
387 387 # for progress output
388 388 msgfiles = _('files')
389 389 for i, fname in enumerate(sorted(changedfiles)):
390 390 filerevlog = repo.file(fname)
391 391 if not filerevlog:
392 392 raise util.Abort(_("empty or missing revlog for %s") % fname)
393 393
394 394 linkrevnodes = linknodes(filerevlog, fname)
395 395 # Lookup for filenodes, we collected the linkrev nodes above in the
396 396 # fastpath case and with lookupmf in the slowpath case.
397 397 def lookupfilelog(x):
398 398 return linkrevnodes[x]
399 399
400 400 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs, source)
401 401 if filenodes:
402 402 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
403 403 total=total)
404 404 yield self.fileheader(fname)
405 405 for chunk in self.group(filenodes, filerevlog, lookupfilelog,
406 406 reorder=reorder):
407 407 yield chunk
408 408
409 409 def revchunk(self, revlog, rev, prev, linknode):
410 410 node = revlog.node(rev)
411 411 p1, p2 = revlog.parentrevs(rev)
412 412 base = prev
413 413
414 414 prefix = ''
415 415 if base == nullrev:
416 416 delta = revlog.revision(node)
417 417 prefix = mdiff.trivialdiffheader(len(delta))
418 418 else:
419 419 delta = revlog.revdiff(base, rev)
420 420 p1n, p2n = revlog.parents(node)
421 421 basenode = revlog.node(base)
422 422 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
423 423 meta += prefix
424 424 l = len(meta) + len(delta)
425 425 yield chunkheader(l)
426 426 yield meta
427 427 yield delta
428 428 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
429 429 # do nothing with basenode, it is implicitly the previous one in HG10
430 430 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
General Comments 0
You need to be logged in to leave comments. Login now