changegroup: move manifest unpacking into its own method...
Augie Fackler
r26712:04176eaf default
@@ -1,946 +1,950 @@
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullid,
19 19 nullrev,
20 20 short,
21 21 )
22 22
23 23 from . import (
24 24 branchmap,
25 25 dagutil,
26 26 discovery,
27 27 error,
28 28 mdiff,
29 29 phases,
30 30 util,
31 31 )
32 32
33 33 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
34 34 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
35 35
36 36 def readexactly(stream, n):
37 37 '''read n bytes from stream.read and abort if less was available'''
38 38 s = stream.read(n)
39 39 if len(s) < n:
40 40 raise error.Abort(_("stream ended unexpectedly"
41 41 " (got %d bytes, expected %d)")
42 42 % (len(s), n))
43 43 return s
44 44
45 45 def getchunk(stream):
46 46 """return the next chunk from stream as a string"""
47 47 d = readexactly(stream, 4)
48 48 l = struct.unpack(">l", d)[0]
49 49 if l <= 4:
50 50 if l:
51 51 raise error.Abort(_("invalid chunk length %d") % l)
52 52 return ""
53 53 return readexactly(stream, l - 4)
54 54
55 55 def chunkheader(length):
56 56 """return a changegroup chunk header (string)"""
57 57 return struct.pack(">l", length + 4)
58 58
59 59 def closechunk():
60 60 """return a changegroup chunk header (string) for a zero-length chunk"""
61 61 return struct.pack(">l", 0)
62 62
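To make the framing concrete, here is a small self-contained sketch (not part of the module): each chunk is prefixed with a 4-byte big-endian length that counts the prefix itself, and a zero-length header terminates a group, exactly as getchunk()/chunkheader()/closechunk() above implement it.

    import io
    import struct

    def frame(payload):
        # 4-byte big-endian length, counting the 4 length bytes themselves
        return struct.pack(">l", len(payload) + 4) + payload

    # two framed payloads followed by the zero-length terminator
    stream = io.BytesIO(frame(b"hello") + frame(b"world") + struct.pack(">l", 0))

    def nextchunk(stream):
        # mirrors getchunk() above: a length <= 4 ends the group
        l = struct.unpack(">l", stream.read(4))[0]
        if l <= 4:
            return b""
        return stream.read(l - 4)

    chunks = []
    while True:
        c = nextchunk(stream)
        if not c:
            break
        chunks.append(c)
    assert chunks == [b"hello", b"world"]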
63 63 def combineresults(results):
64 64 """logic to combine 0 or more addchangegroup results into one"""
65 65 changedheads = 0
66 66 result = 1
67 67 for ret in results:
68 68 # If any changegroup result is 0, return 0
69 69 if ret == 0:
70 70 result = 0
71 71 break
72 72 if ret < -1:
73 73 changedheads += ret + 1
74 74 elif ret > 1:
75 75 changedheads += ret - 1
76 76 if changedheads > 0:
77 77 result = 1 + changedheads
78 78 elif changedheads < 0:
79 79 result = -1 + changedheads
80 80 return result
81 81
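The values being combined are the ones documented on apply() below: 0 means failure, 1 means success with no head change, 1+n means n heads added, and -1-n means n heads removed. A hedged example, assuming this module is importable as mercurial.changegroup:

    from mercurial import changegroup

    # 3 encodes "+2 heads", -2 encodes "-1 head"; the net change is +1 head
    assert changegroup.combineresults([3, -2]) == 2
    # a single failing result (0) short-circuits the whole combination
    assert changegroup.combineresults([3, 0, -2]) == 0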
82 82 bundletypes = {
83 83 "": ("", None), # only when using unbundle on ssh and old http servers
84 84 # since the unification, ssh accepts a header but there
85 85 # is no capability signaling it.
86 86 "HG20": (), # special-cased below
87 87 "HG10UN": ("HG10UN", None),
88 88 "HG10BZ": ("HG10", 'BZ'),
89 89 "HG10GZ": ("HG10GZ", 'GZ'),
90 90 }
91 91
92 92 # hgweb uses this list to communicate its preferred type
93 93 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
94 94
95 95 def writechunks(ui, chunks, filename, vfs=None):
96 96 """Write chunks to a file and return its filename.
97 97
98 98 The stream is assumed to be a bundle file.
99 99 Existing files will not be overwritten.
100 100 If no filename is specified, a temporary file is created.
101 101 """
102 102 fh = None
103 103 cleanup = None
104 104 try:
105 105 if filename:
106 106 if vfs:
107 107 fh = vfs.open(filename, "wb")
108 108 else:
109 109 fh = open(filename, "wb")
110 110 else:
111 111 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
112 112 fh = os.fdopen(fd, "wb")
113 113 cleanup = filename
114 114 for c in chunks:
115 115 fh.write(c)
116 116 cleanup = None
117 117 return filename
118 118 finally:
119 119 if fh is not None:
120 120 fh.close()
121 121 if cleanup is not None:
122 122 if filename and vfs:
123 123 vfs.unlink(cleanup)
124 124 else:
125 125 os.unlink(cleanup)
126 126
127 127 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
128 128 """Write a bundle file and return its filename.
129 129
130 130 Existing files will not be overwritten.
131 131 If no filename is specified, a temporary file is created.
132 132 bz2 compression can be turned off.
133 133 The bundle file will be deleted in case of errors.
134 134 """
135 135
136 136 if bundletype == "HG20":
137 137 from . import bundle2
138 138 bundle = bundle2.bundle20(ui)
139 139 bundle.setcompression(compression)
140 140 part = bundle.newpart('changegroup', data=cg.getchunks())
141 141 part.addparam('version', cg.version)
142 142 chunkiter = bundle.getchunks()
143 143 else:
144 144 # compression argument is only for the bundle2 case
145 145 assert compression is None
146 146 if cg.version != '01':
147 147 raise error.Abort(_('old bundle types only support v1 '
148 148 'changegroups'))
149 149 header, comp = bundletypes[bundletype]
150 150 if comp not in util.compressors:
151 151 raise error.Abort(_('unknown stream compression type: %s')
152 152 % comp)
153 153 z = util.compressors[comp]()
154 154 subchunkiter = cg.getchunks()
155 155 def chunkiter():
156 156 yield header
157 157 for chunk in subchunkiter:
158 158 yield z.compress(chunk)
159 159 yield z.flush()
160 160 chunkiter = chunkiter()
161 161
162 162 # parse the changegroup data, otherwise we will block
163 163 # in the sshrepo case because we don't know the end of the stream
164 164
165 165 # an empty chunkgroup is the end of the changegroup
166 166 # a changegroup has at least 2 chunkgroups (changelog and manifest).
167 167 # after that, an empty chunkgroup is the end of the changegroup
168 168 return writechunks(ui, chunkiter, filename, vfs=vfs)
169 169
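As a usage sketch (hedged: it assumes an open local repo object and this module on the import path), writing a v1 changegroup out as a bzip2-compressed bundle looks roughly like:

    from mercurial import changegroup

    # cg is a cg1unpacker wrapping the generated stream; writebundle()
    # consumes cg.getchunks() and prefixes the 'HG10' header itself
    cg = changegroup.getchangegroup(repo, 'bundle', version='01')
    if cg is not None:
        fname = changegroup.writebundle(repo.ui, cg, 'backup.hg', 'HG10BZ')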
170 170 class cg1unpacker(object):
171 171 """Unpacker for cg1 changegroup streams.
172 172
173 173 A changegroup unpacker handles the framing of the revision data in
174 174 the wire format. Most consumers will want to use the apply()
175 175 method to add the changes from the changegroup to a repository.
176 176
177 177 If you're forwarding a changegroup unmodified to another consumer,
178 178 use getchunks(), which returns an iterator of changegroup
179 179 chunks. This is mostly useful for cases where you need to know the
180 180 data stream has ended by observing the end of the changegroup.
181 181
182 182 deltachunk() is useful only if you're applying delta data. Most
183 183 consumers should prefer apply() instead.
184 184
185 185 A few other public methods exist. Those are used only for
186 186 bundlerepo and some debug commands - their use is discouraged.
187 187 """
188 188 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
189 189 deltaheadersize = struct.calcsize(deltaheader)
190 190 version = '01'
191 191 def __init__(self, fh, alg):
192 192 if alg == 'UN':
193 193 alg = None # get more modern without breaking too much
194 194 if not alg in util.decompressors:
195 195 raise error.Abort(_('unknown stream compression type: %s')
196 196 % alg)
197 197 if alg == 'BZ':
198 198 alg = '_truncatedBZ'
199 199 self._stream = util.decompressors[alg](fh)
200 200 self._type = alg
201 201 self.callback = None
202 202
203 203 # These methods (compressed, read, seek, tell) all appear to only
204 204 # be used by bundlerepo, but it's a little hard to tell.
205 205 def compressed(self):
206 206 return self._type is not None
207 207 def read(self, l):
208 208 return self._stream.read(l)
209 209 def seek(self, pos):
210 210 return self._stream.seek(pos)
211 211 def tell(self):
212 212 return self._stream.tell()
213 213 def close(self):
214 214 return self._stream.close()
215 215
216 216 def _chunklength(self):
217 217 d = readexactly(self._stream, 4)
218 218 l = struct.unpack(">l", d)[0]
219 219 if l <= 4:
220 220 if l:
221 221 raise error.Abort(_("invalid chunk length %d") % l)
222 222 return 0
223 223 if self.callback:
224 224 self.callback()
225 225 return l - 4
226 226
227 227 def changelogheader(self):
228 228 """v10 does not have a changelog header chunk"""
229 229 return {}
230 230
231 231 def manifestheader(self):
232 232 """v10 does not have a manifest header chunk"""
233 233 return {}
234 234
235 235 def filelogheader(self):
236 236 """return the header of the filelogs chunk; v10 only has the filename"""
237 237 l = self._chunklength()
238 238 if not l:
239 239 return {}
240 240 fname = readexactly(self._stream, l)
241 241 return {'filename': fname}
242 242
243 243 def _deltaheader(self, headertuple, prevnode):
244 244 node, p1, p2, cs = headertuple
245 245 if prevnode is None:
246 246 deltabase = p1
247 247 else:
248 248 deltabase = prevnode
249 249 return node, p1, p2, deltabase, cs
250 250
251 251 def deltachunk(self, prevnode):
252 252 l = self._chunklength()
253 253 if not l:
254 254 return {}
255 255 headerdata = readexactly(self._stream, self.deltaheadersize)
256 256 header = struct.unpack(self.deltaheader, headerdata)
257 257 delta = readexactly(self._stream, l - self.deltaheadersize)
258 258 node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
259 259 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
260 260 'deltabase': deltabase, 'delta': delta}
261 261
262 262 def getchunks(self):
263 263 """returns all the chunks contained in the bundle
264 264
265 265 Used when you need to forward the binary stream to a file or another
266 266 network API. To do so, it parses the changegroup data; otherwise it
267 267 would block in the sshrepo case, not knowing where the stream ends.
268 268 """
269 269 # an empty chunkgroup is the end of the changegroup
270 270 # a changegroup has at least 2 chunkgroups (changelog and manifest).
271 271 # after that, an empty chunkgroup is the end of the changegroup
272 272 empty = False
273 273 count = 0
274 274 while not empty or count <= 2:
275 275 empty = True
276 276 count += 1
277 277 while True:
278 278 chunk = getchunk(self)
279 279 if not chunk:
280 280 break
281 281 empty = False
282 282 yield chunkheader(len(chunk))
283 283 pos = 0
284 284 while pos < len(chunk):
285 285 next = pos + 2**20
286 286 yield chunk[pos:next]
287 287 pos = next
288 288 yield closechunk()
289 289
290 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
291 # We know that we'll never have more manifests than we had
292 # changesets.
293 self.callback = prog(_('manifests'), numchanges)
294 # no need to check for empty manifest group here:
295 # if the result of the merge of 1 and 2 is the same in 3 and 4,
296 # no new manifest will be created and the manifest group will
297 # be empty during the pull
298 self.manifestheader()
299 repo.manifest.addgroup(self, revmap, trp)
300 repo.ui.progress(_('manifests'), None)
301
290 302 def apply(self, repo, srctype, url, emptyok=False,
291 303 targetphase=phases.draft, expectedtotal=None):
292 304 """Add the changegroup returned by source.read() to this repo.
293 305 srctype is a string like 'push', 'pull', or 'unbundle'. url is
294 306 the URL of the repo where this changegroup is coming from.
295 307
296 308 Return an integer summarizing the change to this repo:
297 309 - nothing changed or no source: 0
298 310 - more heads than before: 1+added heads (2..n)
299 311 - fewer heads than before: -1-removed heads (-2..-n)
300 312 - number of heads stays the same: 1
301 313 """
302 314 repo = repo.unfiltered()
303 315 def csmap(x):
304 316 repo.ui.debug("add changeset %s\n" % short(x))
305 317 return len(cl)
306 318
307 319 def revmap(x):
308 320 return cl.rev(x)
309 321
310 322 changesets = files = revisions = 0
311 323
312 324 tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
313 325 # The transaction could have been created before and already
314 326 # carries source information. In this case we use the top
315 327 # level data. We overwrite the argument because we need to use
316 328 # the top level value (if they exist) in this function.
317 329 srctype = tr.hookargs.setdefault('source', srctype)
318 330 url = tr.hookargs.setdefault('url', url)
319 331
320 332 # write changelog data to temp files so concurrent readers will not see
321 333 # inconsistent view
322 334 cl = repo.changelog
323 335 cl.delayupdate(tr)
324 336 oldheads = cl.heads()
325 337 try:
326 338 repo.hook('prechangegroup', throw=True, **tr.hookargs)
327 339
328 340 trp = weakref.proxy(tr)
329 341 # pull off the changeset group
330 342 repo.ui.status(_("adding changesets\n"))
331 343 clstart = len(cl)
332 344 class prog(object):
333 345 def __init__(self, step, total):
334 346 self._step = step
335 347 self._total = total
336 348 self._count = 1
337 349 def __call__(self):
338 350 repo.ui.progress(self._step, self._count, unit=_('chunks'),
339 351 total=self._total)
340 352 self._count += 1
341 353 self.callback = prog(_('changesets'), expectedtotal)
342 354
343 355 efiles = set()
344 356 def onchangelog(cl, node):
345 357 efiles.update(cl.read(node)[3])
346 358
347 359 self.changelogheader()
348 360 srccontent = cl.addgroup(self, csmap, trp,
349 361 addrevisioncb=onchangelog)
350 362 efiles = len(efiles)
351 363
352 364 if not (srccontent or emptyok):
353 365 raise error.Abort(_("received changelog group is empty"))
354 366 clend = len(cl)
355 367 changesets = clend - clstart
356 368 repo.ui.progress(_('changesets'), None)
357 369
358 370 # pull off the manifest group
359 371 repo.ui.status(_("adding manifests\n"))
360 # manifests <= changesets
361 self.callback = prog(_('manifests'), changesets)
362 # no need to check for empty manifest group here:
363 # if the result of the merge of 1 and 2 is the same in 3 and 4,
364 # no new manifest will be created and the manifest group will
365 # be empty during the pull
366 self.manifestheader()
367 repo.manifest.addgroup(self, revmap, trp)
368 repo.ui.progress(_('manifests'), None)
372 self._unpackmanifests(repo, revmap, trp, prog, changesets)
369 373
370 374 needfiles = {}
371 375 if repo.ui.configbool('server', 'validate', default=False):
372 376 # validate incoming csets have their manifests
373 377 for cset in xrange(clstart, clend):
374 378 mfnode = repo.changelog.read(repo.changelog.node(cset))[0]
375 379 mfest = repo.manifest.readdelta(mfnode)
376 380 # store file nodes we must see
377 381 for f, n in mfest.iteritems():
378 382 needfiles.setdefault(f, set()).add(n)
379 383
380 384 # process the files
381 385 repo.ui.status(_("adding file changes\n"))
382 386 self.callback = None
383 387 pr = prog(_('files'), efiles)
384 388 newrevs, newfiles = _addchangegroupfiles(
385 389 repo, self, revmap, trp, pr, needfiles)
386 390 revisions += newrevs
387 391 files += newfiles
388 392
389 393 dh = 0
390 394 if oldheads:
391 395 heads = cl.heads()
392 396 dh = len(heads) - len(oldheads)
393 397 for h in heads:
394 398 if h not in oldheads and repo[h].closesbranch():
395 399 dh -= 1
396 400 htext = ""
397 401 if dh:
398 402 htext = _(" (%+d heads)") % dh
399 403
400 404 repo.ui.status(_("added %d changesets"
401 405 " with %d changes to %d files%s\n")
402 406 % (changesets, revisions, files, htext))
403 407 repo.invalidatevolatilesets()
404 408
405 409 if changesets > 0:
406 410 p = lambda: tr.writepending() and repo.root or ""
407 411 if 'node' not in tr.hookargs:
408 412 tr.hookargs['node'] = hex(cl.node(clstart))
409 413 hookargs = dict(tr.hookargs)
410 414 else:
411 415 hookargs = dict(tr.hookargs)
412 416 hookargs['node'] = hex(cl.node(clstart))
413 417 repo.hook('pretxnchangegroup', throw=True, pending=p,
414 418 **hookargs)
415 419
416 420 added = [cl.node(r) for r in xrange(clstart, clend)]
417 421 publishing = repo.publishing()
418 422 if srctype in ('push', 'serve'):
419 423 # Old servers can not push the boundary themselves.
420 424 # New servers won't push the boundary if changeset already
421 425 # exists locally as secret
422 426 #
423 427 # We should not use added here but the list of all changes in
424 428 # the bundle
425 429 if publishing:
426 430 phases.advanceboundary(repo, tr, phases.public, srccontent)
427 431 else:
428 432 # Those changesets have been pushed from the outside, their
429 433 # phases are going to be pushed alongside. Therefore
430 434 # `targetphase` is ignored.
431 435 phases.advanceboundary(repo, tr, phases.draft, srccontent)
432 436 phases.retractboundary(repo, tr, phases.draft, added)
433 437 elif srctype != 'strip':
434 438 # publishing only alters behavior during push
435 439 #
436 440 # strip should not touch boundary at all
437 441 phases.retractboundary(repo, tr, targetphase, added)
438 442
439 443 if changesets > 0:
440 444 if srctype != 'strip':
441 445 # During strip, branchcache is invalid but the coming call to
442 446 # `destroyed` will repair it.
443 447 # In other cases we can safely update the cache on disk.
444 448 branchmap.updatecache(repo.filtered('served'))
445 449
446 450 def runhooks():
447 451 # These hooks run when the lock releases, not when the
448 452 # transaction closes. So it's possible for the changelog
449 453 # to have changed since we last saw it.
450 454 if clstart >= len(repo):
451 455 return
452 456
453 457 # forcefully update the on-disk branch cache
454 458 repo.ui.debug("updating the branch cache\n")
455 459 repo.hook("changegroup", **hookargs)
456 460
457 461 for n in added:
458 462 args = hookargs.copy()
459 463 args['node'] = hex(n)
460 464 repo.hook("incoming", **args)
461 465
462 466 newheads = [h for h in repo.heads() if h not in oldheads]
463 467 repo.ui.log("incoming",
464 468 "%s incoming changes - new heads: %s\n",
465 469 len(added),
466 470 ', '.join([hex(c[:6]) for c in newheads]))
467 471
468 472 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
469 473 lambda tr: repo._afterlock(runhooks))
470 474
471 475 tr.close()
472 476
473 477 finally:
474 478 tr.release()
475 479 repo.ui.flush()
476 480 # never return 0 here:
477 481 if dh < 0:
478 482 return dh - 1
479 483 else:
480 484 return dh + 1
481 485
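The point of the new _unpackmanifests() hook above is that a subclass can change how the manifest section is consumed without copying all of apply(). A hypothetical sketch (the subclass name and the extra debug output are invented for illustration):

    class verboseunpacker(cg1unpacker):
        """Hypothetical unpacker that instruments manifest unpacking."""
        def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
            # extra work before/after delegating to the base implementation
            repo.ui.debug('unpacking at most %d manifests\n' % numchanges)
            super(verboseunpacker, self)._unpackmanifests(
                repo, revmap, trp, prog, numchanges)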
482 486 class cg2unpacker(cg1unpacker):
483 487 """Unpacker for cg2 streams.
484 488
485 489 cg2 streams add support for generaldelta, so the delta header
486 490 format is slightly different. All other features about the data
487 491 remain the same.
488 492 """
489 493 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
490 494 deltaheadersize = struct.calcsize(deltaheader)
491 495 version = '02'
492 496
493 497 def _deltaheader(self, headertuple, prevnode):
494 498 node, p1, p2, deltabase, cs = headertuple
495 499 return node, p1, p2, deltabase, cs
496 500
497 501 class headerlessfixup(object):
498 502 def __init__(self, fh, h):
499 503 self._h = h
500 504 self._fh = fh
501 505 def read(self, n):
502 506 if self._h:
503 507 d, self._h = self._h[:n], self._h[n:]
504 508 if len(d) < n:
505 509 d += readexactly(self._fh, n - len(d))
506 510 return d
507 511 return readexactly(self._fh, n)
508 512
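headerlessfixup glues back bytes that were consumed while sniffing a stream's type, so the unpacker can still read from the start. A sketch of the mechanics, assuming a Mercurial checkout on the import path and using a plain BytesIO in place of a socket-like file:

    import io
    from mercurial.changegroup import headerlessfixup

    fh = io.BytesIO(b'HG10UN' + b'chunkdata...')
    probed = fh.read(6)                # consumed while detecting the type
    fixed = headerlessfixup(fh, probed)
    assert fixed.read(6) == b'HG10UN'  # the header is readable again
    assert fixed.read(5) == b'chunk'   # then reading continues from fh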
509 513 class cg1packer(object):
510 514 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
511 515 version = '01'
512 516 def __init__(self, repo, bundlecaps=None):
513 517 """Given a source repo, construct a bundler.
514 518
515 519 bundlecaps is optional and can be used to specify the set of
516 520 capabilities which can be used to build the bundle.
517 521 """
518 522 # Set of capabilities we can use to build the bundle.
519 523 if bundlecaps is None:
520 524 bundlecaps = set()
521 525 self._bundlecaps = bundlecaps
522 526 # experimental config: bundle.reorder
523 527 reorder = repo.ui.config('bundle', 'reorder', 'auto')
524 528 if reorder == 'auto':
525 529 reorder = None
526 530 else:
527 531 reorder = util.parsebool(reorder)
528 532 self._repo = repo
529 533 self._reorder = reorder
530 534 self._progress = repo.ui.progress
531 535 if self._repo.ui.verbose and not self._repo.ui.debugflag:
532 536 self._verbosenote = self._repo.ui.note
533 537 else:
534 538 self._verbosenote = lambda s: None
535 539
536 540 def close(self):
537 541 return closechunk()
538 542
539 543 def fileheader(self, fname):
540 544 return chunkheader(len(fname)) + fname
541 545
542 546 def group(self, nodelist, revlog, lookup, units=None):
543 547 """Calculate a delta group, yielding a sequence of changegroup chunks
544 548 (strings).
545 549
546 550 Given a list of changeset revs, return a set of deltas and
547 551 metadata corresponding to nodes. The first delta is
548 552 first parent(nodelist[0]) -> nodelist[0], the receiver is
549 553 guaranteed to have this parent as it has all history before
550 554 these changesets. If the first parent is nullrev, the
551 555 changegroup starts with a full revision.
552 556
553 557 If units is not None, progress detail will be generated; units specifies
554 558 the type of revlog that is touched (changelog, manifest, etc.).
555 559 """
556 560 # if we don't have any revisions touched by these changesets, bail
557 561 if len(nodelist) == 0:
558 562 yield self.close()
559 563 return
560 564
561 565 # for generaldelta revlogs, we linearize the revs; this will both be
562 566 # much quicker and generate a much smaller bundle
563 567 if (revlog._generaldelta and self._reorder is None) or self._reorder:
564 568 dag = dagutil.revlogdag(revlog)
565 569 revs = set(revlog.rev(n) for n in nodelist)
566 570 revs = dag.linearize(revs)
567 571 else:
568 572 revs = sorted([revlog.rev(n) for n in nodelist])
569 573
570 574 # add the parent of the first rev
571 575 p = revlog.parentrevs(revs[0])[0]
572 576 revs.insert(0, p)
573 577
574 578 # build deltas
575 579 total = len(revs) - 1
576 580 msgbundling = _('bundling')
577 581 for r in xrange(len(revs) - 1):
578 582 if units is not None:
579 583 self._progress(msgbundling, r + 1, unit=units, total=total)
580 584 prev, curr = revs[r], revs[r + 1]
581 585 linknode = lookup(revlog.node(curr))
582 586 for c in self.revchunk(revlog, curr, prev, linknode):
583 587 yield c
584 588
585 589 if units is not None:
586 590 self._progress(msgbundling, None)
587 591 yield self.close()
588 592
589 593 # filter any nodes that claim to be part of the known set
590 594 def prune(self, revlog, missing, commonrevs):
591 595 rr, rl = revlog.rev, revlog.linkrev
592 596 return [n for n in missing if rl(rr(n)) not in commonrevs]
593 597
594 598 def _packmanifests(self, mfnodes, lookuplinknode):
595 599 """Pack flat manifests into a changegroup stream."""
596 600 ml = self._repo.manifest
597 601 size = 0
598 602 for chunk in self.group(
599 603 mfnodes, ml, lookuplinknode, units=_('manifests')):
600 604 size += len(chunk)
601 605 yield chunk
602 606 self._verbosenote(_('%8.i (manifests)\n') % size)
603 607
604 608 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
605 609 '''yield a sequence of changegroup chunks (strings)'''
606 610 repo = self._repo
607 611 cl = repo.changelog
608 612 ml = repo.manifest
609 613
610 614 clrevorder = {}
611 615 mfs = {} # needed manifests
612 616 fnodes = {} # needed file nodes
613 617 changedfiles = set()
614 618
615 619 # Callback for the changelog, used to collect changed files and manifest
616 620 # nodes.
617 621 # Returns the linkrev node (identity in the changelog case).
618 622 def lookupcl(x):
619 623 c = cl.read(x)
620 624 clrevorder[x] = len(clrevorder)
621 625 changedfiles.update(c[3])
622 626 # record the first changeset introducing this manifest version
623 627 mfs.setdefault(c[0], x)
624 628 return x
625 629
626 630 self._verbosenote(_('uncompressed size of bundle content:\n'))
627 631 size = 0
628 632 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
629 633 size += len(chunk)
630 634 yield chunk
631 635 self._verbosenote(_('%8.i (changelog)\n') % size)
632 636
633 637 # We need to make sure that the linkrev in the changegroup refers to
634 638 # the first changeset that introduced the manifest or file revision.
635 639 # The fastpath is usually safer than the slowpath, because the filelogs
636 640 # are walked in revlog order.
637 641 #
638 642 # When taking the slowpath with reorder=None and the manifest revlog
639 643 # uses generaldelta, the manifest may be walked in the "wrong" order.
640 644 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
641 645 # cc0ff93d0c0c).
642 646 #
643 647 # When taking the fastpath, we are only vulnerable to reordering
644 648 # of the changelog itself. The changelog never uses generaldelta, so
645 649 # it is only reordered when reorder=True. To handle this case, we
646 650 # simply take the slowpath, which already has the 'clrevorder' logic.
647 651 # This was also fixed in cc0ff93d0c0c.
648 652 fastpathlinkrev = fastpathlinkrev and not self._reorder
649 653 # Callback for the manifest, used to collect linkrevs for filelog
650 654 # revisions.
651 655 # Returns the linkrev node (collected in lookupcl).
652 656 def lookupmflinknode(x):
653 657 clnode = mfs[x]
654 658 if not fastpathlinkrev:
655 659 mdata = ml.readfast(x)
656 660 for f, n in mdata.iteritems():
657 661 if f in changedfiles:
658 662 # record the first changeset introducing this filelog
659 663 # version
660 664 fclnodes = fnodes.setdefault(f, {})
661 665 fclnode = fclnodes.setdefault(n, clnode)
662 666 if clrevorder[clnode] < clrevorder[fclnode]:
663 667 fclnodes[n] = clnode
664 668 return clnode
665 669
666 670 mfnodes = self.prune(ml, mfs, commonrevs)
667 671 for x in self._packmanifests(mfnodes, lookupmflinknode):
668 672 yield x
669 673
670 674 mfs.clear()
671 675 clrevs = set(cl.rev(x) for x in clnodes)
672 676
673 677 def linknodes(filerevlog, fname):
674 678 if fastpathlinkrev:
675 679 llr = filerevlog.linkrev
676 680 def genfilenodes():
677 681 for r in filerevlog:
678 682 linkrev = llr(r)
679 683 if linkrev in clrevs:
680 684 yield filerevlog.node(r), cl.node(linkrev)
681 685 return dict(genfilenodes())
682 686 return fnodes.get(fname, {})
683 687
684 688 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
685 689 source):
686 690 yield chunk
687 691
688 692 yield self.close()
689 693
690 694 if clnodes:
691 695 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
692 696
693 697 # The 'source' parameter is useful for extensions
694 698 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
695 699 repo = self._repo
696 700 progress = self._progress
697 701 msgbundling = _('bundling')
698 702
699 703 total = len(changedfiles)
700 704 # for progress output
701 705 msgfiles = _('files')
702 706 for i, fname in enumerate(sorted(changedfiles)):
703 707 filerevlog = repo.file(fname)
704 708 if not filerevlog:
705 709 raise error.Abort(_("empty or missing revlog for %s") % fname)
706 710
707 711 linkrevnodes = linknodes(filerevlog, fname)
708 712 # Lookup table for filenodes; we collected the linkrev nodes above in
709 713 # the fastpath case and with lookupmflinknode in the slowpath case.
710 714 def lookupfilelog(x):
711 715 return linkrevnodes[x]
712 716
713 717 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
714 718 if filenodes:
715 719 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
716 720 total=total)
717 721 h = self.fileheader(fname)
718 722 size = len(h)
719 723 yield h
720 724 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
721 725 size += len(chunk)
722 726 yield chunk
723 727 self._verbosenote(_('%8.i %s\n') % (size, fname))
724 728 progress(msgbundling, None)
725 729
726 730 def deltaparent(self, revlog, rev, p1, p2, prev):
727 731 return prev
728 732
729 733 def revchunk(self, revlog, rev, prev, linknode):
730 734 node = revlog.node(rev)
731 735 p1, p2 = revlog.parentrevs(rev)
732 736 base = self.deltaparent(revlog, rev, p1, p2, prev)
733 737
734 738 prefix = ''
735 739 if revlog.iscensored(base) or revlog.iscensored(rev):
736 740 try:
737 741 delta = revlog.revision(node)
738 742 except error.CensoredNodeError as e:
739 743 delta = e.tombstone
740 744 if base == nullrev:
741 745 prefix = mdiff.trivialdiffheader(len(delta))
742 746 else:
743 747 baselen = revlog.rawsize(base)
744 748 prefix = mdiff.replacediffheader(baselen, len(delta))
745 749 elif base == nullrev:
746 750 delta = revlog.revision(node)
747 751 prefix = mdiff.trivialdiffheader(len(delta))
748 752 else:
749 753 delta = revlog.revdiff(base, rev)
750 754 p1n, p2n = revlog.parents(node)
751 755 basenode = revlog.node(base)
752 756 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
753 757 meta += prefix
754 758 l = len(meta) + len(delta)
755 759 yield chunkheader(l)
756 760 yield meta
757 761 yield delta
758 762 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
759 763 # do nothing with basenode, it is implicitly the previous one in HG10
760 764 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
761 765
762 766 class cg2packer(cg1packer):
763 767 version = '02'
764 768 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
765 769
766 770 def __init__(self, repo, bundlecaps=None):
767 771 super(cg2packer, self).__init__(repo, bundlecaps)
768 772 if self._reorder is None:
769 773 # Since generaldelta is directly supported by cg2, reordering
770 774 # generally doesn't help, so we disable it by default (treating
771 775 # bundle.reorder=auto just like bundle.reorder=False).
772 776 self._reorder = False
773 777
774 778 def deltaparent(self, revlog, rev, p1, p2, prev):
775 779 dp = revlog.deltaparent(rev)
776 780 # avoid storing full revisions; pick prev in those cases
777 781 # also pick prev when we can't be sure remote has dp
778 782 if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
779 783 return prev
780 784 return dp
781 785
782 786 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
783 787 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
784 788
785 789 packermap = {'01': (cg1packer, cg1unpacker),
786 790 # cg2 adds support for exchanging generaldelta
787 791 '02': (cg2packer, cg2unpacker),
788 792 }
789 793
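The version string selects matching ends of the wire format; structurally, the only difference is the extra 20-byte deltabase field in the cg2 delta header. A sketch, assuming the names defined in this file are in scope:

    import struct

    # cg1 delta header: node, p1, p2, linknode = 80 bytes;
    # cg2 adds an explicit deltabase for generaldelta = 100 bytes
    assert struct.calcsize(_CHANGEGROUPV1_DELTA_HEADER) == 80
    assert struct.calcsize(_CHANGEGROUPV2_DELTA_HEADER) == 100

    packer, unpacker = packermap['02']
    assert packer is cg2packer and unpacker is cg2unpacker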
790 794 def _changegroupinfo(repo, nodes, source):
791 795 if repo.ui.verbose or source == 'bundle':
792 796 repo.ui.status(_("%d changesets found\n") % len(nodes))
793 797 if repo.ui.debugflag:
794 798 repo.ui.debug("list of changesets:\n")
795 799 for node in nodes:
796 800 repo.ui.debug("%s\n" % hex(node))
797 801
798 802 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
799 803 repo = repo.unfiltered()
800 804 commonrevs = outgoing.common
801 805 csets = outgoing.missing
802 806 heads = outgoing.missingheads
803 807 # We go through the fast path if we get told to, or if all (unfiltered)
804 808 # heads have been requested (since we then know that all linkrevs will
805 809 # be pulled by the client).
806 810 heads.sort()
807 811 fastpathlinkrev = fastpath or (
808 812 repo.filtername is None and heads == sorted(repo.heads()))
809 813
810 814 repo.hook('preoutgoing', throw=True, source=source)
811 815 _changegroupinfo(repo, csets, source)
812 816 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
813 817
814 818 def getsubset(repo, outgoing, bundler, source, fastpath=False):
815 819 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
816 820 return packermap[bundler.version][1](util.chunkbuffer(gengroup), None)
817 821
818 822 def changegroupsubset(repo, roots, heads, source, version='01'):
819 823 """Compute a changegroup consisting of all the nodes that are
820 824 descendants of any of the roots and ancestors of any of the heads.
821 825 Return a chunkbuffer object whose read() method will return
822 826 successive changegroup chunks.
823 827
824 828 It is fairly complex as determining which filenodes and which
825 829 manifest nodes need to be included for the changeset to be complete
826 830 is non-trivial.
827 831
828 832 Another wrinkle is doing the reverse, figuring out which changeset in
829 833 the changegroup a particular filenode or manifestnode belongs to.
830 834 """
831 835 cl = repo.changelog
832 836 if not roots:
833 837 roots = [nullid]
834 838 discbases = []
835 839 for n in roots:
836 840 discbases.extend([p for p in cl.parents(n) if p != nullid])
837 841 # TODO: remove call to nodesbetween.
838 842 csets, roots, heads = cl.nodesbetween(roots, heads)
839 843 included = set(csets)
840 844 discbases = [n for n in discbases if n not in included]
841 845 outgoing = discovery.outgoing(cl, discbases, heads)
842 846 bundler = packermap[version][0](repo)
843 847 return getsubset(repo, outgoing, bundler, source)
844 848
845 849 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
846 850 version='01'):
847 851 """Like getbundle, but taking a discovery.outgoing as an argument.
848 852
849 853 This is only implemented for local repos and reuses potentially
850 854 precomputed sets in outgoing. Returns a raw changegroup generator."""
851 855 if not outgoing.missing:
852 856 return None
853 857 bundler = packermap[version][0](repo, bundlecaps)
854 858 return getsubsetraw(repo, outgoing, bundler, source)
855 859
856 860 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
857 861 version='01'):
858 862 """Like getbundle, but taking a discovery.outgoing as an argument.
859 863
860 864 This is only implemented for local repos and reuses potentially
861 865 precomputed sets in outgoing."""
862 866 if not outgoing.missing:
863 867 return None
864 868 bundler = packermap[version][0](repo, bundlecaps)
865 869 return getsubset(repo, outgoing, bundler, source)
866 870
867 871 def computeoutgoing(repo, heads, common):
868 872 """Computes which revs are outgoing given a set of common
869 873 and a set of heads.
870 874
871 875 This is a separate function so extensions can have access to
872 876 the logic.
873 877
874 878 Returns a discovery.outgoing object.
875 879 """
876 880 cl = repo.changelog
877 881 if common:
878 882 hasnode = cl.hasnode
879 883 common = [n for n in common if hasnode(n)]
880 884 else:
881 885 common = [nullid]
882 886 if not heads:
883 887 heads = cl.heads()
884 888 return discovery.outgoing(cl, common, heads)
885 889
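A hedged sketch of the defaulting behaviour (assuming an open repo object): with no hints at all, common falls back to [nullid] and heads to the local heads, so the outgoing set is every changeset in the repository.

    # no common, no heads: everything is outgoing
    out = computeoutgoing(repo, None, None)
    repo.ui.write('%d outgoing changesets\n' % len(out.missing))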
886 890 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None,
887 891 version='01'):
888 892 """Like changegroupsubset, but returns the set difference between the
889 893 ancestors of heads and the ancestors of common.
890 894
891 895 If heads is None, use the local heads. If common is None, use [nullid].
892 896
893 897 The nodes in common might not all be known locally due to the way the
894 898 current discovery protocol works.
895 899 """
896 900 outgoing = computeoutgoing(repo, heads, common)
897 901 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps,
898 902 version=version)
899 903
900 904 def changegroup(repo, basenodes, source):
901 905 # to avoid a race we use changegroupsubset() (issue1320)
902 906 return changegroupsubset(repo, basenodes, repo.heads(), source)
903 907
904 908 def _addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
905 909 revisions = 0
906 910 files = 0
907 911 while True:
908 912 chunkdata = source.filelogheader()
909 913 if not chunkdata:
910 914 break
911 915 f = chunkdata["filename"]
912 916 repo.ui.debug("adding %s revisions\n" % f)
913 917 pr()
914 918 fl = repo.file(f)
915 919 o = len(fl)
916 920 try:
917 921 if not fl.addgroup(source, revmap, trp):
918 922 raise error.Abort(_("received file revlog group is empty"))
919 923 except error.CensoredBaseError as e:
920 924 raise error.Abort(_("received delta base is censored: %s") % e)
921 925 revisions += len(fl) - o
922 926 files += 1
923 927 if f in needfiles:
924 928 needs = needfiles[f]
925 929 for new in xrange(o, len(fl)):
926 930 n = fl.node(new)
927 931 if n in needs:
928 932 needs.remove(n)
929 933 else:
930 934 raise error.Abort(
931 935 _("received spurious file revlog entry"))
932 936 if not needs:
933 937 del needfiles[f]
934 938 repo.ui.progress(_('files'), None)
935 939
936 940 for f, needs in needfiles.iteritems():
937 941 fl = repo.file(f)
938 942 for n in needs:
939 943 try:
940 944 fl.rev(n)
941 945 except error.LookupError:
942 946 raise error.Abort(
943 947 _('missing file data for %s:%s - run hg verify') %
944 948 (f, hex(n)))
945 949
946 950 return revisions, files
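Putting the two halves together, a hedged end-to-end sketch: generate a changegroup on one side and apply() it on the other, the way pull does. The repo objects src and dst (e.g. from mercurial.hg.repository) are assumed to exist, and the URL is illustrative only.

    from mercurial import changegroup

    cg = changegroup.getchangegroup(src, 'pull')
    if cg is not None:            # None means nothing is missing
        lock = dst.lock()
        try:
            # per apply()'s contract: 0 failure, 1 no head change,
            # 2..n heads added, -2..-n heads removed
            ret = cg.apply(dst, 'pull', 'file:///path/to/src')
        finally:
            lock.release()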