##// END OF EJS Templates
changegroup: drop special-casing of flat manifests...
Martin von Zweigbergk -
r28241:a4286175 default
parent child Browse files
Show More
@@ -1,1130 +1,1113 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullid,
19 19 nullrev,
20 20 short,
21 21 )
22 22
23 23 from . import (
24 24 branchmap,
25 25 dagutil,
26 26 discovery,
27 27 error,
28 28 mdiff,
29 29 phases,
30 30 util,
31 31 )
32 32
# Wire-format delta header layouts (struct format strings).
# cg1: node, p1, p2, linknode (the delta base is implicit; see
#      cg1unpacker._deltaheader).
# cg2: adds an explicit deltabase field.
# cg3: big-endian, adds a 16-bit flags field after the linknode.
_CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
_CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
_CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
36 36
def readexactly(stream, n):
    '''read n bytes from stream.read and abort if less was available'''
    data = stream.read(n)
    if len(data) == n:
        return data
    # a short read means the stream was truncated; never return partial data
    raise error.Abort(_("stream ended unexpectedly"
                        " (got %d bytes, expected %d)")
                      % (len(data), n))
45 45
def getchunk(stream):
    """return the next chunk from stream as a string"""
    header = readexactly(stream, 4)
    length = struct.unpack(">l", header)[0]
    # the stored length counts the 4-byte length prefix itself
    if length > 4:
        return readexactly(stream, length - 4)
    if length:
        # 1..4 (or negative) cannot encode a valid chunk
        raise error.Abort(_("invalid chunk length %d") % length)
    # a zero length marks the end of a chunk group
    return ""
55 55
def chunkheader(length):
    """return a changegroup chunk header (string)"""
    # the on-wire length field includes the 4 bytes of the header itself
    return struct.pack(">l", 4 + length)
59 59
def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk"""
    # equivalent to struct.pack(">l", 0); a zero length terminates a group
    return b"\x00\x00\x00\x00"
63 63
def combineresults(results):
    """Combine 0 or more addchangegroup results into one.

    Each result is 0 (nothing changed / no source), 1 + n (n heads
    added), or -1 - n (n heads removed); see cg1unpacker.apply().
    Returns a single value of the same form summarizing all of them.
    """
    changedheads = 0
    for ret in results:
        # If any changegroup result is 0, return 0.  (The previous code
        # only broke out of the loop here, so a non-zero head delta from
        # an earlier result could overwrite the 0 below.)
        if ret == 0:
            return 0
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        return 1 + changedheads
    if changedheads < 0:
        return -1 + changedheads
    return 1
82 82
# Map of bundle type name -> (on-disk header, compression type) used by
# writebundle().  "HG20" maps to an empty tuple because bundle2 writes
# its own header and handles compression itself (special-cased below).
bundletypes = {
    "": ("", None), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", None),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
95 95
def writechunks(ui, chunks, filename, vfs=None):
    """Write chunks to a file and return its filename.

    The stream is assumed to be a bundle file.
    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    """
    fh = None
    cleanup = None
    try:
        if filename:
            fh = vfs.open(filename, "wb") if vfs else open(filename, "wb")
        else:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, "wb")
        cleanup = filename
        for chunk in chunks:
            fh.write(chunk)
        # every chunk was written: keep the file on the way out
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            # an error occurred mid-write; remove the partial file
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)
127 127
def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        # bundle2 wraps the changegroup in a 'changegroup' part and
        # handles header and compression itself
        from . import bundle2
        bundle = bundle2.bundle20(ui)
        bundle.setcompression(compression)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only supports v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compressors:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        z = util.compressors[comp]()
        subchunkiter = cg.getchunks()
        # lazily emit the bundle header followed by the compressed stream
        def chunkiter():
            yield header
            for chunk in subchunkiter:
                yield z.compress(chunk)
            yield z.flush()
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream

    # an empty chunkgroup is the end of the changegroup
    # a changegroup has at least 2 chunkgroups (changelog and manifest).
    # after that, an empty chunkgroup is the end of the changegroup
    return writechunks(ui, chunkiter, filename, vfs=vfs)
170 170
class cg1unpacker(object):
    """Unpacker for cg1 changegroup streams.

    A changegroup unpacker handles the framing of the revision data in
    the wire format. Most consumers will want to use the apply()
    method to add the changes from the changegroup to a repository.

    If you're forwarding a changegroup unmodified to another consumer,
    use getchunks(), which returns an iterator of changegroup
    chunks. This is mostly useful for cases where you need to know the
    data stream has ended by observing the end of the changegroup.

    deltachunk() is useful only if you're applying delta data. Most
    consumers should prefer apply() instead.

    A few other public methods exist. Those are used only for
    bundlerepo and some debug commands - their use is discouraged.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    _grouplistcount = 1 # One list of files after the manifests

    def __init__(self, fh, alg):
        # 'alg' names the stream compression type; 'UN' means uncompressed
        if alg == 'UN':
            alg = None # get more modern without breaking too much
        if not alg in util.decompressors:
            raise error.Abort(_('unknown stream compression type: %s')
                              % alg)
        if alg == 'BZ':
            alg = '_truncatedBZ'
        self._stream = util.decompressors[alg](fh)
        self._type = alg
        # optional progress callback, invoked once per chunk read
        self.callback = None

    # These methods (compressed, read, seek, tell) all appear to only
    # be used by bundlerepo, but it's a little hard to tell.
    def compressed(self):
        return self._type is not None
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def _chunklength(self):
        """Read a chunk header and return the payload length (0 at group end)."""
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise error.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        # the stored length includes the 4-byte length prefix itself
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self._chunklength()
        if not l:
            return {}
        fname = readexactly(self._stream, l)
        return {'filename': fname}

    def _deltaheader(self, headertuple, prevnode):
        # cg1 has no explicit delta base field: the delta applies against
        # the previous node in the stream, or p1 for the first chunk
        node, p1, p2, cs = headertuple
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        flags = 0
        return node, p1, p2, deltabase, cs, flags

    def deltachunk(self, prevnode):
        """Read one delta chunk; return {} at the end of the group."""
        l = self._chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
        return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
                'deltabase': deltabase, 'delta': delta, 'flags': flags}

    def getchunks(self):
        """returns all the chunks contains in the bundle

        Used when you need to forward the binary stream to a file or another
        network API. To do so, it parse the changegroup data, otherwise it will
        block in case of sshrepo because it don't know the end of the stream.
        """
        # an empty chunkgroup is the end of the changegroup
        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, changegroup versions 1 and 2 have a series of groups
        # with one group per file. changegroup 3 has a series of directory
        # manifests before the files.
        count = 0
        emptycount = 0
        while emptycount < self._grouplistcount:
            empty = True
            count += 1
            while True:
                chunk = getchunk(self)
                if not chunk:
                    if empty and count > 2:
                        emptycount += 1
                    break
                empty = False
                yield chunkheader(len(chunk))
                pos = 0
                # re-emit large chunks in 1MB slices
                while pos < len(chunk):
                    next = pos + 2**20
                    yield chunk[pos:next]
                    pos = next
            yield closechunk()

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        # We know that we'll never have more manifests than we had
        # changesets.
        self.callback = prog(_('manifests'), numchanges)
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        self.manifestheader()
        repo.manifest.addgroup(self, revmap, trp)
        repo.ui.progress(_('manifests'), None)

    def apply(self, repo, srctype, url, emptyok=False,
              targetphase=phases.draft, expectedtotal=None):
        """Add the changegroup returned by source.read() to this repo.
        srctype is a string like 'push', 'pull', or 'unbundle'. url is
        the URL of the repo where this changegroup is coming from.

        Return an integer summarizing the change to this repo:
        - nothing changed or no source: 0
        - more heads than before: 1+added heads (2..n)
        - fewer heads than before: -1-removed heads (-2..-n)
        - number of heads stays the same: 1
        """
        repo = repo.unfiltered()
        def csmap(x):
            repo.ui.debug("add changeset %s\n" % short(x))
            return len(cl)

        def revmap(x):
            return cl.rev(x)

        changesets = files = revisions = 0

        try:
            with repo.transaction("\n".join([srctype,
                                             util.hidepassword(url)])) as tr:
                # The transaction could have been created before and already
                # carries source information. In this case we use the top
                # level data. We overwrite the argument because we need to use
                # the top level value (if they exist) in this function.
                srctype = tr.hookargs.setdefault('source', srctype)
                url = tr.hookargs.setdefault('url', url)
                repo.hook('prechangegroup', throw=True, **tr.hookargs)

                # write changelog data to temp files so concurrent readers
                # will not see an inconsistent view
                cl = repo.changelog
                cl.delayupdate(tr)
                oldheads = cl.heads()

                trp = weakref.proxy(tr)
                # pull off the changeset group
                repo.ui.status(_("adding changesets\n"))
                clstart = len(cl)
                # per-chunk progress reporter used for changesets, manifests
                # and files; each __call__ advances the counter by one
                class prog(object):
                    def __init__(self, step, total):
                        self._step = step
                        self._total = total
                        self._count = 1
                    def __call__(self):
                        repo.ui.progress(self._step, self._count,
                                         unit=_('chunks'), total=self._total)
                        self._count += 1
                self.callback = prog(_('changesets'), expectedtotal)

                efiles = set()
                def onchangelog(cl, node):
                    efiles.update(cl.read(node)[3])

                self.changelogheader()
                srccontent = cl.addgroup(self, csmap, trp,
                                         addrevisioncb=onchangelog)
                efiles = len(efiles)

                if not (srccontent or emptyok):
                    raise error.Abort(_("received changelog group is empty"))
                clend = len(cl)
                changesets = clend - clstart
                repo.ui.progress(_('changesets'), None)

                # pull off the manifest group
                repo.ui.status(_("adding manifests\n"))
                self._unpackmanifests(repo, revmap, trp, prog, changesets)

                needfiles = {}
                if repo.ui.configbool('server', 'validate', default=False):
                    # validate incoming csets have their manifests
                    for cset in xrange(clstart, clend):
                        mfnode = repo.changelog.read(
                            repo.changelog.node(cset))[0]
                        mfest = repo.manifest.readdelta(mfnode)
                        # store file nodes we must see
                        for f, n in mfest.iteritems():
                            needfiles.setdefault(f, set()).add(n)

                # process the files
                repo.ui.status(_("adding file changes\n"))
                self.callback = None
                pr = prog(_('files'), efiles)
                newrevs, newfiles = _addchangegroupfiles(
                    repo, self, revmap, trp, pr, needfiles)
                revisions += newrevs
                files += newfiles

                # dh is the delta in head count, excluding closed heads
                dh = 0
                if oldheads:
                    heads = cl.heads()
                    dh = len(heads) - len(oldheads)
                    for h in heads:
                        if h not in oldheads and repo[h].closesbranch():
                            dh -= 1
                htext = ""
                if dh:
                    htext = _(" (%+d heads)") % dh

                repo.ui.status(_("added %d changesets"
                                 " with %d changes to %d files%s\n")
                               % (changesets, revisions, files, htext))
                repo.invalidatevolatilesets()

                if changesets > 0:
                    if 'node' not in tr.hookargs:
                        tr.hookargs['node'] = hex(cl.node(clstart))
                        tr.hookargs['node_last'] = hex(cl.node(clend - 1))
                        hookargs = dict(tr.hookargs)
                    else:
                        hookargs = dict(tr.hookargs)
                        hookargs['node'] = hex(cl.node(clstart))
                        hookargs['node_last'] = hex(cl.node(clend - 1))
                    repo.hook('pretxnchangegroup', throw=True, **hookargs)

                added = [cl.node(r) for r in xrange(clstart, clend)]
                publishing = repo.publishing()
                if srctype in ('push', 'serve'):
                    # Old servers can not push the boundary themselves.
                    # New servers won't push the boundary if changeset already
                    # exists locally as secret
                    #
                    # We should not use added here but the list of all change in
                    # the bundle
                    if publishing:
                        phases.advanceboundary(repo, tr, phases.public,
                                               srccontent)
                    else:
                        # Those changesets have been pushed from the
                        # outside, their phases are going to be pushed
                        # alongside. Therefor `targetphase` is
                        # ignored.
                        phases.advanceboundary(repo, tr, phases.draft,
                                               srccontent)
                        phases.retractboundary(repo, tr, phases.draft, added)
                elif srctype != 'strip':
                    # publishing only alter behavior during push
                    #
                    # strip should not touch boundary at all
                    phases.retractboundary(repo, tr, targetphase, added)

                if changesets > 0:
                    if srctype != 'strip':
                        # During strip, branchcache is invalid but
                        # coming call to `destroyed` will repair it.
                        # In other case we can safely update cache on
                        # disk.
                        branchmap.updatecache(repo.filtered('served'))

                    def runhooks():
                        # These hooks run when the lock releases, not when the
                        # transaction closes. So it's possible for the changelog
                        # to have changed since we last saw it.
                        if clstart >= len(repo):
                            return

                        # forcefully update the on-disk branch cache
                        repo.ui.debug("updating the branch cache\n")
                        repo.hook("changegroup", **hookargs)

                        for n in added:
                            args = hookargs.copy()
                            args['node'] = hex(n)
                            del args['node_last']
                            repo.hook("incoming", **args)

                        newheads = [h for h in repo.heads()
                                    if h not in oldheads]
                        repo.ui.log("incoming",
                                    "%s incoming changes - new heads: %s\n",
                                    len(added),
                                    ', '.join([hex(c[:6]) for c in newheads]))

                    tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                                    lambda tr: repo._afterlock(runhooks))
        finally:
            repo.ui.flush()
        # never return 0 here:
        if dh < 0:
            return dh - 1
        else:
            return dh + 1
497 497
class cg2unpacker(cg1unpacker):
    """Unpacker for cg2 streams.

    cg2 streams add support for generaldelta, so the delta header
    format is slightly different. All other features about the data
    remain the same.
    """
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        # cg2 carries an explicit delta base, so prevnode is not consulted;
        # flags are implicitly 0 in this stream version
        node, p1, p2, deltabase, cs = headertuple
        return node, p1, p2, deltabase, cs, 0
513 513
class cg3unpacker(cg2unpacker):
    """Unpacker for cg3 streams.

    cg3 streams add support for exchanging treemanifests and revlog
    flags. It adds the revlog flags to the delta header and an empty chunk
    separating manifests and files.
    """
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '03'
    _grouplistcount = 2 # One list of manifests and one list of files

    def _deltaheader(self, headertuple, prevnode):
        # the cg3 header already carries every field, flags included
        return headertuple

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        # consume the root manifest group first ...
        super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
                                                  numchanges)
        # ... then the directory manifest groups that follow it, each
        # named after the directory it describes
        while True:
            chunkdata = self.filelogheader()
            if not chunkdata:
                break
            dirname = chunkdata["filename"]
            repo.ui.debug("adding %s revisions\n" % dirname)
            dirlog = repo.manifest.dirlog(dirname)
            if not dirlog.addgroup(self, revmap, trp):
                raise error.Abort(_("received dir revlog group is empty"))
543 543
class headerlessfixup(object):
    """File-like wrapper that replays already-consumed leading bytes.

    Serves the buffered bytes 'h' first, then reads from the underlying
    stream 'fh'.
    """
    def __init__(self, fh, h):
        self._h = h
        self._fh = fh

    def read(self, n):
        buffered = self._h
        if not buffered:
            return readexactly(self._fh, n)
        d, self._h = buffered[:n], buffered[n:]
        if len(d) < n:
            # buffer exhausted mid-read; top up from the real stream
            d += readexactly(self._fh, n - len(d))
        return d
555 555
class cg1packer(object):
    """Packer producing cg1 changegroup streams (counterpart of cg1unpacker)."""
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    version = '01'
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        # experimental config: bundle.reorder
        reorder = repo.ui.config('bundle', 'reorder', 'auto')
        if reorder == 'auto':
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
        if self._repo.ui.verbose and not self._repo.ui.debugflag:
            self._verbosenote = self._repo.ui.note
        else:
            self._verbosenote = lambda s: None

    def close(self):
        """Return the empty chunk that terminates a group."""
        return closechunk()

    def fileheader(self, fname):
        """Return the header chunk naming the filelog that follows."""
        return chunkheader(len(fname)) + fname

    def group(self, nodelist, revlog, lookup, units=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and self._reorder is None) or self._reorder:
            dag = dagutil.revlogdag(revlog)
            revs = set(revlog.rev(n) for n in nodelist)
            revs = dag.linearize(revs)
        else:
            revs = sorted([revlog.rev(n) for n in nodelist])

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        if units is not None:
            self._progress(msgbundling, None)
        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs):
        rr, rl = revlog.rev, revlog.linkrev
        return [n for n in missing if rl(rr(n)) not in commonrevs]

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        """Pack flat manifests into a changegroup stream."""
        # cg1 cannot carry directory manifests, only the root one
        assert not dir
        for chunk in self.group(mfnodes, self._repo.manifest,
                                lookuplinknode, units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg1/cg2 have no separator between the manifests and the files
        return ''

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        cl = repo.changelog

        clrevorder = {}
        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and manifest
        # nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            clrevorder[x] = len(clrevorder)
            n = c[0]
            # record the first changeset introducing this manifest version
            mfs.setdefault(n, x)
            # Record a complete list of potentially-changed files in
            # this manifest.
            changedfiles.update(c[3])
            return x

        self._verbosenote(_('uncompressed size of bundle content:\n'))
        size = 0
        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (changelog)\n') % size)

        # We need to make sure that the linkrev in the changegroup refers to
        # the first changeset that introduced the manifest or file revision.
        # The fastpath is usually safer than the slowpath, because the filelogs
        # are walked in revlog order.
        #
        # When taking the slowpath with reorder=None and the manifest revlog
        # uses generaldelta, the manifest may be walked in the "wrong" order.
        # Without 'clrevorder', we would get an incorrect linkrev (see fix in
        # cc0ff93d0c0c).
        #
        # When taking the fastpath, we are only vulnerable to reordering
        # of the changelog itself. The changelog never uses generaldelta, so
        # it is only reordered when reorder=True. To handle this case, we
        # simply take the slowpath, which already has the 'clrevorder' logic.
        # This was also fixed in cc0ff93d0c0c.
        fastpathlinkrev = fastpathlinkrev and not self._reorder
        # Treemanifests don't work correctly with fastpathlinkrev
        # either, because we don't discover which directory nodes to
        # send along with files. This could probably be fixed.
        fastpathlinkrev = fastpathlinkrev and (
            'treemanifest' not in repo.requirements)

        for chunk in self.generatemanifests(commonrevs, clrevorder,
                fastpathlinkrev, mfs, fnodes):
            yield chunk
        mfs.clear()
        clrevs = set(cl.rev(x) for x in clnodes)

        if not fastpathlinkrev:
            def linknodes(unused, fname):
                return fnodes.get(fname, {})
        else:
            cln = cl.node
            def linknodes(filerevlog, fname):
                llr = filerevlog.linkrev
                fln = filerevlog.node
                revs = ((r, llr(r)) for r in filerevlog)
                return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
                          fnodes):
        repo = self._repo
        dirlog = repo.manifest.dirlog
        tmfnodes = {'': mfs}

        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def makelookupmflinknode(dir):
            if fastpathlinkrev:
                assert not dir
                return mfs.__getitem__

            def lookupmflinknode(x):
                """Callback for looking up the linknode for manifests.

                Returns the linkrev node for the specified manifest.

                SIDE EFFECT:

                1) fclnodes gets populated with the list of relevant
                   file nodes if we're not using fastpathlinkrev
                2) When treemanifests are in use, collects treemanifest nodes
                   to send

                Note that this means manifests must be completely sent to
                the client before you can trust the list of files and
                treemanifests to send.
                """
                clnode = tmfnodes[dir][x]
                mdata = dirlog(dir).readshallowfast(x)
                for p, n, fl in mdata.iterentries():
                    if fl == 't': # subdirectory manifest
                        subdir = dir + p + '/'
                        tmfclnodes = tmfnodes.setdefault(subdir, {})
                        tmfclnode = tmfclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[tmfclnode]:
                            tmfclnodes[n] = clnode
                    else:
                        f = dir + p
                        fclnodes = fnodes.setdefault(f, {})
                        fclnode = fclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[fclnode]:
                            fclnodes[n] = clnode
                return clnode
            return lookupmflinknode

        size = 0
        while tmfnodes:
            dir = min(tmfnodes)
            nodes = tmfnodes[dir]
            prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
            for x in self._packmanifests(dir, prunednodes,
                                         makelookupmflinknode(dir)):
                size += len(x)
                yield x
            del tmfnodes[dir]
        self._verbosenote(_('%8.i (manifests)\n') % size)
        yield self._manifestsdone()

    # The 'source' parameter is useful for extensions
    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        repo = self._repo
        progress = self._progress
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise error.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                h = self.fileheader(fname)
                size = len(h)
                yield h
                for chunk in self.group(filenodes, filerevlog, lookupfilelog):
                    size += len(chunk)
                    yield chunk
                self._verbosenote(_('%8.i %s\n') % (size, fname))
        progress(msgbundling, None)

    def deltaparent(self, revlog, rev, p1, p2, prev):
        # cg1 can only delta against the previous chunk in the stream
        return prev

    def revchunk(self, revlog, rev, prev, linknode):
        """Yield the chunk(s) (header + delta) for one revision."""
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = self.deltaparent(revlog, rev, p1, p2, prev)

        prefix = ''
        if revlog.iscensored(base) or revlog.iscensored(rev):
            try:
                delta = revlog.revision(node)
            except error.CensoredNodeError as e:
                # substitute the tombstone for the censored revision data
                delta = e.tombstone
            if base == nullrev:
                prefix = mdiff.trivialdiffheader(len(delta))
            else:
                baselen = revlog.rawsize(base)
                prefix = mdiff.replacediffheader(baselen, len(delta))
        elif base == nullrev:
            # no usable base: send a full revision as a trivial diff
            delta = revlog.revision(node)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        flags = revlog.flags(rev)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta
    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # do nothing with basenode, it is implicitly the previous one in HG10
        # do nothing with flags, it is implicitly 0 for cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
878 861
class cg2packer(cg1packer):
    """Packer for version '02' changegroups: cg1 plus generaldelta."""
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        super(cg2packer, self).__init__(repo, bundlecaps)
        if self._reorder is None:
            # cg2 carries generaldelta natively, so reordering generally
            # doesn't help; treat bundle.reorder=auto the same as
            # bundle.reorder=False.
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        """Prefer the storage delta parent when the remote must have it."""
        dp = revlog.deltaparent(rev)
        if dp == nullrev:
            # stored as a full revision; avoid shipping full text
            return prev
        if dp in (p1, p2, prev):
            return dp
        # can't be sure the remote has dp; fall back to prev
        return prev

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # flags is implicitly 0 in cg1 and cg2 and is not transmitted
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode,
                           linknode)
902 885
class cg3packer(cg2packer):
    """Packer for version '03' changegroups: revlog flags + treemanifests."""
    version = '03'
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        # A non-empty dir names a tree-manifest revlog; announce it with a
        # file-style header before its delta group.
        if dir:
            yield self.fileheader(dir)
        dirlog = self._repo.manifest.dirlog(dir)
        for chunk in self.group(mfnodes, dirlog, lookuplinknode,
                                units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg3 terminates the manifest section with an explicit close chunk.
        return self.close()

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # cg3 transmits both the delta base node and the revlog flags
        fields = (node, p1n, p2n, basenode, linknode, flags)
        return struct.pack(self.deltaheader, *fields)
920 903
# Maps changegroup version string -> (packer class, unpacker class).
_packermap = {'01': (cg1packer, cg1unpacker),
             # cg2 adds support for exchanging generaldelta
             '02': (cg2packer, cg2unpacker),
             # cg3 adds support for exchanging revlog flags and treemanifests
             '03': (cg3packer, cg3unpacker),
            }
927 910
def allsupportedversions(ui):
    """Return the set of changegroup versions this build can handle.

    Version '03' is experimental and is only included when the
    experimental.changegroup3 or experimental.treemanifest option is set.
    """
    versions = set(_packermap)
    wantv3 = (ui.configbool('experimental', 'changegroup3') or
              ui.configbool('experimental', 'treemanifest'))
    if wantv3:
        versions.add('03')
    else:
        versions.discard('03')
    return versions
935 918
# Changegroup versions that can be applied to the repo
def supportedincomingversions(repo):
    """Return the changegroup versions *repo* can receive."""
    versions = allsupportedversions(repo.ui)
    if 'treemanifest' in repo.requirements:
        # a treemanifest repo always accepts cg3, regardless of config
        versions = versions | set(['03'])
    return versions
942 925
# Changegroup versions that can be created from the repo
def supportedoutgoingversions(repo):
    """Return the changegroup versions *repo* may produce."""
    versions = allsupportedversions(repo.ui)
    if 'treemanifest' not in repo.requirements:
        return versions
    # Versions 01 and 02 support only flat manifests and it's just too
    # expensive to convert between the flat manifest and tree manifest on
    # the fly. Since tree manifests are hashed differently, all of history
    # would have to be converted. Instead, we simply don't even pretend to
    # support versions 01 and 02.
    versions.difference_update(['01', '02'])
    versions.add('03')
    return versions
956 939
def safeversion(repo):
    """Return the smallest version it's safe to assume clients of the
    repo will support.

    For example, all hg versions that support generaldelta also support
    changegroup 02, so generaldelta repos never need to fall back to 01.
    """
    versions = supportedoutgoingversions(repo)
    if 'generaldelta' in repo.requirements:
        versions = versions - set(['01'])
    assert versions
    return min(versions)
966 949
def getbundler(version, repo, bundlecaps=None):
    """Instantiate the packer class registered for *version*."""
    assert version in supportedoutgoingversions(repo)
    packercls = _packermap[version][0]
    return packercls(repo, bundlecaps)
970 953
def getunbundler(version, fh, alg):
    """Instantiate the unpacker class registered for *version*."""
    unpackercls = _packermap[version][1]
    return unpackercls(fh, alg)
973 956
def _changegroupinfo(repo, nodes, source):
    """Report the changesets being bundled, honoring verbosity settings."""
    if source == 'bundle' or repo.ui.verbose:
        repo.ui.status(_("%d changesets found\n") % len(nodes))
        if repo.ui.debugflag:
            repo.ui.debug("list of changesets:\n")
            for node in nodes:
                repo.ui.debug("%s\n" % hex(node))
981 964
def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
    """Generate the raw changegroup stream for *outgoing* using *bundler*.

    Returns the raw generator produced by bundler.generate(); see
    getsubset() for the variant that wraps it in an unbundler object.
    """
    repo = repo.unfiltered()
    commonrevs = outgoing.common
    csets = outgoing.missing
    heads = outgoing.missingheads
    # We go through the fast path if we get told to, or if all (unfiltered)
    # heads have been requested (since we then know that all linkrevs will
    # be pulled by the client).
    # NOTE: this sorts outgoing.missingheads in place.
    heads.sort()
    fastpathlinkrev = fastpath or (
        repo.filtername is None and heads == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, csets, source)
    return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
997 980
def getsubset(repo, outgoing, bundler, source, fastpath=False):
    """Like getsubsetraw, but wrap the stream in an unbundler object."""
    stream = getsubsetraw(repo, outgoing, bundler, source, fastpath)
    return getunbundler(bundler.version, util.chunkbuffer(stream), None)
1001 984
def changegroupsubset(repo, roots, heads, source, version='01'):
    """Compute a changegroup consisting of all the nodes that are
    descendants of any of the roots and ancestors of any of the heads.
    Return a chunkbuffer object whose read() method will return
    successive changegroup chunks.

    It is fairly complex as determining which filenodes and which
    manifest nodes need to be included for the changeset to be complete
    is non-trivial.

    Another wrinkle is doing the reverse, figuring out which changeset in
    the changegroup a particular filenode or manifestnode belongs to.
    """
    cl = repo.changelog
    roots = roots or [nullid]
    # discovery bases: the non-null parents of the requested roots
    bases = []
    for node in roots:
        for p in cl.parents(node):
            if p != nullid:
                bases.append(p)
    # TODO: remove call to nodesbetween.
    csets, roots, heads = cl.nodesbetween(roots, heads)
    included = set(csets)
    bases = [n for n in bases if n not in included]
    outgoing = discovery.outgoing(cl, bases, heads)
    return getsubset(repo, outgoing, getbundler(version, repo), source)
1028 1011
def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
                           version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing. Returns a raw changegroup generator,
    or None when there is nothing outgoing."""
    if outgoing.missing:
        bundler = getbundler(version, repo, bundlecaps)
        return getsubsetraw(repo, outgoing, bundler, source)
    return None
1039 1022
def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
                        version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing. Returns None when there is nothing
    outgoing."""
    if outgoing.missing:
        bundler = getbundler(version, repo, bundlecaps)
        return getsubset(repo, outgoing, bundler, source)
    return None
1050 1033
def computeoutgoing(repo, heads, common):
    """Computes which revs are outgoing given a set of common
    and a set of heads.

    This is a separate function so extensions can have access to
    the logic.

    Returns a discovery.outgoing object.
    """
    cl = repo.changelog
    if not common:
        common = [nullid]
    else:
        # drop common nodes the local repo does not actually have
        known = cl.hasnode
        common = [n for n in common if known(n)]
    heads = heads or cl.heads()
    return discovery.outgoing(cl, common, heads)
1069 1052
def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None,
                   version='01'):
    """Like changegroupsubset, but returns the set difference between the
    ancestors of heads and the ancestors common.

    If heads is None, use the local heads. If common is None, use [nullid].

    The nodes in common might not all be known locally due to the way the
    current discovery protocol works.
    """
    return getlocalchangegroup(repo, source,
                               computeoutgoing(repo, heads, common),
                               bundlecaps=bundlecaps, version=version)
1083 1066
def changegroup(repo, basenodes, source):
    """Bundle everything between basenodes and the current heads."""
    # to avoid a race we use changegroupsubset() (issue1320)
    heads = repo.heads()
    return changegroupsubset(repo, basenodes, heads, source)
1087 1070
def _addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
    """Apply the files section of changegroup *source* to *repo*.

    revmap and trp are passed straight through to filelog.addgroup
    (presumably the node->linkrev mapping and the transaction proxy —
    confirm against the caller); pr is a progress callback invoked once
    per file; needfiles maps filename -> set of filenodes that the
    incoming changesets require, and entries are checked off as they
    arrive.

    Returns (revisions, files): the number of file revisions and the
    number of filelogs that received new data.
    """
    revisions = 0
    files = 0
    while True:
        chunkdata = source.filelogheader()
        if not chunkdata:
            # an empty header terminates the files section
            break
        f = chunkdata["filename"]
        repo.ui.debug("adding %s revisions\n" % f)
        pr()
        fl = repo.file(f)
        o = len(fl)  # revision count before the new group lands
        try:
            if not fl.addgroup(source, revmap, trp):
                raise error.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError as e:
            raise error.Abort(_("received delta base is censored: %s") % e)
        revisions += len(fl) - o
        files += 1
        if f in needfiles:
            needs = needfiles[f]
            # check off every newly added node; anything not expected is
            # a protocol violation
            for new in xrange(o, len(fl)):
                n = fl.node(new)
                if n in needs:
                    needs.remove(n)
                else:
                    raise error.Abort(
                        _("received spurious file revlog entry"))
            if not needs:
                del needfiles[f]
    repo.ui.progress(_('files'), None)

    # Anything still listed in needfiles was referenced by a changeset but
    # never delivered; verify it already exists locally.
    for f, needs in needfiles.iteritems():
        fl = repo.file(f)
        for n in needs:
            try:
                fl.rev(n)
            except error.LookupError:
                raise error.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (f, hex(n)))

    return revisions, files
General Comments 0
You need to be logged in to leave comments. Login now