changegroup: increase write buffer size to 128k...
Gregory Szorc
r30212:260af198 default
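The hunk below swaps the stock open() call for one passing an explicit 128k buffer; CPython otherwise inherits the platform default, which is commonly 4k on Linux. A minimal standalone sketch of the same idea using this module's chunk framing; the output filename is illustrative only:

import struct

# 131072 = 128 * 1024: the third argument to open() is the buffer size in
# bytes, so many small chunk writes coalesce before hitting the OS.
with open('out.hg', 'wb', 131072) as fh:
    for payload in ('x' * 100, 'y' * 300):
        fh.write(struct.pack(">l", len(payload) + 4))  # chunk header
        fh.write(payload)
    fh.write(struct.pack(">l", 0))  # end-of-group, cf. closechunk()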
@@ -1,1040 +1,1042 @@
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullrev,
19 19 short,
20 20 )
21 21
22 22 from . import (
23 23 branchmap,
24 24 dagutil,
25 25 discovery,
26 26 error,
27 27 mdiff,
28 28 phases,
29 29 util,
30 30 )
31 31
32 32 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
33 33 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
34 34 _CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
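# Field layout (cf. the _deltaheader methods below): cg1 packs node, p1, p2,
# linknode; cg2 inserts an explicit deltabase before the linknode; cg3 also
# appends a 16-bit flags field and makes the big-endian order explicit (">").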
35 35
36 36 def readexactly(stream, n):
37 37 '''read n bytes from stream.read and abort if less was available'''
38 38 s = stream.read(n)
39 39 if len(s) < n:
40 40 raise error.Abort(_("stream ended unexpectedly"
41 41 " (got %d bytes, expected %d)")
42 42 % (len(s), n))
43 43 return s
44 44
45 45 def getchunk(stream):
46 46 """return the next chunk from stream as a string"""
47 47 d = readexactly(stream, 4)
48 48 l = struct.unpack(">l", d)[0]
49 49 if l <= 4:
50 50 if l:
51 51 raise error.Abort(_("invalid chunk length %d") % l)
52 52 return ""
53 53 return readexactly(stream, l - 4)
54 54
55 55 def chunkheader(length):
56 56 """return a changegroup chunk header (string)"""
57 57 return struct.pack(">l", length + 4)
58 58
59 59 def closechunk():
60 60 """return a changegroup chunk header (string) for a zero-length chunk"""
61 61 return struct.pack(">l", 0)
62 62
63 63 def combineresults(results):
64 64 """logic to combine 0 or more addchangegroup results into one"""
65 65 changedheads = 0
66 66 result = 1
67 67 for ret in results:
68 68 # If any changegroup result is 0, return 0
69 69 if ret == 0:
70 70 result = 0
71 71 break
72 72 if ret < -1:
73 73 changedheads += ret + 1
74 74 elif ret > 1:
75 75 changedheads += ret - 1
76 76 if changedheads > 0:
77 77 result = 1 + changedheads
78 78 elif changedheads < 0:
79 79 result = -1 + changedheads
80 80 return result
81 81
82 82 def writechunks(ui, chunks, filename, vfs=None):
83 83 """Write chunks to a file and return its filename.
84 84
85 85 The stream is assumed to be a bundle file.
86 86 Existing files will not be overwritten.
87 87 If no filename is specified, a temporary file is created.
88 88 """
89 89 fh = None
90 90 cleanup = None
91 91 try:
92 92 if filename:
93 93 if vfs:
94 94 fh = vfs.open(filename, "wb")
95 95 else:
96 fh = open(filename, "wb")
96 # Increase default buffer size because default is usually
97 # small (4k is common on Linux).
98 fh = open(filename, "wb", 131072)
97 99 else:
98 100 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
99 101 fh = os.fdopen(fd, "wb")
100 102 cleanup = filename
101 103 for c in chunks:
102 104 fh.write(c)
103 105 cleanup = None
104 106 return filename
105 107 finally:
106 108 if fh is not None:
107 109 fh.close()
108 110 if cleanup is not None:
109 111 if filename and vfs:
110 112 vfs.unlink(cleanup)
111 113 else:
112 114 os.unlink(cleanup)
113 115
114 116 class cg1unpacker(object):
115 117 """Unpacker for cg1 changegroup streams.
116 118
117 119 A changegroup unpacker handles the framing of the revision data in
118 120 the wire format. Most consumers will want to use the apply()
119 121 method to add the changes from the changegroup to a repository.
120 122
121 123 If you're forwarding a changegroup unmodified to another consumer,
122 124 use getchunks(), which returns an iterator of changegroup
123 125 chunks. This is mostly useful for cases where you need to know the
124 126 data stream has ended by observing the end of the changegroup.
125 127
126 128 deltachunk() is useful only if you're applying delta data. Most
127 129 consumers should prefer apply() instead.
128 130
129 131 A few other public methods exist. Those are used only for
130 132 bundlerepo and some debug commands - their use is discouraged.
131 133 """
132 134 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
133 135 deltaheadersize = struct.calcsize(deltaheader)
134 136 version = '01'
135 137 _grouplistcount = 1 # One list of files after the manifests
136 138
137 139 def __init__(self, fh, alg, extras=None):
138 140 if alg == 'UN':
139 141 alg = None # get more modern without breaking too much
140 142 if alg not in util.decompressors:
141 143 raise error.Abort(_('unknown stream compression type: %s')
142 144 % alg)
143 145 if alg == 'BZ':
144 146 alg = '_truncatedBZ'
145 147 self._stream = util.decompressors[alg](fh)
146 148 self._type = alg
147 149 self.extras = extras or {}
148 150 self.callback = None
149 151
150 152 # These methods (compressed, read, seek, tell) all appear to only
151 153 # be used by bundlerepo, but it's a little hard to tell.
152 154 def compressed(self):
153 155 return self._type is not None
154 156 def read(self, l):
155 157 return self._stream.read(l)
156 158 def seek(self, pos):
157 159 return self._stream.seek(pos)
158 160 def tell(self):
159 161 return self._stream.tell()
160 162 def close(self):
161 163 return self._stream.close()
162 164
163 165 def _chunklength(self):
164 166 d = readexactly(self._stream, 4)
165 167 l = struct.unpack(">l", d)[0]
166 168 if l <= 4:
167 169 if l:
168 170 raise error.Abort(_("invalid chunk length %d") % l)
169 171 return 0
170 172 if self.callback:
171 173 self.callback()
172 174 return l - 4
173 175
174 176 def changelogheader(self):
175 177 """v10 does not have a changelog header chunk"""
176 178 return {}
177 179
178 180 def manifestheader(self):
179 181 """v10 does not have a manifest header chunk"""
180 182 return {}
181 183
182 184 def filelogheader(self):
183 185 """return the header of the filelogs chunk, v10 only has the filename"""
184 186 l = self._chunklength()
185 187 if not l:
186 188 return {}
187 189 fname = readexactly(self._stream, l)
188 190 return {'filename': fname}
189 191
190 192 def _deltaheader(self, headertuple, prevnode):
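# cg1 has no explicit deltabase field: deltas chain against the previous
# revision in the stream, or against p1 for the first one in a group.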
191 193 node, p1, p2, cs = headertuple
192 194 if prevnode is None:
193 195 deltabase = p1
194 196 else:
195 197 deltabase = prevnode
196 198 flags = 0
197 199 return node, p1, p2, deltabase, cs, flags
198 200
199 201 def deltachunk(self, prevnode):
200 202 l = self._chunklength()
201 203 if not l:
202 204 return {}
203 205 headerdata = readexactly(self._stream, self.deltaheadersize)
204 206 header = struct.unpack(self.deltaheader, headerdata)
205 207 delta = readexactly(self._stream, l - self.deltaheadersize)
206 208 node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
207 209 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
208 210 'deltabase': deltabase, 'delta': delta, 'flags': flags}
209 211
210 212 def getchunks(self):
211 213 """returns all the chunks contains in the bundle
212 214
213 215 Used when you need to forward the binary stream to a file or another
214 216 network API. To do so, it parse the changegroup data, otherwise it will
215 217 block in case of sshrepo because it don't know the end of the stream.
216 218 """
217 219 # an empty chunkgroup is the end of the changegroup
218 220 # a changegroup has at least 2 chunkgroups (changelog and manifest).
219 221 # after that, changegroup versions 1 and 2 have a series of groups
220 222 # with one group per file. changegroup 3 has a series of directory
221 223 # manifests before the files.
222 224 count = 0
223 225 emptycount = 0
224 226 while emptycount < self._grouplistcount:
225 227 empty = True
226 228 count += 1
227 229 while True:
228 230 chunk = getchunk(self)
229 231 if not chunk:
230 232 if empty and count > 2:
231 233 emptycount += 1
232 234 break
233 235 empty = False
234 236 yield chunkheader(len(chunk))
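# stream the payload in 1 MiB slices so forwarding consumers never
# have to buffer an entire chunk at once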
235 237 pos = 0
236 238 while pos < len(chunk):
237 239 next = pos + 2**20
238 240 yield chunk[pos:next]
239 241 pos = next
240 242 yield closechunk()
241 243
242 244 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
243 245 # We know that we'll never have more manifests than we had
244 246 # changesets.
245 247 self.callback = prog(_('manifests'), numchanges)
246 248 # no need to check for empty manifest group here:
247 249 # if the result of the merge of 1 and 2 is the same in 3 and 4,
248 250 # no new manifest will be created and the manifest group will
249 251 # be empty during the pull
250 252 self.manifestheader()
251 253 repo.manifest.addgroup(self, revmap, trp)
252 254 repo.ui.progress(_('manifests'), None)
253 255 self.callback = None
254 256
255 257 def apply(self, repo, srctype, url, emptyok=False,
256 258 targetphase=phases.draft, expectedtotal=None):
257 259 """Add the changegroup returned by source.read() to this repo.
258 260 srctype is a string like 'push', 'pull', or 'unbundle'. url is
259 261 the URL of the repo where this changegroup is coming from.
260 262
261 263 Return an integer summarizing the change to this repo:
262 264 - nothing changed or no source: 0
263 265 - more heads than before: 1+added heads (2..n)
264 266 - fewer heads than before: -1-removed heads (-2..-n)
265 267 - number of heads stays the same: 1
266 268 """
267 269 repo = repo.unfiltered()
268 270 def csmap(x):
269 271 repo.ui.debug("add changeset %s\n" % short(x))
270 272 return len(cl)
271 273
272 274 def revmap(x):
273 275 return cl.rev(x)
274 276
275 277 changesets = files = revisions = 0
276 278
277 279 try:
278 280 with repo.transaction("\n".join([srctype,
279 281 util.hidepassword(url)])) as tr:
280 282 # The transaction could have been created before and already
281 283 # carries source information. In this case we use the top
282 284 # level data. We overwrite the arguments because we need to use
283 285 # the top level values (if they exist) in this function.
284 286 srctype = tr.hookargs.setdefault('source', srctype)
285 287 url = tr.hookargs.setdefault('url', url)
286 288 repo.hook('prechangegroup', throw=True, **tr.hookargs)
287 289
288 290 # write changelog data to temp files so concurrent readers
289 291 # will not see an inconsistent view
290 292 cl = repo.changelog
291 293 cl.delayupdate(tr)
292 294 oldheads = cl.heads()
293 295
294 296 trp = weakref.proxy(tr)
295 297 # pull off the changeset group
296 298 repo.ui.status(_("adding changesets\n"))
297 299 clstart = len(cl)
298 300 class prog(object):
299 301 def __init__(self, step, total):
300 302 self._step = step
301 303 self._total = total
302 304 self._count = 1
303 305 def __call__(self):
304 306 repo.ui.progress(self._step, self._count,
305 307 unit=_('chunks'), total=self._total)
306 308 self._count += 1
307 309 self.callback = prog(_('changesets'), expectedtotal)
308 310
309 311 efiles = set()
310 312 def onchangelog(cl, node):
311 313 efiles.update(cl.readfiles(node))
312 314
313 315 self.changelogheader()
314 316 srccontent = cl.addgroup(self, csmap, trp,
315 317 addrevisioncb=onchangelog)
316 318 efiles = len(efiles)
317 319
318 320 if not (srccontent or emptyok):
319 321 raise error.Abort(_("received changelog group is empty"))
320 322 clend = len(cl)
321 323 changesets = clend - clstart
322 324 repo.ui.progress(_('changesets'), None)
323 325 self.callback = None
324 326
325 327 # pull off the manifest group
326 328 repo.ui.status(_("adding manifests\n"))
327 329 self._unpackmanifests(repo, revmap, trp, prog, changesets)
328 330
329 331 needfiles = {}
330 332 if repo.ui.configbool('server', 'validate', default=False):
331 333 # validate incoming csets have their manifests
332 334 for cset in xrange(clstart, clend):
333 335 mfnode = repo.changelog.read(
334 336 repo.changelog.node(cset))[0]
335 337 mfest = repo.manifestlog[mfnode].readdelta()
336 338 # store file nodes we must see
337 339 for f, n in mfest.iteritems():
338 340 needfiles.setdefault(f, set()).add(n)
339 341
340 342 # process the files
341 343 repo.ui.status(_("adding file changes\n"))
342 344 newrevs, newfiles = _addchangegroupfiles(
343 345 repo, self, revmap, trp, efiles, needfiles)
344 346 revisions += newrevs
345 347 files += newfiles
346 348
347 349 dh = 0
348 350 if oldheads:
349 351 heads = cl.heads()
350 352 dh = len(heads) - len(oldheads)
351 353 for h in heads:
352 354 if h not in oldheads and repo[h].closesbranch():
353 355 dh -= 1
354 356 htext = ""
355 357 if dh:
356 358 htext = _(" (%+d heads)") % dh
357 359
358 360 repo.ui.status(_("added %d changesets"
359 361 " with %d changes to %d files%s\n")
360 362 % (changesets, revisions, files, htext))
361 363 repo.invalidatevolatilesets()
362 364
363 365 if changesets > 0:
364 366 if 'node' not in tr.hookargs:
365 367 tr.hookargs['node'] = hex(cl.node(clstart))
366 368 tr.hookargs['node_last'] = hex(cl.node(clend - 1))
367 369 hookargs = dict(tr.hookargs)
368 370 else:
369 371 hookargs = dict(tr.hookargs)
370 372 hookargs['node'] = hex(cl.node(clstart))
371 373 hookargs['node_last'] = hex(cl.node(clend - 1))
372 374 repo.hook('pretxnchangegroup', throw=True, **hookargs)
373 375
374 376 added = [cl.node(r) for r in xrange(clstart, clend)]
375 377 publishing = repo.publishing()
376 378 if srctype in ('push', 'serve'):
377 379 # Old servers can not push the boundary themselves.
378 380 # New servers won't push the boundary if the changeset
379 381 # already exists locally as secret
380 382 #
381 383 # We should not use 'added' here but the list of all changes in
382 384 # the bundle
383 385 if publishing:
384 386 phases.advanceboundary(repo, tr, phases.public,
385 387 srccontent)
386 388 else:
387 389 # Those changesets have been pushed from the
388 390 # outside, their phases are going to be pushed
389 391 # alongside. Therefore `targetphase` is
390 392 # ignored.
391 393 phases.advanceboundary(repo, tr, phases.draft,
392 394 srccontent)
393 395 phases.retractboundary(repo, tr, phases.draft, added)
394 396 elif srctype != 'strip':
395 397 # publishing only alters behavior during push
396 398 #
397 399 # strip should not touch boundary at all
398 400 phases.retractboundary(repo, tr, targetphase, added)
399 401
400 402 if changesets > 0:
401 403 if srctype != 'strip':
402 404 # During strip, branchcache is invalid but the
403 405 # coming call to `destroyed` will repair it.
404 406 # In other cases we can safely update the cache
405 407 # on disk.
406 408 repo.ui.debug('updating the branch cache\n')
407 409 branchmap.updatecache(repo.filtered('served'))
408 410
409 411 def runhooks():
410 412 # These hooks run when the lock releases, not when the
411 413 # transaction closes. So it's possible for the changelog
412 414 # to have changed since we last saw it.
413 415 if clstart >= len(repo):
414 416 return
415 417
416 418 repo.hook("changegroup", **hookargs)
417 419
418 420 for n in added:
419 421 args = hookargs.copy()
420 422 args['node'] = hex(n)
421 423 del args['node_last']
422 424 repo.hook("incoming", **args)
423 425
424 426 newheads = [h for h in repo.heads()
425 427 if h not in oldheads]
426 428 repo.ui.log("incoming",
427 429 "%s incoming changes - new heads: %s\n",
428 430 len(added),
429 431 ', '.join([hex(c[:6]) for c in newheads]))
430 432
431 433 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
432 434 lambda tr: repo._afterlock(runhooks))
433 435 finally:
434 436 repo.ui.flush()
435 437 # never return 0 here:
436 438 if dh < 0:
437 439 return dh - 1
438 440 else:
439 441 return dh + 1
440 442
441 443 class cg2unpacker(cg1unpacker):
442 444 """Unpacker for cg2 streams.
443 445
444 446 cg2 streams add support for generaldelta, so the delta header
445 447 format is slightly different. All other features about the data
446 448 remain the same.
447 449 """
448 450 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
449 451 deltaheadersize = struct.calcsize(deltaheader)
450 452 version = '02'
451 453
452 454 def _deltaheader(self, headertuple, prevnode):
453 455 node, p1, p2, deltabase, cs = headertuple
454 456 flags = 0
455 457 return node, p1, p2, deltabase, cs, flags
456 458
457 459 class cg3unpacker(cg2unpacker):
458 460 """Unpacker for cg3 streams.
459 461
460 462 cg3 streams add support for exchanging treemanifests and revlog
461 463 flags. It adds the revlog flags to the delta header and an empty chunk
462 464 separating manifests and files.
463 465 """
464 466 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
465 467 deltaheadersize = struct.calcsize(deltaheader)
466 468 version = '03'
467 469 _grouplistcount = 2 # One list of manifests and one list of files
468 470
469 471 def _deltaheader(self, headertuple, prevnode):
470 472 node, p1, p2, deltabase, cs, flags = headertuple
471 473 return node, p1, p2, deltabase, cs, flags
472 474
473 475 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
474 476 super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
475 477 numchanges)
476 478 for chunkdata in iter(self.filelogheader, {}):
477 479 # If we get here, there are directory manifests in the changegroup
478 480 d = chunkdata["filename"]
479 481 repo.ui.debug("adding %s revisions\n" % d)
480 482 dirlog = repo.manifest.dirlog(d)
481 483 if not dirlog.addgroup(self, revmap, trp):
482 484 raise error.Abort(_("received dir revlog group is empty"))
483 485
484 486 class headerlessfixup(object):
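# Re-serves header bytes that were already sniffed off the stream (h)
# before delegating further reads to the wrapped file object.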
485 487 def __init__(self, fh, h):
486 488 self._h = h
487 489 self._fh = fh
488 490 def read(self, n):
489 491 if self._h:
490 492 d, self._h = self._h[:n], self._h[n:]
491 493 if len(d) < n:
492 494 d += readexactly(self._fh, n - len(d))
493 495 return d
494 496 return readexactly(self._fh, n)
495 497
496 498 class cg1packer(object):
497 499 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
498 500 version = '01'
499 501 def __init__(self, repo, bundlecaps=None):
500 502 """Given a source repo, construct a bundler.
501 503
502 504 bundlecaps is optional and can be used to specify the set of
503 505 capabilities which can be used to build the bundle.
504 506 """
505 507 # Set of capabilities we can use to build the bundle.
506 508 if bundlecaps is None:
507 509 bundlecaps = set()
508 510 self._bundlecaps = bundlecaps
509 511 # experimental config: bundle.reorder
510 512 reorder = repo.ui.config('bundle', 'reorder', 'auto')
511 513 if reorder == 'auto':
512 514 reorder = None
513 515 else:
514 516 reorder = util.parsebool(reorder)
515 517 self._repo = repo
516 518 self._reorder = reorder
517 519 self._progress = repo.ui.progress
518 520 if self._repo.ui.verbose and not self._repo.ui.debugflag:
519 521 self._verbosenote = self._repo.ui.note
520 522 else:
521 523 self._verbosenote = lambda s: None
522 524
523 525 def close(self):
524 526 return closechunk()
525 527
526 528 def fileheader(self, fname):
527 529 return chunkheader(len(fname)) + fname
528 530
529 531 # Extracted both for clarity and for overriding in extensions.
530 532 def _sortgroup(self, revlog, nodelist, lookup):
531 533 """Sort nodes for change group and turn them into revnums."""
532 534 # for generaldelta revlogs, we linearize the revs; this will both be
533 535 # much quicker and generate a much smaller bundle
534 536 if (revlog._generaldelta and self._reorder is None) or self._reorder:
535 537 dag = dagutil.revlogdag(revlog)
536 538 return dag.linearize(set(revlog.rev(n) for n in nodelist))
537 539 else:
538 540 return sorted([revlog.rev(n) for n in nodelist])
539 541
540 542 def group(self, nodelist, revlog, lookup, units=None):
541 543 """Calculate a delta group, yielding a sequence of changegroup chunks
542 544 (strings).
543 545
544 546 Given a list of changeset revs, return a set of deltas and
545 547 metadata corresponding to nodes. The first delta is
546 548 first parent(nodelist[0]) -> nodelist[0], the receiver is
547 549 guaranteed to have this parent as it has all history before
548 550 these changesets. In the case firstparent is nullrev the
549 551 changegroup starts with a full revision.
550 552
551 553 If units is not None, progress detail will be generated; units specifies
552 554 the type of revlog that is touched (changelog, manifest, etc.).
553 555 """
554 556 # if we don't have any revisions touched by these changesets, bail
555 557 if len(nodelist) == 0:
556 558 yield self.close()
557 559 return
558 560
559 561 revs = self._sortgroup(revlog, nodelist, lookup)
560 562
561 563 # add the parent of the first rev
562 564 p = revlog.parentrevs(revs[0])[0]
563 565 revs.insert(0, p)
564 566
565 567 # build deltas
566 568 total = len(revs) - 1
567 569 msgbundling = _('bundling')
568 570 for r in xrange(len(revs) - 1):
569 571 if units is not None:
570 572 self._progress(msgbundling, r + 1, unit=units, total=total)
571 573 prev, curr = revs[r], revs[r + 1]
572 574 linknode = lookup(revlog.node(curr))
573 575 for c in self.revchunk(revlog, curr, prev, linknode):
574 576 yield c
575 577
576 578 if units is not None:
577 579 self._progress(msgbundling, None)
578 580 yield self.close()
579 581
580 582 # filter any nodes that claim to be part of the known set
581 583 def prune(self, revlog, missing, commonrevs):
582 584 rr, rl = revlog.rev, revlog.linkrev
583 585 return [n for n in missing if rl(rr(n)) not in commonrevs]
584 586
585 587 def _packmanifests(self, dir, mfnodes, lookuplinknode):
586 588 """Pack flat manifests into a changegroup stream."""
587 589 assert not dir
588 590 for chunk in self.group(mfnodes, self._repo.manifest,
589 591 lookuplinknode, units=_('manifests')):
590 592 yield chunk
591 593
592 594 def _manifestsdone(self):
593 595 return ''
594 596
595 597 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
596 598 '''yield a sequence of changegroup chunks (strings)'''
597 599 repo = self._repo
598 600 cl = repo.changelog
599 601
600 602 clrevorder = {}
601 603 mfs = {} # needed manifests
602 604 fnodes = {} # needed file nodes
603 605 changedfiles = set()
604 606
605 607 # Callback for the changelog, used to collect changed files and manifest
606 608 # nodes.
607 609 # Returns the linkrev node (identity in the changelog case).
608 610 def lookupcl(x):
609 611 c = cl.read(x)
610 612 clrevorder[x] = len(clrevorder)
611 613 n = c[0]
612 614 # record the first changeset introducing this manifest version
613 615 mfs.setdefault(n, x)
614 616 # Record a complete list of potentially-changed files in
615 617 # this manifest.
616 618 changedfiles.update(c[3])
617 619 return x
618 620
619 621 self._verbosenote(_('uncompressed size of bundle content:\n'))
620 622 size = 0
621 623 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
622 624 size += len(chunk)
623 625 yield chunk
624 626 self._verbosenote(_('%8.i (changelog)\n') % size)
625 627
626 628 # We need to make sure that the linkrev in the changegroup refers to
627 629 # the first changeset that introduced the manifest or file revision.
628 630 # The fastpath is usually safer than the slowpath, because the filelogs
629 631 # are walked in revlog order.
630 632 #
631 633 # When taking the slowpath with reorder=None and the manifest revlog
632 634 # uses generaldelta, the manifest may be walked in the "wrong" order.
633 635 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
634 636 # cc0ff93d0c0c).
635 637 #
636 638 # When taking the fastpath, we are only vulnerable to reordering
637 639 # of the changelog itself. The changelog never uses generaldelta, so
638 640 # it is only reordered when reorder=True. To handle this case, we
639 641 # simply take the slowpath, which already has the 'clrevorder' logic.
640 642 # This was also fixed in cc0ff93d0c0c.
641 643 fastpathlinkrev = fastpathlinkrev and not self._reorder
642 644 # Treemanifests don't work correctly with fastpathlinkrev
643 645 # either, because we don't discover which directory nodes to
644 646 # send along with files. This could probably be fixed.
645 647 fastpathlinkrev = fastpathlinkrev and (
646 648 'treemanifest' not in repo.requirements)
647 649
648 650 for chunk in self.generatemanifests(commonrevs, clrevorder,
649 651 fastpathlinkrev, mfs, fnodes):
650 652 yield chunk
651 653 mfs.clear()
652 654 clrevs = set(cl.rev(x) for x in clnodes)
653 655
654 656 if not fastpathlinkrev:
655 657 def linknodes(unused, fname):
656 658 return fnodes.get(fname, {})
657 659 else:
658 660 cln = cl.node
659 661 def linknodes(filerevlog, fname):
660 662 llr = filerevlog.linkrev
661 663 fln = filerevlog.node
662 664 revs = ((r, llr(r)) for r in filerevlog)
663 665 return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)
664 666
665 667 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
666 668 source):
667 669 yield chunk
668 670
669 671 yield self.close()
670 672
671 673 if clnodes:
672 674 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
673 675
674 676 def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
675 677 fnodes):
676 678 repo = self._repo
677 679 dirlog = repo.manifest.dirlog
678 680 tmfnodes = {'': mfs}
679 681
680 682 # Callback for the manifest, used to collect linkrevs for filelog
681 683 # revisions.
682 684 # Returns the linkrev node (collected in lookupcl).
683 685 def makelookupmflinknode(dir):
684 686 if fastpathlinkrev:
685 687 assert not dir
686 688 return mfs.__getitem__
687 689
688 690 def lookupmflinknode(x):
689 691 """Callback for looking up the linknode for manifests.
690 692
691 693 Returns the linkrev node for the specified manifest.
692 694
693 695 SIDE EFFECT:
694 696
695 697 1) fclnodes gets populated with the list of relevant
696 698 file nodes if we're not using fastpathlinkrev
697 699 2) When treemanifests are in use, collects treemanifest nodes
698 700 to send
699 701
700 702 Note that this means manifests must be completely sent to
701 703 the client before you can trust the list of files and
702 704 treemanifests to send.
703 705 """
704 706 clnode = tmfnodes[dir][x]
705 707 mdata = dirlog(dir).readshallowfast(x)
706 708 for p, n, fl in mdata.iterentries():
707 709 if fl == 't': # subdirectory manifest
708 710 subdir = dir + p + '/'
709 711 tmfclnodes = tmfnodes.setdefault(subdir, {})
710 712 tmfclnode = tmfclnodes.setdefault(n, clnode)
711 713 if clrevorder[clnode] < clrevorder[tmfclnode]:
712 714 tmfclnodes[n] = clnode
713 715 else:
714 716 f = dir + p
715 717 fclnodes = fnodes.setdefault(f, {})
716 718 fclnode = fclnodes.setdefault(n, clnode)
717 719 if clrevorder[clnode] < clrevorder[fclnode]:
718 720 fclnodes[n] = clnode
719 721 return clnode
720 722 return lookupmflinknode
721 723
722 724 size = 0
723 725 while tmfnodes:
724 726 dir = min(tmfnodes)
725 727 nodes = tmfnodes[dir]
726 728 prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
727 729 if not dir or prunednodes:
728 730 for x in self._packmanifests(dir, prunednodes,
729 731 makelookupmflinknode(dir)):
730 732 size += len(x)
731 733 yield x
732 734 del tmfnodes[dir]
733 735 self._verbosenote(_('%8.i (manifests)\n') % size)
734 736 yield self._manifestsdone()
735 737
736 738 # The 'source' parameter is useful for extensions
737 739 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
738 740 repo = self._repo
739 741 progress = self._progress
740 742 msgbundling = _('bundling')
741 743
742 744 total = len(changedfiles)
743 745 # for progress output
744 746 msgfiles = _('files')
745 747 for i, fname in enumerate(sorted(changedfiles)):
746 748 filerevlog = repo.file(fname)
747 749 if not filerevlog:
748 750 raise error.Abort(_("empty or missing revlog for %s") % fname)
749 751
750 752 linkrevnodes = linknodes(filerevlog, fname)
751 753 # Lookup for filenodes, we collected the linkrev nodes above in the
752 754 # fastpath case and with lookupmf in the slowpath case.
753 755 def lookupfilelog(x):
754 756 return linkrevnodes[x]
755 757
756 758 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
757 759 if filenodes:
758 760 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
759 761 total=total)
760 762 h = self.fileheader(fname)
761 763 size = len(h)
762 764 yield h
763 765 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
764 766 size += len(chunk)
765 767 yield chunk
766 768 self._verbosenote(_('%8.i %s\n') % (size, fname))
767 769 progress(msgbundling, None)
768 770
769 771 def deltaparent(self, revlog, rev, p1, p2, prev):
770 772 return prev
771 773
772 774 def revchunk(self, revlog, rev, prev, linknode):
773 775 node = revlog.node(rev)
774 776 p1, p2 = revlog.parentrevs(rev)
775 777 base = self.deltaparent(revlog, rev, p1, p2, prev)
776 778
777 779 prefix = ''
778 780 if revlog.iscensored(base) or revlog.iscensored(rev):
779 781 try:
780 782 delta = revlog.revision(node)
781 783 except error.CensoredNodeError as e:
782 784 delta = e.tombstone
783 785 if base == nullrev:
784 786 prefix = mdiff.trivialdiffheader(len(delta))
785 787 else:
786 788 baselen = revlog.rawsize(base)
787 789 prefix = mdiff.replacediffheader(baselen, len(delta))
788 790 elif base == nullrev:
789 791 delta = revlog.revision(node)
790 792 prefix = mdiff.trivialdiffheader(len(delta))
791 793 else:
792 794 delta = revlog.revdiff(base, rev)
793 795 p1n, p2n = revlog.parents(node)
794 796 basenode = revlog.node(base)
795 797 flags = revlog.flags(rev)
796 798 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
797 799 meta += prefix
798 800 l = len(meta) + len(delta)
799 801 yield chunkheader(l)
800 802 yield meta
801 803 yield delta
802 804 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
803 805 # do nothing with basenode, it is implicitly the previous one in HG10
804 806 # do nothing with flags, it is implicitly 0 for cg1 and cg2
805 807 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
806 808
807 809 class cg2packer(cg1packer):
808 810 version = '02'
809 811 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
810 812
811 813 def __init__(self, repo, bundlecaps=None):
812 814 super(cg2packer, self).__init__(repo, bundlecaps)
813 815 if self._reorder is None:
814 816 # Since generaldelta is directly supported by cg2, reordering
815 817 # generally doesn't help, so we disable it by default (treating
816 818 # bundle.reorder=auto just like bundle.reorder=False).
817 819 self._reorder = False
818 820
819 821 def deltaparent(self, revlog, rev, p1, p2, prev):
820 822 dp = revlog.deltaparent(rev)
821 823 if dp == nullrev and revlog.storedeltachains:
822 824 # Avoid sending full revisions when delta parent is null. Pick prev
823 825 # in that case. It's tempting to pick p1 in this case, as p1 will
824 826 # be smaller in the common case. However, computing a delta against
825 827 # p1 may require resolving the raw text of p1, which could be
826 828 # expensive. The revlog caches should have prev cached, meaning
827 829 # less CPU for changegroup generation. There is likely room to add
828 830 # a flag and/or config option to control this behavior.
829 831 return prev
830 832 elif dp == nullrev:
831 833 # revlog is configured to use full snapshot for a reason,
832 834 # stick to full snapshot.
833 835 return nullrev
834 836 elif dp not in (p1, p2, prev):
835 837 # Pick prev when we can't be sure remote has the base revision.
836 838 return prev
837 839 else:
838 840 return dp
839 841
840 842 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
841 843 # Do nothing with flags, it is implicitly 0 in cg1 and cg2
842 844 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
843 845
844 846 class cg3packer(cg2packer):
845 847 version = '03'
846 848 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
847 849
848 850 def _packmanifests(self, dir, mfnodes, lookuplinknode):
849 851 if dir:
850 852 yield self.fileheader(dir)
851 853 for chunk in self.group(mfnodes, self._repo.manifest.dirlog(dir),
852 854 lookuplinknode, units=_('manifests')):
853 855 yield chunk
854 856
855 857 def _manifestsdone(self):
856 858 return self.close()
857 859
858 860 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
859 861 return struct.pack(
860 862 self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
861 863
862 864 _packermap = {'01': (cg1packer, cg1unpacker),
863 865 # cg2 adds support for exchanging generaldelta
864 866 '02': (cg2packer, cg2unpacker),
865 867 # cg3 adds support for exchanging revlog flags and treemanifests
866 868 '03': (cg3packer, cg3unpacker),
867 869 }
868 870
869 871 def allsupportedversions(ui):
870 872 versions = set(_packermap.keys())
871 873 versions.discard('03')
872 874 if (ui.configbool('experimental', 'changegroup3') or
873 875 ui.configbool('experimental', 'treemanifest')):
874 876 versions.add('03')
875 877 return versions
876 878
877 879 # Changegroup versions that can be applied to the repo
878 880 def supportedincomingversions(repo):
879 881 versions = allsupportedversions(repo.ui)
880 882 if 'treemanifest' in repo.requirements:
881 883 versions.add('03')
882 884 return versions
883 885
884 886 # Changegroup versions that can be created from the repo
885 887 def supportedoutgoingversions(repo):
886 888 versions = allsupportedversions(repo.ui)
887 889 if 'treemanifest' in repo.requirements:
888 890 # Versions 01 and 02 support only flat manifests and it's just too
889 891 # expensive to convert between the flat manifest and tree manifest on
890 892 # the fly. Since tree manifests are hashed differently, all of history
891 893 # would have to be converted. Instead, we simply don't even pretend to
892 894 # support versions 01 and 02.
893 895 versions.discard('01')
894 896 versions.discard('02')
895 897 versions.add('03')
896 898 return versions
897 899
898 900 def safeversion(repo):
899 901 # Finds the smallest version that it's safe to assume clients of the repo
900 902 # will support. For example, all hg versions that support generaldelta also
901 903 # support changegroup 02.
902 904 versions = supportedoutgoingversions(repo)
903 905 if 'generaldelta' in repo.requirements:
904 906 versions.discard('01')
905 907 assert versions
906 908 return min(versions)
907 909
908 910 def getbundler(version, repo, bundlecaps=None):
909 911 assert version in supportedoutgoingversions(repo)
910 912 return _packermap[version][0](repo, bundlecaps)
911 913
912 914 def getunbundler(version, fh, alg, extras=None):
913 915 return _packermap[version][1](fh, alg, extras=extras)
914 916
915 917 def _changegroupinfo(repo, nodes, source):
916 918 if repo.ui.verbose or source == 'bundle':
917 919 repo.ui.status(_("%d changesets found\n") % len(nodes))
918 920 if repo.ui.debugflag:
919 921 repo.ui.debug("list of changesets:\n")
920 922 for node in nodes:
921 923 repo.ui.debug("%s\n" % hex(node))
922 924
923 925 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
924 926 repo = repo.unfiltered()
925 927 commonrevs = outgoing.common
926 928 csets = outgoing.missing
927 929 heads = outgoing.missingheads
928 930 # We go through the fast path if we get told to, or if all (unfiltered)
929 931 # heads have been requested (since we then know all linkrevs will
930 932 # be pulled by the client).
931 933 heads.sort()
932 934 fastpathlinkrev = fastpath or (
933 935 repo.filtername is None and heads == sorted(repo.heads()))
934 936
935 937 repo.hook('preoutgoing', throw=True, source=source)
936 938 _changegroupinfo(repo, csets, source)
937 939 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
938 940
939 941 def getsubset(repo, outgoing, bundler, source, fastpath=False):
940 942 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
941 943 return getunbundler(bundler.version, util.chunkbuffer(gengroup), None,
942 944 {'clcount': len(outgoing.missing)})
943 945
944 946 def changegroupsubset(repo, roots, heads, source, version='01'):
945 947 """Compute a changegroup consisting of all the nodes that are
946 948 descendants of any of the roots and ancestors of any of the heads.
947 949 Return a chunkbuffer object whose read() method will return
948 950 successive changegroup chunks.
949 951
950 952 It is fairly complex as determining which filenodes and which
951 953 manifest nodes need to be included for the changeset to be complete
952 954 is non-trivial.
953 955
954 956 Another wrinkle is doing the reverse, figuring out which changeset in
955 957 the changegroup a particular filenode or manifestnode belongs to.
956 958 """
957 959 outgoing = discovery.outgoing(repo, missingroots=roots, missingheads=heads)
958 960 bundler = getbundler(version, repo)
959 961 return getsubset(repo, outgoing, bundler, source)
960 962
961 963 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
962 964 version='01'):
963 965 """Like getbundle, but taking a discovery.outgoing as an argument.
964 966
965 967 This is only implemented for local repos and reuses potentially
966 968 precomputed sets in outgoing. Returns a raw changegroup generator."""
967 969 if not outgoing.missing:
968 970 return None
969 971 bundler = getbundler(version, repo, bundlecaps)
970 972 return getsubsetraw(repo, outgoing, bundler, source)
971 973
972 974 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
973 975 version='01'):
974 976 """Like getbundle, but taking a discovery.outgoing as an argument.
975 977
976 978 This is only implemented for local repos and reuses potentially
977 979 precomputed sets in outgoing."""
978 980 if not outgoing.missing:
979 981 return None
980 982 bundler = getbundler(version, repo, bundlecaps)
981 983 return getsubset(repo, outgoing, bundler, source)
982 984
983 985 def getchangegroup(repo, source, outgoing, bundlecaps=None,
984 986 version='01'):
985 987 """Like changegroupsubset, but returns the set difference between the
986 988 ancestors of heads and the ancestors common.
987 989
988 990 If heads is None, use the local heads. If common is None, use [nullid].
989 991
990 992 The nodes in common might not all be known locally due to the way the
991 993 current discovery protocol works.
992 994 """
993 995 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps,
994 996 version=version)
995 997
996 998 def changegroup(repo, basenodes, source):
997 999 # to avoid a race we use changegroupsubset() (issue1320)
998 1000 return changegroupsubset(repo, basenodes, repo.heads(), source)
999 1001
1000 1002 def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
1001 1003 revisions = 0
1002 1004 files = 0
1003 1005 for chunkdata in iter(source.filelogheader, {}):
1004 1006 files += 1
1005 1007 f = chunkdata["filename"]
1006 1008 repo.ui.debug("adding %s revisions\n" % f)
1007 1009 repo.ui.progress(_('files'), files, unit=_('files'),
1008 1010 total=expectedfiles)
1009 1011 fl = repo.file(f)
1010 1012 o = len(fl)
1011 1013 try:
1012 1014 if not fl.addgroup(source, revmap, trp):
1013 1015 raise error.Abort(_("received file revlog group is empty"))
1014 1016 except error.CensoredBaseError as e:
1015 1017 raise error.Abort(_("received delta base is censored: %s") % e)
1016 1018 revisions += len(fl) - o
1017 1019 if f in needfiles:
1018 1020 needs = needfiles[f]
1019 1021 for new in xrange(o, len(fl)):
1020 1022 n = fl.node(new)
1021 1023 if n in needs:
1022 1024 needs.remove(n)
1023 1025 else:
1024 1026 raise error.Abort(
1025 1027 _("received spurious file revlog entry"))
1026 1028 if not needs:
1027 1029 del needfiles[f]
1028 1030 repo.ui.progress(_('files'), None)
1029 1031
1030 1032 for f, needs in needfiles.iteritems():
1031 1033 fl = repo.file(f)
1032 1034 for n in needs:
1033 1035 try:
1034 1036 fl.rev(n)
1035 1037 except error.LookupError:
1036 1038 raise error.Abort(
1037 1039 _('missing file data for %s:%s - run hg verify') %
1038 1040 (f, hex(n)))
1039 1041
1040 1042 return revisions, files
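Taken together, the packer, the raw subset generator, and writechunks compose into a simple bundle writer. A hedged sketch: the function name is hypothetical, and it assumes an existing local repo and a discovery.outgoing instance, producing a raw headerless changegroup stream:

def writebundle01(repo, outgoing, path):
    # hypothetical helper: pick the version '01' packer, generate the raw
    # changegroup chunks, and spool them through the buffered writer above
    bundler = getbundler('01', repo)
    chunks = getsubsetraw(repo, outgoing, bundler, 'bundle')
    return writechunks(repo.ui, chunks, path)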