changegroup3: move treemanifest support into _unpackmanifests()...
Martin von Zweigbergk
r27754:a09f143d default
@@ -1,1107 +1,1111 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullid,
19 19 nullrev,
20 20 short,
21 21 )
22 22
23 23 from . import (
24 24 branchmap,
25 25 dagutil,
26 26 discovery,
27 27 error,
28 28 mdiff,
29 29 phases,
30 30 util,
31 31 )
32 32
33 33 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
34 34 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
35 35 _CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
36 36
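# Illustrative sketch (not part of the original change): packing and
# unpacking a cg3 delta header with the format above. The all-zero node
# values are placeholders.
def _exampledeltaheaderroundtrip():
    node = p1 = p2 = basenode = linknode = '\0' * 20
    flags = 0
    header = struct.pack(_CHANGEGROUPV3_DELTA_HEADER,
                         node, p1, p2, basenode, linknode, flags)
    # five 20-byte nodes plus a 16-bit flags field: 102 bytes on the wire
    assert len(header) == struct.calcsize(_CHANGEGROUPV3_DELTA_HEADER) == 102
    return struct.unpack(_CHANGEGROUPV3_DELTA_HEADER, header)
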
37 37 def readexactly(stream, n):
38 38 '''read n bytes from stream.read and abort if less was available'''
39 39 s = stream.read(n)
40 40 if len(s) < n:
41 41 raise error.Abort(_("stream ended unexpectedly"
42 42 " (got %d bytes, expected %d)")
43 43 % (len(s), n))
44 44 return s
45 45
46 46 def getchunk(stream):
47 47 """return the next chunk from stream as a string"""
48 48 d = readexactly(stream, 4)
49 49 l = struct.unpack(">l", d)[0]
50 50 if l <= 4:
51 51 if l:
52 52 raise error.Abort(_("invalid chunk length %d") % l)
53 53 return ""
54 54 return readexactly(stream, l - 4)
55 55
56 56 def chunkheader(length):
57 57 """return a changegroup chunk header (string)"""
58 58 return struct.pack(">l", length + 4)
59 59
60 60 def closechunk():
61 61 """return a changegroup chunk header (string) for a zero-length chunk"""
62 62 return struct.pack(">l", 0)
63 63
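# Illustrative sketch (not part of the original change): the framing
# helpers above round-trip through getchunk(). util.chunkbuffer is used
# here only to get a read()-able stream over in-memory data.
def _exampleframingroundtrip():
    payload = 'hello'
    framed = chunkheader(len(payload)) + payload + closechunk()
    stream = util.chunkbuffer(iter([framed]))
    assert getchunk(stream) == payload  # data chunk
    assert getchunk(stream) == ""       # zero-length chunk ends the group
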
64 64 def combineresults(results):
65 65 """logic to combine 0 or more addchangegroup results into one"""
66 66 changedheads = 0
67 67 result = 1
68 68 for ret in results:
69 69 # If any changegroup result is 0, return 0
70 70 if ret == 0:
71 71 result = 0
72 72 break
73 73 if ret < -1:
74 74 changedheads += ret + 1
75 75 elif ret > 1:
76 76 changedheads += ret - 1
77 77 if changedheads > 0:
78 78 result = 1 + changedheads
79 79 elif changedheads < 0:
80 80 result = -1 + changedheads
81 81 return result
82 82
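# Illustrative sketch (not part of the original change): how the head
# deltas encoded in addchangegroup results combine. A result of 3 means
# two heads were added; -2 means one head was removed.
def _examplecombineresults():
    assert combineresults([3, -2]) == 2     # net +1 head -> 1 + 1
    assert combineresults([1, 1]) == 1      # head count unchanged
    assert combineresults([3, 0, -2]) == 0  # any failed (0) result wins
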
83 83 bundletypes = {
84 84 "": ("", None), # only when using unbundle on ssh and old http servers
85 85 # since the unification, ssh accepts a header but there
86 86 # is no capability signaling it.
87 87 "HG20": (), # special-cased below
88 88 "HG10UN": ("HG10UN", None),
89 89 "HG10BZ": ("HG10", 'BZ'),
90 90 "HG10GZ": ("HG10GZ", 'GZ'),
91 91 }
92 92
93 93 # hgweb uses this list to communicate its preferred type
94 94 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
95 95
96 96 def writechunks(ui, chunks, filename, vfs=None):
97 97 """Write chunks to a file and return its filename.
98 98
99 99 The stream is assumed to be a bundle file.
100 100 Existing files will not be overwritten.
101 101 If no filename is specified, a temporary file is created.
102 102 """
103 103 fh = None
104 104 cleanup = None
105 105 try:
106 106 if filename:
107 107 if vfs:
108 108 fh = vfs.open(filename, "wb")
109 109 else:
110 110 fh = open(filename, "wb")
111 111 else:
112 112 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
113 113 fh = os.fdopen(fd, "wb")
114 114 cleanup = filename
115 115 for c in chunks:
116 116 fh.write(c)
117 117 cleanup = None
118 118 return filename
119 119 finally:
120 120 if fh is not None:
121 121 fh.close()
122 122 if cleanup is not None:
123 123 if filename and vfs:
124 124 vfs.unlink(cleanup)
125 125 else:
126 126 os.unlink(cleanup)
127 127
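# Illustrative sketch (not part of the original change): writing raw
# chunks to a temporary file. The ui argument is not consumed by
# writechunks itself, so None suffices for a demonstration.
def _examplewritechunks():
    fname = writechunks(None, ['chunk1', 'chunk2'], None)
    try:
        with open(fname, 'rb') as fh:
            assert fh.read() == 'chunk1chunk2'
    finally:
        os.unlink(fname)
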
128 128 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
129 129 """Write a bundle file and return its filename.
130 130
131 131 Existing files will not be overwritten.
132 132 If no filename is specified, a temporary file is created.
133 133 bz2 compression can be turned off.
134 134 The bundle file will be deleted in case of errors.
135 135 """
136 136
137 137 if bundletype == "HG20":
138 138 from . import bundle2
139 139 bundle = bundle2.bundle20(ui)
140 140 bundle.setcompression(compression)
141 141 part = bundle.newpart('changegroup', data=cg.getchunks())
142 142 part.addparam('version', cg.version)
143 143 chunkiter = bundle.getchunks()
144 144 else:
145 145 # compression argument is only for the bundle2 case
146 146 assert compression is None
147 147 if cg.version != '01':
148 148 raise error.Abort(_('old bundle types only support v1 '
149 149 'changegroups'))
150 150 header, comp = bundletypes[bundletype]
151 151 if comp not in util.compressors:
152 152 raise error.Abort(_('unknown stream compression type: %s')
153 153 % comp)
154 154 z = util.compressors[comp]()
155 155 subchunkiter = cg.getchunks()
156 156 def chunkiter():
157 157 yield header
158 158 for chunk in subchunkiter:
159 159 yield z.compress(chunk)
160 160 yield z.flush()
161 161 chunkiter = chunkiter()
162 162
163 163 # parse the changegroup data, otherwise we will block
164 164 # in case of sshrepo because we don't know the end of the stream
165 165
166 166 # an empty chunkgroup is the end of the changegroup
167 167 # a changegroup has at least 2 chunkgroups (changelog and manifest).
168 168 # after that, an empty chunkgroup is the end of the changegroup
169 169 return writechunks(ui, chunkiter, filename, vfs=vfs)
170 170
171 171 class cg1unpacker(object):
172 172 """Unpacker for cg1 changegroup streams.
173 173
174 174 A changegroup unpacker handles the framing of the revision data in
175 175 the wire format. Most consumers will want to use the apply()
176 176 method to add the changes from the changegroup to a repository.
177 177
178 178 If you're forwarding a changegroup unmodified to another consumer,
179 179 use getchunks(), which returns an iterator of changegroup
180 180 chunks. This is mostly useful for cases where you need to know the
181 181 data stream has ended by observing the end of the changegroup.
182 182
183 183 deltachunk() is useful only if you're applying delta data. Most
184 184 consumers should prefer apply() instead.
185 185
186 186 A few other public methods exist. Those are used only for
187 187 bundlerepo and some debug commands - their use is discouraged.
188 188 """
189 189 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
190 190 deltaheadersize = struct.calcsize(deltaheader)
191 191 version = '01'
192 192 def __init__(self, fh, alg):
193 193 if alg == 'UN':
194 194 alg = None # get more modern without breaking too much
195 195 if alg not in util.decompressors:
196 196 raise error.Abort(_('unknown stream compression type: %s')
197 197 % alg)
198 198 if alg == 'BZ':
199 199 alg = '_truncatedBZ'
200 200 self._stream = util.decompressors[alg](fh)
201 201 self._type = alg
202 202 self.callback = None
203 203
204 204 # These methods (compressed, read, seek, tell) all appear to only
205 205 # be used by bundlerepo, but it's a little hard to tell.
206 206 def compressed(self):
207 207 return self._type is not None
208 208 def read(self, l):
209 209 return self._stream.read(l)
210 210 def seek(self, pos):
211 211 return self._stream.seek(pos)
212 212 def tell(self):
213 213 return self._stream.tell()
214 214 def close(self):
215 215 return self._stream.close()
216 216
217 217 def _chunklength(self):
218 218 d = readexactly(self._stream, 4)
219 219 l = struct.unpack(">l", d)[0]
220 220 if l <= 4:
221 221 if l:
222 222 raise error.Abort(_("invalid chunk length %d") % l)
223 223 return 0
224 224 if self.callback:
225 225 self.callback()
226 226 return l - 4
227 227
228 228 def changelogheader(self):
229 229 """v10 does not have a changelog header chunk"""
230 230 return {}
231 231
232 232 def manifestheader(self):
233 233 """v10 does not have a manifest header chunk"""
234 234 return {}
235 235
236 236 def filelogheader(self):
237 237 """return the header of the filelogs chunk, v10 only has the filename"""
238 238 l = self._chunklength()
239 239 if not l:
240 240 return {}
241 241 fname = readexactly(self._stream, l)
242 242 return {'filename': fname}
243 243
244 244 def _deltaheader(self, headertuple, prevnode):
245 245 node, p1, p2, cs = headertuple
246 246 if prevnode is None:
247 247 deltabase = p1
248 248 else:
249 249 deltabase = prevnode
250 250 flags = 0
251 251 return node, p1, p2, deltabase, cs, flags
252 252
253 253 def deltachunk(self, prevnode):
254 254 l = self._chunklength()
255 255 if not l:
256 256 return {}
257 257 headerdata = readexactly(self._stream, self.deltaheadersize)
258 258 header = struct.unpack(self.deltaheader, headerdata)
259 259 delta = readexactly(self._stream, l - self.deltaheadersize)
260 260 node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
261 261 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
262 262 'deltabase': deltabase, 'delta': delta, 'flags': flags}
263 263
264 264 def getchunks(self):
265 265 """returns all the chunks contained in the bundle
266 266
267 267 Used when you need to forward the binary stream to a file or another
268 268 network API. To do so, it parses the changegroup data; otherwise it would
269 269 block in case of sshrepo because it doesn't know the end of the stream.
270 270 """
271 271 # an empty chunkgroup is the end of the changegroup
272 272 # a changegroup has at least 2 chunkgroups (changelog and manifest).
273 273 # after that, an empty chunkgroup is the end of the changegroup
274 274 empty = False
275 275 count = 0
276 276 while not empty or count <= 2:
277 277 empty = True
278 278 count += 1
279 279 while True:
280 280 chunk = getchunk(self)
281 281 if not chunk:
282 282 break
283 283 empty = False
284 284 yield chunkheader(len(chunk))
285 285 pos = 0
286 286 while pos < len(chunk):
287 287 next = pos + 2**20
288 288 yield chunk[pos:next]
289 289 pos = next
290 290 yield closechunk()
291 291
292 292 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
293 293 # We know that we'll never have more manifests than we had
294 294 # changesets.
295 295 self.callback = prog(_('manifests'), numchanges)
296 296 # no need to check for empty manifest group here:
297 297 # if the result of the merge of 1 and 2 is the same in 3 and 4,
298 298 # no new manifest will be created and the manifest group will
299 299 # be empty during the pull
300 300 self.manifestheader()
301 301 repo.manifest.addgroup(self, revmap, trp)
302 302 repo.ui.progress(_('manifests'), None)
303 303
304 304 def apply(self, repo, srctype, url, emptyok=False,
305 305 targetphase=phases.draft, expectedtotal=None):
306 306 """Add the changegroup returned by source.read() to this repo.
307 307 srctype is a string like 'push', 'pull', or 'unbundle'. url is
308 308 the URL of the repo where this changegroup is coming from.
309 309
310 310 Return an integer summarizing the change to this repo:
311 311 - nothing changed or no source: 0
312 312 - more heads than before: 1+added heads (2..n)
313 313 - fewer heads than before: -1-removed heads (-2..-n)
314 314 - number of heads stays the same: 1
315 315 """
316 316 repo = repo.unfiltered()
317 317 def csmap(x):
318 318 repo.ui.debug("add changeset %s\n" % short(x))
319 319 return len(cl)
320 320
321 321 def revmap(x):
322 322 return cl.rev(x)
323 323
324 324 changesets = files = revisions = 0
325 325
326 326 tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
327 327 try:
328 328 # The transaction could have been created before and already
329 329 # carries source information. In this case we use the top
330 330 # level data. We overwrite the argument because we need to use
331 331 # the top level value (if they exist) in this function.
332 332 srctype = tr.hookargs.setdefault('source', srctype)
333 333 url = tr.hookargs.setdefault('url', url)
334 334 repo.hook('prechangegroup', throw=True, **tr.hookargs)
335 335
336 336 # write changelog data to temp files so concurrent readers
337 337 # will not see an inconsistent view
338 338 cl = repo.changelog
339 339 cl.delayupdate(tr)
340 340 oldheads = cl.heads()
341 341
342 342 trp = weakref.proxy(tr)
343 343 # pull off the changeset group
344 344 repo.ui.status(_("adding changesets\n"))
345 345 clstart = len(cl)
346 346 class prog(object):
347 347 def __init__(self, step, total):
348 348 self._step = step
349 349 self._total = total
350 350 self._count = 1
351 351 def __call__(self):
352 352 repo.ui.progress(self._step, self._count, unit=_('chunks'),
353 353 total=self._total)
354 354 self._count += 1
355 355 self.callback = prog(_('changesets'), expectedtotal)
356 356
357 357 efiles = set()
358 358 def onchangelog(cl, node):
359 359 efiles.update(cl.read(node)[3])
360 360
361 361 self.changelogheader()
362 362 srccontent = cl.addgroup(self, csmap, trp,
363 363 addrevisioncb=onchangelog)
364 364 efiles = len(efiles)
365 365
366 366 if not (srccontent or emptyok):
367 367 raise error.Abort(_("received changelog group is empty"))
368 368 clend = len(cl)
369 369 changesets = clend - clstart
370 370 repo.ui.progress(_('changesets'), None)
371 371
372 372 # pull off the manifest group
373 373 repo.ui.status(_("adding manifests\n"))
374 374 self._unpackmanifests(repo, revmap, trp, prog, changesets)
375 375
376 376 needfiles = {}
377 377 if repo.ui.configbool('server', 'validate', default=False):
378 378 # validate incoming csets have their manifests
379 379 for cset in xrange(clstart, clend):
380 380 mfnode = repo.changelog.read(repo.changelog.node(cset))[0]
381 381 mfest = repo.manifest.readdelta(mfnode)
382 382 # store file nodes we must see
383 383 for f, n in mfest.iteritems():
384 384 needfiles.setdefault(f, set()).add(n)
385 385
386 386 # process the files
387 387 repo.ui.status(_("adding file changes\n"))
388 388 self.callback = None
389 389 pr = prog(_('files'), efiles)
390 390 newrevs, newfiles = _addchangegroupfiles(
391 391 repo, self, revmap, trp, pr, needfiles)
392 392 revisions += newrevs
393 393 files += newfiles
394 394
395 395 dh = 0
396 396 if oldheads:
397 397 heads = cl.heads()
398 398 dh = len(heads) - len(oldheads)
399 399 for h in heads:
400 400 if h not in oldheads and repo[h].closesbranch():
401 401 dh -= 1
402 402 htext = ""
403 403 if dh:
404 404 htext = _(" (%+d heads)") % dh
405 405
406 406 repo.ui.status(_("added %d changesets"
407 407 " with %d changes to %d files%s\n")
408 408 % (changesets, revisions, files, htext))
409 409 repo.invalidatevolatilesets()
410 410
411 411 if changesets > 0:
412 412 if 'node' not in tr.hookargs:
413 413 tr.hookargs['node'] = hex(cl.node(clstart))
414 414 tr.hookargs['node_last'] = hex(cl.node(clend - 1))
415 415 hookargs = dict(tr.hookargs)
416 416 else:
417 417 hookargs = dict(tr.hookargs)
418 418 hookargs['node'] = hex(cl.node(clstart))
419 419 hookargs['node_last'] = hex(cl.node(clend - 1))
420 420 repo.hook('pretxnchangegroup', throw=True, **hookargs)
421 421
422 422 added = [cl.node(r) for r in xrange(clstart, clend)]
423 423 publishing = repo.publishing()
424 424 if srctype in ('push', 'serve'):
425 425 # Old servers can not push the boundary themselves.
426 426 # New servers won't push the boundary if changeset already
427 427 # exists locally as secret
428 428 #
429 429 # We should not use added here but the list of all changes in
430 430 # the bundle
431 431 if publishing:
432 432 phases.advanceboundary(repo, tr, phases.public, srccontent)
433 433 else:
434 434 # Those changesets have been pushed from the outside, their
435 435 # phases are going to be pushed alongside. Therefore
436 436 # `targetphase` is ignored.
437 437 phases.advanceboundary(repo, tr, phases.draft, srccontent)
438 438 phases.retractboundary(repo, tr, phases.draft, added)
439 439 elif srctype != 'strip':
440 440 # publishing only alters behavior during push
441 441 #
442 442 # strip should not touch boundary at all
443 443 phases.retractboundary(repo, tr, targetphase, added)
444 444
445 445 if changesets > 0:
446 446 if srctype != 'strip':
447 447 # During strip, branchcache is invalid but the coming call to
448 448 # `destroyed` will repair it.
449 449 # In other cases we can safely update the cache on disk.
450 450 branchmap.updatecache(repo.filtered('served'))
451 451
452 452 def runhooks():
453 453 # These hooks run when the lock releases, not when the
454 454 # transaction closes. So it's possible for the changelog
455 455 # to have changed since we last saw it.
456 456 if clstart >= len(repo):
457 457 return
458 458
459 459 # forcefully update the on-disk branch cache
460 460 repo.ui.debug("updating the branch cache\n")
461 461 repo.hook("changegroup", **hookargs)
462 462
463 463 for n in added:
464 464 args = hookargs.copy()
465 465 args['node'] = hex(n)
466 466 del args['node_last']
467 467 repo.hook("incoming", **args)
468 468
469 469 newheads = [h for h in repo.heads() if h not in oldheads]
470 470 repo.ui.log("incoming",
471 471 "%s incoming changes - new heads: %s\n",
472 472 len(added),
473 473 ', '.join([hex(c[:6]) for c in newheads]))
474 474
475 475 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
476 476 lambda tr: repo._afterlock(runhooks))
477 477
478 478 tr.close()
479 479
480 480 finally:
481 481 tr.release()
482 482 repo.ui.flush()
483 483 # never return 0 here:
484 484 if dh < 0:
485 485 return dh - 1
486 486 else:
487 487 return dh + 1
488 488
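# Illustrative sketch (not part of the original change): decoding the
# integer apply() returns, mirroring the dh computation above.
def _exampleapplyresult(ret):
    if ret == 0:
        return 'error (or empty source)'
    if ret > 1:
        return '%d heads added' % (ret - 1)
    if ret < -1:
        return '%d heads removed' % (-1 - ret)
    return 'head count unchanged'
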
489 489 class cg2unpacker(cg1unpacker):
490 490 """Unpacker for cg2 streams.
491 491
492 492 cg2 streams add support for generaldelta, so the delta header
493 493 format is slightly different. All other features about the data
494 494 remain the same.
495 495 """
496 496 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
497 497 deltaheadersize = struct.calcsize(deltaheader)
498 498 version = '02'
499 499
500 500 def _deltaheader(self, headertuple, prevnode):
501 501 node, p1, p2, deltabase, cs = headertuple
502 502 flags = 0
503 503 return node, p1, p2, deltabase, cs, flags
504 504
505 505 class cg3unpacker(cg2unpacker):
506 506 """Unpacker for cg3 streams.
507 507
508 508 cg3 streams add support for exchanging treemanifests and revlog
509 509 flags. It adds the revlog flags to the delta header and an empty chunk
510 510 separating manifests and files.
511 511 """
512 512 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
513 513 deltaheadersize = struct.calcsize(deltaheader)
514 514 version = '03'
515 515
516 516 def _deltaheader(self, headertuple, prevnode):
517 517 node, p1, p2, deltabase, cs, flags = headertuple
518 518 return node, p1, p2, deltabase, cs, flags
519 519
520 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
521 super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
522 numchanges)
523 while True:
524 chunkdata = self.filelogheader()
525 if not chunkdata:
526 break
527 # If we get here, there are directory manifests in the changegroup
528 d = chunkdata["filename"]
529 repo.ui.debug("adding %s revisions\n" % d)
530 dirlog = repo.manifest.dirlog(d)
531 if not dirlog.addgroup(self, revmap, trp):
532 raise error.Abort(_("received dir revlog group is empty"))
533
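# Illustrative sketch (not part of the original change): directory
# manifests travel in cg3 as ordinary file-header chunks whose name
# carries a trailing '/', which is what _unpackmanifests() above keys on.
def _exampledirmanifestheader():
    name = 'dir/subdir/'
    stream = util.chunkbuffer(iter([chunkheader(len(name)) + name]))
    chunkdata = cg3unpacker(stream, 'UN').filelogheader()
    assert chunkdata['filename'] == name and name.endswith('/')
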
520 534 class headerlessfixup(object):
521 535 def __init__(self, fh, h):
522 536 self._h = h
523 537 self._fh = fh
524 538 def read(self, n):
525 539 if self._h:
526 540 d, self._h = self._h[:n], self._h[n:]
527 541 if len(d) < n:
528 542 d += readexactly(self._fh, n - len(d))
529 543 return d
530 544 return readexactly(self._fh, n)
531 545
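# Illustrative sketch (not part of the original change): headerlessfixup
# replays a header that was already consumed (e.g. to sniff the bundle
# type) in front of the remaining stream.
def _exampleheaderlessfixup():
    rest = util.chunkbuffer(iter(['restofstream']))
    fh = headerlessfixup(rest, 'HG10UN')
    assert fh.read(6) == 'HG10UN'  # the replayed header
    assert fh.read(4) == 'rest'    # then the underlying stream
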
532 546 def _moddirs(files):
533 547 """Given a set of modified files, find the list of modified directories.
534 548
535 549 This returns a list of (path to changed dir, changed dir) tuples,
536 550 as that's what the one client needs anyway.
537 551 as that's what the one consumer needs anyway.
538 552 >>> _moddirs(['a/b/c.py', 'a/b/c.txt', 'a/d/e/f/g.txt', 'i.txt', ])
539 553 [('/', 'a/'), ('a/', 'b/'), ('a/', 'd/'), ('a/d/', 'e/'), ('a/d/e/', 'f/')]
540 554
541 555 """
542 556 alldirs = set()
543 557 for f in files:
544 558 path = f.split('/')[:-1]
545 559 for i in xrange(len(path) - 1, -1, -1):
546 560 dn = '/'.join(path[:i])
547 561 current = dn + '/', path[i] + '/'
548 562 if current in alldirs:
549 563 break
550 564 alldirs.add(current)
551 565 return sorted(alldirs)
552 566
553 567 class cg1packer(object):
554 568 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
555 569 version = '01'
556 570 def __init__(self, repo, bundlecaps=None):
557 571 """Given a source repo, construct a bundler.
558 572
559 573 bundlecaps is optional and can be used to specify the set of
560 574 capabilities which can be used to build the bundle.
561 575 """
562 576 # Set of capabilities we can use to build the bundle.
563 577 if bundlecaps is None:
564 578 bundlecaps = set()
565 579 self._bundlecaps = bundlecaps
566 580 # experimental config: bundle.reorder
567 581 reorder = repo.ui.config('bundle', 'reorder', 'auto')
568 582 if reorder == 'auto':
569 583 reorder = None
570 584 else:
571 585 reorder = util.parsebool(reorder)
572 586 self._repo = repo
573 587 self._reorder = reorder
574 588 self._progress = repo.ui.progress
575 589 if self._repo.ui.verbose and not self._repo.ui.debugflag:
576 590 self._verbosenote = self._repo.ui.note
577 591 else:
578 592 self._verbosenote = lambda s: None
579 593
580 594 def close(self):
581 595 return closechunk()
582 596
583 597 def fileheader(self, fname):
584 598 return chunkheader(len(fname)) + fname
585 599
586 600 def group(self, nodelist, revlog, lookup, units=None):
587 601 """Calculate a delta group, yielding a sequence of changegroup chunks
588 602 (strings).
589 603
590 604 Given a list of changeset revs, return a set of deltas and
591 605 metadata corresponding to nodes. The first delta is
592 606 first parent(nodelist[0]) -> nodelist[0], the receiver is
593 607 first parent(nodelist[0]) -> nodelist[0]; the receiver is
594 608 guaranteed to have this parent as it has all history before
595 609 these changesets. If the first parent is nullrev, the
596 610 changegroup starts with a full revision.
597 611 If units is not None, progress detail will be generated; units specifies
598 612 the type of revlog that is touched (changelog, manifest, etc.).
599 613 """
600 614 # if we don't have any revisions touched by these changesets, bail
601 615 if len(nodelist) == 0:
602 616 yield self.close()
603 617 return
604 618
605 619 # for generaldelta revlogs, we linearize the revs; this will both be
606 620 # much quicker and generate a much smaller bundle
607 621 if (revlog._generaldelta and self._reorder is None) or self._reorder:
608 622 dag = dagutil.revlogdag(revlog)
609 623 revs = set(revlog.rev(n) for n in nodelist)
610 624 revs = dag.linearize(revs)
611 625 else:
612 626 revs = sorted([revlog.rev(n) for n in nodelist])
613 627
614 628 # add the parent of the first rev
615 629 p = revlog.parentrevs(revs[0])[0]
616 630 revs.insert(0, p)
617 631
618 632 # build deltas
619 633 total = len(revs) - 1
620 634 msgbundling = _('bundling')
621 635 for r in xrange(len(revs) - 1):
622 636 if units is not None:
623 637 self._progress(msgbundling, r + 1, unit=units, total=total)
624 638 prev, curr = revs[r], revs[r + 1]
625 639 linknode = lookup(revlog.node(curr))
626 640 for c in self.revchunk(revlog, curr, prev, linknode):
627 641 yield c
628 642
629 643 if units is not None:
630 644 self._progress(msgbundling, None)
631 645 yield self.close()
632 646
633 647 # filter any nodes that claim to be part of the known set
634 648 def prune(self, revlog, missing, commonrevs):
635 649 rr, rl = revlog.rev, revlog.linkrev
636 650 return [n for n in missing if rl(rr(n)) not in commonrevs]
637 651
638 652 def _packmanifests(self, mfnodes, tmfnodes, lookuplinknode):
639 653 """Pack flat manifests into a changegroup stream."""
640 654 ml = self._repo.manifest
641 655 size = 0
642 656 for chunk in self.group(
643 657 mfnodes, ml, lookuplinknode, units=_('manifests')):
644 658 size += len(chunk)
645 659 yield chunk
646 660 self._verbosenote(_('%8.i (manifests)\n') % size)
647 661 # It looks odd to assert this here, but tmfnodes doesn't get
648 662 # filled in until after we've called lookuplinknode for
649 663 # sending root manifests, so the only way to tell the streams
650 664 # got crossed is to check after we've done all the work.
651 665 assert not tmfnodes
652 666
653 667 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
654 668 '''yield a sequence of changegroup chunks (strings)'''
655 669 repo = self._repo
656 670 cl = repo.changelog
657 671 ml = repo.manifest
658 672
659 673 clrevorder = {}
660 674 mfs = {} # needed manifests
661 675 tmfnodes = {}
662 676 fnodes = {} # needed file nodes
663 677 # maps manifest node id -> set(changed files)
664 678 mfchangedfiles = {}
665 679
666 680 # Callback for the changelog, used to collect changed files and manifest
667 681 # nodes.
668 682 # Returns the linkrev node (identity in the changelog case).
669 683 def lookupcl(x):
670 684 c = cl.read(x)
671 685 clrevorder[x] = len(clrevorder)
672 686 n = c[0]
673 687 # record the first changeset introducing this manifest version
674 688 mfs.setdefault(n, x)
675 689 # Record a complete list of potentially-changed files in
676 690 # this manifest.
677 691 mfchangedfiles.setdefault(n, set()).update(c[3])
678 692 return x
679 693
680 694 self._verbosenote(_('uncompressed size of bundle content:\n'))
681 695 size = 0
682 696 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
683 697 size += len(chunk)
684 698 yield chunk
685 699 self._verbosenote(_('%8.i (changelog)\n') % size)
686 700
687 701 # We need to make sure that the linkrev in the changegroup refers to
688 702 # the first changeset that introduced the manifest or file revision.
689 703 # The fastpath is usually safer than the slowpath, because the filelogs
690 704 # are walked in revlog order.
691 705 #
692 706 # When taking the slowpath with reorder=None and the manifest revlog
693 707 # uses generaldelta, the manifest may be walked in the "wrong" order.
694 708 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
695 709 # cc0ff93d0c0c).
696 710 #
697 711 # When taking the fastpath, we are only vulnerable to reordering
698 712 # of the changelog itself. The changelog never uses generaldelta, so
699 713 # it is only reordered when reorder=True. To handle this case, we
700 714 # simply take the slowpath, which already has the 'clrevorder' logic.
701 715 # This was also fixed in cc0ff93d0c0c.
702 716 fastpathlinkrev = fastpathlinkrev and not self._reorder
703 717 # Treemanifests don't work correctly with fastpathlinkrev
704 718 # either, because we don't discover which directory nodes to
705 719 # send along with files. This could probably be fixed.
706 720 fastpathlinkrev = fastpathlinkrev and (
707 721 'treemanifest' not in repo.requirements)
708 722 # Callback for the manifest, used to collect linkrevs for filelog
709 723 # revisions.
710 724 # Returns the linkrev node (collected in lookupcl).
711 725 if fastpathlinkrev:
712 726 lookupmflinknode = mfs.__getitem__
713 727 else:
714 728 def lookupmflinknode(x):
715 729 """Callback for looking up the linknode for manifests.
716 730
717 731 Returns the linkrev node for the specified manifest.
718 732
719 733 SIDE EFFECT:
720 734
721 735 1) fclnodes gets populated with the list of relevant
722 736 file nodes if we're not using fastpathlinkrev
723 737 2) When treemanifests are in use, collects treemanifest nodes
724 738 to send
725 739
726 740 Note that this means manifests must be completely sent to
727 741 the client before you can trust the list of files and
728 742 treemanifests to send.
729 743 """
730 744 clnode = mfs[x]
731 745 # We no longer actually care about reading deltas of
732 746 # the manifest here, because we already know the list
733 747 # of changed files, so for treemanifests (which
734 748 # lazily-load anyway to *generate* a readdelta) we can
735 749 # just load them with read() and then we'll actually
736 750 # be able to correctly load node IDs from the
737 751 # submanifest entries.
738 752 if 'treemanifest' in repo.requirements:
739 753 mdata = ml.read(x)
740 754 else:
741 755 mdata = ml.readfast(x)
742 756 for f in mfchangedfiles[x]:
743 757 try:
744 758 n = mdata[f]
745 759 except KeyError:
746 760 continue
747 761 # record the first changeset introducing this filelog
748 762 # version
749 763 fclnodes = fnodes.setdefault(f, {})
750 764 fclnode = fclnodes.setdefault(n, clnode)
751 765 if clrevorder[clnode] < clrevorder[fclnode]:
752 766 fclnodes[n] = clnode
753 767 # gather list of changed treemanifest nodes
754 768 if 'treemanifest' in repo.requirements:
755 769 submfs = {'/': mdata}
756 770 for dn, bn in _moddirs(mfchangedfiles[x]):
757 771 submf = submfs[dn]
758 772 submf = submf._dirs[bn]
759 773 submfs[submf.dir()] = submf
760 774 tmfclnodes = tmfnodes.setdefault(submf.dir(), {})
761 775 tmfclnode = tmfclnodes.setdefault(submf._node, clnode)
762 776 if clrevorder[clnode] < clrevorder[tmfclnode]:
763 777 tmfclnodes[submf._node] = clnode
764 778 return clnode
765 779
766 780 mfnodes = self.prune(ml, mfs, commonrevs)
767 781 for x in self._packmanifests(
768 782 mfnodes, tmfnodes, lookupmflinknode):
769 783 yield x
770 784
771 785 mfs.clear()
772 786 clrevs = set(cl.rev(x) for x in clnodes)
773 787
774 788 if not fastpathlinkrev:
775 789 def linknodes(unused, fname):
776 790 return fnodes.get(fname, {})
777 791 else:
778 792 cln = cl.node
779 793 def linknodes(filerevlog, fname):
780 794 llr = filerevlog.linkrev
781 795 fln = filerevlog.node
782 796 revs = ((r, llr(r)) for r in filerevlog)
783 797 return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)
784 798
785 799 changedfiles = set()
786 800 for x in mfchangedfiles.itervalues():
787 801 changedfiles.update(x)
788 802 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
789 803 source):
790 804 yield chunk
791 805
792 806 yield self.close()
793 807
794 808 if clnodes:
795 809 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
796 810
797 811 # The 'source' parameter is useful for extensions
798 812 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
799 813 repo = self._repo
800 814 progress = self._progress
801 815 msgbundling = _('bundling')
802 816
803 817 total = len(changedfiles)
804 818 # for progress output
805 819 msgfiles = _('files')
806 820 for i, fname in enumerate(sorted(changedfiles)):
807 821 filerevlog = repo.file(fname)
808 822 if not filerevlog:
809 823 raise error.Abort(_("empty or missing revlog for %s") % fname)
810 824
811 825 linkrevnodes = linknodes(filerevlog, fname)
812 826 # Lookup table for filenodes; we collected the linkrev nodes above in the
813 827 # fastpath case and with lookupmflinknode in the slowpath case.
814 828 def lookupfilelog(x):
815 829 return linkrevnodes[x]
816 830
817 831 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
818 832 if filenodes:
819 833 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
820 834 total=total)
821 835 h = self.fileheader(fname)
822 836 size = len(h)
823 837 yield h
824 838 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
825 839 size += len(chunk)
826 840 yield chunk
827 841 self._verbosenote(_('%8.i %s\n') % (size, fname))
828 842 progress(msgbundling, None)
829 843
830 844 def deltaparent(self, revlog, rev, p1, p2, prev):
831 845 return prev
832 846
833 847 def revchunk(self, revlog, rev, prev, linknode):
834 848 node = revlog.node(rev)
835 849 p1, p2 = revlog.parentrevs(rev)
836 850 base = self.deltaparent(revlog, rev, p1, p2, prev)
837 851
838 852 prefix = ''
839 853 if revlog.iscensored(base) or revlog.iscensored(rev):
840 854 try:
841 855 delta = revlog.revision(node)
842 856 except error.CensoredNodeError as e:
843 857 delta = e.tombstone
844 858 if base == nullrev:
845 859 prefix = mdiff.trivialdiffheader(len(delta))
846 860 else:
847 861 baselen = revlog.rawsize(base)
848 862 prefix = mdiff.replacediffheader(baselen, len(delta))
849 863 elif base == nullrev:
850 864 delta = revlog.revision(node)
851 865 prefix = mdiff.trivialdiffheader(len(delta))
852 866 else:
853 867 delta = revlog.revdiff(base, rev)
854 868 p1n, p2n = revlog.parents(node)
855 869 basenode = revlog.node(base)
856 870 flags = revlog.flags(rev)
857 871 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
858 872 meta += prefix
859 873 l = len(meta) + len(delta)
860 874 yield chunkheader(l)
861 875 yield meta
862 876 yield delta
863 877 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
864 878 # do nothing with basenode, it is implicitly the previous one in HG10
865 879 # do nothing with flags, it is implicitly 0 for cg1 and cg2
866 880 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
867 881
868 882 class cg2packer(cg1packer):
869 883 version = '02'
870 884 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
871 885
872 886 def __init__(self, repo, bundlecaps=None):
873 887 super(cg2packer, self).__init__(repo, bundlecaps)
874 888 if self._reorder is None:
875 889 # Since generaldelta is directly supported by cg2, reordering
876 890 # generally doesn't help, so we disable it by default (treating
877 891 # bundle.reorder=auto just like bundle.reorder=False).
878 892 self._reorder = False
879 893
880 894 def deltaparent(self, revlog, rev, p1, p2, prev):
881 895 dp = revlog.deltaparent(rev)
882 896 # avoid storing full revisions; pick prev in those cases
883 897 # also pick prev when we can't be sure remote has dp
884 898 if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
885 899 return prev
886 900 return dp
887 901
888 902 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
889 903 # Do nothing with flags, it is implicitly 0 in cg1 and cg2
890 904 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
891 905
892 906 class cg3packer(cg2packer):
893 907 version = '03'
894 908 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
895 909
896 910 def _packmanifests(self, mfnodes, tmfnodes, lookuplinknode):
897 911 # Note that debug prints are super confusing in this code, as
898 912 # tmfnodes gets populated by the calls to lookuplinknode in
899 913 # the superclass's manifest packer. In the future we should
900 914 # probably see if we can refactor this somehow to be less
901 915 # confusing.
902 916 for x in super(cg3packer, self)._packmanifests(
903 917 mfnodes, {}, lookuplinknode):
904 918 yield x
905 919 dirlog = self._repo.manifest.dirlog
906 920 for name, nodes in tmfnodes.iteritems():
907 921 # For now, directory headers are simply file headers with
908 922 # a trailing '/' on the path (already in the name).
909 923 yield self.fileheader(name)
910 924 for chunk in self.group(nodes, dirlog(name), nodes.get):
911 925 yield chunk
912 926 yield self.close()
913 927
914 928 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
915 929 return struct.pack(
916 930 self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
917 931
918 932 _packermap = {'01': (cg1packer, cg1unpacker),
919 933 # cg2 adds support for exchanging generaldelta
920 934 '02': (cg2packer, cg2unpacker),
921 935 # cg3 adds support for exchanging revlog flags and treemanifests
922 936 '03': (cg3packer, cg3unpacker),
923 937 }
924 938
925 939 def supportedversions(repo):
926 940 versions = _packermap.keys()
927 941 cg3 = ('treemanifest' in repo.requirements or
928 942 repo.ui.configbool('experimental', 'changegroup3') or
929 943 repo.ui.configbool('experimental', 'treemanifest'))
930 944 if not cg3:
931 945 versions.remove('03')
932 946 return versions
933 947
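# Illustrative sketch (not part of the original change): picking the
# highest changegroup version both sides support. 'remoteversions' is a
# hypothetical iterable obtained from capability negotiation, and 'repo'
# is assumed here.
def _examplenegotiateversion(repo, remoteversions):
    common = set(supportedversions(repo)) & set(remoteversions)
    version = max(common)  # '01' < '02' < '03' sort as intended
    packer, unpacker = _packermap[version]
    return packer(repo), unpacker
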
934 948 def getbundler(version, repo, bundlecaps=None):
935 949 assert version in supportedversions(repo)
936 950 return _packermap[version][0](repo, bundlecaps)
937 951
938 952 def getunbundler(version, fh, alg):
939 953 return _packermap[version][1](fh, alg)
940 954
941 955 def _changegroupinfo(repo, nodes, source):
942 956 if repo.ui.verbose or source == 'bundle':
943 957 repo.ui.status(_("%d changesets found\n") % len(nodes))
944 958 if repo.ui.debugflag:
945 959 repo.ui.debug("list of changesets:\n")
946 960 for node in nodes:
947 961 repo.ui.debug("%s\n" % hex(node))
948 962
949 963 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
950 964 repo = repo.unfiltered()
951 965 commonrevs = outgoing.common
952 966 csets = outgoing.missing
953 967 heads = outgoing.missingheads
954 968 # We go through the fast path if we get told to, or if all (unfiltered)
955 969 # heads have been requested (since we then know that all linkrevs will
956 970 # be pulled by the client).
957 971 heads.sort()
958 972 fastpathlinkrev = fastpath or (
959 973 repo.filtername is None and heads == sorted(repo.heads()))
960 974
961 975 repo.hook('preoutgoing', throw=True, source=source)
962 976 _changegroupinfo(repo, csets, source)
963 977 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
964 978
965 979 def getsubset(repo, outgoing, bundler, source, fastpath=False):
966 980 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
967 981 return getunbundler(bundler.version, util.chunkbuffer(gengroup), None)
968 982
969 983 def changegroupsubset(repo, roots, heads, source, version='01'):
970 984 """Compute a changegroup consisting of all the nodes that are
971 985 descendants of any of the roots and ancestors of any of the heads.
972 986 Return a chunkbuffer object whose read() method will return
973 987 successive changegroup chunks.
974 988
975 989 It is fairly complex as determining which filenodes and which
976 990 manifest nodes need to be included for the changeset to be complete
977 991 is non-trivial.
978 992
979 993 Another wrinkle is doing the reverse, figuring out which changeset in
980 994 the changegroup a particular filenode or manifestnode belongs to.
981 995 """
982 996 cl = repo.changelog
983 997 if not roots:
984 998 roots = [nullid]
985 999 discbases = []
986 1000 for n in roots:
987 1001 discbases.extend([p for p in cl.parents(n) if p != nullid])
988 1002 # TODO: remove call to nodesbetween.
989 1003 csets, roots, heads = cl.nodesbetween(roots, heads)
990 1004 included = set(csets)
991 1005 discbases = [n for n in discbases if n not in included]
992 1006 outgoing = discovery.outgoing(cl, discbases, heads)
993 1007 bundler = getbundler(version, repo)
994 1008 return getsubset(repo, outgoing, bundler, source)
995 1009
996 1010 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
997 1011 version='01'):
998 1012 """Like getbundle, but taking a discovery.outgoing as an argument.
999 1013
1000 1014 This is only implemented for local repos and reuses potentially
1001 1015 precomputed sets in outgoing. Returns a raw changegroup generator."""
1002 1016 if not outgoing.missing:
1003 1017 return None
1004 1018 bundler = getbundler(version, repo, bundlecaps)
1005 1019 return getsubsetraw(repo, outgoing, bundler, source)
1006 1020
1007 1021 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
1008 1022 version='01'):
1009 1023 """Like getbundle, but taking a discovery.outgoing as an argument.
1010 1024
1011 1025 This is only implemented for local repos and reuses potentially
1012 1026 precomputed sets in outgoing."""
1013 1027 if not outgoing.missing:
1014 1028 return None
1015 1029 bundler = getbundler(version, repo, bundlecaps)
1016 1030 return getsubset(repo, outgoing, bundler, source)
1017 1031
1018 1032 def computeoutgoing(repo, heads, common):
1019 1033 """Computes which revs are outgoing given a set of common
1020 1034 and a set of heads.
1021 1035
1022 1036 This is a separate function so extensions can have access to
1023 1037 the logic.
1024 1038
1025 1039 Returns a discovery.outgoing object.
1026 1040 """
1027 1041 cl = repo.changelog
1028 1042 if common:
1029 1043 hasnode = cl.hasnode
1030 1044 common = [n for n in common if hasnode(n)]
1031 1045 else:
1032 1046 common = [nullid]
1033 1047 if not heads:
1034 1048 heads = cl.heads()
1035 1049 return discovery.outgoing(cl, common, heads)
1036 1050
1037 1051 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None,
1038 1052 version='01'):
1039 1053 """Like changegroupsubset, but returns the set difference between the
1040 1054 ancestors of heads and the ancestors of common.
1041 1055
1042 1056 If heads is None, use the local heads. If common is None, use [nullid].
1043 1057
1044 1058 The nodes in common might not all be known locally due to the way the
1045 1059 current discovery protocol works.
1046 1060 """
1047 1061 outgoing = computeoutgoing(repo, heads, common)
1048 1062 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps,
1049 1063 version=version)
1050 1064
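# Illustrative sketch (not part of the original change): a typical caller,
# where 'common' and 'heads' would come from discovery against the remote;
# both, like 'repo', are assumptions here.
def _examplegetchangegroup(repo, common, heads):
    cg = getchangegroup(repo, 'push', heads=heads, common=common,
                        version='02')
    return cg  # an unpacker over the generated chunks, or None if empty
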
1051 1065 def changegroup(repo, basenodes, source):
1052 1066 # to avoid a race we use changegroupsubset() (issue1320)
1053 1067 return changegroupsubset(repo, basenodes, repo.heads(), source)
1054 1068
1055 1069 def _addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
1056 1070 revisions = 0
1057 1071 files = 0
1058 submfsdone = False
1059 1072 while True:
1060 1073 chunkdata = source.filelogheader()
1061 1074 if not chunkdata:
1062 if source.version == "03" and not submfsdone:
1063 submfsdone = True
1064 continue
1065 1075 break
1066 1076 f = chunkdata["filename"]
1067 1077 repo.ui.debug("adding %s revisions\n" % f)
1068 1078 pr()
1069 directory = (f[-1] == '/')
1070 if directory:
1071 # a directory using treemanifests
1072 fl = repo.manifest.dirlog(f)
1073 else:
1074 fl = repo.file(f)
1079 fl = repo.file(f)
1075 1080 o = len(fl)
1076 1081 try:
1077 1082 if not fl.addgroup(source, revmap, trp):
1078 1083 raise error.Abort(_("received file revlog group is empty"))
1079 1084 except error.CensoredBaseError as e:
1080 1085 raise error.Abort(_("received delta base is censored: %s") % e)
1081 if not directory:
1082 revisions += len(fl) - o
1083 files += 1
1086 revisions += len(fl) - o
1087 files += 1
1084 1088 if f in needfiles:
1085 1089 needs = needfiles[f]
1086 1090 for new in xrange(o, len(fl)):
1087 1091 n = fl.node(new)
1088 1092 if n in needs:
1089 1093 needs.remove(n)
1090 1094 else:
1091 1095 raise error.Abort(
1092 1096 _("received spurious file revlog entry"))
1093 1097 if not needs:
1094 1098 del needfiles[f]
1095 1099 repo.ui.progress(_('files'), None)
1096 1100
1097 1101 for f, needs in needfiles.iteritems():
1098 1102 fl = repo.file(f)
1099 1103 for n in needs:
1100 1104 try:
1101 1105 fl.rev(n)
1102 1106 except error.LookupError:
1103 1107 raise error.Abort(
1104 1108 _('missing file data for %s:%s - run hg verify') %
1105 1109 (f, hex(n)))
1106 1110
1107 1111 return revisions, files