##// END OF EJS Templates
changegroup: cache changelog and manifestlog outside of loop...
Gregory Szorc -
r30267:d92777f9 default
parent child Browse files
Show More
@@ -1,1042 +1,1043 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullrev,
19 19 short,
20 20 )
21 21
22 22 from . import (
23 23 branchmap,
24 24 dagutil,
25 25 discovery,
26 26 error,
27 27 mdiff,
28 28 phases,
29 29 util,
30 30 )
31 31
# struct formats for the per-revision delta headers on the wire.
# cg1: node, p1, p2, linknode (four 20-byte binary hashes).
_CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
# cg2 adds an explicit delta base node between p2 and the linknode.
_CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
# cg3 additionally appends a big-endian 16-bit revlog flags field.
_CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
35 35
def readexactly(stream, n):
    """Read exactly n bytes from stream.read, aborting on a short read.

    Returns the bytes read; raises error.Abort when the stream ends
    before n bytes were available.
    """
    data = stream.read(n)
    got = len(data)
    if got < n:
        raise error.Abort(_("stream ended unexpectedly"
                            " (got %d bytes, expected %d)") % (got, n))
    return data
44 44
def getchunk(stream):
    """Return the next length-prefixed chunk from stream as a string.

    An empty string marks the end of a chunk group (length <= 4 on the
    wire); a length in 1..4 is malformed and aborts.
    """
    header = readexactly(stream, 4)
    length = struct.unpack(">l", header)[0]
    if length > 4:
        # The stored length includes the 4 header bytes themselves.
        return readexactly(stream, length - 4)
    if length:
        raise error.Abort(_("invalid chunk length %d") % length)
    return ""
54 54
def chunkheader(length):
    """Return a changegroup chunk header (string) for ``length`` payload bytes.

    The encoded value counts the 4 header bytes as part of the chunk.
    """
    framed = length + 4
    return struct.pack(">l", framed)
58 58
def closechunk():
    """Return a changegroup chunk header (string) for a zero-length chunk.

    A zero length terminates the current chunk group on the wire.
    """
    return struct.pack(">l", 0)
62 62
def combineresults(results):
    """Combine 0 or more addchangegroup results into one.

    Each element of ``results`` follows the addchangegroup return
    convention:
      0       -> nothing changed or no source
      1       -> head count unchanged
      2..n    -> 1 + number of added heads
      -2..-n  -> -1 - number of removed heads

    Returns a single value with the same convention.
    """
    changedheads = 0
    for ret in results:
        # If any changegroup result is 0, the combined result is 0.
        # Return immediately so a later head-delta adjustment cannot
        # overwrite it (the old break-based flow could do exactly that).
        if ret == 0:
            return 0
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        return 1 + changedheads
    if changedheads < 0:
        return -1 + changedheads
    return 1
81 81
def writechunks(ui, chunks, filename, vfs=None):
    """Write chunks to a file and return its filename.

    The stream is assumed to be a bundle file.
    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    """
    fh = None
    cleanup = None
    try:
        if not filename:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, "wb")
        elif vfs:
            fh = vfs.open(filename, "wb")
        else:
            # Increase default buffer size because default is usually
            # small (4k is common on Linux).
            fh = open(filename, "wb", 131072)
        # Until the last chunk is written, an error must remove the
        # partially written file.
        cleanup = filename
        for chunk in chunks:
            fh.write(chunk)
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)
115 115
class cg1unpacker(object):
    """Unpacker for cg1 changegroup streams.

    A changegroup unpacker handles the framing of the revision data in
    the wire format. Most consumers will want to use the apply()
    method to add the changes from the changegroup to a repository.

    If you're forwarding a changegroup unmodified to another consumer,
    use getchunks(), which returns an iterator of changegroup
    chunks. This is mostly useful for cases where you need to know the
    data stream has ended by observing the end of the changegroup.

    deltachunk() is useful only if you're applying delta data. Most
    consumers should prefer apply() instead.

    A few other public methods exist. Those are used only for
    bundlerepo and some debug commands - their use is discouraged.
    """
    # Wire format of one delta header and its precomputed size.
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    _grouplistcount = 1 # One list of files after the manifests

    def __init__(self, fh, alg, extras=None):
        # ``alg`` is the two-letter compression marker from the stream
        # header ('UN', 'BZ', 'GZ', ...).
        if alg == 'UN':
            alg = None # get more modern without breaking too much
        if not alg in util.decompressors:
            raise error.Abort(_('unknown stream compression type: %s')
                              % alg)
        if alg == 'BZ':
            # Changegroup BZ payloads lack the 'BZ' magic; use the
            # decompressor that re-synthesizes it.
            alg = '_truncatedBZ'
        self._stream = util.decompressors[alg](fh)
        self._type = alg
        self.extras = extras or {}
        # Invoked (when set) once per chunk read, for progress output.
        self.callback = None

    # These methods (compressed, read, seek, tell) all appear to only
    # be used by bundlerepo, but it's a little hard to tell.
    def compressed(self):
        return self._type is not None
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def _chunklength(self):
        """Read a chunk header and return the payload length (0 at group end)."""
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise error.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        # The stored length includes the 4 header bytes.
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self._chunklength()
        if not l:
            # Empty chunk: end of the filelog list.
            return {}
        fname = readexactly(self._stream, l)
        return {'filename': fname}

    def _deltaheader(self, headertuple, prevnode):
        """Decode a parsed delta header tuple into its components.

        cg1 has no explicit delta base: the base is implicitly the
        previous node in the stream (or p1 for the first delta).
        """
        node, p1, p2, cs = headertuple
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        flags = 0
        return node, p1, p2, deltabase, cs, flags

    def deltachunk(self, prevnode):
        """Read one delta chunk; return {} at the end of the group."""
        l = self._chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
        return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
                'deltabase': deltabase, 'delta': delta, 'flags': flags}

    def getchunks(self):
        """returns all the chunks contained in the bundle

        Used when you need to forward the binary stream to a file or another
        network API. To do so, it parses the changegroup data; otherwise it
        would block in the sshrepo case because it doesn't know where the
        stream ends.
        """
        # an empty chunkgroup is the end of the changegroup
        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, changegroup versions 1 and 2 have a series of groups
        # with one group per file. changegroup 3 has a series of directory
        # manifests before the files.
        count = 0
        emptycount = 0
        while emptycount < self._grouplistcount:
            empty = True
            count += 1
            while True:
                chunk = getchunk(self)
                if not chunk:
                    # Only empty groups past the changelog and manifest
                    # (count > 2) count toward termination.
                    if empty and count > 2:
                        emptycount += 1
                    break
                empty = False
                yield chunkheader(len(chunk))
                pos = 0
                # Re-emit the payload in at most 1MB slices.
                while pos < len(chunk):
                    next = pos + 2**20
                    yield chunk[pos:next]
                    pos = next
        yield closechunk()

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        """Consume the manifest group and add it to the repo."""
        # We know that we'll never have more manifests than we had
        # changesets.
        self.callback = prog(_('manifests'), numchanges)
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        self.manifestheader()
        repo.manifest.addgroup(self, revmap, trp)
        repo.ui.progress(_('manifests'), None)
        self.callback = None

    def apply(self, repo, srctype, url, emptyok=False,
              targetphase=phases.draft, expectedtotal=None):
        """Add the changegroup returned by source.read() to this repo.
        srctype is a string like 'push', 'pull', or 'unbundle'. url is
        the URL of the repo where this changegroup is coming from.

        Return an integer summarizing the change to this repo:
        - nothing changed or no source: 0
        - more heads than before: 1+added heads (2..n)
        - fewer heads than before: -1-removed heads (-2..-n)
        - number of heads stays the same: 1
        """
        repo = repo.unfiltered()
        def csmap(x):
            repo.ui.debug("add changeset %s\n" % short(x))
            # ``cl`` is bound later in this function (closure).
            return len(cl)

        def revmap(x):
            return cl.rev(x)

        changesets = files = revisions = 0

        try:
            with repo.transaction("\n".join([srctype,
                                             util.hidepassword(url)])) as tr:
                # The transaction could have been created before and already
                # carries source information. In this case we use the top
                # level data. We overwrite the argument because we need to use
                # the top level value (if they exist) in this function.
                srctype = tr.hookargs.setdefault('source', srctype)
                url = tr.hookargs.setdefault('url', url)
                repo.hook('prechangegroup', throw=True, **tr.hookargs)

                # write changelog data to temp files so concurrent readers
                # will not see an inconsistent view
                cl = repo.changelog
                cl.delayupdate(tr)
                oldheads = cl.heads()

                trp = weakref.proxy(tr)
                # pull off the changeset group
                repo.ui.status(_("adding changesets\n"))
                clstart = len(cl)
                class prog(object):
                    # Small progress callback factory, also reused for
                    # the manifest group via _unpackmanifests.
                    def __init__(self, step, total):
                        self._step = step
                        self._total = total
                        self._count = 1
                    def __call__(self):
                        repo.ui.progress(self._step, self._count,
                                         unit=_('chunks'), total=self._total)
                        self._count += 1
                self.callback = prog(_('changesets'), expectedtotal)

                efiles = set()
                def onchangelog(cl, node):
                    efiles.update(cl.readfiles(node))

                self.changelogheader()
                srccontent = cl.addgroup(self, csmap, trp,
                                         addrevisioncb=onchangelog)
                efiles = len(efiles)

                if not (srccontent or emptyok):
                    raise error.Abort(_("received changelog group is empty"))
                clend = len(cl)
                changesets = clend - clstart
                repo.ui.progress(_('changesets'), None)
                self.callback = None

                # pull off the manifest group
                repo.ui.status(_("adding manifests\n"))
                self._unpackmanifests(repo, revmap, trp, prog, changesets)

                needfiles = {}
                if repo.ui.configbool('server', 'validate', default=False):
                    cl = repo.changelog
                    ml = repo.manifestlog
                    # validate incoming csets have their manifests
                    for cset in xrange(clstart, clend):
                        mfnode = cl.read(cl.node(cset))[0]
                        mfest = ml[mfnode].readdelta()
                        # store file nodes we must see
                        for f, n in mfest.iteritems():
                            needfiles.setdefault(f, set()).add(n)

                # process the files
                repo.ui.status(_("adding file changes\n"))
                newrevs, newfiles = _addchangegroupfiles(
                    repo, self, revmap, trp, efiles, needfiles)
                revisions += newrevs
                files += newfiles

                # dh is the head-count delta; closed branch heads don't count.
                dh = 0
                if oldheads:
                    heads = cl.heads()
                    dh = len(heads) - len(oldheads)
                    for h in heads:
                        if h not in oldheads and repo[h].closesbranch():
                            dh -= 1
                htext = ""
                if dh:
                    htext = _(" (%+d heads)") % dh

                repo.ui.status(_("added %d changesets"
                                 " with %d changes to %d files%s\n")
                               % (changesets, revisions, files, htext))
                repo.invalidatevolatilesets()

                if changesets > 0:
                    if 'node' not in tr.hookargs:
                        # Record the incoming range on the transaction
                        # itself when we are the first changegroup.
                        tr.hookargs['node'] = hex(cl.node(clstart))
                        tr.hookargs['node_last'] = hex(cl.node(clend - 1))
                        hookargs = dict(tr.hookargs)
                    else:
                        # Otherwise keep the transaction's values and
                        # only override our local copy for the hook.
                        hookargs = dict(tr.hookargs)
                        hookargs['node'] = hex(cl.node(clstart))
                        hookargs['node_last'] = hex(cl.node(clend - 1))
                    repo.hook('pretxnchangegroup', throw=True, **hookargs)

                added = [cl.node(r) for r in xrange(clstart, clend)]
                publishing = repo.publishing()
                if srctype in ('push', 'serve'):
                    # Old servers can not push the boundary themselves.
                    # New servers won't push the boundary if changeset already
                    # exists locally as secret
                    #
                    # We should not use added here but the list of all change in
                    # the bundle
                    if publishing:
                        phases.advanceboundary(repo, tr, phases.public,
                                               srccontent)
                    else:
                        # Those changesets have been pushed from the
                        # outside, their phases are going to be pushed
                        # alongside. Therefor `targetphase` is
                        # ignored.
                        phases.advanceboundary(repo, tr, phases.draft,
                                               srccontent)
                        phases.retractboundary(repo, tr, phases.draft, added)
                elif srctype != 'strip':
                    # publishing only alter behavior during push
                    #
                    # strip should not touch boundary at all
                    phases.retractboundary(repo, tr, targetphase, added)

                if changesets > 0:
                    if srctype != 'strip':
                        # During strip, branchcache is invalid but
                        # coming call to `destroyed` will repair it.
                        # In other case we can safely update cache on
                        # disk.
                        repo.ui.debug('updating the branch cache\n')
                        branchmap.updatecache(repo.filtered('served'))

                    def runhooks():
                        # These hooks run when the lock releases, not when the
                        # transaction closes. So it's possible for the changelog
                        # to have changed since we last saw it.
                        if clstart >= len(repo):
                            return

                        repo.hook("changegroup", **hookargs)

                        for n in added:
                            args = hookargs.copy()
                            args['node'] = hex(n)
                            del args['node_last']
                            repo.hook("incoming", **args)

                        newheads = [h for h in repo.heads()
                                    if h not in oldheads]
                        repo.ui.log("incoming",
                                    "%s incoming changes - new heads: %s\n",
                                    len(added),
                                    ', '.join([hex(c[:6]) for c in newheads]))

                    tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                                    lambda tr: repo._afterlock(runhooks))
        finally:
            repo.ui.flush()
        # never return 0 here:
        # NOTE(review): ``dh`` is only bound on the success path inside the
        # transaction; if an error escapes the 'with' block this line is not
        # reached because the exception propagates past the 'finally'.
        if dh < 0:
            return dh - 1
        else:
            return dh + 1
442 443
class cg2unpacker(cg1unpacker):
    """Unpacker for cg2 streams.

    cg2 streams add support for generaldelta, so the delta header
    format is slightly different. All other features about the data
    remain the same.
    """
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        # cg2 transmits the delta base explicitly, so prevnode is not
        # consulted; revlog flags do not exist yet in this version.
        node, p1, p2, deltabase, cs = headertuple
        return node, p1, p2, deltabase, cs, 0
458 459
class cg3unpacker(cg2unpacker):
    """Unpacker for cg3 streams.

    cg3 streams add support for exchanging treemanifests and revlog
    flags. It adds the revlog flags to the delta header and an empty chunk
    separating manifests and files.
    """
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '03'
    _grouplistcount = 2 # One list of manifests and one list of files

    def _deltaheader(self, headertuple, prevnode):
        # cg3 carries both the explicit delta base and the revlog flags.
        node, p1, p2, deltabase, cs, flags = headertuple
        return node, p1, p2, deltabase, cs, flags

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        """Consume the root manifest group, then any directory manifests."""
        super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
                                                  numchanges)
        # filelogheader() returns {} on the empty separator chunk, which
        # ends this iter() loop.
        for chunkdata in iter(self.filelogheader, {}):
            # If we get here, there are directory manifests in the changegroup
            d = chunkdata["filename"]
            repo.ui.debug("adding %s revisions\n" % d)
            dirlog = repo.manifest.dirlog(d)
            if not dirlog.addgroup(self, revmap, trp):
                raise error.Abort(_("received dir revlog group is empty"))
485 486
class headerlessfixup(object):
    """File-like wrapper that replays an already-consumed prefix.

    ``h`` holds bytes that were read from ``fh`` before the real consumer
    got to them; read() serves those bytes first, then falls through to
    the underlying stream.
    """
    def __init__(self, fh, h):
        self._h = h
        self._fh = fh

    def read(self, n):
        if not self._h:
            return readexactly(self._fh, n)
        d, self._h = self._h[:n], self._h[n:]
        if len(d) < n:
            # Buffered prefix exhausted mid-read: top up from the stream.
            d += readexactly(self._fh, n - len(d))
        return d
497 498
class cg1packer(object):
    """Packer producing version '01' changegroup streams."""
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    version = '01'
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        # experimental config: bundle.reorder
        # None means 'auto' (decide per revlog); otherwise a parsed bool.
        reorder = repo.ui.config('bundle', 'reorder', 'auto')
        if reorder == 'auto':
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
        if self._repo.ui.verbose and not self._repo.ui.debugflag:
            self._verbosenote = self._repo.ui.note
        else:
            self._verbosenote = lambda s: None

    def close(self):
        """Return the empty chunk that terminates a group."""
        return closechunk()

    def fileheader(self, fname):
        """Return the chunk announcing a filelog group for ``fname``."""
        return chunkheader(len(fname)) + fname

    # Extracted both for clarity and for overriding in extensions.
    def _sortgroup(self, revlog, nodelist, lookup):
        """Sort nodes for change group and turn them into revnums."""
        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and self._reorder is None) or self._reorder:
            dag = dagutil.revlogdag(revlog)
            return dag.linearize(set(revlog.rev(n) for n in nodelist))
        else:
            return sorted([revlog.rev(n) for n in nodelist])

    def group(self, nodelist, revlog, lookup, units=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        revs = self._sortgroup(revlog, nodelist, lookup)

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        if units is not None:
            self._progress(msgbundling, None)
        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs):
        rr, rl = revlog.rev, revlog.linkrev
        return [n for n in missing if rl(rr(n)) not in commonrevs]

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        """Pack flat manifests into a changegroup stream."""
        assert not dir
        for chunk in self.group(mfnodes, self._repo.manifest,
                                lookuplinknode, units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg1/cg2 have no terminator between manifests and files.
        return ''

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        cl = repo.changelog

        clrevorder = {}
        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and manifest
        # nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            clrevorder[x] = len(clrevorder)
            n = c[0]
            # record the first changeset introducing this manifest version
            mfs.setdefault(n, x)
            # Record a complete list of potentially-changed files in
            # this manifest.
            changedfiles.update(c[3])
            return x

        self._verbosenote(_('uncompressed size of bundle content:\n'))
        size = 0
        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (changelog)\n') % size)

        # We need to make sure that the linkrev in the changegroup refers to
        # the first changeset that introduced the manifest or file revision.
        # The fastpath is usually safer than the slowpath, because the filelogs
        # are walked in revlog order.
        #
        # When taking the slowpath with reorder=None and the manifest revlog
        # uses generaldelta, the manifest may be walked in the "wrong" order.
        # Without 'clrevorder', we would get an incorrect linkrev (see fix in
        # cc0ff93d0c0c).
        #
        # When taking the fastpath, we are only vulnerable to reordering
        # of the changelog itself. The changelog never uses generaldelta, so
        # it is only reordered when reorder=True. To handle this case, we
        # simply take the slowpath, which already has the 'clrevorder' logic.
        # This was also fixed in cc0ff93d0c0c.
        fastpathlinkrev = fastpathlinkrev and not self._reorder
        # Treemanifests don't work correctly with fastpathlinkrev
        # either, because we don't discover which directory nodes to
        # send along with files. This could probably be fixed.
        fastpathlinkrev = fastpathlinkrev and (
            'treemanifest' not in repo.requirements)

        for chunk in self.generatemanifests(commonrevs, clrevorder,
                                            fastpathlinkrev, mfs, fnodes):
            yield chunk
        mfs.clear()
        clrevs = set(cl.rev(x) for x in clnodes)

        if not fastpathlinkrev:
            def linknodes(unused, fname):
                return fnodes.get(fname, {})
        else:
            cln = cl.node
            def linknodes(filerevlog, fname):
                llr = filerevlog.linkrev
                fln = filerevlog.node
                revs = ((r, llr(r)) for r in filerevlog)
                return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
                          fnodes):
        """Yield chunks for the manifest part of the changegroup.

        As a side effect, fills ``fnodes`` with the file nodes that the
        file part will need (see lookupmflinknode below).
        """
        repo = self._repo
        dirlog = repo.manifest.dirlog
        # '' is the root manifest; subdirectories are added as they are
        # discovered while reading tree manifests.
        tmfnodes = {'': mfs}

        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def makelookupmflinknode(dir):
            if fastpathlinkrev:
                assert not dir
                return mfs.__getitem__

            def lookupmflinknode(x):
                """Callback for looking up the linknode for manifests.

                Returns the linkrev node for the specified manifest.

                SIDE EFFECT:

                1) fclnodes gets populated with the list of relevant
                   file nodes if we're not using fastpathlinkrev
                2) When treemanifests are in use, collects treemanifest nodes
                   to send

                Note that this means manifests must be completely sent to
                the client before you can trust the list of files and
                treemanifests to send.
                """
                clnode = tmfnodes[dir][x]
                mdata = dirlog(dir).readshallowfast(x)
                for p, n, fl in mdata.iterentries():
                    if fl == 't': # subdirectory manifest
                        subdir = dir + p + '/'
                        tmfclnodes = tmfnodes.setdefault(subdir, {})
                        tmfclnode = tmfclnodes.setdefault(n, clnode)
                        # Always keep the earliest introducing changeset.
                        if clrevorder[clnode] < clrevorder[tmfclnode]:
                            tmfclnodes[n] = clnode
                    else:
                        f = dir + p
                        fclnodes = fnodes.setdefault(f, {})
                        fclnode = fclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[fclnode]:
                            fclnodes[n] = clnode
                return clnode
            return lookupmflinknode

        size = 0
        while tmfnodes:
            # Iterate dict-ordered; packing a directory may add new
            # subdirectory entries to tmfnodes.
            dir = min(tmfnodes)
            nodes = tmfnodes[dir]
            prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
            if not dir or prunednodes:
                for x in self._packmanifests(dir, prunednodes,
                                             makelookupmflinknode(dir)):
                    size += len(x)
                    yield x
            del tmfnodes[dir]
        self._verbosenote(_('%8.i (manifests)\n') % size)
        yield self._manifestsdone()

    # The 'source' parameter is useful for extensions
    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        """Yield chunks for the filelog part of the changegroup."""
        repo = self._repo
        progress = self._progress
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise error.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                h = self.fileheader(fname)
                size = len(h)
                yield h
                for chunk in self.group(filenodes, filerevlog, lookupfilelog):
                    size += len(chunk)
                    yield chunk
                self._verbosenote(_('%8.i %s\n') % (size, fname))
        progress(msgbundling, None)

    def deltaparent(self, revlog, rev, p1, p2, prev):
        # cg1 deltas are always against the previous revision in the stream.
        return prev

    def revchunk(self, revlog, rev, prev, linknode):
        """Yield the chunks (header, meta, delta) for one revision."""
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = self.deltaparent(revlog, rev, p1, p2, prev)

        prefix = ''
        if revlog.iscensored(base) or revlog.iscensored(rev):
            try:
                delta = revlog.revision(node)
            except error.CensoredNodeError as e:
                # Ship the tombstone as a full-replacement delta.
                delta = e.tombstone
            if base == nullrev:
                prefix = mdiff.trivialdiffheader(len(delta))
            else:
                baselen = revlog.rawsize(base)
                prefix = mdiff.replacediffheader(baselen, len(delta))
        elif base == nullrev:
            # Full snapshot wrapped in a do-nothing diff header.
            delta = revlog.revision(node)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        flags = revlog.flags(rev)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta
    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # do nothing with basenode, it is implicitly the previous one in HG10
        # do nothing with flags, it is implicitly 0 for cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
808 809
class cg2packer(cg1packer):
    """Packer producing version '02' (generaldelta-aware) streams."""
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        super(cg2packer, self).__init__(repo, bundlecaps)
        if self._reorder is None:
            # Since generaldelta is directly supported by cg2, reordering
            # generally doesn't help, so we disable it by default (treating
            # bundle.reorder=auto just like bundle.reorder=False).
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        """Choose the delta base to transmit for ``rev``."""
        dp = revlog.deltaparent(rev)
        if dp == nullrev and revlog.storedeltachains:
            # Avoid sending full revisions when delta parent is null. Pick prev
            # in that case. It's tempting to pick p1 in this case, as p1 will
            # be smaller in the common case. However, computing a delta against
            # p1 may require resolving the raw text of p1, which could be
            # expensive. The revlog caches should have prev cached, meaning
            # less CPU for changegroup generation. There is likely room to add
            # a flag and/or config option to control this behavior.
            return prev
        elif dp == nullrev:
            # revlog is configured to use full snapshot for a reason,
            # stick to full snapshot.
            return nullrev
        elif dp not in (p1, p2, prev):
            # Pick prev when we can't be sure remote has the base revision.
            return prev
        else:
            return dp

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # Do nothing with flags, it is implicitly 0 in cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
845 846
class cg3packer(cg2packer):
    """Packer producing version '03' streams (treemanifests, revlog flags)."""
    version = '03'
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        # Directory manifests are announced with a fileheader-style chunk
        # carrying the directory name; the root manifest has no header.
        if dir:
            yield self.fileheader(dir)
        for chunk in self.group(mfnodes, self._repo.manifest.dirlog(dir),
                                lookuplinknode, units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg3 separates manifests from files with an empty chunk.
        return self.close()

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        return struct.pack(
            self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
863 864
# Maps changegroup version string to its (packer, unpacker) classes.
_packermap = {'01': (cg1packer, cg1unpacker),
             # cg2 adds support for exchanging generaldelta
             '02': (cg2packer, cg2unpacker),
             # cg3 adds support for exchanging revlog flags and treemanifests
             '03': (cg3packer, cg3unpacker),
}
870 871
def allsupportedversions(ui):
    """Return the set of changegroup versions this hg understands.

    Version '03' is only offered when one of the experimental features
    requiring it is enabled in ui.
    """
    versions = set(_packermap)
    versions.discard('03')
    wantv3 = (ui.configbool('experimental', 'changegroup3') or
              ui.configbool('experimental', 'treemanifest'))
    if wantv3:
        versions.add('03')
    return versions
878 879
def supportedincomingversions(repo):
    """Changegroup versions that can be applied to repo."""
    versions = allsupportedversions(repo.ui)
    # A treemanifest repo must be able to receive cg3 even when the
    # experimental knobs enabling it globally are off.
    if 'treemanifest' in repo.requirements:
        versions.update(['03'])
    return versions
885 886
def supportedoutgoingversions(repo):
    """Changegroup versions that can be created from repo.

    Versions 01 and 02 support only flat manifests, and converting
    between flat and tree manifests on the fly is just too expensive:
    tree manifests are hashed differently, so all of history would have
    to be converted. A treemanifest repo therefore doesn't even pretend
    to support 01 and 02.
    """
    versions = allsupportedversions(repo.ui)
    if 'treemanifest' in repo.requirements:
        versions.difference_update(['01', '02'])
        versions.add('03')
    return versions
899 900
def safeversion(repo):
    """Return the smallest changegroup version it is safe to assume all
    clients of repo will support.

    For example, every hg version that supports generaldelta also
    supports changegroup 02, so a generaldelta repo never needs to
    offer 01.
    """
    candidates = supportedoutgoingversions(repo)
    if 'generaldelta' in repo.requirements:
        candidates.discard('01')
    assert candidates
    return min(candidates)
909 910
def getbundler(version, repo, bundlecaps=None):
    """Instantiate the packer for changegroup version, which must be
    creatable from repo."""
    assert version in supportedoutgoingversions(repo)
    packercls = _packermap[version][0]
    return packercls(repo, bundlecaps)
913 914
def getunbundler(version, fh, alg, extras=None):
    """Instantiate the unpacker for changegroup version, reading from
    stream fh with compression alg."""
    unpackercls = _packermap[version][1]
    return unpackercls(fh, alg, extras=extras)
916 917
def _changegroupinfo(repo, nodes, source):
    """Report how many (and, when debugging, which) changesets are
    about to be bundled."""
    ui = repo.ui
    if ui.verbose or source == 'bundle':
        ui.status(_("%d changesets found\n") % len(nodes))
    if ui.debugflag:
        ui.debug("list of changesets:\n")
        for node in nodes:
            ui.debug("%s\n" % hex(node))
924 925
def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
    """Generate a raw changegroup stream for the outgoing set.

    Runs the preoutgoing hook, reports the changeset count, and returns
    the generator produced by bundler.generate().
    """
    repo = repo.unfiltered()
    csets = outgoing.missing
    heads = outgoing.missingheads
    # Take the linkrev fast path when told to, or when all (unfiltered)
    # heads have been requested - in that case we know all linkrevs will
    # be pulled by the client anyway.
    heads.sort()
    fastpathlinkrev = fastpath or (
        repo.filtername is None and heads == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, csets, source)
    return bundler.generate(outgoing.common, csets, fastpathlinkrev, source)
940 941
def getsubset(repo, outgoing, bundler, source, fastpath=False):
    """Like getsubsetraw(), but wrap the raw stream in an unbundler of
    the matching changegroup version."""
    stream = getsubsetraw(repo, outgoing, bundler, source, fastpath)
    return getunbundler(bundler.version, util.chunkbuffer(stream), None,
                        {'clcount': len(outgoing.missing)})
945 946
def changegroupsubset(repo, roots, heads, source, version='01'):
    """Compute a changegroup of all nodes that are descendants of any
    of the roots and ancestors of any of the heads.

    Return a chunkbuffer object whose read() method will return
    successive changegroup chunks.

    It is fairly complex as determining which filenodes and which
    manifest nodes need to be included for the changeset to be complete
    is non-trivial.

    Another wrinkle is doing the reverse, figuring out which changeset
    in the changegroup a particular filenode or manifestnode belongs
    to.
    """
    outgoing = discovery.outgoing(repo, missingroots=roots,
                                  missingheads=heads)
    return getsubset(repo, outgoing, getbundler(version, repo), source)
962 963
def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
                           version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing. Returns a raw changegroup generator,
    or None when there is nothing to transfer.
    """
    if not outgoing.missing:
        return None
    return getsubsetraw(repo, outgoing,
                        getbundler(version, repo, bundlecaps), source)
973 974
def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
                        version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing. Returns None when there is nothing
    to transfer.
    """
    if not outgoing.missing:
        return None
    return getsubset(repo, outgoing,
                     getbundler(version, repo, bundlecaps), source)
984 985
def getchangegroup(repo, source, outgoing, bundlecaps=None,
                   version='01'):
    """Like changegroupsubset, but taking a precomputed
    discovery.outgoing instead of roots/heads.

    The outgoing object describes the set difference between the
    ancestors of its missing heads and the ancestors of its common set.
    The nodes in common might not all be known locally due to the way
    the current discovery protocol works.
    """
    # NOTE: the previous docstring documented 'heads'/'common'
    # parameters that this function no longer takes; only the
    # documentation has been corrected here - behavior is unchanged.
    return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps,
                               version=version)
997 998
def changegroup(repo, basenodes, source):
    """Build a changegroup from basenodes up to the repository heads.

    Goes through changegroupsubset() rather than computing the set
    directly to avoid a race (issue1320).
    """
    return changegroupsubset(repo, basenodes, repo.heads(), source)
1001 1002
def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
    # Apply the file-revlog portion of a changegroup stream to repo.
    #
    # source: the changegroup unbundler, positioned at the files section
    # revmap: lookup passed straight through to fl.addgroup()
    #   (presumably resolves linkrev nodes - confirm with callers)
    # trp: transaction (proxy) the revlog writes are recorded under
    # expectedfiles: total file count, used only for progress reporting
    # needfiles: dict of filename -> set of nodes that must be present;
    #   entries are checked off as revisions arrive, and any remainder
    #   is verified against the local revlogs at the end
    #
    # Returns (revisions, files): counts of revisions and files added.
    revisions = 0
    files = 0
    # iter(callable, {}) stops at the empty header dict that terminates
    # the files section of the stream.
    for chunkdata in iter(source.filelogheader, {}):
        files += 1
        f = chunkdata["filename"]
        repo.ui.debug("adding %s revisions\n" % f)
        repo.ui.progress(_('files'), files, unit=_('files'),
                         total=expectedfiles)
        fl = repo.file(f)
        # Remember the old length so we can count what was added and
        # iterate only the new revisions below.
        o = len(fl)
        try:
            if not fl.addgroup(source, revmap, trp):
                raise error.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError as e:
            raise error.Abort(_("received delta base is censored: %s") % e)
        revisions += len(fl) - o
        if f in needfiles:
            needs = needfiles[f]
            # Check off each newly added node; a node we did not ask for
            # is an error.
            for new in xrange(o, len(fl)):
                n = fl.node(new)
                if n in needs:
                    needs.remove(n)
                else:
                    raise error.Abort(
                        _("received spurious file revlog entry"))
            if not needs:
                del needfiles[f]
    repo.ui.progress(_('files'), None)

    # Anything still listed in needfiles was not in the stream, so it
    # must already exist locally - otherwise the changegroup left the
    # repository incomplete.
    for f, needs in needfiles.iteritems():
        fl = repo.file(f)
        for n in needs:
            try:
                fl.rev(n)
            except error.LookupError:
                raise error.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (f, hex(n)))

    return revisions, files
General Comments 0
You need to be logged in to leave comments. Login now