##// END OF EJS Templates
changegroup: rename "dh" to the clearer "deltaheads"...
Martin von Zweigbergk -
r32870:b441296f default
parent child Browse files
Show More
@@ -1,1029 +1,1029 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullrev,
19 19 short,
20 20 )
21 21
22 22 from . import (
23 23 dagutil,
24 24 discovery,
25 25 error,
26 26 mdiff,
27 27 phases,
28 28 pycompat,
29 29 util,
30 30 )
31 31
32 32 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
33 33 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
34 34 _CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
35 35
def readexactly(stream, n):
    '''read n bytes from stream.read and abort if less was available'''
    s = stream.read(n)
    if len(s) < n:
        # A short read means the peer hung up mid-stream; surface it as
        # a user-facing abort rather than a confusing unpack error later.
        raise error.Abort(_("stream ended unexpectedly"
                            " (got %d bytes, expected %d)")
                          % (len(s), n))
    return s
def getchunk(stream):
    """return the next chunk from stream as a string

    A chunk is a 4-byte big-endian length (which includes the 4 header
    bytes themselves) followed by the payload. A length of 0 is the
    end-of-group marker and yields an empty string.
    """
    d = readexactly(stream, 4)
    l = struct.unpack(">l", d)[0]
    if l <= 4:
        # lengths 1-4 cannot describe a valid chunk (the header alone is
        # 4 bytes); only 0 is legal and means "end of group"
        if l:
            raise error.Abort(_("invalid chunk length %d") % l)
        return ""
    return readexactly(stream, l - 4)
def chunkheader(length):
    """return a changegroup chunk header (string)

    The on-the-wire length includes the 4 header bytes themselves.
    """
    return struct.pack(">l", length + 4)
def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk

    A zero length marks the end of a chunk group on the wire.
    """
    return struct.pack(">l", 0)
def combineresults(results):
    """logic to combine 0 or more addchangegroup results into one

    Each result encodes a head-count delta (see cg1unpacker.apply):
    0 means nothing changed, 1 means head count unchanged, 2..n means
    n-1 heads added, -2..-n means n-1 heads removed. The combined value
    uses the same encoding for the net change across all results.
    """
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            result = 0
            break
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result
def writechunks(ui, chunks, filename, vfs=None):
    """Write chunks to a file and return its filename.

    The stream is assumed to be a bundle file.
    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    """
    fh = None
    cleanup = None
    try:
        if filename:
            if vfs:
                fh = vfs.open(filename, "wb")
            else:
                # Increase default buffer size because default is usually
                # small (4k is common on Linux).
                fh = open(filename, "wb", 131072)
        else:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, pycompat.sysstr("wb"))
        # arm cleanup: on any failure below, remove the partial file
        cleanup = filename
        for c in chunks:
            fh.write(c)
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)
class cg1unpacker(object):
    """Unpacker for cg1 changegroup streams.

    A changegroup unpacker handles the framing of the revision data in
    the wire format. Most consumers will want to use the apply()
    method to add the changes from the changegroup to a repository.

    If you're forwarding a changegroup unmodified to another consumer,
    use getchunks(), which returns an iterator of changegroup
    chunks. This is mostly useful for cases where you need to know the
    data stream has ended by observing the end of the changegroup.

    deltachunk() is useful only if you're applying delta data. Most
    consumers should prefer apply() instead.

    A few other public methods exist. Those are used only for
    bundlerepo and some debug commands - their use is discouraged.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    _grouplistcount = 1 # One list of files after the manifests

    def __init__(self, fh, alg, extras=None):
        # fh: file-like object with the raw (possibly compressed) stream
        # alg: bundle compression type ('UN', 'BZ', 'GZ', ...) or None
        if alg is None:
            alg = 'UN'
        if alg not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % alg)
        if alg == 'BZ':
            # changegroup payloads lack the 'BZ' magic; use the variant
            # decompressor that tolerates the truncated header
            alg = '_truncatedBZ'

        compengine = util.compengines.forbundletype(alg)
        self._stream = compengine.decompressorreader(fh)
        self._type = alg
        self.extras = extras or {}
        self.callback = None

    # These methods (compressed, read, seek, tell) all appear to only
    # be used by bundlerepo, but it's a little hard to tell.
    def compressed(self):
        return self._type is not None and self._type != 'UN'
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def _chunklength(self):
        """read a chunk header and return the payload length (0 at end)"""
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise error.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self._chunklength()
        if not l:
            return {}
        fname = readexactly(self._stream, l)
        return {'filename': fname}

    def _deltaheader(self, headertuple, prevnode):
        node, p1, p2, cs = headertuple
        # cg1 deltas are implicitly against the previous revision in the
        # stream (or p1 for the first one)
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        flags = 0
        return node, p1, p2, deltabase, cs, flags

    def deltachunk(self, prevnode):
        """read one delta chunk; returns {} at the end of a group"""
        l = self._chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
        return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
                'deltabase': deltabase, 'delta': delta, 'flags': flags}

    def getchunks(self):
        """returns all the chunks contains in the bundle

        Used when you need to forward the binary stream to a file or another
        network API. To do so, it parse the changegroup data, otherwise it will
        block in case of sshrepo because it don't know the end of the stream.
        """
        # an empty chunkgroup is the end of the changegroup
        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, changegroup versions 1 and 2 have a series of groups
        # with one group per file. changegroup 3 has a series of directory
        # manifests before the files.
        count = 0
        emptycount = 0
        while emptycount < self._grouplistcount:
            empty = True
            count += 1
            while True:
                chunk = getchunk(self)
                if not chunk:
                    if empty and count > 2:
                        emptycount += 1
                    break
                empty = False
                yield chunkheader(len(chunk))
                pos = 0
                # re-emit the payload in 1MB pieces to bound memory use
                while pos < len(chunk):
                    next = pos + 2**20
                    yield chunk[pos:next]
                    pos = next
            yield closechunk()

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        # We know that we'll never have more manifests than we had
        # changesets.
        self.callback = prog(_('manifests'), numchanges)
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        self.manifestheader()
        repo.manifestlog._revlog.addgroup(self, revmap, trp)
        repo.ui.progress(_('manifests'), None)
        self.callback = None

    def apply(self, repo, srctype, url, emptyok=False,
              targetphase=phases.draft, expectedtotal=None):
        """Add the changegroup returned by source.read() to this repo.
        srctype is a string like 'push', 'pull', or 'unbundle'. url is
        the URL of the repo where this changegroup is coming from.

        Return an integer summarizing the change to this repo:
        - nothing changed or no source: 0
        - more heads than before: 1+added heads (2..n)
        - fewer heads than before: -1-removed heads (-2..-n)
        - number of heads stays the same: 1
        """
        repo = repo.unfiltered()
        def csmap(x):
            repo.ui.debug("add changeset %s\n" % short(x))
            return len(cl)

        def revmap(x):
            return cl.rev(x)

        changesets = files = revisions = 0

        try:
            with repo.transaction("\n".join([srctype,
                                             util.hidepassword(url)])) as tr:
                # The transaction could have been created before and already
                # carries source information. In this case we use the top
                # level data. We overwrite the argument because we need to use
                # the top level value (if they exist) in this function.
                srctype = tr.hookargs.setdefault('source', srctype)
                url = tr.hookargs.setdefault('url', url)
                repo.hook('prechangegroup', throw=True, **tr.hookargs)

                # write changelog data to temp files so concurrent readers
                # will not see an inconsistent view
                cl = repo.changelog
                cl.delayupdate(tr)
                oldheads = set(cl.heads())

                trp = weakref.proxy(tr)
                # pull off the changeset group
                repo.ui.status(_("adding changesets\n"))
                clstart = len(cl)
                class prog(object):
                    def __init__(self, step, total):
                        self._step = step
                        self._total = total
                        self._count = 1
                    def __call__(self):
                        repo.ui.progress(self._step, self._count,
                                         unit=_('chunks'), total=self._total)
                        self._count += 1
                self.callback = prog(_('changesets'), expectedtotal)

                efiles = set()
                def onchangelog(cl, node):
                    efiles.update(cl.readfiles(node))

                self.changelogheader()
                cgnodes = cl.addgroup(self, csmap, trp,
                                      addrevisioncb=onchangelog)
                efiles = len(efiles)

                if not (cgnodes or emptyok):
                    raise error.Abort(_("received changelog group is empty"))
                clend = len(cl)
                changesets = clend - clstart
                repo.ui.progress(_('changesets'), None)
                self.callback = None

                # pull off the manifest group
                repo.ui.status(_("adding manifests\n"))
                self._unpackmanifests(repo, revmap, trp, prog, changesets)

                needfiles = {}
                if repo.ui.configbool('server', 'validate', default=False):
                    cl = repo.changelog
                    ml = repo.manifestlog
                    # validate incoming csets have their manifests
                    for cset in xrange(clstart, clend):
                        mfnode = cl.changelogrevision(cset).manifest
                        mfest = ml[mfnode].readdelta()
                        # store file cgnodes we must see
                        for f, n in mfest.iteritems():
                            needfiles.setdefault(f, set()).add(n)

                # process the files
                repo.ui.status(_("adding file changes\n"))
                newrevs, newfiles = _addchangegroupfiles(
                    repo, self, revmap, trp, efiles, needfiles)
                revisions += newrevs
                files += newfiles

                deltaheads = 0
                if oldheads:
                    heads = cl.heads()
                    deltaheads = len(heads) - len(oldheads)
                    for h in heads:
                        # a new head that closes its branch does not count
                        # as an added head for reporting purposes
                        if h not in oldheads and repo[h].closesbranch():
                            deltaheads -= 1
                htext = ""
                if deltaheads:
                    htext = _(" (%+d heads)") % deltaheads

                repo.ui.status(_("added %d changesets"
                                 " with %d changes to %d files%s\n")
                               % (changesets, revisions, files, htext))
                repo.invalidatevolatilesets()

                if changesets > 0:
                    if 'node' not in tr.hookargs:
                        tr.hookargs['node'] = hex(cl.node(clstart))
                        tr.hookargs['node_last'] = hex(cl.node(clend - 1))
                        hookargs = dict(tr.hookargs)
                    else:
                        hookargs = dict(tr.hookargs)
                        hookargs['node'] = hex(cl.node(clstart))
                        hookargs['node_last'] = hex(cl.node(clend - 1))
                    repo.hook('pretxnchangegroup', throw=True, **hookargs)

                added = [cl.node(r) for r in xrange(clstart, clend)]
                publishing = repo.publishing()
                if srctype in ('push', 'serve'):
                    # Old servers can not push the boundary themselves.
                    # New servers won't push the boundary if changeset already
                    # exists locally as secret
                    #
                    # We should not use added here but the list of all change in
                    # the bundle
                    if publishing:
                        phases.advanceboundary(repo, tr, phases.public, cgnodes)
                    else:
                        # Those changesets have been pushed from the
                        # outside, their phases are going to be pushed
                        # alongside. Therefor `targetphase` is
                        # ignored.
                        phases.advanceboundary(repo, tr, phases.draft, cgnodes)
                        phases.retractboundary(repo, tr, phases.draft, added)
                elif srctype != 'strip':
                    # publishing only alter behavior during push
                    #
                    # strip should not touch boundary at all
                    phases.retractboundary(repo, tr, targetphase, added)

                if changesets > 0:

                    def runhooks():
                        # These hooks run when the lock releases, not when the
                        # transaction closes. So it's possible for the changelog
                        # to have changed since we last saw it.
                        if clstart >= len(repo):
                            return

                        repo.hook("changegroup", **hookargs)

                        for n in added:
                            args = hookargs.copy()
                            args['node'] = hex(n)
                            del args['node_last']
                            repo.hook("incoming", **args)

                        newheads = [h for h in repo.heads()
                                    if h not in oldheads]
                        repo.ui.log("incoming",
                                    "%s incoming changes - new heads: %s\n",
                                    len(added),
                                    ', '.join([hex(c[:6]) for c in newheads]))

                    tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                                    lambda tr: repo._afterlock(runhooks))
        finally:
            repo.ui.flush()
        # never return 0 here:
        if deltaheads < 0:
            return deltaheads - 1
        else:
            return deltaheads + 1
class cg2unpacker(cg1unpacker):
    """Unpacker for cg2 streams.

    cg2 streams add support for generaldelta, so the delta header
    format is slightly different. All other features about the data
    remain the same.
    """
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        # cg2 carries the delta base explicitly in the header
        node, p1, p2, deltabase, cs = headertuple
        flags = 0
        return node, p1, p2, deltabase, cs, flags
class cg3unpacker(cg2unpacker):
    """Unpacker for cg3 streams.

    cg3 streams add support for exchanging treemanifests and revlog
    flags. It adds the revlog flags to the delta header and an empty chunk
    separating manifests and files.
    """
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '03'
    _grouplistcount = 2 # One list of manifests and one list of files

    def _deltaheader(self, headertuple, prevnode):
        node, p1, p2, deltabase, cs, flags = headertuple
        return node, p1, p2, deltabase, cs, flags

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        # flat (root) manifests first, handled by the base class...
        super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
                                                  numchanges)
        # ...then any directory manifest groups until the empty separator
        for chunkdata in iter(self.filelogheader, {}):
            # If we get here, there are directory manifests in the changegroup
            d = chunkdata["filename"]
            repo.ui.debug("adding %s revisions\n" % d)
            dirlog = repo.manifestlog._revlog.dirlog(d)
            if not dirlog.addgroup(self, revmap, trp):
                raise error.Abort(_("received dir revlog group is empty"))
class headerlessfixup(object):
    """File-like wrapper that replays an already-consumed header.

    Serves the bytes of ``h`` first, then falls through to reading
    from the underlying stream ``fh``.
    """
    def __init__(self, fh, h):
        self._h = h
        self._fh = fh
    def read(self, n):
        if self._h:
            d, self._h = self._h[:n], self._h[n:]
            if len(d) < n:
                # header exhausted mid-read; top up from the real stream
                d += readexactly(self._fh, n - len(d))
            return d
        return readexactly(self._fh, n)
class cg1packer(object):
    """Packer producing version '01' changegroup streams."""
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    version = '01'
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle. While bundlecaps is
        unused in core Mercurial, extensions rely on this feature to communicate
        capabilities to customize the changegroup packer.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        # experimental config: bundle.reorder
        reorder = repo.ui.config('bundle', 'reorder', 'auto')
        if reorder == 'auto':
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
        if self._repo.ui.verbose and not self._repo.ui.debugflag:
            self._verbosenote = self._repo.ui.note
        else:
            self._verbosenote = lambda s: None

    def close(self):
        return closechunk()

    def fileheader(self, fname):
        return chunkheader(len(fname)) + fname

    # Extracted both for clarity and for overriding in extensions.
    def _sortgroup(self, revlog, nodelist, lookup):
        """Sort nodes for change group and turn them into revnums."""
        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and self._reorder is None) or self._reorder:
            dag = dagutil.revlogdag(revlog)
            return dag.linearize(set(revlog.rev(n) for n in nodelist))
        else:
            return sorted([revlog.rev(n) for n in nodelist])

    def group(self, nodelist, revlog, lookup, units=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        revs = self._sortgroup(revlog, nodelist, lookup)

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        if units is not None:
            self._progress(msgbundling, None)
        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs):
        rr, rl = revlog.rev, revlog.linkrev
        return [n for n in missing if rl(rr(n)) not in commonrevs]

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        """Pack flat manifests into a changegroup stream."""
        assert not dir
        for chunk in self.group(mfnodes, self._repo.manifestlog._revlog,
                                lookuplinknode, units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg1/cg2 have no manifests/files separator chunk
        return ''

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        cl = repo.changelog

        clrevorder = {}
        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and manifest
        # nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            clrevorder[x] = len(clrevorder)
            n = c[0]
            # record the first changeset introducing this manifest version
            mfs.setdefault(n, x)
            # Record a complete list of potentially-changed files in
            # this manifest.
            changedfiles.update(c[3])
            return x

        self._verbosenote(_('uncompressed size of bundle content:\n'))
        size = 0
        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (changelog)\n') % size)

        # We need to make sure that the linkrev in the changegroup refers to
        # the first changeset that introduced the manifest or file revision.
        # The fastpath is usually safer than the slowpath, because the filelogs
        # are walked in revlog order.
        #
        # When taking the slowpath with reorder=None and the manifest revlog
        # uses generaldelta, the manifest may be walked in the "wrong" order.
        # Without 'clrevorder', we would get an incorrect linkrev (see fix in
        # cc0ff93d0c0c).
        #
        # When taking the fastpath, we are only vulnerable to reordering
        # of the changelog itself. The changelog never uses generaldelta, so
        # it is only reordered when reorder=True. To handle this case, we
        # simply take the slowpath, which already has the 'clrevorder' logic.
        # This was also fixed in cc0ff93d0c0c.
        fastpathlinkrev = fastpathlinkrev and not self._reorder
        # Treemanifests don't work correctly with fastpathlinkrev
        # either, because we don't discover which directory nodes to
        # send along with files. This could probably be fixed.
        fastpathlinkrev = fastpathlinkrev and (
            'treemanifest' not in repo.requirements)

        for chunk in self.generatemanifests(commonrevs, clrevorder,
                fastpathlinkrev, mfs, fnodes):
            yield chunk
        mfs.clear()
        clrevs = set(cl.rev(x) for x in clnodes)

        if not fastpathlinkrev:
            def linknodes(unused, fname):
                return fnodes.get(fname, {})
        else:
            cln = cl.node
            def linknodes(filerevlog, fname):
                llr = filerevlog.linkrev
                fln = filerevlog.node
                revs = ((r, llr(r)) for r in filerevlog)
                return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
                          fnodes):
        repo = self._repo
        mfl = repo.manifestlog
        dirlog = mfl._revlog.dirlog
        tmfnodes = {'': mfs}

        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def makelookupmflinknode(dir):
            if fastpathlinkrev:
                assert not dir
                return mfs.__getitem__

            def lookupmflinknode(x):
                """Callback for looking up the linknode for manifests.

                Returns the linkrev node for the specified manifest.

                SIDE EFFECT:

                1) fclnodes gets populated with the list of relevant
                file nodes if we're not using fastpathlinkrev
                2) When treemanifests are in use, collects treemanifest nodes
                to send

                Note that this means manifests must be completely sent to
                the client before you can trust the list of files and
                treemanifests to send.
                """
                clnode = tmfnodes[dir][x]
                mdata = mfl.get(dir, x).readfast(shallow=True)
                for p, n, fl in mdata.iterentries():
                    if fl == 't': # subdirectory manifest
                        subdir = dir + p + '/'
                        tmfclnodes = tmfnodes.setdefault(subdir, {})
                        tmfclnode = tmfclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[tmfclnode]:
                            tmfclnodes[n] = clnode
                    else:
                        f = dir + p
                        fclnodes = fnodes.setdefault(f, {})
                        fclnode = fclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[fclnode]:
                            fclnodes[n] = clnode
                return clnode
            return lookupmflinknode

        size = 0
        while tmfnodes:
            dir = min(tmfnodes)
            nodes = tmfnodes[dir]
            prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
            if not dir or prunednodes:
                for x in self._packmanifests(dir, prunednodes,
                                             makelookupmflinknode(dir)):
                    size += len(x)
                    yield x
            del tmfnodes[dir]
        self._verbosenote(_('%8.i (manifests)\n') % size)
        yield self._manifestsdone()

    # The 'source' parameter is useful for extensions
    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        repo = self._repo
        progress = self._progress
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise error.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                h = self.fileheader(fname)
                size = len(h)
                yield h
                for chunk in self.group(filenodes, filerevlog, lookupfilelog):
                    size += len(chunk)
                    yield chunk
                self._verbosenote(_('%8.i %s\n') % (size, fname))
        progress(msgbundling, None)

    def deltaparent(self, revlog, rev, p1, p2, prev):
        # cg1 deltas are always against the previous revision in the stream
        return prev

    def revchunk(self, revlog, rev, prev, linknode):
        """yield the chunk header, delta header and delta for one revision"""
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = self.deltaparent(revlog, rev, p1, p2, prev)

        prefix = ''
        if revlog.iscensored(base) or revlog.iscensored(rev):
            try:
                delta = revlog.revision(node, raw=True)
            except error.CensoredNodeError as e:
                delta = e.tombstone
            if base == nullrev:
                prefix = mdiff.trivialdiffheader(len(delta))
            else:
                baselen = revlog.rawsize(base)
                prefix = mdiff.replacediffheader(baselen, len(delta))
        elif base == nullrev:
            # no usable base: send the full text as a trivial diff
            delta = revlog.revision(node, raw=True)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        flags = revlog.flags(rev)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # do nothing with basenode, it is implicitly the previous one in HG10
        # do nothing with flags, it is implicitly 0 for cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
class cg2packer(cg1packer):
    """Packer producing version '02' (generaldelta-aware) streams."""
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        super(cg2packer, self).__init__(repo, bundlecaps)
        if self._reorder is None:
            # Since generaldelta is directly supported by cg2, reordering
            # generally doesn't help, so we disable it by default (treating
            # bundle.reorder=auto just like bundle.reorder=False).
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        dp = revlog.deltaparent(rev)
        if dp == nullrev and revlog.storedeltachains:
            # Avoid sending full revisions when delta parent is null. Pick prev
            # in that case. It's tempting to pick p1 in this case, as p1 will
            # be smaller in the common case. However, computing a delta against
            # p1 may require resolving the raw text of p1, which could be
            # expensive. The revlog caches should have prev cached, meaning
            # less CPU for changegroup generation. There is likely room to add
            # a flag and/or config option to control this behavior.
            return prev
        elif dp == nullrev:
            # revlog is configured to use full snapshot for a reason,
            # stick to full snapshot.
            return nullrev
        elif dp not in (p1, p2, prev):
            # Pick prev when we can't be sure remote has the base revision.
            return prev
        else:
            return dp

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # Do nothing with flags, it is implicitly 0 in cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
class cg3packer(cg2packer):
    """Packer producing version '03' (treemanifest/flags) streams."""
    version = '03'
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        # directory manifests are preceded by a file-style header naming
        # the directory
        if dir:
            yield self.fileheader(dir)

        dirlog = self._repo.manifestlog._revlog.dirlog(dir)
        for chunk in self.group(mfnodes, dirlog, lookuplinknode,
                                units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg3 separates manifests from files with an empty chunk
        return self.close()

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        return struct.pack(
            self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
# Maps a changegroup version name to its (packer, unpacker) class pair.
_packermap = {'01': (cg1packer, cg1unpacker),
             # cg2 adds support for exchanging generaldelta
             '02': (cg2packer, cg2unpacker),
             # cg3 adds support for exchanging revlog flags and treemanifests
             '03': (cg3packer, cg3unpacker),
}
869 869
def allsupportedversions(repo):
    """Return the set of changegroup versions this repo supports at all."""
    versions = set(_packermap)
    # Version 03 is only offered when something actually requires it:
    # an experimental opt-in or an existing treemanifest repo.
    needv03 = (repo.ui.configbool('experimental', 'changegroup3') or
               repo.ui.configbool('experimental', 'treemanifest') or
               'treemanifest' in repo.requirements)
    if not needv03:
        versions.discard('03')
    return versions
877 877
# Changegroup versions that can be applied to the repo
def supportedincomingversions(repo):
    # Currently every supported version can also be received; any
    # incoming-specific restrictions would be applied here.
    return allsupportedversions(repo)
881 881
# Changegroup versions that can be created from the repo
def supportedoutgoingversions(repo):
    versions = allsupportedversions(repo)
    if 'treemanifest' in repo.requirements:
        # Versions 01 and 02 only know flat manifests, and converting
        # between flat and tree manifests on the fly is just too expensive:
        # tree manifests hash differently, so all of history would have to
        # be converted. Rather than pretend to support these versions, we
        # drop them outright.
        versions.difference_update(['01', '02'])
    return versions
894 894
def safeversion(repo):
    """Return the smallest version all clients of this repo must support.

    For example, every hg version that supports generaldelta also supports
    changegroup 02, so a generaldelta repo can safely assume at least 02."""
    versions = supportedoutgoingversions(repo)
    if 'generaldelta' in repo.requirements:
        versions.discard('01')
    assert versions
    return min(versions)
904 904
def getbundler(version, repo, bundlecaps=None):
    """Instantiate the packer registered for ``version``."""
    assert version in supportedoutgoingversions(repo)
    packercls = _packermap[version][0]
    return packercls(repo, bundlecaps)
908 908
def getunbundler(version, fh, alg, extras=None):
    """Instantiate the unpacker registered for ``version``."""
    unpackercls = _packermap[version][1]
    return unpackercls(fh, alg, extras=extras)
911 911
def _changegroupinfo(repo, nodes, source):
    """Report how many changesets were found (and which, when debugging)."""
    if source == 'bundle' or repo.ui.verbose:
        repo.ui.status(_("%d changesets found\n") % len(nodes))
        if repo.ui.debugflag:
            repo.ui.debug("list of changesets:\n")
            for node in nodes:
                repo.ui.debug("%s\n" % hex(node))
919 919
def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
    """Generate a raw changegroup stream for the given outgoing set.

    Runs the 'preoutgoing' hook (which may abort) and returns the raw
    chunk generator produced by ``bundler.generate()``."""
    repo = repo.unfiltered()
    commonrevs = outgoing.common
    csets = outgoing.missing
    # Sort a copy: outgoing.missingheads is shared with the caller and
    # must not be reordered as a side effect of generating the bundle.
    heads = sorted(outgoing.missingheads)
    # We go through the fast path if we get told to, or if all (unfiltered)
    # heads have been requested (since we then know that all linkrevs will
    # be pulled by the client).
    fastpathlinkrev = fastpath or (
        repo.filtername is None and heads == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, csets, source)
    return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
935 935
def getsubset(repo, outgoing, bundler, source, fastpath=False):
    """Wrap the raw stream from getsubsetraw() in a matching unbundler."""
    stream = getsubsetraw(repo, outgoing, bundler, source, fastpath)
    return getunbundler(bundler.version, util.chunkbuffer(stream), None,
                        {'clcount': len(outgoing.missing)})
940 940
def changegroupsubset(repo, roots, heads, source, version='01'):
    """Build a changegroup for every node that is a descendant of any of
    the roots and an ancestor of any of the heads.

    The result is a chunkbuffer-backed object whose read() method returns
    successive changegroup chunks.

    Determining which filenodes and manifest nodes must be included for
    the changesets to be complete is fairly complex and non-trivial.

    Another wrinkle is doing the reverse: figuring out which changeset in
    the changegroup a particular filenode or manifest node belongs to.
    """
    outgoing = discovery.outgoing(repo, missingroots=roots, missingheads=heads)
    packer = getbundler(version, repo)
    return getsubset(repo, outgoing, packer, source)
957 957
def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
                           version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing. Returns a raw changegroup generator."""
    # Nothing missing means nothing to bundle; signal that with None.
    if not outgoing.missing:
        return None
    packer = getbundler(version, repo, bundlecaps)
    return getsubsetraw(repo, outgoing, packer, source)
968 968
def getchangegroup(repo, source, outgoing, bundlecaps=None,
                   version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing."""
    # Nothing missing means nothing to bundle; signal that with None.
    if not outgoing.missing:
        return None
    packer = getbundler(version, repo, bundlecaps)
    return getsubset(repo, outgoing, packer, source)
979 979
def getlocalchangegroup(repo, *args, **kwargs):
    """Deprecated alias for getchangegroup()."""
    msg = 'getlocalchangegroup is deprecated, use getchangegroup'
    repo.ui.deprecwarn(msg, '4.3')
    return getchangegroup(repo, *args, **kwargs)
984 984
def changegroup(repo, basenodes, source):
    # Delegate to changegroupsubset(), which computes the heads itself,
    # to avoid a race (issue1320).
    return changegroupsubset(repo, basenodes, repo.heads(), source)
988 988
def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
    """Apply the file part of a changegroup stream to the repo.

    Reads filelog groups from ``source`` until the empty terminator,
    checks off every node listed in ``needfiles``, and finally verifies
    that all remaining needed nodes already exist. Returns a
    (revisions, files) pair of counters. Raises error.Abort on an empty
    group, a censored delta base, a spurious entry, or missing file data.
    """
    revisions = 0
    files = 0
    for header in iter(source.filelogheader, {}):
        files += 1
        fname = header["filename"]
        repo.ui.debug("adding %s revisions\n" % fname)
        repo.ui.progress(_('files'), files, unit=_('files'),
                         total=expectedfiles)
        flog = repo.file(fname)
        oldlen = len(flog)
        try:
            if not flog.addgroup(source, revmap, trp):
                raise error.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError as e:
            raise error.Abort(_("received delta base is censored: %s") % e)
        revisions += len(flog) - oldlen
        if fname in needfiles:
            pending = needfiles[fname]
            for rev in xrange(oldlen, len(flog)):
                node = flog.node(rev)
                if node not in pending:
                    raise error.Abort(
                        _("received spurious file revlog entry"))
                pending.remove(node)
            if not pending:
                del needfiles[fname]
    repo.ui.progress(_('files'), None)

    # Everything still listed in needfiles must already be present.
    for fname, pending in needfiles.iteritems():
        flog = repo.file(fname)
        for node in pending:
            try:
                flog.rev(node)
            except error.LookupError:
                raise error.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (fname, hex(node)))

    return revisions, files
General Comments 0
You need to be logged in to leave comments. Login now