##// END OF EJS Templates
changegroup: deduplicate 'getlocalchangegroup'...
marmoute -
r32164:29e286fa default
parent child Browse files
Show More
@@ -1,1034 +1,1026 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullrev,
19 19 short,
20 20 )
21 21
22 22 from . import (
23 23 branchmap,
24 24 dagutil,
25 25 discovery,
26 26 error,
27 27 mdiff,
28 28 phases,
29 29 pycompat,
30 30 util,
31 31 )
32 32
# struct format strings for the per-revision delta header of each
# changegroup wire-format version; consumed by the cg*packer/cg*unpacker
# classes below (cg2 adds an explicit delta-base node, cg3 adds flags).
_CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
_CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
_CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
36 36
def readexactly(stream, n):
    '''read n bytes from stream.read and abort if less was available'''
    data = stream.read(n)
    if len(data) == n:
        return data
    # short read: the stream ended before delivering n bytes
    raise error.Abort(_("stream ended unexpectedly"
                        " (got %d bytes, expected %d)")
                      % (len(data), n))
45 45
def getchunk(stream):
    """return the next chunk from stream as a string"""
    # the 4-byte big-endian prefix is the total chunk length, header included
    lengthdata = readexactly(stream, 4)
    length = struct.unpack(">l", lengthdata)[0]
    if length > 4:
        return readexactly(stream, length - 4)
    if length:
        # lengths 1..4 (or negative) cannot describe a valid chunk
        raise error.Abort(_("invalid chunk length %d") % length)
    # a zero length marks the end of a chunk group
    return ""
55 55
def chunkheader(length):
    """return a changegroup chunk header (string)"""
    # the on-wire length counts the 4-byte header itself
    return struct.pack(">l", 4 + length)
59 59
def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk"""
    # a zero-length chunk terminates a chunk group on the wire
    return struct.pack(">l", 0)
63 63
def combineresults(results):
    """logic to combine 0 or more addchangegroup results into one"""
    # Each result encodes a head delta: 1+added heads when positive (>1),
    # -1-removed heads when negative (<-1), 1 for "no head change".
    changedheads = 0
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            return 0
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    # re-encode the accumulated delta in the same convention
    if changedheads > 0:
        return 1 + changedheads
    if changedheads < 0:
        return -1 + changedheads
    return 1
82 82
def writechunks(ui, chunks, filename, vfs=None):
    """Write chunks to a file and return its filename.

    The stream is assumed to be a bundle file.
    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    """
    fh = None
    cleanup = None
    try:
        if not filename:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, pycompat.sysstr("wb"))
            # only a file we created ourselves is removed on failure
            cleanup = filename
        elif vfs:
            fh = vfs.open(filename, "wb")
        else:
            # Increase default buffer size because default is usually
            # small (4k is common on Linux).
            fh = open(filename, "wb", 131072)
        for chunk in chunks:
            fh.write(chunk)
        # everything was written: keep the file
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)
116 116
class cg1unpacker(object):
    """Unpacker for cg1 changegroup streams.

    A changegroup unpacker handles the framing of the revision data in
    the wire format. Most consumers will want to use the apply()
    method to add the changes from the changegroup to a repository.

    If you're forwarding a changegroup unmodified to another consumer,
    use getchunks(), which returns an iterator of changegroup
    chunks. This is mostly useful for cases where you need to know the
    data stream has ended by observing the end of the changegroup.

    deltachunk() is useful only if you're applying delta data. Most
    consumers should prefer apply() instead.

    A few other public methods exist. Those are used only for
    bundlerepo and some debug commands - their use is discouraged.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    _grouplistcount = 1 # One list of files after the manifests

    def __init__(self, fh, alg, extras=None):
        # fh: file-like object carrying the (possibly compressed) stream
        # alg: bundle compression type name, or None meaning uncompressed
        if alg is None:
            alg = 'UN'
        if alg not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % alg)
        if alg == 'BZ':
            # NOTE(review): '_truncatedBZ' presumably handles a stream whose
            # 'BZ' magic was already consumed by the caller — confirm against
            # util.compengines.
            alg = '_truncatedBZ'

        compengine = util.compengines.forbundletype(alg)
        self._stream = compengine.decompressorreader(fh)
        self._type = alg
        self.extras = extras or {}
        # callback is invoked once per chunk read (used for progress output)
        self.callback = None

    # These methods (compressed, read, seek, tell) all appear to only
    # be used by bundlerepo, but it's a little hard to tell.
    def compressed(self):
        return self._type is not None and self._type != 'UN'
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def _chunklength(self):
        # Read the 4-byte big-endian length prefix of the next chunk and
        # return the payload length; 0 means end-of-group marker.
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise error.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self._chunklength()
        if not l:
            # empty dict signals the end of the filelog groups
            return {}
        fname = readexactly(self._stream, l)
        return {'filename': fname}

    def _deltaheader(self, headertuple, prevnode):
        # cg1 has no explicit delta base: deltas chain against the previous
        # revision in the stream, or against p1 for the first revision.
        node, p1, p2, cs = headertuple
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        flags = 0
        return node, p1, p2, deltabase, cs, flags

    def deltachunk(self, prevnode):
        # Return the next revision of the current group as a dict, or an
        # empty dict at the end of the group.
        l = self._chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
        return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
                'deltabase': deltabase, 'delta': delta, 'flags': flags}

    def getchunks(self):
        """return an iterator over all the chunks contained in the bundle

        Used when you need to forward the binary stream to a file or another
        network API. To do so, it parses the changegroup data; otherwise it
        would block in case of sshrepo because it doesn't know where the
        stream ends.
        """
        # an empty chunkgroup is the end of the changegroup
        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, changegroup versions 1 and 2 have a series of groups
        # with one group per file. changegroup 3 has a series of directory
        # manifests before the files.
        count = 0
        emptycount = 0
        while emptycount < self._grouplistcount:
            empty = True
            count += 1
            while True:
                chunk = getchunk(self)
                if not chunk:
                    if empty and count > 2:
                        emptycount += 1
                    break
                empty = False
                yield chunkheader(len(chunk))
                pos = 0
                # re-emit the payload in at most 1MB slices
                while pos < len(chunk):
                    next = pos + 2**20
                    yield chunk[pos:next]
                    pos = next
            yield closechunk()

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        # We know that we'll never have more manifests than we had
        # changesets.
        self.callback = prog(_('manifests'), numchanges)
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        self.manifestheader()
        repo.manifestlog._revlog.addgroup(self, revmap, trp)
        repo.ui.progress(_('manifests'), None)
        self.callback = None

    def apply(self, repo, srctype, url, emptyok=False,
              targetphase=phases.draft, expectedtotal=None):
        """Add the changegroup returned by source.read() to this repo.
        srctype is a string like 'push', 'pull', or 'unbundle'. url is
        the URL of the repo where this changegroup is coming from.

        Return an integer summarizing the change to this repo:
        - nothing changed or no source: 0
        - more heads than before: 1+added heads (2..n)
        - fewer heads than before: -1-removed heads (-2..-n)
        - number of heads stays the same: 1
        """
        repo = repo.unfiltered()
        def csmap(x):
            repo.ui.debug("add changeset %s\n" % short(x))
            return len(cl)

        def revmap(x):
            return cl.rev(x)

        changesets = files = revisions = 0

        try:
            with repo.transaction("\n".join([srctype,
                                             util.hidepassword(url)])) as tr:
                # The transaction could have been created before and already
                # carries source information. In this case we use the top
                # level data. We overwrite the argument because we need to use
                # the top level value (if they exist) in this function.
                srctype = tr.hookargs.setdefault('source', srctype)
                url = tr.hookargs.setdefault('url', url)
                repo.hook('prechangegroup', throw=True, **tr.hookargs)

                # write changelog data to temp files so concurrent readers
                # will not see an inconsistent view
                cl = repo.changelog
                cl.delayupdate(tr)
                oldheads = set(cl.heads())

                trp = weakref.proxy(tr)
                # pull off the changeset group
                repo.ui.status(_("adding changesets\n"))
                clstart = len(cl)
                # small progress-reporting callable handed to the unpacker
                class prog(object):
                    def __init__(self, step, total):
                        self._step = step
                        self._total = total
                        self._count = 1
                    def __call__(self):
                        repo.ui.progress(self._step, self._count,
                                         unit=_('chunks'), total=self._total)
                        self._count += 1
                self.callback = prog(_('changesets'), expectedtotal)

                efiles = set()
                def onchangelog(cl, node):
                    efiles.update(cl.readfiles(node))

                self.changelogheader()
                srccontent = cl.addgroup(self, csmap, trp,
                                         addrevisioncb=onchangelog)
                # efiles becomes the number of distinct files touched
                efiles = len(efiles)

                if not (srccontent or emptyok):
                    raise error.Abort(_("received changelog group is empty"))
                clend = len(cl)
                changesets = clend - clstart
                repo.ui.progress(_('changesets'), None)
                self.callback = None

                # pull off the manifest group
                repo.ui.status(_("adding manifests\n"))
                self._unpackmanifests(repo, revmap, trp, prog, changesets)

                needfiles = {}
                if repo.ui.configbool('server', 'validate', default=False):
                    cl = repo.changelog
                    ml = repo.manifestlog
                    # validate incoming csets have their manifests
                    for cset in xrange(clstart, clend):
                        mfnode = cl.changelogrevision(cset).manifest
                        mfest = ml[mfnode].readdelta()
                        # store file nodes we must see
                        for f, n in mfest.iteritems():
                            needfiles.setdefault(f, set()).add(n)

                # process the files
                repo.ui.status(_("adding file changes\n"))
                newrevs, newfiles = _addchangegroupfiles(
                    repo, self, revmap, trp, efiles, needfiles)
                revisions += newrevs
                files += newfiles

                # compute head delta (closed branch heads don't count)
                dh = 0
                if oldheads:
                    heads = cl.heads()
                    dh = len(heads) - len(oldheads)
                    for h in heads:
                        if h not in oldheads and repo[h].closesbranch():
                            dh -= 1
                htext = ""
                if dh:
                    htext = _(" (%+d heads)") % dh

                repo.ui.status(_("added %d changesets"
                                 " with %d changes to %d files%s\n")
                               % (changesets, revisions, files, htext))
                repo.invalidatevolatilesets()

                if changesets > 0:
                    # only fill in node/node_last if an outer transaction
                    # didn't already record them
                    if 'node' not in tr.hookargs:
                        tr.hookargs['node'] = hex(cl.node(clstart))
                        tr.hookargs['node_last'] = hex(cl.node(clend - 1))
                        hookargs = dict(tr.hookargs)
                    else:
                        hookargs = dict(tr.hookargs)
                        hookargs['node'] = hex(cl.node(clstart))
                        hookargs['node_last'] = hex(cl.node(clend - 1))
                    repo.hook('pretxnchangegroup', throw=True, **hookargs)

                added = [cl.node(r) for r in xrange(clstart, clend)]
                publishing = repo.publishing()
                if srctype in ('push', 'serve'):
                    # Old servers can not push the boundary themselves.
                    # New servers won't push the boundary if changeset already
                    # exists locally as secret
                    #
                    # We should not use added here but the list of all change in
                    # the bundle
                    if publishing:
                        phases.advanceboundary(repo, tr, phases.public,
                                               srccontent)
                    else:
                        # Those changesets have been pushed from the
                        # outside, their phases are going to be pushed
                        # alongside. Therefor `targetphase` is
                        # ignored.
                        phases.advanceboundary(repo, tr, phases.draft,
                                               srccontent)
                        phases.retractboundary(repo, tr, phases.draft, added)
                elif srctype != 'strip':
                    # publishing only alter behavior during push
                    #
                    # strip should not touch boundary at all
                    phases.retractboundary(repo, tr, targetphase, added)

                if changesets > 0:
                    if srctype != 'strip':
                        # During strip, branchcache is invalid but
                        # coming call to `destroyed` will repair it.
                        # In other case we can safely update cache on
                        # disk.
                        repo.ui.debug('updating the branch cache\n')
                        branchmap.updatecache(repo.filtered('served'))

                    def runhooks():
                        # These hooks run when the lock releases, not when the
                        # transaction closes. So it's possible for the changelog
                        # to have changed since we last saw it.
                        if clstart >= len(repo):
                            return

                        repo.hook("changegroup", **hookargs)

                        for n in added:
                            args = hookargs.copy()
                            args['node'] = hex(n)
                            del args['node_last']
                            repo.hook("incoming", **args)

                        newheads = [h for h in repo.heads()
                                    if h not in oldheads]
                        repo.ui.log("incoming",
                                    "%s incoming changes - new heads: %s\n",
                                    len(added),
                                    ', '.join([hex(c[:6]) for c in newheads]))

                    tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                                    lambda tr: repo._afterlock(runhooks))
        finally:
            repo.ui.flush()
        # never return 0 here:
        if dh < 0:
            return dh - 1
        else:
            return dh + 1
446 446
class cg2unpacker(cg1unpacker):
    """Unpacker for cg2 streams.

    cg2 streams add support for generaldelta, so the delta header
    format is slightly different. All other features about the data
    remain the same.
    """
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        # cg2 carries an explicit delta base node, so prevnode is unused
        node, p1, p2, deltabase, cs = headertuple
        flags = 0
        return node, p1, p2, deltabase, cs, flags
462 462
class cg3unpacker(cg2unpacker):
    """Unpacker for cg3 streams.

    cg3 streams add support for exchanging treemanifests and revlog
    flags. It adds the revlog flags to the delta header and an empty chunk
    separating manifests and files.
    """
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '03'
    _grouplistcount = 2 # One list of manifests and one list of files

    def _deltaheader(self, headertuple, prevnode):
        # cg3 additionally carries per-revision revlog flags
        node, p1, p2, deltabase, cs, flags = headertuple
        return node, p1, p2, deltabase, cs, flags

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        # unpack the root manifest group first, then any directory groups
        super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
                                                  numchanges)
        for chunkdata in iter(self.filelogheader, {}):
            # If we get here, there are directory manifests in the changegroup
            d = chunkdata["filename"]
            repo.ui.debug("adding %s revisions\n" % d)
            dirlog = repo.manifestlog._revlog.dirlog(d)
            if not dirlog.addgroup(self, revmap, trp):
                raise error.Abort(_("received dir revlog group is empty"))
489 489
class headerlessfixup(object):
    """File-like wrapper that replays already-consumed header bytes.

    Reads are served first from the buffered header ``h``, then from the
    underlying stream ``fh``.
    """
    def __init__(self, fh, h):
        self._h = h
        self._fh = fh
    def read(self, n):
        if not self._h:
            # header fully drained: read straight from the stream
            return readexactly(self._fh, n)
        buffered, self._h = self._h[:n], self._h[n:]
        if len(buffered) < n:
            # header exhausted mid-read: top up from the stream
            buffered += readexactly(self._fh, n - len(buffered))
        return buffered
501 501
class cg1packer(object):
    """Packer producing version '01' changegroup streams."""
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    version = '01'
    def __init__(self, repo):
        """Given a source repo, construct a bundler.
        """
        # experimental config: bundle.reorder
        reorder = repo.ui.config('bundle', 'reorder', 'auto')
        if reorder == 'auto':
            # None means "decide per revlog" (see _sortgroup)
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
        if self._repo.ui.verbose and not self._repo.ui.debugflag:
            self._verbosenote = self._repo.ui.note
        else:
            self._verbosenote = lambda s: None

    def close(self):
        # terminate the current chunk group
        return closechunk()

    def fileheader(self, fname):
        # filelog groups are introduced by a chunk holding the filename
        return chunkheader(len(fname)) + fname

    # Extracted both for clarity and for overriding in extensions.
    def _sortgroup(self, revlog, nodelist, lookup):
        """Sort nodes for change group and turn them into revnums."""
        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and self._reorder is None) or self._reorder:
            dag = dagutil.revlogdag(revlog)
            return dag.linearize(set(revlog.rev(n) for n in nodelist))
        else:
            return sorted([revlog.rev(n) for n in nodelist])

    def group(self, nodelist, revlog, lookup, units=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        revs = self._sortgroup(revlog, nodelist, lookup)

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        if units is not None:
            self._progress(msgbundling, None)
        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs):
        rr, rl = revlog.rev, revlog.linkrev
        return [n for n in missing if rl(rr(n)) not in commonrevs]

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        """Pack flat manifests into a changegroup stream."""
        # cg1 knows only the root (flat) manifest; dir must be empty
        assert not dir
        for chunk in self.group(mfnodes, self._repo.manifestlog._revlog,
                                lookuplinknode, units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg1/cg2 have no terminator between manifests and files
        return ''

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        cl = repo.changelog

        clrevorder = {}
        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and manifest
        # nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            clrevorder[x] = len(clrevorder)
            n = c[0]
            # record the first changeset introducing this manifest version
            mfs.setdefault(n, x)
            # Record a complete list of potentially-changed files in
            # this manifest.
            changedfiles.update(c[3])
            return x

        self._verbosenote(_('uncompressed size of bundle content:\n'))
        size = 0
        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (changelog)\n') % size)

        # We need to make sure that the linkrev in the changegroup refers to
        # the first changeset that introduced the manifest or file revision.
        # The fastpath is usually safer than the slowpath, because the filelogs
        # are walked in revlog order.
        #
        # When taking the slowpath with reorder=None and the manifest revlog
        # uses generaldelta, the manifest may be walked in the "wrong" order.
        # Without 'clrevorder', we would get an incorrect linkrev (see fix in
        # cc0ff93d0c0c).
        #
        # When taking the fastpath, we are only vulnerable to reordering
        # of the changelog itself. The changelog never uses generaldelta, so
        # it is only reordered when reorder=True. To handle this case, we
        # simply take the slowpath, which already has the 'clrevorder' logic.
        # This was also fixed in cc0ff93d0c0c.
        fastpathlinkrev = fastpathlinkrev and not self._reorder
        # Treemanifests don't work correctly with fastpathlinkrev
        # either, because we don't discover which directory nodes to
        # send along with files. This could probably be fixed.
        fastpathlinkrev = fastpathlinkrev and (
            'treemanifest' not in repo.requirements)

        for chunk in self.generatemanifests(commonrevs, clrevorder,
                                            fastpathlinkrev, mfs, fnodes):
            yield chunk
        mfs.clear()
        clrevs = set(cl.rev(x) for x in clnodes)

        if not fastpathlinkrev:
            def linknodes(unused, fname):
                return fnodes.get(fname, {})
        else:
            cln = cl.node
            def linknodes(filerevlog, fname):
                llr = filerevlog.linkrev
                fln = filerevlog.node
                revs = ((r, llr(r)) for r in filerevlog)
                return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
                          fnodes):
        repo = self._repo
        mfl = repo.manifestlog
        dirlog = mfl._revlog.dirlog
        # map of directory path -> {manifest node: linkrev node}; the root
        # manifest group is seeded here and subdirectories are discovered
        # while packing (see lookupmflinknode)
        tmfnodes = {'': mfs}

        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def makelookupmflinknode(dir):
            if fastpathlinkrev:
                assert not dir
                return mfs.__getitem__

            def lookupmflinknode(x):
                """Callback for looking up the linknode for manifests.

                Returns the linkrev node for the specified manifest.

                SIDE EFFECT:

                1) fclnodes gets populated with the list of relevant
                file nodes if we're not using fastpathlinkrev
                2) When treemanifests are in use, collects treemanifest nodes
                to send

                Note that this means manifests must be completely sent to
                the client before you can trust the list of files and
                treemanifests to send.
                """
                clnode = tmfnodes[dir][x]
                mdata = mfl.get(dir, x).readfast(shallow=True)
                for p, n, fl in mdata.iterentries():
                    if fl == 't': # subdirectory manifest
                        subdir = dir + p + '/'
                        tmfclnodes = tmfnodes.setdefault(subdir, {})
                        tmfclnode = tmfclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[tmfclnode]:
                            tmfclnodes[n] = clnode
                    else:
                        f = dir + p
                        fclnodes = fnodes.setdefault(f, {})
                        fclnode = fclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[fclnode]:
                            fclnodes[n] = clnode
                return clnode
            return lookupmflinknode

        size = 0
        while tmfnodes:
            # process directories in deterministic (sorted) order; packing a
            # directory may add deeper subdirectories to tmfnodes
            dir = min(tmfnodes)
            nodes = tmfnodes[dir]
            prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
            if not dir or prunednodes:
                for x in self._packmanifests(dir, prunednodes,
                                             makelookupmflinknode(dir)):
                    size += len(x)
                    yield x
            del tmfnodes[dir]
        self._verbosenote(_('%8.i (manifests)\n') % size)
        yield self._manifestsdone()

    # The 'source' parameter is useful for extensions
    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        repo = self._repo
        progress = self._progress
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise error.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                h = self.fileheader(fname)
                size = len(h)
                yield h
                for chunk in self.group(filenodes, filerevlog, lookupfilelog):
                    size += len(chunk)
                    yield chunk
                self._verbosenote(_('%8.i %s\n') % (size, fname))
        progress(msgbundling, None)

    def deltaparent(self, revlog, rev, p1, p2, prev):
        # cg1 deltas are always against the previous revision in the stream
        return prev

    def revchunk(self, revlog, rev, prev, linknode):
        # Yield the wire chunks (header, delta metadata, delta body) for one
        # revision of the given revlog.
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = self.deltaparent(revlog, rev, p1, p2, prev)

        prefix = ''
        if revlog.iscensored(base) or revlog.iscensored(rev):
            try:
                delta = revlog.revision(node, raw=True)
            except error.CensoredNodeError as e:
                delta = e.tombstone
            if base == nullrev:
                prefix = mdiff.trivialdiffheader(len(delta))
            else:
                baselen = revlog.rawsize(base)
                prefix = mdiff.replacediffheader(baselen, len(delta))
        elif base == nullrev:
            # no usable base: send a full revision disguised as a diff
            delta = revlog.revision(node, raw=True)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        flags = revlog.flags(rev)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta
    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # do nothing with basenode, it is implicitly the previous one in HG10
        # do nothing with flags, it is implicitly 0 for cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
806 806
class cg2packer(cg1packer):
    """Packer producing version '02' (generaldelta-aware) streams."""
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def __init__(self, repo):
        super(cg2packer, self).__init__(repo)
        if self._reorder is None:
            # Since generaldelta is directly supported by cg2, reordering
            # generally doesn't help, so we disable it by default (treating
            # bundle.reorder=auto just like bundle.reorder=False).
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        # cg2 can reuse the revlog's stored delta base when the receiver is
        # guaranteed to have it (one of p1/p2/prev)
        dp = revlog.deltaparent(rev)
        if dp == nullrev and revlog.storedeltachains:
            # Avoid sending full revisions when delta parent is null. Pick prev
            # in that case. It's tempting to pick p1 in this case, as p1 will
            # be smaller in the common case. However, computing a delta against
            # p1 may require resolving the raw text of p1, which could be
            # expensive. The revlog caches should have prev cached, meaning
            # less CPU for changegroup generation. There is likely room to add
            # a flag and/or config option to control this behavior.
            return prev
        elif dp == nullrev:
            # revlog is configured to use full snapshot for a reason,
            # stick to full snapshot.
            return nullrev
        elif dp not in (p1, p2, prev):
            # Pick prev when we can't be sure remote has the base revision.
            return prev
        else:
            return dp

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # Do nothing with flags, it is implicitly 0 in cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
843 843
class cg3packer(cg2packer):
    """Packer producing version '03' streams (treemanifests, revlog flags)."""
    version = '03'
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        # directory manifest groups are introduced by their path, like
        # filelog groups
        if dir:
            yield self.fileheader(dir)

        dirlog = self._repo.manifestlog._revlog.dirlog(dir)
        for chunk in self.group(mfnodes, dirlog, lookuplinknode,
                                units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg3 terminates the manifest section with an empty chunk
        return self.close()

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # cg3 transmits the revlog flags on the wire
        return struct.pack(
            self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
863 863
# Maps a changegroup wire-format version string to its (packer, unpacker)
# class pair.
_packermap = {'01': (cg1packer, cg1unpacker),
              # cg2 adds support for exchanging generaldelta
              '02': (cg2packer, cg2unpacker),
              # cg3 adds support for exchanging revlog flags and treemanifests
              '03': (cg3packer, cg3unpacker),
}
870 870
def allsupportedversions(repo):
    """Return the set of changegroup versions this code understands.

    Version '03' is only included when explicitly enabled through the
    experimental config options or when the repository already requires
    treemanifest support.
    """
    versions = set(_packermap.keys())
    if not (repo.ui.configbool('experimental', 'changegroup3') or
            repo.ui.configbool('experimental', 'treemanifest') or
            'treemanifest' in repo.requirements):
        versions.discard('03')
    return versions
878 878
# Changegroup versions that can be applied to the repo
def supportedincomingversions(repo):
    """Return the changegroup versions ``repo`` can receive (all of them)."""
    return allsupportedversions(repo)
882 882
# Changegroup versions that can be created from the repo
def supportedoutgoingversions(repo):
    """Return the changegroup versions ``repo`` can produce."""
    versions = allsupportedversions(repo)
    if 'treemanifest' in repo.requirements:
        # Versions 01 and 02 support only flat manifests and it's just too
        # expensive to convert between the flat manifest and tree manifest on
        # the fly. Since tree manifests are hashed differently, all of history
        # would have to be converted. Instead, we simply don't even pretend to
        # support versions 01 and 02.
        versions.discard('01')
        versions.discard('02')
    return versions
895 895
def safeversion(repo):
    """Return the smallest version safe to assume all clients support.

    Finds the smallest version that it's safe to assume clients of the repo
    will support. For example, all hg versions that support generaldelta also
    support changegroup 02.
    """
    versions = supportedoutgoingversions(repo)
    if 'generaldelta' in repo.requirements:
        versions.discard('01')
    # supportedoutgoingversions() never returns an empty set for a usable
    # repo, so at least one version must remain.
    assert versions
    return min(versions)
905 905
def getbundler(version, repo):
    """Instantiate the packer class for ``version`` bound to ``repo``."""
    assert version in supportedoutgoingversions(repo)
    return _packermap[version][0](repo)
909 909
def getunbundler(version, fh, alg, extras=None):
    """Instantiate the unpacker class for ``version`` reading from ``fh``.

    ``alg`` is the compression algorithm of the stream; ``extras`` carries
    optional metadata passed through to the unpacker.
    """
    return _packermap[version][1](fh, alg, extras=extras)
912 912
913 913 def _changegroupinfo(repo, nodes, source):
914 914 if repo.ui.verbose or source == 'bundle':
915 915 repo.ui.status(_("%d changesets found\n") % len(nodes))
916 916 if repo.ui.debugflag:
917 917 repo.ui.debug("list of changesets:\n")
918 918 for node in nodes:
919 919 repo.ui.debug("%s\n" % hex(node))
920 920
def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
    """Generate a raw changegroup stream for ``outgoing`` revisions.

    Fires the 'preoutgoing' hook, reports the changeset count, and returns
    the raw chunk generator produced by ``bundler.generate``.
    """
    repo = repo.unfiltered()
    commonrevs = outgoing.common
    csets = outgoing.missing
    heads = outgoing.missingheads
    # We go through the fast path if we get told to, or if all (unfiltered
    # heads have been requested (since we then know there all linkrevs will
    # be pulled by the client).
    heads.sort()
    fastpathlinkrev = fastpath or (
        repo.filtername is None and heads == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, csets, source)
    return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
936 936
def getsubset(repo, outgoing, bundler, source, fastpath=False):
    """Like getsubsetraw, but wrap the raw stream in an unbundler object."""
    gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
    return getunbundler(bundler.version, util.chunkbuffer(gengroup), None,
                        {'clcount': len(outgoing.missing)})
941 941
def changegroupsubset(repo, roots, heads, source, version='01'):
    """Compute a changegroup consisting of all the nodes that are
    descendants of any of the roots and ancestors of any of the heads.
    Return a chunkbuffer object whose read() method will return
    successive changegroup chunks.

    It is fairly complex as determining which filenodes and which
    manifest nodes need to be included for the changeset to be complete
    is non-trivial.

    Another wrinkle is doing the reverse, figuring out which changeset in
    the changegroup a particular filenode or manifestnode belongs to.
    """
    outgoing = discovery.outgoing(repo, missingroots=roots, missingheads=heads)
    bundler = getbundler(version, repo)
    return getsubset(repo, outgoing, bundler, source)
958 958
def getlocalchangegroupraw(repo, source, outgoing, version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing. Returns a raw changegroup generator,
    or None when there is nothing to transfer."""
    if not outgoing.missing:
        return None
    bundler = getbundler(version, repo)
    return getsubsetraw(repo, outgoing, bundler, source)
968 968
def getchangegroup(repo, source, outgoing, version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing. Returns None when there is nothing
    to transfer."""
    if not outgoing.missing:
        return None
    bundler = getbundler(version, repo)
    return getsubset(repo, outgoing, bundler, source)

# deprecate me once all users are gone
getlocalchangegroup = getchangegroup
989 981
def changegroup(repo, basenodes, source):
    """Build a changegroup of everything between ``basenodes`` and the
    repository heads."""
    # to avoid a race we use changegroupsubset() (issue1320)
    return changegroupsubset(repo, basenodes, repo.heads(), source)
993 985
def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
    """Apply the filelog portion of a changegroup stream to ``repo``.

    Reads file headers and revision groups from ``source`` until the empty
    header terminator, adding them to the matching filelogs. ``needfiles``
    maps filename -> set of nodes that must arrive; any node received that
    was not expected, or expected but missing afterwards, aborts.

    Returns a (revisions, files) tuple of how much was added.
    """
    revisions = 0
    files = 0
    for chunkdata in iter(source.filelogheader, {}):
        files += 1
        f = chunkdata["filename"]
        repo.ui.debug("adding %s revisions\n" % f)
        repo.ui.progress(_('files'), files, unit=_('files'),
                         total=expectedfiles)
        fl = repo.file(f)
        o = len(fl)
        try:
            if not fl.addgroup(source, revmap, trp):
                raise error.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError as e:
            raise error.Abort(_("received delta base is censored: %s") % e)
        revisions += len(fl) - o
        if f in needfiles:
            needs = needfiles[f]
            # every newly-added node must have been expected
            for new in xrange(o, len(fl)):
                n = fl.node(new)
                if n in needs:
                    needs.remove(n)
                else:
                    raise error.Abort(
                        _("received spurious file revlog entry"))
            if not needs:
                del needfiles[f]
    repo.ui.progress(_('files'), None)

    # anything still listed in needfiles never arrived: the data is
    # incomplete, so abort rather than leave a broken repo
    for f, needs in needfiles.iteritems():
        fl = repo.file(f)
        for n in needs:
            try:
                fl.rev(n)
            except error.LookupError:
                raise error.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (f, hex(n)))

    return revisions, files
General Comments 0
You need to be logged in to leave comments. Login now