# Provenance: Mercurial `mercurial/changegroup.py`, revision r32931:b08431e1
# (default branch), commit "changegroup: delete 'if True' and reflow"
# by Martin von Zweigbergk. The lines below reconstruct the post-commit
# file content (1025 lines in the original view).
# changegroup.py - Mercurial changegroup manipulation functions
#
# Copyright 2006 Matt Mackall <mpm@selenic.com>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.

from __future__ import absolute_import

import os
import struct
import tempfile
import weakref

from .i18n import _
from .node import (
    hex,
    nullrev,
    short,
)

from . import (
    dagutil,
    discovery,
    error,
    mdiff,
    phases,
    pycompat,
    util,
)

_CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
_CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
_CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
def readexactly(stream, n):
    '''read n bytes from stream.read and abort if less was available'''
    s = stream.read(n)
    if len(s) < n:
        # A short read means the peer hung up or the bundle is truncated.
        raise error.Abort(_("stream ended unexpectedly"
                            " (got %d bytes, expected %d)")
                          % (len(s), n))
    return s
44 44
def getchunk(stream):
    """return the next chunk from stream as a string

    A chunk is a big-endian 32-bit length (which includes the 4 length
    bytes themselves) followed by the payload. A length of 0 marks the
    end of a group; lengths 1-4 are impossible and rejected.
    """
    d = readexactly(stream, 4)
    l = struct.unpack(">l", d)[0]
    if l <= 4:
        if l:
            raise error.Abort(_("invalid chunk length %d") % l)
        return ""
    return readexactly(stream, l - 4)
54 54
def chunkheader(length):
    """return a changegroup chunk header (string)

    The on-the-wire length includes the 4 bytes of the header itself.
    """
    return struct.pack(">l", length + 4)
58 58
def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk"""
    return struct.pack(">l", 0)
62 62
def combineresults(results):
    """logic to combine 0 or more addchangegroup results into one

    Each result follows the apply() convention: 0 means nothing changed,
    1+n means n heads were added, -1-n means n heads were removed.
    The combined value follows the same convention.
    """
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result
81 81
def writechunks(ui, chunks, filename, vfs=None):
    """Write chunks to a file and return its filename.

    The stream is assumed to be a bundle file.
    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    """
    fh = None
    cleanup = None
    try:
        if filename:
            if vfs:
                fh = vfs.open(filename, "wb")
            else:
                # Increase default buffer size because default is usually
                # small (4k is common on Linux).
                fh = open(filename, "wb", 131072)
        else:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, pycompat.sysstr("wb"))
        # Until the write loop completes, `cleanup` holds the path so the
        # partial file is removed if anything raises.
        cleanup = filename
        for c in chunks:
            fh.write(c)
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)
115 115
class cg1unpacker(object):
    """Unpacker for cg1 changegroup streams.

    A changegroup unpacker handles the framing of the revision data in
    the wire format. Most consumers will want to use the apply()
    method to add the changes from the changegroup to a repository.

    If you're forwarding a changegroup unmodified to another consumer,
    use getchunks(), which returns an iterator of changegroup
    chunks. This is mostly useful for cases where you need to know the
    data stream has ended by observing the end of the changegroup.

    deltachunk() is useful only if you're applying delta data. Most
    consumers should prefer apply() instead.

    A few other public methods exist. Those are used only for
    bundlerepo and some debug commands - their use is discouraged.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    _grouplistcount = 1 # One list of files after the manifests

    def __init__(self, fh, alg, extras=None):
        if alg is None:
            alg = 'UN'
        if alg not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % alg)
        if alg == 'BZ':
            alg = '_truncatedBZ'

        compengine = util.compengines.forbundletype(alg)
        self._stream = compengine.decompressorreader(fh)
        self._type = alg
        self.extras = extras or {}
        self.callback = None

    # These methods (compressed, read, seek, tell) all appear to only
    # be used by bundlerepo, but it's a little hard to tell.
    def compressed(self):
        return self._type is not None and self._type != 'UN'
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def _chunklength(self):
        # Read a 4-byte big-endian length; the value includes those 4
        # bytes, so 0 means "end of group" and 1-4 are invalid.
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise error.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self._chunklength()
        if not l:
            return {}
        fname = readexactly(self._stream, l)
        return {'filename': fname}

    def _deltaheader(self, headertuple, prevnode):
        node, p1, p2, cs = headertuple
        # cg1 has no explicit delta base: deltas chain against the
        # previous revision in the stream, or p1 for the first one.
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        flags = 0
        return node, p1, p2, deltabase, cs, flags

    def deltachunk(self, prevnode):
        l = self._chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
        return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
                'deltabase': deltabase, 'delta': delta, 'flags': flags}

    def getchunks(self):
        """returns all the chunks contains in the bundle

        Used when you need to forward the binary stream to a file or another
        network API. To do so, it parse the changegroup data, otherwise it will
        block in case of sshrepo because it don't know the end of the stream.
        """
        # an empty chunkgroup is the end of the changegroup
        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, changegroup versions 1 and 2 have a series of groups
        # with one group per file. changegroup 3 has a series of directory
        # manifests before the files.
        count = 0
        emptycount = 0
        while emptycount < self._grouplistcount:
            empty = True
            count += 1
            while True:
                chunk = getchunk(self)
                if not chunk:
                    if empty and count > 2:
                        emptycount += 1
                    break
                empty = False
                yield chunkheader(len(chunk))
                pos = 0
                # re-chunk large payloads in 1MB slices
                while pos < len(chunk):
                    next = pos + 2**20
                    yield chunk[pos:next]
                    pos = next
            yield closechunk()

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        # We know that we'll never have more manifests than we had
        # changesets.
        self.callback = prog(_('manifests'), numchanges)
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        self.manifestheader()
        repo.manifestlog._revlog.addgroup(self, revmap, trp)
        repo.ui.progress(_('manifests'), None)
        self.callback = None

    def apply(self, repo, tr, srctype, url, emptyok=False,
              targetphase=phases.draft, expectedtotal=None):
        """Add the changegroup returned by source.read() to this repo.
        srctype is a string like 'push', 'pull', or 'unbundle'. url is
        the URL of the repo where this changegroup is coming from.

        Return an integer summarizing the change to this repo:
        - nothing changed or no source: 0
        - more heads than before: 1+added heads (2..n)
        - fewer heads than before: -1-removed heads (-2..-n)
        - number of heads stays the same: 1
        """
        repo = repo.unfiltered()
        def csmap(x):
            repo.ui.debug("add changeset %s\n" % short(x))
            return len(cl)

        def revmap(x):
            return cl.rev(x)

        changesets = files = revisions = 0

        try:
            # The transaction may already carry source information. In this
            # case we use the top level data. We overwrite the argument
            # because we need to use the top level value (if they exist)
            # in this function.
            srctype = tr.hookargs.setdefault('source', srctype)
            url = tr.hookargs.setdefault('url', url)
            repo.hook('prechangegroup', throw=True, **tr.hookargs)

            # write changelog data to temp files so concurrent readers
            # will not see an inconsistent view
            cl = repo.changelog
            cl.delayupdate(tr)
            oldheads = set(cl.heads())

            trp = weakref.proxy(tr)
            # pull off the changeset group
            repo.ui.status(_("adding changesets\n"))
            clstart = len(cl)
            class prog(object):
                def __init__(self, step, total):
                    self._step = step
                    self._total = total
                    self._count = 1
                def __call__(self):
                    repo.ui.progress(self._step, self._count, unit=_('chunks'),
                                     total=self._total)
                    self._count += 1
            self.callback = prog(_('changesets'), expectedtotal)

            efiles = set()
            def onchangelog(cl, node):
                efiles.update(cl.readfiles(node))

            self.changelogheader()
            cgnodes = cl.addgroup(self, csmap, trp, addrevisioncb=onchangelog)
            efiles = len(efiles)

            if not (cgnodes or emptyok):
                raise error.Abort(_("received changelog group is empty"))
            clend = len(cl)
            changesets = clend - clstart
            repo.ui.progress(_('changesets'), None)
            self.callback = None

            # pull off the manifest group
            repo.ui.status(_("adding manifests\n"))
            self._unpackmanifests(repo, revmap, trp, prog, changesets)

            needfiles = {}
            if repo.ui.configbool('server', 'validate', default=False):
                cl = repo.changelog
                ml = repo.manifestlog
                # validate incoming csets have their manifests
                for cset in xrange(clstart, clend):
                    mfnode = cl.changelogrevision(cset).manifest
                    mfest = ml[mfnode].readdelta()
                    # store file cgnodes we must see
                    for f, n in mfest.iteritems():
                        needfiles.setdefault(f, set()).add(n)

            # process the files
            repo.ui.status(_("adding file changes\n"))
            newrevs, newfiles = _addchangegroupfiles(
                repo, self, revmap, trp, efiles, needfiles)
            revisions += newrevs
            files += newfiles

            deltaheads = 0
            if oldheads:
                heads = cl.heads()
                deltaheads = len(heads) - len(oldheads)
                for h in heads:
                    if h not in oldheads and repo[h].closesbranch():
                        deltaheads -= 1
            htext = ""
            if deltaheads:
                htext = _(" (%+d heads)") % deltaheads

            repo.ui.status(_("added %d changesets"
                             " with %d changes to %d files%s\n")
                           % (changesets, revisions, files, htext))
            repo.invalidatevolatilesets()

            if changesets > 0:
                if 'node' not in tr.hookargs:
                    tr.hookargs['node'] = hex(cl.node(clstart))
                    tr.hookargs['node_last'] = hex(cl.node(clend - 1))
                    hookargs = dict(tr.hookargs)
                else:
                    hookargs = dict(tr.hookargs)
                    hookargs['node'] = hex(cl.node(clstart))
                    hookargs['node_last'] = hex(cl.node(clend - 1))
                repo.hook('pretxnchangegroup', throw=True, **hookargs)

            added = [cl.node(r) for r in xrange(clstart, clend)]
            if srctype in ('push', 'serve'):
                # Old servers can not push the boundary themselves.
                # New servers won't push the boundary if changeset already
                # exists locally as secret
                #
                # We should not use added here but the list of all change in
                # the bundle
                if repo.publishing():
                    phases.advanceboundary(repo, tr, phases.public, cgnodes)
                else:
                    # Those changesets have been pushed from the
                    # outside, their phases are going to be pushed
                    # alongside. Therefor `targetphase` is
                    # ignored.
                    phases.advanceboundary(repo, tr, phases.draft, cgnodes)
                    phases.retractboundary(repo, tr, phases.draft, added)
            elif srctype != 'strip':
                # publishing only alter behavior during push
                #
                # strip should not touch boundary at all
                phases.retractboundary(repo, tr, targetphase, added)

            if changesets > 0:

                def runhooks():
                    # These hooks run when the lock releases, not when the
                    # transaction closes. So it's possible for the changelog
                    # to have changed since we last saw it.
                    if clstart >= len(repo):
                        return

                    repo.hook("changegroup", **hookargs)

                    for n in added:
                        args = hookargs.copy()
                        args['node'] = hex(n)
                        del args['node_last']
                        repo.hook("incoming", **args)

                    newheads = [h for h in repo.heads()
                                if h not in oldheads]
                    repo.ui.log("incoming",
                                "%s incoming changes - new heads: %s\n",
                                len(added),
                                ', '.join([hex(c[:6]) for c in newheads]))

                tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                                lambda tr: repo._afterlock(runhooks))
        finally:
            repo.ui.flush()
        # never return 0 here:
        if deltaheads < 0:
            return deltaheads - 1
        else:
            return deltaheads + 1
434 432
class cg2unpacker(cg1unpacker):
    """Unpacker for cg2 streams.

    cg2 streams add support for generaldelta, so the delta header
    format is slightly different. All other features about the data
    remain the same.
    """
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        # cg2 carries the delta base explicitly in the header.
        node, p1, p2, deltabase, cs = headertuple
        flags = 0
        return node, p1, p2, deltabase, cs, flags
450 448
class cg3unpacker(cg2unpacker):
    """Unpacker for cg3 streams.

    cg3 streams add support for exchanging treemanifests and revlog
    flags. It adds the revlog flags to the delta header and an empty chunk
    separating manifests and files.
    """
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '03'
    _grouplistcount = 2 # One list of manifests and one list of files

    def _deltaheader(self, headertuple, prevnode):
        node, p1, p2, deltabase, cs, flags = headertuple
        return node, p1, p2, deltabase, cs, flags

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
                                                  numchanges)
        for chunkdata in iter(self.filelogheader, {}):
            # If we get here, there are directory manifests in the changegroup
            d = chunkdata["filename"]
            repo.ui.debug("adding %s revisions\n" % d)
            dirlog = repo.manifestlog._revlog.dirlog(d)
            if not dirlog.addgroup(self, revmap, trp):
                raise error.Abort(_("received dir revlog group is empty"))
477 475
class headerlessfixup(object):
    """Present an already-read prefix `h` followed by the rest of stream `fh`.

    read() serves bytes from `h` first; once the prefix is exhausted it
    falls through to reading `fh` directly.
    """
    def __init__(self, fh, h):
        self._h = h
        self._fh = fh
    def read(self, n):
        if self._h:
            d, self._h = self._h[:n], self._h[n:]
            if len(d) < n:
                # prefix ran out mid-read: top up from the real stream
                d += readexactly(self._fh, n - len(d))
            return d
        return readexactly(self._fh, n)
489 487
490 488 class cg1packer(object):
491 489 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
492 490 version = '01'
493 491 def __init__(self, repo, bundlecaps=None):
494 492 """Given a source repo, construct a bundler.
495 493
496 494 bundlecaps is optional and can be used to specify the set of
497 495 capabilities which can be used to build the bundle. While bundlecaps is
498 496 unused in core Mercurial, extensions rely on this feature to communicate
499 497 capabilities to customize the changegroup packer.
500 498 """
501 499 # Set of capabilities we can use to build the bundle.
502 500 if bundlecaps is None:
503 501 bundlecaps = set()
504 502 self._bundlecaps = bundlecaps
505 503 # experimental config: bundle.reorder
506 504 reorder = repo.ui.config('bundle', 'reorder', 'auto')
507 505 if reorder == 'auto':
508 506 reorder = None
509 507 else:
510 508 reorder = util.parsebool(reorder)
511 509 self._repo = repo
512 510 self._reorder = reorder
513 511 self._progress = repo.ui.progress
514 512 if self._repo.ui.verbose and not self._repo.ui.debugflag:
515 513 self._verbosenote = self._repo.ui.note
516 514 else:
517 515 self._verbosenote = lambda s: None
518 516
519 517 def close(self):
520 518 return closechunk()
521 519
522 520 def fileheader(self, fname):
523 521 return chunkheader(len(fname)) + fname
524 522
525 523 # Extracted both for clarity and for overriding in extensions.
526 524 def _sortgroup(self, revlog, nodelist, lookup):
527 525 """Sort nodes for change group and turn them into revnums."""
528 526 # for generaldelta revlogs, we linearize the revs; this will both be
529 527 # much quicker and generate a much smaller bundle
530 528 if (revlog._generaldelta and self._reorder is None) or self._reorder:
531 529 dag = dagutil.revlogdag(revlog)
532 530 return dag.linearize(set(revlog.rev(n) for n in nodelist))
533 531 else:
534 532 return sorted([revlog.rev(n) for n in nodelist])
535 533
536 534 def group(self, nodelist, revlog, lookup, units=None):
537 535 """Calculate a delta group, yielding a sequence of changegroup chunks
538 536 (strings).
539 537
540 538 Given a list of changeset revs, return a set of deltas and
541 539 metadata corresponding to nodes. The first delta is
542 540 first parent(nodelist[0]) -> nodelist[0], the receiver is
543 541 guaranteed to have this parent as it has all history before
544 542 these changesets. In the case firstparent is nullrev the
545 543 changegroup starts with a full revision.
546 544
547 545 If units is not None, progress detail will be generated, units specifies
548 546 the type of revlog that is touched (changelog, manifest, etc.).
549 547 """
550 548 # if we don't have any revisions touched by these changesets, bail
551 549 if len(nodelist) == 0:
552 550 yield self.close()
553 551 return
554 552
555 553 revs = self._sortgroup(revlog, nodelist, lookup)
556 554
557 555 # add the parent of the first rev
558 556 p = revlog.parentrevs(revs[0])[0]
559 557 revs.insert(0, p)
560 558
561 559 # build deltas
562 560 total = len(revs) - 1
563 561 msgbundling = _('bundling')
564 562 for r in xrange(len(revs) - 1):
565 563 if units is not None:
566 564 self._progress(msgbundling, r + 1, unit=units, total=total)
567 565 prev, curr = revs[r], revs[r + 1]
568 566 linknode = lookup(revlog.node(curr))
569 567 for c in self.revchunk(revlog, curr, prev, linknode):
570 568 yield c
571 569
572 570 if units is not None:
573 571 self._progress(msgbundling, None)
574 572 yield self.close()
575 573
576 574 # filter any nodes that claim to be part of the known set
577 575 def prune(self, revlog, missing, commonrevs):
578 576 rr, rl = revlog.rev, revlog.linkrev
579 577 return [n for n in missing if rl(rr(n)) not in commonrevs]
580 578
581 579 def _packmanifests(self, dir, mfnodes, lookuplinknode):
582 580 """Pack flat manifests into a changegroup stream."""
583 581 assert not dir
584 582 for chunk in self.group(mfnodes, self._repo.manifestlog._revlog,
585 583 lookuplinknode, units=_('manifests')):
586 584 yield chunk
587 585
588 586 def _manifestsdone(self):
589 587 return ''
590 588
591 589 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
592 590 '''yield a sequence of changegroup chunks (strings)'''
593 591 repo = self._repo
594 592 cl = repo.changelog
595 593
596 594 clrevorder = {}
597 595 mfs = {} # needed manifests
598 596 fnodes = {} # needed file nodes
599 597 changedfiles = set()
600 598
601 599 # Callback for the changelog, used to collect changed files and manifest
602 600 # nodes.
603 601 # Returns the linkrev node (identity in the changelog case).
604 602 def lookupcl(x):
605 603 c = cl.read(x)
606 604 clrevorder[x] = len(clrevorder)
607 605 n = c[0]
608 606 # record the first changeset introducing this manifest version
609 607 mfs.setdefault(n, x)
610 608 # Record a complete list of potentially-changed files in
611 609 # this manifest.
612 610 changedfiles.update(c[3])
613 611 return x
614 612
615 613 self._verbosenote(_('uncompressed size of bundle content:\n'))
616 614 size = 0
617 615 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
618 616 size += len(chunk)
619 617 yield chunk
620 618 self._verbosenote(_('%8.i (changelog)\n') % size)
621 619
622 620 # We need to make sure that the linkrev in the changegroup refers to
623 621 # the first changeset that introduced the manifest or file revision.
624 622 # The fastpath is usually safer than the slowpath, because the filelogs
625 623 # are walked in revlog order.
626 624 #
627 625 # When taking the slowpath with reorder=None and the manifest revlog
628 626 # uses generaldelta, the manifest may be walked in the "wrong" order.
629 627 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
630 628 # cc0ff93d0c0c).
631 629 #
632 630 # When taking the fastpath, we are only vulnerable to reordering
633 631 # of the changelog itself. The changelog never uses generaldelta, so
634 632 # it is only reordered when reorder=True. To handle this case, we
635 633 # simply take the slowpath, which already has the 'clrevorder' logic.
636 634 # This was also fixed in cc0ff93d0c0c.
637 635 fastpathlinkrev = fastpathlinkrev and not self._reorder
638 636 # Treemanifests don't work correctly with fastpathlinkrev
639 637 # either, because we don't discover which directory nodes to
640 638 # send along with files. This could probably be fixed.
641 639 fastpathlinkrev = fastpathlinkrev and (
642 640 'treemanifest' not in repo.requirements)
643 641
644 642 for chunk in self.generatemanifests(commonrevs, clrevorder,
645 643 fastpathlinkrev, mfs, fnodes):
646 644 yield chunk
647 645 mfs.clear()
648 646 clrevs = set(cl.rev(x) for x in clnodes)
649 647
650 648 if not fastpathlinkrev:
651 649 def linknodes(unused, fname):
652 650 return fnodes.get(fname, {})
653 651 else:
654 652 cln = cl.node
655 653 def linknodes(filerevlog, fname):
656 654 llr = filerevlog.linkrev
657 655 fln = filerevlog.node
658 656 revs = ((r, llr(r)) for r in filerevlog)
659 657 return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)
660 658
661 659 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
662 660 source):
663 661 yield chunk
664 662
665 663 yield self.close()
666 664
667 665 if clnodes:
668 666 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
669 667
670 668 def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
671 669 fnodes):
672 670 repo = self._repo
673 671 mfl = repo.manifestlog
674 672 dirlog = mfl._revlog.dirlog
675 673 tmfnodes = {'': mfs}
676 674
677 675 # Callback for the manifest, used to collect linkrevs for filelog
678 676 # revisions.
679 677 # Returns the linkrev node (collected in lookupcl).
680 678 def makelookupmflinknode(dir):
681 679 if fastpathlinkrev:
682 680 assert not dir
683 681 return mfs.__getitem__
684 682
685 683 def lookupmflinknode(x):
686 684 """Callback for looking up the linknode for manifests.
687 685
688 686 Returns the linkrev node for the specified manifest.
689 687
690 688 SIDE EFFECT:
691 689
692 690 1) fclnodes gets populated with the list of relevant
693 691 file nodes if we're not using fastpathlinkrev
694 692 2) When treemanifests are in use, collects treemanifest nodes
695 693 to send
696 694
697 695 Note that this means manifests must be completely sent to
698 696 the client before you can trust the list of files and
699 697 treemanifests to send.
700 698 """
701 699 clnode = tmfnodes[dir][x]
702 700 mdata = mfl.get(dir, x).readfast(shallow=True)
703 701 for p, n, fl in mdata.iterentries():
704 702 if fl == 't': # subdirectory manifest
705 703 subdir = dir + p + '/'
706 704 tmfclnodes = tmfnodes.setdefault(subdir, {})
707 705 tmfclnode = tmfclnodes.setdefault(n, clnode)
708 706 if clrevorder[clnode] < clrevorder[tmfclnode]:
709 707 tmfclnodes[n] = clnode
710 708 else:
711 709 f = dir + p
712 710 fclnodes = fnodes.setdefault(f, {})
713 711 fclnode = fclnodes.setdefault(n, clnode)
714 712 if clrevorder[clnode] < clrevorder[fclnode]:
715 713 fclnodes[n] = clnode
716 714 return clnode
717 715 return lookupmflinknode
718 716
719 717 size = 0
720 718 while tmfnodes:
721 719 dir = min(tmfnodes)
722 720 nodes = tmfnodes[dir]
723 721 prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
724 722 if not dir or prunednodes:
725 723 for x in self._packmanifests(dir, prunednodes,
726 724 makelookupmflinknode(dir)):
727 725 size += len(x)
728 726 yield x
729 727 del tmfnodes[dir]
730 728 self._verbosenote(_('%8.i (manifests)\n') % size)
731 729 yield self._manifestsdone()
732 730
733 731 # The 'source' parameter is useful for extensions
734 732 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
735 733 repo = self._repo
736 734 progress = self._progress
737 735 msgbundling = _('bundling')
738 736
739 737 total = len(changedfiles)
740 738 # for progress output
741 739 msgfiles = _('files')
742 740 for i, fname in enumerate(sorted(changedfiles)):
743 741 filerevlog = repo.file(fname)
744 742 if not filerevlog:
745 743 raise error.Abort(_("empty or missing revlog for %s") % fname)
746 744
747 745 linkrevnodes = linknodes(filerevlog, fname)
748 746 # Lookup for filenodes, we collected the linkrev nodes above in the
749 747 # fastpath case and with lookupmf in the slowpath case.
750 748 def lookupfilelog(x):
751 749 return linkrevnodes[x]
752 750
753 751 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
754 752 if filenodes:
755 753 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
756 754 total=total)
757 755 h = self.fileheader(fname)
758 756 size = len(h)
759 757 yield h
760 758 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
761 759 size += len(chunk)
762 760 yield chunk
763 761 self._verbosenote(_('%8.i %s\n') % (size, fname))
764 762 progress(msgbundling, None)
765 763
    def deltaparent(self, revlog, rev, p1, p2, prev):
        """Return the revision to delta against for ``rev``.

        cg1 always deltas against the previous revision emitted in the
        stream (``prev``); the other arguments exist for subclass
        overrides (see cg2packer.deltaparent), which pick a better base.
        """
        return prev
768 766
    def revchunk(self, revlog, rev, prev, linknode):
        """Yield the serialized chunks for revision ``rev`` of ``revlog``:
        a length header, then the delta header (plus an optional patch
        prefix), then the delta payload itself.
        """
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = self.deltaparent(revlog, rev, p1, p2, prev)

        prefix = ''
        if revlog.iscensored(base) or revlog.iscensored(rev):
            # Censored revisions cannot be represented as normal deltas:
            # send the full raw text (or the tombstone if the node itself
            # is censored), framed as a patch replacing the whole base.
            try:
                delta = revlog.revision(node, raw=True)
            except error.CensoredNodeError as e:
                delta = e.tombstone
            if base == nullrev:
                prefix = mdiff.trivialdiffheader(len(delta))
            else:
                baselen = revlog.rawsize(base)
                prefix = mdiff.replacediffheader(baselen, len(delta))
        elif base == nullrev:
            # No delta base: ship the full raw text as a trivial "diff".
            delta = revlog.revision(node, raw=True)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        flags = revlog.flags(rev)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta
    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        """Pack the cg1 fixed-width delta header (node, p1, p2, linknode)."""
        # do nothing with basenode, it is implicitly the previous one in HG10
        # do nothing with flags, it is implicitly 0 for cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
803 801
class cg2packer(cg1packer):
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        super(cg2packer, self).__init__(repo, bundlecaps)
        if self._reorder is None:
            # cg2 supports generaldelta directly, so reordering generally
            # doesn't help; treat bundle.reorder=auto the same as
            # bundle.reorder=False.
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        """Pick the delta base revision for ``rev`` in a cg2 stream."""
        dp = revlog.deltaparent(rev)
        if dp == nullrev:
            if revlog.storedeltachains:
                # Don't send a full revision merely because the stored
                # delta parent is null; delta against prev instead.
                # Picking p1 is tempting (often a smaller delta), but it
                # may require resolving p1's raw text, which is expensive,
                # while the revlog caches should already hold prev. A
                # flag/config option to tune this could be added later.
                return prev
            # The revlog is configured to use full snapshots for a
            # reason; stick to a full snapshot.
            return nullrev
        if dp not in (p1, p2, prev):
            # We can't be sure the remote has the stored base; use prev.
            return prev
        return dp

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # flags is implicitly 0 in cg1 and cg2; the header has no field
        # for it.
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
840 838
class cg3packer(cg2packer):
    version = '03'
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        """Yield chunks for the (sub)manifest revlog identified by ``dir``.

        A non-empty ``dir`` names a treemanifest subdirectory and is
        announced with a file-style header chunk first.
        """
        if dir:
            yield self.fileheader(dir)

        mflog = self._repo.manifestlog._revlog.dirlog(dir)
        deltas = self.group(mfnodes, mflog, lookuplinknode,
                            units=_('manifests'))
        for chunk in deltas:
            yield chunk

    def _manifestsdone(self):
        # cg3 ends the whole manifest section (root plus subdirectories)
        # with an explicit close chunk.
        return self.close()

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # Unlike cg1/cg2, the cg3 header carries the revlog flags field
        # (see _CHANGEGROUPV3_DELTA_HEADER).
        return struct.pack(
            self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
860 858
# Map changegroup version identifier -> (packer class, unpacker class)
# implementing that wire format.
_packermap = {'01': (cg1packer, cg1unpacker),
              # cg2 adds support for exchanging generaldelta
              '02': (cg2packer, cg2unpacker),
              # cg3 adds support for exchanging revlog flags and treemanifests
              '03': (cg3packer, cg3unpacker),
             }
867 865
def allsupportedversions(repo):
    """Return the set of all changegroup versions this repo supports.

    Version '03' is only offered when an experimental knob or the
    treemanifest requirement calls for it.
    """
    versions = set(_packermap)
    ui = repo.ui
    want03 = (ui.configbool('experimental', 'changegroup3')
              or ui.configbool('experimental', 'treemanifest')
              or 'treemanifest' in repo.requirements)
    if not want03:
        versions.discard('03')
    return versions
875 873
# Changegroup versions that can be applied to the repo
def supportedincomingversions(repo):
    """Return the changegroup versions ``repo`` can receive (currently
    every supported version; contrast supportedoutgoingversions)."""
    return allsupportedversions(repo)
879 877
def supportedoutgoingversions(repo):
    """Return the changegroup versions that can be created from ``repo``."""
    versions = allsupportedversions(repo)
    if 'treemanifest' in repo.requirements:
        # Versions 01 and 02 only know flat manifests, and converting
        # between flat and tree manifests on the fly is far too expensive:
        # tree manifests hash differently, so all of history would need
        # converting. Simply refuse to offer those versions.
        versions.difference_update(['01', '02'])
    return versions
892 890
def safeversion(repo):
    """Return the smallest changegroup version that every client of this
    repo can be assumed to support.

    For example, all hg versions that support generaldelta also support
    changegroup 02, so a generaldelta repo never needs to fall back to 01.
    """
    candidates = supportedoutgoingversions(repo)
    if 'generaldelta' in repo.requirements:
        candidates.discard('01')
    assert candidates
    return min(candidates)
902 900
def getbundler(version, repo, bundlecaps=None):
    """Instantiate the packer class registered for ``version``."""
    assert version in supportedoutgoingversions(repo)
    packer, _unpacker = _packermap[version]
    return packer(repo, bundlecaps)
906 904
def getunbundler(version, fh, alg, extras=None):
    """Instantiate the unpacker class registered for ``version``."""
    unpacker = _packermap[version][1]
    return unpacker(fh, alg, extras=extras)
909 907
910 908 def _changegroupinfo(repo, nodes, source):
911 909 if repo.ui.verbose or source == 'bundle':
912 910 repo.ui.status(_("%d changesets found\n") % len(nodes))
913 911 if repo.ui.debugflag:
914 912 repo.ui.debug("list of changesets:\n")
915 913 for node in nodes:
916 914 repo.ui.debug("%s\n" % hex(node))
917 915
def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
    """Produce raw changegroup data for ``outgoing`` using ``bundler``.

    Fires the 'preoutgoing' hook (which may abort) before generating.
    Returns the generator from bundler.generate().
    """
    repo = repo.unfiltered()
    commonrevs = outgoing.common
    csets = outgoing.missing
    heads = outgoing.missingheads
    # We go through the fast path if we get told to, or if all (unfiltered)
    # heads have been requested (since we then know all linkrevs will
    # be pulled by the client).
    heads.sort()
    fastpathlinkrev = fastpath or (
        repo.filtername is None and heads == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, csets, source)
    return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
933 931
def getsubset(repo, outgoing, bundler, source, fastpath=False):
    """Like getsubsetraw, but wrap the raw chunks in an unbundler."""
    chunks = getsubsetraw(repo, outgoing, bundler, source, fastpath)
    stream = util.chunkbuffer(chunks)
    extras = {'clcount': len(outgoing.missing)}
    return getunbundler(bundler.version, stream, None, extras)
938 936
def changegroupsubset(repo, roots, heads, source, version='01'):
    """Compute a changegroup consisting of all the nodes that are
    descendants of any of the roots and ancestors of any of the heads.
    Return a chunkbuffer object whose read() method will return
    successive changegroup chunks.

    It is fairly complex as determining which filenodes and which
    manifest nodes need to be included for the changeset to be complete
    is non-trivial.

    Another wrinkle is doing the reverse, figuring out which changeset in
    the changegroup a particular filenode or manifestnode belongs to.
    """
    outgoing = discovery.outgoing(repo, missingroots=roots, missingheads=heads)
    # 'version' selects the changegroup wire format (see _packermap).
    bundler = getbundler(version, repo)
    return getsubset(repo, outgoing, bundler, source)
955 953
def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
                           version='01'):
    """Return a raw changegroup generator for ``outgoing``.

    Like getbundle, but takes a precomputed discovery.outgoing. This is
    only implemented for local repos and reuses potentially precomputed
    sets in outgoing. Returns None when there is nothing to transfer.
    """
    if not outgoing.missing:
        # Nothing to send; callers treat None as "no changegroup needed".
        return None
    packer = getbundler(version, repo, bundlecaps)
    return getsubsetraw(repo, outgoing, packer, source)
966 964
def getchangegroup(repo, source, outgoing, bundlecaps=None,
                   version='01'):
    """Return a changegroup (wrapped in an unbundler) for ``outgoing``.

    Like getbundle, but takes a precomputed discovery.outgoing. This is
    only implemented for local repos and reuses potentially precomputed
    sets in outgoing. Returns None when there is nothing to transfer.
    """
    if not outgoing.missing:
        # Nothing to send; callers treat None as "no changegroup needed".
        return None
    packer = getbundler(version, repo, bundlecaps)
    return getsubset(repo, outgoing, packer, source)
977 975
def getlocalchangegroup(repo, *args, **kwargs):
    """Deprecated alias for getchangegroup, kept for extensions."""
    repo.ui.deprecwarn('getlocalchangegroup is deprecated, use getchangegroup',
                       '4.3')
    return getchangegroup(repo, *args, **kwargs)
982 980
def changegroup(repo, basenodes, source):
    """Return a changegroup of all nodes that are descendants of
    ``basenodes`` and ancestors of the repository's current heads."""
    # to avoid a race we use changegroupsubset() (issue1320)
    return changegroupsubset(repo, basenodes, repo.heads(), source)
986 984
def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
    """Apply the file-revlog section of a changegroup stream.

    Reads file chunks from ``source`` until the empty terminator header,
    adds each delta group to the matching filelog, and checks off the
    nodes listed in ``needfiles``. Returns a (revisions, files) tuple of
    how much was added. Raises error.Abort on an empty group, a censored
    delta base, a revision that was not requested, or file data that is
    still missing when the stream ends.
    """
    revisions = 0
    files = 0
    # iter(callable, {}) stops at the empty header terminating the section.
    for chunkdata in iter(source.filelogheader, {}):
        files += 1
        f = chunkdata["filename"]
        repo.ui.debug("adding %s revisions\n" % f)
        repo.ui.progress(_('files'), files, unit=_('files'),
                         total=expectedfiles)
        fl = repo.file(f)
        # Remember the length before the group so we can see what was added.
        o = len(fl)
        try:
            if not fl.addgroup(source, revmap, trp):
                raise error.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError as e:
            raise error.Abort(_("received delta base is censored: %s") % e)
        revisions += len(fl) - o
        if f in needfiles:
            needs = needfiles[f]
            # Every newly added node must account for a needed one;
            # anything else was not requested and is rejected.
            for new in xrange(o, len(fl)):
                n = fl.node(new)
                if n in needs:
                    needs.remove(n)
                else:
                    raise error.Abort(
                        _("received spurious file revlog entry"))
            if not needs:
                del needfiles[f]
    repo.ui.progress(_('files'), None)

    # Whatever is still listed in needfiles was not in the stream; it must
    # already exist locally, otherwise the changegroup is incomplete.
    for f, needs in needfiles.iteritems():
        fl = repo.file(f)
        for n in needs:
            try:
                fl.rev(n)
            except error.LookupError:
                raise error.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (f, hex(n)))

    return revisions, files
General Comments 0
You need to be logged in to leave comments. Login now