##// END OF EJS Templates
changegroup: fix to allow empty manifest parts...
Durham Goode -
r34093:bbdca7e4 default
parent child Browse files
Show More
@@ -1,1011 +1,1024 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullrev,
19 19 short,
20 20 )
21 21
22 22 from . import (
23 23 dagutil,
24 24 discovery,
25 25 error,
26 26 mdiff,
27 27 phases,
28 28 pycompat,
29 29 util,
30 30 )
31 31
32 32 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
33 33 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
34 34 _CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
35 35
def readexactly(stream, n):
    '''read n bytes from stream.read and abort if less was available'''
    data = stream.read(n)
    if len(data) >= n:
        return data
    # a short read means the peer hung up or the stream is corrupt
    raise error.Abort(_("stream ended unexpectedly"
                        " (got %d bytes, expected %d)")
                      % (len(data), n))
44 44
def getchunk(stream):
    """return the next chunk from stream as a string"""
    # the 4-byte big-endian length prefix counts itself, so a payload
    # exists only when the value exceeds 4; 0 marks a terminating chunk
    header = readexactly(stream, 4)
    length = struct.unpack(">l", header)[0]
    if length > 4:
        return readexactly(stream, length - 4)
    if length:
        raise error.Abort(_("invalid chunk length %d") % length)
    return ""
54 54
def chunkheader(length):
    """return a changegroup chunk header (string)"""
    # the on-wire length field includes the 4 bytes of the field itself
    return struct.pack(">l", length + 4)
58 58
def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk"""
    # a zero length terminates the current chunk group
    return struct.pack(">l", 0)
62 62
def writechunks(ui, chunks, filename, vfs=None):
    """Write chunks to a file and return its filename.

    The stream is assumed to be a bundle file.
    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    """
    fh = None
    cleanup = None
    try:
        if filename:
            if vfs:
                fh = vfs.open(filename, "wb")
            else:
                # Increase default buffer size because default is usually
                # small (4k is common on Linux).
                fh = open(filename, "wb", 131072)
        else:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, pycompat.sysstr("wb"))
        # from here on a failure must remove the (possibly partial) file
        cleanup = filename
        for chunk in chunks:
            fh.write(chunk)
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)
96 96
class cg1unpacker(object):
    """Unpacker for cg1 changegroup streams.

    A changegroup unpacker handles the framing of the revision data in
    the wire format. Most consumers will want to use the apply()
    method to add the changes from the changegroup to a repository.

    If you're forwarding a changegroup unmodified to another consumer,
    use getchunks(), which returns an iterator of changegroup
    chunks. This is mostly useful for cases where you need to know the
    data stream has ended by observing the end of the changegroup.

    deltachunk() is useful only if you're applying delta data. Most
    consumers should prefer apply() instead.

    A few other public methods exist. Those are used only for
    bundlerepo and some debug commands - their use is discouraged.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    _grouplistcount = 1 # One list of files after the manifests

    def __init__(self, fh, alg, extras=None):
        # default to no compression when the caller did not say
        if alg is None:
            alg = 'UN'
        if alg not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % alg)
        if alg == 'BZ':
            alg = '_truncatedBZ'

        compengine = util.compengines.forbundletype(alg)
        self._stream = compengine.decompressorreader(fh)
        self._type = alg
        self.extras = extras or {}
        self.callback = None

    # These methods (compressed, read, seek, tell) all appear to only
    # be used by bundlerepo, but it's a little hard to tell.
    def compressed(self):
        return self._type is not None and self._type != 'UN'
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()
147 147
148 148 def _chunklength(self):
149 149 d = readexactly(self._stream, 4)
150 150 l = struct.unpack(">l", d)[0]
151 151 if l <= 4:
152 152 if l:
153 153 raise error.Abort(_("invalid chunk length %d") % l)
154 154 return 0
155 155 if self.callback:
156 156 self.callback()
157 157 return l - 4
158 158
159 159 def changelogheader(self):
160 160 """v10 does not have a changelog header chunk"""
161 161 return {}
162 162
163 163 def manifestheader(self):
164 164 """v10 does not have a manifest header chunk"""
165 165 return {}
166 166
167 167 def filelogheader(self):
168 168 """return the header of the filelogs chunk, v10 only has the filename"""
169 169 l = self._chunklength()
170 170 if not l:
171 171 return {}
172 172 fname = readexactly(self._stream, l)
173 173 return {'filename': fname}
174 174
175 175 def _deltaheader(self, headertuple, prevnode):
176 176 node, p1, p2, cs = headertuple
177 177 if prevnode is None:
178 178 deltabase = p1
179 179 else:
180 180 deltabase = prevnode
181 181 flags = 0
182 182 return node, p1, p2, deltabase, cs, flags
183 183
184 184 def deltachunk(self, prevnode):
185 185 l = self._chunklength()
186 186 if not l:
187 187 return {}
188 188 headerdata = readexactly(self._stream, self.deltaheadersize)
189 189 header = struct.unpack(self.deltaheader, headerdata)
190 190 delta = readexactly(self._stream, l - self.deltaheadersize)
191 191 node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
192 192 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
193 193 'deltabase': deltabase, 'delta': delta, 'flags': flags}
194 194
195 195 def getchunks(self):
196 196 """returns all the chunks contains in the bundle
197 197
198 198 Used when you need to forward the binary stream to a file or another
199 199 network API. To do so, it parse the changegroup data, otherwise it will
200 200 block in case of sshrepo because it don't know the end of the stream.
201 201 """
202 # an empty chunkgroup is the end of the changegroup
203 # a changegroup has at least 2 chunkgroups (changelog and manifest).
204 # after that, changegroup versions 1 and 2 have a series of groups
205 # with one group per file. changegroup 3 has a series of directory
206 # manifests before the files.
207 count = 0
208 emptycount = 0
209 while emptycount < self._grouplistcount:
210 empty = True
211 count += 1
202 # For changegroup 1 and 2, we expect 3 parts: changelog, manifestlog,
203 # and a list of filelogs. For changegroup 3, we expect 4 parts:
204 # changelog, manifestlog, a list of tree manifestlogs, and a list of
205 # filelogs.
206 #
207 # Changelog and manifestlog parts are terminated with empty chunks. The
208 # tree and file parts are a list of entry sections. Each entry section
209 # is a series of chunks terminating in an empty chunk. The list of these
210 # entry sections is terminated in yet another empty chunk, so we know
211 # we've reached the end of the tree/file list when we reach an empty
212 # chunk that was proceeded by no non-empty chunks.
213
214 parts = 0
215 while parts < 2 + self._grouplistcount:
216 noentries = True
212 217 while True:
213 218 chunk = getchunk(self)
214 219 if not chunk:
215 if empty and count > 2:
216 emptycount += 1
220 # The first two empty chunks represent the end of the
221 # changelog and the manifestlog portions. The remaining
222 # empty chunks represent either A) the end of individual
223 # tree or file entries in the file list, or B) the end of
224 # the entire list. It's the end of the entire list if there
225 # were no entries (i.e. noentries is True).
226 if parts < 2:
227 parts += 1
228 elif noentries:
229 parts += 1
217 230 break
218 empty = False
231 noentries = False
219 232 yield chunkheader(len(chunk))
220 233 pos = 0
221 234 while pos < len(chunk):
222 235 next = pos + 2**20
223 236 yield chunk[pos:next]
224 237 pos = next
225 238 yield closechunk()
226 239
227 240 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
228 241 # We know that we'll never have more manifests than we had
229 242 # changesets.
230 243 self.callback = prog(_('manifests'), numchanges)
231 244 # no need to check for empty manifest group here:
232 245 # if the result of the merge of 1 and 2 is the same in 3 and 4,
233 246 # no new manifest will be created and the manifest group will
234 247 # be empty during the pull
235 248 self.manifestheader()
236 249 repo.manifestlog._revlog.addgroup(self, revmap, trp)
237 250 repo.ui.progress(_('manifests'), None)
238 251 self.callback = None
239 252
240 253 def apply(self, repo, tr, srctype, url, targetphase=phases.draft,
241 254 expectedtotal=None):
242 255 """Add the changegroup returned by source.read() to this repo.
243 256 srctype is a string like 'push', 'pull', or 'unbundle'. url is
244 257 the URL of the repo where this changegroup is coming from.
245 258
246 259 Return an integer summarizing the change to this repo:
247 260 - nothing changed or no source: 0
248 261 - more heads than before: 1+added heads (2..n)
249 262 - fewer heads than before: -1-removed heads (-2..-n)
250 263 - number of heads stays the same: 1
251 264 """
252 265 repo = repo.unfiltered()
253 266 def csmap(x):
254 267 repo.ui.debug("add changeset %s\n" % short(x))
255 268 return len(cl)
256 269
257 270 def revmap(x):
258 271 return cl.rev(x)
259 272
260 273 changesets = files = revisions = 0
261 274
262 275 try:
263 276 # The transaction may already carry source information. In this
264 277 # case we use the top level data. We overwrite the argument
265 278 # because we need to use the top level value (if they exist)
266 279 # in this function.
267 280 srctype = tr.hookargs.setdefault('source', srctype)
268 281 url = tr.hookargs.setdefault('url', url)
269 282 repo.hook('prechangegroup',
270 283 throw=True, **pycompat.strkwargs(tr.hookargs))
271 284
272 285 # write changelog data to temp files so concurrent readers
273 286 # will not see an inconsistent view
274 287 cl = repo.changelog
275 288 cl.delayupdate(tr)
276 289 oldheads = set(cl.heads())
277 290
278 291 trp = weakref.proxy(tr)
279 292 # pull off the changeset group
280 293 repo.ui.status(_("adding changesets\n"))
281 294 clstart = len(cl)
282 295 class prog(object):
283 296 def __init__(self, step, total):
284 297 self._step = step
285 298 self._total = total
286 299 self._count = 1
287 300 def __call__(self):
288 301 repo.ui.progress(self._step, self._count, unit=_('chunks'),
289 302 total=self._total)
290 303 self._count += 1
291 304 self.callback = prog(_('changesets'), expectedtotal)
292 305
293 306 efiles = set()
294 307 def onchangelog(cl, node):
295 308 efiles.update(cl.readfiles(node))
296 309
297 310 self.changelogheader()
298 311 cgnodes = cl.addgroup(self, csmap, trp, addrevisioncb=onchangelog)
299 312 efiles = len(efiles)
300 313
301 314 if not cgnodes:
302 315 repo.ui.develwarn('applied empty changegroup',
303 316 config='empty-changegroup')
304 317 clend = len(cl)
305 318 changesets = clend - clstart
306 319 repo.ui.progress(_('changesets'), None)
307 320 self.callback = None
308 321
309 322 # pull off the manifest group
310 323 repo.ui.status(_("adding manifests\n"))
311 324 self._unpackmanifests(repo, revmap, trp, prog, changesets)
312 325
313 326 needfiles = {}
314 327 if repo.ui.configbool('server', 'validate'):
315 328 cl = repo.changelog
316 329 ml = repo.manifestlog
317 330 # validate incoming csets have their manifests
318 331 for cset in xrange(clstart, clend):
319 332 mfnode = cl.changelogrevision(cset).manifest
320 333 mfest = ml[mfnode].readdelta()
321 334 # store file cgnodes we must see
322 335 for f, n in mfest.iteritems():
323 336 needfiles.setdefault(f, set()).add(n)
324 337
325 338 # process the files
326 339 repo.ui.status(_("adding file changes\n"))
327 340 newrevs, newfiles = _addchangegroupfiles(
328 341 repo, self, revmap, trp, efiles, needfiles)
329 342 revisions += newrevs
330 343 files += newfiles
331 344
332 345 deltaheads = 0
333 346 if oldheads:
334 347 heads = cl.heads()
335 348 deltaheads = len(heads) - len(oldheads)
336 349 for h in heads:
337 350 if h not in oldheads and repo[h].closesbranch():
338 351 deltaheads -= 1
339 352 htext = ""
340 353 if deltaheads:
341 354 htext = _(" (%+d heads)") % deltaheads
342 355
343 356 repo.ui.status(_("added %d changesets"
344 357 " with %d changes to %d files%s\n")
345 358 % (changesets, revisions, files, htext))
346 359 repo.invalidatevolatilesets()
347 360
348 361 if changesets > 0:
349 362 if 'node' not in tr.hookargs:
350 363 tr.hookargs['node'] = hex(cl.node(clstart))
351 364 tr.hookargs['node_last'] = hex(cl.node(clend - 1))
352 365 hookargs = dict(tr.hookargs)
353 366 else:
354 367 hookargs = dict(tr.hookargs)
355 368 hookargs['node'] = hex(cl.node(clstart))
356 369 hookargs['node_last'] = hex(cl.node(clend - 1))
357 370 repo.hook('pretxnchangegroup',
358 371 throw=True, **pycompat.strkwargs(hookargs))
359 372
360 373 added = [cl.node(r) for r in xrange(clstart, clend)]
361 374 phaseall = None
362 375 if srctype in ('push', 'serve'):
363 376 # Old servers can not push the boundary themselves.
364 377 # New servers won't push the boundary if changeset already
365 378 # exists locally as secret
366 379 #
367 380 # We should not use added here but the list of all change in
368 381 # the bundle
369 382 if repo.publishing():
370 383 targetphase = phaseall = phases.public
371 384 else:
372 385 # closer target phase computation
373 386
374 387 # Those changesets have been pushed from the
375 388 # outside, their phases are going to be pushed
376 389 # alongside. Therefor `targetphase` is
377 390 # ignored.
378 391 targetphase = phaseall = phases.draft
379 392 if added:
380 393 phases.registernew(repo, tr, targetphase, added)
381 394 if phaseall is not None:
382 395 phases.advanceboundary(repo, tr, phaseall, cgnodes)
383 396
384 397 if changesets > 0:
385 398
386 399 def runhooks():
387 400 # These hooks run when the lock releases, not when the
388 401 # transaction closes. So it's possible for the changelog
389 402 # to have changed since we last saw it.
390 403 if clstart >= len(repo):
391 404 return
392 405
393 406 repo.hook("changegroup", **pycompat.strkwargs(hookargs))
394 407
395 408 for n in added:
396 409 args = hookargs.copy()
397 410 args['node'] = hex(n)
398 411 del args['node_last']
399 412 repo.hook("incoming", **pycompat.strkwargs(args))
400 413
401 414 newheads = [h for h in repo.heads()
402 415 if h not in oldheads]
403 416 repo.ui.log("incoming",
404 417 "%s incoming changes - new heads: %s\n",
405 418 len(added),
406 419 ', '.join([hex(c[:6]) for c in newheads]))
407 420
408 421 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
409 422 lambda tr: repo._afterlock(runhooks))
410 423 finally:
411 424 repo.ui.flush()
412 425 # never return 0 here:
413 426 if deltaheads < 0:
414 427 ret = deltaheads - 1
415 428 else:
416 429 ret = deltaheads + 1
417 430 return ret
418 431
class cg2unpacker(cg1unpacker):
    """Unpacker for cg2 streams.

    cg2 streams add support for generaldelta, so the delta header
    format is slightly different. All other features about the data
    remain the same.
    """
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        # cg2 carries the delta base explicitly; flags remain implicit.
        node, p1, p2, deltabase, cs = headertuple
        flags = 0
        return node, p1, p2, deltabase, cs, flags
434 447
class cg3unpacker(cg2unpacker):
    """Unpacker for cg3 streams.

    cg3 streams add support for exchanging treemanifests and revlog
    flags. It adds the revlog flags to the delta header and an empty chunk
    separating manifests and files.
    """
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '03'
    _grouplistcount = 2 # One list of manifests and one list of files

    def _deltaheader(self, headertuple, prevnode):
        # cg3 headers already carry every field, including flags
        node, p1, p2, deltabase, cs, flags = headertuple
        return node, p1, p2, deltabase, cs, flags

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
                                                  numchanges)
        for chunkdata in iter(self.filelogheader, {}):
            # If we get here, there are directory manifests in the changegroup
            d = chunkdata["filename"]
            repo.ui.debug("adding %s revisions\n" % d)
            dirlog = repo.manifestlog._revlog.dirlog(d)
            if not dirlog.addgroup(self, revmap, trp):
                raise error.Abort(_("received dir revlog group is empty"))
461 474
class headerlessfixup(object):
    """File-like wrapper that replays already-consumed header bytes.

    Serves the buffered header h first, then falls through to reading
    from the underlying stream fh.
    """
    def __init__(self, fh, h):
        self._h = h
        self._fh = fh
    def read(self, n):
        if self._h:
            d, self._h = self._h[:n], self._h[n:]
            if len(d) < n:
                # buffered header exhausted mid-request; top up from fh
                d += readexactly(self._fh, n - len(d))
            return d
        return readexactly(self._fh, n)
473 486
class cg1packer(object):
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    version = '01'
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle. While bundlecaps is
        unused in core Mercurial, extensions rely on this feature to communicate
        capabilities to customize the changegroup packer.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        # experimental config: bundle.reorder
        reorder = repo.ui.config('bundle', 'reorder')
        if reorder == 'auto':
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
        # only chatter about sizes in verbose (non-debug) mode
        if self._repo.ui.verbose and not self._repo.ui.debugflag:
            self._verbosenote = self._repo.ui.note
        else:
            self._verbosenote = lambda s: None
502 515
503 516 def close(self):
504 517 return closechunk()
505 518
506 519 def fileheader(self, fname):
507 520 return chunkheader(len(fname)) + fname
508 521
509 522 # Extracted both for clarity and for overriding in extensions.
510 523 def _sortgroup(self, revlog, nodelist, lookup):
511 524 """Sort nodes for change group and turn them into revnums."""
512 525 # for generaldelta revlogs, we linearize the revs; this will both be
513 526 # much quicker and generate a much smaller bundle
514 527 if (revlog._generaldelta and self._reorder is None) or self._reorder:
515 528 dag = dagutil.revlogdag(revlog)
516 529 return dag.linearize(set(revlog.rev(n) for n in nodelist))
517 530 else:
518 531 return sorted([revlog.rev(n) for n in nodelist])
519 532
520 533 def group(self, nodelist, revlog, lookup, units=None):
521 534 """Calculate a delta group, yielding a sequence of changegroup chunks
522 535 (strings).
523 536
524 537 Given a list of changeset revs, return a set of deltas and
525 538 metadata corresponding to nodes. The first delta is
526 539 first parent(nodelist[0]) -> nodelist[0], the receiver is
527 540 guaranteed to have this parent as it has all history before
528 541 these changesets. In the case firstparent is nullrev the
529 542 changegroup starts with a full revision.
530 543
531 544 If units is not None, progress detail will be generated, units specifies
532 545 the type of revlog that is touched (changelog, manifest, etc.).
533 546 """
534 547 # if we don't have any revisions touched by these changesets, bail
535 548 if len(nodelist) == 0:
536 549 yield self.close()
537 550 return
538 551
539 552 revs = self._sortgroup(revlog, nodelist, lookup)
540 553
541 554 # add the parent of the first rev
542 555 p = revlog.parentrevs(revs[0])[0]
543 556 revs.insert(0, p)
544 557
545 558 # build deltas
546 559 total = len(revs) - 1
547 560 msgbundling = _('bundling')
548 561 for r in xrange(len(revs) - 1):
549 562 if units is not None:
550 563 self._progress(msgbundling, r + 1, unit=units, total=total)
551 564 prev, curr = revs[r], revs[r + 1]
552 565 linknode = lookup(revlog.node(curr))
553 566 for c in self.revchunk(revlog, curr, prev, linknode):
554 567 yield c
555 568
556 569 if units is not None:
557 570 self._progress(msgbundling, None)
558 571 yield self.close()
559 572
560 573 # filter any nodes that claim to be part of the known set
561 574 def prune(self, revlog, missing, commonrevs):
562 575 rr, rl = revlog.rev, revlog.linkrev
563 576 return [n for n in missing if rl(rr(n)) not in commonrevs]
564 577
565 578 def _packmanifests(self, dir, mfnodes, lookuplinknode):
566 579 """Pack flat manifests into a changegroup stream."""
567 580 assert not dir
568 581 for chunk in self.group(mfnodes, self._repo.manifestlog._revlog,
569 582 lookuplinknode, units=_('manifests')):
570 583 yield chunk
571 584
572 585 def _manifestsdone(self):
573 586 return ''
574 587
575 588 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
576 589 '''yield a sequence of changegroup chunks (strings)'''
577 590 repo = self._repo
578 591 cl = repo.changelog
579 592
580 593 clrevorder = {}
581 594 mfs = {} # needed manifests
582 595 fnodes = {} # needed file nodes
583 596 changedfiles = set()
584 597
585 598 # Callback for the changelog, used to collect changed files and manifest
586 599 # nodes.
587 600 # Returns the linkrev node (identity in the changelog case).
588 601 def lookupcl(x):
589 602 c = cl.read(x)
590 603 clrevorder[x] = len(clrevorder)
591 604 n = c[0]
592 605 # record the first changeset introducing this manifest version
593 606 mfs.setdefault(n, x)
594 607 # Record a complete list of potentially-changed files in
595 608 # this manifest.
596 609 changedfiles.update(c[3])
597 610 return x
598 611
599 612 self._verbosenote(_('uncompressed size of bundle content:\n'))
600 613 size = 0
601 614 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
602 615 size += len(chunk)
603 616 yield chunk
604 617 self._verbosenote(_('%8.i (changelog)\n') % size)
605 618
606 619 # We need to make sure that the linkrev in the changegroup refers to
607 620 # the first changeset that introduced the manifest or file revision.
608 621 # The fastpath is usually safer than the slowpath, because the filelogs
609 622 # are walked in revlog order.
610 623 #
611 624 # When taking the slowpath with reorder=None and the manifest revlog
612 625 # uses generaldelta, the manifest may be walked in the "wrong" order.
613 626 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
614 627 # cc0ff93d0c0c).
615 628 #
616 629 # When taking the fastpath, we are only vulnerable to reordering
617 630 # of the changelog itself. The changelog never uses generaldelta, so
618 631 # it is only reordered when reorder=True. To handle this case, we
619 632 # simply take the slowpath, which already has the 'clrevorder' logic.
620 633 # This was also fixed in cc0ff93d0c0c.
621 634 fastpathlinkrev = fastpathlinkrev and not self._reorder
622 635 # Treemanifests don't work correctly with fastpathlinkrev
623 636 # either, because we don't discover which directory nodes to
624 637 # send along with files. This could probably be fixed.
625 638 fastpathlinkrev = fastpathlinkrev and (
626 639 'treemanifest' not in repo.requirements)
627 640
628 641 for chunk in self.generatemanifests(commonrevs, clrevorder,
629 642 fastpathlinkrev, mfs, fnodes):
630 643 yield chunk
631 644 mfs.clear()
632 645 clrevs = set(cl.rev(x) for x in clnodes)
633 646
634 647 if not fastpathlinkrev:
635 648 def linknodes(unused, fname):
636 649 return fnodes.get(fname, {})
637 650 else:
638 651 cln = cl.node
639 652 def linknodes(filerevlog, fname):
640 653 llr = filerevlog.linkrev
641 654 fln = filerevlog.node
642 655 revs = ((r, llr(r)) for r in filerevlog)
643 656 return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)
644 657
645 658 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
646 659 source):
647 660 yield chunk
648 661
649 662 yield self.close()
650 663
651 664 if clnodes:
652 665 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
653 666
654 667 def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
655 668 fnodes):
656 669 repo = self._repo
657 670 mfl = repo.manifestlog
658 671 dirlog = mfl._revlog.dirlog
659 672 tmfnodes = {'': mfs}
660 673
661 674 # Callback for the manifest, used to collect linkrevs for filelog
662 675 # revisions.
663 676 # Returns the linkrev node (collected in lookupcl).
664 677 def makelookupmflinknode(dir):
665 678 if fastpathlinkrev:
666 679 assert not dir
667 680 return mfs.__getitem__
668 681
669 682 def lookupmflinknode(x):
670 683 """Callback for looking up the linknode for manifests.
671 684
672 685 Returns the linkrev node for the specified manifest.
673 686
674 687 SIDE EFFECT:
675 688
676 689 1) fclnodes gets populated with the list of relevant
677 690 file nodes if we're not using fastpathlinkrev
678 691 2) When treemanifests are in use, collects treemanifest nodes
679 692 to send
680 693
681 694 Note that this means manifests must be completely sent to
682 695 the client before you can trust the list of files and
683 696 treemanifests to send.
684 697 """
685 698 clnode = tmfnodes[dir][x]
686 699 mdata = mfl.get(dir, x).readfast(shallow=True)
687 700 for p, n, fl in mdata.iterentries():
688 701 if fl == 't': # subdirectory manifest
689 702 subdir = dir + p + '/'
690 703 tmfclnodes = tmfnodes.setdefault(subdir, {})
691 704 tmfclnode = tmfclnodes.setdefault(n, clnode)
692 705 if clrevorder[clnode] < clrevorder[tmfclnode]:
693 706 tmfclnodes[n] = clnode
694 707 else:
695 708 f = dir + p
696 709 fclnodes = fnodes.setdefault(f, {})
697 710 fclnode = fclnodes.setdefault(n, clnode)
698 711 if clrevorder[clnode] < clrevorder[fclnode]:
699 712 fclnodes[n] = clnode
700 713 return clnode
701 714 return lookupmflinknode
702 715
703 716 size = 0
704 717 while tmfnodes:
705 718 dir = min(tmfnodes)
706 719 nodes = tmfnodes[dir]
707 720 prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
708 721 if not dir or prunednodes:
709 722 for x in self._packmanifests(dir, prunednodes,
710 723 makelookupmflinknode(dir)):
711 724 size += len(x)
712 725 yield x
713 726 del tmfnodes[dir]
714 727 self._verbosenote(_('%8.i (manifests)\n') % size)
715 728 yield self._manifestsdone()
716 729
717 730 # The 'source' parameter is useful for extensions
718 731 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
719 732 repo = self._repo
720 733 progress = self._progress
721 734 msgbundling = _('bundling')
722 735
723 736 total = len(changedfiles)
724 737 # for progress output
725 738 msgfiles = _('files')
726 739 for i, fname in enumerate(sorted(changedfiles)):
727 740 filerevlog = repo.file(fname)
728 741 if not filerevlog:
729 742 raise error.Abort(_("empty or missing revlog for %s") % fname)
730 743
731 744 linkrevnodes = linknodes(filerevlog, fname)
732 745 # Lookup for filenodes, we collected the linkrev nodes above in the
733 746 # fastpath case and with lookupmf in the slowpath case.
734 747 def lookupfilelog(x):
735 748 return linkrevnodes[x]
736 749
737 750 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
738 751 if filenodes:
739 752 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
740 753 total=total)
741 754 h = self.fileheader(fname)
742 755 size = len(h)
743 756 yield h
744 757 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
745 758 size += len(chunk)
746 759 yield chunk
747 760 self._verbosenote(_('%8.i %s\n') % (size, fname))
748 761 progress(msgbundling, None)
749 762
750 763 def deltaparent(self, revlog, rev, p1, p2, prev):
751 764 return prev
752 765
753 766 def revchunk(self, revlog, rev, prev, linknode):
754 767 node = revlog.node(rev)
755 768 p1, p2 = revlog.parentrevs(rev)
756 769 base = self.deltaparent(revlog, rev, p1, p2, prev)
757 770
758 771 prefix = ''
759 772 if revlog.iscensored(base) or revlog.iscensored(rev):
760 773 try:
761 774 delta = revlog.revision(node, raw=True)
762 775 except error.CensoredNodeError as e:
763 776 delta = e.tombstone
764 777 if base == nullrev:
765 778 prefix = mdiff.trivialdiffheader(len(delta))
766 779 else:
767 780 baselen = revlog.rawsize(base)
768 781 prefix = mdiff.replacediffheader(baselen, len(delta))
769 782 elif base == nullrev:
770 783 delta = revlog.revision(node, raw=True)
771 784 prefix = mdiff.trivialdiffheader(len(delta))
772 785 else:
773 786 delta = revlog.revdiff(base, rev)
774 787 p1n, p2n = revlog.parents(node)
775 788 basenode = revlog.node(base)
776 789 flags = revlog.flags(rev)
777 790 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
778 791 meta += prefix
779 792 l = len(meta) + len(delta)
780 793 yield chunkheader(l)
781 794 yield meta
782 795 yield delta
783 796 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
784 797 # do nothing with basenode, it is implicitly the previous one in HG10
785 798 # do nothing with flags, it is implicitly 0 for cg1 and cg2
786 799 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
787 800
class cg2packer(cg1packer):
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        super(cg2packer, self).__init__(repo, bundlecaps)
        if self._reorder is None:
            # Since generaldelta is directly supported by cg2, reordering
            # generally doesn't help, so we disable it by default (treating
            # bundle.reorder=auto just like bundle.reorder=False).
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        dp = revlog.deltaparent(rev)
        if dp == nullrev and revlog.storedeltachains:
            # Avoid sending full revisions when delta parent is null. Pick prev
            # in that case. It's tempting to pick p1 in this case, as p1 will
            # be smaller in the common case. However, computing a delta against
            # p1 may require resolving the raw text of p1, which could be
            # expensive. The revlog caches should have prev cached, meaning
            # less CPU for changegroup generation. There is likely room to add
            # a flag and/or config option to control this behavior.
            return prev
        elif dp == nullrev:
            # revlog is configured to use full snapshot for a reason,
            # stick to full snapshot.
            return nullrev
        elif dp not in (p1, p2, prev):
            # Pick prev when we can't be sure remote has the base revision.
            return prev
        else:
            return dp

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # Do nothing with flags, it is implicitly 0 in cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
824 837
class cg3packer(cg2packer):
    """Packer for version '03' changegroups.

    cg3 adds exchange of revlog flags and of tree manifests.
    """
    version = '03'
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        # A non-empty dir denotes a tree manifest: announce it with a
        # file header carrying the directory name before its group.
        if dir:
            yield self.fileheader(dir)

        dirlog = self._repo.manifestlog._revlog.dirlog(dir)
        for chunk in self.group(mfnodes, dirlog, lookuplinknode,
                                units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg3 terminates the manifest section with an explicit close chunk.
        return self.close()

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        return struct.pack(self.deltaheader,
                           node, p1n, p2n, basenode, linknode, flags)
844 857
# Maps changegroup wire-format version -> (packer class, unpacker class).
_packermap = {'01': (cg1packer, cg1unpacker),
             # cg2 adds support for exchanging generaldelta
             '02': (cg2packer, cg2unpacker),
             # cg3 adds support for exchanging revlog flags and treemanifests
             '03': (cg3packer, cg3unpacker),
}
851 864
def allsupportedversions(repo):
    """Return the set of all changegroup versions this repo supports."""
    versions = set(_packermap)
    # '03' is only advertised when an experimental knob enables it or the
    # repository already requires tree manifests.
    cg3allowed = (repo.ui.configbool('experimental', 'changegroup3')
                  or repo.ui.configbool('experimental', 'treemanifest')
                  or 'treemanifest' in repo.requirements)
    if not cg3allowed:
        versions.discard('03')
    return versions
859 872
def supportedincomingversions(repo):
    """Changegroup versions that can be applied to the repo."""
    return allsupportedversions(repo)
863 876
def supportedoutgoingversions(repo):
    """Changegroup versions that can be created from the repo."""
    versions = allsupportedversions(repo)
    if 'treemanifest' in repo.requirements:
        # Versions 01 and 02 support only flat manifests and it's just too
        # expensive to convert between the flat manifest and tree manifest
        # on the fly. Since tree manifests are hashed differently, all of
        # history would have to be converted. Instead, we simply don't even
        # pretend to support versions 01 and 02.
        versions.difference_update(['01', '02'])
    return versions
876 889
def safeversion(repo):
    """Return the smallest version clients of the repo can be assumed to
    support.

    For example, all hg versions that support generaldelta also support
    changegroup 02, so '01' is dropped when generaldelta is required.
    """
    versions = supportedoutgoingversions(repo)
    if 'generaldelta' in repo.requirements:
        versions.discard('01')
    assert versions
    return min(versions)
886 899
def getbundler(version, repo, bundlecaps=None):
    """Instantiate the packer class for ``version`` on ``repo``."""
    assert version in supportedoutgoingversions(repo)
    packercls = _packermap[version][0]
    return packercls(repo, bundlecaps)
890 903
def getunbundler(version, fh, alg, extras=None):
    """Instantiate the unpacker class for ``version`` over stream ``fh``."""
    unpackercls = _packermap[version][1]
    return unpackercls(fh, alg, extras=extras)
893 906
def _changegroupinfo(repo, nodes, source):
    """Report the changesets being bundled, honoring verbosity settings."""
    ui = repo.ui
    if ui.verbose or source == 'bundle':
        ui.status(_("%d changesets found\n") % len(nodes))
    if ui.debugflag:
        ui.debug("list of changesets:\n")
        for node in nodes:
            ui.debug("%s\n" % hex(node))
901 914
def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
    """Generate raw changegroup data for an outgoing set.

    Fires the 'preoutgoing' hook and returns the generator produced by
    ``bundler.generate()``.
    """
    repo = repo.unfiltered()
    csets = outgoing.missing
    heads = outgoing.missingheads
    # Take the fast path when explicitly requested, or when all
    # (unfiltered) heads were requested, since we then know all linkrevs
    # will be pulled by the client.
    heads.sort()
    if not fastpath:
        fastpath = (repo.filtername is None
                    and heads == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, csets, source)
    return bundler.generate(outgoing.common, csets, fastpath, source)
917 930
def getsubset(repo, outgoing, bundler, source, fastpath=False):
    """Like getsubsetraw(), but wrap the raw stream in an unbundler."""
    raw = getsubsetraw(repo, outgoing, bundler, source, fastpath)
    extras = {'clcount': len(outgoing.missing)}
    return getunbundler(bundler.version, util.chunkbuffer(raw), None, extras)
922 935
def changegroupsubset(repo, roots, heads, source, version='01'):
    """Compute a changegroup consisting of all the nodes that are
    descendants of any of the roots and ancestors of any of the heads.
    Return a chunkbuffer object whose read() method will return
    successive changegroup chunks.

    It is fairly complex as determining which filenodes and which
    manifest nodes need to be included for the changeset to be complete
    is non-trivial.

    Another wrinkle is doing the reverse, figuring out which changeset in
    the changegroup a particular filenode or manifestnode belongs to.
    """
    outgoing = discovery.outgoing(repo, missingroots=roots,
                                  missingheads=heads)
    return getsubset(repo, outgoing, getbundler(version, repo), source)
939 952
def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
                           version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing. Returns a raw changegroup generator,
    or None when there is nothing to bundle."""
    if outgoing.missing:
        bundler = getbundler(version, repo, bundlecaps)
        return getsubsetraw(repo, outgoing, bundler, source)
    return None
950 963
def getchangegroup(repo, source, outgoing, bundlecaps=None,
                   version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing. Returns None when there is nothing to
    bundle."""
    if outgoing.missing:
        bundler = getbundler(version, repo, bundlecaps)
        return getsubset(repo, outgoing, bundler, source)
    return None
961 974
def getlocalchangegroup(repo, *args, **kwargs):
    """Deprecated alias for getchangegroup()."""
    repo.ui.deprecwarn('getlocalchangegroup is deprecated, use getchangegroup',
                       '4.3')
    return getchangegroup(repo, *args, **kwargs)
966 979
def changegroup(repo, basenodes, source):
    """Return a changegroup from basenodes up to the current repo heads."""
    # to avoid a race we use changegroupsubset() (issue1320)
    heads = repo.heads()
    return changegroupsubset(repo, basenodes, heads, source)
970 983
def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
    """Apply the filelog sections of a changegroup stream.

    Reads filelog headers from ``source`` until the empty terminator,
    adds the received revisions to the repository through the
    transaction proxy ``trp``, and checks off the corresponding entries
    in ``needfiles``.  Returns a (revisions, files) tuple of counters.

    Aborts on an empty file revlog group, a censored delta base, a
    spurious (unexpected) file revision, or file data still missing once
    the stream is exhausted.
    """
    revisions = 0
    files = 0
    for chunkdata in iter(source.filelogheader, {}):
        files += 1
        fname = chunkdata["filename"]
        repo.ui.debug("adding %s revisions\n" % fname)
        repo.ui.progress(_('files'), files, unit=_('files'),
                         total=expectedfiles)
        flog = repo.file(fname)
        oldlen = len(flog)
        try:
            if not flog.addgroup(source, revmap, trp):
                raise error.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError as e:
            raise error.Abort(_("received delta base is censored: %s") % e)
        revisions += len(flog) - oldlen
        if fname in needfiles:
            needs = needfiles[fname]
            for newrev in xrange(oldlen, len(flog)):
                n = flog.node(newrev)
                if n not in needs:
                    raise error.Abort(
                        _("received spurious file revlog entry"))
                needs.remove(n)
            if not needs:
                del needfiles[fname]
    repo.ui.progress(_('files'), None)

    # Anything left in needfiles was expected but never received; the
    # repository would be incomplete, so verify and abort.
    for fname, needs in needfiles.iteritems():
        flog = repo.file(fname)
        for n in needs:
            try:
                flog.rev(n)
            except error.LookupError:
                raise error.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (fname, hex(n)))

    return revisions, files
General Comments 0
You need to be logged in to leave comments. Login now