##// END OF EJS Templates
changegroup: use progress helper in apply() (API)...
Martin von Zweigbergk -
r38365:83534c4e default
parent child Browse files
Show More
@@ -1,1022 +1,1016 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import weakref
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 hex,
17 17 nullrev,
18 18 short,
19 19 )
20 20
21 21 from . import (
22 22 dagutil,
23 23 error,
24 24 mdiff,
25 25 phases,
26 26 pycompat,
27 27 util,
28 28 )
29 29
30 30 from .utils import (
31 31 stringutil,
32 32 )
33 33
34 34 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
35 35 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
36 36 _CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
37 37
38 38 LFS_REQUIREMENT = 'lfs'
39 39
40 40 # When narrowing is finalized and no longer subject to format changes,
41 41 # we should move this to just "narrow" or similar.
42 42 NARROW_REQUIREMENT = 'narrowhg-experimental'
43 43
44 44 readexactly = util.readexactly
45 45
def getchunk(stream):
    """return the next chunk from stream as a string"""
    header = readexactly(stream, 4)
    length = struct.unpack(">l", header)[0]
    # A length > 4 means there is a payload after the 4-byte prefix.
    if length > 4:
        return readexactly(stream, length - 4)
    # length == 0 is the group terminator; anything else (1..4 or
    # negative) cannot be a valid chunk.
    if length:
        raise error.Abort(_("invalid chunk length %d") % length)
    return ""
55 55
def chunkheader(length):
    """return a changegroup chunk header (string)"""
    # The on-the-wire length is the payload size plus the 4-byte
    # header itself, encoded as a big-endian signed 32-bit integer.
    total = length + 4
    return struct.pack(">l", total)
59 59
def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk"""
    # A zero length field terminates the current group.
    terminator = 0
    return struct.pack(">l", terminator)
63 63
def writechunks(ui, chunks, filename, vfs=None):
    """Write chunks to a file and return its filename.

    The stream is assumed to be a bundle file.
    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    """
    fh = None
    cleanup = None
    try:
        if not filename:
            fd, filename = pycompat.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, r"wb")
        elif vfs:
            fh = vfs.open(filename, "wb")
        else:
            # Increase default buffer size because default is usually
            # small (4k is common on Linux).
            fh = open(filename, "wb", 131072)
        # Until all chunks are safely written, arrange for the (possibly
        # partial) output file to be removed on any failure.
        cleanup = filename
        for piece in chunks:
            fh.write(piece)
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)
97 97
class cg1unpacker(object):
    """Unpacker for cg1 changegroup streams.

    A changegroup unpacker handles the framing of the revision data in
    the wire format. Most consumers will want to use the apply()
    method to add the changes from the changegroup to a repository.

    If you're forwarding a changegroup unmodified to another consumer,
    use getchunks(), which returns an iterator of changegroup
    chunks. This is mostly useful for cases where you need to know the
    data stream has ended by observing the end of the changegroup.

    deltachunk() is useful only if you're applying delta data. Most
    consumers should prefer apply() instead.

    A few other public methods exist. Those are used only for
    bundlerepo and some debug commands - their use is discouraged.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    _grouplistcount = 1 # One list of files after the manifests

    def __init__(self, fh, alg, extras=None):
        """Wrap stream ``fh``, decompressing with bundle compression ``alg``.

        ``alg`` of None is treated as 'UN' (uncompressed); ``extras`` is an
        optional dict of extra metadata kept on the instance.
        """
        if alg is None:
            alg = 'UN'
        if alg not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % alg)
        if alg == 'BZ':
            # 'BZ' streams are decompressed via the dedicated
            # truncated-header engine.
            alg = '_truncatedBZ'

        compengine = util.compengines.forbundletype(alg)
        self._stream = compengine.decompressorreader(fh)
        self._type = alg
        self.extras = extras or {}
        # Invoked (if set) once per chunk read, for progress reporting.
        self.callback = None

    # These methods (compressed, read, seek, tell) all appear to only
    # be used by bundlerepo, but it's a little hard to tell.
    def compressed(self):
        return self._type is not None and self._type != 'UN'
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def _chunklength(self):
        """Read a chunk's 4-byte length prefix and return its payload length.

        Returns 0 for a terminating (zero-length) chunk. Fires the progress
        callback, if any, for every non-terminating chunk.
        """
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise error.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self._chunklength()
        if not l:
            return {}
        fname = readexactly(self._stream, l)
        return {'filename': fname}

    def _deltaheader(self, headertuple, prevnode):
        """Expand a cg1 delta header tuple into its six fields.

        cg1 has no explicit delta base: the first delta in a chain is
        against p1, later ones are against the previous node. Flags do not
        exist in cg1 and are always 0.
        """
        node, p1, p2, cs = headertuple
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        flags = 0
        return node, p1, p2, deltabase, cs, flags

    def deltachunk(self, prevnode):
        """Read one delta chunk from the stream.

        Returns (node, p1, p2, cs, deltabase, delta, flags), or {} when the
        end of the current group is reached.
        """
        l = self._chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
        return (node, p1, p2, cs, deltabase, delta, flags)

    def getchunks(self):
        """returns all the chunks contained in the bundle

        Used when you need to forward the binary stream to a file or another
        network API. To do so, it parses the changegroup data; otherwise it
        would block in case of an ssh repo because it doesn't know where the
        stream ends.
        """
        # For changegroup 1 and 2, we expect 3 parts: changelog, manifestlog,
        # and a list of filelogs. For changegroup 3, we expect 4 parts:
        # changelog, manifestlog, a list of tree manifestlogs, and a list of
        # filelogs.
        #
        # Changelog and manifestlog parts are terminated with empty chunks. The
        # tree and file parts are a list of entry sections. Each entry section
        # is a series of chunks terminating in an empty chunk. The list of these
        # entry sections is terminated in yet another empty chunk, so we know
        # we've reached the end of the tree/file list when we reach an empty
        # chunk that was proceeded by no non-empty chunks.

        parts = 0
        while parts < 2 + self._grouplistcount:
            noentries = True
            while True:
                chunk = getchunk(self)
                if not chunk:
                    # The first two empty chunks represent the end of the
                    # changelog and the manifestlog portions. The remaining
                    # empty chunks represent either A) the end of individual
                    # tree or file entries in the file list, or B) the end of
                    # the entire list. It's the end of the entire list if there
                    # were no entries (i.e. noentries is True).
                    if parts < 2:
                        parts += 1
                    elif noentries:
                        parts += 1
                    break
                noentries = False
                yield chunkheader(len(chunk))
                pos = 0
                # Re-emit the payload in at most 1 MiB slices.
                while pos < len(chunk):
                    next = pos + 2**20
                    yield chunk[pos:next]
                    pos = next
        yield closechunk()

    def _unpackmanifests(self, repo, revmap, trp, prog):
        """Add the manifest group from the stream, reporting via ``prog``.

        ``prog`` is a ui progress helper; its increment method is used as the
        per-chunk callback and it is marked complete before returning.
        """
        self.callback = prog.increment
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        self.manifestheader()
        deltas = self.deltaiter()
        repo.manifestlog._revlog.addgroup(deltas, revmap, trp)
        prog.update(None)
        self.callback = None

    def apply(self, repo, tr, srctype, url, targetphase=phases.draft,
              expectedtotal=None):
        """Add the changegroup returned by source.read() to this repo.
        srctype is a string like 'push', 'pull', or 'unbundle'. url is
        the URL of the repo where this changegroup is coming from.

        Return an integer summarizing the change to this repo:
        - nothing changed or no source: 0
        - more heads than before: 1+added heads (2..n)
        - fewer heads than before: -1-removed heads (-2..-n)
        - number of heads stays the same: 1
        """
        repo = repo.unfiltered()
        def csmap(x):
            repo.ui.debug("add changeset %s\n" % short(x))
            return len(cl)

        def revmap(x):
            return cl.rev(x)

        changesets = files = revisions = 0

        try:
            # The transaction may already carry source information. In this
            # case we use the top level data. We overwrite the argument
            # because we need to use the top level value (if they exist)
            # in this function.
            srctype = tr.hookargs.setdefault('source', srctype)
            url = tr.hookargs.setdefault('url', url)
            repo.hook('prechangegroup',
                      throw=True, **pycompat.strkwargs(tr.hookargs))

            # write changelog data to temp files so concurrent readers
            # will not see an inconsistent view
            cl = repo.changelog
            cl.delayupdate(tr)
            oldheads = set(cl.heads())

            trp = weakref.proxy(tr)
            # pull off the changeset group
            repo.ui.status(_("adding changesets\n"))
            clstart = len(cl)
            progress = repo.ui.makeprogress(_('changesets'), unit=_('chunks'),
                                            total=expectedtotal)
            self.callback = progress.increment

            efiles = set()
            def onchangelog(cl, node):
                efiles.update(cl.readfiles(node))

            self.changelogheader()
            deltas = self.deltaiter()
            cgnodes = cl.addgroup(deltas, csmap, trp, addrevisioncb=onchangelog)
            efiles = len(efiles)

            if not cgnodes:
                repo.ui.develwarn('applied empty changegroup',
                                  config='warn-empty-changegroup')
            clend = len(cl)
            changesets = clend - clstart
            progress.update(None)
            self.callback = None

            # pull off the manifest group
            repo.ui.status(_("adding manifests\n"))
            # We know that we'll never have more manifests than we had
            # changesets.
            progress = repo.ui.makeprogress(_('manifests'), unit=_('chunks'),
                                            total=changesets)
            self._unpackmanifests(repo, revmap, trp, progress)

            needfiles = {}
            if repo.ui.configbool('server', 'validate'):
                cl = repo.changelog
                ml = repo.manifestlog
                # validate incoming csets have their manifests
                for cset in xrange(clstart, clend):
                    mfnode = cl.changelogrevision(cset).manifest
                    mfest = ml[mfnode].readdelta()
                    # store file cgnodes we must see
                    for f, n in mfest.iteritems():
                        needfiles.setdefault(f, set()).add(n)

            # process the files
            repo.ui.status(_("adding file changes\n"))
            newrevs, newfiles = _addchangegroupfiles(
                repo, self, revmap, trp, efiles, needfiles)
            revisions += newrevs
            files += newfiles

            deltaheads = 0
            if oldheads:
                heads = cl.heads()
                deltaheads = len(heads) - len(oldheads)
                for h in heads:
                    # A new head that closes a branch does not count.
                    if h not in oldheads and repo[h].closesbranch():
                        deltaheads -= 1
            htext = ""
            if deltaheads:
                htext = _(" (%+d heads)") % deltaheads

            repo.ui.status(_("added %d changesets"
                             " with %d changes to %d files%s\n")
                           % (changesets, revisions, files, htext))
            repo.invalidatevolatilesets()

            if changesets > 0:
                if 'node' not in tr.hookargs:
                    tr.hookargs['node'] = hex(cl.node(clstart))
                    tr.hookargs['node_last'] = hex(cl.node(clend - 1))
                    hookargs = dict(tr.hookargs)
                else:
                    hookargs = dict(tr.hookargs)
                    hookargs['node'] = hex(cl.node(clstart))
                    hookargs['node_last'] = hex(cl.node(clend - 1))
                repo.hook('pretxnchangegroup',
                          throw=True, **pycompat.strkwargs(hookargs))

            added = [cl.node(r) for r in xrange(clstart, clend)]
            phaseall = None
            if srctype in ('push', 'serve'):
                # Old servers can not push the boundary themselves.
                # New servers won't push the boundary if changeset already
                # exists locally as secret
                #
                # We should not use added here but the list of all change in
                # the bundle
                if repo.publishing():
                    targetphase = phaseall = phases.public
                else:
                    # closer target phase computation

                    # Those changesets have been pushed from the
                    # outside, their phases are going to be pushed
                    # alongside. Therefor `targetphase` is
                    # ignored.
                    targetphase = phaseall = phases.draft
                if added:
                    phases.registernew(repo, tr, targetphase, added)
                if phaseall is not None:
                    phases.advanceboundary(repo, tr, phaseall, cgnodes)

            if changesets > 0:

                def runhooks():
                    # These hooks run when the lock releases, not when the
                    # transaction closes. So it's possible for the changelog
                    # to have changed since we last saw it.
                    if clstart >= len(repo):
                        return

                    repo.hook("changegroup", **pycompat.strkwargs(hookargs))

                    for n in added:
                        args = hookargs.copy()
                        args['node'] = hex(n)
                        del args['node_last']
                        repo.hook("incoming", **pycompat.strkwargs(args))

                    newheads = [h for h in repo.heads()
                                if h not in oldheads]
                    repo.ui.log("incoming",
                                "%d incoming changes - new heads: %s\n",
                                len(added),
                                ', '.join([hex(c[:6]) for c in newheads]))

                tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                                lambda tr: repo._afterlock(runhooks))
        finally:
            repo.ui.flush()
        # never return 0 here:
        if deltaheads < 0:
            ret = deltaheads - 1
        else:
            ret = deltaheads + 1
        return ret

    def deltaiter(self):
        """
        returns an iterator of the deltas in this changegroup

        Useful for passing to the underlying storage system to be stored.
        """
        chain = None
        for chunkdata in iter(lambda: self.deltachunk(chain), {}):
            # Chunkdata: (node, p1, p2, cs, deltabase, delta, flags)
            yield chunkdata
            chain = chunkdata[0]
445 440
class cg2unpacker(cg1unpacker):
    """Unpacker for cg2 streams.

    cg2 streams add support for generaldelta, so the delta header
    format is slightly different. All other features about the data
    remain the same.
    """
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        # Unlike cg1, the delta base is carried explicitly in the header;
        # flags still do not exist and are always 0.
        n, p1, p2, base, cs = headertuple
        return n, p1, p2, base, cs, 0
461 456
class cg3unpacker(cg2unpacker):
    """Unpacker for cg3 streams.

    cg3 streams add support for exchanging treemanifests and revlog
    flags. It adds the revlog flags to the delta header and an empty chunk
    separating manifests and files.
    """
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '03'
    _grouplistcount = 2 # One list of manifests and one list of files

    def _deltaheader(self, headertuple, prevnode):
        # cg3 headers carry all six fields (node, p1, p2, deltabase, cs,
        # flags) explicitly, so the tuple passes through unchanged.
        return headertuple

    def _unpackmanifests(self, repo, revmap, trp, prog):
        # Root manifest group first, then any directory (tree) manifest
        # groups that follow it.
        super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog)
        for chunkdata in iter(self.filelogheader, {}):
            # If we get here, there are directory manifests in the changegroup
            d = chunkdata["filename"]
            repo.ui.debug("adding %s revisions\n" % d)
            dirlog = repo.manifestlog._revlog.dirlog(d)
            if not dirlog.addgroup(self.deltaiter(), revmap, trp):
                raise error.Abort(_("received dir revlog group is empty"))
489 483
class headerlessfixup(object):
    """File-like wrapper re-prepending already-consumed header bytes.

    ``h`` holds data that was read off ``fh`` before the stream was handed
    over; read() serves those bytes first, then continues from ``fh``.
    """
    def __init__(self, fh, h):
        self._fh = fh
        self._h = h

    def read(self, n):
        buffered = self._h
        if not buffered:
            return readexactly(self._fh, n)
        # Serve from the buffered header first, topping up from the
        # underlying stream if the header runs short.
        self._h = buffered[n:]
        data = buffered[:n]
        if len(data) < n:
            data += readexactly(self._fh, n - len(data))
        return data
501 495
class cg1packer(object):
    """Changegroup packer producing version '01' streams.

    Generates the wire representation of a changegroup: a changelog group,
    a manifest group, and per-file groups, each built out of delta chunks.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    version = '01'
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle. While bundlecaps is
        unused in core Mercurial, extensions rely on this feature to communicate
        capabilities to customize the changegroup packer.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        # experimental config: bundle.reorder
        # 'auto' maps to None (decide per-revlog); anything else is parsed
        # as a boolean.
        reorder = repo.ui.config('bundle', 'reorder')
        if reorder == 'auto':
            reorder = None
        else:
            reorder = stringutil.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
        # In verbose (but not debug) mode, note per-part uncompressed sizes.
        if self._repo.ui.verbose and not self._repo.ui.debugflag:
            self._verbosenote = self._repo.ui.note
        else:
            self._verbosenote = lambda s: None

    def close(self):
        """Return the chunk that terminates the current group."""
        return closechunk()

    def fileheader(self, fname):
        """Return the chunk introducing the group for file ``fname``."""
        return chunkheader(len(fname)) + fname

    # Extracted both for clarity and for overriding in extensions.
    def _sortgroup(self, revlog, nodelist, lookup):
        """Sort nodes for change group and turn them into revnums."""
        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and self._reorder is None) or self._reorder:
            dag = dagutil.revlogdag(revlog)
            return dag.linearize(set(revlog.rev(n) for n in nodelist))
        else:
            return sorted([revlog.rev(n) for n in nodelist])

    def group(self, nodelist, revlog, lookup, units=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        revs = self._sortgroup(revlog, nodelist, lookup)

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        if units is not None:
            self._progress(msgbundling, None)
        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs):
        rr, rl = revlog.rev, revlog.linkrev
        return [n for n in missing if rl(rr(n)) not in commonrevs]

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        """Pack flat manifests into a changegroup stream."""
        assert not dir
        for chunk in self.group(mfnodes, self._repo.manifestlog._revlog,
                                lookuplinknode, units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg1 has no terminator after the manifest part.
        return ''

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        cl = repo.changelog

        clrevorder = {}
        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and
        # manifest nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            clrevorder[x] = len(clrevorder)
            n = c[0]
            # record the first changeset introducing this manifest version
            mfs.setdefault(n, x)
            # Record a complete list of potentially-changed files in
            # this manifest.
            changedfiles.update(c[3])
            return x

        self._verbosenote(_('uncompressed size of bundle content:\n'))
        size = 0
        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (changelog)\n') % size)

        # We need to make sure that the linkrev in the changegroup refers to
        # the first changeset that introduced the manifest or file revision.
        # The fastpath is usually safer than the slowpath, because the filelogs
        # are walked in revlog order.
        #
        # When taking the slowpath with reorder=None and the manifest revlog
        # uses generaldelta, the manifest may be walked in the "wrong" order.
        # Without 'clrevorder', we would get an incorrect linkrev (see fix in
        # cc0ff93d0c0c).
        #
        # When taking the fastpath, we are only vulnerable to reordering
        # of the changelog itself. The changelog never uses generaldelta, so
        # it is only reordered when reorder=True. To handle this case, we
        # simply take the slowpath, which already has the 'clrevorder' logic.
        # This was also fixed in cc0ff93d0c0c.
        fastpathlinkrev = fastpathlinkrev and not self._reorder
        # Treemanifests don't work correctly with fastpathlinkrev
        # either, because we don't discover which directory nodes to
        # send along with files. This could probably be fixed.
        fastpathlinkrev = fastpathlinkrev and (
            'treemanifest' not in repo.requirements)

        for chunk in self.generatemanifests(commonrevs, clrevorder,
                                            fastpathlinkrev, mfs, fnodes,
                                            source):
            yield chunk
        mfs.clear()
        clrevs = set(cl.rev(x) for x in clnodes)

        if not fastpathlinkrev:
            def linknodes(unused, fname):
                return fnodes.get(fname, {})
        else:
            cln = cl.node
            def linknodes(filerevlog, fname):
                llr = filerevlog.linkrev
                fln = filerevlog.node
                revs = ((r, llr(r)) for r in filerevlog)
                return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
                          fnodes, source):
        """Returns an iterator of changegroup chunks containing manifests.

        `source` is unused here, but is used by extensions like remotefilelog to
        change what is sent based in pulls vs pushes, etc.
        """
        repo = self._repo
        mfl = repo.manifestlog
        dirlog = mfl._revlog.dirlog
        # Map of directory name -> {manifest node -> linkrev node}; '' is the
        # root manifest. Tree manifest entries are added as they are found.
        tmfnodes = {'': mfs}

        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def makelookupmflinknode(dir, nodes):
            if fastpathlinkrev:
                assert not dir
                return mfs.__getitem__

            def lookupmflinknode(x):
                """Callback for looking up the linknode for manifests.

                Returns the linkrev node for the specified manifest.

                SIDE EFFECT:

                1) fclnodes gets populated with the list of relevant
                   file nodes if we're not using fastpathlinkrev
                2) When treemanifests are in use, collects treemanifest nodes
                   to send

                Note that this means manifests must be completely sent to
                the client before you can trust the list of files and
                treemanifests to send.
                """
                clnode = nodes[x]
                mdata = mfl.get(dir, x).readfast(shallow=True)
                for p, n, fl in mdata.iterentries():
                    if fl == 't': # subdirectory manifest
                        subdir = dir + p + '/'
                        tmfclnodes = tmfnodes.setdefault(subdir, {})
                        tmfclnode = tmfclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[tmfclnode]:
                            tmfclnodes[n] = clnode
                    else:
                        f = dir + p
                        fclnodes = fnodes.setdefault(f, {})
                        fclnode = fclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[fclnode]:
                            fclnodes[n] = clnode
                return clnode
            return lookupmflinknode

        size = 0
        while tmfnodes:
            dir, nodes = tmfnodes.popitem()
            prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
            if not dir or prunednodes:
                for x in self._packmanifests(dir, prunednodes,
                                             makelookupmflinknode(dir, nodes)):
                    size += len(x)
                    yield x
        self._verbosenote(_('%8.i (manifests)\n') % size)
        yield self._manifestsdone()

    # The 'source' parameter is useful for extensions
    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        """Yield changegroup chunks for each changed file's revisions."""
        repo = self._repo
        progress = self._progress
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise error.Abort(_("empty or missing file data for %s") %
                                  fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                h = self.fileheader(fname)
                size = len(h)
                yield h
                for chunk in self.group(filenodes, filerevlog, lookupfilelog):
                    size += len(chunk)
                    yield chunk
                self._verbosenote(_('%8.i %s\n') % (size, fname))
        progress(msgbundling, None)

    def deltaparent(self, revlog, rev, p1, p2, prev):
        """Pick the revision to delta against; cg1 always uses ``prev``."""
        if not revlog.candelta(prev, rev):
            raise error.ProgrammingError('cg1 should not be used in this case')
        return prev

    def revchunk(self, revlog, rev, prev, linknode):
        """Yield the chunk(s) (header, meta, delta) encoding one revision."""
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = self.deltaparent(revlog, rev, p1, p2, prev)

        prefix = ''
        if revlog.iscensored(base) or revlog.iscensored(rev):
            # Censored revisions are sent as full texts (the tombstone when
            # the content itself is censored), with a synthetic diff header.
            try:
                delta = revlog.revision(node, raw=True)
            except error.CensoredNodeError as e:
                delta = e.tombstone
            if base == nullrev:
                prefix = mdiff.trivialdiffheader(len(delta))
            else:
                baselen = revlog.rawsize(base)
                prefix = mdiff.replacediffheader(baselen, len(delta))
        elif base == nullrev:
            delta = revlog.revision(node, raw=True)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        flags = revlog.flags(rev)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta
    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # do nothing with basenode, it is implicitly the previous one in HG10
        # do nothing with flags, it is implicitly 0 for cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
821 815
class cg2packer(cg1packer):
    """Changegroup packer for version '02' streams.

    cg2 carries the delta base explicitly in the wire header, so
    generaldelta deltas can be exchanged directly.
    """
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        super(cg2packer, self).__init__(repo, bundlecaps)
        if self._reorder is None:
            # Since generaldelta is directly supported by cg2, reordering
            # generally doesn't help, so we disable it by default (treating
            # bundle.reorder=auto just like bundle.reorder=False).
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        """Pick a delta base the receiver is known (or likely) to have."""
        dp = revlog.deltaparent(rev)
        if dp == nullrev and revlog.storedeltachains:
            # Avoid sending full revisions when delta parent is null. Pick prev
            # in that case. It's tempting to pick p1 in this case, as p1 will
            # be smaller in the common case. However, computing a delta against
            # p1 may require resolving the raw text of p1, which could be
            # expensive. The revlog caches should have prev cached, meaning
            # less CPU for changegroup generation. There is likely room to add
            # a flag and/or config option to control this behavior.
            base = prev
        elif dp == nullrev:
            # revlog is configured to use full snapshot for a reason,
            # stick to full snapshot.
            base = nullrev
        elif dp not in (p1, p2, prev):
            # Pick prev when we can't be sure remote has the base revision.
            return prev
        else:
            base = dp
        # Fall back to a full snapshot if a delta from the chosen base is
        # not permitted.
        if base != nullrev and not revlog.candelta(base, rev):
            base = nullrev
        return base

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # Do nothing with flags, it is implicitly 0 in cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
861 855
class cg3packer(cg2packer):
    version = '03'
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        """Yield chunks for the manifest nodes of one directory."""
        if dir:
            # Non-root tree manifests are announced like files, with
            # the directory path playing the role of the filename.
            yield self.fileheader(dir)

        mflog = self._repo.manifestlog._revlog.dirlog(dir)
        for chunk in self.group(mfnodes, mflog, lookuplinknode,
                                units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # In cg3, an explicit close chunk terminates the manifest part.
        return self.close()

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # cg3 additionally carries the revlog flags on the wire.
        header = (node, p1n, p2n, basenode, linknode, flags)
        return struct.pack(self.deltaheader, *header)
881 875
# Maps changegroup wire-format version string to its
# (packer class, unpacker class) pair.
_packermap = {'01': (cg1packer, cg1unpacker),
              # cg2 adds support for exchanging generaldelta
              '02': (cg2packer, cg2unpacker),
              # cg3 adds support for exchanging revlog flags and treemanifests
              '03': (cg3packer, cg3unpacker),
}
888 882
def allsupportedversions(repo):
    """Return the set of all changegroup versions this repo supports.

    Version '03' is only included when experimental changegroup3 or
    treemanifest support is enabled, or the repo already requires
    treemanifest.
    """
    versions = set(_packermap.keys())
    needv03 = (repo.ui.configbool('experimental', 'changegroup3')
               or repo.ui.configbool('experimental', 'treemanifest')
               or 'treemanifest' in repo.requirements)
    if not needv03:
        versions.discard('03')
    return versions
896 890
def supportedincomingversions(repo):
    """Changegroup versions that can be applied to the repo."""
    return allsupportedversions(repo)
900 894
def supportedoutgoingversions(repo):
    """Changegroup versions that can be created from the repo."""
    versions = allsupportedversions(repo)
    if 'treemanifest' in repo.requirements:
        # Versions 01 and 02 support only flat manifests and it's just too
        # expensive to convert between the flat manifest and tree manifest on
        # the fly. Since tree manifests are hashed differently, all of history
        # would have to be converted. Instead, we simply don't even pretend to
        # support versions 01 and 02.
        versions.difference_update(['01', '02'])
    if NARROW_REQUIREMENT in repo.requirements:
        # Versions 01 and 02 don't support revlog flags, and we need to
        # support that for stripping and unbundling to work.
        versions.difference_update(['01', '02'])
    if LFS_REQUIREMENT in repo.requirements:
        # Versions 01 and 02 don't support revlog flags, and we need to
        # mark LFS entries with REVIDX_EXTSTORED.
        versions.difference_update(['01', '02'])

    return versions
924 918
def localversion(repo):
    """Find the best version for bundles meant to be used locally.

    This covers bundles from strip and shelve, and temporary bundles.
    """
    return max(supportedoutgoingversions(repo))
929 923
def safeversion(repo):
    """Find the smallest version safe to assume clients will support.

    For example, all hg versions that support generaldelta also support
    changegroup 02, so '01' can be dropped for generaldelta repos.
    """
    versions = supportedoutgoingversions(repo)
    if 'generaldelta' in repo.requirements:
        versions.discard('01')
    assert versions
    return min(versions)
939 933
def getbundler(version, repo, bundlecaps=None):
    """Return a packer instance for the given changegroup version."""
    assert version in supportedoutgoingversions(repo)
    packercls = _packermap[version][0]
    return packercls(repo, bundlecaps)
943 937
def getunbundler(version, fh, alg, extras=None):
    """Return an unpacker reading the given stream/compression alg."""
    unpackercls = _packermap[version][1]
    return unpackercls(fh, alg, extras=extras)
946 940
def _changegroupinfo(repo, nodes, source):
    """Report the outgoing changesets on the ui (verbose/debug only)."""
    ui = repo.ui
    if ui.verbose or source == 'bundle':
        ui.status(_("%d changesets found\n") % len(nodes))
    if ui.debugflag:
        ui.debug("list of changesets:\n")
        for node in nodes:
            ui.debug("%s\n" % hex(node))
954 948
def makechangegroup(repo, outgoing, version, source, fastpath=False,
                    bundlecaps=None):
    """Build a changegroup stream and wrap it in an unbundler."""
    stream = makestream(repo, outgoing, version, source,
                        fastpath=fastpath, bundlecaps=bundlecaps)
    extras = {'clcount': len(outgoing.missing)}
    return getunbundler(version, util.chunkbuffer(stream), None, extras)
961 955
def makestream(repo, outgoing, version, source, fastpath=False,
               bundlecaps=None):
    """Return a generator of changegroup chunks for ``outgoing``."""
    bundler = getbundler(version, repo, bundlecaps=bundlecaps)

    repo = repo.unfiltered()
    common = outgoing.common
    missing = outgoing.missing
    heads = outgoing.missingheads
    heads.sort()
    # Use the fast path when told to, or when all unfiltered heads have
    # been requested (then we know all linkrevs will be pulled by the
    # client).
    fastpathlinkrev = fastpath or (
        repo.filtername is None and heads == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, missing, source)
    return bundler.generate(common, missing, fastpathlinkrev, source)
980 974
def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
    """Apply the file-revlog portion of a changegroup.

    ``source`` yields one filelog header per changed file (an empty
    header terminates the stream), each followed by that file's delta
    chunks. ``revmap`` and ``trp`` are handed straight to
    ``filelog.addgroup`` — presumably the node->linkrev mapper and the
    transaction (proxy); confirm against the caller. ``needfiles`` maps
    filename -> set of expected nodes; it is mutated in place as nodes
    arrive, and any node still absent at the end aborts.

    Returns a ``(revisions, files)`` tuple counting what was added.
    """
    revisions = 0
    files = 0
    for chunkdata in iter(source.filelogheader, {}):
        files += 1
        f = chunkdata["filename"]
        repo.ui.debug("adding %s revisions\n" % f)
        repo.ui.progress(_('files'), files, unit=_('files'),
                         total=expectedfiles)
        fl = repo.file(f)
        # Remember the old length so we can count and inspect only the
        # revisions this group added.
        o = len(fl)
        try:
            deltas = source.deltaiter()
            if not fl.addgroup(deltas, revmap, trp):
                raise error.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError as e:
            raise error.Abort(_("received delta base is censored: %s") % e)
        revisions += len(fl) - o
        if f in needfiles:
            needs = needfiles[f]
            # Tick off every newly added node we expected; a node we
            # did not ask for is an error.
            for new in xrange(o, len(fl)):
                n = fl.node(new)
                if n in needs:
                    needs.remove(n)
                else:
                    raise error.Abort(
                        _("received spurious file revlog entry"))
            if not needs:
                del needfiles[f]
    repo.ui.progress(_('files'), None)

    # Anything still listed in needfiles never arrived in this group;
    # abort unless the revlog already has the node.
    for f, needs in needfiles.iteritems():
        fl = repo.file(f)
        for n in needs:
            try:
                fl.rev(n)
            except error.LookupError:
                raise error.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (f, hex(n)))

    return revisions, files
General Comments 0
You need to be logged in to leave comments. Login now