##// END OF EJS Templates
phases: rework phase movement code in 'cg.apply' to use 'registernew'...
Boris Feld -
r33456:ae052d04 default
parent child Browse files
Show More
@@ -1,1005 +1,1009 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullrev,
19 19 short,
20 20 )
21 21
22 22 from . import (
23 23 dagutil,
24 24 discovery,
25 25 error,
26 26 mdiff,
27 27 phases,
28 28 pycompat,
29 29 util,
30 30 )
31 31
# Struct formats for the per-revision delta headers, one per changegroup
# version: cg1 carries node, p1, p2 and the linknode; cg2 adds an explicit
# delta-base node; cg3 additionally appends a 16-bit flags field and is
# explicitly big-endian.
_CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
_CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
_CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
35 35
def readexactly(stream, n):
    '''read n bytes from stream.read and abort if less was available'''
    data = stream.read(n)
    if len(data) >= n:
        return data
    # A short read means the stream was truncated mid-chunk; surface
    # that to the user instead of silently returning partial data.
    raise error.Abort(_("stream ended unexpectedly"
                        " (got %d bytes, expected %d)")
                      % (len(data), n))
44 44
def getchunk(stream):
    """return the next chunk from stream as a string"""
    header = readexactly(stream, 4)
    length = struct.unpack(">l", header)[0]
    if length > 4:
        # The 4-byte length prefix counts itself, so the payload is
        # length - 4 bytes.
        return readexactly(stream, length - 4)
    if length:
        # 1..4 (or negative) can never describe a valid chunk.
        raise error.Abort(_("invalid chunk length %d") % length)
    # A zero length marks the end of a chunk group.
    return ""
54 54
def chunkheader(length):
    """return a changegroup chunk header (string)"""
    # The on-the-wire length includes the 4-byte prefix itself.
    return struct.pack(">l", 4 + length)
58 58
def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk"""
    # A zero-length chunk terminates the current chunk group.
    terminator = 0
    return struct.pack(">l", terminator)
62 62
def writechunks(ui, chunks, filename, vfs=None):
    """Write chunks to a file and return its filename.

    The stream is assumed to be a bundle file.
    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    """
    fh = None
    cleanup = None
    try:
        if not filename:
            # No name given: create a temp file and remember to remove
            # it if writing fails part-way through.
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, pycompat.sysstr("wb"))
            cleanup = filename
        elif vfs:
            fh = vfs.open(filename, "wb")
        else:
            # Increase default buffer size because default is usually
            # small (4k is common on Linux).
            fh = open(filename, "wb", 131072)
        for chunk in chunks:
            fh.write(chunk)
        # All data written; disarm the cleanup path.
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)
96 96
class cg1unpacker(object):
    """Unpacker for cg1 changegroup streams.

    A changegroup unpacker handles the framing of the revision data in
    the wire format. Most consumers will want to use the apply()
    method to add the changes from the changegroup to a repository.

    If you're forwarding a changegroup unmodified to another consumer,
    use getchunks(), which returns an iterator of changegroup
    chunks. This is mostly useful for cases where you need to know the
    data stream has ended by observing the end of the changegroup.

    deltachunk() is useful only if you're applying delta data. Most
    consumers should prefer apply() instead.

    A few other public methods exist. Those are used only for
    bundlerepo and some debug commands - their use is discouraged.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    _grouplistcount = 1 # One list of files after the manifests

    def __init__(self, fh, alg, extras=None):
        # 'alg' names the bundle compression engine; None means
        # uncompressed ('UN').
        if alg is None:
            alg = 'UN'
        if alg not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                             % alg)
        if alg == 'BZ':
            alg = '_truncatedBZ'

        compengine = util.compengines.forbundletype(alg)
        self._stream = compengine.decompressorreader(fh)
        self._type = alg
        self.extras = extras or {}
        # Optional progress callback invoked per chunk (see _chunklength).
        self.callback = None

    # These methods (compressed, read, seek, tell) all appear to only
    # be used by bundlerepo, but it's a little hard to tell.
    def compressed(self):
        return self._type is not None and self._type != 'UN'
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def _chunklength(self):
        """Read a chunk's length prefix and return the payload size.

        Returns 0 at the end of a chunk group; aborts on a length that
        can never be valid (1..4 or negative).
        """
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise error.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        # The length prefix counts itself.
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self._chunklength()
        if not l:
            return {}
        fname = readexactly(self._stream, l)
        return {'filename': fname}

    def _deltaheader(self, headertuple, prevnode):
        # cg1 carries no explicit delta base: it is implicitly the
        # previous node in the stream, or p1 for the first revision of
        # a group.
        node, p1, p2, cs = headertuple
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        flags = 0
        return node, p1, p2, deltabase, cs, flags

    def deltachunk(self, prevnode):
        """Read one delta chunk; returns {} at the end of a group."""
        l = self._chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
        return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
                'deltabase': deltabase, 'delta': delta, 'flags': flags}

    def getchunks(self):
        """returns all the chunks contained in the bundle

        Used when you need to forward the binary stream to a file or another
        network API. To do so, it parses the changegroup data, otherwise it
        would block in case of sshrepo because it doesn't know the end of the
        stream.
        """
        # an empty chunkgroup is the end of the changegroup
        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, changegroup versions 1 and 2 have a series of groups
        # with one group per file. changegroup 3 has a series of directory
        # manifests before the files.
        count = 0
        emptycount = 0
        while emptycount < self._grouplistcount:
            empty = True
            count += 1
            while True:
                chunk = getchunk(self)
                if not chunk:
                    if empty and count > 2:
                        emptycount += 1
                    break
                empty = False
                yield chunkheader(len(chunk))
                pos = 0
                # Re-emit the payload in at most 1 MiB slices so very
                # large chunks don't have to be forwarded in one piece.
                while pos < len(chunk):
                    next = pos + 2**20
                    yield chunk[pos:next]
                    pos = next
            yield closechunk()

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        # We know that we'll never have more manifests than we had
        # changesets.
        self.callback = prog(_('manifests'), numchanges)
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        self.manifestheader()
        repo.manifestlog._revlog.addgroup(self, revmap, trp)
        repo.ui.progress(_('manifests'), None)
        self.callback = None

    def apply(self, repo, tr, srctype, url, targetphase=phases.draft,
              expectedtotal=None):
        """Add the changegroup returned by source.read() to this repo.
        srctype is a string like 'push', 'pull', or 'unbundle'. url is
        the URL of the repo where this changegroup is coming from.

        Return a tuple (summary, added) where 'added' is the list of
        new changelog nodes and 'summary' is an integer summarizing the
        change to this repo:
        - nothing changed or no source: 0
        - more heads than before: 1+added heads (2..n)
        - fewer heads than before: -1-removed heads (-2..-n)
        - number of heads stays the same: 1
        """
        repo = repo.unfiltered()
        def csmap(x):
            repo.ui.debug("add changeset %s\n" % short(x))
            return len(cl)

        def revmap(x):
            return cl.rev(x)

        changesets = files = revisions = 0

        try:
            # The transaction may already carry source information. In this
            # case we use the top level data. We overwrite the argument
            # because we need to use the top level value (if they exist)
            # in this function.
            srctype = tr.hookargs.setdefault('source', srctype)
            url = tr.hookargs.setdefault('url', url)
            repo.hook('prechangegroup', throw=True, **tr.hookargs)

            # write changelog data to temp files so concurrent readers
            # will not see an inconsistent view
            cl = repo.changelog
            cl.delayupdate(tr)
            oldheads = set(cl.heads())

            trp = weakref.proxy(tr)
            # pull off the changeset group
            repo.ui.status(_("adding changesets\n"))
            clstart = len(cl)
            # Small helper reporting per-chunk progress through repo.ui.
            class prog(object):
                def __init__(self, step, total):
                    self._step = step
                    self._total = total
                    self._count = 1
                def __call__(self):
                    repo.ui.progress(self._step, self._count, unit=_('chunks'),
                                     total=self._total)
                    self._count += 1
            self.callback = prog(_('changesets'), expectedtotal)

            # Collect the set of files touched by incoming changesets so
            # the file-count progress total is known up front.
            efiles = set()
            def onchangelog(cl, node):
                efiles.update(cl.readfiles(node))

            self.changelogheader()
            cgnodes = cl.addgroup(self, csmap, trp, addrevisioncb=onchangelog)
            efiles = len(efiles)

            if not cgnodes:
                repo.ui.develwarn('applied empty changegroup',
                                  config='empty-changegroup')
            clend = len(cl)
            changesets = clend - clstart
            repo.ui.progress(_('changesets'), None)
            self.callback = None

            # pull off the manifest group
            repo.ui.status(_("adding manifests\n"))
            self._unpackmanifests(repo, revmap, trp, prog, changesets)

            needfiles = {}
            if repo.ui.configbool('server', 'validate'):
                cl = repo.changelog
                ml = repo.manifestlog
                # validate incoming csets have their manifests
                for cset in xrange(clstart, clend):
                    mfnode = cl.changelogrevision(cset).manifest
                    mfest = ml[mfnode].readdelta()
                    # store file cgnodes we must see
                    for f, n in mfest.iteritems():
                        needfiles.setdefault(f, set()).add(n)

            # process the files
            repo.ui.status(_("adding file changes\n"))
            newrevs, newfiles = _addchangegroupfiles(
                repo, self, revmap, trp, efiles, needfiles)
            revisions += newrevs
            files += newfiles

            # Compute the head-count delta, ignoring heads that close a
            # branch since those don't count as "new" heads for users.
            deltaheads = 0
            if oldheads:
                heads = cl.heads()
                deltaheads = len(heads) - len(oldheads)
                for h in heads:
                    if h not in oldheads and repo[h].closesbranch():
                        deltaheads -= 1
            htext = ""
            if deltaheads:
                htext = _(" (%+d heads)") % deltaheads

            repo.ui.status(_("added %d changesets"
                             " with %d changes to %d files%s\n")
                           % (changesets, revisions, files, htext))
            repo.invalidatevolatilesets()

            if changesets > 0:
                if 'node' not in tr.hookargs:
                    tr.hookargs['node'] = hex(cl.node(clstart))
                    tr.hookargs['node_last'] = hex(cl.node(clend - 1))
                    hookargs = dict(tr.hookargs)
                else:
                    hookargs = dict(tr.hookargs)
                    hookargs['node'] = hex(cl.node(clstart))
                    hookargs['node_last'] = hex(cl.node(clend - 1))
                repo.hook('pretxnchangegroup', throw=True, **hookargs)

            added = [cl.node(r) for r in xrange(clstart, clend)]
            phaseall = None
            if srctype in ('push', 'serve'):
                # Old servers can not push the boundary themselves.
                # New servers won't push the boundary if changeset already
                # exists locally as secret
                #
                # We should not use added here but the list of all change in
                # the bundle
                if repo.publishing():
                    targetphase = phaseall = phases.public
                else:
                    # closer target phase computation

                    # Those changesets have been pushed from the
                    # outside, their phases are going to be pushed
                    # alongside. Therefore `targetphase` is
                    # ignored.
                    targetphase = phaseall = phases.draft
            if added:
                # Register the new changesets at their target phase.
                phases.registernew(repo, tr, targetphase, added)
            if phaseall is not None:
                # Move every changeset in the bundle (not just 'added')
                # at least up to 'phaseall'.
                phases.advanceboundary(repo, tr, phaseall, cgnodes)

            if changesets > 0:

                def runhooks():
                    # These hooks run when the lock releases, not when the
                    # transaction closes. So it's possible for the changelog
                    # to have changed since we last saw it.
                    if clstart >= len(repo):
                        return

                    repo.hook("changegroup", **hookargs)

                    for n in added:
                        args = hookargs.copy()
                        args['node'] = hex(n)
                        del args['node_last']
                        repo.hook("incoming", **args)

                newheads = [h for h in repo.heads()
                            if h not in oldheads]
                repo.ui.log("incoming",
                            "%s incoming changes - new heads: %s\n",
                            len(added),
                            ', '.join([hex(c[:6]) for c in newheads]))

                tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                                lambda tr: repo._afterlock(runhooks))
        finally:
            repo.ui.flush()
        # never return 0 here:
        if deltaheads < 0:
            ret = deltaheads - 1
        else:
            ret = deltaheads + 1
        return ret, added
412 416
class cg2unpacker(cg1unpacker):
    """Unpacker for cg2 streams.

    cg2 streams add support for generaldelta, so the delta header
    format is slightly different. All other features about the data
    remain the same.
    """
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        # The delta base is carried explicitly in the header, so
        # 'prevnode' is unused here.
        node, p1, p2, deltabase, cs = headertuple
        flags = 0
        return node, p1, p2, deltabase, cs, flags
428 432
class cg3unpacker(cg2unpacker):
    """Unpacker for cg3 streams.

    cg3 streams add support for exchanging treemanifests and revlog
    flags. It adds the revlog flags to the delta header and an empty chunk
    separating manifests and files.
    """
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '03'
    _grouplistcount = 2 # One list of manifests and one list of files

    def _deltaheader(self, headertuple, prevnode):
        # cg3 carries the revlog flags directly in the header.
        node, p1, p2, deltabase, cs, flags = headertuple
        return node, p1, p2, deltabase, cs, flags

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        # Unpack the root manifests, then any directory (tree) manifest
        # groups that follow, until the empty separator chunk.
        super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
                                                  numchanges)
        for chunkdata in iter(self.filelogheader, {}):
            # If we get here, there are directory manifests in the changegroup
            d = chunkdata["filename"]
            repo.ui.debug("adding %s revisions\n" % d)
            dirlog = repo.manifestlog._revlog.dirlog(d)
            if not dirlog.addgroup(self, revmap, trp):
                raise error.Abort(_("received dir revlog group is empty"))
455 459
class headerlessfixup(object):
    """File-like wrapper re-prepending an already-consumed header.

    Reads are served from the buffered header bytes first, then fall
    through to the underlying stream.
    """
    def __init__(self, fh, h):
        self._h = h
        self._fh = fh
    def read(self, n):
        if not self._h:
            return readexactly(self._fh, n)
        d, self._h = self._h[:n], self._h[n:]
        if len(d) < n:
            # Header exhausted mid-read: top up from the real stream.
            d += readexactly(self._fh, n - len(d))
        return d
467 471
class cg1packer(object):
    """Changegroup packer producing version '01' streams."""
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    version = '01'
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle. While bundlecaps is
        unused in core Mercurial, extensions rely on this feature to communicate
        capabilities to customize the changegroup packer.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        # experimental config: bundle.reorder
        reorder = repo.ui.config('bundle', 'reorder')
        if reorder == 'auto':
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
        if self._repo.ui.verbose and not self._repo.ui.debugflag:
            self._verbosenote = self._repo.ui.note
        else:
            self._verbosenote = lambda s: None

    def close(self):
        # End-of-group marker: a zero-length chunk.
        return closechunk()

    def fileheader(self, fname):
        # A filelog group starts with a chunk containing just the filename.
        return chunkheader(len(fname)) + fname

    # Extracted both for clarity and for overriding in extensions.
    def _sortgroup(self, revlog, nodelist, lookup):
        """Sort nodes for change group and turn them into revnums."""
        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and self._reorder is None) or self._reorder:
            dag = dagutil.revlogdag(revlog)
            return dag.linearize(set(revlog.rev(n) for n in nodelist))
        else:
            return sorted([revlog.rev(n) for n in nodelist])

    def group(self, nodelist, revlog, lookup, units=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        revs = self._sortgroup(revlog, nodelist, lookup)

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        if units is not None:
            self._progress(msgbundling, None)
        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs):
        rr, rl = revlog.rev, revlog.linkrev
        return [n for n in missing if rl(rr(n)) not in commonrevs]

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        """Pack flat manifests into a changegroup stream."""
        # cg1/cg2 only know flat manifests, so 'dir' must be empty here.
        assert not dir
        for chunk in self.group(mfnodes, self._repo.manifestlog._revlog,
                                lookuplinknode, units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg1/cg2 have no separator between manifests and files.
        return ''

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        cl = repo.changelog

        clrevorder = {}
        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and manifest
        # nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            clrevorder[x] = len(clrevorder)
            n = c[0]
            # record the first changeset introducing this manifest version
            mfs.setdefault(n, x)
            # Record a complete list of potentially-changed files in
            # this manifest.
            changedfiles.update(c[3])
            return x

        self._verbosenote(_('uncompressed size of bundle content:\n'))
        size = 0
        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (changelog)\n') % size)

        # We need to make sure that the linkrev in the changegroup refers to
        # the first changeset that introduced the manifest or file revision.
        # The fastpath is usually safer than the slowpath, because the filelogs
        # are walked in revlog order.
        #
        # When taking the slowpath with reorder=None and the manifest revlog
        # uses generaldelta, the manifest may be walked in the "wrong" order.
        # Without 'clrevorder', we would get an incorrect linkrev (see fix in
        # cc0ff93d0c0c).
        #
        # When taking the fastpath, we are only vulnerable to reordering
        # of the changelog itself. The changelog never uses generaldelta, so
        # it is only reordered when reorder=True. To handle this case, we
        # simply take the slowpath, which already has the 'clrevorder' logic.
        # This was also fixed in cc0ff93d0c0c.
        fastpathlinkrev = fastpathlinkrev and not self._reorder
        # Treemanifests don't work correctly with fastpathlinkrev
        # either, because we don't discover which directory nodes to
        # send along with files. This could probably be fixed.
        fastpathlinkrev = fastpathlinkrev and (
            'treemanifest' not in repo.requirements)

        for chunk in self.generatemanifests(commonrevs, clrevorder,
                                            fastpathlinkrev, mfs, fnodes):
            yield chunk
        mfs.clear()
        clrevs = set(cl.rev(x) for x in clnodes)

        if not fastpathlinkrev:
            def linknodes(unused, fname):
                return fnodes.get(fname, {})
        else:
            cln = cl.node
            def linknodes(filerevlog, fname):
                llr = filerevlog.linkrev
                fln = filerevlog.node
                revs = ((r, llr(r)) for r in filerevlog)
                return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
                          fnodes):
        repo = self._repo
        mfl = repo.manifestlog
        dirlog = mfl._revlog.dirlog
        tmfnodes = {'': mfs}

        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def makelookupmflinknode(dir):
            if fastpathlinkrev:
                assert not dir
                return mfs.__getitem__

            def lookupmflinknode(x):
                """Callback for looking up the linknode for manifests.

                Returns the linkrev node for the specified manifest.

                SIDE EFFECT:

                1) fclnodes gets populated with the list of relevant
                file nodes if we're not using fastpathlinkrev
                2) When treemanifests are in use, collects treemanifest nodes
                to send

                Note that this means manifests must be completely sent to
                the client before you can trust the list of files and
                treemanifests to send.
                """
                clnode = tmfnodes[dir][x]
                mdata = mfl.get(dir, x).readfast(shallow=True)
                for p, n, fl in mdata.iterentries():
                    if fl == 't': # subdirectory manifest
                        subdir = dir + p + '/'
                        tmfclnodes = tmfnodes.setdefault(subdir, {})
                        tmfclnode = tmfclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[tmfclnode]:
                            tmfclnodes[n] = clnode
                    else:
                        f = dir + p
                        fclnodes = fnodes.setdefault(f, {})
                        fclnode = fclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[fclnode]:
                            fclnodes[n] = clnode
                return clnode
            return lookupmflinknode

        size = 0
        while tmfnodes:
            dir = min(tmfnodes)
            nodes = tmfnodes[dir]
            prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
            if not dir or prunednodes:
                for x in self._packmanifests(dir, prunednodes,
                                             makelookupmflinknode(dir)):
                    size += len(x)
                    yield x
            del tmfnodes[dir]
        self._verbosenote(_('%8.i (manifests)\n') % size)
        yield self._manifestsdone()

    # The 'source' parameter is useful for extensions
    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        repo = self._repo
        progress = self._progress
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise error.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                h = self.fileheader(fname)
                size = len(h)
                yield h
                for chunk in self.group(filenodes, filerevlog, lookupfilelog):
                    size += len(chunk)
                    yield chunk
                self._verbosenote(_('%8.i %s\n') % (size, fname))
        progress(msgbundling, None)

    def deltaparent(self, revlog, rev, p1, p2, prev):
        # cg1 always deltas against the previous revision in the stream.
        return prev

    def revchunk(self, revlog, rev, prev, linknode):
        """Yield the chunks (header, meta, delta) for one revision."""
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = self.deltaparent(revlog, rev, p1, p2, prev)

        prefix = ''
        if revlog.iscensored(base) or revlog.iscensored(rev):
            try:
                delta = revlog.revision(node, raw=True)
            except error.CensoredNodeError as e:
                delta = e.tombstone
            if base == nullrev:
                prefix = mdiff.trivialdiffheader(len(delta))
            else:
                baselen = revlog.rawsize(base)
                prefix = mdiff.replacediffheader(baselen, len(delta))
        elif base == nullrev:
            delta = revlog.revision(node, raw=True)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        flags = revlog.flags(rev)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta
    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # do nothing with basenode, it is implicitly the previous one in HG10
        # do nothing with flags, it is implicitly 0 for cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
781 785
class cg2packer(cg1packer):
    """Changegroup packer producing version '02' (generaldelta) streams."""
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        super(cg2packer, self).__init__(repo, bundlecaps)
        if self._reorder is None:
            # Since generaldelta is directly supported by cg2, reordering
            # generally doesn't help, so we disable it by default (treating
            # bundle.reorder=auto just like bundle.reorder=False).
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        dp = revlog.deltaparent(rev)
        if dp == nullrev and revlog.storedeltachains:
            # Avoid sending full revisions when delta parent is null. Pick prev
            # in that case. It's tempting to pick p1 in this case, as p1 will
            # be smaller in the common case. However, computing a delta against
            # p1 may require resolving the raw text of p1, which could be
            # expensive. The revlog caches should have prev cached, meaning
            # less CPU for changegroup generation. There is likely room to add
            # a flag and/or config option to control this behavior.
            return prev
        elif dp == nullrev:
            # revlog is configured to use full snapshot for a reason,
            # stick to full snapshot.
            return nullrev
        elif dp not in (p1, p2, prev):
            # Pick prev when we can't be sure remote has the base revision.
            return prev
        else:
            return dp

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # Do nothing with flags, it is implicitly 0 in cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
818 822
class cg3packer(cg2packer):
    """Changegroup packer producing version '03' streams.

    Adds revlog flags to the delta header and supports directory
    (tree) manifest groups.
    """
    version = '03'
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        # Directory manifest groups are introduced by a chunk carrying
        # the directory name, like filelog groups.
        if dir:
            yield self.fileheader(dir)

        dirlog = self._repo.manifestlog._revlog.dirlog(dir)
        for chunk in self.group(mfnodes, dirlog, lookuplinknode,
                                units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg3 uses an empty chunk to separate manifests from files.
        return self.close()

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        return struct.pack(
            self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
838 842
# Maps changegroup version -> (packer class, unpacker class).
_packermap = {'01': (cg1packer, cg1unpacker),
             # cg2 adds support for exchanging generaldelta
             '02': (cg2packer, cg2unpacker),
             # cg3 adds support for exchanging revlog flags and treemanifests
             '03': (cg3packer, cg3unpacker),
}
845 849
def allsupportedversions(repo):
    """Return the set of changegroup versions this repo can work with.

    '03' is only kept when the repo or its configuration opts into
    changegroup3/treemanifest support.
    """
    versions = set(_packermap)
    wantscg3 = (repo.ui.configbool('experimental', 'changegroup3') or
                repo.ui.configbool('experimental', 'treemanifest') or
                'treemanifest' in repo.requirements)
    if not wantscg3:
        versions.discard('03')
    return versions
853 857
# Changegroup versions that can be applied to the repo
def supportedincomingversions(repo):
    # Every version we can produce, we can also apply.
    incoming = allsupportedversions(repo)
    return incoming
857 861
# Changegroup versions that can be created from the repo
def supportedoutgoingversions(repo):
    versions = allsupportedversions(repo)
    if 'treemanifest' in repo.requirements:
        # Versions 01 and 02 support only flat manifests and it's just too
        # expensive to convert between the flat manifest and tree manifest on
        # the fly. Since tree manifests are hashed differently, all of history
        # would have to be converted. Instead, we simply don't even pretend to
        # support versions 01 and 02.
        versions -= {'01', '02'}
    return versions
870 874
def safeversion(repo):
    """Return the smallest changegroup version safe for all repo clients.

    For example, every hg version that supports generaldelta also speaks
    changegroup 02, so '01' can be dropped for generaldelta repos.
    """
    candidates = supportedoutgoingversions(repo)
    if 'generaldelta' in repo.requirements:
        candidates.discard('01')
    # There must always be at least one mutually supported version.
    assert candidates
    return min(candidates)
880 884
def getbundler(version, repo, bundlecaps=None):
    """Instantiate the packer class registered for 'version'."""
    assert version in supportedoutgoingversions(repo)
    packercls = _packermap[version][0]
    return packercls(repo, bundlecaps)
884 888
def getunbundler(version, fh, alg, extras=None):
    """Instantiate the unpacker class for 'version' over stream 'fh'."""
    unpackercls = _packermap[version][1]
    return unpackercls(fh, alg, extras=extras)
887 891
888 892 def _changegroupinfo(repo, nodes, source):
889 893 if repo.ui.verbose or source == 'bundle':
890 894 repo.ui.status(_("%d changesets found\n") % len(nodes))
891 895 if repo.ui.debugflag:
892 896 repo.ui.debug("list of changesets:\n")
893 897 for node in nodes:
894 898 repo.ui.debug("%s\n" % hex(node))
895 899
def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
    """Run the 'preoutgoing' hook and return a raw changegroup generator."""
    repo = repo.unfiltered()
    common = outgoing.common
    missing = outgoing.missing
    heads = outgoing.missingheads
    # The linkrev fast path is taken when explicitly requested, or when
    # all unfiltered heads were requested (every linkrev is then known to
    # be pulled by the client).
    heads.sort()
    uselinkrevfastpath = fastpath
    if not uselinkrevfastpath:
        uselinkrevfastpath = (repo.filtername is None
                              and heads == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, missing, source)
    return bundler.generate(common, missing, uselinkrevfastpath, source)
911 915
def getsubset(repo, outgoing, bundler, source, fastpath=False):
    """Like getsubsetraw(), but wrap the stream in an unbundler object."""
    raw = getsubsetraw(repo, outgoing, bundler, source, fastpath)
    extras = {'clcount': len(outgoing.missing)}
    return getunbundler(bundler.version, util.chunkbuffer(raw), None, extras)
916 920
def changegroupsubset(repo, roots, heads, source, version='01'):
    """Build a changegroup for the descendants of 'roots' that are also
    ancestors of 'heads'.

    Returns a chunkbuffer-backed unbundler whose read() method yields
    successive changegroup chunks.

    Determining which filenodes and manifest nodes must be included for
    the changesets to be complete is non-trivial, as is the reverse
    mapping from a filenode or manifestnode back to its changeset.
    """
    outgoing = discovery.outgoing(repo, missingroots=roots,
                                  missingheads=heads)
    return getsubset(repo, outgoing, getbundler(version, repo), source)
933 937
def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
                           version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    Only implemented for local repos; reuses any sets precomputed in
    'outgoing'. Returns a raw changegroup generator, or None when there
    is nothing to transfer.
    """
    if not outgoing.missing:
        # Nothing outgoing; callers treat None as "no changegroup".
        return None
    return getsubsetraw(repo, outgoing,
                        getbundler(version, repo, bundlecaps), source)
944 948
def getchangegroup(repo, source, outgoing, bundlecaps=None,
                   version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    Only implemented for local repos; reuses any sets precomputed in
    'outgoing'. Returns None when there is nothing to transfer.
    """
    if not outgoing.missing:
        # Nothing outgoing; callers treat None as "no changegroup".
        return None
    return getsubset(repo, outgoing,
                     getbundler(version, repo, bundlecaps), source)
955 959
def getlocalchangegroup(repo, *args, **kwargs):
    """Deprecated alias for getchangegroup(), kept for compatibility."""
    msg = 'getlocalchangegroup is deprecated, use getchangegroup'
    repo.ui.deprecwarn(msg, '4.3')
    return getchangegroup(repo, *args, **kwargs)
960 964
def changegroup(repo, basenodes, source):
    """Build a changegroup from 'basenodes' up to the current heads."""
    # to avoid a race we use changegroupsubset() (issue1320)
    heads = repo.heads()
    return changegroupsubset(repo, basenodes, heads, source)
964 968
def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
    """Apply the file-revlog part of a changegroup stream.

    Reads filelog groups from 'source' until the empty terminator header,
    adding each to the matching filelog under transaction 'trp', and
    checks every node listed in 'needfiles' off as it arrives.

    Returns (revisions, files): the number of file revisions and the
    number of filelogs added. Aborts on an empty filelog group, on a
    censored delta base, on an unexpected ("spurious") revision, or when
    a needed node is neither received nor already present locally.
    """
    newrevs = 0
    fcount = 0
    for header in iter(source.filelogheader, {}):
        fcount += 1
        fname = header["filename"]
        repo.ui.debug("adding %s revisions\n" % fname)
        repo.ui.progress(_('files'), fcount, unit=_('files'),
                         total=expectedfiles)
        flog = repo.file(fname)
        oldlen = len(flog)
        try:
            if not flog.addgroup(source, revmap, trp):
                raise error.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError as e:
            raise error.Abort(_("received delta base is censored: %s") % e)
        newrevs += len(flog) - oldlen
        if fname in needfiles:
            wanted = needfiles[fname]
            for rev in xrange(oldlen, len(flog)):
                node = flog.node(rev)
                if node not in wanted:
                    raise error.Abort(
                        _("received spurious file revlog entry"))
                wanted.remove(node)
            if not wanted:
                del needfiles[fname]
    repo.ui.progress(_('files'), None)

    # Whatever is still listed in needfiles was not part of the stream;
    # it must already exist in the local filelogs or the data is missing.
    for fname, wanted in needfiles.iteritems():
        flog = repo.file(fname)
        for node in wanted:
            try:
                flog.rev(node)
            except error.LookupError:
                raise error.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (fname, hex(node)))

    return newrevs, fcount
General Comments 0
You need to be logged in to leave comments. Login now