changegroup: document the public surface area of cg?unpackers...
Augie Fackler
r26708:749d913f default
@@ -1,914 +1,937 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullid,
19 19 nullrev,
20 20 short,
21 21 )
22 22
23 23 from . import (
24 24 branchmap,
25 25 dagutil,
26 26 discovery,
27 27 error,
28 28 mdiff,
29 29 phases,
30 30 util,
31 31 )
32 32
33 33 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
34 34 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
35 35
36 36 def readexactly(stream, n):
37 37 '''read n bytes from stream.read and abort if less was available'''
38 38 s = stream.read(n)
39 39 if len(s) < n:
40 40 raise error.Abort(_("stream ended unexpectedly"
41 41 " (got %d bytes, expected %d)")
42 42 % (len(s), n))
43 43 return s
44 44
45 45 def getchunk(stream):
46 46 """return the next chunk from stream as a string"""
47 47 d = readexactly(stream, 4)
48 48 l = struct.unpack(">l", d)[0]
49 49 if l <= 4:
50 50 if l:
51 51 raise error.Abort(_("invalid chunk length %d") % l)
52 52 return ""
53 53 return readexactly(stream, l - 4)
54 54
55 55 def chunkheader(length):
56 56 """return a changegroup chunk header (string)"""
57 57 return struct.pack(">l", length + 4)
58 58
59 59 def closechunk():
60 60 """return a changegroup chunk header (string) for a zero-length chunk"""
61 61 return struct.pack(">l", 0)
62 62
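The three helpers above define the changegroup's on-the-wire framing: a big-endian 4-byte length that includes its own 4 header bytes, followed by the payload, with a zero-length chunk terminating each group. A minimal, self-contained sketch of that round trip, re-implemented with plain struct and io purely for illustration:

import io
import struct

def _demo_roundtrip(payloads):
    buf = io.BytesIO()
    for p in payloads:
        buf.write(struct.pack(">l", len(p) + 4))  # same as chunkheader()
        buf.write(p)
    buf.write(struct.pack(">l", 0))  # same as closechunk(): end of group
    buf.seek(0)
    chunks = []
    while True:
        l = struct.unpack(">l", buf.read(4))[0]
        if l <= 4:  # zero (or degenerate) length marks the end of a group
            break
        chunks.append(buf.read(l - 4))
    return chunks

assert _demo_roundtrip([b"abc", b"de"]) == [b"abc", b"de"]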
63 63 def combineresults(results):
64 64 """logic to combine 0 or more addchangegroup results into one"""
65 65 changedheads = 0
66 66 result = 1
67 67 for ret in results:
68 68 # If any changegroup result is 0, return 0
69 69 if ret == 0:
70 70 result = 0
71 71 break
72 72 if ret < -1:
73 73 changedheads += ret + 1
74 74 elif ret > 1:
75 75 changedheads += ret - 1
76 76 if changedheads > 0:
77 77 result = 1 + changedheads
78 78 elif changedheads < 0:
79 79 result = -1 + changedheads
80 80 return result
81 81
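A hedged usage sketch of the arithmetic above (this assumes the module is importable as mercurial.changegroup; the sample return values are illustrative only):

from mercurial import changegroup

changegroup.combineresults([2, 2])    # +1 head twice -> 1 + 2 == 3
changegroup.combineresults([3, -3])   # +2 heads then -2 -> net 0 -> 1
changegroup.combineresults([0, 5])    # any 0 short-circuits -> 0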
82 82 bundletypes = {
83 83 "": ("", None), # only when using unbundle on ssh and old http servers
84 84 # since the unification, ssh accepts a header but there
85 85 # is no capability signaling it.
86 86 "HG20": (), # special-cased below
87 87 "HG10UN": ("HG10UN", None),
88 88 "HG10BZ": ("HG10", 'BZ'),
89 89 "HG10GZ": ("HG10GZ", 'GZ'),
90 90 }
91 91
92 92 # hgweb uses this list to communicate its preferred type
93 93 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
94 94
95 95 def writechunks(ui, chunks, filename, vfs=None):
96 96 """Write chunks to a file and return its filename.
97 97
98 98 The stream is assumed to be a bundle file.
99 99 Existing files will not be overwritten.
100 100 If no filename is specified, a temporary file is created.
101 101 """
102 102 fh = None
103 103 cleanup = None
104 104 try:
105 105 if filename:
106 106 if vfs:
107 107 fh = vfs.open(filename, "wb")
108 108 else:
109 109 fh = open(filename, "wb")
110 110 else:
111 111 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
112 112 fh = os.fdopen(fd, "wb")
113 113 cleanup = filename
114 114 for c in chunks:
115 115 fh.write(c)
116 116 cleanup = None
117 117 return filename
118 118 finally:
119 119 if fh is not None:
120 120 fh.close()
121 121 if cleanup is not None:
122 122 if filename and vfs:
123 123 vfs.unlink(cleanup)
124 124 else:
125 125 os.unlink(cleanup)
126 126
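A hedged sketch of feeding writechunks() an already-framed stream, for example the one produced by cg1unpacker.getchunks() further down (repo, cg and the filenames here are illustrative):

# write to an explicit file inside the repo's vfs...
fname = writechunks(repo.ui, cg.getchunks(), 'incoming.hg', vfs=repo.vfs)
# ...or pass None to get a hg-bundle-*.hg temporary file back
fname = writechunks(repo.ui, cg.getchunks(), None)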
127 127 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
128 128 """Write a bundle file and return its filename.
129 129
130 130 Existing files will not be overwritten.
131 131 If no filename is specified, a temporary file is created.
132 132 bz2 compression can be turned off.
133 133 The bundle file will be deleted in case of errors.
134 134 """
135 135
136 136 if bundletype == "HG20":
137 137 from . import bundle2
138 138 bundle = bundle2.bundle20(ui)
139 139 bundle.setcompression(compression)
140 140 part = bundle.newpart('changegroup', data=cg.getchunks())
141 141 part.addparam('version', cg.version)
142 142 chunkiter = bundle.getchunks()
143 143 else:
144 144 # compression argument is only for the bundle2 case
145 145 assert compression is None
146 146 if cg.version != '01':
147 147 raise error.Abort(_('old bundle types only support v1 '
148 148 'changegroups'))
149 149 header, comp = bundletypes[bundletype]
150 150 if comp not in util.compressors:
151 151 raise error.Abort(_('unknown stream compression type: %s')
152 152 % comp)
153 153 z = util.compressors[comp]()
154 154 subchunkiter = cg.getchunks()
155 155 def chunkiter():
156 156 yield header
157 157 for chunk in subchunkiter:
158 158 yield z.compress(chunk)
159 159 yield z.flush()
160 160 chunkiter = chunkiter()
161 161
162 162 # parse the changegroup data, otherwise we will block
163 163 # with an sshrepo because we don't know where the stream ends
164 164
165 165 # an empty chunkgroup is the end of the changegroup
166 166 # a changegroup has at least 2 chunkgroups (changelog and manifest).
167 167 # after that, an empty chunkgroup is the end of the changegroup
168 168 return writechunks(ui, chunkiter, filename, vfs=vfs)
169 169
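A hedged sketch of the two container formats handled above (cg must be a version '01' changegroup for the HG10* types; the names are illustrative):

writebundle(repo.ui, cg, 'backup.hg', 'HG10BZ')                  # v1, bzip2
writebundle(repo.ui, cg, 'backup.hg', 'HG20', compression='BZ')  # bundle2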
170 170 class cg1unpacker(object):
171 """Unpacker for cg1 changegroup streams.
172
173 A changegroup unpacker handles the framing of the revision data in
174 the wire format. Most consumers will want to use the apply()
175 method to add the changes from the changegroup to a repository.
176
177 If you're forwarding a changegroup unmodified to another consumer,
178 use getchunks(), which returns an iterator of changegroup
179 chunks. This is mostly useful for cases where you need to know the
180 data stream has ended by observing the end of the changegroup.
181
182 deltachunk() is useful only if you're applying delta data. Most
183 consumers should prefer apply() instead.
184
185 A few other public methods exist. Those are used only for
186 bundlerepo and some debug commands - their use is discouraged.
187 """
171 188 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
172 189 deltaheadersize = struct.calcsize(deltaheader)
173 190 version = '01'
174 191 def __init__(self, fh, alg):
175 192 if alg == 'UN':
176 193 alg = None # get more modern without breaking too much
177 194 if alg not in util.decompressors:
178 195 raise error.Abort(_('unknown stream compression type: %s')
179 196 % alg)
180 197 if alg == 'BZ':
181 198 alg = '_truncatedBZ'
182 199 self._stream = util.decompressors[alg](fh)
183 200 self._type = alg
184 201 self.callback = None
185 202
186 203 # These methods (compressed, read, seek, tell) all appear to only
187 204 # be used by bundlerepo, but it's a little hard to tell.
188 205 def compressed(self):
189 206 return self._type is not None
190 207 def read(self, l):
191 208 return self._stream.read(l)
192 209 def seek(self, pos):
193 210 return self._stream.seek(pos)
194 211 def tell(self):
195 212 return self._stream.tell()
196 213 def close(self):
197 214 return self._stream.close()
198 215
199 216 def _chunklength(self):
200 217 d = readexactly(self._stream, 4)
201 218 l = struct.unpack(">l", d)[0]
202 219 if l <= 4:
203 220 if l:
204 221 raise error.Abort(_("invalid chunk length %d") % l)
205 222 return 0
206 223 if self.callback:
207 224 self.callback()
208 225 return l - 4
209 226
210 227 def changelogheader(self):
211 228 """v10 does not have a changelog header chunk"""
212 229 return {}
213 230
214 231 def manifestheader(self):
215 232 """v10 does not have a manifest header chunk"""
216 233 return {}
217 234
218 235 def filelogheader(self):
219 236 """return the header of the filelogs chunk, v10 only has the filename"""
220 237 l = self._chunklength()
221 238 if not l:
222 239 return {}
223 240 fname = readexactly(self._stream, l)
224 241 return {'filename': fname}
225 242
226 243 def _deltaheader(self, headertuple, prevnode):
227 244 node, p1, p2, cs = headertuple
228 245 if prevnode is None:
229 246 deltabase = p1
230 247 else:
231 248 deltabase = prevnode
232 249 return node, p1, p2, deltabase, cs
233 250
234 251 def deltachunk(self, prevnode):
235 252 l = self._chunklength()
236 253 if not l:
237 254 return {}
238 255 headerdata = readexactly(self._stream, self.deltaheadersize)
239 256 header = struct.unpack(self.deltaheader, headerdata)
240 257 delta = readexactly(self._stream, l - self.deltaheadersize)
241 258 node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
242 259 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
243 260 'deltabase': deltabase, 'delta': delta}
244 261
245 262 def getchunks(self):
246 263 """returns all the chunks contains in the bundle
247 264
248 265 Used when you need to forward the binary stream to a file or another
249 266 network API. To do so, it parse the changegroup data, otherwise it will
250 267 block in case of sshrepo because it don't know the end of the stream.
251 268 """
252 269 # an empty chunkgroup is the end of the changegroup
253 270 # a changegroup has at least 2 chunkgroups (changelog and manifest).
254 271 # after that, an empty chunkgroup is the end of the changegroup
255 272 empty = False
256 273 count = 0
257 274 while not empty or count <= 2:
258 275 empty = True
259 276 count += 1
260 277 while True:
261 278 chunk = getchunk(self)
262 279 if not chunk:
263 280 break
264 281 empty = False
265 282 yield chunkheader(len(chunk))
266 283 pos = 0
267 284 while pos < len(chunk):
268 285 next = pos + 2**20
269 286 yield chunk[pos:next]
270 287 pos = next
271 288 yield closechunk()
272 289
273 290 def apply(self, repo, srctype, url, emptyok=False,
274 291 targetphase=phases.draft, expectedtotal=None):
275 292 """Add the changegroup returned by source.read() to this repo.
276 293 srctype is a string like 'push', 'pull', or 'unbundle'. url is
277 294 the URL of the repo where this changegroup is coming from.
278 295
279 296 Return an integer summarizing the change to this repo:
280 297 - nothing changed or no source: 0
281 298 - more heads than before: 1+added heads (2..n)
282 299 - fewer heads than before: -1-removed heads (-2..-n)
283 300 - number of heads stays the same: 1
284 301 """
285 302 repo = repo.unfiltered()
286 303 def csmap(x):
287 304 repo.ui.debug("add changeset %s\n" % short(x))
288 305 return len(cl)
289 306
290 307 def revmap(x):
291 308 return cl.rev(x)
292 309
293 310 changesets = files = revisions = 0
294 311
295 312 tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
296 313 # The transaction could have been created before and already
297 314 # carries source information. In this case we use the top
298 315 # level data. We overwrite the argument because we need to use
299 316 # the top level value (if they exist) in this function.
300 317 srctype = tr.hookargs.setdefault('source', srctype)
301 318 url = tr.hookargs.setdefault('url', url)
302 319
303 320 # write changelog data to temp files so concurrent readers will not see
304 321 # inconsistent view
305 322 cl = repo.changelog
306 323 cl.delayupdate(tr)
307 324 oldheads = cl.heads()
308 325 try:
309 326 repo.hook('prechangegroup', throw=True, **tr.hookargs)
310 327
311 328 trp = weakref.proxy(tr)
312 329 # pull off the changeset group
313 330 repo.ui.status(_("adding changesets\n"))
314 331 clstart = len(cl)
315 332 class prog(object):
316 333 def __init__(self, step, total):
317 334 self._step = step
318 335 self._total = total
319 336 self._count = 1
320 337 def __call__(self):
321 338 repo.ui.progress(self._step, self._count, unit=_('chunks'),
322 339 total=self._total)
323 340 self._count += 1
324 341 self.callback = prog(_('changesets'), expectedtotal)
325 342
326 343 efiles = set()
327 344 def onchangelog(cl, node):
328 345 efiles.update(cl.read(node)[3])
329 346
330 347 self.changelogheader()
331 348 srccontent = cl.addgroup(self, csmap, trp,
332 349 addrevisioncb=onchangelog)
333 350 efiles = len(efiles)
334 351
335 352 if not (srccontent or emptyok):
336 353 raise error.Abort(_("received changelog group is empty"))
337 354 clend = len(cl)
338 355 changesets = clend - clstart
339 356 repo.ui.progress(_('changesets'), None)
340 357
341 358 # pull off the manifest group
342 359 repo.ui.status(_("adding manifests\n"))
343 360 # manifests <= changesets
344 361 self.callback = prog(_('manifests'), changesets)
345 362 # no need to check for empty manifest group here:
346 363 # if the result of the merge of 1 and 2 is the same in 3 and 4,
347 364 # no new manifest will be created and the manifest group will
348 365 # be empty during the pull
349 366 self.manifestheader()
350 367 repo.manifest.addgroup(self, revmap, trp)
351 368 repo.ui.progress(_('manifests'), None)
352 369
353 370 needfiles = {}
354 371 if repo.ui.configbool('server', 'validate', default=False):
355 372 # validate incoming csets have their manifests
356 373 for cset in xrange(clstart, clend):
357 374 mfnode = repo.changelog.read(repo.changelog.node(cset))[0]
358 375 mfest = repo.manifest.readdelta(mfnode)
359 376 # store file nodes we must see
360 377 for f, n in mfest.iteritems():
361 378 needfiles.setdefault(f, set()).add(n)
362 379
363 380 # process the files
364 381 repo.ui.status(_("adding file changes\n"))
365 382 self.callback = None
366 383 pr = prog(_('files'), efiles)
367 384 newrevs, newfiles = _addchangegroupfiles(
368 385 repo, self, revmap, trp, pr, needfiles)
369 386 revisions += newrevs
370 387 files += newfiles
371 388
372 389 dh = 0
373 390 if oldheads:
374 391 heads = cl.heads()
375 392 dh = len(heads) - len(oldheads)
376 393 for h in heads:
377 394 if h not in oldheads and repo[h].closesbranch():
378 395 dh -= 1
379 396 htext = ""
380 397 if dh:
381 398 htext = _(" (%+d heads)") % dh
382 399
383 400 repo.ui.status(_("added %d changesets"
384 401 " with %d changes to %d files%s\n")
385 402 % (changesets, revisions, files, htext))
386 403 repo.invalidatevolatilesets()
387 404
388 405 if changesets > 0:
389 406 p = lambda: tr.writepending() and repo.root or ""
390 407 if 'node' not in tr.hookargs:
391 408 tr.hookargs['node'] = hex(cl.node(clstart))
392 409 hookargs = dict(tr.hookargs)
393 410 else:
394 411 hookargs = dict(tr.hookargs)
395 412 hookargs['node'] = hex(cl.node(clstart))
396 413 repo.hook('pretxnchangegroup', throw=True, pending=p,
397 414 **hookargs)
398 415
399 416 added = [cl.node(r) for r in xrange(clstart, clend)]
400 417 publishing = repo.publishing()
401 418 if srctype in ('push', 'serve'):
402 419 # Old servers can not push the boundary themselves.
403 420 # New servers won't push the boundary if changeset already
404 421 # exists locally as secret
405 422 #
406 423 # We should not use added here but the list of all changes in
407 424 # the bundle
408 425 if publishing:
409 426 phases.advanceboundary(repo, tr, phases.public, srccontent)
410 427 else:
411 428 # Those changesets have been pushed from the outside, their
412 429 # phases are going to be pushed alongside. Therefore
413 430 # `targetphase` is ignored.
414 431 phases.advanceboundary(repo, tr, phases.draft, srccontent)
415 432 phases.retractboundary(repo, tr, phases.draft, added)
416 433 elif srctype != 'strip':
417 434 # publishing only alters behavior during push
418 435 #
419 436 # strip should not touch boundary at all
420 437 phases.retractboundary(repo, tr, targetphase, added)
421 438
422 439 if changesets > 0:
423 440 if srctype != 'strip':
424 441 # During strip, branchcache is invalid but coming call to
425 442 # `destroyed` will repair it.
426 443 # In other case we can safely update cache on disk.
427 444 branchmap.updatecache(repo.filtered('served'))
428 445
429 446 def runhooks():
430 447 # These hooks run when the lock releases, not when the
431 448 # transaction closes. So it's possible for the changelog
432 449 # to have changed since we last saw it.
433 450 if clstart >= len(repo):
434 451 return
435 452
436 453 # forcefully update the on-disk branch cache
437 454 repo.ui.debug("updating the branch cache\n")
438 455 repo.hook("changegroup", **hookargs)
439 456
440 457 for n in added:
441 458 args = hookargs.copy()
442 459 args['node'] = hex(n)
443 460 repo.hook("incoming", **args)
444 461
445 462 newheads = [h for h in repo.heads() if h not in oldheads]
446 463 repo.ui.log("incoming",
447 464 "%s incoming changes - new heads: %s\n",
448 465 len(added),
449 466 ', '.join([hex(c[:6]) for c in newheads]))
450 467
451 468 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
452 469 lambda tr: repo._afterlock(runhooks))
453 470
454 471 tr.close()
455 472
456 473 finally:
457 474 tr.release()
458 475 repo.ui.flush()
459 476 # never return 0 here:
460 477 if dh < 0:
461 478 return dh - 1
462 479 else:
463 480 return dh + 1
464 481
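A hedged end-to-end sketch of consuming an unpacker, roughly what 'hg unbundle' does (the six-byte header parsing and locking are simplified here, and bundlefile is illustrative):

fh = open(bundlefile, 'rb')
header = readexactly(fh, 6)        # e.g. 'HG10BZ'
alg = header[4:6]                  # compression: 'BZ', 'GZ' or 'UN'
cg = cg1unpacker(fh, alg)
lock = repo.lock()
try:
    r = cg.apply(repo, 'unbundle', 'bundle:' + bundlefile)
finally:
    lock.release()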
465 482 class cg2unpacker(cg1unpacker):
483 """Unpacker for cg2 streams.
484
485 cg2 streams add support for generaldelta, so the delta header
486 format is slightly different. All other features about the data
487 remain the same.
488 """
466 489 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
467 490 deltaheadersize = struct.calcsize(deltaheader)
468 491 version = '02'
469 492
470 493 def _deltaheader(self, headertuple, prevnode):
471 494 node, p1, p2, deltabase, cs = headertuple
472 495 return node, p1, p2, deltabase, cs
473 496
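The only structural difference from cg1 is the extra 20-byte deltabase field in each delta header; a standalone check of the two header sizes:

import struct
assert struct.calcsize("20s20s20s20s") == 80       # cg1: node p1 p2 cs
assert struct.calcsize("20s20s20s20s20s") == 100   # cg2: + deltabase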
474 497 class headerlessfixup(object):
475 498 def __init__(self, fh, h):
476 499 self._h = h
477 500 self._fh = fh
478 501 def read(self, n):
479 502 if self._h:
480 503 d, self._h = self._h[:n], self._h[n:]
481 504 if len(d) < n:
482 505 d += readexactly(self._fh, n - len(d))
483 506 return d
484 507 return readexactly(self._fh, n)
485 508
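headerlessfixup splices back header bytes a caller has already consumed while sniffing the stream type; a hedged sketch of the mechanics (assuming this module's definitions are in scope):

import io
fh = io.BytesIO(b'HG10UN' + b'rest of stream')
magic = fh.read(6)                   # consumed to identify the bundle
stream = headerlessfixup(fh, magic)  # glue the magic back on
assert stream.read(6) == b'HG10UN'   # readers see an intact stream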
486 509 class cg1packer(object):
487 510 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
488 511 version = '01'
489 512 def __init__(self, repo, bundlecaps=None):
490 513 """Given a source repo, construct a bundler.
491 514
492 515 bundlecaps is optional and can be used to specify the set of
493 516 capabilities which can be used to build the bundle.
494 517 """
495 518 # Set of capabilities we can use to build the bundle.
496 519 if bundlecaps is None:
497 520 bundlecaps = set()
498 521 self._bundlecaps = bundlecaps
499 522 # experimental config: bundle.reorder
500 523 reorder = repo.ui.config('bundle', 'reorder', 'auto')
501 524 if reorder == 'auto':
502 525 reorder = None
503 526 else:
504 527 reorder = util.parsebool(reorder)
505 528 self._repo = repo
506 529 self._reorder = reorder
507 530 self._progress = repo.ui.progress
508 531 if self._repo.ui.verbose and not self._repo.ui.debugflag:
509 532 self._verbosenote = self._repo.ui.note
510 533 else:
511 534 self._verbosenote = lambda s: None
512 535
513 536 def close(self):
514 537 return closechunk()
515 538
516 539 def fileheader(self, fname):
517 540 return chunkheader(len(fname)) + fname
518 541
519 542 def group(self, nodelist, revlog, lookup, units=None):
520 543 """Calculate a delta group, yielding a sequence of changegroup chunks
521 544 (strings).
522 545
523 546 Given a list of changeset revs, return a set of deltas and
524 547 metadata corresponding to nodes. The first delta is
525 548 first parent(nodelist[0]) -> nodelist[0], the receiver is
526 549 guaranteed to have this parent as it has all history before
527 550 these changesets. In the case firstparent is nullrev the
528 551 changegroup starts with a full revision.
529 552
530 553 If units is not None, progress detail will be generated; units specifies
531 554 the type of revlog that is touched (changelog, manifest, etc.).
532 555 """
533 556 # if we don't have any revisions touched by these changesets, bail
534 557 if len(nodelist) == 0:
535 558 yield self.close()
536 559 return
537 560
538 561 # for generaldelta revlogs, we linearize the revs; this will both be
539 562 # much quicker and generate a much smaller bundle
540 563 if (revlog._generaldelta and self._reorder is None) or self._reorder:
541 564 dag = dagutil.revlogdag(revlog)
542 565 revs = set(revlog.rev(n) for n in nodelist)
543 566 revs = dag.linearize(revs)
544 567 else:
545 568 revs = sorted([revlog.rev(n) for n in nodelist])
546 569
547 570 # add the parent of the first rev
548 571 p = revlog.parentrevs(revs[0])[0]
549 572 revs.insert(0, p)
550 573
551 574 # build deltas
552 575 total = len(revs) - 1
553 576 msgbundling = _('bundling')
554 577 for r in xrange(len(revs) - 1):
555 578 if units is not None:
556 579 self._progress(msgbundling, r + 1, unit=units, total=total)
557 580 prev, curr = revs[r], revs[r + 1]
558 581 linknode = lookup(revlog.node(curr))
559 582 for c in self.revchunk(revlog, curr, prev, linknode):
560 583 yield c
561 584
562 585 if units is not None:
563 586 self._progress(msgbundling, None)
564 587 yield self.close()
565 588
566 589 # filter any nodes that claim to be part of the known set
567 590 def prune(self, revlog, missing, commonrevs):
568 591 rr, rl = revlog.rev, revlog.linkrev
569 592 return [n for n in missing if rl(rr(n)) not in commonrevs]
570 593
571 594 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
572 595 '''yield a sequence of changegroup chunks (strings)'''
573 596 repo = self._repo
574 597 cl = repo.changelog
575 598 ml = repo.manifest
576 599
577 600 clrevorder = {}
578 601 mfs = {} # needed manifests
579 602 fnodes = {} # needed file nodes
580 603 changedfiles = set()
581 604
582 605 # Callback for the changelog, used to collect changed files and manifest
583 606 # nodes.
584 607 # Returns the linkrev node (identity in the changelog case).
585 608 def lookupcl(x):
586 609 c = cl.read(x)
587 610 clrevorder[x] = len(clrevorder)
588 611 changedfiles.update(c[3])
589 612 # record the first changeset introducing this manifest version
590 613 mfs.setdefault(c[0], x)
591 614 return x
592 615
593 616 self._verbosenote(_('uncompressed size of bundle content:\n'))
594 617 size = 0
595 618 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
596 619 size += len(chunk)
597 620 yield chunk
598 621 self._verbosenote(_('%8.i (changelog)\n') % size)
599 622
600 623 # We need to make sure that the linkrev in the changegroup refers to
601 624 # the first changeset that introduced the manifest or file revision.
602 625 # The fastpath is usually safer than the slowpath, because the filelogs
603 626 # are walked in revlog order.
604 627 #
605 628 # When taking the slowpath with reorder=None and the manifest revlog
606 629 # uses generaldelta, the manifest may be walked in the "wrong" order.
607 630 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
608 631 # cc0ff93d0c0c).
609 632 #
610 633 # When taking the fastpath, we are only vulnerable to reordering
611 634 # of the changelog itself. The changelog never uses generaldelta, so
612 635 # it is only reordered when reorder=True. To handle this case, we
613 636 # simply take the slowpath, which already has the 'clrevorder' logic.
614 637 # This was also fixed in cc0ff93d0c0c.
615 638 fastpathlinkrev = fastpathlinkrev and not self._reorder
616 639 # Callback for the manifest, used to collect linkrevs for filelog
617 640 # revisions.
618 641 # Returns the linkrev node (collected in lookupcl).
619 642 def lookupmf(x):
620 643 clnode = mfs[x]
621 644 if not fastpathlinkrev:
622 645 mdata = ml.readfast(x)
623 646 for f, n in mdata.iteritems():
624 647 if f in changedfiles:
625 648 # record the first changeset introducing this filelog
626 649 # version
627 650 fclnodes = fnodes.setdefault(f, {})
628 651 fclnode = fclnodes.setdefault(n, clnode)
629 652 if clrevorder[clnode] < clrevorder[fclnode]:
630 653 fclnodes[n] = clnode
631 654 return clnode
632 655
633 656 mfnodes = self.prune(ml, mfs, commonrevs)
634 657 size = 0
635 658 for chunk in self.group(mfnodes, ml, lookupmf, units=_('manifests')):
636 659 size += len(chunk)
637 660 yield chunk
638 661 self._verbosenote(_('%8.i (manifests)\n') % size)
639 662
640 663 mfs.clear()
641 664 clrevs = set(cl.rev(x) for x in clnodes)
642 665
643 666 def linknodes(filerevlog, fname):
644 667 if fastpathlinkrev:
645 668 llr = filerevlog.linkrev
646 669 def genfilenodes():
647 670 for r in filerevlog:
648 671 linkrev = llr(r)
649 672 if linkrev in clrevs:
650 673 yield filerevlog.node(r), cl.node(linkrev)
651 674 return dict(genfilenodes())
652 675 return fnodes.get(fname, {})
653 676
654 677 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
655 678 source):
656 679 yield chunk
657 680
658 681 yield self.close()
659 682
660 683 if clnodes:
661 684 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
662 685
663 686 # The 'source' parameter is useful for extensions
664 687 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
665 688 repo = self._repo
666 689 progress = self._progress
667 690 msgbundling = _('bundling')
668 691
669 692 total = len(changedfiles)
670 693 # for progress output
671 694 msgfiles = _('files')
672 695 for i, fname in enumerate(sorted(changedfiles)):
673 696 filerevlog = repo.file(fname)
674 697 if not filerevlog:
675 698 raise error.Abort(_("empty or missing revlog for %s") % fname)
676 699
677 700 linkrevnodes = linknodes(filerevlog, fname)
678 701 # Lookup for filenodes; we collected the linkrev nodes above in the
679 702 # fastpath case and with lookupmf in the slowpath case.
680 703 def lookupfilelog(x):
681 704 return linkrevnodes[x]
682 705
683 706 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
684 707 if filenodes:
685 708 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
686 709 total=total)
687 710 h = self.fileheader(fname)
688 711 size = len(h)
689 712 yield h
690 713 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
691 714 size += len(chunk)
692 715 yield chunk
693 716 self._verbosenote(_('%8.i %s\n') % (size, fname))
694 717 progress(msgbundling, None)
695 718
696 719 def deltaparent(self, revlog, rev, p1, p2, prev):
697 720 return prev
698 721
699 722 def revchunk(self, revlog, rev, prev, linknode):
700 723 node = revlog.node(rev)
701 724 p1, p2 = revlog.parentrevs(rev)
702 725 base = self.deltaparent(revlog, rev, p1, p2, prev)
703 726
704 727 prefix = ''
705 728 if revlog.iscensored(base) or revlog.iscensored(rev):
706 729 try:
707 730 delta = revlog.revision(node)
708 731 except error.CensoredNodeError as e:
709 732 delta = e.tombstone
710 733 if base == nullrev:
711 734 prefix = mdiff.trivialdiffheader(len(delta))
712 735 else:
713 736 baselen = revlog.rawsize(base)
714 737 prefix = mdiff.replacediffheader(baselen, len(delta))
715 738 elif base == nullrev:
716 739 delta = revlog.revision(node)
717 740 prefix = mdiff.trivialdiffheader(len(delta))
718 741 else:
719 742 delta = revlog.revdiff(base, rev)
720 743 p1n, p2n = revlog.parents(node)
721 744 basenode = revlog.node(base)
722 745 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
723 746 meta += prefix
724 747 l = len(meta) + len(delta)
725 748 yield chunkheader(l)
726 749 yield meta
727 750 yield delta
728 751 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
729 752 # do nothing with basenode, it is implicitly the previous one in HG10
730 753 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
731 754
732 755 class cg2packer(cg1packer):
733 756 version = '02'
734 757 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
735 758
736 759 def __init__(self, repo, bundlecaps=None):
737 760 super(cg2packer, self).__init__(repo, bundlecaps)
738 761 if self._reorder is None:
739 762 # Since generaldelta is directly supported by cg2, reordering
740 763 # generally doesn't help, so we disable it by default (treating
741 764 # bundle.reorder=auto just like bundle.reorder=False).
742 765 self._reorder = False
743 766
744 767 def deltaparent(self, revlog, rev, p1, p2, prev):
745 768 dp = revlog.deltaparent(rev)
746 769 # avoid storing full revisions; pick prev in those cases
747 770 # also pick prev when we can't be sure remote has dp
748 771 if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
749 772 return prev
750 773 return dp
751 774
752 775 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
753 776 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
754 777
755 778 packermap = {'01': (cg1packer, cg1unpacker),
756 779 '02': (cg2packer, cg2unpacker)}
757 780
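packermap is keyed by the wire-format version string; a hedged sketch of selecting a matching pair once a version has been negotiated with a peer:

version = '02'                         # as negotiated, else '01'
packer = packermap[version][0](repo)   # cg2packer
# ...and on the receiving end, around a decompressed stream:
unpacker = packermap[version][1](fh, 'UN')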
758 781 def _changegroupinfo(repo, nodes, source):
759 782 if repo.ui.verbose or source == 'bundle':
760 783 repo.ui.status(_("%d changesets found\n") % len(nodes))
761 784 if repo.ui.debugflag:
762 785 repo.ui.debug("list of changesets:\n")
763 786 for node in nodes:
764 787 repo.ui.debug("%s\n" % hex(node))
765 788
766 789 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
767 790 repo = repo.unfiltered()
768 791 commonrevs = outgoing.common
769 792 csets = outgoing.missing
770 793 heads = outgoing.missingheads
771 794 # We go through the fast path if we get told to, or if all (unfiltered)
772 795 # heads have been requested (since we then know that all linkrevs will
773 796 # be pulled by the client).
774 797 heads.sort()
775 798 fastpathlinkrev = fastpath or (
776 799 repo.filtername is None and heads == sorted(repo.heads()))
777 800
778 801 repo.hook('preoutgoing', throw=True, source=source)
779 802 _changegroupinfo(repo, csets, source)
780 803 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
781 804
782 805 def getsubset(repo, outgoing, bundler, source, fastpath=False):
783 806 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
784 807 return packermap[bundler.version][1](util.chunkbuffer(gengroup), None)
785 808
786 809 def changegroupsubset(repo, roots, heads, source, version='01'):
787 810 """Compute a changegroup consisting of all the nodes that are
788 811 descendants of any of the roots and ancestors of any of the heads.
789 812 Return a chunkbuffer object whose read() method will return
790 813 successive changegroup chunks.
791 814
792 815 It is fairly complex as determining which filenodes and which
793 816 manifest nodes need to be included for the changeset to be complete
794 817 is non-trivial.
795 818
796 819 Another wrinkle is doing the reverse, figuring out which changeset in
797 820 the changegroup a particular filenode or manifestnode belongs to.
798 821 """
799 822 cl = repo.changelog
800 823 if not roots:
801 824 roots = [nullid]
802 825 discbases = []
803 826 for n in roots:
804 827 discbases.extend([p for p in cl.parents(n) if p != nullid])
805 828 # TODO: remove call to nodesbetween.
806 829 csets, roots, heads = cl.nodesbetween(roots, heads)
807 830 included = set(csets)
808 831 discbases = [n for n in discbases if n not in included]
809 832 outgoing = discovery.outgoing(cl, discbases, heads)
810 833 bundler = packermap[version][0](repo)
811 834 return getsubset(repo, outgoing, bundler, source)
812 835
813 836 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
814 837 version='01'):
815 838 """Like getbundle, but taking a discovery.outgoing as an argument.
816 839
817 840 This is only implemented for local repos and reuses potentially
818 841 precomputed sets in outgoing. Returns a raw changegroup generator."""
819 842 if not outgoing.missing:
820 843 return None
821 844 bundler = packermap[version][0](repo, bundlecaps)
822 845 return getsubsetraw(repo, outgoing, bundler, source)
823 846
824 847 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
825 848 version='01'):
826 849 """Like getbundle, but taking a discovery.outgoing as an argument.
827 850
828 851 This is only implemented for local repos and reuses potentially
829 852 precomputed sets in outgoing."""
830 853 if not outgoing.missing:
831 854 return None
832 855 bundler = packermap[version][0](repo, bundlecaps)
833 856 return getsubset(repo, outgoing, bundler, source)
834 857
835 858 def computeoutgoing(repo, heads, common):
836 859 """Computes which revs are outgoing given a set of common
837 860 and a set of heads.
838 861
839 862 This is a separate function so extensions can have access to
840 863 the logic.
841 864
842 865 Returns a discovery.outgoing object.
843 866 """
844 867 cl = repo.changelog
845 868 if common:
846 869 hasnode = cl.hasnode
847 870 common = [n for n in common if hasnode(n)]
848 871 else:
849 872 common = [nullid]
850 873 if not heads:
851 874 heads = cl.heads()
852 875 return discovery.outgoing(cl, common, heads)
853 876
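A hedged sketch (theirnodes is illustrative): everything reachable from our heads that the peer does not already have:

outgoing = computeoutgoing(repo, heads=None, common=theirnodes)
outgoing.missing    # the changesets a changegroup would have to carry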
854 877 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None,
855 878 version='01'):
856 879 """Like changegroupsubset, but returns the set difference between the
857 880 ancestors of heads and the ancestors of common.
858 881
859 882 If heads is None, use the local heads. If common is None, use [nullid].
860 883
861 884 The nodes in common might not all be known locally due to the way the
862 885 current discovery protocol works.
863 886 """
864 887 outgoing = computeoutgoing(repo, heads, common)
865 888 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps,
866 889 version=version)
867 890
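Tying the pieces together, a hedged sketch of building and saving a changegroup for a peer (theirheads and commonnodes are illustrative):

cg = getchangegroup(repo, 'push', heads=theirheads, common=commonnodes)
if cg is not None:                  # None means nothing is missing
    writebundle(repo.ui, cg, 'outgoing.hg', 'HG10BZ')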
868 891 def changegroup(repo, basenodes, source):
869 892 # to avoid a race we use changegroupsubset() (issue1320)
870 893 return changegroupsubset(repo, basenodes, repo.heads(), source)
871 894
872 895 def _addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
873 896 revisions = 0
874 897 files = 0
875 898 while True:
876 899 chunkdata = source.filelogheader()
877 900 if not chunkdata:
878 901 break
879 902 f = chunkdata["filename"]
880 903 repo.ui.debug("adding %s revisions\n" % f)
881 904 pr()
882 905 fl = repo.file(f)
883 906 o = len(fl)
884 907 try:
885 908 if not fl.addgroup(source, revmap, trp):
886 909 raise error.Abort(_("received file revlog group is empty"))
887 910 except error.CensoredBaseError as e:
888 911 raise error.Abort(_("received delta base is censored: %s") % e)
889 912 revisions += len(fl) - o
890 913 files += 1
891 914 if f in needfiles:
892 915 needs = needfiles[f]
893 916 for new in xrange(o, len(fl)):
894 917 n = fl.node(new)
895 918 if n in needs:
896 919 needs.remove(n)
897 920 else:
898 921 raise error.Abort(
899 922 _("received spurious file revlog entry"))
900 923 if not needs:
901 924 del needfiles[f]
902 925 repo.ui.progress(_('files'), None)
903 926
904 927 for f, needs in needfiles.iteritems():
905 928 fl = repo.file(f)
906 929 for n in needs:
907 930 try:
908 931 fl.rev(n)
909 932 except error.LookupError:
910 933 raise error.Abort(
911 934 _('missing file data for %s:%s - run hg verify') %
912 935 (f, hex(n)))
913 936
914 937 return revisions, files