##// END OF EJS Templates
changegroup: use any node, not min(), in treemanifest's generatemanifests...
Kyle Lippincott -
r35013:d80380ba default
parent child Browse files
Show More
@@ -1,1005 +1,1003 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullrev,
19 19 short,
20 20 )
21 21
22 22 from . import (
23 23 dagutil,
24 24 error,
25 25 mdiff,
26 26 phases,
27 27 pycompat,
28 28 util,
29 29 )
30 30
31 31 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
32 32 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
33 33 _CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
34 34
def readexactly(stream, n):
    '''read n bytes from stream.read and abort if less was available'''
    data = stream.read(n)
    if len(data) < n:
        # A short read means the peer hung up mid-stream; surface it as
        # a user-facing abort rather than a confusing parse error later.
        raise error.Abort(_("stream ended unexpectedly"
                            " (got %d bytes, expected %d)")
                          % (len(data), n))
    return data
def getchunk(stream):
    """return the next chunk from stream as a string"""
    # The 4-byte big-endian length prefix counts itself, so the payload
    # is length - 4 bytes. A length of exactly 0 is the group terminator.
    header = readexactly(stream, 4)
    length = struct.unpack(">l", header)[0]
    if length > 4:
        return readexactly(stream, length - 4)
    if length:
        raise error.Abort(_("invalid chunk length %d") % length)
    return ""
53 53
def chunkheader(length):
    """return a changegroup chunk header (string)"""
    # On-wire chunk lengths include the 4-byte header itself.
    return struct.pack(">l", 4 + length)
57 57
def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk"""
    # A zero on-wire length (rather than the minimum of 4) marks the end
    # of the current group; getchunk()/_chunklength() treat it as EOF.
    return struct.pack(">l", 0)
61 61
def writechunks(ui, chunks, filename, vfs=None):
    """Write chunks to a file and return its filename.

    The stream is assumed to be a bundle file.
    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    """
    fh = None
    cleanup = None
    try:
        if not filename:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, pycompat.sysstr("wb"))
            cleanup = filename
        elif vfs:
            fh = vfs.open(filename, "wb")
        else:
            # Bump the buffer well past the common 4k default: bundles
            # are written as large sequential chunks.
            fh = open(filename, "wb", 131072)
        for chunk in chunks:
            fh.write(chunk)
        # Everything was written; disarm the cleanup below.
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            # We failed partway through: remove the partial file.
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)
95 95
class cg1unpacker(object):
    """Unpacker for cg1 changegroup streams.

    A changegroup unpacker handles the framing of the revision data in
    the wire format. Most consumers will want to use the apply()
    method to add the changes from the changegroup to a repository.

    If you're forwarding a changegroup unmodified to another consumer,
    use getchunks(), which returns an iterator of changegroup
    chunks. This is mostly useful for cases where you need to know the
    data stream has ended by observing the end of the changegroup.

    deltachunk() is useful only if you're applying delta data. Most
    consumers should prefer apply() instead.

    A few other public methods exist. Those are used only for
    bundlerepo and some debug commands - their use is discouraged.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    _grouplistcount = 1 # One list of files after the manifests

    def __init__(self, fh, alg, extras=None):
        if alg is None:
            alg = 'UN'
        if alg not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % alg)
        if alg == 'BZ':
            # 'BZ' streams arrive with their two-byte magic already
            # consumed; the truncated engine re-synthesizes it.
            alg = '_truncatedBZ'

        compengine = util.compengines.forbundletype(alg)
        self._stream = compengine.decompressorreader(fh)
        self._type = alg
        self.extras = extras or {}
        # Optional progress callback invoked once per non-empty chunk.
        self.callback = None

    # These methods (compressed, read, seek, tell) all appear to only
    # be used by bundlerepo, but it's a little hard to tell.
    def compressed(self):
        return self._type is not None and self._type != 'UN'
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def _chunklength(self):
        # Read the 4-byte big-endian length prefix. The on-wire length
        # includes the prefix itself, so the payload is l - 4 bytes; a
        # length of 0 terminates the current group.
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise error.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self._chunklength()
        if not l:
            return {}
        fname = readexactly(self._stream, l)
        return {'filename': fname}

    def _deltaheader(self, headertuple, prevnode):
        # cg1 carries no explicit delta base: deltas chain off the
        # previous node in the stream, or p1 for a group's first chunk.
        node, p1, p2, cs = headertuple
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        flags = 0
        return node, p1, p2, deltabase, cs, flags

    def deltachunk(self, prevnode):
        # Returns {} at end of group, otherwise the 7-tuple
        # (node, p1, p2, cs, deltabase, delta, flags).
        l = self._chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
        return (node, p1, p2, cs, deltabase, delta, flags)

    def getchunks(self):
        """returns all the chunks contains in the bundle

        Used when you need to forward the binary stream to a file or another
        network API. To do so, it parse the changegroup data, otherwise it will
        block in case of sshrepo because it don't know the end of the stream.
        """
        # For changegroup 1 and 2, we expect 3 parts: changelog, manifestlog,
        # and a list of filelogs. For changegroup 3, we expect 4 parts:
        # changelog, manifestlog, a list of tree manifestlogs, and a list of
        # filelogs.
        #
        # Changelog and manifestlog parts are terminated with empty chunks. The
        # tree and file parts are a list of entry sections. Each entry section
        # is a series of chunks terminating in an empty chunk. The list of these
        # entry sections is terminated in yet another empty chunk, so we know
        # we've reached the end of the tree/file list when we reach an empty
        # chunk that was proceeded by no non-empty chunks.

        parts = 0
        while parts < 2 + self._grouplistcount:
            noentries = True
            while True:
                chunk = getchunk(self)
                if not chunk:
                    # The first two empty chunks represent the end of the
                    # changelog and the manifestlog portions. The remaining
                    # empty chunks represent either A) the end of individual
                    # tree or file entries in the file list, or B) the end of
                    # the entire list. It's the end of the entire list if there
                    # were no entries (i.e. noentries is True).
                    if parts < 2:
                        parts += 1
                    elif noentries:
                        parts += 1
                    break
                noentries = False
                yield chunkheader(len(chunk))
                pos = 0
                # Re-emit large chunks in 1MB slices to bound memory use.
                while pos < len(chunk):
                    next = pos + 2**20
                    yield chunk[pos:next]
                    pos = next
        yield closechunk()

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        # We know that we'll never have more manifests than we had
        # changesets.
        self.callback = prog(_('manifests'), numchanges)
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        self.manifestheader()
        deltas = self.deltaiter()
        repo.manifestlog._revlog.addgroup(deltas, revmap, trp)
        repo.ui.progress(_('manifests'), None)
        self.callback = None

    def apply(self, repo, tr, srctype, url, targetphase=phases.draft,
              expectedtotal=None):
        """Add the changegroup returned by source.read() to this repo.
        srctype is a string like 'push', 'pull', or 'unbundle'. url is
        the URL of the repo where this changegroup is coming from.

        Return an integer summarizing the change to this repo:
        - nothing changed or no source: 0
        - more heads than before: 1+added heads (2..n)
        - fewer heads than before: -1-removed heads (-2..-n)
        - number of heads stays the same: 1
        """
        repo = repo.unfiltered()
        def csmap(x):
            repo.ui.debug("add changeset %s\n" % short(x))
            return len(cl)

        def revmap(x):
            return cl.rev(x)

        changesets = files = revisions = 0

        try:
            # The transaction may already carry source information. In this
            # case we use the top level data. We overwrite the argument
            # because we need to use the top level value (if they exist)
            # in this function.
            srctype = tr.hookargs.setdefault('source', srctype)
            url = tr.hookargs.setdefault('url', url)
            repo.hook('prechangegroup',
                      throw=True, **pycompat.strkwargs(tr.hookargs))

            # write changelog data to temp files so concurrent readers
            # will not see an inconsistent view
            cl = repo.changelog
            cl.delayupdate(tr)
            oldheads = set(cl.heads())

            trp = weakref.proxy(tr)
            # pull off the changeset group
            repo.ui.status(_("adding changesets\n"))
            clstart = len(cl)
            class prog(object):
                # Per-chunk progress callback factory; see self.callback.
                def __init__(self, step, total):
                    self._step = step
                    self._total = total
                    self._count = 1
                def __call__(self):
                    repo.ui.progress(self._step, self._count, unit=_('chunks'),
                                     total=self._total)
                    self._count += 1
            self.callback = prog(_('changesets'), expectedtotal)

            efiles = set()
            def onchangelog(cl, node):
                # Collect every file touched by incoming changesets so the
                # file-progress total below is accurate.
                efiles.update(cl.readfiles(node))

            self.changelogheader()
            deltas = self.deltaiter()
            cgnodes = cl.addgroup(deltas, csmap, trp, addrevisioncb=onchangelog)
            efiles = len(efiles)

            if not cgnodes:
                repo.ui.develwarn('applied empty changegroup',
                                  config='warn-empty-changegroup')
            clend = len(cl)
            changesets = clend - clstart
            repo.ui.progress(_('changesets'), None)
            self.callback = None

            # pull off the manifest group
            repo.ui.status(_("adding manifests\n"))
            self._unpackmanifests(repo, revmap, trp, prog, changesets)

            needfiles = {}
            if repo.ui.configbool('server', 'validate'):
                cl = repo.changelog
                ml = repo.manifestlog
                # validate incoming csets have their manifests
                for cset in xrange(clstart, clend):
                    mfnode = cl.changelogrevision(cset).manifest
                    mfest = ml[mfnode].readdelta()
                    # store file cgnodes we must see
                    for f, n in mfest.iteritems():
                        needfiles.setdefault(f, set()).add(n)

            # process the files
            repo.ui.status(_("adding file changes\n"))
            newrevs, newfiles = _addchangegroupfiles(
                repo, self, revmap, trp, efiles, needfiles)
            revisions += newrevs
            files += newfiles

            deltaheads = 0
            if oldheads:
                heads = cl.heads()
                deltaheads = len(heads) - len(oldheads)
                for h in heads:
                    # Heads that close a branch don't count toward the
                    # "(+n heads)" message.
                    if h not in oldheads and repo[h].closesbranch():
                        deltaheads -= 1
            htext = ""
            if deltaheads:
                htext = _(" (%+d heads)") % deltaheads

            repo.ui.status(_("added %d changesets"
                             " with %d changes to %d files%s\n")
                           % (changesets, revisions, files, htext))
            repo.invalidatevolatilesets()

            if changesets > 0:
                if 'node' not in tr.hookargs:
                    tr.hookargs['node'] = hex(cl.node(clstart))
                    tr.hookargs['node_last'] = hex(cl.node(clend - 1))
                    hookargs = dict(tr.hookargs)
                else:
                    hookargs = dict(tr.hookargs)
                    hookargs['node'] = hex(cl.node(clstart))
                    hookargs['node_last'] = hex(cl.node(clend - 1))
                repo.hook('pretxnchangegroup',
                          throw=True, **pycompat.strkwargs(hookargs))

            added = [cl.node(r) for r in xrange(clstart, clend)]
            phaseall = None
            if srctype in ('push', 'serve'):
                # Old servers can not push the boundary themselves.
                # New servers won't push the boundary if changeset already
                # exists locally as secret
                #
                # We should not use added here but the list of all change in
                # the bundle
                if repo.publishing():
                    targetphase = phaseall = phases.public
                else:
                    # closer target phase computation

                    # Those changesets have been pushed from the
                    # outside, their phases are going to be pushed
                    # alongside. Therefor `targetphase` is
                    # ignored.
                    targetphase = phaseall = phases.draft
            if added:
                phases.registernew(repo, tr, targetphase, added)
            if phaseall is not None:
                phases.advanceboundary(repo, tr, phaseall, cgnodes)

            if changesets > 0:

                def runhooks():
                    # These hooks run when the lock releases, not when the
                    # transaction closes. So it's possible for the changelog
                    # to have changed since we last saw it.
                    if clstart >= len(repo):
                        return

                    repo.hook("changegroup", **pycompat.strkwargs(hookargs))

                    for n in added:
                        args = hookargs.copy()
                        args['node'] = hex(n)
                        del args['node_last']
                        repo.hook("incoming", **pycompat.strkwargs(args))

                    newheads = [h for h in repo.heads()
                                if h not in oldheads]
                    repo.ui.log("incoming",
                                "%s incoming changes - new heads: %s\n",
                                len(added),
                                ', '.join([hex(c[:6]) for c in newheads]))

                tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                                lambda tr: repo._afterlock(runhooks))
        finally:
            repo.ui.flush()
        # never return 0 here:
        if deltaheads < 0:
            ret = deltaheads - 1
        else:
            ret = deltaheads + 1
        return ret

    def deltaiter(self):
        """
        returns an iterator of the deltas in this changegroup

        Useful for passing to the underlying storage system to be stored.
        """
        chain = None
        for chunkdata in iter(lambda: self.deltachunk(chain), {}):
            # Chunkdata: (node, p1, p2, cs, deltabase, delta, flags)
            yield chunkdata
            chain = chunkdata[0]
443 443
class cg2unpacker(cg1unpacker):
    """Unpacker for cg2 streams.

    cg2 streams add support for generaldelta, so the delta header
    format is slightly different. All other features about the data
    remain the same.
    """
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        # cg2 carries the delta base explicitly in the header, so
        # prevnode is irrelevant here; flags are implicitly 0.
        node, p1, p2, deltabase, cs = headertuple
        return node, p1, p2, deltabase, cs, 0
459 459
class cg3unpacker(cg2unpacker):
    """Unpacker for cg3 streams.

    cg3 streams add support for exchanging treemanifests and revlog
    flags. It adds the revlog flags to the delta header and an empty chunk
    separating manifests and files.
    """
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '03'
    _grouplistcount = 2 # One list of manifests and one list of files

    def _deltaheader(self, headertuple, prevnode):
        # The cg3 header already matches our return shape
        # (node, p1, p2, deltabase, cs, flags); pass it straight through.
        return headertuple

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        # Root manifests first, exactly as cg1/cg2 do...
        super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
                                                  numchanges)
        # ...then one group per tree-manifest directory, each introduced
        # by a filelog-style header naming the directory.
        for chunkdata in iter(self.filelogheader, {}):
            d = chunkdata["filename"]
            repo.ui.debug("adding %s revisions\n" % d)
            dirlog = repo.manifestlog._revlog.dirlog(d)
            deltas = self.deltaiter()
            if not dirlog.addgroup(deltas, revmap, trp):
                raise error.Abort(_("received dir revlog group is empty"))
487 487
class headerlessfixup(object):
    """File-like wrapper replaying an already-consumed header.

    ``h`` is data previously read off ``fh``; reads are served from it
    first and then fall through to the underlying stream.
    """
    def __init__(self, fh, h):
        self._h = h
        self._fh = fh
    def read(self, n):
        if not self._h:
            return readexactly(self._fh, n)
        d, self._h = self._h[:n], self._h[n:]
        if len(d) < n:
            # Buffered header exhausted mid-request; top up from fh.
            d += readexactly(self._fh, n - len(d))
        return d
499 499
class cg1packer(object):
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    version = '01'
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle. While bundlecaps is
        unused in core Mercurial, extensions rely on this feature to communicate
        capabilities to customize the changegroup packer.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        # experimental config: bundle.reorder
        # 'auto' -> None (let each revlog decide); otherwise a bool.
        reorder = repo.ui.config('bundle', 'reorder')
        if reorder == 'auto':
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
        if self._repo.ui.verbose and not self._repo.ui.debugflag:
            self._verbosenote = self._repo.ui.note
        else:
            self._verbosenote = lambda s: None

    def close(self):
        # Emit the empty chunk terminating the current section.
        return closechunk()

    def fileheader(self, fname):
        return chunkheader(len(fname)) + fname

    # Extracted both for clarity and for overriding in extensions.
    def _sortgroup(self, revlog, nodelist, lookup):
        """Sort nodes for change group and turn them into revnums."""
        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and self._reorder is None) or self._reorder:
            dag = dagutil.revlogdag(revlog)
            return dag.linearize(set(revlog.rev(n) for n in nodelist))
        else:
            return sorted([revlog.rev(n) for n in nodelist])

    def group(self, nodelist, revlog, lookup, units=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        revs = self._sortgroup(revlog, nodelist, lookup)

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        if units is not None:
            self._progress(msgbundling, None)
        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs):
        rr, rl = revlog.rev, revlog.linkrev
        return [n for n in missing if rl(rr(n)) not in commonrevs]

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        """Pack flat manifests into a changegroup stream."""
        assert not dir
        for chunk in self.group(mfnodes, self._repo.manifestlog._revlog,
                                lookuplinknode, units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg1/cg2 have no explicit terminator for the manifest section.
        return ''

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        cl = repo.changelog

        clrevorder = {}
        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and manifest
        # nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            clrevorder[x] = len(clrevorder)
            n = c[0]
            # record the first changeset introducing this manifest version
            mfs.setdefault(n, x)
            # Record a complete list of potentially-changed files in
            # this manifest.
            changedfiles.update(c[3])
            return x

        self._verbosenote(_('uncompressed size of bundle content:\n'))
        size = 0
        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (changelog)\n') % size)

        # We need to make sure that the linkrev in the changegroup refers to
        # the first changeset that introduced the manifest or file revision.
        # The fastpath is usually safer than the slowpath, because the filelogs
        # are walked in revlog order.
        #
        # When taking the slowpath with reorder=None and the manifest revlog
        # uses generaldelta, the manifest may be walked in the "wrong" order.
        # Without 'clrevorder', we would get an incorrect linkrev (see fix in
        # cc0ff93d0c0c).
        #
        # When taking the fastpath, we are only vulnerable to reordering
        # of the changelog itself. The changelog never uses generaldelta, so
        # it is only reordered when reorder=True. To handle this case, we
        # simply take the slowpath, which already has the 'clrevorder' logic.
        # This was also fixed in cc0ff93d0c0c.
        fastpathlinkrev = fastpathlinkrev and not self._reorder
        # Treemanifests don't work correctly with fastpathlinkrev
        # either, because we don't discover which directory nodes to
        # send along with files. This could probably be fixed.
        fastpathlinkrev = fastpathlinkrev and (
            'treemanifest' not in repo.requirements)

        for chunk in self.generatemanifests(commonrevs, clrevorder,
                fastpathlinkrev, mfs, fnodes, source):
            yield chunk
        mfs.clear()
        clrevs = set(cl.rev(x) for x in clnodes)

        if not fastpathlinkrev:
            def linknodes(unused, fname):
                return fnodes.get(fname, {})
        else:
            cln = cl.node
            def linknodes(filerevlog, fname):
                llr = filerevlog.linkrev
                fln = filerevlog.node
                revs = ((r, llr(r)) for r in filerevlog)
                return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
                          fnodes, source):
        """Returns an iterator of changegroup chunks containing manifests.

        `source` is unused here, but is used by extensions like remotefilelog to
        change what is sent based in pulls vs pushes, etc.
        """
        repo = self._repo
        mfl = repo.manifestlog
        dirlog = mfl._revlog.dirlog
        tmfnodes = {'': mfs}

        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def makelookupmflinknode(dir, nodes):
            if fastpathlinkrev:
                assert not dir
                return mfs.__getitem__

            def lookupmflinknode(x):
                """Callback for looking up the linknode for manifests.

                Returns the linkrev node for the specified manifest.

                SIDE EFFECT:

                1) fclnodes gets populated with the list of relevant
                file nodes if we're not using fastpathlinkrev
                2) When treemanifests are in use, collects treemanifest nodes
                to send

                Note that this means manifests must be completely sent to
                the client before you can trust the list of files and
                treemanifests to send.
                """
                clnode = nodes[x]
                mdata = mfl.get(dir, x).readfast(shallow=True)
                for p, n, fl in mdata.iterentries():
                    if fl == 't': # subdirectory manifest
                        subdir = dir + p + '/'
                        tmfclnodes = tmfnodes.setdefault(subdir, {})
                        tmfclnode = tmfclnodes.setdefault(n, clnode)
                        # Keep the earliest introducing changeset as linknode.
                        if clrevorder[clnode] < clrevorder[tmfclnode]:
                            tmfclnodes[n] = clnode
                    else:
                        f = dir + p
                        fclnodes = fnodes.setdefault(f, {})
                        fclnode = fclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[fclnode]:
                            fclnodes[n] = clnode
                return clnode
            return lookupmflinknode

        size = 0
        while tmfnodes:
            # Any directory works; popitem avoids an O(n) min() scan and
            # removes the entry in the same step.
            dir, nodes = tmfnodes.popitem()
            prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
            if not dir or prunednodes:
                for x in self._packmanifests(dir, prunednodes,
                                             makelookupmflinknode(dir, nodes)):
                    size += len(x)
                    yield x
        self._verbosenote(_('%8.i (manifests)\n') % size)
        yield self._manifestsdone()

    # The 'source' parameter is useful for extensions
    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        repo = self._repo
        progress = self._progress
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise error.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                h = self.fileheader(fname)
                size = len(h)
                yield h
                for chunk in self.group(filenodes, filerevlog, lookupfilelog):
                    size += len(chunk)
                    yield chunk
                self._verbosenote(_('%8.i %s\n') % (size, fname))
        progress(msgbundling, None)

    def deltaparent(self, revlog, rev, p1, p2, prev):
        # cg1 deltas are implicitly against the previous chunk in the stream.
        return prev

    def revchunk(self, revlog, rev, prev, linknode):
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = self.deltaparent(revlog, rev, p1, p2, prev)

        prefix = ''
        if revlog.iscensored(base) or revlog.iscensored(rev):
            try:
                delta = revlog.revision(node, raw=True)
            except error.CensoredNodeError as e:
                delta = e.tombstone
            if base == nullrev:
                prefix = mdiff.trivialdiffheader(len(delta))
            else:
                baselen = revlog.rawsize(base)
                prefix = mdiff.replacediffheader(baselen, len(delta))
        elif base == nullrev:
            # No usable base: ship the full text with a trivial diff header.
            delta = revlog.revision(node, raw=True)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        flags = revlog.flags(rev)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta
    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # do nothing with basenode, it is implicitly the previous one in HG10
        # do nothing with flags, it is implicitly 0 for cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
818 816
class cg2packer(cg1packer):
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        super(cg2packer, self).__init__(repo, bundlecaps)
        if self._reorder is None:
            # Since generaldelta is directly supported by cg2, reordering
            # generally doesn't help, so we disable it by default (treating
            # bundle.reorder=auto just like bundle.reorder=False).
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        dp = revlog.deltaparent(rev)
        if dp == nullrev:
            if not revlog.storedeltachains:
                # revlog is configured to use full snapshot for a reason,
                # stick to full snapshot.
                return nullrev
            # Avoid sending full revisions when delta parent is null. Pick
            # prev in that case. It's tempting to pick p1 in this case, as
            # p1 will be smaller in the common case. However, computing a
            # delta against p1 may require resolving the raw text of p1,
            # which could be expensive. The revlog caches should have prev
            # cached, meaning less CPU for changegroup generation. There is
            # likely room to add a flag and/or config option to control
            # this behavior.
            return prev
        if dp in (p1, p2, prev):
            return dp
        # Pick prev when we can't be sure remote has the base revision.
        return prev

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # Do nothing with flags, it is implicitly 0 in cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode,
                           linknode)
855 853
class cg3packer(cg2packer):
    version = '03'
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        # A non-empty dir names a treemanifest sub-directory log; announce
        # it with a file-style header before emitting its chunks.
        if dir:
            yield self.fileheader(dir)

        mflog = self._repo.manifestlog._revlog.dirlog(dir)
        for chunk in self.group(mfnodes, mflog, lookuplinknode,
                                units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg3 terminates the manifest section with an explicit close chunk.
        return self.close()

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # Unlike cg1/cg2, cg3 transmits revlog flags on the wire.
        return struct.pack(
            self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
875 873
# Maps changegroup wire-format version -> (packer class, unpacker class).
_packermap = {'01': (cg1packer, cg1unpacker),
             # cg2 adds support for exchanging generaldelta
             '02': (cg2packer, cg2unpacker),
             # cg3 adds support for exchanging revlog flags and treemanifests
             '03': (cg3packer, cg3unpacker),
}
882 880
def allsupportedversions(repo):
    """Return the set of changegroup versions this code knows about.

    '03' is only advertised when changegroup3/treemanifest is explicitly
    enabled or the repo already requires treemanifest.
    """
    versions = set(_packermap)
    cg3allowed = (repo.ui.configbool('experimental', 'changegroup3')
                  or repo.ui.configbool('experimental', 'treemanifest')
                  or 'treemanifest' in repo.requirements)
    if not cg3allowed:
        versions.discard('03')
    return versions
890 888
def supportedincomingversions(repo):
    """Return the changegroup versions that can be applied to the repo.

    Currently every known version can be applied locally.
    """
    return allsupportedversions(repo)
894 892
def supportedoutgoingversions(repo):
    """Return the changegroup versions that can be created from the repo."""
    versions = allsupportedversions(repo)
    if 'treemanifest' in repo.requirements:
        # Versions 01 and 02 support only flat manifests and it's just too
        # expensive to convert between the flat manifest and tree manifest
        # on the fly.  Since tree manifests are hashed differently, all of
        # history would have to be converted.  Instead, we simply don't
        # even pretend to support versions 01 and 02.
        versions -= {'01', '02'}
    return versions
907 905
def localversion(repo):
    """Return the best version for bundles meant to be used locally.

    This covers bundles produced by strip and shelve, and temporary
    bundles, which never leave the local repository.
    """
    return max(supportedoutgoingversions(repo))
912 910
def safeversion(repo):
    """Return the smallest version safe to assume clients will support.

    For example, all hg versions that support generaldelta also support
    changegroup 02.
    """
    versions = supportedoutgoingversions(repo)
    if 'generaldelta' in repo.requirements:
        versions.discard('01')
    assert versions
    return min(versions)
922 920
def getbundler(version, repo, bundlecaps=None):
    """Instantiate the packer class for *version* (must be producible)."""
    assert version in supportedoutgoingversions(repo)
    packercls = _packermap[version][0]
    return packercls(repo, bundlecaps)
926 924
def getunbundler(version, fh, alg, extras=None):
    """Instantiate the unpacker class for *version* over stream *fh*."""
    unpackercls = _packermap[version][1]
    return unpackercls(fh, alg, extras=extras)
929 927
def _changegroupinfo(repo, nodes, source):
    """Report how many changesets are being bundled (and which, if -v/--debug)."""
    ui = repo.ui
    if ui.verbose or source == 'bundle':
        ui.status(_("%d changesets found\n") % len(nodes))
    if ui.debugflag:
        ui.debug("list of changesets:\n")
        for node in nodes:
            ui.debug("%s\n" % hex(node))
937 935
def makechangegroup(repo, outgoing, version, source, fastpath=False,
                    bundlecaps=None):
    """Like makestream(), but wrap the stream in an unbundler object.

    The result can be applied locally (e.g. by bundlerepo) without going
    through an on-disk bundle.
    """
    cgstream = makestream(repo, outgoing, version, source,
                          fastpath=fastpath, bundlecaps=bundlecaps)
    extras = {'clcount': len(outgoing.missing)}
    return getunbundler(version, util.chunkbuffer(cgstream), None, extras)
944 942
def makestream(repo, outgoing, version, source, fastpath=False,
               bundlecaps=None):
    """Generate a changegroup byte stream for the given outgoing set.

    outgoing describes the common/missing revisions; version selects the
    changegroup wire format; source is passed to the preoutgoing hook.
    Returns the generator of changegroup chunks.
    """
    bundler = getbundler(version, repo, bundlecaps=bundlecaps)

    repo = repo.unfiltered()
    commonrevs = outgoing.common
    csets = outgoing.missing
    heads = outgoing.missingheads
    # We go through the fast path if we get told to, or if all unfiltered
    # heads have been requested (since we then know all their linkrevs will
    # be pulled by the client).  Compare sorted copies instead of sorting
    # heads in place: outgoing.missingheads belongs to the caller and must
    # not be mutated here.
    fastpathlinkrev = fastpath or (
        repo.filtername is None and sorted(heads) == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, csets, source)
    return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
963 961
def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
    """Apply the file-revlog section of a changegroup to *repo*.

    source is the unbundler stream, revmap maps nodes to linkrevs, trp is
    the transaction wrapper, expectedfiles is a progress-total hint and
    needfiles maps filename -> set of nodes that must be present after the
    group is applied.  Returns a (revisions, files) tuple of counts.
    Raises error.Abort on empty groups, spurious entries, censored bases
    or missing file data.
    """
    revisions = 0
    files = 0
    # filelogheader() returns {} once the file section is exhausted.
    for chunkdata in iter(source.filelogheader, {}):
        files += 1
        fname = chunkdata["filename"]
        repo.ui.debug("adding %s revisions\n" % fname)
        repo.ui.progress(_('files'), files, unit=_('files'),
                         total=expectedfiles)
        flog = repo.file(fname)
        oldlen = len(flog)
        try:
            deltas = source.deltaiter()
            if not flog.addgroup(deltas, revmap, trp):
                raise error.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError as e:
            raise error.Abort(_("received delta base is censored: %s") % e)
        revisions += len(flog) - oldlen
        if fname in needfiles:
            needs = needfiles[fname]
            for idx in xrange(oldlen, len(flog)):
                node = flog.node(idx)
                if node not in needs:
                    raise error.Abort(
                        _("received spurious file revlog entry"))
                needs.remove(node)
            if not needs:
                del needfiles[fname]
    repo.ui.progress(_('files'), None)

    # Anything still listed in needfiles did not arrive in this group;
    # that is only acceptable if the repo already has the revision.
    for fname, needs in needfiles.iteritems():
        flog = repo.file(fname)
        for node in needs:
            try:
                flog.rev(node)
            except error.LookupError:
                raise error.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (fname, hex(node)))

    return revisions, files
General Comments 0
You need to be logged in to leave comments. Login now