changegroup: use progress helper...
Martin von Zweigbergk
r38401:daa08d45 default
@@ -1,1016 +1,1017 @@
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import weakref
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 hex,
17 17 nullrev,
18 18 short,
19 19 )
20 20
21 21 from . import (
22 22 dagutil,
23 23 error,
24 24 mdiff,
25 25 phases,
26 26 pycompat,
27 27 util,
28 28 )
29 29
30 30 from .utils import (
31 31 stringutil,
32 32 )
33 33
34 34 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
35 35 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
36 36 _CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
37 37
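For orientation, the three header layouts above differ only in field count and the trailing flags word: cg1 packs (node, p1, p2, cs), cg2 inserts deltabase before cs, and cg3 appends a big-endian uint16 of revlog flags (hence the explicit '>'). A hedged sketch of unpacking a cg3 header from a made-up buffer:

    import struct

    _CG3 = ">20s20s20s20s20sH"
    assert struct.calcsize(_CG3) == 102   # five 20-byte nodes + 2-byte flags

    buf = (b"\x01" * 20 + b"\x02" * 20 + b"\x03" * 20
           + b"\x04" * 20 + b"\x05" * 20 + struct.pack(">H", 0))
    node, p1, p2, deltabase, cs, flags = struct.unpack(_CG3, buf)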
38 38 LFS_REQUIREMENT = 'lfs'
39 39
40 40 # When narrowing is finalized and no longer subject to format changes,
41 41 # we should move this to just "narrow" or similar.
42 42 NARROW_REQUIREMENT = 'narrowhg-experimental'
43 43
44 44 readexactly = util.readexactly
45 45
46 46 def getchunk(stream):
47 47 """return the next chunk from stream as a string"""
48 48 d = readexactly(stream, 4)
49 49 l = struct.unpack(">l", d)[0]
50 50 if l <= 4:
51 51 if l:
52 52 raise error.Abort(_("invalid chunk length %d") % l)
53 53 return ""
54 54 return readexactly(stream, l - 4)
55 55
56 56 def chunkheader(length):
57 57 """return a changegroup chunk header (string)"""
58 58 return struct.pack(">l", length + 4)
59 59
60 60 def closechunk():
61 61 """return a changegroup chunk header (string) for a zero-length chunk"""
62 62 return struct.pack(">l", 0)
63 63
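A minimal sketch of the framing the three helpers above implement (io.BytesIO stands in for the wire stream; not part of this module): the length prefix counts its own four bytes, and a bare zero length is the group terminator.

    import io
    import struct

    def frame(payload):
        # chunkheader() semantics: int32 length including the 4 header bytes
        return struct.pack(">l", len(payload) + 4) + payload

    stream = io.BytesIO(frame(b"hello") + struct.pack(">l", 0))
    # getchunk(stream) would return b"hello"; the next call hits the
    # zero-length closechunk() marker and returns "" to signal the end.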
64 64 def writechunks(ui, chunks, filename, vfs=None):
65 65 """Write chunks to a file and return its filename.
66 66
67 67 The stream is assumed to be a bundle file.
68 68 Existing files will not be overwritten.
69 69 If no filename is specified, a temporary file is created.
70 70 """
71 71 fh = None
72 72 cleanup = None
73 73 try:
74 74 if filename:
75 75 if vfs:
76 76 fh = vfs.open(filename, "wb")
77 77 else:
78 78 # Increase default buffer size because default is usually
79 79 # small (4k is common on Linux).
80 80 fh = open(filename, "wb", 131072)
81 81 else:
82 82 fd, filename = pycompat.mkstemp(prefix="hg-bundle-", suffix=".hg")
83 83 fh = os.fdopen(fd, r"wb")
84 84 cleanup = filename
85 85 for c in chunks:
86 86 fh.write(c)
87 87 cleanup = None
88 88 return filename
89 89 finally:
90 90 if fh is not None:
91 91 fh.close()
92 92 if cleanup is not None:
93 93 if filename and vfs:
94 94 vfs.unlink(cleanup)
95 95 else:
96 96 os.unlink(cleanup)
97 97
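Usage sketch for writechunks(), assuming a ui object is in scope; passing filename=None requests a temporary hg-bundle-*.hg file whose path is returned:

    chunks = [chunkheader(len(b"hello")) + b"hello", closechunk()]  # illustrative
    path = writechunks(ui, chunks, None)   # temp file; the caller unlinks it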
98 98 class cg1unpacker(object):
99 99 """Unpacker for cg1 changegroup streams.
100 100
101 101 A changegroup unpacker handles the framing of the revision data in
102 102 the wire format. Most consumers will want to use the apply()
103 103 method to add the changes from the changegroup to a repository.
104 104
105 105 If you're forwarding a changegroup unmodified to another consumer,
106 106 use getchunks(), which returns an iterator of changegroup
107 107 chunks. This is mostly useful for cases where you need to know the
108 108 data stream has ended by observing the end of the changegroup.
109 109
110 110 deltachunk() is useful only if you're applying delta data. Most
111 111 consumers should prefer apply() instead.
112 112
113 113 A few other public methods exist. Those are used only for
114 114 bundlerepo and some debug commands - their use is discouraged.
115 115 """
116 116 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
117 117 deltaheadersize = struct.calcsize(deltaheader)
118 118 version = '01'
119 119 _grouplistcount = 1 # One list of files after the manifests
120 120
121 121 def __init__(self, fh, alg, extras=None):
122 122 if alg is None:
123 123 alg = 'UN'
124 124 if alg not in util.compengines.supportedbundletypes:
125 125 raise error.Abort(_('unknown stream compression type: %s')
126 126 % alg)
127 127 if alg == 'BZ':
128 128 alg = '_truncatedBZ'
129 129
130 130 compengine = util.compengines.forbundletype(alg)
131 131 self._stream = compengine.decompressorreader(fh)
132 132 self._type = alg
133 133 self.extras = extras or {}
134 134 self.callback = None
135 135
136 136 # These methods (compressed, read, seek, tell) all appear to only
137 137 # be used by bundlerepo, but it's a little hard to tell.
138 138 def compressed(self):
139 139 return self._type is not None and self._type != 'UN'
140 140 def read(self, l):
141 141 return self._stream.read(l)
142 142 def seek(self, pos):
143 143 return self._stream.seek(pos)
144 144 def tell(self):
145 145 return self._stream.tell()
146 146 def close(self):
147 147 return self._stream.close()
148 148
149 149 def _chunklength(self):
150 150 d = readexactly(self._stream, 4)
151 151 l = struct.unpack(">l", d)[0]
152 152 if l <= 4:
153 153 if l:
154 154 raise error.Abort(_("invalid chunk length %d") % l)
155 155 return 0
156 156 if self.callback:
157 157 self.callback()
158 158 return l - 4
159 159
160 160 def changelogheader(self):
161 161 """v10 does not have a changelog header chunk"""
162 162 return {}
163 163
164 164 def manifestheader(self):
165 165 """v10 does not have a manifest header chunk"""
166 166 return {}
167 167
168 168 def filelogheader(self):
169 169 """return the header of the filelogs chunk; v10 only has the filename"""
170 170 l = self._chunklength()
171 171 if not l:
172 172 return {}
173 173 fname = readexactly(self._stream, l)
174 174 return {'filename': fname}
175 175
176 176 def _deltaheader(self, headertuple, prevnode):
177 177 node, p1, p2, cs = headertuple
178 178 if prevnode is None:
179 179 deltabase = p1
180 180 else:
181 181 deltabase = prevnode
182 182 flags = 0
183 183 return node, p1, p2, deltabase, cs, flags
184 184
185 185 def deltachunk(self, prevnode):
186 186 l = self._chunklength()
187 187 if not l:
188 188 return {}
189 189 headerdata = readexactly(self._stream, self.deltaheadersize)
190 190 header = struct.unpack(self.deltaheader, headerdata)
191 191 delta = readexactly(self._stream, l - self.deltaheadersize)
192 192 node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
193 193 return (node, p1, p2, cs, deltabase, delta, flags)
194 194
195 195 def getchunks(self):
196 196 """returns all the chunks contained in the bundle
197 197
198 198 Used when you need to forward the binary stream to a file or another
199 199 network API. To do so, it parses the changegroup data; otherwise it
200 200 would block on an sshrepo because it does not know where the stream ends.
201 201 """
202 202 # For changegroup 1 and 2, we expect 3 parts: changelog, manifestlog,
203 203 # and a list of filelogs. For changegroup 3, we expect 4 parts:
204 204 # changelog, manifestlog, a list of tree manifestlogs, and a list of
205 205 # filelogs.
206 206 #
207 207 # Changelog and manifestlog parts are terminated with empty chunks. The
208 208 # tree and file parts are a list of entry sections. Each entry section
209 209 # is a series of chunks terminating in an empty chunk. The list of these
210 210 # entry sections is terminated in yet another empty chunk, so we know
211 211 # we've reached the end of the tree/file list when we reach an empty
212 212 # chunk that was preceded by no non-empty chunks.
213 213
214 214 parts = 0
215 215 while parts < 2 + self._grouplistcount:
216 216 noentries = True
217 217 while True:
218 218 chunk = getchunk(self)
219 219 if not chunk:
220 220 # The first two empty chunks represent the end of the
221 221 # changelog and the manifestlog portions. The remaining
222 222 # empty chunks represent either A) the end of individual
223 223 # tree or file entries in the file list, or B) the end of
224 224 # the entire list. It's the end of the entire list if there
225 225 # were no entries (i.e. noentries is True).
226 226 if parts < 2:
227 227 parts += 1
228 228 elif noentries:
229 229 parts += 1
230 230 break
231 231 noentries = False
232 232 yield chunkheader(len(chunk))
233 233 pos = 0
234 234 while pos < len(chunk):
235 235 next = pos + 2**20
236 236 yield chunk[pos:next]
237 237 pos = next
238 238 yield closechunk()
239 239
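The termination rule described in the comment above can be checked in isolation. This hedged sketch replays the counting over a decoded cg1/cg2-style list of chunk payloads (b'' marks empty chunks, _grouplistcount == 1, and a well-formed stream is assumed):

    def count_parts(chunks, grouplistcount=1):
        # mirrors the loop in getchunks(): the first two empty chunks end
        # the changelog and manifest parts; later empty chunks end file
        # entries, and an empty chunk preceded by no entries ends the list
        parts = 0
        it = iter(chunks)
        while parts < 2 + grouplistcount:
            noentries = True
            for chunk in it:
                if not chunk:
                    if parts < 2 or noentries:
                        parts += 1
                    break
                noentries = False
        return parts

    # changelog, manifest, one filelog entry, then the end-of-list marker
    assert count_parts([b"c", b"", b"m", b"", b"f", b"", b""]) == 3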
240 240 def _unpackmanifests(self, repo, revmap, trp, prog):
241 241 self.callback = prog.increment
242 242 # no need to check for empty manifest group here:
243 243 # if the result of the merge of 1 and 2 is the same in 3 and 4,
244 244 # no new manifest will be created and the manifest group will
245 245 # be empty during the pull
246 246 self.manifestheader()
247 247 deltas = self.deltaiter()
248 248 repo.manifestlog._revlog.addgroup(deltas, revmap, trp)
249 249 prog.complete()
250 250 self.callback = None
251 251
252 252 def apply(self, repo, tr, srctype, url, targetphase=phases.draft,
253 253 expectedtotal=None):
254 254 """Add the changegroup returned by source.read() to this repo.
255 255 srctype is a string like 'push', 'pull', or 'unbundle'. url is
256 256 the URL of the repo where this changegroup is coming from.
257 257
258 258 Return an integer summarizing the change to this repo:
259 259 - nothing changed or no source: 0
260 260 - more heads than before: 1+added heads (2..n)
261 261 - fewer heads than before: -1-removed heads (-2..-n)
262 262 - number of heads stays the same: 1
263 263 """
264 264 repo = repo.unfiltered()
265 265 def csmap(x):
266 266 repo.ui.debug("add changeset %s\n" % short(x))
267 267 return len(cl)
268 268
269 269 def revmap(x):
270 270 return cl.rev(x)
271 271
272 272 changesets = files = revisions = 0
273 273
274 274 try:
275 275 # The transaction may already carry source information. In this
276 276 # case we use the top level data. We overwrite the argument
277 277 # because we need to use the top level value (if they exist)
278 278 # in this function.
279 279 srctype = tr.hookargs.setdefault('source', srctype)
280 280 url = tr.hookargs.setdefault('url', url)
281 281 repo.hook('prechangegroup',
282 282 throw=True, **pycompat.strkwargs(tr.hookargs))
283 283
284 284 # write changelog data to temp files so concurrent readers
285 285 # will not see an inconsistent view
286 286 cl = repo.changelog
287 287 cl.delayupdate(tr)
288 288 oldheads = set(cl.heads())
289 289
290 290 trp = weakref.proxy(tr)
291 291 # pull off the changeset group
292 292 repo.ui.status(_("adding changesets\n"))
293 293 clstart = len(cl)
294 294 progress = repo.ui.makeprogress(_('changesets'), unit=_('chunks'),
295 295 total=expectedtotal)
296 296 self.callback = progress.increment
297 297
298 298 efiles = set()
299 299 def onchangelog(cl, node):
300 300 efiles.update(cl.readfiles(node))
301 301
302 302 self.changelogheader()
303 303 deltas = self.deltaiter()
304 304 cgnodes = cl.addgroup(deltas, csmap, trp, addrevisioncb=onchangelog)
305 305 efiles = len(efiles)
306 306
307 307 if not cgnodes:
308 308 repo.ui.develwarn('applied empty changegroup',
309 309 config='warn-empty-changegroup')
310 310 clend = len(cl)
311 311 changesets = clend - clstart
312 312 progress.complete()
313 313 self.callback = None
314 314
315 315 # pull off the manifest group
316 316 repo.ui.status(_("adding manifests\n"))
317 317 # We know that we'll never have more manifests than we had
318 318 # changesets.
319 319 progress = repo.ui.makeprogress(_('manifests'), unit=_('chunks'),
320 320 total=changesets)
321 321 self._unpackmanifests(repo, revmap, trp, progress)
322 322
323 323 needfiles = {}
324 324 if repo.ui.configbool('server', 'validate'):
325 325 cl = repo.changelog
326 326 ml = repo.manifestlog
327 327 # validate incoming csets have their manifests
328 328 for cset in xrange(clstart, clend):
329 329 mfnode = cl.changelogrevision(cset).manifest
330 330 mfest = ml[mfnode].readdelta()
331 331 # store file cgnodes we must see
332 332 for f, n in mfest.iteritems():
333 333 needfiles.setdefault(f, set()).add(n)
334 334
335 335 # process the files
336 336 repo.ui.status(_("adding file changes\n"))
337 337 newrevs, newfiles = _addchangegroupfiles(
338 338 repo, self, revmap, trp, efiles, needfiles)
339 339 revisions += newrevs
340 340 files += newfiles
341 341
342 342 deltaheads = 0
343 343 if oldheads:
344 344 heads = cl.heads()
345 345 deltaheads = len(heads) - len(oldheads)
346 346 for h in heads:
347 347 if h not in oldheads and repo[h].closesbranch():
348 348 deltaheads -= 1
349 349 htext = ""
350 350 if deltaheads:
351 351 htext = _(" (%+d heads)") % deltaheads
352 352
353 353 repo.ui.status(_("added %d changesets"
354 354 " with %d changes to %d files%s\n")
355 355 % (changesets, revisions, files, htext))
356 356 repo.invalidatevolatilesets()
357 357
358 358 if changesets > 0:
359 359 if 'node' not in tr.hookargs:
360 360 tr.hookargs['node'] = hex(cl.node(clstart))
361 361 tr.hookargs['node_last'] = hex(cl.node(clend - 1))
362 362 hookargs = dict(tr.hookargs)
363 363 else:
364 364 hookargs = dict(tr.hookargs)
365 365 hookargs['node'] = hex(cl.node(clstart))
366 366 hookargs['node_last'] = hex(cl.node(clend - 1))
367 367 repo.hook('pretxnchangegroup',
368 368 throw=True, **pycompat.strkwargs(hookargs))
369 369
370 370 added = [cl.node(r) for r in xrange(clstart, clend)]
371 371 phaseall = None
372 372 if srctype in ('push', 'serve'):
373 373 # Old servers can not push the boundary themselves.
374 374 # New servers won't push the boundary if changeset already
375 375 # exists locally as secret
376 376 #
377 377 # We should not use added here but the list of all changes in
378 378 # the bundle
379 379 if repo.publishing():
380 380 targetphase = phaseall = phases.public
381 381 else:
382 382 # closer target phase computation
383 383
384 384 # Those changesets have been pushed from the
385 385 # outside, their phases are going to be pushed
386 386 # alongside. Therefore `targetphase` is
387 387 # ignored.
388 388 targetphase = phaseall = phases.draft
389 389 if added:
390 390 phases.registernew(repo, tr, targetphase, added)
391 391 if phaseall is not None:
392 392 phases.advanceboundary(repo, tr, phaseall, cgnodes)
393 393
394 394 if changesets > 0:
395 395
396 396 def runhooks():
397 397 # These hooks run when the lock releases, not when the
398 398 # transaction closes. So it's possible for the changelog
399 399 # to have changed since we last saw it.
400 400 if clstart >= len(repo):
401 401 return
402 402
403 403 repo.hook("changegroup", **pycompat.strkwargs(hookargs))
404 404
405 405 for n in added:
406 406 args = hookargs.copy()
407 407 args['node'] = hex(n)
408 408 del args['node_last']
409 409 repo.hook("incoming", **pycompat.strkwargs(args))
410 410
411 411 newheads = [h for h in repo.heads()
412 412 if h not in oldheads]
413 413 repo.ui.log("incoming",
414 414 "%d incoming changes - new heads: %s\n",
415 415 len(added),
416 416 ', '.join([hex(c[:6]) for c in newheads]))
417 417
418 418 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
419 419 lambda tr: repo._afterlock(runhooks))
420 420 finally:
421 421 repo.ui.flush()
422 422 # never return 0 here:
423 423 if deltaheads < 0:
424 424 ret = deltaheads - 1
425 425 else:
426 426 ret = deltaheads + 1
427 427 return ret
428 428
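The 'never return 0' mapping in the finally block above, restated as a worked example (a paraphrase of the documented convention, not additional behavior):

    def summarize(deltaheads):
        # unchanged head count -> 1, n added heads -> n + 1,
        # n removed heads -> -(n + 1); zero is reserved for "nothing changed"
        return deltaheads - 1 if deltaheads < 0 else deltaheads + 1

    assert summarize(0) == 1      # same number of heads
    assert summarize(2) == 3      # two new heads
    assert summarize(-1) == -2    # one head removed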
429 429 def deltaiter(self):
430 430 """
431 431 returns an iterator of the deltas in this changegroup
432 432
433 433 Useful for passing to the underlying storage system to be stored.
434 434 """
435 435 chain = None
436 436 for chunkdata in iter(lambda: self.deltachunk(chain), {}):
437 437 # Chunkdata: (node, p1, p2, cs, deltabase, delta, flags)
438 438 yield chunkdata
439 439 chain = chunkdata[0]
440 440
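A consumption sketch for deltaiter(); 'unbundler' is assumed to be a cg1unpacker positioned at the start of the changelog group, and 'process' is a hypothetical stand-in callable:

    unbundler.changelogheader()   # v01 has no header chunk, but keeps position
    for node, p1, p2, cs, deltabase, delta, flags in unbundler.deltaiter():
        # tuple layout matches deltachunk(); deltabase defaults to the
        # previous node (cg1) or is explicit in the header (cg2/cg3)
        process(node, deltabase, delta)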
441 441 class cg2unpacker(cg1unpacker):
442 442 """Unpacker for cg2 streams.
443 443
444 444 cg2 streams add support for generaldelta, so the delta header
445 445 format is slightly different. All other features about the data
446 446 remain the same.
447 447 """
448 448 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
449 449 deltaheadersize = struct.calcsize(deltaheader)
450 450 version = '02'
451 451
452 452 def _deltaheader(self, headertuple, prevnode):
453 453 node, p1, p2, deltabase, cs = headertuple
454 454 flags = 0
455 455 return node, p1, p2, deltabase, cs, flags
456 456
457 457 class cg3unpacker(cg2unpacker):
458 458 """Unpacker for cg3 streams.
459 459
460 460 cg3 streams add support for exchanging treemanifests and revlog
461 461 flags. It adds the revlog flags to the delta header and an empty chunk
462 462 separating manifests and files.
463 463 """
464 464 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
465 465 deltaheadersize = struct.calcsize(deltaheader)
466 466 version = '03'
467 467 _grouplistcount = 2 # One list of manifests and one list of files
468 468
469 469 def _deltaheader(self, headertuple, prevnode):
470 470 node, p1, p2, deltabase, cs, flags = headertuple
471 471 return node, p1, p2, deltabase, cs, flags
472 472
473 473 def _unpackmanifests(self, repo, revmap, trp, prog):
474 474 super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog)
475 475 for chunkdata in iter(self.filelogheader, {}):
476 476 # If we get here, there are directory manifests in the changegroup
477 477 d = chunkdata["filename"]
478 478 repo.ui.debug("adding %s revisions\n" % d)
479 479 dirlog = repo.manifestlog._revlog.dirlog(d)
480 480 deltas = self.deltaiter()
481 481 if not dirlog.addgroup(deltas, revmap, trp):
482 482 raise error.Abort(_("received dir revlog group is empty"))
483 483
484 484 class headerlessfixup(object):
485 485 def __init__(self, fh, h):
486 486 self._h = h
487 487 self._fh = fh
488 488 def read(self, n):
489 489 if self._h:
490 490 d, self._h = self._h[:n], self._h[n:]
491 491 if len(d) < n:
492 492 d += readexactly(self._fh, n - len(d))
493 493 return d
494 494 return readexactly(self._fh, n)
495 495
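headerlessfixup exists to "push back" bytes that were consumed while sniffing a stream's type. A hedged usage sketch (the surrounding names are hypothetical):

    magic = readexactly(fh, 4)        # peek to identify the stream
    if not magic.startswith(b"HG"):
        # re-attach the consumed bytes so downstream readers see the
        # stream from its very beginning
        fh = headerlessfixup(fh, magic)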
496 496 class cg1packer(object):
497 497 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
498 498 version = '01'
499 499 def __init__(self, repo, bundlecaps=None):
500 500 """Given a source repo, construct a bundler.
501 501
502 502 bundlecaps is optional and can be used to specify the set of
503 503 capabilities which can be used to build the bundle. While bundlecaps is
504 504 unused in core Mercurial, extensions rely on this feature to communicate
505 505 capabilities to customize the changegroup packer.
506 506 """
507 507 # Set of capabilities we can use to build the bundle.
508 508 if bundlecaps is None:
509 509 bundlecaps = set()
510 510 self._bundlecaps = bundlecaps
511 511 # experimental config: bundle.reorder
512 512 reorder = repo.ui.config('bundle', 'reorder')
513 513 if reorder == 'auto':
514 514 reorder = None
515 515 else:
516 516 reorder = stringutil.parsebool(reorder)
517 517 self._repo = repo
518 518 self._reorder = reorder
519 519 self._progress = repo.ui.progress
520 520 if self._repo.ui.verbose and not self._repo.ui.debugflag:
521 521 self._verbosenote = self._repo.ui.note
522 522 else:
523 523 self._verbosenote = lambda s: None
524 524
525 525 def close(self):
526 526 return closechunk()
527 527
528 528 def fileheader(self, fname):
529 529 return chunkheader(len(fname)) + fname
530 530
531 531 # Extracted both for clarity and for overriding in extensions.
532 532 def _sortgroup(self, revlog, nodelist, lookup):
533 533 """Sort nodes for change group and turn them into revnums."""
534 534 # for generaldelta revlogs, we linearize the revs; this will both be
535 535 # much quicker and generate a much smaller bundle
536 536 if (revlog._generaldelta and self._reorder is None) or self._reorder:
537 537 dag = dagutil.revlogdag(revlog)
538 538 return dag.linearize(set(revlog.rev(n) for n in nodelist))
539 539 else:
540 540 return sorted([revlog.rev(n) for n in nodelist])
541 541
542 542 def group(self, nodelist, revlog, lookup, units=None):
543 543 """Calculate a delta group, yielding a sequence of changegroup chunks
544 544 (strings).
545 545
546 546 Given a list of changeset revs, return a set of deltas and
547 547 metadata corresponding to nodes. The first delta is
548 548 first parent(nodelist[0]) -> nodelist[0], the receiver is
549 549 guaranteed to have this parent as it has all history before
550 550 these changesets. In the case firstparent is nullrev the
551 551 changegroup starts with a full revision.
552 552
553 553 If units is not None, progress detail will be generated; units specifies
554 554 the type of revlog that is touched (changelog, manifest, etc.).
555 555 """
556 556 # if we don't have any revisions touched by these changesets, bail
557 557 if len(nodelist) == 0:
558 558 yield self.close()
559 559 return
560 560
561 561 revs = self._sortgroup(revlog, nodelist, lookup)
562 562
563 563 # add the parent of the first rev
564 564 p = revlog.parentrevs(revs[0])[0]
565 565 revs.insert(0, p)
566 566
567 567 # build deltas
568 568 total = len(revs) - 1
569 569 msgbundling = _('bundling')
570 570 for r in xrange(len(revs) - 1):
571 571 if units is not None:
572 572 self._progress(msgbundling, r + 1, unit=units, total=total)
573 573 prev, curr = revs[r], revs[r + 1]
574 574 linknode = lookup(revlog.node(curr))
575 575 for c in self.revchunk(revlog, curr, prev, linknode):
576 576 yield c
577 577
578 578 if units is not None:
579 579 self._progress(msgbundling, None)
580 580 yield self.close()
581 581
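A hedged sketch of driving group() directly, bundling every changelog revision with an identity linknode lookup (as lookupcl does below in generate()):

    cl = repo.changelog
    nodes = [cl.node(r) for r in cl]
    packer = cg1packer(repo)
    chunks = list(packer.group(nodes, cl, lambda x: x, units=_('changesets')))
    # 'chunks' ends with the empty closechunk() terminator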
582 582 # filter any nodes that claim to be part of the known set
583 583 def prune(self, revlog, missing, commonrevs):
584 584 rr, rl = revlog.rev, revlog.linkrev
585 585 return [n for n in missing if rl(rr(n)) not in commonrevs]
586 586
587 587 def _packmanifests(self, dir, mfnodes, lookuplinknode):
588 588 """Pack flat manifests into a changegroup stream."""
589 589 assert not dir
590 590 for chunk in self.group(mfnodes, self._repo.manifestlog._revlog,
591 591 lookuplinknode, units=_('manifests')):
592 592 yield chunk
593 593
594 594 def _manifestsdone(self):
595 595 return ''
596 596
597 597 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
598 598 '''yield a sequence of changegroup chunks (strings)'''
599 599 repo = self._repo
600 600 cl = repo.changelog
601 601
602 602 clrevorder = {}
603 603 mfs = {} # needed manifests
604 604 fnodes = {} # needed file nodes
605 605 changedfiles = set()
606 606
607 607 # Callback for the changelog, used to collect changed files and manifest
608 608 # nodes.
609 609 # Returns the linkrev node (identity in the changelog case).
610 610 def lookupcl(x):
611 611 c = cl.read(x)
612 612 clrevorder[x] = len(clrevorder)
613 613 n = c[0]
614 614 # record the first changeset introducing this manifest version
615 615 mfs.setdefault(n, x)
616 616 # Record a complete list of potentially-changed files in
617 617 # this manifest.
618 618 changedfiles.update(c[3])
619 619 return x
620 620
621 621 self._verbosenote(_('uncompressed size of bundle content:\n'))
622 622 size = 0
623 623 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
624 624 size += len(chunk)
625 625 yield chunk
626 626 self._verbosenote(_('%8.i (changelog)\n') % size)
627 627
628 628 # We need to make sure that the linkrev in the changegroup refers to
629 629 # the first changeset that introduced the manifest or file revision.
630 630 # The fastpath is usually safer than the slowpath, because the filelogs
631 631 # are walked in revlog order.
632 632 #
633 633 # When taking the slowpath with reorder=None and the manifest revlog
634 634 # uses generaldelta, the manifest may be walked in the "wrong" order.
635 635 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
636 636 # cc0ff93d0c0c).
637 637 #
638 638 # When taking the fastpath, we are only vulnerable to reordering
639 639 # of the changelog itself. The changelog never uses generaldelta, so
640 640 # it is only reordered when reorder=True. To handle this case, we
641 641 # simply take the slowpath, which already has the 'clrevorder' logic.
642 642 # This was also fixed in cc0ff93d0c0c.
643 643 fastpathlinkrev = fastpathlinkrev and not self._reorder
644 644 # Treemanifests don't work correctly with fastpathlinkrev
645 645 # either, because we don't discover which directory nodes to
646 646 # send along with files. This could probably be fixed.
647 647 fastpathlinkrev = fastpathlinkrev and (
648 648 'treemanifest' not in repo.requirements)
649 649
650 650 for chunk in self.generatemanifests(commonrevs, clrevorder,
651 651 fastpathlinkrev, mfs, fnodes, source):
652 652 yield chunk
653 653 mfs.clear()
654 654 clrevs = set(cl.rev(x) for x in clnodes)
655 655
656 656 if not fastpathlinkrev:
657 657 def linknodes(unused, fname):
658 658 return fnodes.get(fname, {})
659 659 else:
660 660 cln = cl.node
661 661 def linknodes(filerevlog, fname):
662 662 llr = filerevlog.linkrev
663 663 fln = filerevlog.node
664 664 revs = ((r, llr(r)) for r in filerevlog)
665 665 return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)
666 666
667 667 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
668 668 source):
669 669 yield chunk
670 670
671 671 yield self.close()
672 672
673 673 if clnodes:
674 674 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
675 675
676 676 def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
677 677 fnodes, source):
678 678 """Returns an iterator of changegroup chunks containing manifests.
679 679
680 680 `source` is unused here, but is used by extensions like remotefilelog to
681 681 change what is sent based on pulls vs pushes, etc.
682 682 """
683 683 repo = self._repo
684 684 mfl = repo.manifestlog
685 685 dirlog = mfl._revlog.dirlog
686 686 tmfnodes = {'': mfs}
687 687
688 688 # Callback for the manifest, used to collect linkrevs for filelog
689 689 # revisions.
690 690 # Returns the linkrev node (collected in lookupcl).
691 691 def makelookupmflinknode(dir, nodes):
692 692 if fastpathlinkrev:
693 693 assert not dir
694 694 return mfs.__getitem__
695 695
696 696 def lookupmflinknode(x):
697 697 """Callback for looking up the linknode for manifests.
698 698
699 699 Returns the linkrev node for the specified manifest.
700 700
701 701 SIDE EFFECT:
702 702
703 703 1) fclnodes gets populated with the list of relevant
704 704 file nodes if we're not using fastpathlinkrev
705 705 2) When treemanifests are in use, collects treemanifest nodes
706 706 to send
707 707
708 708 Note that this means manifests must be completely sent to
709 709 the client before you can trust the list of files and
710 710 treemanifests to send.
711 711 """
712 712 clnode = nodes[x]
713 713 mdata = mfl.get(dir, x).readfast(shallow=True)
714 714 for p, n, fl in mdata.iterentries():
715 715 if fl == 't': # subdirectory manifest
716 716 subdir = dir + p + '/'
717 717 tmfclnodes = tmfnodes.setdefault(subdir, {})
718 718 tmfclnode = tmfclnodes.setdefault(n, clnode)
719 719 if clrevorder[clnode] < clrevorder[tmfclnode]:
720 720 tmfclnodes[n] = clnode
721 721 else:
722 722 f = dir + p
723 723 fclnodes = fnodes.setdefault(f, {})
724 724 fclnode = fclnodes.setdefault(n, clnode)
725 725 if clrevorder[clnode] < clrevorder[fclnode]:
726 726 fclnodes[n] = clnode
727 727 return clnode
728 728 return lookupmflinknode
729 729
730 730 size = 0
731 731 while tmfnodes:
732 732 dir, nodes = tmfnodes.popitem()
733 733 prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
734 734 if not dir or prunednodes:
735 735 for x in self._packmanifests(dir, prunednodes,
736 736 makelookupmflinknode(dir, nodes)):
737 737 size += len(x)
738 738 yield x
739 739 self._verbosenote(_('%8.i (manifests)\n') % size)
740 740 yield self._manifestsdone()
741 741
742 742 # The 'source' parameter is useful for extensions
743 743 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
744 744 repo = self._repo
745 745 progress = self._progress
746 746 msgbundling = _('bundling')
747 747
748 748 total = len(changedfiles)
749 749 # for progress output
750 750 msgfiles = _('files')
751 751 for i, fname in enumerate(sorted(changedfiles)):
752 752 filerevlog = repo.file(fname)
753 753 if not filerevlog:
754 754 raise error.Abort(_("empty or missing file data for %s") %
755 755 fname)
756 756
757 757 linkrevnodes = linknodes(filerevlog, fname)
758 758 # Lookup for filenodes, we collected the linkrev nodes above in the
759 759 # fastpath case and with lookupmf in the slowpath case.
760 760 def lookupfilelog(x):
761 761 return linkrevnodes[x]
762 762
763 763 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
764 764 if filenodes:
765 765 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
766 766 total=total)
767 767 h = self.fileheader(fname)
768 768 size = len(h)
769 769 yield h
770 770 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
771 771 size += len(chunk)
772 772 yield chunk
773 773 self._verbosenote(_('%8.i %s\n') % (size, fname))
774 774 progress(msgbundling, None)
775 775
776 776 def deltaparent(self, revlog, rev, p1, p2, prev):
777 777 if not revlog.candelta(prev, rev):
778 778 raise error.ProgrammingError('cg1 should not be used in this case')
779 779 return prev
780 780
781 781 def revchunk(self, revlog, rev, prev, linknode):
782 782 node = revlog.node(rev)
783 783 p1, p2 = revlog.parentrevs(rev)
784 784 base = self.deltaparent(revlog, rev, p1, p2, prev)
785 785
786 786 prefix = ''
787 787 if revlog.iscensored(base) or revlog.iscensored(rev):
788 788 try:
789 789 delta = revlog.revision(node, raw=True)
790 790 except error.CensoredNodeError as e:
791 791 delta = e.tombstone
792 792 if base == nullrev:
793 793 prefix = mdiff.trivialdiffheader(len(delta))
794 794 else:
795 795 baselen = revlog.rawsize(base)
796 796 prefix = mdiff.replacediffheader(baselen, len(delta))
797 797 elif base == nullrev:
798 798 delta = revlog.revision(node, raw=True)
799 799 prefix = mdiff.trivialdiffheader(len(delta))
800 800 else:
801 801 delta = revlog.revdiff(base, rev)
802 802 p1n, p2n = revlog.parents(node)
803 803 basenode = revlog.node(base)
804 804 flags = revlog.flags(rev)
805 805 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
806 806 meta += prefix
807 807 l = len(meta) + len(delta)
808 808 yield chunkheader(l)
809 809 yield meta
810 810 yield delta
811 811 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
812 812 # do nothing with basenode, it is implicitly the previous one in HG10
813 813 # do nothing with flags, it is implicitly 0 for cg1 and cg2
814 814 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
815 815
816 816 class cg2packer(cg1packer):
817 817 version = '02'
818 818 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
819 819
820 820 def __init__(self, repo, bundlecaps=None):
821 821 super(cg2packer, self).__init__(repo, bundlecaps)
822 822 if self._reorder is None:
823 823 # Since generaldelta is directly supported by cg2, reordering
824 824 # generally doesn't help, so we disable it by default (treating
825 825 # bundle.reorder=auto just like bundle.reorder=False).
826 826 self._reorder = False
827 827
828 828 def deltaparent(self, revlog, rev, p1, p2, prev):
829 829 dp = revlog.deltaparent(rev)
830 830 if dp == nullrev and revlog.storedeltachains:
831 831 # Avoid sending full revisions when delta parent is null. Pick prev
832 832 # in that case. It's tempting to pick p1 in this case, as p1 will
833 833 # be smaller in the common case. However, computing a delta against
834 834 # p1 may require resolving the raw text of p1, which could be
835 835 # expensive. The revlog caches should have prev cached, meaning
836 836 # less CPU for changegroup generation. There is likely room to add
837 837 # a flag and/or config option to control this behavior.
838 838 base = prev
839 839 elif dp == nullrev:
840 840 # revlog is configured to use full snapshot for a reason,
841 841 # stick to full snapshot.
842 842 base = nullrev
843 843 elif dp not in (p1, p2, prev):
844 844 # Pick prev when we can't be sure remote has the base revision.
845 845 return prev
846 846 else:
847 847 base = dp
848 848 if base != nullrev and not revlog.candelta(base, rev):
849 849 base = nullrev
850 850 return base
851 851
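The base-selection policy above, restated as a standalone decision function for illustration (a paraphrase; the real method additionally falls back to a full snapshot when revlog.candelta() rejects the pair):

    def choose_base(dp, p1, p2, prev, storedeltachains, nullrev=-1):
        # dp is the delta parent the revlog stored for this revision
        if dp == nullrev:
            # stored as a full snapshot; delta against prev only if this
            # revlog stores delta chains at all
            return prev if storedeltachains else nullrev
        if dp not in (p1, p2, prev):
            # cannot be sure the receiver has dp; prev is always safe
            return prev
        return dp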
852 852 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
853 853 # Do nothing with flags, it is implicitly 0 in cg1 and cg2
854 854 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
855 855
856 856 class cg3packer(cg2packer):
857 857 version = '03'
858 858 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
859 859
860 860 def _packmanifests(self, dir, mfnodes, lookuplinknode):
861 861 if dir:
862 862 yield self.fileheader(dir)
863 863
864 864 dirlog = self._repo.manifestlog._revlog.dirlog(dir)
865 865 for chunk in self.group(mfnodes, dirlog, lookuplinknode,
866 866 units=_('manifests')):
867 867 yield chunk
868 868
869 869 def _manifestsdone(self):
870 870 return self.close()
871 871
872 872 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
873 873 return struct.pack(
874 874 self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
875 875
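The packing counterpart of the header constants at the top of the file; a sketch of the 102-byte header that cg3's builddeltaheader() produces (all-zero nodes, no flags, purely illustrative):

    import struct

    hdr = struct.pack(">20s20s20s20s20sH",
                      b"\0" * 20,   # node
                      b"\0" * 20,   # p1
                      b"\0" * 20,   # p2
                      b"\0" * 20,   # deltabase
                      b"\0" * 20,   # linknode
                      0)            # flags, e.g. REVIDX_EXTSTORED for LFS
    assert len(hdr) == 102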
876 876 _packermap = {'01': (cg1packer, cg1unpacker),
877 877 # cg2 adds support for exchanging generaldelta
878 878 '02': (cg2packer, cg2unpacker),
879 879 # cg3 adds support for exchanging revlog flags and treemanifests
880 880 '03': (cg3packer, cg3unpacker),
881 881 }
882 882
883 883 def allsupportedversions(repo):
884 884 versions = set(_packermap.keys())
885 885 if not (repo.ui.configbool('experimental', 'changegroup3') or
886 886 repo.ui.configbool('experimental', 'treemanifest') or
887 887 'treemanifest' in repo.requirements):
888 888 versions.discard('03')
889 889 return versions
890 890
891 891 # Changegroup versions that can be applied to the repo
892 892 def supportedincomingversions(repo):
893 893 return allsupportedversions(repo)
894 894
895 895 # Changegroup versions that can be created from the repo
896 896 def supportedoutgoingversions(repo):
897 897 versions = allsupportedversions(repo)
898 898 if 'treemanifest' in repo.requirements:
899 899 # Versions 01 and 02 support only flat manifests and it's just too
900 900 # expensive to convert between the flat manifest and tree manifest on
901 901 # the fly. Since tree manifests are hashed differently, all of history
902 902 # would have to be converted. Instead, we simply don't even pretend to
903 903 # support versions 01 and 02.
904 904 versions.discard('01')
905 905 versions.discard('02')
906 906 if NARROW_REQUIREMENT in repo.requirements:
907 907 # Versions 01 and 02 don't support revlog flags, and we need to
908 908 # support that for stripping and unbundling to work.
909 909 versions.discard('01')
910 910 versions.discard('02')
911 911 if LFS_REQUIREMENT in repo.requirements:
912 912 # Versions 01 and 02 don't support revlog flags, and we need to
913 913 # mark LFS entries with REVIDX_EXTSTORED.
914 914 versions.discard('01')
915 915 versions.discard('02')
916 916
917 917 return versions
918 918
919 919 def localversion(repo):
920 920 # Finds the best version to use for bundles that are meant to be used
921 921 # locally, such as those from strip and shelve, and temporary bundles.
922 922 return max(supportedoutgoingversions(repo))
923 923
924 924 def safeversion(repo):
925 925 # Finds the smallest version that it's safe to assume clients of the repo
926 926 # will support. For example, all hg versions that support generaldelta also
927 927 # support changegroup 02.
928 928 versions = supportedoutgoingversions(repo)
929 929 if 'generaldelta' in repo.requirements:
930 930 versions.discard('01')
931 931 assert versions
932 932 return min(versions)
933 933
934 934 def getbundler(version, repo, bundlecaps=None):
935 935 assert version in supportedoutgoingversions(repo)
936 936 return _packermap[version][0](repo, bundlecaps)
937 937
938 938 def getunbundler(version, fh, alg, extras=None):
939 939 return _packermap[version][1](fh, alg, extras=extras)
940 940
941 941 def _changegroupinfo(repo, nodes, source):
942 942 if repo.ui.verbose or source == 'bundle':
943 943 repo.ui.status(_("%d changesets found\n") % len(nodes))
944 944 if repo.ui.debugflag:
945 945 repo.ui.debug("list of changesets:\n")
946 946 for node in nodes:
947 947 repo.ui.debug("%s\n" % hex(node))
948 948
949 949 def makechangegroup(repo, outgoing, version, source, fastpath=False,
950 950 bundlecaps=None):
951 951 cgstream = makestream(repo, outgoing, version, source,
952 952 fastpath=fastpath, bundlecaps=bundlecaps)
953 953 return getunbundler(version, util.chunkbuffer(cgstream), None,
954 954 {'clcount': len(outgoing.missing) })
955 955
956 956 def makestream(repo, outgoing, version, source, fastpath=False,
957 957 bundlecaps=None):
958 958 bundler = getbundler(version, repo, bundlecaps=bundlecaps)
959 959
960 960 repo = repo.unfiltered()
961 961 commonrevs = outgoing.common
962 962 csets = outgoing.missing
963 963 heads = outgoing.missingheads
964 964 # We go through the fast path if we get told to, or if all (unfiltered)
965 965 # heads have been requested (since we then know all linkrevs will
966 966 # be pulled by the client).
967 967 heads.sort()
968 968 fastpathlinkrev = fastpath or (
969 969 repo.filtername is None and heads == sorted(repo.heads()))
970 970
971 971 repo.hook('preoutgoing', throw=True, source=source)
972 972 _changegroupinfo(repo, csets, source)
973 973 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
974 974
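An end-to-end usage sketch tying the generation and application halves of this module together (hypothetical names; locking and error handling omitted; 'outgoing' is a discovery outgoing object):

    cg = makechangegroup(repo, outgoing, '02', 'push')
    with destrepo.transaction('changegroup-sketch') as tr:
        cg.apply(destrepo, tr, 'push', 'bundle:sketch')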
975 975 def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
976 976 revisions = 0
977 977 files = 0
978 progress = repo.ui.makeprogress(_('files'), unit=_('files'),
979 total=expectedfiles)
978 980 for chunkdata in iter(source.filelogheader, {}):
979 981 files += 1
980 982 f = chunkdata["filename"]
981 983 repo.ui.debug("adding %s revisions\n" % f)
982 repo.ui.progress(_('files'), files, unit=_('files'),
983 total=expectedfiles)
984 progress.increment()
984 985 fl = repo.file(f)
985 986 o = len(fl)
986 987 try:
987 988 deltas = source.deltaiter()
988 989 if not fl.addgroup(deltas, revmap, trp):
989 990 raise error.Abort(_("received file revlog group is empty"))
990 991 except error.CensoredBaseError as e:
991 992 raise error.Abort(_("received delta base is censored: %s") % e)
992 993 revisions += len(fl) - o
993 994 if f in needfiles:
994 995 needs = needfiles[f]
995 996 for new in xrange(o, len(fl)):
996 997 n = fl.node(new)
997 998 if n in needs:
998 999 needs.remove(n)
999 1000 else:
1000 1001 raise error.Abort(
1001 1002 _("received spurious file revlog entry"))
1002 1003 if not needs:
1003 1004 del needfiles[f]
1004 repo.ui.progress(_('files'), None)
1005 progress.complete()
1005 1006
1006 1007 for f, needs in needfiles.iteritems():
1007 1008 fl = repo.file(f)
1008 1009 for n in needs:
1009 1010 try:
1010 1011 fl.rev(n)
1011 1012 except error.LookupError:
1012 1013 raise error.Abort(
1013 1014 _('missing file data for %s:%s - run hg verify') %
1014 1015 (f, hex(n)))
1015 1016
1016 1017 return revisions, files
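Finally, the pattern this commit converts _addchangegroupfiles() to: a progress helper returned by ui.makeprogress() replaces the paired ui.progress() calls, in minimal form:

    progress = repo.ui.makeprogress(_('files'), unit=_('files'),
                                    total=expectedfiles)
    for f in incoming_files:    # stand-in for the filelog loop above
        progress.increment()    # was: repo.ui.progress(_('files'), n, ...)
    progress.complete()         # was: repo.ui.progress(_('files'), None)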