changegroup: cg3 has two empty groups *after* manifests...
Martin von Zweigbergk
r27920:da5f2336 stable
@@ -1,1114 +1,1121 @@
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullid,
19 19 nullrev,
20 20 short,
21 21 )
22 22
23 23 from . import (
24 24 branchmap,
25 25 dagutil,
26 26 discovery,
27 27 error,
28 28 mdiff,
29 29 phases,
30 30 util,
31 31 )
32 32
33 33 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
34 34 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
35 35 _CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
36 36
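# [editor's sketch, not part of this changeset] The sizes these formats
# imply on the wire: cg1 carries (node, p1, p2, linknode); cg2 adds an
# explicit deltabase node; cg3 appends a big-endian unsigned short of
# revlog flags.
assert struct.calcsize(_CHANGEGROUPV1_DELTA_HEADER) == 80
assert struct.calcsize(_CHANGEGROUPV2_DELTA_HEADER) == 100
assert struct.calcsize(_CHANGEGROUPV3_DELTA_HEADER) == 102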
37 37 def readexactly(stream, n):
38 38 '''read n bytes from stream.read and abort if less was available'''
39 39 s = stream.read(n)
40 40 if len(s) < n:
41 41 raise error.Abort(_("stream ended unexpectedly"
42 42 " (got %d bytes, expected %d)")
43 43 % (len(s), n))
44 44 return s
45 45
46 46 def getchunk(stream):
47 47 """return the next chunk from stream as a string"""
48 48 d = readexactly(stream, 4)
49 49 l = struct.unpack(">l", d)[0]
50 50 if l <= 4:
51 51 if l:
52 52 raise error.Abort(_("invalid chunk length %d") % l)
53 53 return ""
54 54 return readexactly(stream, l - 4)
55 55
56 56 def chunkheader(length):
57 57 """return a changegroup chunk header (string)"""
58 58 return struct.pack(">l", length + 4)
59 59
60 60 def closechunk():
61 61 """return a changegroup chunk header (string) for a zero-length chunk"""
62 62 return struct.pack(">l", 0)
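# [editor's sketch, not part of this changeset] Round-trip of the chunk
# framing above, using an in-memory stream. Each chunk is a big-endian
# length (which includes the 4 length bytes themselves) followed by the
# payload; a bare zero length is the terminator emitted by closechunk().
def _demochunkstream():
    import io
    buf = io.BytesIO()
    for payload in ["first", "second"]:
        buf.write(chunkheader(len(payload)))
        buf.write(payload)
    buf.write(closechunk())
    buf.seek(0)
    chunks = []
    while True:
        c = getchunk(buf)
        if not c:
            break
        chunks.append(c)
    assert chunks == ["first", "second"]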
63 63
64 64 def combineresults(results):
65 65 """logic to combine 0 or more addchangegroup results into one"""
66 66 changedheads = 0
67 67 result = 1
68 68 for ret in results:
69 69 # If any changegroup result is 0, return 0
70 70 if ret == 0:
71 71 result = 0
72 72 break
73 73 if ret < -1:
74 74 changedheads += ret + 1
75 75 elif ret > 1:
76 76 changedheads += ret - 1
77 77 if changedheads > 0:
78 78 result = 1 + changedheads
79 79 elif changedheads < 0:
80 80 result = -1 + changedheads
81 81 return result
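# [editor's sketch, not part of this changeset] Worked example of the
# arithmetic above: a result of 2 means one head was added (1 + 1), and
# -3 means two heads were removed (-1 - 2); combined, the net change is
# -1 head, encoded as -1 + (-1) == -2.
assert combineresults([2, -3]) == -2
assert combineresults([1, 1]) == 1   # head count unchanged
assert combineresults([0, 5]) == 0   # any failed result wins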
82 82
83 83 bundletypes = {
84 84 "": ("", None), # only when using unbundle on ssh and old http servers
85 85 # since the unification ssh accepts a header but there
86 86 # is no capability signaling it.
87 87 "HG20": (), # special-cased below
88 88 "HG10UN": ("HG10UN", None),
89 89 "HG10BZ": ("HG10", 'BZ'),
90 90 "HG10GZ": ("HG10GZ", 'GZ'),
91 91 }
92 92
93 93 # hgweb uses this list to communicate its preferred type
94 94 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
95 95
96 96 def writechunks(ui, chunks, filename, vfs=None):
97 97 """Write chunks to a file and return its filename.
98 98
99 99 The stream is assumed to be a bundle file.
100 100 Existing files will not be overwritten.
101 101 If no filename is specified, a temporary file is created.
102 102 """
103 103 fh = None
104 104 cleanup = None
105 105 try:
106 106 if filename:
107 107 if vfs:
108 108 fh = vfs.open(filename, "wb")
109 109 else:
110 110 fh = open(filename, "wb")
111 111 else:
112 112 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
113 113 fh = os.fdopen(fd, "wb")
114 114 cleanup = filename
115 115 for c in chunks:
116 116 fh.write(c)
117 117 cleanup = None
118 118 return filename
119 119 finally:
120 120 if fh is not None:
121 121 fh.close()
122 122 if cleanup is not None:
123 123 if filename and vfs:
124 124 vfs.unlink(cleanup)
125 125 else:
126 126 os.unlink(cleanup)
127 127
128 128 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
129 129 """Write a bundle file and return its filename.
130 130
131 131 Existing files will not be overwritten.
132 132 If no filename is specified, a temporary file is created.
133 133 bz2 compression can be turned off.
134 134 The bundle file will be deleted in case of errors.
135 135 """
136 136
137 137 if bundletype == "HG20":
138 138 from . import bundle2
139 139 bundle = bundle2.bundle20(ui)
140 140 bundle.setcompression(compression)
141 141 part = bundle.newpart('changegroup', data=cg.getchunks())
142 142 part.addparam('version', cg.version)
143 143 chunkiter = bundle.getchunks()
144 144 else:
145 145 # compression argument is only for the bundle2 case
146 146 assert compression is None
147 147 if cg.version != '01':
148 148 raise error.Abort(_('old bundle types only support v1 '
149 149 'changegroups'))
150 150 header, comp = bundletypes[bundletype]
151 151 if comp not in util.compressors:
152 152 raise error.Abort(_('unknown stream compression type: %s')
153 153 % comp)
154 154 z = util.compressors[comp]()
155 155 subchunkiter = cg.getchunks()
156 156 def chunkiter():
157 157 yield header
158 158 for chunk in subchunkiter:
159 159 yield z.compress(chunk)
160 160 yield z.flush()
161 161 chunkiter = chunkiter()
162 162
163 163 # parse the changegroup data, otherwise we will block
164 164 # in case of sshrepo because we don't know the end of the stream
165 165
166 166 # an empty chunkgroup is the end of the changegroup
167 167 # a changegroup has at least 2 chunkgroups (changelog and manifest).
168 168 # after that, an empty chunkgroup is the end of the changegroup
169 169 return writechunks(ui, chunkiter, filename, vfs=vfs)
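# [editor's sketch, not part of this changeset] The shape of the legacy
# chunkiter above, with zlib standing in for util.compressors['GZ'] and
# a hypothetical _legacychunkiter name: the magic header is emitted
# uncompressed, then each chunk is run through the streaming compressor,
# then the compressor is flushed.
def _legacychunkiter(header, chunks):
    import zlib
    z = zlib.compressobj()
    yield header
    for chunk in chunks:
        yield z.compress(chunk)
    yield z.flush()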
170 170
171 171 class cg1unpacker(object):
172 172 """Unpacker for cg1 changegroup streams.
173 173
174 174 A changegroup unpacker handles the framing of the revision data in
175 175 the wire format. Most consumers will want to use the apply()
176 176 method to add the changes from the changegroup to a repository.
177 177
178 178 If you're forwarding a changegroup unmodified to another consumer,
179 179 use getchunks(), which returns an iterator of changegroup
180 180 chunks. This is mostly useful for cases where you need to know the
181 181 data stream has ended by observing the end of the changegroup.
182 182
183 183 deltachunk() is useful only if you're applying delta data. Most
184 184 consumers should prefer apply() instead.
185 185
186 186 A few other public methods exist. Those are used only for
187 187 bundlerepo and some debug commands - their use is discouraged.
188 188 """
189 189 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
190 190 deltaheadersize = struct.calcsize(deltaheader)
191 191 version = '01'
192 _grouplistcount = 1 # One list of files after the manifests
193
192 194 def __init__(self, fh, alg):
193 195 if alg == 'UN':
194 196 alg = None # get more modern without breaking too much
195 197 if alg not in util.decompressors:
196 198 raise error.Abort(_('unknown stream compression type: %s')
197 199 % alg)
198 200 if alg == 'BZ':
199 201 alg = '_truncatedBZ'
200 202 self._stream = util.decompressors[alg](fh)
201 203 self._type = alg
202 204 self.callback = None
203 205
204 206 # These methods (compressed, read, seek, tell) all appear to only
205 207 # be used by bundlerepo, but it's a little hard to tell.
206 208 def compressed(self):
207 209 return self._type is not None
208 210 def read(self, l):
209 211 return self._stream.read(l)
210 212 def seek(self, pos):
211 213 return self._stream.seek(pos)
212 214 def tell(self):
213 215 return self._stream.tell()
214 216 def close(self):
215 217 return self._stream.close()
216 218
217 219 def _chunklength(self):
218 220 d = readexactly(self._stream, 4)
219 221 l = struct.unpack(">l", d)[0]
220 222 if l <= 4:
221 223 if l:
222 224 raise error.Abort(_("invalid chunk length %d") % l)
223 225 return 0
224 226 if self.callback:
225 227 self.callback()
226 228 return l - 4
227 229
228 230 def changelogheader(self):
229 231 """v10 does not have a changelog header chunk"""
230 232 return {}
231 233
232 234 def manifestheader(self):
233 235 """v10 does not have a manifest header chunk"""
234 236 return {}
235 237
236 238 def filelogheader(self):
237 239 """return the header of the filelogs chunk, v10 only has the filename"""
238 240 l = self._chunklength()
239 241 if not l:
240 242 return {}
241 243 fname = readexactly(self._stream, l)
242 244 return {'filename': fname}
243 245
244 246 def _deltaheader(self, headertuple, prevnode):
245 247 node, p1, p2, cs = headertuple
246 248 if prevnode is None:
247 249 deltabase = p1
248 250 else:
249 251 deltabase = prevnode
250 252 flags = 0
251 253 return node, p1, p2, deltabase, cs, flags
252 254
253 255 def deltachunk(self, prevnode):
254 256 l = self._chunklength()
255 257 if not l:
256 258 return {}
257 259 headerdata = readexactly(self._stream, self.deltaheadersize)
258 260 header = struct.unpack(self.deltaheader, headerdata)
259 261 delta = readexactly(self._stream, l - self.deltaheadersize)
260 262 node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
261 263 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
262 264 'deltabase': deltabase, 'delta': delta, 'flags': flags}
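# [editor's sketch, not part of this changeset] Manually framing and
# re-parsing a single cg1 delta chunk, standalone: the chunk payload is
# the fixed-size header followed by the delta bytes.
def _demodeltachunk():
    import io
    node, p1, p2, cs = 'N' * 20, 'P' * 20, 'Q' * 20, 'C' * 20
    payload = struct.pack(_CHANGEGROUPV1_DELTA_HEADER, node, p1, p2, cs)
    delta = 'some delta bytes'
    buf = io.BytesIO(chunkheader(len(payload) + len(delta)) + payload + delta)
    chunk = getchunk(buf)
    hsize = struct.calcsize(_CHANGEGROUPV1_DELTA_HEADER)
    assert struct.unpack(_CHANGEGROUPV1_DELTA_HEADER, chunk[:hsize]) == \
        (node, p1, p2, cs)
    assert chunk[hsize:] == delta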
263 265
264 266 def getchunks(self):
265 267 """returns all the chunks contains in the bundle
266 268
267 269 Used when you need to forward the binary stream to a file or another
268 270 network API. To do so, it parse the changegroup data, otherwise it will
269 271 block in case of sshrepo because it don't know the end of the stream.
270 272 """
271 273 # an empty chunkgroup is the end of the changegroup
272 274 # a changegroup has at least 2 chunkgroups (changelog and manifest).
273 # after that, an empty chunkgroup is the end of the changegroup
274 empty = False
275 # after that, changegroup versions 1 and 2 have a series of groups
276 # with one group per file. changegroup 3 has a series of directory
277 # manifests before the files.
275 278 count = 0
276 while not empty or count <= 2:
279 emptycount = 0
280 while emptycount < self._grouplistcount:
277 281 empty = True
278 282 count += 1
279 283 while True:
280 284 chunk = getchunk(self)
281 285 if not chunk:
286 if empty and count > 2:
287 emptycount += 1
282 288 break
283 289 empty = False
284 290 yield chunkheader(len(chunk))
285 291 pos = 0
286 292 while pos < len(chunk):
287 293 next = pos + 2**20
288 294 yield chunk[pos:next]
289 295 pos = next
290 296 yield closechunk()
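# [editor's sketch, not part of this changeset] The terminator counting
# above, restated standalone. cg1/cg2 streams end at the first empty
# group after the changelog and manifest groups (_grouplistcount == 1);
# cg3 ends only after a second trailing empty group, since directory
# manifests and files each form their own group list (_grouplistcount
# == 2). A fully-empty changegroup body is just a run of closechunk()s.
def _demoterminators():
    import io
    def scan(data, grouplistcount):
        stream = io.BytesIO(data)
        emptycount = count = 0
        while emptycount < grouplistcount:
            empty = True
            count += 1
            while True:
                chunk = getchunk(stream)
                if not chunk:
                    if empty and count > 2:
                        emptycount += 1
                    break
                empty = False
        return stream.tell()
    assert scan(closechunk() * 3, 1) == 12  # cg1/cg2: one trailing empty group
    assert scan(closechunk() * 4, 2) == 16  # cg3: two trailing empty groups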
291 297
292 298 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
293 299 # We know that we'll never have more manifests than we had
294 300 # changesets.
295 301 self.callback = prog(_('manifests'), numchanges)
296 302 # no need to check for empty manifest group here:
297 303 # if the result of the merge of 1 and 2 is the same in 3 and 4,
298 304 # no new manifest will be created and the manifest group will
299 305 # be empty during the pull
300 306 self.manifestheader()
301 307 repo.manifest.addgroup(self, revmap, trp)
302 308 repo.ui.progress(_('manifests'), None)
303 309
304 310 def apply(self, repo, srctype, url, emptyok=False,
305 311 targetphase=phases.draft, expectedtotal=None):
306 312 """Add the changegroup returned by source.read() to this repo.
307 313 srctype is a string like 'push', 'pull', or 'unbundle'. url is
308 314 the URL of the repo where this changegroup is coming from.
309 315
310 316 Return an integer summarizing the change to this repo:
311 317 - nothing changed or no source: 0
312 318 - more heads than before: 1+added heads (2..n)
313 319 - fewer heads than before: -1-removed heads (-2..-n)
314 320 - number of heads stays the same: 1
315 321 """
316 322 repo = repo.unfiltered()
317 323 def csmap(x):
318 324 repo.ui.debug("add changeset %s\n" % short(x))
319 325 return len(cl)
320 326
321 327 def revmap(x):
322 328 return cl.rev(x)
323 329
324 330 changesets = files = revisions = 0
325 331
326 332 try:
327 333 with repo.transaction("\n".join([srctype,
328 334 util.hidepassword(url)])) as tr:
329 335 # The transaction could have been created before and already
330 336 # carries source information. In this case we use the top
331 337 # level data. We overwrite the argument because we need to use
332 338 # the top level value (if they exist) in this function.
333 339 srctype = tr.hookargs.setdefault('source', srctype)
334 340 url = tr.hookargs.setdefault('url', url)
335 341 repo.hook('prechangegroup', throw=True, **tr.hookargs)
336 342
337 343 # write changelog data to temp files so concurrent readers
338 344 # will not see an inconsistent view
339 345 cl = repo.changelog
340 346 cl.delayupdate(tr)
341 347 oldheads = cl.heads()
342 348
343 349 trp = weakref.proxy(tr)
344 350 # pull off the changeset group
345 351 repo.ui.status(_("adding changesets\n"))
346 352 clstart = len(cl)
347 353 class prog(object):
348 354 def __init__(self, step, total):
349 355 self._step = step
350 356 self._total = total
351 357 self._count = 1
352 358 def __call__(self):
353 359 repo.ui.progress(self._step, self._count,
354 360 unit=_('chunks'), total=self._total)
355 361 self._count += 1
356 362 self.callback = prog(_('changesets'), expectedtotal)
357 363
358 364 efiles = set()
359 365 def onchangelog(cl, node):
360 366 efiles.update(cl.read(node)[3])
361 367
362 368 self.changelogheader()
363 369 srccontent = cl.addgroup(self, csmap, trp,
364 370 addrevisioncb=onchangelog)
365 371 efiles = len(efiles)
366 372
367 373 if not (srccontent or emptyok):
368 374 raise error.Abort(_("received changelog group is empty"))
369 375 clend = len(cl)
370 376 changesets = clend - clstart
371 377 repo.ui.progress(_('changesets'), None)
372 378
373 379 # pull off the manifest group
374 380 repo.ui.status(_("adding manifests\n"))
375 381 self._unpackmanifests(repo, revmap, trp, prog, changesets)
376 382
377 383 needfiles = {}
378 384 if repo.ui.configbool('server', 'validate', default=False):
379 385 # validate incoming csets have their manifests
380 386 for cset in xrange(clstart, clend):
381 387 mfnode = repo.changelog.read(
382 388 repo.changelog.node(cset))[0]
383 389 mfest = repo.manifest.readdelta(mfnode)
384 390 # store file nodes we must see
385 391 for f, n in mfest.iteritems():
386 392 needfiles.setdefault(f, set()).add(n)
387 393
388 394 # process the files
389 395 repo.ui.status(_("adding file changes\n"))
390 396 self.callback = None
391 397 pr = prog(_('files'), efiles)
392 398 newrevs, newfiles = _addchangegroupfiles(
393 399 repo, self, revmap, trp, pr, needfiles)
394 400 revisions += newrevs
395 401 files += newfiles
396 402
397 403 dh = 0
398 404 if oldheads:
399 405 heads = cl.heads()
400 406 dh = len(heads) - len(oldheads)
401 407 for h in heads:
402 408 if h not in oldheads and repo[h].closesbranch():
403 409 dh -= 1
404 410 htext = ""
405 411 if dh:
406 412 htext = _(" (%+d heads)") % dh
407 413
408 414 repo.ui.status(_("added %d changesets"
409 415 " with %d changes to %d files%s\n")
410 416 % (changesets, revisions, files, htext))
411 417 repo.invalidatevolatilesets()
412 418
413 419 if changesets > 0:
414 420 if 'node' not in tr.hookargs:
415 421 tr.hookargs['node'] = hex(cl.node(clstart))
416 422 tr.hookargs['node_last'] = hex(cl.node(clend - 1))
417 423 hookargs = dict(tr.hookargs)
418 424 else:
419 425 hookargs = dict(tr.hookargs)
420 426 hookargs['node'] = hex(cl.node(clstart))
421 427 hookargs['node_last'] = hex(cl.node(clend - 1))
422 428 repo.hook('pretxnchangegroup', throw=True, **hookargs)
423 429
424 430 added = [cl.node(r) for r in xrange(clstart, clend)]
425 431 publishing = repo.publishing()
426 432 if srctype in ('push', 'serve'):
427 433 # Old servers can not push the boundary themselves.
428 434 # New servers won't push the boundary if changeset already
429 435 # exists locally as secret
430 436 #
431 437 # We should not use added here but the list of all change in
432 438 # the bundle
433 439 if publishing:
434 440 phases.advanceboundary(repo, tr, phases.public,
435 441 srccontent)
436 442 else:
437 443 # Those changesets have been pushed from the
438 444 # outside, their phases are going to be pushed
439 445 # alongside. Therefore `targetphase` is
440 446 # ignored.
441 447 phases.advanceboundary(repo, tr, phases.draft,
442 448 srccontent)
443 449 phases.retractboundary(repo, tr, phases.draft, added)
444 450 elif srctype != 'strip':
445 451 # publishing only alter behavior during push
446 452 #
447 453 # strip should not touch boundary at all
448 454 phases.retractboundary(repo, tr, targetphase, added)
449 455
450 456 if changesets > 0:
451 457 if srctype != 'strip':
452 458 # During strip, branchcache is invalid but
453 459 # coming call to `destroyed` will repair it.
454 460 # In other case we can safely update cache on
455 461 # disk.
456 462 branchmap.updatecache(repo.filtered('served'))
457 463
458 464 def runhooks():
459 465 # These hooks run when the lock releases, not when the
460 466 # transaction closes. So it's possible for the changelog
461 467 # to have changed since we last saw it.
462 468 if clstart >= len(repo):
463 469 return
464 470
465 471 # forcefully update the on-disk branch cache
466 472 repo.ui.debug("updating the branch cache\n")
467 473 repo.hook("changegroup", **hookargs)
468 474
469 475 for n in added:
470 476 args = hookargs.copy()
471 477 args['node'] = hex(n)
472 478 del args['node_last']
473 479 repo.hook("incoming", **args)
474 480
475 481 newheads = [h for h in repo.heads()
476 482 if h not in oldheads]
477 483 repo.ui.log("incoming",
478 484 "%s incoming changes - new heads: %s\n",
479 485 len(added),
480 486 ', '.join([hex(c[:6]) for c in newheads]))
481 487
482 488 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
483 489 lambda tr: repo._afterlock(runhooks))
484 490 finally:
485 491 repo.ui.flush()
486 492 # never return 0 here:
487 493 if dh < 0:
488 494 return dh - 1
489 495 else:
490 496 return dh + 1
491 497
492 498 class cg2unpacker(cg1unpacker):
493 499 """Unpacker for cg2 streams.
494 500
495 501 cg2 streams add support for generaldelta, so the delta header
496 502 format is slightly different. All other features about the data
497 503 remain the same.
498 504 """
499 505 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
500 506 deltaheadersize = struct.calcsize(deltaheader)
501 507 version = '02'
502 508
503 509 def _deltaheader(self, headertuple, prevnode):
504 510 node, p1, p2, deltabase, cs = headertuple
505 511 flags = 0
506 512 return node, p1, p2, deltabase, cs, flags
507 513
508 514 class cg3unpacker(cg2unpacker):
509 515 """Unpacker for cg3 streams.
510 516
511 517 cg3 streams add support for exchanging treemanifests and revlog
512 518 flags. It adds the revlog flags to the delta header and an empty chunk
513 519 separating manifests and files.
514 520 """
515 521 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
516 522 deltaheadersize = struct.calcsize(deltaheader)
517 523 version = '03'
524 _grouplistcount = 2 # One list of manifests and one list of files
518 525
519 526 def _deltaheader(self, headertuple, prevnode):
520 527 node, p1, p2, deltabase, cs, flags = headertuple
521 528 return node, p1, p2, deltabase, cs, flags
522 529
523 530 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
524 531 super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
525 532 numchanges)
526 533 while True:
527 534 chunkdata = self.filelogheader()
528 535 if not chunkdata:
529 536 break
530 537 # If we get here, there are directory manifests in the changegroup
531 538 d = chunkdata["filename"]
532 539 repo.ui.debug("adding %s revisions\n" % d)
533 540 dirlog = repo.manifest.dirlog(d)
534 541 if not dirlog.addgroup(self, revmap, trp):
535 542 raise error.Abort(_("received dir revlog group is empty"))
536 543
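# [editor's note, not part of this changeset] Overall stream layout the
# unpackers above expect (each "group" is a run of chunks ended by an
# empty chunk, and an empty group is a bare empty chunk):
#
#   cg1/cg2: changelog group | manifest group | per-file groups | empty
#   cg3:     changelog group | manifest group | per-dir manifest groups
#            | empty | per-file groups | empty
#
# Hence _grouplistcount is 1 for cg1/cg2 and 2 for cg3.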
537 544 class headerlessfixup(object):
538 545 def __init__(self, fh, h):
539 546 self._h = h
540 547 self._fh = fh
541 548 def read(self, n):
542 549 if self._h:
543 550 d, self._h = self._h[:n], self._h[n:]
544 551 if len(d) < n:
545 552 d += readexactly(self._fh, n - len(d))
546 553 return d
547 554 return readexactly(self._fh, n)
548 555
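# [editor's sketch, not part of this changeset] Why headerlessfixup
# exists: a caller that has already consumed the leading bytes of a
# stream (e.g. to sniff the bundle type) can glue them back on so the
# decompressor still sees the full stream.
def _demoheaderless():
    import io
    raw = io.BytesIO("HG10UNrest-of-stream")
    magic = readexactly(raw, 6)          # consumed from the stream
    assert magic == "HG10UN"
    fixed = headerlessfixup(raw, magic)  # re-attach what we consumed
    assert fixed.read(6) == "HG10UN"
    assert fixed.read(4) == "rest"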
549 556 def _moddirs(files):
550 557 """Given a set of modified files, find the list of modified directories.
551 558
552 559 This returns a list of (path to changed dir, changed dir) tuples,
553 560 as that's what the one client needs anyway.
554 561
555 562 >>> _moddirs(['a/b/c.py', 'a/b/c.txt', 'a/d/e/f/g.txt', 'i.txt', ])
556 563 [('/', 'a/'), ('a/', 'b/'), ('a/', 'd/'), ('a/d/', 'e/'), ('a/d/e/', 'f/')]
557 564
558 565 """
559 566 alldirs = set()
560 567 for f in files:
561 568 path = f.split('/')[:-1]
562 569 for i in xrange(len(path) - 1, -1, -1):
563 570 dn = '/'.join(path[:i])
564 571 current = dn + '/', path[i] + '/'
565 572 if current in alldirs:
566 573 break
567 574 alldirs.add(current)
568 575 return sorted(alldirs)
569 576
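# [editor's note, not part of this changeset] Because the tuples are
# sorted, a parent directory always appears before its children (e.g.
# ('a/', 'd/') before ('a/d/', 'e/')), which is what lets the submfs
# walk in lookupmflinknode below resolve each parent before descending.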
570 577 class cg1packer(object):
571 578 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
572 579 version = '01'
573 580 def __init__(self, repo, bundlecaps=None):
574 581 """Given a source repo, construct a bundler.
575 582
576 583 bundlecaps is optional and can be used to specify the set of
577 584 capabilities which can be used to build the bundle.
578 585 """
579 586 # Set of capabilities we can use to build the bundle.
580 587 if bundlecaps is None:
581 588 bundlecaps = set()
582 589 self._bundlecaps = bundlecaps
583 590 # experimental config: bundle.reorder
584 591 reorder = repo.ui.config('bundle', 'reorder', 'auto')
585 592 if reorder == 'auto':
586 593 reorder = None
587 594 else:
588 595 reorder = util.parsebool(reorder)
589 596 self._repo = repo
590 597 self._reorder = reorder
591 598 self._progress = repo.ui.progress
592 599 if self._repo.ui.verbose and not self._repo.ui.debugflag:
593 600 self._verbosenote = self._repo.ui.note
594 601 else:
595 602 self._verbosenote = lambda s: None
596 603
597 604 def close(self):
598 605 return closechunk()
599 606
600 607 def fileheader(self, fname):
601 608 return chunkheader(len(fname)) + fname
602 609
603 610 def group(self, nodelist, revlog, lookup, units=None):
604 611 """Calculate a delta group, yielding a sequence of changegroup chunks
605 612 (strings).
606 613
607 614 Given a list of changeset revs, return a set of deltas and
608 615 metadata corresponding to nodes. The first delta is
609 616 first parent(nodelist[0]) -> nodelist[0], the receiver is
610 617 guaranteed to have this parent as it has all history before
611 618 these changesets. In the case firstparent is nullrev the
612 619 changegroup starts with a full revision.
613 620
614 621 If units is not None, progress detail will be generated; units specifies
615 622 the type of revlog that is touched (changelog, manifest, etc.).
616 623 """
617 624 # if we don't have any revisions touched by these changesets, bail
618 625 if len(nodelist) == 0:
619 626 yield self.close()
620 627 return
621 628
622 629 # for generaldelta revlogs, we linearize the revs; this will both be
623 630 # much quicker and generate a much smaller bundle
624 631 if (revlog._generaldelta and self._reorder is None) or self._reorder:
625 632 dag = dagutil.revlogdag(revlog)
626 633 revs = set(revlog.rev(n) for n in nodelist)
627 634 revs = dag.linearize(revs)
628 635 else:
629 636 revs = sorted([revlog.rev(n) for n in nodelist])
630 637
631 638 # add the parent of the first rev
632 639 p = revlog.parentrevs(revs[0])[0]
633 640 revs.insert(0, p)
634 641
635 642 # build deltas
636 643 total = len(revs) - 1
637 644 msgbundling = _('bundling')
638 645 for r in xrange(len(revs) - 1):
639 646 if units is not None:
640 647 self._progress(msgbundling, r + 1, unit=units, total=total)
641 648 prev, curr = revs[r], revs[r + 1]
642 649 linknode = lookup(revlog.node(curr))
643 650 for c in self.revchunk(revlog, curr, prev, linknode):
644 651 yield c
645 652
646 653 if units is not None:
647 654 self._progress(msgbundling, None)
648 655 yield self.close()
649 656
650 657 # filter any nodes that claim to be part of the known set
651 658 def prune(self, revlog, missing, commonrevs):
652 659 rr, rl = revlog.rev, revlog.linkrev
653 660 return [n for n in missing if rl(rr(n)) not in commonrevs]
654 661
655 662 def _packmanifests(self, mfnodes, tmfnodes, lookuplinknode):
656 663 """Pack flat manifests into a changegroup stream."""
657 664 ml = self._repo.manifest
658 665 size = 0
659 666 for chunk in self.group(
660 667 mfnodes, ml, lookuplinknode, units=_('manifests')):
661 668 size += len(chunk)
662 669 yield chunk
663 670 self._verbosenote(_('%8.i (manifests)\n') % size)
664 671 # It looks odd to assert this here, but tmfnodes doesn't get
665 672 # filled in until after we've called lookuplinknode for
666 673 # sending root manifests, so the only way to tell the streams
667 674 # got crossed is to check after we've done all the work.
668 675 assert not tmfnodes
669 676
670 677 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
671 678 '''yield a sequence of changegroup chunks (strings)'''
672 679 repo = self._repo
673 680 cl = repo.changelog
674 681 ml = repo.manifest
675 682
676 683 clrevorder = {}
677 684 mfs = {} # needed manifests
678 685 tmfnodes = {}
679 686 fnodes = {} # needed file nodes
680 687 # maps manifest node id -> set(changed files)
681 688 mfchangedfiles = {}
682 689
683 690 # Callback for the changelog, used to collect changed files and manifest
684 691 # nodes.
685 692 # Returns the linkrev node (identity in the changelog case).
686 693 def lookupcl(x):
687 694 c = cl.read(x)
688 695 clrevorder[x] = len(clrevorder)
689 696 n = c[0]
690 697 # record the first changeset introducing this manifest version
691 698 mfs.setdefault(n, x)
692 699 # Record a complete list of potentially-changed files in
693 700 # this manifest.
694 701 mfchangedfiles.setdefault(n, set()).update(c[3])
695 702 return x
696 703
697 704 self._verbosenote(_('uncompressed size of bundle content:\n'))
698 705 size = 0
699 706 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
700 707 size += len(chunk)
701 708 yield chunk
702 709 self._verbosenote(_('%8.i (changelog)\n') % size)
703 710
704 711 # We need to make sure that the linkrev in the changegroup refers to
705 712 # the first changeset that introduced the manifest or file revision.
706 713 # The fastpath is usually safer than the slowpath, because the filelogs
707 714 # are walked in revlog order.
708 715 #
709 716 # When taking the slowpath with reorder=None and the manifest revlog
710 717 # uses generaldelta, the manifest may be walked in the "wrong" order.
711 718 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
712 719 # cc0ff93d0c0c).
713 720 #
714 721 # When taking the fastpath, we are only vulnerable to reordering
715 722 # of the changelog itself. The changelog never uses generaldelta, so
716 723 # it is only reordered when reorder=True. To handle this case, we
717 724 # simply take the slowpath, which already has the 'clrevorder' logic.
718 725 # This was also fixed in cc0ff93d0c0c.
719 726 fastpathlinkrev = fastpathlinkrev and not self._reorder
720 727 # Treemanifests don't work correctly with fastpathlinkrev
721 728 # either, because we don't discover which directory nodes to
722 729 # send along with files. This could probably be fixed.
723 730 fastpathlinkrev = fastpathlinkrev and (
724 731 'treemanifest' not in repo.requirements)
725 732 # Callback for the manifest, used to collect linkrevs for filelog
726 733 # revisions.
727 734 # Returns the linkrev node (collected in lookupcl).
728 735 if fastpathlinkrev:
729 736 lookupmflinknode = mfs.__getitem__
730 737 else:
731 738 def lookupmflinknode(x):
732 739 """Callback for looking up the linknode for manifests.
733 740
734 741 Returns the linkrev node for the specified manifest.
735 742
736 743 SIDE EFFECT:
737 744
738 745 1) fclnodes gets populated with the list of relevant
739 746 file nodes if we're not using fastpathlinkrev
740 747 2) When treemanifests are in use, collects treemanifest nodes
741 748 to send
742 749
743 750 Note that this means manifests must be completely sent to
744 751 the client before you can trust the list of files and
745 752 treemanifests to send.
746 753 """
747 754 clnode = mfs[x]
748 755 # We no longer actually care about reading deltas of
749 756 # the manifest here, because we already know the list
750 757 # of changed files, so for treemanifests (which
751 758 # lazily-load anyway to *generate* a readdelta) we can
752 759 # just load them with read() and then we'll actually
753 760 # be able to correctly load node IDs from the
754 761 # submanifest entries.
755 762 if 'treemanifest' in repo.requirements:
756 763 mdata = ml.read(x)
757 764 else:
758 765 mdata = ml.readfast(x)
759 766 for f in mfchangedfiles[x]:
760 767 try:
761 768 n = mdata[f]
762 769 except KeyError:
763 770 continue
764 771 # record the first changeset introducing this filelog
765 772 # version
766 773 fclnodes = fnodes.setdefault(f, {})
767 774 fclnode = fclnodes.setdefault(n, clnode)
768 775 if clrevorder[clnode] < clrevorder[fclnode]:
769 776 fclnodes[n] = clnode
770 777 # gather list of changed treemanifest nodes
771 778 if 'treemanifest' in repo.requirements:
772 779 submfs = {'/': mdata}
773 780 for dn, bn in _moddirs(mfchangedfiles[x]):
774 781 submf = submfs[dn]
775 782 submf = submf._dirs[bn]
776 783 submfs[submf.dir()] = submf
777 784 tmfclnodes = tmfnodes.setdefault(submf.dir(), {})
778 785 tmfclnodes.setdefault(submf._node, clnode)
779 786 if clrevorder[clnode] < clrevorder[fclnode]:
780 787 tmfclnodes[n] = clnode
781 788 return clnode
782 789
783 790 mfnodes = self.prune(ml, mfs, commonrevs)
784 791 for x in self._packmanifests(
785 792 mfnodes, tmfnodes, lookupmflinknode):
786 793 yield x
787 794
788 795 mfs.clear()
789 796 clrevs = set(cl.rev(x) for x in clnodes)
790 797
791 798 if not fastpathlinkrev:
792 799 def linknodes(unused, fname):
793 800 return fnodes.get(fname, {})
794 801 else:
795 802 cln = cl.node
796 803 def linknodes(filerevlog, fname):
797 804 llr = filerevlog.linkrev
798 805 fln = filerevlog.node
799 806 revs = ((r, llr(r)) for r in filerevlog)
800 807 return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)
801 808
802 809 changedfiles = set()
803 810 for x in mfchangedfiles.itervalues():
804 811 changedfiles.update(x)
805 812 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
806 813 source):
807 814 yield chunk
808 815
809 816 yield self.close()
810 817
811 818 if clnodes:
812 819 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
813 820
814 821 # The 'source' parameter is useful for extensions
815 822 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
816 823 repo = self._repo
817 824 progress = self._progress
818 825 msgbundling = _('bundling')
819 826
820 827 total = len(changedfiles)
821 828 # for progress output
822 829 msgfiles = _('files')
823 830 for i, fname in enumerate(sorted(changedfiles)):
824 831 filerevlog = repo.file(fname)
825 832 if not filerevlog:
826 833 raise error.Abort(_("empty or missing revlog for %s") % fname)
827 834
828 835 linkrevnodes = linknodes(filerevlog, fname)
829 836 # Lookup for filenodes, we collected the linkrev nodes above in the
830 837 # fastpath case and with lookupmf in the slowpath case.
831 838 def lookupfilelog(x):
832 839 return linkrevnodes[x]
833 840
834 841 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
835 842 if filenodes:
836 843 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
837 844 total=total)
838 845 h = self.fileheader(fname)
839 846 size = len(h)
840 847 yield h
841 848 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
842 849 size += len(chunk)
843 850 yield chunk
844 851 self._verbosenote(_('%8.i %s\n') % (size, fname))
845 852 progress(msgbundling, None)
846 853
847 854 def deltaparent(self, revlog, rev, p1, p2, prev):
848 855 return prev
849 856
850 857 def revchunk(self, revlog, rev, prev, linknode):
851 858 node = revlog.node(rev)
852 859 p1, p2 = revlog.parentrevs(rev)
853 860 base = self.deltaparent(revlog, rev, p1, p2, prev)
854 861
855 862 prefix = ''
856 863 if revlog.iscensored(base) or revlog.iscensored(rev):
857 864 try:
858 865 delta = revlog.revision(node)
859 866 except error.CensoredNodeError as e:
860 867 delta = e.tombstone
861 868 if base == nullrev:
862 869 prefix = mdiff.trivialdiffheader(len(delta))
863 870 else:
864 871 baselen = revlog.rawsize(base)
865 872 prefix = mdiff.replacediffheader(baselen, len(delta))
866 873 elif base == nullrev:
867 874 delta = revlog.revision(node)
868 875 prefix = mdiff.trivialdiffheader(len(delta))
869 876 else:
870 877 delta = revlog.revdiff(base, rev)
871 878 p1n, p2n = revlog.parents(node)
872 879 basenode = revlog.node(base)
873 880 flags = revlog.flags(rev)
874 881 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
875 882 meta += prefix
876 883 l = len(meta) + len(delta)
877 884 yield chunkheader(l)
878 885 yield meta
879 886 yield delta
880 887 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
881 888 # do nothing with basenode, it is implicitly the previous one in HG10
882 889 # do nothing with flags, it is implicitly 0 for cg1 and cg2
883 890 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
884 891
885 892 class cg2packer(cg1packer):
886 893 version = '02'
887 894 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
888 895
889 896 def __init__(self, repo, bundlecaps=None):
890 897 super(cg2packer, self).__init__(repo, bundlecaps)
891 898 if self._reorder is None:
892 899 # Since generaldelta is directly supported by cg2, reordering
893 900 # generally doesn't help, so we disable it by default (treating
894 901 # bundle.reorder=auto just like bundle.reorder=False).
895 902 self._reorder = False
896 903
897 904 def deltaparent(self, revlog, rev, p1, p2, prev):
898 905 dp = revlog.deltaparent(rev)
899 906 # avoid storing full revisions; pick prev in those cases
900 907 # also pick prev when we can't be sure remote has dp
901 908 if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
902 909 return prev
903 910 return dp
904 911
905 912 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
906 913 # Do nothing with flags, it is implicitly 0 in cg1 and cg2
907 914 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
908 915
909 916 class cg3packer(cg2packer):
910 917 version = '03'
911 918 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
912 919
913 920 def _packmanifests(self, mfnodes, tmfnodes, lookuplinknode):
914 921 # Note that debug prints are super confusing in this code, as
915 922 # tmfnodes gets populated by the calls to lookuplinknode in
916 923 # the superclass's manifest packer. In the future we should
917 924 # probably see if we can refactor this somehow to be less
918 925 # confusing.
919 926 for x in super(cg3packer, self)._packmanifests(
920 927 mfnodes, {}, lookuplinknode):
921 928 yield x
922 929 dirlog = self._repo.manifest.dirlog
923 930 for name, nodes in tmfnodes.iteritems():
924 931 # For now, directory headers are simply file headers with
925 932 # a trailing '/' on the path (already in the name).
926 933 yield self.fileheader(name)
927 934 for chunk in self.group(nodes, dirlog(name), nodes.get):
928 935 yield chunk
929 936 yield self.close()
930 937
931 938 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
932 939 return struct.pack(
933 940 self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
934 941
935 942 _packermap = {'01': (cg1packer, cg1unpacker),
936 943 # cg2 adds support for exchanging generaldelta
937 944 '02': (cg2packer, cg2unpacker),
938 945 # cg3 adds support for exchanging revlog flags and treemanifests
939 946 '03': (cg3packer, cg3unpacker),
940 947 }
941 948
942 949 def supportedversions(repo):
943 950 versions = _packermap.keys()
944 951 cg3 = ('treemanifest' in repo.requirements or
945 952 repo.ui.configbool('experimental', 'changegroup3') or
946 953 repo.ui.configbool('experimental', 'treemanifest'))
947 954 if not cg3:
948 955 versions.remove('03')
949 956 return versions
950 957
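# [editor's sketch, not part of this changeset] A typical negotiation on
# top of supportedversions(): pick the highest version both sides speak.
# 'remoteversions' is a hypothetical capability list from the peer.
def _pickversion(repo, remoteversions):
    common = set(supportedversions(repo)) & set(remoteversions)
    if not common:
        raise error.Abort(_('no common changegroup version'))
    return max(common)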
951 958 def getbundler(version, repo, bundlecaps=None):
952 959 assert version in supportedversions(repo)
953 960 return _packermap[version][0](repo, bundlecaps)
954 961
955 962 def getunbundler(version, fh, alg):
956 963 return _packermap[version][1](fh, alg)
957 964
958 965 def _changegroupinfo(repo, nodes, source):
959 966 if repo.ui.verbose or source == 'bundle':
960 967 repo.ui.status(_("%d changesets found\n") % len(nodes))
961 968 if repo.ui.debugflag:
962 969 repo.ui.debug("list of changesets:\n")
963 970 for node in nodes:
964 971 repo.ui.debug("%s\n" % hex(node))
965 972
966 973 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
967 974 repo = repo.unfiltered()
968 975 commonrevs = outgoing.common
969 976 csets = outgoing.missing
970 977 heads = outgoing.missingheads
971 978 # We go through the fast path if we get told to, or if all (unfiltered)
972 979 # heads have been requested (since we then know all linkrevs will
973 980 # be pulled by the client).
974 981 heads.sort()
975 982 fastpathlinkrev = fastpath or (
976 983 repo.filtername is None and heads == sorted(repo.heads()))
977 984
978 985 repo.hook('preoutgoing', throw=True, source=source)
979 986 _changegroupinfo(repo, csets, source)
980 987 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
981 988
982 989 def getsubset(repo, outgoing, bundler, source, fastpath=False):
983 990 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
984 991 return getunbundler(bundler.version, util.chunkbuffer(gengroup), None)
985 992
986 993 def changegroupsubset(repo, roots, heads, source, version='01'):
987 994 """Compute a changegroup consisting of all the nodes that are
988 995 descendants of any of the roots and ancestors of any of the heads.
989 996 Return a chunkbuffer object whose read() method will return
990 997 successive changegroup chunks.
991 998
992 999 It is fairly complex as determining which filenodes and which
993 1000 manifest nodes need to be included for the changeset to be complete
994 1001 is non-trivial.
995 1002
996 1003 Another wrinkle is doing the reverse, figuring out which changeset in
997 1004 the changegroup a particular filenode or manifestnode belongs to.
998 1005 """
999 1006 cl = repo.changelog
1000 1007 if not roots:
1001 1008 roots = [nullid]
1002 1009 discbases = []
1003 1010 for n in roots:
1004 1011 discbases.extend([p for p in cl.parents(n) if p != nullid])
1005 1012 # TODO: remove call to nodesbetween.
1006 1013 csets, roots, heads = cl.nodesbetween(roots, heads)
1007 1014 included = set(csets)
1008 1015 discbases = [n for n in discbases if n not in included]
1009 1016 outgoing = discovery.outgoing(cl, discbases, heads)
1010 1017 bundler = getbundler(version, repo)
1011 1018 return getsubset(repo, outgoing, bundler, source)
1012 1019
1013 1020 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
1014 1021 version='01'):
1015 1022 """Like getbundle, but taking a discovery.outgoing as an argument.
1016 1023
1017 1024 This is only implemented for local repos and reuses potentially
1018 1025 precomputed sets in outgoing. Returns a raw changegroup generator."""
1019 1026 if not outgoing.missing:
1020 1027 return None
1021 1028 bundler = getbundler(version, repo, bundlecaps)
1022 1029 return getsubsetraw(repo, outgoing, bundler, source)
1023 1030
1024 1031 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
1025 1032 version='01'):
1026 1033 """Like getbundle, but taking a discovery.outgoing as an argument.
1027 1034
1028 1035 This is only implemented for local repos and reuses potentially
1029 1036 precomputed sets in outgoing."""
1030 1037 if not outgoing.missing:
1031 1038 return None
1032 1039 bundler = getbundler(version, repo, bundlecaps)
1033 1040 return getsubset(repo, outgoing, bundler, source)
1034 1041
1035 1042 def computeoutgoing(repo, heads, common):
1036 1043 """Computes which revs are outgoing given a set of common
1037 1044 and a set of heads.
1038 1045
1039 1046 This is a separate function so extensions can have access to
1040 1047 the logic.
1041 1048
1042 1049 Returns a discovery.outgoing object.
1043 1050 """
1044 1051 cl = repo.changelog
1045 1052 if common:
1046 1053 hasnode = cl.hasnode
1047 1054 common = [n for n in common if hasnode(n)]
1048 1055 else:
1049 1056 common = [nullid]
1050 1057 if not heads:
1051 1058 heads = cl.heads()
1052 1059 return discovery.outgoing(cl, common, heads)
1053 1060
1054 1061 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None,
1055 1062 version='01'):
1056 1063 """Like changegroupsubset, but returns the set difference between the
1057 1064 ancestors of heads and the ancestors common.
1058 1065
1059 1066 If heads is None, use the local heads. If common is None, use [nullid].
1060 1067
1061 1068 The nodes in common might not all be known locally due to the way the
1062 1069 current discovery protocol works.
1063 1070 """
1064 1071 outgoing = computeoutgoing(repo, heads, common)
1065 1072 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps,
1066 1073 version=version)
1067 1074
1068 1075 def changegroup(repo, basenodes, source):
1069 1076 # to avoid a race we use changegroupsubset() (issue1320)
1070 1077 return changegroupsubset(repo, basenodes, repo.heads(), source)
1071 1078
1072 1079 def _addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
1073 1080 revisions = 0
1074 1081 files = 0
1075 1082 while True:
1076 1083 chunkdata = source.filelogheader()
1077 1084 if not chunkdata:
1078 1085 break
1079 1086 f = chunkdata["filename"]
1080 1087 repo.ui.debug("adding %s revisions\n" % f)
1081 1088 pr()
1082 1089 fl = repo.file(f)
1083 1090 o = len(fl)
1084 1091 try:
1085 1092 if not fl.addgroup(source, revmap, trp):
1086 1093 raise error.Abort(_("received file revlog group is empty"))
1087 1094 except error.CensoredBaseError as e:
1088 1095 raise error.Abort(_("received delta base is censored: %s") % e)
1089 1096 revisions += len(fl) - o
1090 1097 files += 1
1091 1098 if f in needfiles:
1092 1099 needs = needfiles[f]
1093 1100 for new in xrange(o, len(fl)):
1094 1101 n = fl.node(new)
1095 1102 if n in needs:
1096 1103 needs.remove(n)
1097 1104 else:
1098 1105 raise error.Abort(
1099 1106 _("received spurious file revlog entry"))
1100 1107 if not needs:
1101 1108 del needfiles[f]
1102 1109 repo.ui.progress(_('files'), None)
1103 1110
1104 1111 for f, needs in needfiles.iteritems():
1105 1112 fl = repo.file(f)
1106 1113 for n in needs:
1107 1114 try:
1108 1115 fl.rev(n)
1109 1116 except error.LookupError:
1110 1117 raise error.Abort(
1111 1118 _('missing file data for %s:%s - run hg verify') %
1112 1119 (f, hex(n)))
1113 1120
1114 1121 return revisions, files