##// END OF EJS Templates
changegroup: more **kwargs...
Augie Fackler -
r33678:672ad4f3 default
parent child Browse files
Show More
@@ -1,1011 +1,1011
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullrev,
19 19 short,
20 20 )
21 21
22 22 from . import (
23 23 dagutil,
24 24 discovery,
25 25 error,
26 26 mdiff,
27 27 phases,
28 28 pycompat,
29 29 util,
30 30 )
31 31
32 32 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
33 33 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
34 34 _CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
35 35
def readexactly(stream, n):
    '''read n bytes from stream.read and abort if less was available'''
    data = stream.read(n)
    if len(data) == n:
        return data
    # A short read means the peer hung up or the bundle is truncated.
    raise error.Abort(_("stream ended unexpectedly"
                        " (got %d bytes, expected %d)")
                      % (len(data), n))
44 44
def getchunk(stream):
    """return the next chunk from stream as a string"""
    header = readexactly(stream, 4)
    length = struct.unpack(">l", header)[0]
    if length > 4:
        # Length on the wire includes the 4 header bytes themselves.
        return readexactly(stream, length - 4)
    if length:
        raise error.Abort(_("invalid chunk length %d") % length)
    # A zero length marks the end of a chunk group.
    return ""
54 54
def chunkheader(length):
    """return a changegroup chunk header (string)

    The on-wire length field counts its own 4 bytes in addition to the
    payload."""
    return struct.pack(">l", 4 + length)
58 58
def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk

    A zero-length chunk terminates a chunk group on the wire."""
    terminator = 0
    return struct.pack(">l", terminator)
62 62
def writechunks(ui, chunks, filename, vfs=None):
    """Write chunks to a file and return its filename.

    The stream is assumed to be a bundle file.
    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    """
    fh = None
    cleanup = None
    try:
        if not filename:
            # No name given: spill to a fresh temp file and remember to
            # remove it should writing fail part-way.
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, pycompat.sysstr("wb"))
            cleanup = filename
        elif vfs:
            fh = vfs.open(filename, "wb")
        else:
            # Increase default buffer size because default is usually
            # small (4k is common on Linux).
            fh = open(filename, "wb", 131072)
        for chunk in chunks:
            fh.write(chunk)
        # All chunks written successfully; keep the file.
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)
96 96
class cg1unpacker(object):
    """Unpacker for cg1 changegroup streams.

    A changegroup unpacker handles the framing of the revision data in
    the wire format. Most consumers will want to use the apply()
    method to add the changes from the changegroup to a repository.

    If you're forwarding a changegroup unmodified to another consumer,
    use getchunks(), which returns an iterator of changegroup
    chunks. This is mostly useful for cases where you need to know the
    data stream has ended by observing the end of the changegroup.

    deltachunk() is useful only if you're applying delta data. Most
    consumers should prefer apply() instead.

    A few other public methods exist. Those are used only for
    bundlerepo and some debug commands - their use is discouraged.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    _grouplistcount = 1 # One list of files after the manifests

    def __init__(self, fh, alg, extras=None):
        # 'alg' names the bundle compression type; None means uncompressed.
        if alg is None:
            alg = 'UN'
        if alg not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                             % alg)
        if alg == 'BZ':
            # NOTE(review): presumably the 'BZ' magic has already been
            # consumed from the stream by the caller, hence the truncated
            # decompressor variant -- confirm against callers.
            alg = '_truncatedBZ'

        compengine = util.compengines.forbundletype(alg)
        self._stream = compengine.decompressorreader(fh)
        self._type = alg
        self.extras = extras or {}
        # Optional progress callback, invoked once per chunk read
        # (see _chunklength); set/cleared by apply().
        self.callback = None

    # These methods (compressed, read, seek, tell) all appear to only
    # be used by bundlerepo, but it's a little hard to tell.
    def compressed(self):
        return self._type is not None and self._type != 'UN'
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def _chunklength(self):
        """Read a chunk's 4-byte length prefix and return the payload size.

        Returns 0 for the empty chunk that terminates a group. The wire
        length includes the 4 prefix bytes, hence the ``- 4``.
        """
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise error.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self._chunklength()
        if not l:
            return {}
        fname = readexactly(self._stream, l)
        return {'filename': fname}

    def _deltaheader(self, headertuple, prevnode):
        """Decode a cg1 delta header tuple.

        cg1 has no explicit delta base on the wire: deltas chain off the
        previously sent revision, or off p1 for the first one. Revlog
        flags do not exist in cg1, so they are always 0.
        """
        node, p1, p2, cs = headertuple
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        flags = 0
        return node, p1, p2, deltabase, cs, flags

    def deltachunk(self, prevnode):
        """Read one delta chunk; returns {} at the end of a group."""
        l = self._chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
        return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
                'deltabase': deltabase, 'delta': delta, 'flags': flags}

    def getchunks(self):
        """returns all the chunks contains in the bundle

        Used when you need to forward the binary stream to a file or another
        network API. To do so, it parse the changegroup data, otherwise it will
        block in case of sshrepo because it don't know the end of the stream.
        """
        # an empty chunkgroup is the end of the changegroup
        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, changegroup versions 1 and 2 have a series of groups
        # with one group per file. changegroup 3 has a series of directory
        # manifests before the files.
        count = 0
        emptycount = 0
        while emptycount < self._grouplistcount:
            empty = True
            count += 1
            while True:
                chunk = getchunk(self)
                if not chunk:
                    # Only empty groups *after* changelog+manifest count
                    # toward termination (hence count > 2).
                    if empty and count > 2:
                        emptycount += 1
                    break
                empty = False
                yield chunkheader(len(chunk))
                pos = 0
                # Re-emit large chunks in 1MB slices to bound memory use.
                while pos < len(chunk):
                    next = pos + 2**20
                    yield chunk[pos:next]
                    pos = next
        yield closechunk()

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        """Apply the manifest group from the stream to the repo.

        Overridden by cg3 to also unpack directory manifests.
        """
        # We know that we'll never have more manifests than we had
        # changesets.
        self.callback = prog(_('manifests'), numchanges)
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        self.manifestheader()
        repo.manifestlog._revlog.addgroup(self, revmap, trp)
        repo.ui.progress(_('manifests'), None)
        self.callback = None

    def apply(self, repo, tr, srctype, url, targetphase=phases.draft,
              expectedtotal=None):
        """Add the changegroup returned by source.read() to this repo.
        srctype is a string like 'push', 'pull', or 'unbundle'. url is
        the URL of the repo where this changegroup is coming from.

        Return an integer summarizing the change to this repo:
        - nothing changed or no source: 0
        - more heads than before: 1+added heads (2..n)
        - fewer heads than before: -1-removed heads (-2..-n)
        - number of heads stays the same: 1
        """
        repo = repo.unfiltered()
        def csmap(x):
            repo.ui.debug("add changeset %s\n" % short(x))
            return len(cl)

        def revmap(x):
            return cl.rev(x)

        changesets = files = revisions = 0

        try:
            # The transaction may already carry source information. In this
            # case we use the top level data. We overwrite the argument
            # because we need to use the top level value (if they exist)
            # in this function.
            srctype = tr.hookargs.setdefault('source', srctype)
            url = tr.hookargs.setdefault('url', url)
            repo.hook('prechangegroup',
                      throw=True, **pycompat.strkwargs(tr.hookargs))

            # write changelog data to temp files so concurrent readers
            # will not see an inconsistent view
            cl = repo.changelog
            cl.delayupdate(tr)
            oldheads = set(cl.heads())

            trp = weakref.proxy(tr)
            # pull off the changeset group
            repo.ui.status(_("adding changesets\n"))
            clstart = len(cl)
            class prog(object):
                # Small callable that drives ui.progress; an instance is
                # installed as self.callback so _chunklength ticks it.
                def __init__(self, step, total):
                    self._step = step
                    self._total = total
                    self._count = 1
                def __call__(self):
                    repo.ui.progress(self._step, self._count, unit=_('chunks'),
                                     total=self._total)
                    self._count += 1
            self.callback = prog(_('changesets'), expectedtotal)

            efiles = set()
            def onchangelog(cl, node):
                # Collect the union of files touched, for progress totals.
                efiles.update(cl.readfiles(node))

            self.changelogheader()
            cgnodes = cl.addgroup(self, csmap, trp, addrevisioncb=onchangelog)
            efiles = len(efiles)

            if not cgnodes:
                repo.ui.develwarn('applied empty changegroup',
                                  config='empty-changegroup')
            clend = len(cl)
            changesets = clend - clstart
            repo.ui.progress(_('changesets'), None)
            self.callback = None

            # pull off the manifest group
            repo.ui.status(_("adding manifests\n"))
            self._unpackmanifests(repo, revmap, trp, prog, changesets)

            needfiles = {}
            if repo.ui.configbool('server', 'validate'):
                cl = repo.changelog
                ml = repo.manifestlog
                # validate incoming csets have their manifests
                for cset in xrange(clstart, clend):
                    mfnode = cl.changelogrevision(cset).manifest
                    mfest = ml[mfnode].readdelta()
                    # store file cgnodes we must see
                    for f, n in mfest.iteritems():
                        needfiles.setdefault(f, set()).add(n)

            # process the files
            repo.ui.status(_("adding file changes\n"))
            newrevs, newfiles = _addchangegroupfiles(
                repo, self, revmap, trp, efiles, needfiles)
            revisions += newrevs
            files += newfiles

            deltaheads = 0
            if oldheads:
                heads = cl.heads()
                deltaheads = len(heads) - len(oldheads)
                for h in heads:
                    # Heads that close a branch don't count toward the
                    # user-visible head delta.
                    if h not in oldheads and repo[h].closesbranch():
                        deltaheads -= 1
            htext = ""
            if deltaheads:
                htext = _(" (%+d heads)") % deltaheads

            repo.ui.status(_("added %d changesets"
                             " with %d changes to %d files%s\n")
                             % (changesets, revisions, files, htext))
            repo.invalidatevolatilesets()

            if changesets > 0:
                if 'node' not in tr.hookargs:
                    tr.hookargs['node'] = hex(cl.node(clstart))
                    tr.hookargs['node_last'] = hex(cl.node(clend - 1))
                    hookargs = dict(tr.hookargs)
                else:
                    # 'node' already set by an earlier changegroup in this
                    # transaction: keep tr.hookargs untouched and only
                    # override our local copy.
                    hookargs = dict(tr.hookargs)
                    hookargs['node'] = hex(cl.node(clstart))
                    hookargs['node_last'] = hex(cl.node(clend - 1))
                repo.hook('pretxnchangegroup',
                          throw=True, **pycompat.strkwargs(hookargs))

            added = [cl.node(r) for r in xrange(clstart, clend)]
            phaseall = None
            if srctype in ('push', 'serve'):
                # Old servers can not push the boundary themselves.
                # New servers won't push the boundary if changeset already
                # exists locally as secret
                #
                # We should not use added here but the list of all change in
                # the bundle
                if repo.publishing():
                    targetphase = phaseall = phases.public
                else:
                    # closer target phase computation

                    # Those changesets have been pushed from the
                    # outside, their phases are going to be pushed
                    # alongside. Therefor `targetphase` is
                    # ignored.
                    targetphase = phaseall = phases.draft
            if added:
                phases.registernew(repo, tr, targetphase, added)
            if phaseall is not None:
                phases.advanceboundary(repo, tr, phaseall, cgnodes)

            if changesets > 0:

                def runhooks():
                    # These hooks run when the lock releases, not when the
                    # transaction closes. So it's possible for the changelog
                    # to have changed since we last saw it.
                    if clstart >= len(repo):
                        return

                    repo.hook("changegroup", **pycompat.strkwargs(hookargs))

                    for n in added:
                        args = hookargs.copy()
                        args['node'] = hex(n)
                        del args['node_last']
                        repo.hook("incoming", **pycompat.strkwargs(args))

                    newheads = [h for h in repo.heads()
                                if h not in oldheads]
                    repo.ui.log("incoming",
                                "%s incoming changes - new heads: %s\n",
                                len(added),
                                ', '.join([hex(c[:6]) for c in newheads]))

                tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                                lambda tr: repo._afterlock(runhooks))
        finally:
            repo.ui.flush()
        # never return 0 here:
        if deltaheads < 0:
            ret = deltaheads - 1
        else:
            ret = deltaheads + 1
        return ret
418 418
class cg2unpacker(cg1unpacker):
    """Unpacker for cg2 streams.

    cg2 streams add support for generaldelta, so the delta header
    format is slightly different. All other features about the data
    remain the same.
    """
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        # cg2 carries the delta base explicitly in the header, so
        # prevnode is unused here. Revlog flags are still not part of
        # the wire format and are reported as 0.
        node, p1, p2, deltabase, cs = headertuple
        return node, p1, p2, deltabase, cs, 0
434 434
class cg3unpacker(cg2unpacker):
    """Unpacker for cg3 streams.

    cg3 streams add support for exchanging treemanifests and revlog
    flags. It adds the revlog flags to the delta header and an empty chunk
    separating manifests and files.
    """
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '03'
    _grouplistcount = 2 # One list of manifests and one list of files

    def _deltaheader(self, headertuple, prevnode):
        # cg3 carries the revlog flags explicitly on the wire.
        node, p1, p2, deltabase, cs, flags = headertuple
        return node, p1, p2, deltabase, cs, flags

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        """Unpack the root manifest group, then any directory manifests."""
        super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
                                                  numchanges)
        # iter() with a {} sentinel: filelogheader returns {} on the empty
        # chunk that terminates the directory-manifest group.
        for chunkdata in iter(self.filelogheader, {}):
            # If we get here, there are directory manifests in the changegroup
            d = chunkdata["filename"]
            repo.ui.debug("adding %s revisions\n" % d)
            dirlog = repo.manifestlog._revlog.dirlog(d)
            if not dirlog.addgroup(self, revmap, trp):
                raise error.Abort(_("received dir revlog group is empty"))
461 461
class headerlessfixup(object):
    """File-like wrapper replaying already-consumed header bytes.

    ``h`` holds bytes that were already pulled off ``fh`` (e.g. to sniff
    the bundle type); they are served first, then reads fall through to
    the underlying stream.
    """
    def __init__(self, fh, h):
        self._h = h
        self._fh = fh

    def read(self, n):
        if not self._h:
            return readexactly(self._fh, n)
        buffered, self._h = self._h[:n], self._h[n:]
        if len(buffered) < n:
            # Buffered header exhausted mid-read; top up from the stream.
            buffered += readexactly(self._fh, n - len(buffered))
        return buffered
473 473
class cg1packer(object):
    """Packer producing version '01' changegroup streams."""
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    version = '01'
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle. While bundlecaps is
        unused in core Mercurial, extensions rely on this feature to communicate
        capabilities to customize the changegroup packer.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        # experimental config: bundle.reorder
        # 'auto' -> None (decide per-revlog); otherwise a forced boolean.
        reorder = repo.ui.config('bundle', 'reorder')
        if reorder == 'auto':
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
        # Per-part size notes are only emitted in verbose (non-debug) mode.
        if self._repo.ui.verbose and not self._repo.ui.debugflag:
            self._verbosenote = self._repo.ui.note
        else:
            self._verbosenote = lambda s: None

    def close(self):
        """Return the empty chunk terminating a group."""
        return closechunk()

    def fileheader(self, fname):
        """Return the chunk announcing filelog ``fname``."""
        return chunkheader(len(fname)) + fname

    # Extracted both for clarity and for overriding in extensions.
    def _sortgroup(self, revlog, nodelist, lookup):
        """Sort nodes for change group and turn them into revnums."""
        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and self._reorder is None) or self._reorder:
            dag = dagutil.revlogdag(revlog)
            return dag.linearize(set(revlog.rev(n) for n in nodelist))
        else:
            return sorted([revlog.rev(n) for n in nodelist])

    def group(self, nodelist, revlog, lookup, units=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        revs = self._sortgroup(revlog, nodelist, lookup)

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        if units is not None:
            self._progress(msgbundling, None)
        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs):
        rr, rl = revlog.rev, revlog.linkrev
        return [n for n in missing if rl(rr(n)) not in commonrevs]

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        """Pack flat manifests into a changegroup stream."""
        # cg1/cg2 cannot express directory manifests; cg3packer overrides.
        assert not dir
        for chunk in self.group(mfnodes, self._repo.manifestlog._revlog,
                                lookuplinknode, units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg1/cg2 have no explicit manifest-group terminator.
        return ''

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        cl = repo.changelog

        clrevorder = {}
        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and manifest
        # nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            clrevorder[x] = len(clrevorder)
            n = c[0]
            # record the first changeset introducing this manifest version
            mfs.setdefault(n, x)
            # Record a complete list of potentially-changed files in
            # this manifest.
            changedfiles.update(c[3])
            return x

        self._verbosenote(_('uncompressed size of bundle content:\n'))
        size = 0
        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (changelog)\n') % size)

        # We need to make sure that the linkrev in the changegroup refers to
        # the first changeset that introduced the manifest or file revision.
        # The fastpath is usually safer than the slowpath, because the filelogs
        # are walked in revlog order.
        #
        # When taking the slowpath with reorder=None and the manifest revlog
        # uses generaldelta, the manifest may be walked in the "wrong" order.
        # Without 'clrevorder', we would get an incorrect linkrev (see fix in
        # cc0ff93d0c0c).
        #
        # When taking the fastpath, we are only vulnerable to reordering
        # of the changelog itself. The changelog never uses generaldelta, so
        # it is only reordered when reorder=True. To handle this case, we
        # simply take the slowpath, which already has the 'clrevorder' logic.
        # This was also fixed in cc0ff93d0c0c.
        fastpathlinkrev = fastpathlinkrev and not self._reorder
        # Treemanifests don't work correctly with fastpathlinkrev
        # either, because we don't discover which directory nodes to
        # send along with files. This could probably be fixed.
        fastpathlinkrev = fastpathlinkrev and (
            'treemanifest' not in repo.requirements)

        for chunk in self.generatemanifests(commonrevs, clrevorder,
                                            fastpathlinkrev, mfs, fnodes):
            yield chunk
        mfs.clear()
        clrevs = set(cl.rev(x) for x in clnodes)

        if not fastpathlinkrev:
            def linknodes(unused, fname):
                return fnodes.get(fname, {})
        else:
            cln = cl.node
            def linknodes(filerevlog, fname):
                llr = filerevlog.linkrev
                fln = filerevlog.node
                revs = ((r, llr(r)) for r in filerevlog)
                return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
                          fnodes):
        """Yield manifest chunks (root manifest first, then subdirectories)."""
        repo = self._repo
        mfl = repo.manifestlog
        dirlog = mfl._revlog.dirlog
        # '' is the root manifest; subdirectory entries are added as a
        # side effect of lookupmflinknode below.
        tmfnodes = {'': mfs}

        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def makelookupmflinknode(dir):
            if fastpathlinkrev:
                assert not dir
                return mfs.__getitem__

            def lookupmflinknode(x):
                """Callback for looking up the linknode for manifests.

                Returns the linkrev node for the specified manifest.

                SIDE EFFECT:

                1) fclnodes gets populated with the list of relevant
                   file nodes if we're not using fastpathlinkrev
                2) When treemanifests are in use, collects treemanifest nodes
                   to send

                Note that this means manifests must be completely sent to
                the client before you can trust the list of files and
                treemanifests to send.
                """
                clnode = tmfnodes[dir][x]
                mdata = mfl.get(dir, x).readfast(shallow=True)
                for p, n, fl in mdata.iterentries():
                    if fl == 't': # subdirectory manifest
                        subdir = dir + p + '/'
                        tmfclnodes = tmfnodes.setdefault(subdir, {})
                        tmfclnode = tmfclnodes.setdefault(n, clnode)
                        # Always keep the earliest introducing changeset.
                        if clrevorder[clnode] < clrevorder[tmfclnode]:
                            tmfclnodes[n] = clnode
                    else:
                        f = dir + p
                        fclnodes = fnodes.setdefault(f, {})
                        fclnode = fclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[fclnode]:
                            fclnodes[n] = clnode
                return clnode
            return lookupmflinknode

        size = 0
        while tmfnodes:
            # Process directories in lexical order; packing one directory
            # may add new subdirectory entries to tmfnodes.
            dir = min(tmfnodes)
            nodes = tmfnodes[dir]
            prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
            if not dir or prunednodes:
                for x in self._packmanifests(dir, prunednodes,
                                             makelookupmflinknode(dir)):
                    size += len(x)
                    yield x
            del tmfnodes[dir]
        self._verbosenote(_('%8.i (manifests)\n') % size)
        yield self._manifestsdone()

    # The 'source' parameter is useful for extensions
    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        """Yield filelog chunks for every changed file not already common."""
        repo = self._repo
        progress = self._progress
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise error.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                h = self.fileheader(fname)
                size = len(h)
                yield h
                for chunk in self.group(filenodes, filerevlog, lookupfilelog):
                    size += len(chunk)
                    yield chunk
                self._verbosenote(_('%8.i %s\n') % (size, fname))
        progress(msgbundling, None)

    def deltaparent(self, revlog, rev, p1, p2, prev):
        # cg1 deltas are always against the previously sent revision.
        return prev

    def revchunk(self, revlog, rev, prev, linknode):
        """Yield the chunks (header, metadata, delta) for one revision."""
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = self.deltaparent(revlog, rev, p1, p2, prev)

        prefix = ''
        if revlog.iscensored(base) or revlog.iscensored(rev):
            # Censored revisions can't be diffed meaningfully; send the
            # full (tombstone) text wrapped in a synthetic diff header.
            try:
                delta = revlog.revision(node, raw=True)
            except error.CensoredNodeError as e:
                delta = e.tombstone
            if base == nullrev:
                prefix = mdiff.trivialdiffheader(len(delta))
            else:
                baselen = revlog.rawsize(base)
                prefix = mdiff.replacediffheader(baselen, len(delta))
        elif base == nullrev:
            # Full snapshot, expressed as a diff against the empty text.
            delta = revlog.revision(node, raw=True)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        flags = revlog.flags(rev)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta
    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # do nothing with basenode, it is implicitly the previous one in HG10
        # do nothing with flags, it is implicitly 0 for cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
787 787
class cg2packer(cg1packer):
    """Packer producing version '02' (generaldelta-aware) changegroups."""
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        super(cg2packer, self).__init__(repo, bundlecaps)
        if self._reorder is None:
            # Since generaldelta is directly supported by cg2, reordering
            # generally doesn't help, so we disable it by default (treating
            # bundle.reorder=auto just like bundle.reorder=False).
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        """Pick the delta base to send for ``rev`` (cg2 can name any base)."""
        dp = revlog.deltaparent(rev)
        if dp == nullrev and revlog.storedeltachains:
            # Avoid sending full revisions when delta parent is null. Pick prev
            # in that case. It's tempting to pick p1 in this case, as p1 will
            # be smaller in the common case. However, computing a delta against
            # p1 may require resolving the raw text of p1, which could be
            # expensive. The revlog caches should have prev cached, meaning
            # less CPU for changegroup generation. There is likely room to add
            # a flag and/or config option to control this behavior.
            return prev
        elif dp == nullrev:
            # revlog is configured to use full snapshot for a reason,
            # stick to full snapshot.
            return nullrev
        elif dp not in (p1, p2, prev):
            # Pick prev when we can't be sure remote has the base revision.
            return prev
        else:
            return dp

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # Do nothing with flags, it is implicitly 0 in cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
824 824
class cg3packer(cg2packer):
    """Packer producing version '03' changegroups (treemanifests, flags)."""
    version = '03'
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        # Unlike cg1/cg2, cg3 may pack subdirectory manifests; each
        # directory group is announced with a fileheader-style chunk.
        if dir:
            yield self.fileheader(dir)

        dirlog = self._repo.manifestlog._revlog.dirlog(dir)
        for chunk in self.group(mfnodes, dirlog, lookuplinknode,
                                units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg3 terminates the manifest section with an explicit empty chunk.
        return self.close()

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # cg3 includes the revlog flags in the delta header.
        return struct.pack(
            self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
844 844
# Map of changegroup wire-format version -> (packer class, unpacker class).
_packermap = {'01': (cg1packer, cg1unpacker),
             # cg2 adds support for exchanging generaldelta
             '02': (cg2packer, cg2unpacker),
             # cg3 adds support for exchanging revlog flags and treemanifests
             '03': (cg3packer, cg3unpacker),
}
851 851
def allsupportedversions(repo):
    """Return the set of changegroup versions usable with this repo."""
    versions = set(_packermap)
    # '03' is only enabled when explicitly requested or when the repo
    # already relies on tree manifests.
    wants03 = (repo.ui.configbool('experimental', 'changegroup3')
               or repo.ui.configbool('experimental', 'treemanifest')
               or 'treemanifest' in repo.requirements)
    if not wants03:
        versions.discard('03')
    return versions
859 859
# Changegroup versions that can be applied to the repo
def supportedincomingversions(repo):
    """Return the changegroup versions ``repo`` can receive."""
    return allsupportedversions(repo)
863 863
# Changegroup versions that can be created from the repo
def supportedoutgoingversions(repo):
    """Return the changegroup versions that can be produced from ``repo``."""
    versions = allsupportedversions(repo)
    if 'treemanifest' in repo.requirements:
        # Versions 01 and 02 only know about flat manifests, and converting
        # between flat and tree manifests on the fly is just too expensive:
        # tree manifests hash differently, so all of history would need
        # converting. We simply don't even pretend to support 01/02 here.
        versions.difference_update(['01', '02'])
    return versions
876 876
def safeversion(repo):
    """Return the smallest version clients of this repo can be assumed to support.

    For example, every hg version that supports generaldelta also supports
    changegroup 02, so a generaldelta repo never needs to fall back to 01.
    """
    versions = supportedoutgoingversions(repo)
    if 'generaldelta' in repo.requirements:
        versions.discard('01')
    assert versions  # at least one version must always remain
    return min(versions)
886 886
def getbundler(version, repo, bundlecaps=None):
    """Instantiate the packer class registered for ``version``."""
    assert version in supportedoutgoingversions(repo)
    packercls = _packermap[version][0]
    return packercls(repo, bundlecaps)
890 890
def getunbundler(version, fh, alg, extras=None):
    """Instantiate the unpacker class registered for ``version`` over ``fh``."""
    unpackercls = _packermap[version][1]
    return unpackercls(fh, alg, extras=extras)
893 893
def _changegroupinfo(repo, nodes, source):
    """Report the outgoing changesets to the user, honoring verbosity flags."""
    ui = repo.ui
    if ui.verbose or source == 'bundle':
        ui.status(_("%d changesets found\n") % len(nodes))
    if ui.debugflag:
        ui.debug("list of changesets:\n")
        for node in nodes:
            ui.debug("%s\n" % hex(node))
901 901
def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
    """Produce a raw changegroup generator for the ``outgoing`` revisions.

    Runs the ``preoutgoing`` hook (which may abort) before generation, and
    returns whatever ``bundler.generate()`` yields.
    """
    repo = repo.unfiltered()
    commonrevs = outgoing.common
    csets = outgoing.missing
    heads = outgoing.missingheads
    # We go through the fast path if we get told to, or if all (unfiltered)
    # heads have been requested (since we then know that all linkrevs will
    # be pulled by the client).
    heads.sort()  # NOTE: sorts outgoing.missingheads in place
    fastpathlinkrev = fastpath or (
        repo.filtername is None and heads == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, csets, source)
    return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
917 917
def getsubset(repo, outgoing, bundler, source, fastpath=False):
    """Like getsubsetraw(), but wrap the stream in an unbundler object."""
    gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
    extras = {'clcount': len(outgoing.missing)}
    return getunbundler(bundler.version, util.chunkbuffer(gengroup), None,
                        extras)
922 922
def changegroupsubset(repo, roots, heads, source, version='01'):
    """Compute a changegroup covering the graph between roots and heads.

    The result consists of all the nodes that are descendants of any of
    the ``roots`` and ancestors of any of the ``heads``.  Returns a
    chunkbuffer object whose read() method yields successive changegroup
    chunks.

    This is fairly complex: determining which filenodes and which
    manifest nodes need to be included for the changesets to be complete
    is non-trivial, as is the reverse - figuring out which changeset in
    the changegroup a particular filenode or manifestnode belongs to.
    """
    outgoing = discovery.outgoing(repo, missingroots=roots, missingheads=heads)
    return getsubset(repo, outgoing, getbundler(version, repo), source)
939 939
def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
                           version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    Only implemented for local repos; reuses potentially precomputed
    sets in ``outgoing``.  Returns a raw changegroup generator, or None
    when there is nothing to transfer.
    """
    if not outgoing.missing:
        return None
    return getsubsetraw(repo, outgoing, getbundler(version, repo, bundlecaps),
                        source)
950 950
def getchangegroup(repo, source, outgoing, bundlecaps=None,
                   version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    Only implemented for local repos; reuses potentially precomputed
    sets in ``outgoing``.  Returns None when there is nothing to
    transfer.
    """
    if not outgoing.missing:
        return None
    return getsubset(repo, outgoing, getbundler(version, repo, bundlecaps),
                     source)
961 961
def getlocalchangegroup(repo, *args, **kwargs):
    """Deprecated alias for getchangegroup() (deprecated since 4.3)."""
    repo.ui.deprecwarn('getlocalchangegroup is deprecated, use getchangegroup',
                       '4.3')
    return getchangegroup(repo, *args, **kwargs)
966 966
def changegroup(repo, basenodes, source):
    """Produce a changegroup from ``basenodes`` up to all repo heads."""
    # Going through changegroupsubset() avoids a race with concurrent
    # changes to the set of heads (issue1320).
    return changegroupsubset(repo, basenodes, repo.heads(), source)
970 970
def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
    """Apply the file revlog groups from ``source`` to the repository.

    ``revmap`` and ``trp`` are passed straight through to each filelog's
    addgroup().  ``expectedfiles`` only drives the progress bar.
    ``needfiles`` maps filename -> set of nodes that must arrive for the
    incoming changesets to be complete; entries are consumed as groups
    are applied and any leftovers are verified against the local revlogs.

    Returns a (revisions, files) tuple counting what was added.  Raises
    error.Abort on an empty group, a censored delta base, a spurious
    entry, or file data that is still missing at the end.
    """
    revisions = 0
    files = 0
    # source.filelogheader() returns {} once the file section is exhausted.
    for chunkdata in iter(source.filelogheader, {}):
        files += 1
        f = chunkdata["filename"]
        repo.ui.debug("adding %s revisions\n" % f)
        repo.ui.progress(_('files'), files, unit=_('files'),
                         total=expectedfiles)
        fl = repo.file(f)
        o = len(fl)  # revision count before the group, to count additions
        try:
            if not fl.addgroup(source, revmap, trp):
                raise error.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError as e:
            raise error.Abort(_("received delta base is censored: %s") % e)
        revisions += len(fl) - o
        if f in needfiles:
            needs = needfiles[f]
            # Tick off every newly added node; a node we did not expect is
            # an error, and a fully satisfied file is dropped from needfiles.
            for new in xrange(o, len(fl)):
                n = fl.node(new)
                if n in needs:
                    needs.remove(n)
                else:
                    raise error.Abort(
                        _("received spurious file revlog entry"))
            if not needs:
                del needfiles[f]
    repo.ui.progress(_('files'), None)

    # Whatever remains in needfiles must already exist locally; otherwise
    # the incoming changesets reference file data we do not have.
    for f, needs in needfiles.iteritems():
        fl = repo.file(f)
        for n in needs:
            try:
                fl.rev(n)
            except error.LookupError:
                raise error.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (f, hex(n)))

    return revisions, files
General Comments 0
You need to be logged in to leave comments. Login now