##// END OF EJS Templates
changegroup: rename getsubsetraw to makestream...
Durham Goode -
r34105:92f1e2be default
parent child Browse files
Show More
@@ -1,982 +1,980 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullrev,
19 19 short,
20 20 )
21 21
22 22 from . import (
23 23 dagutil,
24 24 error,
25 25 mdiff,
26 26 phases,
27 27 pycompat,
28 28 util,
29 29 )
30 30
# struct formats for the fixed-size delta header preceding each revision's
# delta data, per changegroup version:
#   cg1: node, p1, p2, linknode (delta base is implicitly the previous rev)
#   cg2: adds an explicit delta-base node
#   cg3: additionally appends a 16-bit flags field (hence the explicit
#        big-endian '>' marker, which matters only for the 'H' field)
_CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
_CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
_CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
def readexactly(stream, n):
    '''Read exactly n bytes from stream.read; abort on a short read.'''
    data = stream.read(n)
    got = len(data)
    if got >= n:
        return data
    raise error.Abort(_("stream ended unexpectedly"
                        " (got %d bytes, expected %d)")
                      % (got, n))
43 43
def getchunk(stream):
    """Return the next chunk from stream as a string.

    A chunk is framed by a 4-byte big-endian length that counts itself;
    an encoded length of 0 marks an empty (terminating) chunk.
    """
    header = readexactly(stream, 4)
    length = struct.unpack(">l", header)[0]
    if length > 4:
        return readexactly(stream, length - 4)
    if length:
        # lengths 1..4 (or negative) cannot frame a valid chunk
        raise error.Abort(_("invalid chunk length %d") % length)
    return ""
53 53
def chunkheader(length):
    """Return a changegroup chunk header (string).

    The header is the big-endian total chunk size, i.e. the payload
    length plus the 4 bytes of the header itself.
    """
    total = length + 4
    return struct.pack(">l", total)
57 57
def closechunk():
    """Return a changegroup chunk header (string) for a zero-length chunk.

    Consumers interpret such an empty chunk as a group terminator.
    """
    return struct.pack(">l", 0)
61 61
def writechunks(ui, chunks, filename, vfs=None):
    """Write chunks to a file and return its filename.

    The stream is assumed to be a bundle file.
    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    """
    fp = None
    toremove = None
    try:
        if not filename:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fp = os.fdopen(fd, pycompat.sysstr("wb"))
            # remove the temp file unless every chunk lands on disk
            toremove = filename
        elif vfs:
            fp = vfs.open(filename, "wb")
        else:
            # Increase default buffer size because default is usually
            # small (4k is common on Linux).
            fp = open(filename, "wb", 131072)
        for chunk in chunks:
            fp.write(chunk)
        toremove = None
        return filename
    finally:
        if fp is not None:
            fp.close()
        if toremove is not None:
            if filename and vfs:
                vfs.unlink(toremove)
            else:
                os.unlink(toremove)
95 95
class cg1unpacker(object):
    """Unpacker for cg1 changegroup streams.

    A changegroup unpacker handles the framing of the revision data in
    the wire format. Most consumers will want to use the apply()
    method to add the changes from the changegroup to a repository.

    If you're forwarding a changegroup unmodified to another consumer,
    use getchunks(), which returns an iterator of changegroup
    chunks. This is mostly useful for cases where you need to know the
    data stream has ended by observing the end of the changegroup.

    deltachunk() is useful only if you're applying delta data. Most
    consumers should prefer apply() instead.

    A few other public methods exist. Those are used only for
    bundlerepo and some debug commands - their use is discouraged.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    _grouplistcount = 1 # One list of files after the manifests

    def __init__(self, fh, alg, extras=None):
        """Wrap stream ``fh`` whose payload is compressed with ``alg``.

        ``alg`` is a bundle compression type name (None means 'UN',
        i.e. uncompressed); ``extras`` is an optional dict kept on the
        instance for callers.
        """
        if alg is None:
            alg = 'UN'
        if alg not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % alg)
        if alg == 'BZ':
            alg = '_truncatedBZ'

        compengine = util.compengines.forbundletype(alg)
        self._stream = compengine.decompressorreader(fh)
        self._type = alg
        self.extras = extras or {}
        # per-chunk progress callback, set by apply()/_unpackmanifests()
        self.callback = None

    # These methods (compressed, read, seek, tell) all appear to only
    # be used by bundlerepo, but it's a little hard to tell.
    def compressed(self):
        return self._type is not None and self._type != 'UN'
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def _chunklength(self):
        """Read a chunk's length prefix and return its payload size.

        Returns 0 for an empty (terminating) chunk and fires the
        progress callback once per non-empty chunk.
        """
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise error.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self._chunklength()
        if not l:
            return {}
        fname = readexactly(self._stream, l)
        return {'filename': fname}

    def _deltaheader(self, headertuple, prevnode):
        """Decode an unpacked cg1 delta header.

        cg1 has no explicit delta base: the base is the previous node in
        the stream, or p1 for the first delta. Flags are always 0.
        """
        node, p1, p2, cs = headertuple
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        flags = 0
        return node, p1, p2, deltabase, cs, flags

    def deltachunk(self, prevnode):
        """Read one delta chunk and return it as a dict ({} at group end)."""
        l = self._chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
        return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
                'deltabase': deltabase, 'delta': delta, 'flags': flags}

    def getchunks(self):
        """returns all the chunks contains in the bundle

        Used when you need to forward the binary stream to a file or another
        network API. To do so, it parse the changegroup data, otherwise it will
        block in case of sshrepo because it don't know the end of the stream.
        """
        # For changegroup 1 and 2, we expect 3 parts: changelog, manifestlog,
        # and a list of filelogs. For changegroup 3, we expect 4 parts:
        # changelog, manifestlog, a list of tree manifestlogs, and a list of
        # filelogs.
        #
        # Changelog and manifestlog parts are terminated with empty chunks. The
        # tree and file parts are a list of entry sections. Each entry section
        # is a series of chunks terminating in an empty chunk. The list of these
        # entry sections is terminated in yet another empty chunk, so we know
        # we've reached the end of the tree/file list when we reach an empty
        # chunk that was proceeded by no non-empty chunks.

        parts = 0
        while parts < 2 + self._grouplistcount:
            noentries = True
            while True:
                chunk = getchunk(self)
                if not chunk:
                    # The first two empty chunks represent the end of the
                    # changelog and the manifestlog portions. The remaining
                    # empty chunks represent either A) the end of individual
                    # tree or file entries in the file list, or B) the end of
                    # the entire list. It's the end of the entire list if there
                    # were no entries (i.e. noentries is True).
                    if parts < 2:
                        parts += 1
                    elif noentries:
                        parts += 1
                    break
                noentries = False
                yield chunkheader(len(chunk))
                pos = 0
                # re-emit the payload in at most 1MB slices
                while pos < len(chunk):
                    next = pos + 2**20
                    yield chunk[pos:next]
                    pos = next
        yield closechunk()

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        """Apply the manifest portion of the stream to ``repo``.

        ``prog`` is a progress-callback factory (see apply()); cg3
        overrides this to additionally consume tree manifests.
        """
        # We know that we'll never have more manifests than we had
        # changesets.
        self.callback = prog(_('manifests'), numchanges)
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        self.manifestheader()
        repo.manifestlog._revlog.addgroup(self, revmap, trp)
        repo.ui.progress(_('manifests'), None)
        self.callback = None

    def apply(self, repo, tr, srctype, url, targetphase=phases.draft,
              expectedtotal=None):
        """Add the changegroup returned by source.read() to this repo.
        srctype is a string like 'push', 'pull', or 'unbundle'. url is
        the URL of the repo where this changegroup is coming from.

        Return an integer summarizing the change to this repo:
        - nothing changed or no source: 0
        - more heads than before: 1+added heads (2..n)
        - fewer heads than before: -1-removed heads (-2..-n)
        - number of heads stays the same: 1
        """
        repo = repo.unfiltered()
        def csmap(x):
            repo.ui.debug("add changeset %s\n" % short(x))
            return len(cl)

        def revmap(x):
            return cl.rev(x)

        changesets = files = revisions = 0

        try:
            # The transaction may already carry source information. In this
            # case we use the top level data. We overwrite the argument
            # because we need to use the top level value (if they exist)
            # in this function.
            srctype = tr.hookargs.setdefault('source', srctype)
            url = tr.hookargs.setdefault('url', url)
            repo.hook('prechangegroup',
                      throw=True, **pycompat.strkwargs(tr.hookargs))

            # write changelog data to temp files so concurrent readers
            # will not see an inconsistent view
            cl = repo.changelog
            cl.delayupdate(tr)
            oldheads = set(cl.heads())

            trp = weakref.proxy(tr)
            # pull off the changeset group
            repo.ui.status(_("adding changesets\n"))
            clstart = len(cl)
            class prog(object):
                # small counting progress callback used via self.callback
                def __init__(self, step, total):
                    self._step = step
                    self._total = total
                    self._count = 1
                def __call__(self):
                    repo.ui.progress(self._step, self._count, unit=_('chunks'),
                                     total=self._total)
                    self._count += 1
            self.callback = prog(_('changesets'), expectedtotal)

            efiles = set()
            def onchangelog(cl, node):
                efiles.update(cl.readfiles(node))

            self.changelogheader()
            cgnodes = cl.addgroup(self, csmap, trp, addrevisioncb=onchangelog)
            efiles = len(efiles)

            if not cgnodes:
                repo.ui.develwarn('applied empty changegroup',
                                  config='empty-changegroup')
            clend = len(cl)
            changesets = clend - clstart
            repo.ui.progress(_('changesets'), None)
            self.callback = None

            # pull off the manifest group
            repo.ui.status(_("adding manifests\n"))
            self._unpackmanifests(repo, revmap, trp, prog, changesets)

            needfiles = {}
            if repo.ui.configbool('server', 'validate'):
                cl = repo.changelog
                ml = repo.manifestlog
                # validate incoming csets have their manifests
                for cset in xrange(clstart, clend):
                    mfnode = cl.changelogrevision(cset).manifest
                    mfest = ml[mfnode].readdelta()
                    # store file cgnodes we must see
                    for f, n in mfest.iteritems():
                        needfiles.setdefault(f, set()).add(n)

            # process the files
            repo.ui.status(_("adding file changes\n"))
            newrevs, newfiles = _addchangegroupfiles(
                repo, self, revmap, trp, efiles, needfiles)
            revisions += newrevs
            files += newfiles

            deltaheads = 0
            if oldheads:
                heads = cl.heads()
                deltaheads = len(heads) - len(oldheads)
                for h in heads:
                    # heads that close a branch don't count towards the delta
                    if h not in oldheads and repo[h].closesbranch():
                        deltaheads -= 1
            htext = ""
            if deltaheads:
                htext = _(" (%+d heads)") % deltaheads

            repo.ui.status(_("added %d changesets"
                             " with %d changes to %d files%s\n")
                           % (changesets, revisions, files, htext))
            repo.invalidatevolatilesets()

            if changesets > 0:
                if 'node' not in tr.hookargs:
                    tr.hookargs['node'] = hex(cl.node(clstart))
                    tr.hookargs['node_last'] = hex(cl.node(clend - 1))
                    hookargs = dict(tr.hookargs)
                else:
                    hookargs = dict(tr.hookargs)
                    hookargs['node'] = hex(cl.node(clstart))
                    hookargs['node_last'] = hex(cl.node(clend - 1))
                repo.hook('pretxnchangegroup',
                          throw=True, **pycompat.strkwargs(hookargs))

            added = [cl.node(r) for r in xrange(clstart, clend)]
            phaseall = None
            if srctype in ('push', 'serve'):
                # Old servers can not push the boundary themselves.
                # New servers won't push the boundary if changeset already
                # exists locally as secret
                #
                # We should not use added here but the list of all change in
                # the bundle
                if repo.publishing():
                    targetphase = phaseall = phases.public
                else:
                    # closer target phase computation

                    # Those changesets have been pushed from the
                    # outside, their phases are going to be pushed
                    # alongside. Therefor `targetphase` is
                    # ignored.
                    targetphase = phaseall = phases.draft
            if added:
                phases.registernew(repo, tr, targetphase, added)
            if phaseall is not None:
                phases.advanceboundary(repo, tr, phaseall, cgnodes)

            if changesets > 0:

                def runhooks():
                    # These hooks run when the lock releases, not when the
                    # transaction closes. So it's possible for the changelog
                    # to have changed since we last saw it.
                    if clstart >= len(repo):
                        return

                    repo.hook("changegroup", **pycompat.strkwargs(hookargs))

                    for n in added:
                        args = hookargs.copy()
                        args['node'] = hex(n)
                        del args['node_last']
                        repo.hook("incoming", **pycompat.strkwargs(args))

                    newheads = [h for h in repo.heads()
                                if h not in oldheads]
                    repo.ui.log("incoming",
                                "%s incoming changes - new heads: %s\n",
                                len(added),
                                ', '.join([hex(c[:6]) for c in newheads]))

                tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                                lambda tr: repo._afterlock(runhooks))
        finally:
            repo.ui.flush()
        # never return 0 here:
        if deltaheads < 0:
            ret = deltaheads - 1
        else:
            ret = deltaheads + 1
        return ret
430 430
class cg2unpacker(cg1unpacker):
    """Unpacker for cg2 streams.

    The cg2 wire format carries an explicit delta base in every delta
    header (generaldelta support); all other aspects of the data are
    identical to cg1.
    """
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        # cg2 names its delta base explicitly, so prevnode is unused;
        # revlog flags are not transmitted and default to 0.
        node, p1, p2, deltabase, cs = headertuple
        return node, p1, p2, deltabase, cs, 0
446 446
class cg3unpacker(cg2unpacker):
    """Unpacker for cg3 streams.

    cg3 streams add support for exchanging treemanifests and revlog
    flags. It adds the revlog flags to the delta header and an empty chunk
    separating manifests and files.
    """
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '03'
    _grouplistcount = 2 # One list of manifests and one list of files

    def _deltaheader(self, headertuple, prevnode):
        # cg3 headers carry both an explicit delta base and revlog flags
        node, p1, p2, deltabase, cs, flags = headertuple
        return node, p1, p2, deltabase, cs, flags

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        # first consume the root-manifest group exactly like cg1/cg2 ...
        super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
                                                  numchanges)
        # ... then the cg3-only sequence of tree-manifest groups, each
        # introduced by a filelog-style header naming the directory and
        # terminated overall by an empty header ({}).
        for chunkdata in iter(self.filelogheader, {}):
            # If we get here, there are directory manifests in the changegroup
            d = chunkdata["filename"]
            repo.ui.debug("adding %s revisions\n" % d)
            dirlog = repo.manifestlog._revlog.dirlog(d)
            if not dirlog.addgroup(self, revmap, trp):
                raise error.Abort(_("received dir revlog group is empty"))
473 473
class headerlessfixup(object):
    """File-like wrapper that re-prepends already-consumed header bytes.

    Reads are served from the buffered header ``h`` first; once it is
    drained, reads fall through to the underlying stream ``fh``.
    """
    def __init__(self, fh, h):
        self._fh = fh
        self._h = h

    def read(self, n):
        buffered = self._h
        if not buffered:
            return readexactly(self._fh, n)
        d = buffered[:n]
        self._h = buffered[n:]
        if len(d) < n:
            # header exhausted mid-request; top up from the real stream
            d += readexactly(self._fh, n - len(d))
        return d
485 485
class cg1packer(object):
    """Packer producing version '01' changegroup streams.

    Subclasses override the delta header format and a handful of hooks
    (deltaparent, builddeltaheader, _packmanifests, _manifestsdone) to
    implement the cg2/cg3 wire formats.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    version = '01'
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle. While bundlecaps is
        unused in core Mercurial, extensions rely on this feature to communicate
        capabilities to customize the changegroup packer.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        # experimental config: bundle.reorder
        # None means 'auto' (decide per revlog); otherwise a forced bool.
        reorder = repo.ui.config('bundle', 'reorder')
        if reorder == 'auto':
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
        if self._repo.ui.verbose and not self._repo.ui.debugflag:
            self._verbosenote = self._repo.ui.note
        else:
            self._verbosenote = lambda s: None

    def close(self):
        """Return the empty chunk that terminates a group."""
        return closechunk()

    def fileheader(self, fname):
        """Return the chunk introducing the filelog section for fname."""
        return chunkheader(len(fname)) + fname

    # Extracted both for clarity and for overriding in extensions.
    def _sortgroup(self, revlog, nodelist, lookup):
        """Sort nodes for change group and turn them into revnums."""
        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and self._reorder is None) or self._reorder:
            dag = dagutil.revlogdag(revlog)
            return dag.linearize(set(revlog.rev(n) for n in nodelist))
        else:
            return sorted([revlog.rev(n) for n in nodelist])

    def group(self, nodelist, revlog, lookup, units=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        revs = self._sortgroup(revlog, nodelist, lookup)

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        if units is not None:
            self._progress(msgbundling, None)
        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs):
        """Return the nodes in missing whose linkrev is not in commonrevs."""
        rr, rl = revlog.rev, revlog.linkrev
        return [n for n in missing if rl(rr(n)) not in commonrevs]

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        """Pack flat manifests into a changegroup stream."""
        # cg1 knows only the root ('' directory) manifest
        assert not dir
        for chunk in self.group(mfnodes, self._repo.manifestlog._revlog,
                                lookuplinknode, units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg1/cg2 have no explicit manifest-section terminator
        return ''

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        cl = repo.changelog

        clrevorder = {}
        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and manifest
        # nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            clrevorder[x] = len(clrevorder)
            n = c[0]
            # record the first changeset introducing this manifest version
            mfs.setdefault(n, x)
            # Record a complete list of potentially-changed files in
            # this manifest.
            changedfiles.update(c[3])
            return x

        self._verbosenote(_('uncompressed size of bundle content:\n'))
        size = 0
        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (changelog)\n') % size)

        # We need to make sure that the linkrev in the changegroup refers to
        # the first changeset that introduced the manifest or file revision.
        # The fastpath is usually safer than the slowpath, because the filelogs
        # are walked in revlog order.
        #
        # When taking the slowpath with reorder=None and the manifest revlog
        # uses generaldelta, the manifest may be walked in the "wrong" order.
        # Without 'clrevorder', we would get an incorrect linkrev (see fix in
        # cc0ff93d0c0c).
        #
        # When taking the fastpath, we are only vulnerable to reordering
        # of the changelog itself. The changelog never uses generaldelta, so
        # it is only reordered when reorder=True. To handle this case, we
        # simply take the slowpath, which already has the 'clrevorder' logic.
        # This was also fixed in cc0ff93d0c0c.
        fastpathlinkrev = fastpathlinkrev and not self._reorder
        # Treemanifests don't work correctly with fastpathlinkrev
        # either, because we don't discover which directory nodes to
        # send along with files. This could probably be fixed.
        fastpathlinkrev = fastpathlinkrev and (
            'treemanifest' not in repo.requirements)

        for chunk in self.generatemanifests(commonrevs, clrevorder,
                                            fastpathlinkrev, mfs, fnodes):
            yield chunk
        mfs.clear()
        clrevs = set(cl.rev(x) for x in clnodes)

        if not fastpathlinkrev:
            def linknodes(unused, fname):
                return fnodes.get(fname, {})
        else:
            cln = cl.node
            def linknodes(filerevlog, fname):
                llr = filerevlog.linkrev
                fln = filerevlog.node
                revs = ((r, llr(r)) for r in filerevlog)
                return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
                          fnodes):
        """Yield the manifest portion of the changegroup.

        Walks the root manifest plus any tree manifests discovered along
        the way (tmfnodes grows as lookupmflinknode runs), populating
        fnodes for generatefiles() as a side effect.
        """
        repo = self._repo
        mfl = repo.manifestlog
        dirlog = mfl._revlog.dirlog
        tmfnodes = {'': mfs}

        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def makelookupmflinknode(dir):
            if fastpathlinkrev:
                assert not dir
                return mfs.__getitem__

            def lookupmflinknode(x):
                """Callback for looking up the linknode for manifests.

                Returns the linkrev node for the specified manifest.

                SIDE EFFECT:

                1) fclnodes gets populated with the list of relevant
                   file nodes if we're not using fastpathlinkrev
                2) When treemanifests are in use, collects treemanifest nodes
                   to send

                Note that this means manifests must be completely sent to
                the client before you can trust the list of files and
                treemanifests to send.
                """
                clnode = tmfnodes[dir][x]
                mdata = mfl.get(dir, x).readfast(shallow=True)
                for p, n, fl in mdata.iterentries():
                    if fl == 't': # subdirectory manifest
                        subdir = dir + p + '/'
                        tmfclnodes = tmfnodes.setdefault(subdir, {})
                        tmfclnode = tmfclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[tmfclnode]:
                            tmfclnodes[n] = clnode
                    else:
                        f = dir + p
                        fclnodes = fnodes.setdefault(f, {})
                        fclnode = fclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[fclnode]:
                            fclnodes[n] = clnode
                return clnode
            return lookupmflinknode

        size = 0
        while tmfnodes:
            # process directories in sorted order; packing one may add
            # deeper subdirectories to tmfnodes
            dir = min(tmfnodes)
            nodes = tmfnodes[dir]
            prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
            if not dir or prunednodes:
                for x in self._packmanifests(dir, prunednodes,
                                             makelookupmflinknode(dir)):
                    size += len(x)
                    yield x
            del tmfnodes[dir]
        self._verbosenote(_('%8.i (manifests)\n') % size)
        yield self._manifestsdone()

    # The 'source' parameter is useful for extensions
    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        """Yield the filelog portion of the changegroup."""
        repo = self._repo
        progress = self._progress
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise error.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                h = self.fileheader(fname)
                size = len(h)
                yield h
                for chunk in self.group(filenodes, filerevlog, lookupfilelog):
                    size += len(chunk)
                    yield chunk
                self._verbosenote(_('%8.i %s\n') % (size, fname))
        progress(msgbundling, None)

    def deltaparent(self, revlog, rev, p1, p2, prev):
        # cg1 always deltas against the previous revision in the stream
        return prev

    def revchunk(self, revlog, rev, prev, linknode):
        """Yield the chunks (header, meta, delta) encoding one revision."""
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = self.deltaparent(revlog, rev, p1, p2, prev)

        prefix = ''
        if revlog.iscensored(base) or revlog.iscensored(rev):
            # censored revisions are shipped as full texts dressed up as
            # deltas via a synthetic diff header
            try:
                delta = revlog.revision(node, raw=True)
            except error.CensoredNodeError as e:
                delta = e.tombstone
            if base == nullrev:
                prefix = mdiff.trivialdiffheader(len(delta))
            else:
                baselen = revlog.rawsize(base)
                prefix = mdiff.replacediffheader(baselen, len(delta))
        elif base == nullrev:
            delta = revlog.revision(node, raw=True)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        flags = revlog.flags(rev)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta
    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # do nothing with basenode, it is implicitly the previous one in HG10
        # do nothing with flags, it is implicitly 0 for cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
799 799
class cg2packer(cg1packer):
    """Packer producing version '02' (generaldelta-aware) changegroups."""
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        super(cg2packer, self).__init__(repo, bundlecaps)
        # Since generaldelta is directly supported by cg2, reordering
        # generally doesn't help, so we disable it by default (treating
        # bundle.reorder=auto just like bundle.reorder=False).
        if self._reorder is None:
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        """Choose the revision to delta against when sending rev."""
        dp = revlog.deltaparent(rev)
        if dp == nullrev:
            # Avoid sending full revisions when delta parent is null. Pick prev
            # in that case. It's tempting to pick p1 in this case, as p1 will
            # be smaller in the common case. However, computing a delta against
            # p1 may require resolving the raw text of p1, which could be
            # expensive. The revlog caches should have prev cached, meaning
            # less CPU for changegroup generation. There is likely room to add
            # a flag and/or config option to control this behavior.
            #
            # When the revlog is configured to always store full snapshots,
            # honor that choice and send a full revision instead.
            return prev if revlog.storedeltachains else nullrev
        if dp in (p1, p2, prev):
            return dp
        # Pick prev when we can't be sure remote has the base revision.
        return prev

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # Do nothing with flags, it is implicitly 0 in cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
836 836
class cg3packer(cg2packer):
    """Packer producing version '03' changegroups (revlog flags and
    tree manifests on top of cg2)."""
    version = '03'
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        # unlike cg1/cg2, cg3 may pack per-directory (tree) manifests;
        # a non-root directory is introduced by a filelog-style header
        if dir:
            yield self.fileheader(dir)

        dirlog = self._repo.manifestlog._revlog.dirlog(dir)
        for chunk in self.group(mfnodes, dirlog, lookuplinknode,
                                units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg3 terminates the manifest section with an explicit empty chunk
        return self.close()

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # cg3 transmits the revlog flags, unlike cg1/cg2
        return struct.pack(
            self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
856 856
# Maps changegroup wire-format version -> (packer class, unpacker class).
_packermap = {'01': (cg1packer, cg1unpacker),
              # cg2 adds support for exchanging generaldelta
              '02': (cg2packer, cg2unpacker),
              # cg3 adds support for exchanging revlog flags and treemanifests
              '03': (cg3packer, cg3unpacker),
}
863 863
def allsupportedversions(repo):
    """Return the set of changegroup versions this code understands at all.

    '03' is only offered when the repo or configuration opts into tree
    manifests or explicitly enables changegroup3.
    """
    versions = set(_packermap)
    ui = repo.ui
    if not (ui.configbool('experimental', 'changegroup3') or
            ui.configbool('experimental', 'treemanifest') or
            'treemanifest' in repo.requirements):
        versions.discard('03')
    return versions
871 871
def supportedincomingversions(repo):
    """Changegroup versions that can be applied to the repo."""
    return allsupportedversions(repo)
875 875
def supportedoutgoingversions(repo):
    """Changegroup versions that can be created from the repo."""
    versions = allsupportedversions(repo)
    if 'treemanifest' in repo.requirements:
        # Versions 01 and 02 only support flat manifests, and converting
        # between flat and tree manifests on the fly is just too
        # expensive: tree manifests hash differently, so all of history
        # would have to be converted. Don't even pretend to support them.
        versions -= {'01', '02'}
    return versions
888 888
def safeversion(repo):
    """Return the smallest version clients of this repo can be assumed to support.

    For example, every hg version that supports generaldelta also
    supports changegroup 02, so '01' can be dropped for such repos.
    """
    versions = supportedoutgoingversions(repo)
    if 'generaldelta' in repo.requirements:
        versions.discard('01')
    assert versions
    return min(versions)
898 898
def getbundler(version, repo, bundlecaps=None):
    """Instantiate the packer for *version* (must be producible by repo)."""
    assert version in supportedoutgoingversions(repo)
    packer = _packermap[version][0]
    return packer(repo, bundlecaps)
902 902
def getunbundler(version, fh, alg, extras=None):
    """Instantiate the unpacker for *version* reading from file object *fh*."""
    unpacker = _packermap[version][1]
    return unpacker(fh, alg, extras=extras)
905 905
def _changegroupinfo(repo, nodes, source):
    """Report which changesets are being bundled.

    The count is always shown for 'bundle' operations, otherwise only in
    verbose mode; the full changeset list only with --debug.
    """
    ui = repo.ui
    if ui.verbose or source == 'bundle':
        ui.status(_("%d changesets found\n") % len(nodes))
    if ui.debugflag:
        ui.debug("list of changesets:\n")
        for node in nodes:
            ui.debug("%s\n" % hex(node))
913 913
def makechangegroup(repo, outgoing, version, source, fastpath=False,
                    bundlecaps=None):
    """Build an unbundler wrapping the changegroup stream for *outgoing*.

    Returns a cgNunpacker reading the chunks produced by makestream(),
    annotated with the number of changesets it carries ('clcount').
    """
    # NOTE(review): a stale pre-rename copy of makestream() preceded this
    # definition (rename residue from getsubsetraw -> makestream); it
    # duplicated and shadowed the real definition below and is removed.
    cgstream = makestream(repo, outgoing, version, source,
                          fastpath=fastpath, bundlecaps=bundlecaps)
    return getunbundler(version, util.chunkbuffer(cgstream), None,
                        {'clcount': len(outgoing.missing)})
925 920
def makestream(repo, outgoing, version, source, fastpath=False,
               bundlecaps=None):
    """Return a generator of raw changegroup chunks for *outgoing*.

    Fires the 'preoutgoing' hook and reports the changeset count before
    delegating chunk generation to the version's packer.
    """
    # The dangling "def getsubsetraw(...)" header fused onto this
    # definition was rename residue and has been dropped.
    bundler = getbundler(version, repo, bundlecaps=bundlecaps)

    repo = repo.unfiltered()
    commonrevs = outgoing.common
    csets = outgoing.missing
    heads = outgoing.missingheads
    # We go through the fast path if we get told to, or if all (unfiltered)
    # heads have been requested (since we then know that all linkrevs will
    # be pulled by the client).
    heads.sort()
    fastpathlinkrev = fastpath or (
        repo.filtername is None and heads == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, csets, source)
    return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
941 939
def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
    """Apply the file-revlog portion of a changegroup to the repo.

    Reads filelog groups from *source* until the empty terminator,
    adding each group to the corresponding filelog under transaction
    *trp*. *needfiles* maps filename -> set of nodes that must arrive;
    entries are checked off as revisions are added, and any node still
    missing at the end aborts with a 'run hg verify' hint.

    Returns (revisions added, files touched).
    """
    revisions = 0
    files = 0
    # filelogheader() returns {} at the end-of-files marker, which stops
    # the iter() sentinel loop.
    for chunkdata in iter(source.filelogheader, {}):
        files += 1
        f = chunkdata["filename"]
        repo.ui.debug("adding %s revisions\n" % f)
        repo.ui.progress(_('files'), files, unit=_('files'),
                         total=expectedfiles)
        fl = repo.file(f)
        o = len(fl)  # revision count before this group, to count additions
        try:
            if not fl.addgroup(source, revmap, trp):
                raise error.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError as e:
            raise error.Abort(_("received delta base is censored: %s") % e)
        revisions += len(fl) - o
        if f in needfiles:
            needs = needfiles[f]
            # Check off every newly-added node; anything we did not ask
            # for is a spurious entry.
            for new in xrange(o, len(fl)):
                n = fl.node(new)
                if n in needs:
                    needs.remove(n)
                else:
                    raise error.Abort(
                        _("received spurious file revlog entry"))
            if not needs:
                del needfiles[f]
    repo.ui.progress(_('files'), None)

    # Anything left in needfiles never arrived in the stream; verify it
    # is not already present locally before aborting.
    for f, needs in needfiles.iteritems():
        fl = repo.file(f)
        for n in needs:
            try:
                fl.rev(n)
            except error.LookupError:
                raise error.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (f, hex(n)))

    return revisions, files
General Comments 0
You need to be logged in to leave comments. Login now