##// END OF EJS Templates
changegroup: remove dictionary creation from deltachunk...
Durham Goode -
r34295:05131c96 default
parent child Browse files
Show More
@@ -1,1014 +1,1005 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullrev,
19 19 short,
20 20 )
21 21
22 22 from . import (
23 23 dagutil,
24 24 error,
25 25 mdiff,
26 26 phases,
27 27 pycompat,
28 28 util,
29 29 )
30 30
31 31 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
32 32 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
33 33 _CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
34 34
def readexactly(stream, n):
    '''read n bytes from stream.read and abort if less was available'''
    data = stream.read(n)
    if len(data) >= n:
        return data
    # A short read means the peer hung up mid-stream; treat it as fatal.
    raise error.Abort(_("stream ended unexpectedly"
                        " (got %d bytes, expected %d)")
                      % (len(data), n))
43 43
def getchunk(stream):
    """return the next chunk from stream as a string"""
    lengthdata = readexactly(stream, 4)
    length = struct.unpack(">l", lengthdata)[0]
    if length > 4:
        # The 4-byte length prefix counts itself, so the payload is l - 4.
        return readexactly(stream, length - 4)
    if length:
        # Anything in 1..4 (or negative) cannot be a valid framed chunk.
        raise error.Abort(_("invalid chunk length %d") % length)
    return ""
53 53
def chunkheader(length):
    """return a changegroup chunk header (string)"""
    # The on-the-wire length includes the 4-byte header itself.
    return struct.pack(">l", 4 + length)
57 57
def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk"""
    # A zero length field marks the end of a chunk sequence on the wire.
    return struct.pack('>l', 0)
61 61
def writechunks(ui, chunks, filename, vfs=None):
    """Write chunks to a file and return its filename.

    The stream is assumed to be a bundle file.
    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    """
    fh = None
    cleanup = None
    try:
        if not filename:
            # No target given: write to a fresh temp file and remember to
            # delete it if anything goes wrong before we finish.
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, pycompat.sysstr("wb"))
            cleanup = filename
        elif vfs:
            fh = vfs.open(filename, "wb")
        else:
            # Increase default buffer size because default is usually
            # small (4k is common on Linux).
            fh = open(filename, "wb", 131072)
        for chunk in chunks:
            fh.write(chunk)
        # Everything was written; disarm the cleanup path.
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)
95 95
class cg1unpacker(object):
    """Unpacker for cg1 changegroup streams.

    A changegroup unpacker handles the framing of the revision data in
    the wire format. Most consumers will want to use the apply()
    method to add the changes from the changegroup to a repository.

    If you're forwarding a changegroup unmodified to another consumer,
    use getchunks(), which returns an iterator of changegroup
    chunks. This is mostly useful for cases where you need to know the
    data stream has ended by observing the end of the changegroup.

    deltachunk() is useful only if you're applying delta data. Most
    consumers should prefer apply() instead.

    A few other public methods exist. Those are used only for
    bundlerepo and some debug commands - their use is discouraged.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    _grouplistcount = 1 # One list of files after the manifests

    def __init__(self, fh, alg, extras=None):
        if alg is None:
            alg = 'UN'
        if alg not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % alg)
        if alg == 'BZ':
            alg = '_truncatedBZ'

        compengine = util.compengines.forbundletype(alg)
        self._stream = compengine.decompressorreader(fh)
        self._type = alg
        self.extras = extras or {}
        self.callback = None

    # These methods (compressed, read, seek, tell) all appear to only
    # be used by bundlerepo, but it's a little hard to tell.
    def compressed(self):
        return self._type is not None and self._type != 'UN'
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def _chunklength(self):
        """Read the next chunk's 4-byte length prefix; return payload size.

        Returns 0 at a terminating (empty) chunk and aborts on a length
        that cannot be valid (1..4 or negative).
        """
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise error.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self._chunklength()
        if not l:
            return {}
        fname = readexactly(self._stream, l)
        return {'filename': fname}

    def _deltaheader(self, headertuple, prevnode):
        """Interpret an unpacked delta header for this changegroup version.

        In cg1 the delta base is implicit: the previous node in the stream,
        or p1 for the first delta of a group. Flags are always 0.
        """
        node, p1, p2, cs = headertuple
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        flags = 0
        return node, p1, p2, deltabase, cs, flags

    def deltachunk(self, prevnode):
        """Read one delta chunk from the stream.

        Returns {} at the end of the group, otherwise a tuple
        (node, p1, p2, cs, deltabase, delta, flags). A tuple is used
        instead of a dict to avoid per-revision allocation overhead.
        """
        l = self._chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
        return (node, p1, p2, cs, deltabase, delta, flags)

    def getchunks(self):
        """returns all the chunks contains in the bundle

        Used when you need to forward the binary stream to a file or another
        network API. To do so, it parse the changegroup data, otherwise it will
        block in case of sshrepo because it don't know the end of the stream.
        """
        # For changegroup 1 and 2, we expect 3 parts: changelog, manifestlog,
        # and a list of filelogs. For changegroup 3, we expect 4 parts:
        # changelog, manifestlog, a list of tree manifestlogs, and a list of
        # filelogs.
        #
        # Changelog and manifestlog parts are terminated with empty chunks. The
        # tree and file parts are a list of entry sections. Each entry section
        # is a series of chunks terminating in an empty chunk. The list of these
        # entry sections is terminated in yet another empty chunk, so we know
        # we've reached the end of the tree/file list when we reach an empty
        # chunk that was proceeded by no non-empty chunks.

        parts = 0
        while parts < 2 + self._grouplistcount:
            noentries = True
            while True:
                chunk = getchunk(self)
                if not chunk:
                    # The first two empty chunks represent the end of the
                    # changelog and the manifestlog portions. The remaining
                    # empty chunks represent either A) the end of individual
                    # tree or file entries in the file list, or B) the end of
                    # the entire list. It's the end of the entire list if there
                    # were no entries (i.e. noentries is True).
                    if parts < 2:
                        parts += 1
                    elif noentries:
                        parts += 1
                    break
                noentries = False
                yield chunkheader(len(chunk))
                pos = 0
                while pos < len(chunk):
                    next = pos + 2**20
                    yield chunk[pos:next]
                    pos = next
        yield closechunk()

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        # We know that we'll never have more manifests than we had
        # changesets.
        self.callback = prog(_('manifests'), numchanges)
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        self.manifestheader()
        deltas = self.deltaiter()
        repo.manifestlog._revlog.addgroup(deltas, revmap, trp)
        repo.ui.progress(_('manifests'), None)
        self.callback = None

    def apply(self, repo, tr, srctype, url, targetphase=phases.draft,
              expectedtotal=None):
        """Add the changegroup returned by source.read() to this repo.
        srctype is a string like 'push', 'pull', or 'unbundle'. url is
        the URL of the repo where this changegroup is coming from.

        Return an integer summarizing the change to this repo:
        - nothing changed or no source: 0
        - more heads than before: 1+added heads (2..n)
        - fewer heads than before: -1-removed heads (-2..-n)
        - number of heads stays the same: 1
        """
        repo = repo.unfiltered()
        def csmap(x):
            repo.ui.debug("add changeset %s\n" % short(x))
            return len(cl)

        def revmap(x):
            return cl.rev(x)

        changesets = files = revisions = 0

        try:
            # The transaction may already carry source information. In this
            # case we use the top level data. We overwrite the argument
            # because we need to use the top level value (if they exist)
            # in this function.
            srctype = tr.hookargs.setdefault('source', srctype)
            url = tr.hookargs.setdefault('url', url)
            repo.hook('prechangegroup',
                      throw=True, **pycompat.strkwargs(tr.hookargs))

            # write changelog data to temp files so concurrent readers
            # will not see an inconsistent view
            cl = repo.changelog
            cl.delayupdate(tr)
            oldheads = set(cl.heads())

            trp = weakref.proxy(tr)
            # pull off the changeset group
            repo.ui.status(_("adding changesets\n"))
            clstart = len(cl)
            class prog(object):
                def __init__(self, step, total):
                    self._step = step
                    self._total = total
                    self._count = 1
                def __call__(self):
                    repo.ui.progress(self._step, self._count, unit=_('chunks'),
                                     total=self._total)
                    self._count += 1
            self.callback = prog(_('changesets'), expectedtotal)

            efiles = set()
            def onchangelog(cl, node):
                efiles.update(cl.readfiles(node))

            self.changelogheader()
            deltas = self.deltaiter()
            cgnodes = cl.addgroup(deltas, csmap, trp, addrevisioncb=onchangelog)
            efiles = len(efiles)

            if not cgnodes:
                repo.ui.develwarn('applied empty changegroup',
                                  config='empty-changegroup')
            clend = len(cl)
            changesets = clend - clstart
            repo.ui.progress(_('changesets'), None)
            self.callback = None

            # pull off the manifest group
            repo.ui.status(_("adding manifests\n"))
            self._unpackmanifests(repo, revmap, trp, prog, changesets)

            needfiles = {}
            if repo.ui.configbool('server', 'validate'):
                cl = repo.changelog
                ml = repo.manifestlog
                # validate incoming csets have their manifests
                for cset in xrange(clstart, clend):
                    mfnode = cl.changelogrevision(cset).manifest
                    mfest = ml[mfnode].readdelta()
                    # store file cgnodes we must see
                    for f, n in mfest.iteritems():
                        needfiles.setdefault(f, set()).add(n)

            # process the files
            repo.ui.status(_("adding file changes\n"))
            newrevs, newfiles = _addchangegroupfiles(
                repo, self, revmap, trp, efiles, needfiles)
            revisions += newrevs
            files += newfiles

            deltaheads = 0
            if oldheads:
                heads = cl.heads()
                deltaheads = len(heads) - len(oldheads)
                for h in heads:
                    if h not in oldheads and repo[h].closesbranch():
                        deltaheads -= 1
            htext = ""
            if deltaheads:
                htext = _(" (%+d heads)") % deltaheads

            repo.ui.status(_("added %d changesets"
                             " with %d changes to %d files%s\n")
                           % (changesets, revisions, files, htext))
            repo.invalidatevolatilesets()

            if changesets > 0:
                if 'node' not in tr.hookargs:
                    tr.hookargs['node'] = hex(cl.node(clstart))
                    tr.hookargs['node_last'] = hex(cl.node(clend - 1))
                    hookargs = dict(tr.hookargs)
                else:
                    hookargs = dict(tr.hookargs)
                    hookargs['node'] = hex(cl.node(clstart))
                    hookargs['node_last'] = hex(cl.node(clend - 1))
                repo.hook('pretxnchangegroup',
                          throw=True, **pycompat.strkwargs(hookargs))

            added = [cl.node(r) for r in xrange(clstart, clend)]
            phaseall = None
            if srctype in ('push', 'serve'):
                # Old servers can not push the boundary themselves.
                # New servers won't push the boundary if changeset already
                # exists locally as secret
                #
                # We should not use added here but the list of all change in
                # the bundle
                if repo.publishing():
                    targetphase = phaseall = phases.public
                else:
                    # closer target phase computation

                    # Those changesets have been pushed from the
                    # outside, their phases are going to be pushed
                    # alongside. Therefor `targetphase` is
                    # ignored.
                    targetphase = phaseall = phases.draft
            if added:
                phases.registernew(repo, tr, targetphase, added)
            if phaseall is not None:
                phases.advanceboundary(repo, tr, phaseall, cgnodes)

            if changesets > 0:

                def runhooks():
                    # These hooks run when the lock releases, not when the
                    # transaction closes. So it's possible for the changelog
                    # to have changed since we last saw it.
                    if clstart >= len(repo):
                        return

                    repo.hook("changegroup", **pycompat.strkwargs(hookargs))

                    for n in added:
                        args = hookargs.copy()
                        args['node'] = hex(n)
                        del args['node_last']
                        repo.hook("incoming", **pycompat.strkwargs(args))

                    newheads = [h for h in repo.heads()
                                if h not in oldheads]
                    repo.ui.log("incoming",
                                "%s incoming changes - new heads: %s\n",
                                len(added),
                                ', '.join([hex(c[:6]) for c in newheads]))

                tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                                lambda tr: repo._afterlock(runhooks))
        finally:
            repo.ui.flush()
        # never return 0 here:
        if deltaheads < 0:
            ret = deltaheads - 1
        else:
            ret = deltaheads + 1
        return ret

    def deltaiter(self):
        """
        returns an iterator of the deltas in this changegroup

        Useful for passing to the underlying storage system to be stored.
        """
        chain = None
        for chunkdata in iter(lambda: self.deltachunk(chain), {}):
            # Chunkdata: (node, p1, p2, cs, deltabase, delta, flags)
            yield chunkdata
            chain = chunkdata[0]
452 443
class cg2unpacker(cg1unpacker):
    """Unpacker for cg2 streams.

    cg2 streams add support for generaldelta, so the delta header
    format is slightly different. All other features about the data
    remain the same.
    """
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        # cg2 carries the delta base explicitly in the header, so prevnode
        # is not consulted here; flags remain implicitly 0.
        node, p1, p2, deltabase, cs = headertuple
        flags = 0
        return node, p1, p2, deltabase, cs, flags
468 459
class cg3unpacker(cg2unpacker):
    """Unpacker for cg3 streams.

    cg3 streams add support for exchanging treemanifests and revlog
    flags. It adds the revlog flags to the delta header and an empty chunk
    separating manifests and files.
    """
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '03'
    _grouplistcount = 2 # One list of manifests and one list of files

    def _deltaheader(self, headertuple, prevnode):
        # cg3 carries everything, including the revlog flags, in the header.
        node, p1, p2, deltabase, cs, flags = headertuple
        return node, p1, p2, deltabase, cs, flags

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
                                                  numchanges)
        for chunkdata in iter(self.filelogheader, {}):
            # If we get here, there are directory manifests in the changegroup
            dirname = chunkdata["filename"]
            repo.ui.debug("adding %s revisions\n" % dirname)
            dirlog = repo.manifestlog._revlog.dirlog(dirname)
            deltas = self.deltaiter()
            if not dirlog.addgroup(deltas, revmap, trp):
                raise error.Abort(_("received dir revlog group is empty"))
496 487
class headerlessfixup(object):
    """File-like wrapper that re-serves an already-consumed header.

    Reads drain the buffered header bytes first, then fall through to the
    underlying stream.
    """
    def __init__(self, fh, h):
        self._h = h
        self._fh = fh
    def read(self, n):
        if not self._h:
            return readexactly(self._fh, n)
        d, self._h = self._h[:n], self._h[n:]
        if len(d) < n:
            # Header exhausted mid-read; top up from the real stream.
            d += readexactly(self._fh, n - len(d))
        return d
508 499
class cg1packer(object):
    """Packer producing version '01' changegroup streams."""
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    version = '01'
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle. While bundlecaps is
        unused in core Mercurial, extensions rely on this feature to communicate
        capabilities to customize the changegroup packer.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        # experimental config: bundle.reorder
        reorder = repo.ui.config('bundle', 'reorder')
        if reorder == 'auto':
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
        if self._repo.ui.verbose and not self._repo.ui.debugflag:
            self._verbosenote = self._repo.ui.note
        else:
            self._verbosenote = lambda s: None

    def close(self):
        return closechunk()

    def fileheader(self, fname):
        return chunkheader(len(fname)) + fname

    # Extracted both for clarity and for overriding in extensions.
    def _sortgroup(self, revlog, nodelist, lookup):
        """Sort nodes for change group and turn them into revnums."""
        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and self._reorder is None) or self._reorder:
            dag = dagutil.revlogdag(revlog)
            return dag.linearize(set(revlog.rev(n) for n in nodelist))
        else:
            return sorted([revlog.rev(n) for n in nodelist])

    def group(self, nodelist, revlog, lookup, units=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        revs = self._sortgroup(revlog, nodelist, lookup)

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        if units is not None:
            self._progress(msgbundling, None)
        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs):
        rr, rl = revlog.rev, revlog.linkrev
        return [n for n in missing if rl(rr(n)) not in commonrevs]

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        """Pack flat manifests into a changegroup stream."""
        assert not dir
        for chunk in self.group(mfnodes, self._repo.manifestlog._revlog,
                                lookuplinknode, units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        return ''

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        cl = repo.changelog

        clrevorder = {}
        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and
        # manifest nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            clrevorder[x] = len(clrevorder)
            n = c[0]
            # record the first changeset introducing this manifest version
            mfs.setdefault(n, x)
            # Record a complete list of potentially-changed files in
            # this manifest.
            changedfiles.update(c[3])
            return x

        self._verbosenote(_('uncompressed size of bundle content:\n'))
        size = 0
        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (changelog)\n') % size)

        # We need to make sure that the linkrev in the changegroup refers to
        # the first changeset that introduced the manifest or file revision.
        # The fastpath is usually safer than the slowpath, because the filelogs
        # are walked in revlog order.
        #
        # When taking the slowpath with reorder=None and the manifest revlog
        # uses generaldelta, the manifest may be walked in the "wrong" order.
        # Without 'clrevorder', we would get an incorrect linkrev (see fix in
        # cc0ff93d0c0c).
        #
        # When taking the fastpath, we are only vulnerable to reordering
        # of the changelog itself. The changelog never uses generaldelta, so
        # it is only reordered when reorder=True. To handle this case, we
        # simply take the slowpath, which already has the 'clrevorder' logic.
        # This was also fixed in cc0ff93d0c0c.
        fastpathlinkrev = fastpathlinkrev and not self._reorder
        # Treemanifests don't work correctly with fastpathlinkrev
        # either, because we don't discover which directory nodes to
        # send along with files. This could probably be fixed.
        fastpathlinkrev = fastpathlinkrev and (
            'treemanifest' not in repo.requirements)

        for chunk in self.generatemanifests(commonrevs, clrevorder,
                fastpathlinkrev, mfs, fnodes, source):
            yield chunk
        mfs.clear()
        clrevs = set(cl.rev(x) for x in clnodes)

        if not fastpathlinkrev:
            def linknodes(unused, fname):
                return fnodes.get(fname, {})
        else:
            cln = cl.node
            def linknodes(filerevlog, fname):
                llr = filerevlog.linkrev
                fln = filerevlog.node
                revs = ((r, llr(r)) for r in filerevlog)
                return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
                          fnodes, source):
        """Returns an iterator of changegroup chunks containing manifests.

        `source` is unused here, but is used by extensions like remotefilelog
        to change what is sent based in pulls vs pushes, etc.
        """
        repo = self._repo
        mfl = repo.manifestlog
        dirlog = mfl._revlog.dirlog
        tmfnodes = {'': mfs}

        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def makelookupmflinknode(dir):
            if fastpathlinkrev:
                assert not dir
                return mfs.__getitem__

            def lookupmflinknode(x):
                """Callback for looking up the linknode for manifests.

                Returns the linkrev node for the specified manifest.

                SIDE EFFECT:

                1) fclnodes gets populated with the list of relevant
                   file nodes if we're not using fastpathlinkrev
                2) When treemanifests are in use, collects treemanifest nodes
                   to send

                Note that this means manifests must be completely sent to
                the client before you can trust the list of files and
                treemanifests to send.
                """
                clnode = tmfnodes[dir][x]
                mdata = mfl.get(dir, x).readfast(shallow=True)
                for p, n, fl in mdata.iterentries():
                    if fl == 't': # subdirectory manifest
                        subdir = dir + p + '/'
                        tmfclnodes = tmfnodes.setdefault(subdir, {})
                        tmfclnode = tmfclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[tmfclnode]:
                            tmfclnodes[n] = clnode
                    else:
                        f = dir + p
                        fclnodes = fnodes.setdefault(f, {})
                        fclnode = fclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[fclnode]:
                            fclnodes[n] = clnode
                return clnode
            return lookupmflinknode

        size = 0
        while tmfnodes:
            dir = min(tmfnodes)
            nodes = tmfnodes[dir]
            prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
            if not dir or prunednodes:
                for x in self._packmanifests(dir, prunednodes,
                                             makelookupmflinknode(dir)):
                    size += len(x)
                    yield x
            del tmfnodes[dir]
        self._verbosenote(_('%8.i (manifests)\n') % size)
        yield self._manifestsdone()

    # The 'source' parameter is useful for extensions
    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        repo = self._repo
        progress = self._progress
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise error.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                h = self.fileheader(fname)
                size = len(h)
                yield h
                for chunk in self.group(filenodes, filerevlog, lookupfilelog):
                    size += len(chunk)
                    yield chunk
                self._verbosenote(_('%8.i %s\n') % (size, fname))
        progress(msgbundling, None)

    def deltaparent(self, revlog, rev, p1, p2, prev):
        # cg1 deltas are always against the previous revision in the stream.
        return prev

    def revchunk(self, revlog, rev, prev, linknode):
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = self.deltaparent(revlog, rev, p1, p2, prev)

        prefix = ''
        if revlog.iscensored(base) or revlog.iscensored(rev):
            try:
                delta = revlog.revision(node, raw=True)
            except error.CensoredNodeError as e:
                delta = e.tombstone
            if base == nullrev:
                prefix = mdiff.trivialdiffheader(len(delta))
            else:
                baselen = revlog.rawsize(base)
                prefix = mdiff.replacediffheader(baselen, len(delta))
        elif base == nullrev:
            delta = revlog.revision(node, raw=True)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        flags = revlog.flags(rev)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta
    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # do nothing with basenode, it is implicitly the previous one in HG10
        # do nothing with flags, it is implicitly 0 for cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
827 818
class cg2packer(cg1packer):
    """Packer producing version '02' (generaldelta-aware) changegroups."""
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        super(cg2packer, self).__init__(repo, bundlecaps)
        if self._reorder is None:
            # Since generaldelta is directly supported by cg2, reordering
            # generally doesn't help, so we disable it by default (treating
            # bundle.reorder=auto just like bundle.reorder=False).
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        dp = revlog.deltaparent(rev)
        if dp == nullrev and revlog.storedeltachains:
            # Avoid sending full revisions when delta parent is null. Pick prev
            # in that case. It's tempting to pick p1 in this case, as p1 will
            # be smaller in the common case. However, computing a delta against
            # p1 may require resolving the raw text of p1, which could be
            # expensive. The revlog caches should have prev cached, meaning
            # less CPU for changegroup generation. There is likely room to add
            # a flag and/or config option to control this behavior.
            return prev
        elif dp == nullrev:
            # revlog is configured to use full snapshot for a reason,
            # stick to full snapshot.
            return nullrev
        elif dp not in (p1, p2, prev):
            # Pick prev when we can't be sure remote has the base revision.
            return prev
        else:
            return dp

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # Do nothing with flags, it is implicitly 0 in cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
864 855
class cg3packer(cg2packer):
    """Changegroup format 03 packer (adds revlog flags and treemanifests)."""
    version = '03'
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        # A non-empty dir identifies a tree (sub-directory) manifest;
        # announce the directory before streaming its chunks.
        if dir:
            yield self.fileheader(dir)

        dirlog = self._repo.manifestlog._revlog.dirlog(dir)
        for data in self.group(mfnodes, dirlog, lookuplinknode,
                               units=_('manifests')):
            yield data

    def _manifestsdone(self):
        # End the manifest section with the packer's close chunk.
        return self.close()

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # Unlike cg1/cg2, cg3 encodes the revlog flags in the header.
        return struct.pack(
            self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
884 875
# Maps a changegroup wire-format version string to its
# (packer class, unpacker class) pair.
_packermap = {'01': (cg1packer, cg1unpacker),
             # cg2 adds support for exchanging generaldelta
             '02': (cg2packer, cg2unpacker),
             # cg3 adds support for exchanging revlog flags and treemanifests
             '03': (cg3packer, cg3unpacker),
}
891 882
def allsupportedversions(repo):
    """Return the set of every changegroup version this code supports.

    '03' is only included when changegroup3 or treemanifest support is
    enabled experimentally, or when the repo already requires treemanifest.
    """
    versions = set(_packermap)
    wantv3 = (repo.ui.configbool('experimental', 'changegroup3')
              or repo.ui.configbool('experimental', 'treemanifest')
              or 'treemanifest' in repo.requirements)
    if not wantv3:
        versions.discard('03')
    return versions
899 890
def supportedincomingversions(repo):
    """Return the changegroup versions that can be applied to the repo.

    Currently identical to the full supported set.
    """
    return allsupportedversions(repo)
903 894
def supportedoutgoingversions(repo):
    """Return the changegroup versions that can be created from the repo."""
    versions = allsupportedversions(repo)
    if 'treemanifest' in repo.requirements:
        # Versions 01 and 02 support only flat manifests, and converting
        # between the flat and tree forms on the fly is just too expensive:
        # tree manifests are hashed differently, so all of history would
        # have to be converted. Instead, don't even pretend to support
        # versions 01 and 02.
        versions -= {'01', '02'}
    return versions
916 907
def localversion(repo):
    """Return the best version for bundles meant to be used locally.

    Such bundles (e.g. from strip and shelve, and temporary bundles)
    never leave this installation, so the newest writable version wins.
    """
    return max(supportedoutgoingversions(repo))
921 912
def safeversion(repo):
    """Return the smallest version clients of the repo can be assumed to support.

    For example, all hg versions that support generaldelta also support
    changegroup 02, so a generaldelta repo may safely drop 01.
    """
    versions = supportedoutgoingversions(repo)
    if 'generaldelta' in repo.requirements:
        versions.discard('01')
    assert versions
    return min(versions)
931 922
def getbundler(version, repo, bundlecaps=None):
    """Instantiate the packer class registered for ``version``."""
    assert version in supportedoutgoingversions(repo)
    packercls = _packermap[version][0]
    return packercls(repo, bundlecaps)
935 926
def getunbundler(version, fh, alg, extras=None):
    """Instantiate the unpacker class registered for ``version``."""
    unpackercls = _packermap[version][1]
    return unpackercls(fh, alg, extras=extras)
938 929
def _changegroupinfo(repo, nodes, source):
    """Report how many (and, when debugging, which) changesets were found."""
    ui = repo.ui
    if ui.verbose or source == 'bundle':
        ui.status(_("%d changesets found\n") % len(nodes))
    if ui.debugflag:
        ui.debug("list of changesets:\n")
        for node in nodes:
            ui.debug("%s\n" % hex(node))
946 937
def makechangegroup(repo, outgoing, version, source, fastpath=False,
                    bundlecaps=None):
    """Build a changegroup stream for ``outgoing`` and wrap it in an unbundler."""
    stream = makestream(repo, outgoing, version, source,
                        fastpath=fastpath, bundlecaps=bundlecaps)
    extras = {'clcount': len(outgoing.missing)}
    return getunbundler(version, util.chunkbuffer(stream), None, extras)
953 944
def makestream(repo, outgoing, version, source, fastpath=False,
               bundlecaps=None):
    """Generate the changegroup chunks for ``outgoing``."""
    bundler = getbundler(version, repo, bundlecaps=bundlecaps)

    repo = repo.unfiltered()
    common = outgoing.common
    missing = outgoing.missing
    heads = outgoing.missingheads
    # Take the linkrev fast path when told to, or when all unfiltered heads
    # have been requested (since then we know all linkrevs will be pulled
    # by the client).
    heads.sort()
    fastpathlinkrev = fastpath or (
        repo.filtername is None and heads == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, missing, source)
    return bundler.generate(common, missing, fastpathlinkrev, source)
972 963
def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
    """Apply the filelog portion of a changegroup to the repository.

    Reads filelog chunks from ``source`` until the empty terminator,
    adds them via ``addgroup``, and checks off every node listed in
    ``needfiles`` as it arrives. Returns ``(revisions, files)`` counts.
    Raises error.Abort on empty groups, censored bases, unexpected
    revisions, or nodes that never showed up and are not already present.
    """
    ui = repo.ui
    revisions = 0
    files = 0
    for chunkdata in iter(source.filelogheader, {}):
        files += 1
        fname = chunkdata["filename"]
        ui.debug("adding %s revisions\n" % fname)
        ui.progress(_('files'), files, unit=_('files'),
                    total=expectedfiles)
        flog = repo.file(fname)
        oldlen = len(flog)
        try:
            deltas = source.deltaiter()
            if not flog.addgroup(deltas, revmap, trp):
                raise error.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError as e:
            raise error.Abort(_("received delta base is censored: %s") % e)
        revisions += len(flog) - oldlen
        if fname in needfiles:
            needs = needfiles[fname]
            for rev in xrange(oldlen, len(flog)):
                node = flog.node(rev)
                if node not in needs:
                    raise error.Abort(
                        _("received spurious file revlog entry"))
                needs.remove(node)
            if not needs:
                del needfiles[fname]
    ui.progress(_('files'), None)

    # Anything still listed in needfiles never arrived in this group; it
    # must already exist locally, otherwise the incoming data is incomplete.
    for fname, needs in needfiles.iteritems():
        flog = repo.file(fname)
        for node in needs:
            try:
                flog.rev(node)
            except error.LookupError:
                raise error.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (fname, hex(node)))

    return revisions, files
General Comments 0
You need to be logged in to leave comments. Login now