changegroup: make _packmanifests() dumber...
Martin von Zweigbergk
r28228:abf12026 default
@@ -1,1162 +1,1155 @@
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullid,
19 19 nullrev,
20 20 short,
21 21 )
22 22
23 23 from . import (
24 24 branchmap,
25 25 dagutil,
26 26 discovery,
27 27 error,
28 28 mdiff,
29 29 phases,
30 30 util,
31 31 )
32 32
33 33 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
34 34 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
35 35 _CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
36 36
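A quick sanity check on these header formats, using only the struct module: the cg1 header is four 20-byte nodes (node, p1, p2, linknode), cg2 inserts a fifth 20-byte field for the delta base, and cg3 appends a big-endian unsigned short of revlog flags. A minimal sketch:

    import struct
    # cg1: node, p1, p2, linknode -> 4 * 20 = 80 bytes
    assert struct.calcsize(_CHANGEGROUPV1_DELTA_HEADER) == 80
    # cg2: adds the deltabase node -> 5 * 20 = 100 bytes
    assert struct.calcsize(_CHANGEGROUPV2_DELTA_HEADER) == 100
    # cg3: adds a 16-bit flags field (">" forces big-endian, no padding) -> 102
    assert struct.calcsize(_CHANGEGROUPV3_DELTA_HEADER) == 102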
37 37 def readexactly(stream, n):
38 38 '''read n bytes from stream.read and abort if less was available'''
39 39 s = stream.read(n)
40 40 if len(s) < n:
41 41 raise error.Abort(_("stream ended unexpectedly"
42 42 " (got %d bytes, expected %d)")
43 43 % (len(s), n))
44 44 return s
45 45
46 46 def getchunk(stream):
47 47 """return the next chunk from stream as a string"""
48 48 d = readexactly(stream, 4)
49 49 l = struct.unpack(">l", d)[0]
50 50 if l <= 4:
51 51 if l:
52 52 raise error.Abort(_("invalid chunk length %d") % l)
53 53 return ""
54 54 return readexactly(stream, l - 4)
55 55
56 56 def chunkheader(length):
57 57 """return a changegroup chunk header (string)"""
58 58 return struct.pack(">l", length + 4)
59 59
60 60 def closechunk():
61 61 """return a changegroup chunk header (string) for a zero-length chunk"""
62 62 return struct.pack(">l", 0)
63 63
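Taken together, chunkheader()/closechunk() and getchunk() define the wire framing: every chunk is a 4-byte big-endian length that counts itself, followed by the payload, and a zero-length chunk closes a group. A small round-trip sketch using the helpers above (the payload is made up):

    import io
    payload = 'some delta bytes'
    wire = chunkheader(len(payload)) + payload + closechunk()
    stream = io.BytesIO(wire)
    assert getchunk(stream) == payload  # length prefix stripped off
    assert getchunk(stream) == ''       # empty chunk ends the group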
64 64 def combineresults(results):
65 65 """logic to combine 0 or more addchangegroup results into one"""
66 66 changedheads = 0
67 67 result = 1
68 68 for ret in results:
69 69 # If any changegroup result is 0, return 0
70 70 if ret == 0:
71 71 result = 0
72 72 break
73 73 if ret < -1:
74 74 changedheads += ret + 1
75 75 elif ret > 1:
76 76 changedheads += ret - 1
77 77 if changedheads > 0:
78 78 result = 1 + changedheads
79 79 elif changedheads < 0:
80 80 result = -1 + changedheads
81 81 return result
82 82
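Worked examples of the folding rules above, matching the return-code contract documented later in cg1unpacker.apply() (0 = failure/no change, 1 = head count unchanged, 1+n = n heads added, -1-n = n heads removed):

    assert combineresults([]) == 1        # nothing to fold: success, heads unchanged
    assert combineresults([0, 5]) == 0    # any zero result wins
    assert combineresults([2, 3]) == 4    # 1 added head + 2 added heads -> code 1+3
    assert combineresults([-2, -3]) == -4 # 1 removed + 2 removed -> code -1-3
    assert combineresults([2, -2]) == 1   # +1 and -1 heads cancel out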
83 83 bundletypes = {
84 84 "": ("", None), # only when using unbundle on ssh and old http servers
85 85 # since the unification, ssh accepts a header but there
86 86 # is no capability signaling it.
87 87 "HG20": (), # special-cased below
88 88 "HG10UN": ("HG10UN", None),
89 89 "HG10BZ": ("HG10", 'BZ'),
90 90 "HG10GZ": ("HG10GZ", 'GZ'),
91 91 }
92 92
93 93 # hgweb uses this list to communicate its preferred type
94 94 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
95 95
96 96 def writechunks(ui, chunks, filename, vfs=None):
97 97 """Write chunks to a file and return its filename.
98 98
99 99 The stream is assumed to be a bundle file.
100 100 Existing files will not be overwritten.
101 101 If no filename is specified, a temporary file is created.
102 102 """
103 103 fh = None
104 104 cleanup = None
105 105 try:
106 106 if filename:
107 107 if vfs:
108 108 fh = vfs.open(filename, "wb")
109 109 else:
110 110 fh = open(filename, "wb")
111 111 else:
112 112 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
113 113 fh = os.fdopen(fd, "wb")
114 114 cleanup = filename
115 115 for c in chunks:
116 116 fh.write(c)
117 117 cleanup = None
118 118 return filename
119 119 finally:
120 120 if fh is not None:
121 121 fh.close()
122 122 if cleanup is not None:
123 123 if filename and vfs:
124 124 vfs.unlink(cleanup)
125 125 else:
126 126 os.unlink(cleanup)
127 127
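A hypothetical call sketch for writechunks(), showing both the explicit-filename path and the tempfile path (ui is assumed to be an existing ui object):

    chunks = [chunkheader(5) + 'hello', closechunk()]  # any iterable of strings
    path = writechunks(ui, chunks, 'out.hg')           # writes ./out.hg
    tmppath = writechunks(ui, iter(chunks), None)      # hg-bundle-*.hg tempfile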
128 128 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
129 129 """Write a bundle file and return its filename.
130 130
131 131 Existing files will not be overwritten.
132 132 If no filename is specified, a temporary file is created.
133 133 bz2 compression can be turned off.
134 134 The bundle file will be deleted in case of errors.
135 135 """
136 136
137 137 if bundletype == "HG20":
138 138 from . import bundle2
139 139 bundle = bundle2.bundle20(ui)
140 140 bundle.setcompression(compression)
141 141 part = bundle.newpart('changegroup', data=cg.getchunks())
142 142 part.addparam('version', cg.version)
143 143 chunkiter = bundle.getchunks()
144 144 else:
145 145 # compression argument is only for the bundle2 case
146 146 assert compression is None
147 147 if cg.version != '01':
148 148 raise error.Abort(_('old bundle types only support v1 '
149 149 'changegroups'))
150 150 header, comp = bundletypes[bundletype]
151 151 if comp not in util.compressors:
152 152 raise error.Abort(_('unknown stream compression type: %s')
153 153 % comp)
154 154 z = util.compressors[comp]()
155 155 subchunkiter = cg.getchunks()
156 156 def chunkiter():
157 157 yield header
158 158 for chunk in subchunkiter:
159 159 yield z.compress(chunk)
160 160 yield z.flush()
161 161 chunkiter = chunkiter()
162 162
163 163 # parse the changegroup data, otherwise we will block
164 164 # in case of sshrepo because we don't know the end of the stream
165 165
166 166 # an empty chunkgroup is the end of the changegroup
167 167 # a changegroup has at least 2 chunkgroups (changelog and manifest).
168 168 # after that, an empty chunkgroup is the end of the changegroup
169 169 return writechunks(ui, chunkiter, filename, vfs=vfs)
170 170
171 171 class cg1unpacker(object):
172 172 """Unpacker for cg1 changegroup streams.
173 173
174 174 A changegroup unpacker handles the framing of the revision data in
175 175 the wire format. Most consumers will want to use the apply()
176 176 method to add the changes from the changegroup to a repository.
177 177
178 178 If you're forwarding a changegroup unmodified to another consumer,
179 179 use getchunks(), which returns an iterator of changegroup
180 180 chunks. This is mostly useful for cases where you need to know the
181 181 data stream has ended by observing the end of the changegroup.
182 182
183 183 deltachunk() is useful only if you're applying delta data. Most
184 184 consumers should prefer apply() instead.
185 185
186 186 A few other public methods exist. Those are used only for
187 187 bundlerepo and some debug commands - their use is discouraged.
188 188 """
189 189 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
190 190 deltaheadersize = struct.calcsize(deltaheader)
191 191 version = '01'
192 192 _grouplistcount = 1 # One list of files after the manifests
193 193
194 194 def __init__(self, fh, alg):
195 195 if alg == 'UN':
196 196 alg = None # get more modern without breaking too much
197 197 if alg not in util.decompressors:
198 198 raise error.Abort(_('unknown stream compression type: %s')
199 199 % alg)
200 200 if alg == 'BZ':
201 201 alg = '_truncatedBZ'
202 202 self._stream = util.decompressors[alg](fh)
203 203 self._type = alg
204 204 self.callback = None
205 205
206 206 # These methods (compressed, read, seek, tell) all appear to only
207 207 # be used by bundlerepo, but it's a little hard to tell.
208 208 def compressed(self):
209 209 return self._type is not None
210 210 def read(self, l):
211 211 return self._stream.read(l)
212 212 def seek(self, pos):
213 213 return self._stream.seek(pos)
214 214 def tell(self):
215 215 return self._stream.tell()
216 216 def close(self):
217 217 return self._stream.close()
218 218
219 219 def _chunklength(self):
220 220 d = readexactly(self._stream, 4)
221 221 l = struct.unpack(">l", d)[0]
222 222 if l <= 4:
223 223 if l:
224 224 raise error.Abort(_("invalid chunk length %d") % l)
225 225 return 0
226 226 if self.callback:
227 227 self.callback()
228 228 return l - 4
229 229
230 230 def changelogheader(self):
231 231 """v10 does not have a changelog header chunk"""
232 232 return {}
233 233
234 234 def manifestheader(self):
235 235 """v10 does not have a manifest header chunk"""
236 236 return {}
237 237
238 238 def filelogheader(self):
239 239 """return the header of the filelogs chunk, v10 only has the filename"""
240 240 l = self._chunklength()
241 241 if not l:
242 242 return {}
243 243 fname = readexactly(self._stream, l)
244 244 return {'filename': fname}
245 245
246 246 def _deltaheader(self, headertuple, prevnode):
247 247 node, p1, p2, cs = headertuple
248 248 if prevnode is None:
249 249 deltabase = p1
250 250 else:
251 251 deltabase = prevnode
252 252 flags = 0
253 253 return node, p1, p2, deltabase, cs, flags
254 254
255 255 def deltachunk(self, prevnode):
256 256 l = self._chunklength()
257 257 if not l:
258 258 return {}
259 259 headerdata = readexactly(self._stream, self.deltaheadersize)
260 260 header = struct.unpack(self.deltaheader, headerdata)
261 261 delta = readexactly(self._stream, l - self.deltaheadersize)
262 262 node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
263 263 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
264 264 'deltabase': deltabase, 'delta': delta, 'flags': flags}
265 265
266 266 def getchunks(self):
267 267 """returns all the chunks contains in the bundle
268 268
269 269 Used when you need to forward the binary stream to a file or another
270 270 network API. To do so, it parse the changegroup data, otherwise it will
271 271 block in case of sshrepo because it don't know the end of the stream.
272 272 """
273 273 # an empty chunkgroup is the end of the changegroup
274 274 # a changegroup has at least 2 chunkgroups (changelog and manifest).
275 275 # after that, changegroup versions 1 and 2 have a series of groups
276 276 # with one group per file. changegroup 3 has a series of directory
277 277 # manifests before the files.
278 278 count = 0
279 279 emptycount = 0
280 280 while emptycount < self._grouplistcount:
281 281 empty = True
282 282 count += 1
283 283 while True:
284 284 chunk = getchunk(self)
285 285 if not chunk:
286 286 if empty and count > 2:
287 287 emptycount += 1
288 288 break
289 289 empty = False
290 290 yield chunkheader(len(chunk))
291 291 pos = 0
292 292 while pos < len(chunk):
293 293 next = pos + 2**20
294 294 yield chunk[pos:next]
295 295 pos = next
296 296 yield closechunk()
297 297
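As the docstring notes, getchunks() is the right entry point for relaying a changegroup verbatim. A hypothetical helper that spools an unpacker's stream to disk without applying it:

    def spooltofile(unpacker, path):
        # Forward the framed bytes untouched; iteration stops exactly when
        # the terminating empty group(s) have been seen.
        with open(path, 'wb') as fh:
            for chunk in unpacker.getchunks():
                fh.write(chunk)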
298 298 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
299 299 # We know that we'll never have more manifests than we had
300 300 # changesets.
301 301 self.callback = prog(_('manifests'), numchanges)
302 302 # no need to check for empty manifest group here:
303 303 # if the result of the merge of 1 and 2 is the same in 3 and 4,
304 304 # no new manifest will be created and the manifest group will
305 305 # be empty during the pull
306 306 self.manifestheader()
307 307 repo.manifest.addgroup(self, revmap, trp)
308 308 repo.ui.progress(_('manifests'), None)
309 309
310 310 def apply(self, repo, srctype, url, emptyok=False,
311 311 targetphase=phases.draft, expectedtotal=None):
312 312 """Add the changegroup returned by source.read() to this repo.
313 313 srctype is a string like 'push', 'pull', or 'unbundle'. url is
314 314 the URL of the repo where this changegroup is coming from.
315 315
316 316 Return an integer summarizing the change to this repo:
317 317 - nothing changed or no source: 0
318 318 - more heads than before: 1+added heads (2..n)
319 319 - fewer heads than before: -1-removed heads (-2..-n)
320 320 - number of heads stays the same: 1
321 321 """
322 322 repo = repo.unfiltered()
323 323 def csmap(x):
324 324 repo.ui.debug("add changeset %s\n" % short(x))
325 325 return len(cl)
326 326
327 327 def revmap(x):
328 328 return cl.rev(x)
329 329
330 330 changesets = files = revisions = 0
331 331
332 332 try:
333 333 with repo.transaction("\n".join([srctype,
334 334 util.hidepassword(url)])) as tr:
335 335 # The transaction could have been created before and already
336 336 # carries source information. In this case we use the top
337 337 # level data. We overwrite the argument because we need to use
338 338 # the top level value (if they exist) in this function.
339 339 srctype = tr.hookargs.setdefault('source', srctype)
340 340 url = tr.hookargs.setdefault('url', url)
341 341 repo.hook('prechangegroup', throw=True, **tr.hookargs)
342 342
343 343 # write changelog data to temp files so concurrent readers
344 344 # will not see an inconsistent view
345 345 cl = repo.changelog
346 346 cl.delayupdate(tr)
347 347 oldheads = cl.heads()
348 348
349 349 trp = weakref.proxy(tr)
350 350 # pull off the changeset group
351 351 repo.ui.status(_("adding changesets\n"))
352 352 clstart = len(cl)
353 353 class prog(object):
354 354 def __init__(self, step, total):
355 355 self._step = step
356 356 self._total = total
357 357 self._count = 1
358 358 def __call__(self):
359 359 repo.ui.progress(self._step, self._count,
360 360 unit=_('chunks'), total=self._total)
361 361 self._count += 1
362 362 self.callback = prog(_('changesets'), expectedtotal)
363 363
364 364 efiles = set()
365 365 def onchangelog(cl, node):
366 366 efiles.update(cl.read(node)[3])
367 367
368 368 self.changelogheader()
369 369 srccontent = cl.addgroup(self, csmap, trp,
370 370 addrevisioncb=onchangelog)
371 371 efiles = len(efiles)
372 372
373 373 if not (srccontent or emptyok):
374 374 raise error.Abort(_("received changelog group is empty"))
375 375 clend = len(cl)
376 376 changesets = clend - clstart
377 377 repo.ui.progress(_('changesets'), None)
378 378
379 379 # pull off the manifest group
380 380 repo.ui.status(_("adding manifests\n"))
381 381 self._unpackmanifests(repo, revmap, trp, prog, changesets)
382 382
383 383 needfiles = {}
384 384 if repo.ui.configbool('server', 'validate', default=False):
385 385 # validate incoming csets have their manifests
386 386 for cset in xrange(clstart, clend):
387 387 mfnode = repo.changelog.read(
388 388 repo.changelog.node(cset))[0]
389 389 mfest = repo.manifest.readdelta(mfnode)
390 390 # store file nodes we must see
391 391 for f, n in mfest.iteritems():
392 392 needfiles.setdefault(f, set()).add(n)
393 393
394 394 # process the files
395 395 repo.ui.status(_("adding file changes\n"))
396 396 self.callback = None
397 397 pr = prog(_('files'), efiles)
398 398 newrevs, newfiles = _addchangegroupfiles(
399 399 repo, self, revmap, trp, pr, needfiles)
400 400 revisions += newrevs
401 401 files += newfiles
402 402
403 403 dh = 0
404 404 if oldheads:
405 405 heads = cl.heads()
406 406 dh = len(heads) - len(oldheads)
407 407 for h in heads:
408 408 if h not in oldheads and repo[h].closesbranch():
409 409 dh -= 1
410 410 htext = ""
411 411 if dh:
412 412 htext = _(" (%+d heads)") % dh
413 413
414 414 repo.ui.status(_("added %d changesets"
415 415 " with %d changes to %d files%s\n")
416 416 % (changesets, revisions, files, htext))
417 417 repo.invalidatevolatilesets()
418 418
419 419 if changesets > 0:
420 420 if 'node' not in tr.hookargs:
421 421 tr.hookargs['node'] = hex(cl.node(clstart))
422 422 tr.hookargs['node_last'] = hex(cl.node(clend - 1))
423 423 hookargs = dict(tr.hookargs)
424 424 else:
425 425 hookargs = dict(tr.hookargs)
426 426 hookargs['node'] = hex(cl.node(clstart))
427 427 hookargs['node_last'] = hex(cl.node(clend - 1))
428 428 repo.hook('pretxnchangegroup', throw=True, **hookargs)
429 429
430 430 added = [cl.node(r) for r in xrange(clstart, clend)]
431 431 publishing = repo.publishing()
432 432 if srctype in ('push', 'serve'):
433 433 # Old servers cannot push the boundary themselves.
434 434 # New servers won't push the boundary if the changeset already
435 435 # exists locally as secret
436 436 #
437 437 # We should not use 'added' here but the list of all changes in
438 438 # the bundle
439 439 if publishing:
440 440 phases.advanceboundary(repo, tr, phases.public,
441 441 srccontent)
442 442 else:
443 443 # Those changesets have been pushed from the
444 444 # outside; their phases are going to be pushed
445 445 # alongside. Therefore `targetphase` is
446 446 # ignored.
447 447 phases.advanceboundary(repo, tr, phases.draft,
448 448 srccontent)
449 449 phases.retractboundary(repo, tr, phases.draft, added)
450 450 elif srctype != 'strip':
451 451 # publishing only alters behavior during push
452 452 #
453 453 # strip should not touch boundary at all
454 454 phases.retractboundary(repo, tr, targetphase, added)
455 455
456 456 if changesets > 0:
457 457 if srctype != 'strip':
458 458 # During strip, the branchcache is invalid, but the
459 459 # upcoming call to `destroyed` will repair it.
460 460 # In other cases we can safely update the cache on
461 461 # disk.
462 462 branchmap.updatecache(repo.filtered('served'))
463 463
464 464 def runhooks():
465 465 # These hooks run when the lock releases, not when the
466 466 # transaction closes. So it's possible for the changelog
467 467 # to have changed since we last saw it.
468 468 if clstart >= len(repo):
469 469 return
470 470
471 471 # forcefully update the on-disk branch cache
472 472 repo.ui.debug("updating the branch cache\n")
473 473 repo.hook("changegroup", **hookargs)
474 474
475 475 for n in added:
476 476 args = hookargs.copy()
477 477 args['node'] = hex(n)
478 478 del args['node_last']
479 479 repo.hook("incoming", **args)
480 480
481 481 newheads = [h for h in repo.heads()
482 482 if h not in oldheads]
483 483 repo.ui.log("incoming",
484 484 "%s incoming changes - new heads: %s\n",
485 485 len(added),
486 486 ', '.join([hex(c[:6]) for c in newheads]))
487 487
488 488 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
489 489 lambda tr: repo._afterlock(runhooks))
490 490 finally:
491 491 repo.ui.flush()
492 492 # never return 0 here:
493 493 if dh < 0:
494 494 return dh - 1
495 495 else:
496 496 return dh + 1
497 497
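A hypothetical helper decoding the return-value contract documented in apply() above (the name is illustrative, not part of this module):

    def describeapplyresult(ret):
        if ret == 0:
            return 'nothing changed (or no source)'
        if ret == 1:
            return 'head count unchanged'
        if ret > 1:
            return '%d head(s) added' % (ret - 1)
        return '%d head(s) removed' % (-ret - 1)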
498 498 class cg2unpacker(cg1unpacker):
499 499 """Unpacker for cg2 streams.
500 500
501 501 cg2 streams add support for generaldelta, so the delta header
502 502 format is slightly different. All other features about the data
503 503 remain the same.
504 504 """
505 505 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
506 506 deltaheadersize = struct.calcsize(deltaheader)
507 507 version = '02'
508 508
509 509 def _deltaheader(self, headertuple, prevnode):
510 510 node, p1, p2, deltabase, cs = headertuple
511 511 flags = 0
512 512 return node, p1, p2, deltabase, cs, flags
513 513
514 514 class cg3unpacker(cg2unpacker):
515 515 """Unpacker for cg3 streams.
516 516
517 517 cg3 streams add support for exchanging treemanifests and revlog
518 518 flags. It adds the revlog flags to the delta header and an empty chunk
519 519 separating manifests and files.
520 520 """
521 521 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
522 522 deltaheadersize = struct.calcsize(deltaheader)
523 523 version = '03'
524 524 _grouplistcount = 2 # One list of manifests and one list of files
525 525
526 526 def _deltaheader(self, headertuple, prevnode):
527 527 node, p1, p2, deltabase, cs, flags = headertuple
528 528 return node, p1, p2, deltabase, cs, flags
529 529
530 530 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
531 531 super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
532 532 numchanges)
533 533 while True:
534 534 chunkdata = self.filelogheader()
535 535 if not chunkdata:
536 536 break
537 537 # If we get here, there are directory manifests in the changegroup
538 538 d = chunkdata["filename"]
539 539 repo.ui.debug("adding %s revisions\n" % d)
540 540 dirlog = repo.manifest.dirlog(d)
541 541 if not dirlog.addgroup(self, revmap, trp):
542 542 raise error.Abort(_("received dir revlog group is empty"))
543 543
544 544 class headerlessfixup(object):
545 545 def __init__(self, fh, h):
546 546 self._h = h
547 547 self._fh = fh
548 548 def read(self, n):
549 549 if self._h:
550 550 d, self._h = self._h[:n], self._h[n:]
551 551 if len(d) < n:
552 552 d += readexactly(self._fh, n - len(d))
553 553 return d
554 554 return readexactly(self._fh, n)
555 555
556 556 def _moddirs(files):
557 557 """Given a set of modified files, find the list of modified directories.
558 558
559 559 This returns a list of (path to changed dir, changed dir) tuples,
560 560 as that's what its one caller needs anyway.
561 561
562 562 >>> _moddirs(['a/b/c.py', 'a/b/c.txt', 'a/d/e/f/g.txt', 'i.txt', ])
563 563 [('/', 'a/'), ('a/', 'b/'), ('a/', 'd/'), ('a/d/', 'e/'), ('a/d/e/', 'f/')]
564 564
565 565 """
566 566 alldirs = set()
567 567 for f in files:
568 568 path = f.split('/')[:-1]
569 569 for i in xrange(len(path) - 1, -1, -1):
570 570 dn = '/'.join(path[:i])
571 571 current = dn + '/', path[i] + '/'
572 572 if current in alldirs:
573 573 break
574 574 alldirs.add(current)
575 575 return sorted(alldirs)
576 576
577 577 class cg1packer(object):
578 578 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
579 579 version = '01'
580 580 def __init__(self, repo, bundlecaps=None):
581 581 """Given a source repo, construct a bundler.
582 582
583 583 bundlecaps is optional and can be used to specify the set of
584 584 capabilities which can be used to build the bundle.
585 585 """
586 586 # Set of capabilities we can use to build the bundle.
587 587 if bundlecaps is None:
588 588 bundlecaps = set()
589 589 self._bundlecaps = bundlecaps
590 590 # experimental config: bundle.reorder
591 591 reorder = repo.ui.config('bundle', 'reorder', 'auto')
592 592 if reorder == 'auto':
593 593 reorder = None
594 594 else:
595 595 reorder = util.parsebool(reorder)
596 596 self._repo = repo
597 597 self._reorder = reorder
598 598 self._progress = repo.ui.progress
599 599 if self._repo.ui.verbose and not self._repo.ui.debugflag:
600 600 self._verbosenote = self._repo.ui.note
601 601 else:
602 602 self._verbosenote = lambda s: None
603 603
604 604 def close(self):
605 605 return closechunk()
606 606
607 607 def fileheader(self, fname):
608 608 return chunkheader(len(fname)) + fname
609 609
610 610 def group(self, nodelist, revlog, lookup, units=None):
611 611 """Calculate a delta group, yielding a sequence of changegroup chunks
612 612 (strings).
613 613
614 614 Given a list of changeset revs, return a set of deltas and
615 615 metadata corresponding to nodes. The first delta is
616 616 first parent(nodelist[0]) -> nodelist[0], the receiver is
617 617 guaranteed to have this parent as it has all history before
618 618 these changesets. In the case firstparent is nullrev the
619 619 changegroup starts with a full revision.
620 620
621 621 If units is not None, progress detail will be generated; units specifies
622 622 the type of revlog that is touched (changelog, manifest, etc.).
623 623 """
624 624 # if we don't have any revisions touched by these changesets, bail
625 625 if len(nodelist) == 0:
626 626 yield self.close()
627 627 return
628 628
629 629 # for generaldelta revlogs, we linearize the revs; this will both be
630 630 # much quicker and generate a much smaller bundle
631 631 if (revlog._generaldelta and self._reorder is None) or self._reorder:
632 632 dag = dagutil.revlogdag(revlog)
633 633 revs = set(revlog.rev(n) for n in nodelist)
634 634 revs = dag.linearize(revs)
635 635 else:
636 636 revs = sorted([revlog.rev(n) for n in nodelist])
637 637
638 638 # add the parent of the first rev
639 639 p = revlog.parentrevs(revs[0])[0]
640 640 revs.insert(0, p)
641 641
642 642 # build deltas
643 643 total = len(revs) - 1
644 644 msgbundling = _('bundling')
645 645 for r in xrange(len(revs) - 1):
646 646 if units is not None:
647 647 self._progress(msgbundling, r + 1, unit=units, total=total)
648 648 prev, curr = revs[r], revs[r + 1]
649 649 linknode = lookup(revlog.node(curr))
650 650 for c in self.revchunk(revlog, curr, prev, linknode):
651 651 yield c
652 652
653 653 if units is not None:
654 654 self._progress(msgbundling, None)
655 655 yield self.close()
656 656
657 657 # filter any nodes that claim to be part of the known set
658 658 def prune(self, revlog, missing, commonrevs):
659 659 rr, rl = revlog.rev, revlog.linkrev
660 660 return [n for n in missing if rl(rr(n)) not in commonrevs]
661 661
662 def _packmanifests(self, mfnodes, tmfnodes, lookuplinknode):
662 def _packmanifests(self, dir, mfnodes, lookuplinknode):
663 663 """Pack flat manifests into a changegroup stream."""
664 ml = self._repo.manifest
665 size = 0
666 for chunk in self.group(
667 mfnodes, ml, lookuplinknode, units=_('manifests')):
668 size += len(chunk)
664 assert not dir
665 for chunk in self.group(mfnodes, self._repo.manifest,
666 lookuplinknode, units=_('manifests')):
669 667 yield chunk
670 self._verbosenote(_('%8.i (manifests)\n') % size)
671 # It looks odd to assert this here, but tmfnodes doesn't get
672 # filled in until after we've called lookuplinknode for
673 # sending root manifests, so the only way to tell the streams
674 # got crossed is to check after we've done all the work.
675 assert not tmfnodes
668
669 def _manifestsdone(self):
670 return ''
676 671
677 672 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
678 673 '''yield a sequence of changegroup chunks (strings)'''
679 674 repo = self._repo
680 675 cl = repo.changelog
681 676
682 677 clrevorder = {}
683 678 mfs = {} # needed manifests
684 679 fnodes = {} # needed file nodes
685 680 # maps manifest node id -> set(changed files)
686 681 mfchangedfiles = {}
687 682
688 683 # Callback for the changelog, used to collect changed files and manifest
689 684 # nodes.
690 685 # Returns the linkrev node (identity in the changelog case).
691 686 def lookupcl(x):
692 687 c = cl.read(x)
693 688 clrevorder[x] = len(clrevorder)
694 689 n = c[0]
695 690 # record the first changeset introducing this manifest version
696 691 mfs.setdefault(n, x)
697 692 # Record a complete list of potentially-changed files in
698 693 # this manifest.
699 694 mfchangedfiles.setdefault(n, set()).update(c[3])
700 695 return x
701 696
702 697 self._verbosenote(_('uncompressed size of bundle content:\n'))
703 698 size = 0
704 699 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
705 700 size += len(chunk)
706 701 yield chunk
707 702 self._verbosenote(_('%8.i (changelog)\n') % size)
708 703
709 704 # We need to make sure that the linkrev in the changegroup refers to
710 705 # the first changeset that introduced the manifest or file revision.
711 706 # The fastpath is usually safer than the slowpath, because the filelogs
712 707 # are walked in revlog order.
713 708 #
714 709 # When taking the slowpath with reorder=None and the manifest revlog
715 710 # uses generaldelta, the manifest may be walked in the "wrong" order.
716 711 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
717 712 # cc0ff93d0c0c).
718 713 #
719 714 # When taking the fastpath, we are only vulnerable to reordering
720 715 # of the changelog itself. The changelog never uses generaldelta, so
721 716 # it is only reordered when reorder=True. To handle this case, we
722 717 # simply take the slowpath, which already has the 'clrevorder' logic.
723 718 # This was also fixed in cc0ff93d0c0c.
724 719 fastpathlinkrev = fastpathlinkrev and not self._reorder
725 720 # Treemanifests don't work correctly with fastpathlinkrev
726 721 # either, because we don't discover which directory nodes to
727 722 # send along with files. This could probably be fixed.
728 723 fastpathlinkrev = fastpathlinkrev and (
729 724 'treemanifest' not in repo.requirements)
730 725
731 726 for chunk in self.generatemanifests(commonrevs, clrevorder,
732 727 fastpathlinkrev, mfs, mfchangedfiles, fnodes):
733 728 yield chunk
734 729 mfs.clear()
735 730 clrevs = set(cl.rev(x) for x in clnodes)
736 731
737 732 if not fastpathlinkrev:
738 733 def linknodes(unused, fname):
739 734 return fnodes.get(fname, {})
740 735 else:
741 736 cln = cl.node
742 737 def linknodes(filerevlog, fname):
743 738 llr = filerevlog.linkrev
744 739 fln = filerevlog.node
745 740 revs = ((r, llr(r)) for r in filerevlog)
746 741 return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)
747 742
748 743 changedfiles = set()
749 744 for x in mfchangedfiles.itervalues():
750 745 changedfiles.update(x)
751 746 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
752 747 source):
753 748 yield chunk
754 749
755 750 yield self.close()
756 751
757 752 if clnodes:
758 753 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
759 754
760 755 def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
761 756 mfchangedfiles, fnodes):
762 757 repo = self._repo
763 758 ml = repo.manifest
764 759 tmfnodes = {}
765 760
766 761 # Callback for the manifest, used to collect linkrevs for filelog
767 762 # revisions.
768 763 # Returns the linkrev node (collected in lookupcl).
769 764 if fastpathlinkrev:
770 765 lookupmflinknode = mfs.__getitem__
771 766 else:
772 767 def lookupmflinknode(x):
773 768 """Callback for looking up the linknode for manifests.
774 769
775 770 Returns the linkrev node for the specified manifest.
776 771
777 772 SIDE EFFECT:
778 773
779 774 1) fclnodes gets populated with the list of relevant
780 775 file nodes if we're not using fastpathlinkrev
781 776 2) When treemanifests are in use, collects treemanifest nodes
782 777 to send
783 778
784 779 Note that this means manifests must be completely sent to
785 780 the client before you can trust the list of files and
786 781 treemanifests to send.
787 782 """
788 783 clnode = mfs[x]
789 784 # We no longer actually care about reading deltas of
790 785 # the manifest here, because we already know the list
791 786 # of changed files, so for treemanifests (which
792 787 # lazily-load anyway to *generate* a readdelta) we can
793 788 # just load them with read() and then we'll actually
794 789 # be able to correctly load node IDs from the
795 790 # submanifest entries.
796 791 if 'treemanifest' in repo.requirements:
797 792 mdata = ml.read(x)
798 793 else:
799 794 mdata = ml.readfast(x)
800 795 for f in mfchangedfiles[x]:
801 796 try:
802 797 n = mdata[f]
803 798 except KeyError:
804 799 continue
805 800 # record the first changeset introducing this filelog
806 801 # version
807 802 fclnodes = fnodes.setdefault(f, {})
808 803 fclnode = fclnodes.setdefault(n, clnode)
809 804 if clrevorder[clnode] < clrevorder[fclnode]:
810 805 fclnodes[n] = clnode
811 806 # gather list of changed treemanifest nodes
812 807 if 'treemanifest' in repo.requirements:
813 808 submfs = {'/': mdata}
814 809 for dn, bn in _moddirs(mfchangedfiles[x]):
815 810 try:
816 811 submf = submfs[dn]
817 812 submf = submf._dirs[bn]
818 813 except KeyError:
819 814 continue # deleted directory, so nothing to send
820 815 submfs[submf.dir()] = submf
821 816 tmfclnodes = tmfnodes.setdefault(submf.dir(), {})
822 817 tmfclnode = tmfclnodes.setdefault(submf._node, clnode)
823 818 if clrevorder[clnode] < clrevorder[tmfclnode]:
824 819 tmfclnodes[n] = clnode
825 820 return clnode
826 821
827 822 mfnodes = self.prune(ml, mfs, commonrevs)
828 for x in self._packmanifests(
829 mfnodes, tmfnodes, lookupmflinknode):
823 size = 0
824 for x in self._packmanifests('', mfnodes, lookupmflinknode):
825 size += len(x)
830 826 yield x
827 self._verbosenote(_('%8.i (manifests)\n') % size)
828 for dir, nodes in tmfnodes.iteritems():
829 for x in self._packmanifests(dir, nodes, nodes.get):
830 yield x
831 yield self._manifestsdone()
831 832
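To summarize the flow after this change: the root (flat) manifest group is emitted via _packmanifests('', ...), each collected tree-manifest directory then gets its own _packmanifests(dir, ...) call, and _manifestsdone() emits the closing chunk only where the format needs one. In outline:

    # 1. _packmanifests('', mfnodes, lookupmflinknode)  -> flat manifest group
    # 2. for dir, nodes in tmfnodes.iteritems():
    #        _packmanifests(dir, nodes, nodes.get)      -> one group per tree dir
    # 3. _manifestsdone()  -> '' for cg1/cg2, closechunk() for cg3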
832 833 # The 'source' parameter is useful for extensions
833 834 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
834 835 repo = self._repo
835 836 progress = self._progress
836 837 msgbundling = _('bundling')
837 838
838 839 total = len(changedfiles)
839 840 # for progress output
840 841 msgfiles = _('files')
841 842 for i, fname in enumerate(sorted(changedfiles)):
842 843 filerevlog = repo.file(fname)
843 844 if not filerevlog:
844 845 raise error.Abort(_("empty or missing revlog for %s") % fname)
845 846
846 847 linkrevnodes = linknodes(filerevlog, fname)
847 848 # Lookup for filenodes, we collected the linkrev nodes above in the
848 849 # fastpath case and with lookupmf in the slowpath case.
849 850 def lookupfilelog(x):
850 851 return linkrevnodes[x]
851 852
852 853 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
853 854 if filenodes:
854 855 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
855 856 total=total)
856 857 h = self.fileheader(fname)
857 858 size = len(h)
858 859 yield h
859 860 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
860 861 size += len(chunk)
861 862 yield chunk
862 863 self._verbosenote(_('%8.i %s\n') % (size, fname))
863 864 progress(msgbundling, None)
864 865
865 866 def deltaparent(self, revlog, rev, p1, p2, prev):
866 867 return prev
867 868
868 869 def revchunk(self, revlog, rev, prev, linknode):
869 870 node = revlog.node(rev)
870 871 p1, p2 = revlog.parentrevs(rev)
871 872 base = self.deltaparent(revlog, rev, p1, p2, prev)
872 873
873 874 prefix = ''
874 875 if revlog.iscensored(base) or revlog.iscensored(rev):
875 876 try:
876 877 delta = revlog.revision(node)
877 878 except error.CensoredNodeError as e:
878 879 delta = e.tombstone
879 880 if base == nullrev:
880 881 prefix = mdiff.trivialdiffheader(len(delta))
881 882 else:
882 883 baselen = revlog.rawsize(base)
883 884 prefix = mdiff.replacediffheader(baselen, len(delta))
884 885 elif base == nullrev:
885 886 delta = revlog.revision(node)
886 887 prefix = mdiff.trivialdiffheader(len(delta))
887 888 else:
888 889 delta = revlog.revdiff(base, rev)
889 890 p1n, p2n = revlog.parents(node)
890 891 basenode = revlog.node(base)
891 892 flags = revlog.flags(rev)
892 893 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
893 894 meta += prefix
894 895 l = len(meta) + len(delta)
895 896 yield chunkheader(l)
896 897 yield meta
897 898 yield delta
898 899 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
899 900 # do nothing with basenode, it is implicitly the previous one in HG10
900 901 # do nothing with flags, it is implicitly 0 for cg1 and cg2
901 902 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
902 903
903 904 class cg2packer(cg1packer):
904 905 version = '02'
905 906 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
906 907
907 908 def __init__(self, repo, bundlecaps=None):
908 909 super(cg2packer, self).__init__(repo, bundlecaps)
909 910 if self._reorder is None:
910 911 # Since generaldelta is directly supported by cg2, reordering
911 912 # generally doesn't help, so we disable it by default (treating
912 913 # bundle.reorder=auto just like bundle.reorder=False).
913 914 self._reorder = False
914 915
915 916 def deltaparent(self, revlog, rev, p1, p2, prev):
916 917 dp = revlog.deltaparent(rev)
917 918 # avoid storing full revisions; pick prev in those cases
918 919 # also pick prev when we can't be sure remote has dp
919 920 if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
920 921 return prev
921 922 return dp
922 923
923 924 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
924 925 # Do nothing with flags, it is implicitly 0 in cg1 and cg2
925 926 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
926 927
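The deltaparent() override above is the core of cg2's generaldelta support: the stored delta base is reused only when the receiver provably has it. An informal decision table (a summary, not code from this change):

    # dp == nullrev                 -> prev (never ship full revisions)
    # dp in (p1, p2) or dp == prev  -> dp   (receiver is guaranteed to have it)
    # any other dp                  -> prev (remote may lack the stored base)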
927 928 class cg3packer(cg2packer):
928 929 version = '03'
929 930 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
930 931
931 def _packmanifests(self, mfnodes, tmfnodes, lookuplinknode):
932 # Note that debug prints are super confusing in this code, as
933 # tmfnodes gets populated by the calls to lookuplinknode in
934 # the superclass's manifest packer. In the future we should
935 # probably see if we can refactor this somehow to be less
936 # confusing.
937 for x in super(cg3packer, self)._packmanifests(
938 mfnodes, {}, lookuplinknode):
939 yield x
940 dirlog = self._repo.manifest.dirlog
941 for name, nodes in tmfnodes.iteritems():
942 # For now, directory headers are simply file headers with
943 # a trailing '/' on the path (already in the name).
944 yield self.fileheader(name)
945 for chunk in self.group(nodes, dirlog(name), nodes.get):
932 def _packmanifests(self, dir, mfnodes, lookuplinknode):
933 if dir:
934 yield self.fileheader(dir)
935 for chunk in self.group(mfnodes, self._repo.manifest.dirlog(dir),
936 lookuplinknode, units=_('manifests')):
946 937 yield chunk
947 yield self.close()
938
939 def _manifestsdone(self):
940 return self.close()
948 941
949 942 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
950 943 return struct.pack(
951 944 self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
952 945
953 946 _packermap = {'01': (cg1packer, cg1unpacker),
954 947 # cg2 adds support for exchanging generaldelta
955 948 '02': (cg2packer, cg2unpacker),
956 949 # cg3 adds support for exchanging revlog flags and treemanifests
957 950 '03': (cg3packer, cg3unpacker),
958 951 }
959 952
960 953 def allsupportedversions(ui):
961 954 versions = set(_packermap.keys())
962 955 versions.discard('03')
963 956 if (ui.configbool('experimental', 'changegroup3') or
964 957 ui.configbool('experimental', 'treemanifest')):
965 958 versions.add('03')
966 959 return versions
967 960
968 961 # Changegroup versions that can be applied to the repo
969 962 def supportedincomingversions(repo):
970 963 versions = allsupportedversions(repo.ui)
971 964 if 'treemanifest' in repo.requirements:
972 965 versions.add('03')
973 966 return versions
974 967
975 968 # Changegroup versions that can be created from the repo
976 969 def supportedoutgoingversions(repo):
977 970 versions = allsupportedversions(repo.ui)
978 971 if 'treemanifest' in repo.requirements:
979 972 # Versions 01 and 02 support only flat manifests and it's just too
980 973 # expensive to convert between the flat manifest and tree manifest on
981 974 # the fly. Since tree manifests are hashed differently, all of history
982 975 # would have to be converted. Instead, we simply don't even pretend to
983 976 # support versions 01 and 02.
984 977 versions.discard('01')
985 978 versions.discard('02')
986 979 versions.add('03')
987 980 return versions
988 981
989 982 def safeversion(repo):
990 983 # Finds the smallest version that it's safe to assume clients of the repo
991 984 # will support. For example, all hg versions that support generaldelta also
992 985 # support changegroup 02.
993 986 versions = supportedoutgoingversions(repo)
994 987 if 'generaldelta' in repo.requirements:
995 988 versions.discard('01')
996 989 assert versions
997 990 return min(versions)
998 991
999 992 def getbundler(version, repo, bundlecaps=None):
1000 993 assert version in supportedoutgoingversions(repo)
1001 994 return _packermap[version][0](repo, bundlecaps)
1002 995
1003 996 def getunbundler(version, fh, alg):
1004 997 return _packermap[version][1](fh, alg)
1005 998
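A hedged usage sketch of the packer/unpacker pairing through _packermap (repo and fh are assumed to exist; 'UN' requests no decompression, as handled in cg1unpacker.__init__):

    version = safeversion(repo)          # smallest version all clients handle
    bundler = getbundler(version, repo)  # cg1packer, cg2packer or cg3packer
    # ...send the chunks from bundler.generate(...) over the wire, then:
    unpacker = getunbundler(version, fh, 'UN')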
1006 999 def _changegroupinfo(repo, nodes, source):
1007 1000 if repo.ui.verbose or source == 'bundle':
1008 1001 repo.ui.status(_("%d changesets found\n") % len(nodes))
1009 1002 if repo.ui.debugflag:
1010 1003 repo.ui.debug("list of changesets:\n")
1011 1004 for node in nodes:
1012 1005 repo.ui.debug("%s\n" % hex(node))
1013 1006
1014 1007 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
1015 1008 repo = repo.unfiltered()
1016 1009 commonrevs = outgoing.common
1017 1010 csets = outgoing.missing
1018 1011 heads = outgoing.missingheads
1019 1012 # We go through the fast path if we get told to, or if all (unfiltered)
1020 1013 # heads have been requested (since we then know all the linkrevs will
1021 1014 # be pulled by the client).
1022 1015 heads.sort()
1023 1016 fastpathlinkrev = fastpath or (
1024 1017 repo.filtername is None and heads == sorted(repo.heads()))
1025 1018
1026 1019 repo.hook('preoutgoing', throw=True, source=source)
1027 1020 _changegroupinfo(repo, csets, source)
1028 1021 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
1029 1022
1030 1023 def getsubset(repo, outgoing, bundler, source, fastpath=False):
1031 1024 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
1032 1025 return getunbundler(bundler.version, util.chunkbuffer(gengroup), None)
1033 1026
1034 1027 def changegroupsubset(repo, roots, heads, source, version='01'):
1035 1028 """Compute a changegroup consisting of all the nodes that are
1036 1029 descendants of any of the roots and ancestors of any of the heads.
1037 1030 Return a chunkbuffer object whose read() method will return
1038 1031 successive changegroup chunks.
1039 1032
1040 1033 It is fairly complex as determining which filenodes and which
1041 1034 manifest nodes need to be included for the changeset to be complete
1042 1035 is non-trivial.
1043 1036
1044 1037 Another wrinkle is doing the reverse, figuring out which changeset in
1045 1038 the changegroup a particular filenode or manifestnode belongs to.
1046 1039 """
1047 1040 cl = repo.changelog
1048 1041 if not roots:
1049 1042 roots = [nullid]
1050 1043 discbases = []
1051 1044 for n in roots:
1052 1045 discbases.extend([p for p in cl.parents(n) if p != nullid])
1053 1046 # TODO: remove call to nodesbetween.
1054 1047 csets, roots, heads = cl.nodesbetween(roots, heads)
1055 1048 included = set(csets)
1056 1049 discbases = [n for n in discbases if n not in included]
1057 1050 outgoing = discovery.outgoing(cl, discbases, heads)
1058 1051 bundler = getbundler(version, repo)
1059 1052 return getsubset(repo, outgoing, bundler, source)
1060 1053
1061 1054 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
1062 1055 version='01'):
1063 1056 """Like getbundle, but taking a discovery.outgoing as an argument.
1064 1057
1065 1058 This is only implemented for local repos and reuses potentially
1066 1059 precomputed sets in outgoing. Returns a raw changegroup generator."""
1067 1060 if not outgoing.missing:
1068 1061 return None
1069 1062 bundler = getbundler(version, repo, bundlecaps)
1070 1063 return getsubsetraw(repo, outgoing, bundler, source)
1071 1064
1072 1065 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
1073 1066 version='01'):
1074 1067 """Like getbundle, but taking a discovery.outgoing as an argument.
1075 1068
1076 1069 This is only implemented for local repos and reuses potentially
1077 1070 precomputed sets in outgoing."""
1078 1071 if not outgoing.missing:
1079 1072 return None
1080 1073 bundler = getbundler(version, repo, bundlecaps)
1081 1074 return getsubset(repo, outgoing, bundler, source)
1082 1075
1083 1076 def computeoutgoing(repo, heads, common):
1084 1077 """Computes which revs are outgoing given a set of common
1085 1078 and a set of heads.
1086 1079
1087 1080 This is a separate function so extensions can have access to
1088 1081 the logic.
1089 1082
1090 1083 Returns a discovery.outgoing object.
1091 1084 """
1092 1085 cl = repo.changelog
1093 1086 if common:
1094 1087 hasnode = cl.hasnode
1095 1088 common = [n for n in common if hasnode(n)]
1096 1089 else:
1097 1090 common = [nullid]
1098 1091 if not heads:
1099 1092 heads = cl.heads()
1100 1093 return discovery.outgoing(cl, common, heads)
1101 1094
1102 1095 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None,
1103 1096 version='01'):
1104 1097 """Like changegroupsubset, but returns the set difference between the
1105 1098 ancestors of heads and the ancestors common.
1106 1099
1107 1100 If heads is None, use the local heads. If common is None, use [nullid].
1108 1101
1109 1102 The nodes in common might not all be known locally due to the way the
1110 1103 current discovery protocol works.
1111 1104 """
1112 1105 outgoing = computeoutgoing(repo, heads, common)
1113 1106 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps,
1114 1107 version=version)
1115 1108
1116 1109 def changegroup(repo, basenodes, source):
1117 1110 # to avoid a race we use changegroupsubset() (issue1320)
1118 1111 return changegroupsubset(repo, basenodes, repo.heads(), source)
1119 1112
1120 1113 def _addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
1121 1114 revisions = 0
1122 1115 files = 0
1123 1116 while True:
1124 1117 chunkdata = source.filelogheader()
1125 1118 if not chunkdata:
1126 1119 break
1127 1120 f = chunkdata["filename"]
1128 1121 repo.ui.debug("adding %s revisions\n" % f)
1129 1122 pr()
1130 1123 fl = repo.file(f)
1131 1124 o = len(fl)
1132 1125 try:
1133 1126 if not fl.addgroup(source, revmap, trp):
1134 1127 raise error.Abort(_("received file revlog group is empty"))
1135 1128 except error.CensoredBaseError as e:
1136 1129 raise error.Abort(_("received delta base is censored: %s") % e)
1137 1130 revisions += len(fl) - o
1138 1131 files += 1
1139 1132 if f in needfiles:
1140 1133 needs = needfiles[f]
1141 1134 for new in xrange(o, len(fl)):
1142 1135 n = fl.node(new)
1143 1136 if n in needs:
1144 1137 needs.remove(n)
1145 1138 else:
1146 1139 raise error.Abort(
1147 1140 _("received spurious file revlog entry"))
1148 1141 if not needs:
1149 1142 del needfiles[f]
1150 1143 repo.ui.progress(_('files'), None)
1151 1144
1152 1145 for f, needs in needfiles.iteritems():
1153 1146 fl = repo.file(f)
1154 1147 for n in needs:
1155 1148 try:
1156 1149 fl.rev(n)
1157 1150 except error.LookupError:
1158 1151 raise error.Abort(
1159 1152 _('missing file data for %s:%s - run hg verify') %
1160 1153 (f, hex(n)))
1161 1154
1162 1155 return revisions, files