##// END OF EJS Templates
changegroup: add source parameter to generatemanifests...
Durham Goode -
r34148:75cc1f1e default
parent child Browse files
Show More
@@ -1,1005 +1,1010 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullrev,
19 19 short,
20 20 )
21 21
22 22 from . import (
23 23 dagutil,
24 24 error,
25 25 mdiff,
26 26 phases,
27 27 pycompat,
28 28 util,
29 29 )
30 30
# struct format strings for the per-revision delta headers of each
# changegroup wire-format version (four/five 20-byte nodes, see the
# corresponding packer/unpacker classes below).
_CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
# cg2 adds an explicit delta-base node after the two parents.
_CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
# cg3 additionally carries 16 bits of revlog flags (big-endian).
_CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
def readexactly(stream, n):
    '''read n bytes from stream.read and abort if less was available'''
    data = stream.read(n)
    if len(data) == n:
        return data
    # A short read means the peer hung up (or the bundle is truncated).
    raise error.Abort(_("stream ended unexpectedly"
                        " (got %d bytes, expected %d)")
                      % (len(data), n))
43 43
def getchunk(stream):
    """return the next chunk from stream as a string"""
    # Each chunk is prefixed by a 4-byte big-endian length that counts
    # the prefix itself, so the payload is length - 4 bytes.
    header = readexactly(stream, 4)
    length = struct.unpack(">l", header)[0]
    if length > 4:
        return readexactly(stream, length - 4)
    if length:
        # 1..4 (or negative) can never be a valid framed length.
        raise error.Abort(_("invalid chunk length %d") % length)
    # A zero length marks the end of the current group.
    return ""
53 53
def chunkheader(length):
    """return a changegroup chunk header (string)"""
    # The on-the-wire length includes the 4 bytes of the header itself.
    return struct.pack(">l", 4 + length)
57 57
def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk"""
    # A zero length tells the consumer that the current group is complete.
    return struct.pack('>l', 0)
61 61
def writechunks(ui, chunks, filename, vfs=None):
    """Write chunks to a file and return its filename.

    The stream is assumed to be a bundle file.
    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    """
    fh = None
    cleanup = None
    try:
        if not filename:
            # No destination given: write to a fresh temporary file and
            # remember to delete it if anything goes wrong below.
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, pycompat.sysstr("wb"))
            cleanup = filename
        elif vfs:
            fh = vfs.open(filename, "wb")
        else:
            # Increase default buffer size because default is usually
            # small (4k is common on Linux).
            fh = open(filename, "wb", 131072)
        for chunk in chunks:
            fh.write(chunk)
        # Everything was written; disarm the cleanup.
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)
95 95
class cg1unpacker(object):
    """Unpacker for cg1 changegroup streams.

    A changegroup unpacker handles the framing of the revision data in
    the wire format. Most consumers will want to use the apply()
    method to add the changes from the changegroup to a repository.

    If you're forwarding a changegroup unmodified to another consumer,
    use getchunks(), which returns an iterator of changegroup
    chunks. This is mostly useful for cases where you need to know the
    data stream has ended by observing the end of the changegroup.

    deltachunk() is useful only if you're applying delta data. Most
    consumers should prefer apply() instead.

    A few other public methods exist. Those are used only for
    bundlerepo and some debug commands - their use is discouraged.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    _grouplistcount = 1 # One list of files after the manifests

    def __init__(self, fh, alg, extras=None):
        # fh is the (possibly compressed) byte stream; alg names the
        # bundle compression engine, None meaning uncompressed ('UN').
        if alg is None:
            alg = 'UN'
        if alg not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % alg)
        if alg == 'BZ':
            # Bundles advertise 'BZ' but the leading magic has already been
            # consumed from the stream, hence the truncated decompressor.
            alg = '_truncatedBZ'

        compengine = util.compengines.forbundletype(alg)
        self._stream = compengine.decompressorreader(fh)
        self._type = alg
        self.extras = extras or {}
        # callback is invoked once per chunk read (progress reporting).
        self.callback = None

    # These methods (compressed, read, seek, tell) all appear to only
    # be used by bundlerepo, but it's a little hard to tell.
    def compressed(self):
        return self._type is not None and self._type != 'UN'
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def _chunklength(self):
        """Read a chunk length prefix and return the payload size.

        Returns 0 at an end-of-group marker; fires self.callback (if set)
        for each non-empty chunk seen.
        """
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise error.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self._chunklength()
        if not l:
            return {}
        fname = readexactly(self._stream, l)
        return {'filename': fname}

    def _deltaheader(self, headertuple, prevnode):
        """Decode a cg1 delta header tuple.

        cg1 has no explicit delta base on the wire: deltas are implicitly
        against the previous revision in the stream (or p1 for the first).
        """
        node, p1, p2, cs = headertuple
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        flags = 0
        return node, p1, p2, deltabase, cs, flags

    def deltachunk(self, prevnode):
        """Read one delta chunk and return it as a dict ({} at group end)."""
        l = self._chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
        return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
                'deltabase': deltabase, 'delta': delta, 'flags': flags}

    def getchunks(self):
        """Return all the chunks contained in the bundle.

        Used when you need to forward the binary stream to a file or another
        network API. To do so, it parses the changegroup data; otherwise it
        would block in the sshrepo case because it doesn't know where the
        stream ends.
        """
        # For changegroup 1 and 2, we expect 3 parts: changelog, manifestlog,
        # and a list of filelogs. For changegroup 3, we expect 4 parts:
        # changelog, manifestlog, a list of tree manifestlogs, and a list of
        # filelogs.
        #
        # Changelog and manifestlog parts are terminated with empty chunks. The
        # tree and file parts are a list of entry sections. Each entry section
        # is a series of chunks terminating in an empty chunk. The list of these
        # entry sections is terminated in yet another empty chunk, so we know
        # we've reached the end of the tree/file list when we reach an empty
        # chunk that was proceeded by no non-empty chunks.

        parts = 0
        while parts < 2 + self._grouplistcount:
            noentries = True
            while True:
                chunk = getchunk(self)
                if not chunk:
                    # The first two empty chunks represent the end of the
                    # changelog and the manifestlog portions. The remaining
                    # empty chunks represent either A) the end of individual
                    # tree or file entries in the file list, or B) the end of
                    # the entire list. It's the end of the entire list if there
                    # were no entries (i.e. noentries is True).
                    if parts < 2:
                        parts += 1
                    elif noentries:
                        parts += 1
                    break
                noentries = False
                yield chunkheader(len(chunk))
                pos = 0
                # Re-emit the payload in at most 1MB slices.
                while pos < len(chunk):
                    next = pos + 2**20
                    yield chunk[pos:next]
                    pos = next
            yield closechunk()

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        """Consume the manifest group and add it to the repo's manifest log."""
        # We know that we'll never have more manifests than we had
        # changesets.
        self.callback = prog(_('manifests'), numchanges)
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        self.manifestheader()
        deltas = self.deltaiter(revmap)
        repo.manifestlog._revlog.addgroup(deltas, trp)
        repo.ui.progress(_('manifests'), None)
        self.callback = None

    def apply(self, repo, tr, srctype, url, targetphase=phases.draft,
              expectedtotal=None):
        """Add the changegroup returned by source.read() to this repo.
        srctype is a string like 'push', 'pull', or 'unbundle'. url is
        the URL of the repo where this changegroup is coming from.

        Return an integer summarizing the change to this repo:
        - nothing changed or no source: 0
        - more heads than before: 1+added heads (2..n)
        - fewer heads than before: -1-removed heads (-2..-n)
        - number of heads stays the same: 1
        """
        repo = repo.unfiltered()
        def csmap(x):
            repo.ui.debug("add changeset %s\n" % short(x))
            return len(cl)

        def revmap(x):
            return cl.rev(x)

        changesets = files = revisions = 0

        try:
            # The transaction may already carry source information. In this
            # case we use the top level data. We overwrite the argument
            # because we need to use the top level value (if they exist)
            # in this function.
            srctype = tr.hookargs.setdefault('source', srctype)
            url = tr.hookargs.setdefault('url', url)
            repo.hook('prechangegroup',
                      throw=True, **pycompat.strkwargs(tr.hookargs))

            # write changelog data to temp files so concurrent readers
            # will not see an inconsistent view
            cl = repo.changelog
            cl.delayupdate(tr)
            oldheads = set(cl.heads())

            trp = weakref.proxy(tr)
            # pull off the changeset group
            repo.ui.status(_("adding changesets\n"))
            clstart = len(cl)
            class prog(object):
                # Per-chunk progress callback factory (also reused for the
                # manifest group below).
                def __init__(self, step, total):
                    self._step = step
                    self._total = total
                    self._count = 1
                def __call__(self):
                    repo.ui.progress(self._step, self._count, unit=_('chunks'),
                                     total=self._total)
                    self._count += 1
            self.callback = prog(_('changesets'), expectedtotal)

            efiles = set()
            def onchangelog(cl, node):
                # Collect every file touched by incoming changesets, for
                # the "expected files" progress total.
                efiles.update(cl.readfiles(node))

            self.changelogheader()
            deltas = self.deltaiter(csmap)
            cgnodes = cl.addgroup(deltas, trp, addrevisioncb=onchangelog)
            efiles = len(efiles)

            if not cgnodes:
                repo.ui.develwarn('applied empty changegroup',
                                  config='empty-changegroup')
            clend = len(cl)
            changesets = clend - clstart
            repo.ui.progress(_('changesets'), None)
            self.callback = None

            # pull off the manifest group
            repo.ui.status(_("adding manifests\n"))
            self._unpackmanifests(repo, revmap, trp, prog, changesets)

            needfiles = {}
            if repo.ui.configbool('server', 'validate'):
                cl = repo.changelog
                ml = repo.manifestlog
                # validate incoming csets have their manifests
                for cset in xrange(clstart, clend):
                    mfnode = cl.changelogrevision(cset).manifest
                    mfest = ml[mfnode].readdelta()
                    # store file cgnodes we must see
                    for f, n in mfest.iteritems():
                        needfiles.setdefault(f, set()).add(n)

            # process the files
            repo.ui.status(_("adding file changes\n"))
            newrevs, newfiles = _addchangegroupfiles(
                repo, self, revmap, trp, efiles, needfiles)
            revisions += newrevs
            files += newfiles

            deltaheads = 0
            if oldheads:
                heads = cl.heads()
                deltaheads = len(heads) - len(oldheads)
                for h in heads:
                    # New heads that close a branch don't count.
                    if h not in oldheads and repo[h].closesbranch():
                        deltaheads -= 1
            htext = ""
            if deltaheads:
                htext = _(" (%+d heads)") % deltaheads

            repo.ui.status(_("added %d changesets"
                             " with %d changes to %d files%s\n")
                           % (changesets, revisions, files, htext))
            repo.invalidatevolatilesets()

            if changesets > 0:
                if 'node' not in tr.hookargs:
                    tr.hookargs['node'] = hex(cl.node(clstart))
                    tr.hookargs['node_last'] = hex(cl.node(clend - 1))
                    hookargs = dict(tr.hookargs)
                else:
                    hookargs = dict(tr.hookargs)
                    hookargs['node'] = hex(cl.node(clstart))
                    hookargs['node_last'] = hex(cl.node(clend - 1))
                repo.hook('pretxnchangegroup',
                          throw=True, **pycompat.strkwargs(hookargs))

            added = [cl.node(r) for r in xrange(clstart, clend)]
            phaseall = None
            if srctype in ('push', 'serve'):
                # Old servers can not push the boundary themselves.
                # New servers won't push the boundary if changeset already
                # exists locally as secret
                #
                # We should not use added here but the list of all change in
                # the bundle
                if repo.publishing():
                    targetphase = phaseall = phases.public
                else:
                    # closer target phase computation

                    # Those changesets have been pushed from the
                    # outside, their phases are going to be pushed
                    # alongside. Therefor `targetphase` is
                    # ignored.
                    targetphase = phaseall = phases.draft
            if added:
                phases.registernew(repo, tr, targetphase, added)
            if phaseall is not None:
                phases.advanceboundary(repo, tr, phaseall, cgnodes)

            if changesets > 0:

                def runhooks():
                    # These hooks run when the lock releases, not when the
                    # transaction closes. So it's possible for the changelog
                    # to have changed since we last saw it.
                    if clstart >= len(repo):
                        return

                    repo.hook("changegroup", **pycompat.strkwargs(hookargs))

                    for n in added:
                        args = hookargs.copy()
                        args['node'] = hex(n)
                        del args['node_last']
                        repo.hook("incoming", **pycompat.strkwargs(args))

                    newheads = [h for h in repo.heads()
                                if h not in oldheads]
                    repo.ui.log("incoming",
                                "%s incoming changes - new heads: %s\n",
                                len(added),
                                ', '.join([hex(c[:6]) for c in newheads]))

                tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                                lambda tr: repo._afterlock(runhooks))
        finally:
            repo.ui.flush()
        # never return 0 here:
        if deltaheads < 0:
            ret = deltaheads - 1
        else:
            ret = deltaheads + 1
        return ret

    def deltaiter(self, linkmapper):
        """
        returns an iterator of the deltas in this changegroup

        Useful for passing to the underlying storage system to be stored.
        """
        chain = None
        for chunkdata in iter(lambda: self.deltachunk(chain), {}):
            node = chunkdata['node']
            p1 = chunkdata['p1']
            p2 = chunkdata['p2']
            cs = chunkdata['cs']
            deltabase = chunkdata['deltabase']
            delta = chunkdata['delta']
            flags = chunkdata['flags']

            link = linkmapper(cs)
            # Each delta is implicitly based on the previous node.
            chain = node

            yield (node, p1, p2, link, deltabase, delta, flags)
453 453
class cg2unpacker(cg1unpacker):
    """Unpacker for cg2 streams.

    cg2 streams add support for generaldelta, so the delta header
    format is slightly different. All other features about the data
    remain the same.
    """
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        # cg2 encodes the delta base explicitly on the wire, so prevnode
        # is irrelevant here; revlog flags don't exist yet in cg2.
        node, p1, p2, deltabase, cs = headertuple
        return node, p1, p2, deltabase, cs, 0
469 469
class cg3unpacker(cg2unpacker):
    """Unpacker for cg3 streams.

    cg3 streams add support for exchanging treemanifests and revlog
    flags. It adds the revlog flags to the delta header and an empty chunk
    separating manifests and files.
    """
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '03'
    _grouplistcount = 2 # One list of manifests and one list of files

    def _deltaheader(self, headertuple, prevnode):
        # cg3 carries both the explicit delta base and the revlog flags.
        node, p1, p2, deltabase, cs, flags = headertuple
        return node, p1, p2, deltabase, cs, flags

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        # Consume the root manifest group first (cg1/cg2 behavior) ...
        super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
                                                  numchanges)
        # ... then one group per tree manifest directory, each prefixed by
        # a filelog-style header carrying the directory name.
        for chunkdata in iter(self.filelogheader, {}):
            # If we get here, there are directory manifests in the changegroup
            d = chunkdata["filename"]
            repo.ui.debug("adding %s revisions\n" % d)
            dirlog = repo.manifestlog._revlog.dirlog(d)
            deltas = self.deltaiter(revmap)
            if not dirlog.addgroup(deltas, trp):
                raise error.Abort(_("received dir revlog group is empty"))
497 497
class headerlessfixup(object):
    """Wrap a stream whose header bytes were already consumed.

    Replays the saved header bytes ``h`` first, then reads from the
    underlying stream ``fh``.
    """
    def __init__(self, fh, h):
        self._h = h
        self._fh = fh
    def read(self, n):
        if not self._h:
            # Header fully replayed; read straight from the stream.
            return readexactly(self._fh, n)
        d, self._h = self._h[:n], self._h[n:]
        if len(d) < n:
            # Header exhausted mid-request; top up from the stream.
            d += readexactly(self._fh, n - len(d))
        return d
509 509
class cg1packer(object):
    """Packer producing version '01' changegroup streams."""
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    version = '01'
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle. While bundlecaps is
        unused in core Mercurial, extensions rely on this feature to communicate
        capabilities to customize the changegroup packer.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        # experimental config: bundle.reorder
        reorder = repo.ui.config('bundle', 'reorder')
        if reorder == 'auto':
            # None means "decide per revlog" (see _sortgroup).
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
        if self._repo.ui.verbose and not self._repo.ui.debugflag:
            self._verbosenote = self._repo.ui.note
        else:
            self._verbosenote = lambda s: None

    def close(self):
        """Return the empty chunk that terminates a group."""
        return closechunk()

    def fileheader(self, fname):
        """Return the chunk announcing a filelog (or tree dir) section."""
        return chunkheader(len(fname)) + fname

    # Extracted both for clarity and for overriding in extensions.
    def _sortgroup(self, revlog, nodelist, lookup):
        """Sort nodes for change group and turn them into revnums."""
        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and self._reorder is None) or self._reorder:
            dag = dagutil.revlogdag(revlog)
            return dag.linearize(set(revlog.rev(n) for n in nodelist))
        else:
            return sorted([revlog.rev(n) for n in nodelist])

    def group(self, nodelist, revlog, lookup, units=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        revs = self._sortgroup(revlog, nodelist, lookup)

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        if units is not None:
            self._progress(msgbundling, None)
        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs):
        rr, rl = revlog.rev, revlog.linkrev
        return [n for n in missing if rl(rr(n)) not in commonrevs]

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        """Pack flat manifests into a changegroup stream."""
        # cg1 knows nothing about tree manifests, so dir must be empty.
        assert not dir
        for chunk in self.group(mfnodes, self._repo.manifestlog._revlog,
                                lookuplinknode, units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg1/cg2 have no extra terminator after the manifest group.
        return ''

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        cl = repo.changelog

        clrevorder = {}
        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and
        # manifest nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            clrevorder[x] = len(clrevorder)
            n = c[0]
            # record the first changeset introducing this manifest version
            mfs.setdefault(n, x)
            # Record a complete list of potentially-changed files in
            # this manifest.
            changedfiles.update(c[3])
            return x

        self._verbosenote(_('uncompressed size of bundle content:\n'))
        size = 0
        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (changelog)\n') % size)

        # We need to make sure that the linkrev in the changegroup refers to
        # the first changeset that introduced the manifest or file revision.
        # The fastpath is usually safer than the slowpath, because the filelogs
        # are walked in revlog order.
        #
        # When taking the slowpath with reorder=None and the manifest revlog
        # uses generaldelta, the manifest may be walked in the "wrong" order.
        # Without 'clrevorder', we would get an incorrect linkrev (see fix in
        # cc0ff93d0c0c).
        #
        # When taking the fastpath, we are only vulnerable to reordering
        # of the changelog itself. The changelog never uses generaldelta, so
        # it is only reordered when reorder=True. To handle this case, we
        # simply take the slowpath, which already has the 'clrevorder' logic.
        # This was also fixed in cc0ff93d0c0c.
        fastpathlinkrev = fastpathlinkrev and not self._reorder
        # Treemanifests don't work correctly with fastpathlinkrev
        # either, because we don't discover which directory nodes to
        # send along with files. This could probably be fixed.
        fastpathlinkrev = fastpathlinkrev and (
            'treemanifest' not in repo.requirements)

        for chunk in self.generatemanifests(commonrevs, clrevorder,
                                            fastpathlinkrev, mfs, fnodes,
                                            source):
            yield chunk
        mfs.clear()
        clrevs = set(cl.rev(x) for x in clnodes)

        if not fastpathlinkrev:
            def linknodes(unused, fname):
                return fnodes.get(fname, {})
        else:
            cln = cl.node
            def linknodes(filerevlog, fname):
                llr = filerevlog.linkrev
                fln = filerevlog.node
                revs = ((r, llr(r)) for r in filerevlog)
                return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
                          fnodes, source):
        """Returns an iterator of changegroup chunks containing manifests.

        `source` is unused here, but is used by extensions like remotefilelog
        to change what is sent based on pulls vs pushes, etc.
        """
        repo = self._repo
        mfl = repo.manifestlog
        dirlog = mfl._revlog.dirlog
        tmfnodes = {'': mfs}

        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def makelookupmflinknode(dir):
            if fastpathlinkrev:
                assert not dir
                return mfs.__getitem__

            def lookupmflinknode(x):
                """Callback for looking up the linknode for manifests.

                Returns the linkrev node for the specified manifest.

                SIDE EFFECT:

                1) fclnodes gets populated with the list of relevant
                   file nodes if we're not using fastpathlinkrev
                2) When treemanifests are in use, collects treemanifest nodes
                   to send

                Note that this means manifests must be completely sent to
                the client before you can trust the list of files and
                treemanifests to send.
                """
                clnode = tmfnodes[dir][x]
                mdata = mfl.get(dir, x).readfast(shallow=True)
                for p, n, fl in mdata.iterentries():
                    if fl == 't': # subdirectory manifest
                        subdir = dir + p + '/'
                        tmfclnodes = tmfnodes.setdefault(subdir, {})
                        tmfclnode = tmfclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[tmfclnode]:
                            tmfclnodes[n] = clnode
                    else:
                        f = dir + p
                        fclnodes = fnodes.setdefault(f, {})
                        fclnode = fclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[fclnode]:
                            fclnodes[n] = clnode
                return clnode
            return lookupmflinknode

        size = 0
        while tmfnodes:
            # Process directories in lexical order; packing one directory
            # may discover new subdirectory manifests to send.
            dir = min(tmfnodes)
            nodes = tmfnodes[dir]
            prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
            if not dir or prunednodes:
                for x in self._packmanifests(dir, prunednodes,
                                             makelookupmflinknode(dir)):
                    size += len(x)
                    yield x
            del tmfnodes[dir]
        self._verbosenote(_('%8.i (manifests)\n') % size)
        yield self._manifestsdone()

    # The 'source' parameter is useful for extensions
    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        """Yield changegroup chunks for the given set of changed files."""
        repo = self._repo
        progress = self._progress
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise error.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                h = self.fileheader(fname)
                size = len(h)
                yield h
                for chunk in self.group(filenodes, filerevlog, lookupfilelog):
                    size += len(chunk)
                    yield chunk
                self._verbosenote(_('%8.i %s\n') % (size, fname))
        progress(msgbundling, None)

    def deltaparent(self, revlog, rev, p1, p2, prev):
        # cg1 deltas are always against the previous revision in the stream.
        return prev

    def revchunk(self, revlog, rev, prev, linknode):
        """Yield the header and delta chunks for one revision."""
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = self.deltaparent(revlog, rev, p1, p2, prev)

        prefix = ''
        if revlog.iscensored(base) or revlog.iscensored(rev):
            try:
                delta = revlog.revision(node, raw=True)
            except error.CensoredNodeError as e:
                delta = e.tombstone
            if base == nullrev:
                prefix = mdiff.trivialdiffheader(len(delta))
            else:
                baselen = revlog.rawsize(base)
                prefix = mdiff.replacediffheader(baselen, len(delta))
        elif base == nullrev:
            # No usable base: send the full text as a trivial diff.
            delta = revlog.revision(node, raw=True)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        flags = revlog.flags(rev)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta
    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # do nothing with basenode, it is implicitly the previous one in HG10
        # do nothing with flags, it is implicitly 0 for cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
823 828
class cg2packer(cg1packer):
    """Packer for version '02' changegroups (adds generaldelta support)."""
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        super(cg2packer, self).__init__(repo, bundlecaps)
        if self._reorder is None:
            # Since generaldelta is directly supported by cg2, reordering
            # generally doesn't help, so we disable it by default (treating
            # bundle.reorder=auto just like bundle.reorder=False).
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        """Choose the revision to delta against when bundling rev."""
        dp = revlog.deltaparent(rev)
        if dp == nullrev and revlog.storedeltachains:
            # Avoid sending full revisions when delta parent is null. Pick prev
            # in that case. It's tempting to pick p1 in this case, as p1 will
            # be smaller in the common case. However, computing a delta against
            # p1 may require resolving the raw text of p1, which could be
            # expensive. The revlog caches should have prev cached, meaning
            # less CPU for changegroup generation. There is likely room to add
            # a flag and/or config option to control this behavior.
            return prev
        elif dp == nullrev:
            # revlog is configured to use full snapshot for a reason,
            # stick to full snapshot.
            return nullrev
        elif dp not in (p1, p2, prev):
            # Pick prev when we can't be sure remote has the base revision.
            return prev
        else:
            return dp

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # Do nothing with flags, it is implicitly 0 in cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
860 865
class cg3packer(cg2packer):
    """Changegroup version 03 packer (revlog flags + treemanifests)."""
    version = '03'
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        """Yield chunks for one manifest (sub-)directory's nodes.

        A non-empty ``dir`` names a treemanifest subdirectory log and is
        announced with a file header chunk first.
        """
        if dir:
            yield self.fileheader(dir)

        dirlog = self._repo.manifestlog._revlog.dirlog(dir)
        for chunk in self.group(mfnodes, dirlog, lookuplinknode,
                                units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg3 terminates the manifest section with an explicit close chunk.
        return self.close()

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # Unlike cg1/cg2, cg3 carries the revlog flags on the wire.
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode,
                           linknode, flags)
880 885
# Map of changegroup wire-format version -> (packer class, unpacker class).
_packermap = {'01': (cg1packer, cg1unpacker),
             # cg2 adds support for exchanging generaldelta
             '02': (cg2packer, cg2unpacker),
             # cg3 adds support for exchanging revlog flags and treemanifests
             '03': (cg3packer, cg3unpacker),
}
887 892
def allsupportedversions(repo):
    """Return the set of changegroup versions this build understands.

    Version 03 is only offered when enabled via experimental config
    options or when the repo already requires treemanifests.
    """
    versions = set(_packermap)
    cg3enabled = (repo.ui.configbool('experimental', 'changegroup3')
                  or repo.ui.configbool('experimental', 'treemanifest')
                  or 'treemanifest' in repo.requirements)
    if not cg3enabled:
        versions.discard('03')
    return versions
895 900
# Changegroup versions that can be applied to the repo
def supportedincomingversions(repo):
    """Return changegroup versions that can be applied to ``repo``.

    Currently every version we know about can be applied; outgoing
    support is more restrictive (see supportedoutgoingversions).
    """
    return allsupportedversions(repo)
899 904
# Changegroup versions that can be created from the repo
def supportedoutgoingversions(repo):
    """Return changegroup versions ``repo`` may serve to peers."""
    versions = allsupportedversions(repo)
    if 'treemanifest' in repo.requirements:
        # Versions 01 and 02 only carry flat manifests, and converting
        # between flat and tree manifests on the fly is too expensive --
        # the two are hashed differently, so all of history would need
        # converting. We simply do not pretend to support 01/02 here.
        versions -= {'01', '02'}
    return versions
912 917
def safeversion(repo):
    """Return the smallest version all clients of ``repo`` must support.

    For example, every hg release that supports generaldelta also
    supports changegroup 02, so 01 can be dropped for such repos.
    """
    candidates = supportedoutgoingversions(repo)
    if 'generaldelta' in repo.requirements:
        candidates.discard('01')
    assert candidates
    return min(candidates)
922 927
def getbundler(version, repo, bundlecaps=None):
    """Instantiate the packer for changegroup ``version``."""
    assert version in supportedoutgoingversions(repo)
    packercls = _packermap[version][0]
    return packercls(repo, bundlecaps)
926 931
def getunbundler(version, fh, alg, extras=None):
    """Instantiate the unpacker for changegroup ``version``."""
    unpackercls = _packermap[version][1]
    return unpackercls(fh, alg, extras=extras)
929 934
def _changegroupinfo(repo, nodes, source):
    """Emit status/debug output describing an outgoing changegroup."""
    ui = repo.ui
    if ui.verbose or source == 'bundle':
        ui.status(_("%d changesets found\n") % len(nodes))
    if ui.debugflag:
        ui.debug("list of changesets:\n")
        for node in nodes:
            ui.debug("%s\n" % hex(node))
937 942
def makechangegroup(repo, outgoing, version, source, fastpath=False,
                    bundlecaps=None):
    """Build a changegroup for ``outgoing`` and wrap it in an unbundler.

    The unbundler presents the freshly generated stream as if it had
    been received, carrying the changeset count in its extras.
    """
    cgstream = makestream(repo, outgoing, version, source,
                          fastpath=fastpath, bundlecaps=bundlecaps)
    buffered = util.chunkbuffer(cgstream)
    return getunbundler(version, buffered, None,
                        {'clcount': len(outgoing.missing)})
944 949
def makestream(repo, outgoing, version, source, fastpath=False,
               bundlecaps=None):
    """Return an iterator of raw changegroup chunks for ``outgoing``.

    Fires the ``preoutgoing`` hook and reports the changeset count
    before delegating generation to the per-version packer.
    """
    bundler = getbundler(version, repo, bundlecaps=bundlecaps)

    repo = repo.unfiltered()
    commonrevs = outgoing.common
    csets = outgoing.missing
    heads = outgoing.missingheads
    # Take the linkrev fast path when explicitly told to, or when all
    # (unfiltered) heads were requested -- in that case every linkrev is
    # known to be pulled by the client.
    heads.sort()
    allheadsrequested = (repo.filtername is None
                         and heads == sorted(repo.heads()))
    fastpathlinkrev = fastpath or allheadsrequested

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, csets, source)
    return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
963 968
def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
    """Apply the file-revlog portion of an incoming changegroup.

    Consumes filelog headers/deltas from ``source`` and adds them to the
    corresponding filelogs under transaction ``trp``, checking off each
    node listed in ``needfiles`` as it arrives.

    Returns a ``(revisions, files)`` tuple of how much was added. Raises
    error.Abort on an empty group, a censored delta base, a revision not
    announced by any changeset, or (at the end) a needed file revision
    that never arrived.
    """
    revisions = 0
    files = 0
    # filelogheader() returns {} after the last file, ending the loop.
    for chunkdata in iter(source.filelogheader, {}):
        files += 1
        f = chunkdata["filename"]
        repo.ui.debug("adding %s revisions\n" % f)
        repo.ui.progress(_('files'), files, unit=_('files'),
                         total=expectedfiles)
        fl = repo.file(f)
        oldlen = len(fl)
        try:
            deltas = source.deltaiter(revmap)
            if not fl.addgroup(deltas, trp):
                raise error.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError as e:
            raise error.Abort(_("received delta base is censored: %s") % e)
        revisions += len(fl) - oldlen
        if f in needfiles:
            needs = needfiles[f]
            # Every newly added node must have been expected; anything
            # else is a spurious entry.
            for newrev in xrange(oldlen, len(fl)):
                n = fl.node(newrev)
                if n in needs:
                    needs.remove(n)
                else:
                    raise error.Abort(
                        _("received spurious file revlog entry"))
            if not needs:
                del needfiles[f]
    repo.ui.progress(_('files'), None)

    # Whatever is still listed in needfiles must already exist locally.
    for f, needs in needfiles.iteritems():
        fl = repo.file(f)
        for n in needs:
            try:
                fl.rev(n)
            except error.LookupError:
                raise error.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (f, hex(n)))

    return revisions, files
General Comments 0
You need to be logged in to leave comments. Login now