changegroup: use compression engines API...
Gregory Szorc
r30354:a37a96d8 default
@@ -1,1046 +1,1048 @@
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullrev,
19 19 short,
20 20 )
21 21
22 22 from . import (
23 23 branchmap,
24 24 dagutil,
25 25 discovery,
26 26 error,
27 27 mdiff,
28 28 phases,
29 29 util,
30 30 )
31 31
32 32 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
33 33 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
34 34 _CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
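
The three formats above differ per changegroup version: cg1 packs four 20-byte nodes and leaves the delta base implicit, cg2 adds an explicit delta-base node, and cg3 appends a big-endian 16-bit flags field. A quick size check (an illustrative sketch, standard library only):

import struct

assert struct.calcsize("20s20s20s20s") == 80        # cg1: node, p1, p2, linknode
assert struct.calcsize("20s20s20s20s20s") == 100    # cg2: + explicit deltabase
assert struct.calcsize(">20s20s20s20s20sH") == 102  # cg3: + 16-bit revlog flags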
35 35
36 36 def readexactly(stream, n):
37 37     '''read n bytes from stream.read and abort if fewer are available'''
38 38 s = stream.read(n)
39 39 if len(s) < n:
40 40 raise error.Abort(_("stream ended unexpectedly"
41 41 " (got %d bytes, expected %d)")
42 42 % (len(s), n))
43 43 return s
44 44
45 45 def getchunk(stream):
46 46 """return the next chunk from stream as a string"""
47 47 d = readexactly(stream, 4)
48 48 l = struct.unpack(">l", d)[0]
49 49 if l <= 4:
50 50 if l:
51 51 raise error.Abort(_("invalid chunk length %d") % l)
52 52 return ""
53 53 return readexactly(stream, l - 4)
54 54
55 55 def chunkheader(length):
56 56 """return a changegroup chunk header (string)"""
57 57 return struct.pack(">l", length + 4)
58 58
59 59 def closechunk():
60 60 """return a changegroup chunk header (string) for a zero-length chunk"""
61 61 return struct.pack(">l", 0)
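
Together, readexactly, getchunk, chunkheader and closechunk implement the chunk framing: each chunk is a 4-byte big-endian length that counts itself, followed by the payload, and a zero-length chunk terminates a group. A round-trip sketch using the helpers above (assumes this module's functions are in scope):

import io

payload = b'hello'
framed = chunkheader(len(payload)) + payload + closechunk()
stream = io.BytesIO(framed)
assert getchunk(stream) == b'hello'  # header said 9; 5 payload bytes follow
assert getchunk(stream) == ""        # zero-length chunk: end of group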
62 62
63 63 def combineresults(results):
64 64 """logic to combine 0 or more addchangegroup results into one"""
65 65 changedheads = 0
66 66 result = 1
67 67 for ret in results:
68 68 # If any changegroup result is 0, return 0
69 69 if ret == 0:
70 70 result = 0
71 71 break
72 72 if ret < -1:
73 73 changedheads += ret + 1
74 74 elif ret > 1:
75 75 changedheads += ret - 1
76 76 if changedheads > 0:
77 77 result = 1 + changedheads
78 78 elif changedheads < 0:
79 79 result = -1 + changedheads
80 80 return result
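
The values combined here use the same encoding apply() documents below: 0 is failure, 1 means the head count is unchanged, 1+n means n heads were added, and -1-n means n were removed. A few worked examples (a sketch, calling the function as defined above):

assert combineresults([]) == 1       # nothing applied: heads unchanged
assert combineresults([1, 1]) == 1   # two runs, head count stable
assert combineresults([3, -2]) == 2  # +2 heads, then -1 head: net +1
assert combineresults([0, 5]) == 0   # any failed application wins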
81 81
82 82 def writechunks(ui, chunks, filename, vfs=None):
83 83 """Write chunks to a file and return its filename.
84 84
85 85 The stream is assumed to be a bundle file.
86 86 Existing files will not be overwritten.
87 87 If no filename is specified, a temporary file is created.
88 88 """
89 89 fh = None
90 90 cleanup = None
91 91 try:
92 92 if filename:
93 93 if vfs:
94 94 fh = vfs.open(filename, "wb")
95 95 else:
96 96 # Increase default buffer size because default is usually
97 97 # small (4k is common on Linux).
98 98 fh = open(filename, "wb", 131072)
99 99 else:
100 100 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
101 101 fh = os.fdopen(fd, "wb")
102 102 cleanup = filename
103 103 for c in chunks:
104 104 fh.write(c)
105 105 cleanup = None
106 106 return filename
107 107 finally:
108 108 if fh is not None:
109 109 fh.close()
110 110 if cleanup is not None:
111 111 if filename and vfs:
112 112 vfs.unlink(cleanup)
113 113 else:
114 114 os.unlink(cleanup)
115 115
116 116 class cg1unpacker(object):
117 117 """Unpacker for cg1 changegroup streams.
118 118
119 119 A changegroup unpacker handles the framing of the revision data in
120 120 the wire format. Most consumers will want to use the apply()
121 121 method to add the changes from the changegroup to a repository.
122 122
123 123 If you're forwarding a changegroup unmodified to another consumer,
124 124 use getchunks(), which returns an iterator of changegroup
125 125 chunks. This is mostly useful for cases where you need to know the
126 126 data stream has ended by observing the end of the changegroup.
127 127
128 128 deltachunk() is useful only if you're applying delta data. Most
129 129 consumers should prefer apply() instead.
130 130
131 131 A few other public methods exist. Those are used only for
132 132 bundlerepo and some debug commands - their use is discouraged.
133 133 """
134 134 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
135 135 deltaheadersize = struct.calcsize(deltaheader)
136 136 version = '01'
137 137 _grouplistcount = 1 # One list of files after the manifests
138 138
139 139 def __init__(self, fh, alg, extras=None):
140 if alg == 'UN':
141 alg = None # get more modern without breaking too much
142 if not alg in util.decompressors:
140 if alg is None:
141 alg = 'UN'
142 if alg not in util.compengines.supportedbundletypes:
143 143 raise error.Abort(_('unknown stream compression type: %s')
144 144 % alg)
145 145 if alg == 'BZ':
146 146 alg = '_truncatedBZ'
147 self._stream = util.decompressors[alg](fh)
147
148 compengine = util.compengines.forbundletype(alg)
149 self._stream = compengine.decompressorreader(fh)
148 150 self._type = alg
149 151 self.extras = extras or {}
150 152 self.callback = None
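
This hunk is the commit's core change: instead of indexing the old util.decompressors dict, the unpacker asks the compression engines registry for the engine matching the bundle type and wraps the stream in its decompressorreader(). A minimal sketch of the reader shape that call returns (hypothetical class, not Mercurial's actual engine; zlib stands in for the 'GZ' case):

import zlib

class gzdecompressorreader(object):
    '''File-like wrapper that inflates zlib data on read().'''
    def __init__(self, fh):
        self._fh = fh
        self._decomp = zlib.decompressobj()
        self._buf = b''

    def read(self, n):
        # Pull compressed input until n decompressed bytes are buffered
        # or the underlying stream is exhausted.
        while len(self._buf) < n:
            chunk = self._fh.read(4096)
            if not chunk:
                self._buf += self._decomp.flush()
                break
            self._buf += self._decomp.decompress(chunk)
        data, self._buf = self._buf[:n], self._buf[n:]
        return data

All cg1unpacker needs from the engine is this read(n) surface, which is why the swap above is so small.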
151 153
152 154 # These methods (compressed, read, seek, tell) all appear to only
153 155 # be used by bundlerepo, but it's a little hard to tell.
154 156 def compressed(self):
155 157 return self._type is not None
156 158 def read(self, l):
157 159 return self._stream.read(l)
158 160 def seek(self, pos):
159 161 return self._stream.seek(pos)
160 162 def tell(self):
161 163 return self._stream.tell()
162 164 def close(self):
163 165 return self._stream.close()
164 166
165 167 def _chunklength(self):
166 168 d = readexactly(self._stream, 4)
167 169 l = struct.unpack(">l", d)[0]
168 170 if l <= 4:
169 171 if l:
170 172 raise error.Abort(_("invalid chunk length %d") % l)
171 173 return 0
172 174 if self.callback:
173 175 self.callback()
174 176 return l - 4
175 177
176 178 def changelogheader(self):
177 179 """v10 does not have a changelog header chunk"""
178 180 return {}
179 181
180 182 def manifestheader(self):
181 183 """v10 does not have a manifest header chunk"""
182 184 return {}
183 185
184 186 def filelogheader(self):
185 187 """return the header of the filelogs chunk, v10 only has the filename"""
186 188 l = self._chunklength()
187 189 if not l:
188 190 return {}
189 191 fname = readexactly(self._stream, l)
190 192 return {'filename': fname}
191 193
192 194 def _deltaheader(self, headertuple, prevnode):
193 195 node, p1, p2, cs = headertuple
194 196 if prevnode is None:
195 197 deltabase = p1
196 198 else:
197 199 deltabase = prevnode
198 200 flags = 0
199 201 return node, p1, p2, deltabase, cs, flags
200 202
201 203 def deltachunk(self, prevnode):
202 204 l = self._chunklength()
203 205 if not l:
204 206 return {}
205 207 headerdata = readexactly(self._stream, self.deltaheadersize)
206 208 header = struct.unpack(self.deltaheader, headerdata)
207 209 delta = readexactly(self._stream, l - self.deltaheadersize)
208 210 node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
209 211 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
210 212 'deltabase': deltabase, 'delta': delta, 'flags': flags}
211 213
212 214 def getchunks(self):
213 215 """returns all the chunks contains in the bundle
214 216
215 217 Used when you need to forward the binary stream to a file or another
216 218         network API. To do so, it parses the changegroup data; otherwise it
217 219         would block in the sshrepo case, not knowing where the stream ends.
218 220 """
219 221 # an empty chunkgroup is the end of the changegroup
220 222 # a changegroup has at least 2 chunkgroups (changelog and manifest).
221 223 # after that, changegroup versions 1 and 2 have a series of groups
222 224 # with one group per file. changegroup 3 has a series of directory
223 225 # manifests before the files.
224 226 count = 0
225 227 emptycount = 0
226 228 while emptycount < self._grouplistcount:
227 229 empty = True
228 230 count += 1
229 231 while True:
230 232 chunk = getchunk(self)
231 233 if not chunk:
232 234 if empty and count > 2:
233 235 emptycount += 1
234 236 break
235 237 empty = False
236 238 yield chunkheader(len(chunk))
237 239 pos = 0
238 240 while pos < len(chunk):
239 241 next = pos + 2**20
240 242 yield chunk[pos:next]
241 243 pos = next
242 244 yield closechunk()
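
Because getchunks() re-emits the framing verbatim, forwarding composes directly with writechunks() above, e.g. writechunks(ui, cg.getchunks(), filename). A self-contained sketch of the termination logic, feeding the unpacker the three empty groups (changelog, manifests, one file list) that end a cg1 stream (this assumes the compression engines registry resolves 'UN' to a pass-through reader, as the __init__ hunk above implies):

import io

stream = io.BytesIO(closechunk() * 3)
cg = cg1unpacker(stream, 'UN')        # 'UN': no decompression
assert ''.join(cg.getchunks()) == closechunk() * 3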
243 245
244 246 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
245 247 # We know that we'll never have more manifests than we had
246 248 # changesets.
247 249 self.callback = prog(_('manifests'), numchanges)
248 250 # no need to check for empty manifest group here:
249 251 # if the result of the merge of 1 and 2 is the same in 3 and 4,
250 252 # no new manifest will be created and the manifest group will
251 253 # be empty during the pull
252 254 self.manifestheader()
253 255 repo.manifestlog._revlog.addgroup(self, revmap, trp)
254 256 repo.ui.progress(_('manifests'), None)
255 257 self.callback = None
256 258
257 259 def apply(self, repo, srctype, url, emptyok=False,
258 260 targetphase=phases.draft, expectedtotal=None):
259 261 """Add the changegroup returned by source.read() to this repo.
260 262 srctype is a string like 'push', 'pull', or 'unbundle'. url is
261 263 the URL of the repo where this changegroup is coming from.
262 264
263 265 Return an integer summarizing the change to this repo:
264 266 - nothing changed or no source: 0
265 267 - more heads than before: 1+added heads (2..n)
266 268 - fewer heads than before: -1-removed heads (-2..-n)
267 269 - number of heads stays the same: 1
268 270 """
269 271 repo = repo.unfiltered()
270 272 def csmap(x):
271 273 repo.ui.debug("add changeset %s\n" % short(x))
272 274 return len(cl)
273 275
274 276 def revmap(x):
275 277 return cl.rev(x)
276 278
277 279 changesets = files = revisions = 0
278 280
279 281 try:
280 282 with repo.transaction("\n".join([srctype,
281 283 util.hidepassword(url)])) as tr:
282 284 # The transaction could have been created before and already
283 285 # carries source information. In this case we use the top
284 286 # level data. We overwrite the argument because we need to use
285 287 # the top level value (if they exist) in this function.
286 288 srctype = tr.hookargs.setdefault('source', srctype)
287 289 url = tr.hookargs.setdefault('url', url)
288 290 repo.hook('prechangegroup', throw=True, **tr.hookargs)
289 291
290 292 # write changelog data to temp files so concurrent readers
291 293 # will not see an inconsistent view
292 294 cl = repo.changelog
293 295 cl.delayupdate(tr)
294 296 oldheads = cl.heads()
295 297
296 298 trp = weakref.proxy(tr)
297 299 # pull off the changeset group
298 300 repo.ui.status(_("adding changesets\n"))
299 301 clstart = len(cl)
300 302 class prog(object):
301 303 def __init__(self, step, total):
302 304 self._step = step
303 305 self._total = total
304 306 self._count = 1
305 307 def __call__(self):
306 308 repo.ui.progress(self._step, self._count,
307 309 unit=_('chunks'), total=self._total)
308 310 self._count += 1
309 311 self.callback = prog(_('changesets'), expectedtotal)
310 312
311 313 efiles = set()
312 314 def onchangelog(cl, node):
313 315 efiles.update(cl.readfiles(node))
314 316
315 317 self.changelogheader()
316 318 srccontent = cl.addgroup(self, csmap, trp,
317 319 addrevisioncb=onchangelog)
318 320 efiles = len(efiles)
319 321
320 322 if not (srccontent or emptyok):
321 323 raise error.Abort(_("received changelog group is empty"))
322 324 clend = len(cl)
323 325 changesets = clend - clstart
324 326 repo.ui.progress(_('changesets'), None)
325 327 self.callback = None
326 328
327 329 # pull off the manifest group
328 330 repo.ui.status(_("adding manifests\n"))
329 331 self._unpackmanifests(repo, revmap, trp, prog, changesets)
330 332
331 333 needfiles = {}
332 334 if repo.ui.configbool('server', 'validate', default=False):
333 335 cl = repo.changelog
334 336 ml = repo.manifestlog
335 337 # validate incoming csets have their manifests
336 338 for cset in xrange(clstart, clend):
337 339 mfnode = cl.changelogrevision(cset).manifest
338 340 mfest = ml[mfnode].readdelta()
339 341 # store file nodes we must see
340 342 for f, n in mfest.iteritems():
341 343 needfiles.setdefault(f, set()).add(n)
342 344
343 345 # process the files
344 346 repo.ui.status(_("adding file changes\n"))
345 347 newrevs, newfiles = _addchangegroupfiles(
346 348 repo, self, revmap, trp, efiles, needfiles)
347 349 revisions += newrevs
348 350 files += newfiles
349 351
350 352 dh = 0
351 353 if oldheads:
352 354 heads = cl.heads()
353 355 dh = len(heads) - len(oldheads)
354 356 for h in heads:
355 357 if h not in oldheads and repo[h].closesbranch():
356 358 dh -= 1
357 359 htext = ""
358 360 if dh:
359 361 htext = _(" (%+d heads)") % dh
360 362
361 363 repo.ui.status(_("added %d changesets"
362 364 " with %d changes to %d files%s\n")
363 365 % (changesets, revisions, files, htext))
364 366 repo.invalidatevolatilesets()
365 367
366 368 if changesets > 0:
367 369 if 'node' not in tr.hookargs:
368 370 tr.hookargs['node'] = hex(cl.node(clstart))
369 371 tr.hookargs['node_last'] = hex(cl.node(clend - 1))
370 372 hookargs = dict(tr.hookargs)
371 373 else:
372 374 hookargs = dict(tr.hookargs)
373 375 hookargs['node'] = hex(cl.node(clstart))
374 376 hookargs['node_last'] = hex(cl.node(clend - 1))
375 377 repo.hook('pretxnchangegroup', throw=True, **hookargs)
376 378
377 379 added = [cl.node(r) for r in xrange(clstart, clend)]
378 380 publishing = repo.publishing()
379 381 if srctype in ('push', 'serve'):
380 382                 # Old servers cannot push the boundary themselves.
381 383                 # New servers won't push the boundary if the changeset
382 384                 # already exists locally as secret
383 385 #
384 386                 # We should not use added here but the list of all changes in
385 387 # the bundle
386 388 if publishing:
387 389 phases.advanceboundary(repo, tr, phases.public,
388 390 srccontent)
389 391 else:
390 392 # Those changesets have been pushed from the
391 393 # outside, their phases are going to be pushed
392 394                     # alongside. Therefore `targetphase` is
393 395 # ignored.
394 396 phases.advanceboundary(repo, tr, phases.draft,
395 397 srccontent)
396 398 phases.retractboundary(repo, tr, phases.draft, added)
397 399 elif srctype != 'strip':
398 400                 # publishing only alters behavior during push
399 401 #
400 402 # strip should not touch boundary at all
401 403 phases.retractboundary(repo, tr, targetphase, added)
402 404
403 405 if changesets > 0:
404 406 if srctype != 'strip':
405 407                     # During strip, branchcache is invalid but the
406 408                     # coming call to `destroyed` will repair it.
407 409                     # In other cases we can safely update the cache
408 410                     # on disk.
409 411 repo.ui.debug('updating the branch cache\n')
410 412 branchmap.updatecache(repo.filtered('served'))
411 413
412 414 def runhooks():
413 415 # These hooks run when the lock releases, not when the
414 416 # transaction closes. So it's possible for the changelog
415 417 # to have changed since we last saw it.
416 418 if clstart >= len(repo):
417 419 return
418 420
419 421 repo.hook("changegroup", **hookargs)
420 422
421 423 for n in added:
422 424 args = hookargs.copy()
423 425 args['node'] = hex(n)
424 426 del args['node_last']
425 427 repo.hook("incoming", **args)
426 428
427 429 newheads = [h for h in repo.heads()
428 430 if h not in oldheads]
429 431 repo.ui.log("incoming",
430 432 "%s incoming changes - new heads: %s\n",
431 433 len(added),
432 434 ', '.join([hex(c[:6]) for c in newheads]))
433 435
434 436 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
435 437 lambda tr: repo._afterlock(runhooks))
436 438 finally:
437 439 repo.ui.flush()
438 440 # never return 0 here:
439 441 if dh < 0:
440 442 return dh - 1
441 443 else:
442 444 return dh + 1
443 445
444 446 class cg2unpacker(cg1unpacker):
445 447 """Unpacker for cg2 streams.
446 448
447 449 cg2 streams add support for generaldelta, so the delta header
448 450 format is slightly different. All other features about the data
449 451 remain the same.
450 452 """
451 453 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
452 454 deltaheadersize = struct.calcsize(deltaheader)
453 455 version = '02'
454 456
455 457 def _deltaheader(self, headertuple, prevnode):
456 458 node, p1, p2, deltabase, cs = headertuple
457 459 flags = 0
458 460 return node, p1, p2, deltabase, cs, flags
459 461
460 462 class cg3unpacker(cg2unpacker):
461 463 """Unpacker for cg3 streams.
462 464
463 465 cg3 streams add support for exchanging treemanifests and revlog
464 466 flags. It adds the revlog flags to the delta header and an empty chunk
465 467 separating manifests and files.
466 468 """
467 469 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
468 470 deltaheadersize = struct.calcsize(deltaheader)
469 471 version = '03'
470 472 _grouplistcount = 2 # One list of manifests and one list of files
471 473
472 474 def _deltaheader(self, headertuple, prevnode):
473 475 node, p1, p2, deltabase, cs, flags = headertuple
474 476 return node, p1, p2, deltabase, cs, flags
475 477
476 478 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
477 479 super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
478 480 numchanges)
479 481 for chunkdata in iter(self.filelogheader, {}):
480 482 # If we get here, there are directory manifests in the changegroup
481 483 d = chunkdata["filename"]
482 484 repo.ui.debug("adding %s revisions\n" % d)
483 485 dirlog = repo.manifestlog._revlog.dirlog(d)
484 486 if not dirlog.addgroup(self, revmap, trp):
485 487 raise error.Abort(_("received dir revlog group is empty"))
486 488
487 489 class headerlessfixup(object):
488 490 def __init__(self, fh, h):
489 491 self._h = h
490 492 self._fh = fh
491 493 def read(self, n):
492 494 if self._h:
493 495 d, self._h = self._h[:n], self._h[n:]
494 496 if len(d) < n:
495 497 d += readexactly(self._fh, n - len(d))
496 498 return d
497 499 return readexactly(self._fh, n)
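
headerlessfixup lets a caller peek at the start of a stream (say, to sniff a compression marker) and then hand consumers a stream that still appears whole. A usage sketch (hypothetical driver code; the real callers live in Mercurial's exchange layer):

import io

fh = io.BytesIO(b'GZ...rest of stream...')
head = readexactly(fh, 2)          # consume the marker to inspect it
fixed = headerlessfixup(fh, head)  # splice the consumed bytes back on
assert fixed.read(5) == b'GZ...'   # reads the header first, then fh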
498 500
499 501 class cg1packer(object):
500 502 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
501 503 version = '01'
502 504 def __init__(self, repo, bundlecaps=None):
503 505 """Given a source repo, construct a bundler.
504 506
505 507 bundlecaps is optional and can be used to specify the set of
506 508 capabilities which can be used to build the bundle.
507 509 """
508 510 # Set of capabilities we can use to build the bundle.
509 511 if bundlecaps is None:
510 512 bundlecaps = set()
511 513 self._bundlecaps = bundlecaps
512 514 # experimental config: bundle.reorder
513 515 reorder = repo.ui.config('bundle', 'reorder', 'auto')
514 516 if reorder == 'auto':
515 517 reorder = None
516 518 else:
517 519 reorder = util.parsebool(reorder)
518 520 self._repo = repo
519 521 self._reorder = reorder
520 522 self._progress = repo.ui.progress
521 523 if self._repo.ui.verbose and not self._repo.ui.debugflag:
522 524 self._verbosenote = self._repo.ui.note
523 525 else:
524 526 self._verbosenote = lambda s: None
525 527
526 528 def close(self):
527 529 return closechunk()
528 530
529 531 def fileheader(self, fname):
530 532 return chunkheader(len(fname)) + fname
531 533
532 534 # Extracted both for clarity and for overriding in extensions.
533 535 def _sortgroup(self, revlog, nodelist, lookup):
534 536 """Sort nodes for change group and turn them into revnums."""
535 537 # for generaldelta revlogs, we linearize the revs; this will both be
536 538 # much quicker and generate a much smaller bundle
537 539 if (revlog._generaldelta and self._reorder is None) or self._reorder:
538 540 dag = dagutil.revlogdag(revlog)
539 541 return dag.linearize(set(revlog.rev(n) for n in nodelist))
540 542 else:
541 543 return sorted([revlog.rev(n) for n in nodelist])
542 544
543 545 def group(self, nodelist, revlog, lookup, units=None):
544 546 """Calculate a delta group, yielding a sequence of changegroup chunks
545 547 (strings).
546 548
547 549 Given a list of changeset revs, return a set of deltas and
548 550 metadata corresponding to nodes. The first delta is
549 551 first parent(nodelist[0]) -> nodelist[0], the receiver is
550 552 guaranteed to have this parent as it has all history before
551 553         these changesets. If the first parent is nullrev, the
552 554 changegroup starts with a full revision.
553 555
554 556         If units is not None, progress detail will be generated; units specifies
555 557 the type of revlog that is touched (changelog, manifest, etc.).
556 558 """
557 559 # if we don't have any revisions touched by these changesets, bail
558 560 if len(nodelist) == 0:
559 561 yield self.close()
560 562 return
561 563
562 564 revs = self._sortgroup(revlog, nodelist, lookup)
563 565
564 566 # add the parent of the first rev
565 567 p = revlog.parentrevs(revs[0])[0]
566 568 revs.insert(0, p)
567 569
568 570 # build deltas
569 571 total = len(revs) - 1
570 572 msgbundling = _('bundling')
571 573 for r in xrange(len(revs) - 1):
572 574 if units is not None:
573 575 self._progress(msgbundling, r + 1, unit=units, total=total)
574 576 prev, curr = revs[r], revs[r + 1]
575 577 linknode = lookup(revlog.node(curr))
576 578 for c in self.revchunk(revlog, curr, prev, linknode):
577 579 yield c
578 580
579 581 if units is not None:
580 582 self._progress(msgbundling, None)
581 583 yield self.close()
582 584
583 585 # filter any nodes that claim to be part of the known set
584 586 def prune(self, revlog, missing, commonrevs):
585 587 rr, rl = revlog.rev, revlog.linkrev
586 588 return [n for n in missing if rl(rr(n)) not in commonrevs]
587 589
588 590 def _packmanifests(self, dir, mfnodes, lookuplinknode):
589 591 """Pack flat manifests into a changegroup stream."""
590 592 assert not dir
591 593 for chunk in self.group(mfnodes, self._repo.manifestlog._revlog,
592 594 lookuplinknode, units=_('manifests')):
593 595 yield chunk
594 596
595 597 def _manifestsdone(self):
596 598 return ''
597 599
598 600 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
599 601 '''yield a sequence of changegroup chunks (strings)'''
600 602 repo = self._repo
601 603 cl = repo.changelog
602 604
603 605 clrevorder = {}
604 606 mfs = {} # needed manifests
605 607 fnodes = {} # needed file nodes
606 608 changedfiles = set()
607 609
608 610 # Callback for the changelog, used to collect changed files and manifest
609 611 # nodes.
610 612 # Returns the linkrev node (identity in the changelog case).
611 613 def lookupcl(x):
612 614 c = cl.read(x)
613 615 clrevorder[x] = len(clrevorder)
614 616 n = c[0]
615 617 # record the first changeset introducing this manifest version
616 618 mfs.setdefault(n, x)
617 619 # Record a complete list of potentially-changed files in
618 620 # this manifest.
619 621 changedfiles.update(c[3])
620 622 return x
621 623
622 624 self._verbosenote(_('uncompressed size of bundle content:\n'))
623 625 size = 0
624 626 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
625 627 size += len(chunk)
626 628 yield chunk
627 629 self._verbosenote(_('%8.i (changelog)\n') % size)
628 630
629 631 # We need to make sure that the linkrev in the changegroup refers to
630 632 # the first changeset that introduced the manifest or file revision.
631 633 # The fastpath is usually safer than the slowpath, because the filelogs
632 634 # are walked in revlog order.
633 635 #
634 636 # When taking the slowpath with reorder=None and the manifest revlog
635 637 # uses generaldelta, the manifest may be walked in the "wrong" order.
636 638 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
637 639 # cc0ff93d0c0c).
638 640 #
639 641 # When taking the fastpath, we are only vulnerable to reordering
640 642 # of the changelog itself. The changelog never uses generaldelta, so
641 643 # it is only reordered when reorder=True. To handle this case, we
642 644 # simply take the slowpath, which already has the 'clrevorder' logic.
643 645 # This was also fixed in cc0ff93d0c0c.
644 646 fastpathlinkrev = fastpathlinkrev and not self._reorder
645 647 # Treemanifests don't work correctly with fastpathlinkrev
646 648 # either, because we don't discover which directory nodes to
647 649 # send along with files. This could probably be fixed.
648 650 fastpathlinkrev = fastpathlinkrev and (
649 651 'treemanifest' not in repo.requirements)
650 652
651 653 for chunk in self.generatemanifests(commonrevs, clrevorder,
652 654 fastpathlinkrev, mfs, fnodes):
653 655 yield chunk
654 656 mfs.clear()
655 657 clrevs = set(cl.rev(x) for x in clnodes)
656 658
657 659 if not fastpathlinkrev:
658 660 def linknodes(unused, fname):
659 661 return fnodes.get(fname, {})
660 662 else:
661 663 cln = cl.node
662 664 def linknodes(filerevlog, fname):
663 665 llr = filerevlog.linkrev
664 666 fln = filerevlog.node
665 667 revs = ((r, llr(r)) for r in filerevlog)
666 668 return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)
667 669
668 670 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
669 671 source):
670 672 yield chunk
671 673
672 674 yield self.close()
673 675
674 676 if clnodes:
675 677 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
676 678
677 679 def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
678 680 fnodes):
679 681 repo = self._repo
680 682 mfl = repo.manifestlog
681 683 dirlog = mfl._revlog.dirlog
682 684 tmfnodes = {'': mfs}
683 685
684 686 # Callback for the manifest, used to collect linkrevs for filelog
685 687 # revisions.
686 688 # Returns the linkrev node (collected in lookupcl).
687 689 def makelookupmflinknode(dir):
688 690 if fastpathlinkrev:
689 691 assert not dir
690 692 return mfs.__getitem__
691 693
692 694 def lookupmflinknode(x):
693 695 """Callback for looking up the linknode for manifests.
694 696
695 697 Returns the linkrev node for the specified manifest.
696 698
697 699 SIDE EFFECT:
698 700
699 701 1) fclnodes gets populated with the list of relevant
700 702 file nodes if we're not using fastpathlinkrev
701 703 2) When treemanifests are in use, collects treemanifest nodes
702 704 to send
703 705
704 706 Note that this means manifests must be completely sent to
705 707 the client before you can trust the list of files and
706 708 treemanifests to send.
707 709 """
708 710 clnode = tmfnodes[dir][x]
709 711 mdata = mfl.get(dir, x).readfast(shallow=True)
710 712 for p, n, fl in mdata.iterentries():
711 713 if fl == 't': # subdirectory manifest
712 714 subdir = dir + p + '/'
713 715 tmfclnodes = tmfnodes.setdefault(subdir, {})
714 716 tmfclnode = tmfclnodes.setdefault(n, clnode)
715 717 if clrevorder[clnode] < clrevorder[tmfclnode]:
716 718 tmfclnodes[n] = clnode
717 719 else:
718 720 f = dir + p
719 721 fclnodes = fnodes.setdefault(f, {})
720 722 fclnode = fclnodes.setdefault(n, clnode)
721 723 if clrevorder[clnode] < clrevorder[fclnode]:
722 724 fclnodes[n] = clnode
723 725 return clnode
724 726 return lookupmflinknode
725 727
726 728 size = 0
727 729 while tmfnodes:
728 730 dir = min(tmfnodes)
729 731 nodes = tmfnodes[dir]
730 732 prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
731 733 if not dir or prunednodes:
732 734 for x in self._packmanifests(dir, prunednodes,
733 735 makelookupmflinknode(dir)):
734 736 size += len(x)
735 737 yield x
736 738 del tmfnodes[dir]
737 739 self._verbosenote(_('%8.i (manifests)\n') % size)
738 740 yield self._manifestsdone()
739 741
740 742 # The 'source' parameter is useful for extensions
741 743 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
742 744 repo = self._repo
743 745 progress = self._progress
744 746 msgbundling = _('bundling')
745 747
746 748 total = len(changedfiles)
747 749 # for progress output
748 750 msgfiles = _('files')
749 751 for i, fname in enumerate(sorted(changedfiles)):
750 752 filerevlog = repo.file(fname)
751 753 if not filerevlog:
752 754 raise error.Abort(_("empty or missing revlog for %s") % fname)
753 755
754 756 linkrevnodes = linknodes(filerevlog, fname)
755 757             # Lookup table for filenodes; we collected the linkrev nodes above in the
756 758 # fastpath case and with lookupmf in the slowpath case.
757 759 def lookupfilelog(x):
758 760 return linkrevnodes[x]
759 761
760 762 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
761 763 if filenodes:
762 764 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
763 765 total=total)
764 766 h = self.fileheader(fname)
765 767 size = len(h)
766 768 yield h
767 769 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
768 770 size += len(chunk)
769 771 yield chunk
770 772 self._verbosenote(_('%8.i %s\n') % (size, fname))
771 773 progress(msgbundling, None)
772 774
773 775 def deltaparent(self, revlog, rev, p1, p2, prev):
774 776 return prev
775 777
776 778 def revchunk(self, revlog, rev, prev, linknode):
777 779 node = revlog.node(rev)
778 780 p1, p2 = revlog.parentrevs(rev)
779 781 base = self.deltaparent(revlog, rev, p1, p2, prev)
780 782
781 783 prefix = ''
782 784 if revlog.iscensored(base) or revlog.iscensored(rev):
783 785 try:
784 786 delta = revlog.revision(node)
785 787 except error.CensoredNodeError as e:
786 788 delta = e.tombstone
787 789 if base == nullrev:
788 790 prefix = mdiff.trivialdiffheader(len(delta))
789 791 else:
790 792 baselen = revlog.rawsize(base)
791 793 prefix = mdiff.replacediffheader(baselen, len(delta))
792 794 elif base == nullrev:
793 795 delta = revlog.revision(node)
794 796 prefix = mdiff.trivialdiffheader(len(delta))
795 797 else:
796 798 delta = revlog.revdiff(base, rev)
797 799 p1n, p2n = revlog.parents(node)
798 800 basenode = revlog.node(base)
799 801 flags = revlog.flags(rev)
800 802 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
801 803 meta += prefix
802 804 l = len(meta) + len(delta)
803 805 yield chunkheader(l)
804 806 yield meta
805 807 yield delta
806 808 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
807 809 # do nothing with basenode, it is implicitly the previous one in HG10
808 810 # do nothing with flags, it is implicitly 0 for cg1 and cg2
809 811 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
810 812
811 813 class cg2packer(cg1packer):
812 814 version = '02'
813 815 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
814 816
815 817 def __init__(self, repo, bundlecaps=None):
816 818 super(cg2packer, self).__init__(repo, bundlecaps)
817 819 if self._reorder is None:
818 820 # Since generaldelta is directly supported by cg2, reordering
819 821 # generally doesn't help, so we disable it by default (treating
820 822 # bundle.reorder=auto just like bundle.reorder=False).
821 823 self._reorder = False
822 824
823 825 def deltaparent(self, revlog, rev, p1, p2, prev):
824 826 dp = revlog.deltaparent(rev)
825 827 if dp == nullrev and revlog.storedeltachains:
826 828 # Avoid sending full revisions when delta parent is null. Pick prev
827 829 # in that case. It's tempting to pick p1 in this case, as p1 will
828 830 # be smaller in the common case. However, computing a delta against
829 831 # p1 may require resolving the raw text of p1, which could be
830 832 # expensive. The revlog caches should have prev cached, meaning
831 833 # less CPU for changegroup generation. There is likely room to add
832 834 # a flag and/or config option to control this behavior.
833 835 return prev
834 836 elif dp == nullrev:
835 837 # revlog is configured to use full snapshot for a reason,
836 838 # stick to full snapshot.
837 839 return nullrev
838 840 elif dp not in (p1, p2, prev):
839 841 # Pick prev when we can't be sure remote has the base revision.
840 842 return prev
841 843 else:
842 844 return dp
843 845
844 846 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
845 847 # Do nothing with flags, it is implicitly 0 in cg1 and cg2
846 848 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
847 849
848 850 class cg3packer(cg2packer):
849 851 version = '03'
850 852 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
851 853
852 854 def _packmanifests(self, dir, mfnodes, lookuplinknode):
853 855 if dir:
854 856 yield self.fileheader(dir)
855 857
856 858 dirlog = self._repo.manifestlog._revlog.dirlog(dir)
857 859 for chunk in self.group(mfnodes, dirlog, lookuplinknode,
858 860 units=_('manifests')):
859 861 yield chunk
860 862
861 863 def _manifestsdone(self):
862 864 return self.close()
863 865
864 866 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
865 867 return struct.pack(
866 868 self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
867 869
868 870 _packermap = {'01': (cg1packer, cg1unpacker),
869 871 # cg2 adds support for exchanging generaldelta
870 872 '02': (cg2packer, cg2unpacker),
871 873 # cg3 adds support for exchanging revlog flags and treemanifests
872 874 '03': (cg3packer, cg3unpacker),
873 875 }
874 876
875 877 def allsupportedversions(ui):
876 878 versions = set(_packermap.keys())
877 879 versions.discard('03')
878 880 if (ui.configbool('experimental', 'changegroup3') or
879 881 ui.configbool('experimental', 'treemanifest')):
880 882 versions.add('03')
881 883 return versions
882 884
883 885 # Changegroup versions that can be applied to the repo
884 886 def supportedincomingversions(repo):
885 887 versions = allsupportedversions(repo.ui)
886 888 if 'treemanifest' in repo.requirements:
887 889 versions.add('03')
888 890 return versions
889 891
890 892 # Changegroup versions that can be created from the repo
891 893 def supportedoutgoingversions(repo):
892 894 versions = allsupportedversions(repo.ui)
893 895 if 'treemanifest' in repo.requirements:
894 896 # Versions 01 and 02 support only flat manifests and it's just too
895 897 # expensive to convert between the flat manifest and tree manifest on
896 898 # the fly. Since tree manifests are hashed differently, all of history
897 899 # would have to be converted. Instead, we simply don't even pretend to
898 900 # support versions 01 and 02.
899 901 versions.discard('01')
900 902 versions.discard('02')
901 903 versions.add('03')
902 904 return versions
903 905
904 906 def safeversion(repo):
905 907 # Finds the smallest version that it's safe to assume clients of the repo
906 908 # will support. For example, all hg versions that support generaldelta also
907 909 # support changegroup 02.
908 910 versions = supportedoutgoingversions(repo)
909 911 if 'generaldelta' in repo.requirements:
910 912 versions.discard('01')
911 913 assert versions
912 914 return min(versions)
913 915
914 916 def getbundler(version, repo, bundlecaps=None):
915 917 assert version in supportedoutgoingversions(repo)
916 918 return _packermap[version][0](repo, bundlecaps)
917 919
918 920 def getunbundler(version, fh, alg, extras=None):
919 921 return _packermap[version][1](fh, alg, extras=extras)
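
getbundler and getunbundler are the two halves of the _packermap dispatch above; safeversion() picks the version, and the packer/unpacker pair shares it. A short sketch (names from this file; repo and fh are assumed):

packer, unpacker = _packermap['02']
assert packer is cg2packer and unpacker is cg2unpacker

# version = safeversion(repo)                  # e.g. '02' with generaldelta
# bundler = getbundler(version, repo)          # cg2packer instance
# unbundler = getunbundler(version, fh, 'UN')  # cg2unpacker, uncompressed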
920 922
921 923 def _changegroupinfo(repo, nodes, source):
922 924 if repo.ui.verbose or source == 'bundle':
923 925 repo.ui.status(_("%d changesets found\n") % len(nodes))
924 926 if repo.ui.debugflag:
925 927 repo.ui.debug("list of changesets:\n")
926 928 for node in nodes:
927 929 repo.ui.debug("%s\n" % hex(node))
928 930
929 931 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
930 932 repo = repo.unfiltered()
931 933 commonrevs = outgoing.common
932 934 csets = outgoing.missing
933 935 heads = outgoing.missingheads
934 936     # We go through the fast path if we get told to, or if all (unfiltered)
935 937     # heads have been requested (since we then know all linkrevs will
936 938 # be pulled by the client).
937 939 heads.sort()
938 940 fastpathlinkrev = fastpath or (
939 941 repo.filtername is None and heads == sorted(repo.heads()))
940 942
941 943 repo.hook('preoutgoing', throw=True, source=source)
942 944 _changegroupinfo(repo, csets, source)
943 945 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
944 946
945 947 def getsubset(repo, outgoing, bundler, source, fastpath=False):
946 948 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
947 949 return getunbundler(bundler.version, util.chunkbuffer(gengroup), None,
948 950 {'clcount': len(outgoing.missing)})
949 951
950 952 def changegroupsubset(repo, roots, heads, source, version='01'):
951 953 """Compute a changegroup consisting of all the nodes that are
952 954 descendants of any of the roots and ancestors of any of the heads.
953 955 Return a chunkbuffer object whose read() method will return
954 956 successive changegroup chunks.
955 957
956 958 It is fairly complex as determining which filenodes and which
957 959 manifest nodes need to be included for the changeset to be complete
958 960 is non-trivial.
959 961
960 962 Another wrinkle is doing the reverse, figuring out which changeset in
961 963 the changegroup a particular filenode or manifestnode belongs to.
962 964 """
963 965 outgoing = discovery.outgoing(repo, missingroots=roots, missingheads=heads)
964 966 bundler = getbundler(version, repo)
965 967 return getsubset(repo, outgoing, bundler, source)
966 968
967 969 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
968 970 version='01'):
969 971 """Like getbundle, but taking a discovery.outgoing as an argument.
970 972
971 973 This is only implemented for local repos and reuses potentially
972 974 precomputed sets in outgoing. Returns a raw changegroup generator."""
973 975 if not outgoing.missing:
974 976 return None
975 977 bundler = getbundler(version, repo, bundlecaps)
976 978 return getsubsetraw(repo, outgoing, bundler, source)
977 979
978 980 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
979 981 version='01'):
980 982 """Like getbundle, but taking a discovery.outgoing as an argument.
981 983
982 984 This is only implemented for local repos and reuses potentially
983 985 precomputed sets in outgoing."""
984 986 if not outgoing.missing:
985 987 return None
986 988 bundler = getbundler(version, repo, bundlecaps)
987 989 return getsubset(repo, outgoing, bundler, source)
988 990
989 991 def getchangegroup(repo, source, outgoing, bundlecaps=None,
990 992 version='01'):
991 993 """Like changegroupsubset, but returns the set difference between the
992 994     ancestors of heads and the ancestors of common.
993 995
994 996 If heads is None, use the local heads. If common is None, use [nullid].
995 997
996 998 The nodes in common might not all be known locally due to the way the
997 999 current discovery protocol works.
998 1000 """
999 1001 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps,
1000 1002 version=version)
1001 1003
1002 1004 def changegroup(repo, basenodes, source):
1003 1005 # to avoid a race we use changegroupsubset() (issue1320)
1004 1006 return changegroupsubset(repo, basenodes, repo.heads(), source)
1005 1007
1006 1008 def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
1007 1009 revisions = 0
1008 1010 files = 0
1009 1011 for chunkdata in iter(source.filelogheader, {}):
1010 1012 files += 1
1011 1013 f = chunkdata["filename"]
1012 1014 repo.ui.debug("adding %s revisions\n" % f)
1013 1015 repo.ui.progress(_('files'), files, unit=_('files'),
1014 1016 total=expectedfiles)
1015 1017 fl = repo.file(f)
1016 1018 o = len(fl)
1017 1019 try:
1018 1020 if not fl.addgroup(source, revmap, trp):
1019 1021 raise error.Abort(_("received file revlog group is empty"))
1020 1022 except error.CensoredBaseError as e:
1021 1023 raise error.Abort(_("received delta base is censored: %s") % e)
1022 1024 revisions += len(fl) - o
1023 1025 if f in needfiles:
1024 1026 needs = needfiles[f]
1025 1027 for new in xrange(o, len(fl)):
1026 1028 n = fl.node(new)
1027 1029 if n in needs:
1028 1030 needs.remove(n)
1029 1031 else:
1030 1032 raise error.Abort(
1031 1033 _("received spurious file revlog entry"))
1032 1034 if not needs:
1033 1035 del needfiles[f]
1034 1036 repo.ui.progress(_('files'), None)
1035 1037
1036 1038 for f, needs in needfiles.iteritems():
1037 1039 fl = repo.file(f)
1038 1040 for n in needs:
1039 1041 try:
1040 1042 fl.rev(n)
1041 1043 except error.LookupError:
1042 1044 raise error.Abort(
1043 1045 _('missing file data for %s:%s - run hg verify') %
1044 1046 (f, hex(n)))
1045 1047
1046 1048 return revisions, files