changegroup: drop 'if True' that made the previous change clearer
Augie Fackler
r27238:c3dc0310 default
@@ -1,972 +1,971 @@
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullid,
19 19 nullrev,
20 20 short,
21 21 )
22 22
23 23 from . import (
24 24 branchmap,
25 25 dagutil,
26 26 discovery,
27 27 error,
28 28 mdiff,
29 29 phases,
30 30 util,
31 31 )
32 32
33 33 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
34 34 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
35 35
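# Editor's sketch (not part of the original module): each delta in the
# stream begins with a fixed-size header of 20-byte binary node ids.
# cg1 frames node/p1/p2/linknode and leaves the delta base implicit;
# cg2 inserts an explicit delta base before the linknode.
#
#     >>> import struct
#     >>> struct.calcsize("20s20s20s20s")     # cg1: node, p1, p2, cs
#     80
#     >>> struct.calcsize("20s20s20s20s20s")  # cg2: ..., deltabase, cs
#     100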
36 36 def readexactly(stream, n):
37 37 '''read n bytes from stream.read and abort if less was available'''
38 38 s = stream.read(n)
39 39 if len(s) < n:
40 40 raise error.Abort(_("stream ended unexpectedly"
41 41 " (got %d bytes, expected %d)")
42 42 % (len(s), n))
43 43 return s
44 44
45 45 def getchunk(stream):
46 46 """return the next chunk from stream as a string"""
47 47 d = readexactly(stream, 4)
48 48 l = struct.unpack(">l", d)[0]
49 49 if l <= 4:
50 50 if l:
51 51 raise error.Abort(_("invalid chunk length %d") % l)
52 52 return ""
53 53 return readexactly(stream, l - 4)
54 54
55 55 def chunkheader(length):
56 56 """return a changegroup chunk header (string)"""
57 57 return struct.pack(">l", length + 4)
58 58
59 59 def closechunk():
60 60 """return a changegroup chunk header (string) for a zero-length chunk"""
61 61 return struct.pack(">l", 0)
62 62
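# Editor's note: the three helpers above define the whole chunk framing.
# A chunk is a 4-byte big-endian length that counts itself, followed by
# the payload; a zero length terminates a chunk group. For example
# (util.chunkbuffer, from this codebase, wraps an iterator of strings):
#
#     >>> import struct
#     >>> chunkheader(3) == struct.pack(">l", 7)  # header for 3-byte payload
#     True
#     >>> closechunk()
#     '\x00\x00\x00\x00'
#     >>> getchunk(util.chunkbuffer(iter([chunkheader(3) + 'abc'])))
#     'abc'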
63 63 def combineresults(results):
64 64 """logic to combine 0 or more addchangegroup results into one"""
65 65 changedheads = 0
66 66 result = 1
67 67 for ret in results:
68 68 # If any changegroup result is 0, return 0
69 69 if ret == 0:
70 70 result = 0
71 71 break
72 72 if ret < -1:
73 73 changedheads += ret + 1
74 74 elif ret > 1:
75 75 changedheads += ret - 1
76 76 if changedheads > 0:
77 77 result = 1 + changedheads
78 78 elif changedheads < 0:
79 79 result = -1 + changedheads
80 80 return result
81 81
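# Worked example (editor's note, not in the original): each apply()
# result encodes head changes as 1+added (2..n) or -1-removed (-2..-n),
# with 0 for "nothing changed" and 1 for "head count unchanged":
#
#     >>> combineresults([3, 1, -2])  # +2 heads, no change, -1 head
#     2
#     >>> combineresults([0, 5])      # any zero short-circuits to zero
#     0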
82 82 bundletypes = {
83 83 "": ("", None), # only when using unbundle on ssh and old http servers
84 84 # since the unification: ssh accepts a header but there
85 85 # is no capability signaling it.
86 86 "HG20": (), # special-cased below
87 87 "HG10UN": ("HG10UN", None),
88 88 "HG10BZ": ("HG10", 'BZ'),
89 89 "HG10GZ": ("HG10GZ", 'GZ'),
90 90 }
91 91
92 92 # hgweb uses this list to communicate its preferred type
93 93 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
94 94
95 95 def writechunks(ui, chunks, filename, vfs=None):
96 96 """Write chunks to a file and return its filename.
97 97
98 98 The stream is assumed to be a bundle file.
99 99 Existing files will not be overwritten.
100 100 If no filename is specified, a temporary file is created.
101 101 """
102 102 fh = None
103 103 cleanup = None
104 104 try:
105 105 if filename:
106 106 if vfs:
107 107 fh = vfs.open(filename, "wb")
108 108 else:
109 109 fh = open(filename, "wb")
110 110 else:
111 111 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
112 112 fh = os.fdopen(fd, "wb")
113 113 cleanup = filename
114 114 for c in chunks:
115 115 fh.write(c)
116 116 cleanup = None
117 117 return filename
118 118 finally:
119 119 if fh is not None:
120 120 fh.close()
121 121 if cleanup is not None:
122 122 if filename and vfs:
123 123 vfs.unlink(cleanup)
124 124 else:
125 125 os.unlink(cleanup)
126 126
127 127 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
128 128 """Write a bundle file and return its filename.
129 129
130 130 Existing files will not be overwritten.
131 131 If no filename is specified, a temporary file is created.
132 132 bz2 compression can be turned off.
133 133 The bundle file will be deleted in case of errors.
134 134 """
135 135
136 136 if bundletype == "HG20":
137 137 from . import bundle2
138 138 bundle = bundle2.bundle20(ui)
139 139 bundle.setcompression(compression)
140 140 part = bundle.newpart('changegroup', data=cg.getchunks())
141 141 part.addparam('version', cg.version)
142 142 chunkiter = bundle.getchunks()
143 143 else:
144 144 # compression argument is only for the bundle2 case
145 145 assert compression is None
146 146 if cg.version != '01':
147 147 raise error.Abort(_('old bundle types only support v1 '
148 148 'changegroups'))
149 149 header, comp = bundletypes[bundletype]
150 150 if comp not in util.compressors:
151 151 raise error.Abort(_('unknown stream compression type: %s')
152 152 % comp)
153 153 z = util.compressors[comp]()
154 154 subchunkiter = cg.getchunks()
155 155 def chunkiter():
156 156 yield header
157 157 for chunk in subchunkiter:
158 158 yield z.compress(chunk)
159 159 yield z.flush()
160 160 chunkiter = chunkiter()
161 161
162 162 # parse the changegroup data, otherwise we will block
163 163 # in the case of an sshrepo because we don't know the end of the stream
164 164
165 165 # an empty chunkgroup is the end of the changegroup
166 166 # a changegroup has at least 2 chunkgroups (changelog and manifest).
167 167 # after that, an empty chunkgroup is the end of the changegroup
168 168 return writechunks(ui, chunkiter, filename, vfs=vfs)
169 169
170 170 class cg1unpacker(object):
171 171 """Unpacker for cg1 changegroup streams.
172 172
173 173 A changegroup unpacker handles the framing of the revision data in
174 174 the wire format. Most consumers will want to use the apply()
175 175 method to add the changes from the changegroup to a repository.
176 176
177 177 If you're forwarding a changegroup unmodified to another consumer,
178 178 use getchunks(), which returns an iterator of changegroup
179 179 chunks. This is mostly useful for cases where you need to know the
180 180 data stream has ended by observing the end of the changegroup.
181 181
182 182 deltachunk() is useful only if you're applying delta data. Most
183 183 consumers should prefer apply() instead.
184 184
185 185 A few other public methods exist. Those are used only for
186 186 bundlerepo and some debug commands - their use is discouraged.
187 187 """
188 188 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
189 189 deltaheadersize = struct.calcsize(deltaheader)
190 190 version = '01'
191 191 def __init__(self, fh, alg):
192 192 if alg == 'UN':
193 193 alg = None # get more modern without breaking too much
194 194 if alg not in util.decompressors:
195 195 raise error.Abort(_('unknown stream compression type: %s')
196 196 % alg)
197 197 if alg == 'BZ':
198 198 alg = '_truncatedBZ'
199 199 self._stream = util.decompressors[alg](fh)
200 200 self._type = alg
201 201 self.callback = None
202 202
203 203 # These methods (compressed, read, seek, tell) all appear to only
204 204 # be used by bundlerepo, but it's a little hard to tell.
205 205 def compressed(self):
206 206 return self._type is not None
207 207 def read(self, l):
208 208 return self._stream.read(l)
209 209 def seek(self, pos):
210 210 return self._stream.seek(pos)
211 211 def tell(self):
212 212 return self._stream.tell()
213 213 def close(self):
214 214 return self._stream.close()
215 215
216 216 def _chunklength(self):
217 217 d = readexactly(self._stream, 4)
218 218 l = struct.unpack(">l", d)[0]
219 219 if l <= 4:
220 220 if l:
221 221 raise error.Abort(_("invalid chunk length %d") % l)
222 222 return 0
223 223 if self.callback:
224 224 self.callback()
225 225 return l - 4
226 226
227 227 def changelogheader(self):
228 228 """v10 does not have a changelog header chunk"""
229 229 return {}
230 230
231 231 def manifestheader(self):
232 232 """v10 does not have a manifest header chunk"""
233 233 return {}
234 234
235 235 def filelogheader(self):
236 236 """return the header of the filelogs chunk, v10 only has the filename"""
237 237 l = self._chunklength()
238 238 if not l:
239 239 return {}
240 240 fname = readexactly(self._stream, l)
241 241 return {'filename': fname}
242 242
243 243 def _deltaheader(self, headertuple, prevnode):
244 244 node, p1, p2, cs = headertuple
245 245 if prevnode is None:
246 246 deltabase = p1
247 247 else:
248 248 deltabase = prevnode
249 249 return node, p1, p2, deltabase, cs
250 250
251 251 def deltachunk(self, prevnode):
252 252 l = self._chunklength()
253 253 if not l:
254 254 return {}
255 255 headerdata = readexactly(self._stream, self.deltaheadersize)
256 256 header = struct.unpack(self.deltaheader, headerdata)
257 257 delta = readexactly(self._stream, l - self.deltaheadersize)
258 258 node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
259 259 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
260 260 'deltabase': deltabase, 'delta': delta}
261 261
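# Editor's note: in cg1 the delta base never appears on the wire; the
# first delta of a group is based on its first parent and each later
# one on the previously transmitted node. A hypothetical consumer:
#
#     prevnode = None
#     while True:
#         chunk = unpacker.deltachunk(prevnode)  # {} ends the group
#         if not chunk:
#             break
#         prevnode = chunk['node']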
262 262 def getchunks(self):
263 263 """returns all the chunks contained in the bundle
264 264
265 265 Used when you need to forward the binary stream to a file or another
266 266 network API. To do so, it parses the changegroup data; otherwise it would
267 267 block in the case of an sshrepo because it doesn't know the end of the stream.
268 268 """
269 269 # an empty chunkgroup is the end of the changegroup
270 270 # a changegroup has at least 2 chunkgroups (changelog and manifest).
271 271 # after that, an empty chunkgroup is the end of the changegroup
272 272 empty = False
273 273 count = 0
274 274 while not empty or count <= 2:
275 275 empty = True
276 276 count += 1
277 277 while True:
278 278 chunk = getchunk(self)
279 279 if not chunk:
280 280 break
281 281 empty = False
282 282 yield chunkheader(len(chunk))
283 283 pos = 0
284 284 while pos < len(chunk):
285 285 next = pos + 2**20
286 286 yield chunk[pos:next]
287 287 pos = next
288 288 yield closechunk()
289 289
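# Editor's sketch of the forwarding use case from the class docstring
# (unpacker and file name hypothetical): getchunks() re-frames every
# chunk, so the bytes can be streamed straight into a bare cg1 bundle.
#
#     with open('dump.hg', 'wb') as fh:
#         fh.write('HG10UN')  # uncompressed cg1 bundle header
#         for chunk in unpacker.getchunks():
#             fh.write(chunk)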
290 290 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
291 291 # We know that we'll never have more manifests than we had
292 292 # changesets.
293 293 self.callback = prog(_('manifests'), numchanges)
294 294 # no need to check for empty manifest group here:
295 295 # if the result of the merge of 1 and 2 is the same in 3 and 4,
296 296 # no new manifest will be created and the manifest group will
297 297 # be empty during the pull
298 298 self.manifestheader()
299 299 repo.manifest.addgroup(self, revmap, trp)
300 300 repo.ui.progress(_('manifests'), None)
301 301
302 302 def apply(self, repo, srctype, url, emptyok=False,
303 303 targetphase=phases.draft, expectedtotal=None):
304 304 """Add the changegroup returned by source.read() to this repo.
305 305 srctype is a string like 'push', 'pull', or 'unbundle'. url is
306 306 the URL of the repo where this changegroup is coming from.
307 307
308 308 Return an integer summarizing the change to this repo:
309 309 - nothing changed or no source: 0
310 310 - more heads than before: 1+added heads (2..n)
311 311 - fewer heads than before: -1-removed heads (-2..-n)
312 312 - number of heads stays the same: 1
313 313 """
314 314 repo = repo.unfiltered()
315 315 wasempty = (len(repo.changelog) == 0)
316 316 def csmap(x):
317 317 repo.ui.debug("add changeset %s\n" % short(x))
318 318 return len(cl)
319 319
320 320 def revmap(x):
321 321 return cl.rev(x)
322 322
323 323 changesets = files = revisions = 0
324 324
325 325 tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
326 326 try:
327 327 # The transaction could have been created before and already
328 328 # carries source information. In this case we use the top
329 329 # level data. We overwrite the argument because we need to use
330 330 # the top level value (if they exist) in this function.
331 331 srctype = tr.hookargs.setdefault('source', srctype)
332 332 url = tr.hookargs.setdefault('url', url)
333 333 repo.hook('prechangegroup', throw=True, **tr.hookargs)
334 334
335 335 # write changelog data to temp files so concurrent readers
336 336 # will not see an inconsistent view
337 337 cl = repo.changelog
338 338 cl.delayupdate(tr)
339 339 oldheads = cl.heads()
340 340
341 341 trp = weakref.proxy(tr)
342 342 # pull off the changeset group
343 343 repo.ui.status(_("adding changesets\n"))
344 344 clstart = len(cl)
345 345 class prog(object):
346 346 def __init__(self, step, total):
347 347 self._step = step
348 348 self._total = total
349 349 self._count = 1
350 350 def __call__(self):
351 351 repo.ui.progress(self._step, self._count, unit=_('chunks'),
352 352 total=self._total)
353 353 self._count += 1
354 354 self.callback = prog(_('changesets'), expectedtotal)
355 355
356 356 efiles = set()
357 357 def onchangelog(cl, node):
358 358 efiles.update(cl.read(node)[3])
359 359
360 360 self.changelogheader()
361 361 srccontent = cl.addgroup(self, csmap, trp,
362 362 addrevisioncb=onchangelog)
363 363 efiles = len(efiles)
364 364
365 365 if not (srccontent or emptyok):
366 366 raise error.Abort(_("received changelog group is empty"))
367 367 clend = len(cl)
368 368 changesets = clend - clstart
369 369 repo.ui.progress(_('changesets'), None)
370 370
371 371 # pull off the manifest group
372 372 repo.ui.status(_("adding manifests\n"))
373 373 self._unpackmanifests(repo, revmap, trp, prog, changesets)
374 374
375 375 needfiles = {}
376 376 if repo.ui.configbool('server', 'validate', default=False):
377 377 # validate incoming csets have their manifests
378 378 for cset in xrange(clstart, clend):
379 379 mfnode = repo.changelog.read(repo.changelog.node(cset))[0]
380 380 mfest = repo.manifest.readdelta(mfnode)
381 381 # store file nodes we must see
382 382 for f, n in mfest.iteritems():
383 383 needfiles.setdefault(f, set()).add(n)
384 384
385 385 # process the files
386 386 repo.ui.status(_("adding file changes\n"))
387 387 self.callback = None
388 388 pr = prog(_('files'), efiles)
389 389 newrevs, newfiles = _addchangegroupfiles(
390 390 repo, self, revmap, trp, pr, needfiles, wasempty)
391 391 revisions += newrevs
392 392 files += newfiles
393 393
394 394 dh = 0
395 395 if oldheads:
396 396 heads = cl.heads()
397 397 dh = len(heads) - len(oldheads)
398 398 for h in heads:
399 399 if h not in oldheads and repo[h].closesbranch():
400 400 dh -= 1
401 401 htext = ""
402 402 if dh:
403 403 htext = _(" (%+d heads)") % dh
404 404
405 405 repo.ui.status(_("added %d changesets"
406 406 " with %d changes to %d files%s\n")
407 407 % (changesets, revisions, files, htext))
408 408 repo.invalidatevolatilesets()
409 409
410 410 if changesets > 0:
411 411 if 'node' not in tr.hookargs:
412 412 tr.hookargs['node'] = hex(cl.node(clstart))
413 413 hookargs = dict(tr.hookargs)
414 414 else:
415 415 hookargs = dict(tr.hookargs)
416 416 hookargs['node'] = hex(cl.node(clstart))
417 417 repo.hook('pretxnchangegroup', throw=True, **hookargs)
418 418
419 419 added = [cl.node(r) for r in xrange(clstart, clend)]
420 420 publishing = repo.publishing()
421 421 if srctype in ('push', 'serve'):
422 422 # Old servers cannot push the boundary themselves.
423 423 # New servers won't push the boundary if the changeset already
424 424 # exists locally as secret
425 425 #
426 426 # We should not use added here but the list of all changes in
427 427 # the bundle
428 428 if publishing:
429 429 phases.advanceboundary(repo, tr, phases.public, srccontent)
430 430 else:
431 431 # Those changesets have been pushed from the outside; their
432 432 # phases are going to be pushed alongside. Therefore
433 433 # `targetphase` is ignored.
434 434 phases.advanceboundary(repo, tr, phases.draft, srccontent)
435 435 phases.retractboundary(repo, tr, phases.draft, added)
436 436 elif srctype != 'strip':
437 437 # publishing only alters behavior during push
438 438 #
439 439 # strip should not touch boundary at all
440 440 phases.retractboundary(repo, tr, targetphase, added)
441 441
442 442 if changesets > 0:
443 443 if srctype != 'strip':
444 444 # During strip, branchcache is invalid but the upcoming call to
445 445 # `destroyed` will repair it.
446 446 # In other cases we can safely update the cache on disk.
447 447 branchmap.updatecache(repo.filtered('served'))
448 448
449 449 def runhooks():
450 450 # These hooks run when the lock releases, not when the
451 451 # transaction closes. So it's possible for the changelog
452 452 # to have changed since we last saw it.
453 453 if clstart >= len(repo):
454 454 return
455 455
456 456 # forcefully update the on-disk branch cache
457 457 repo.ui.debug("updating the branch cache\n")
458 458 repo.hook("changegroup", **hookargs)
459 459
460 460 for n in added:
461 461 args = hookargs.copy()
462 462 args['node'] = hex(n)
463 463 repo.hook("incoming", **args)
464 464
465 465 newheads = [h for h in repo.heads() if h not in oldheads]
466 466 repo.ui.log("incoming",
467 467 "%s incoming changes - new heads: %s\n",
468 468 len(added),
469 469 ', '.join([hex(c[:6]) for c in newheads]))
470 470
471 471 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
472 472 lambda tr: repo._afterlock(runhooks))
473 473
474 474 tr.close()
475 475
476 476 finally:
477 477 tr.release()
478 478 repo.ui.flush()
479 479 # never return 0 here:
480 480 if dh < 0:
481 481 return dh - 1
482 482 else:
483 483 return dh + 1
484 484
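# Editor's note: decoding apply()'s return value per the docstring
# above (dh is shifted away from 0, which is reserved for "nothing
# changed", so -1 is never returned):
#
#     ret = cg.apply(repo, 'pull', url)  # hypothetical call
#     if ret == 0:
#         pass                  # empty changegroup or no source
#     elif ret >= 1:
#         added = ret - 1       # 0 means head count unchanged
#     else:
#         removed = -(ret + 1)  # e.g. -2 => one head removed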
485 485 class cg2unpacker(cg1unpacker):
486 486 """Unpacker for cg2 streams.
487 487
488 488 cg2 streams add support for generaldelta, so the delta header
489 489 format is slightly different. All other features about the data
490 490 remain the same.
491 491 """
492 492 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
493 493 deltaheadersize = struct.calcsize(deltaheader)
494 494 version = '02'
495 495
496 496 def _deltaheader(self, headertuple, prevnode):
497 497 node, p1, p2, deltabase, cs = headertuple
498 498 return node, p1, p2, deltabase, cs
499 499
500 500 class headerlessfixup(object):
501 501 def __init__(self, fh, h):
502 502 self._h = h
503 503 self._fh = fh
504 504 def read(self, n):
505 505 if self._h:
506 506 d, self._h = self._h[:n], self._h[n:]
507 507 if len(d) < n:
508 508 d += readexactly(self._fh, n - len(d))
509 509 return d
510 510 return readexactly(self._fh, n)
511 511
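# Editor's sketch (hypothetical caller): readers that have already
# consumed the leading magic of a stream can splice it back so the
# unpacker sees a complete stream.
#
#     magic = readexactly(fh, 6)       # e.g. 'HG10UN'
#     fh = headerlessfixup(fh, magic)  # push the bytes back
#     cg = cg1unpacker(fh, 'UN')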
512 512 class cg1packer(object):
513 513 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
514 514 version = '01'
515 515 def __init__(self, repo, bundlecaps=None):
516 516 """Given a source repo, construct a bundler.
517 517
518 518 bundlecaps is optional and can be used to specify the set of
519 519 capabilities which can be used to build the bundle.
520 520 """
521 521 # Set of capabilities we can use to build the bundle.
522 522 if bundlecaps is None:
523 523 bundlecaps = set()
524 524 self._bundlecaps = bundlecaps
525 525 # experimental config: bundle.reorder
526 526 reorder = repo.ui.config('bundle', 'reorder', 'auto')
527 527 if reorder == 'auto':
528 528 reorder = None
529 529 else:
530 530 reorder = util.parsebool(reorder)
531 531 self._repo = repo
532 532 self._reorder = reorder
533 533 self._progress = repo.ui.progress
534 534 if self._repo.ui.verbose and not self._repo.ui.debugflag:
535 535 self._verbosenote = self._repo.ui.note
536 536 else:
537 537 self._verbosenote = lambda s: None
538 538
539 539 def close(self):
540 540 return closechunk()
541 541
542 542 def fileheader(self, fname):
543 543 return chunkheader(len(fname)) + fname
544 544
545 545 def group(self, nodelist, revlog, lookup, units=None):
546 546 """Calculate a delta group, yielding a sequence of changegroup chunks
547 547 (strings).
548 548
549 549 Given a list of changeset revs, return a set of deltas and
550 550 metadata corresponding to nodes. The first delta is
551 551 first parent(nodelist[0]) -> nodelist[0]; the receiver is
552 552 guaranteed to have this parent, as it has all history before
553 553 these changesets. In the case the first parent is nullrev, the
554 554 changegroup starts with a full revision.
555 555
556 556 If units is not None, progress detail will be generated; units specifies
557 557 the type of revlog that is touched (changelog, manifest, etc.).
558 558 """
559 559 # if we don't have any revisions touched by these changesets, bail
560 560 if len(nodelist) == 0:
561 561 yield self.close()
562 562 return
563 563
564 564 # for generaldelta revlogs, we linearize the revs; this will both be
565 565 # much quicker and generate a much smaller bundle
566 566 if (revlog._generaldelta and self._reorder is None) or self._reorder:
567 567 dag = dagutil.revlogdag(revlog)
568 568 revs = set(revlog.rev(n) for n in nodelist)
569 569 revs = dag.linearize(revs)
570 570 else:
571 571 revs = sorted([revlog.rev(n) for n in nodelist])
572 572
573 573 # add the parent of the first rev
574 574 p = revlog.parentrevs(revs[0])[0]
575 575 revs.insert(0, p)
576 576
577 577 # build deltas
578 578 total = len(revs) - 1
579 579 msgbundling = _('bundling')
580 580 for r in xrange(len(revs) - 1):
581 581 if units is not None:
582 582 self._progress(msgbundling, r + 1, unit=units, total=total)
583 583 prev, curr = revs[r], revs[r + 1]
584 584 linknode = lookup(revlog.node(curr))
585 585 for c in self.revchunk(revlog, curr, prev, linknode):
586 586 yield c
587 587
588 588 if units is not None:
589 589 self._progress(msgbundling, None)
590 590 yield self.close()
591 591
592 592 # filter any nodes that claim to be part of the known set
593 593 def prune(self, revlog, missing, commonrevs):
594 594 rr, rl = revlog.rev, revlog.linkrev
595 595 return [n for n in missing if rl(rr(n)) not in commonrevs]
596 596
597 597 def _packmanifests(self, mfnodes, lookuplinknode):
598 598 """Pack flat manifests into a changegroup stream."""
599 599 ml = self._repo.manifest
600 600 size = 0
601 601 for chunk in self.group(
602 602 mfnodes, ml, lookuplinknode, units=_('manifests')):
603 603 size += len(chunk)
604 604 yield chunk
605 605 self._verbosenote(_('%8.i (manifests)\n') % size)
606 606
607 607 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
608 608 '''yield a sequence of changegroup chunks (strings)'''
609 609 repo = self._repo
610 610 cl = repo.changelog
611 611 ml = repo.manifest
612 612
613 613 clrevorder = {}
614 614 mfs = {} # needed manifests
615 615 fnodes = {} # needed file nodes
616 616 # maps manifest node id -> set(changed files)
617 617 mfchangedfiles = {}
618 618
619 619 # Callback for the changelog, used to collect changed files and manifest
620 620 # nodes.
621 621 # Returns the linkrev node (identity in the changelog case).
622 622 def lookupcl(x):
623 623 c = cl.read(x)
624 624 clrevorder[x] = len(clrevorder)
625 625 n = c[0]
626 626 # record the first changeset introducing this manifest version
627 627 mfs.setdefault(n, x)
628 628 # Record a complete list of potentially-changed files in
629 629 # this manifest.
630 630 mfchangedfiles.setdefault(n, set()).update(c[3])
631 631 return x
632 632
633 633 self._verbosenote(_('uncompressed size of bundle content:\n'))
634 634 size = 0
635 635 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
636 636 size += len(chunk)
637 637 yield chunk
638 638 self._verbosenote(_('%8.i (changelog)\n') % size)
639 639
640 640 # We need to make sure that the linkrev in the changegroup refers to
641 641 # the first changeset that introduced the manifest or file revision.
642 642 # The fastpath is usually safer than the slowpath, because the filelogs
643 643 # are walked in revlog order.
644 644 #
645 645 # When taking the slowpath with reorder=None and the manifest revlog
646 646 # uses generaldelta, the manifest may be walked in the "wrong" order.
647 647 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
648 648 # cc0ff93d0c0c).
649 649 #
650 650 # When taking the fastpath, we are only vulnerable to reordering
651 651 # of the changelog itself. The changelog never uses generaldelta, so
652 652 # it is only reordered when reorder=True. To handle this case, we
653 653 # simply take the slowpath, which already has the 'clrevorder' logic.
654 654 # This was also fixed in cc0ff93d0c0c.
655 655 fastpathlinkrev = fastpathlinkrev and not self._reorder
656 656 # Callback for the manifest, used to collect linkrevs for filelog
657 657 # revisions.
658 658 # Returns the linkrev node (collected in lookupcl).
659 659 def lookupmflinknode(x):
660 660 """Callback for looking up the linknode for manifests.
661 661
662 662 Returns the linkrev node for the specified manifest.
663 663
664 664 SIDE EFFECT:
665 665
666 666 fclnodes gets populated with the list of relevant
667 667 file nodes if we're not using fastpathlinkrev.
668 668
669 669 Note that this means you can't trust fclnodes until
670 670 after manifests have been sent to the client.
671 671 """
672 672 clnode = mfs[x]
673 673 if not fastpathlinkrev:
674 674 mdata = ml.readfast(x)
675 675 for f in mfchangedfiles[x]:
676 if True:
677 try:
678 n = mdata[f]
679 except KeyError:
680 continue
681 # record the first changeset introducing this filelog
682 # version
683 fclnodes = fnodes.setdefault(f, {})
684 fclnode = fclnodes.setdefault(n, clnode)
685 if clrevorder[clnode] < clrevorder[fclnode]:
686 fclnodes[n] = clnode
676 try:
677 n = mdata[f]
678 except KeyError:
679 continue
680 # record the first changeset introducing this filelog
681 # version
682 fclnodes = fnodes.setdefault(f, {})
683 fclnode = fclnodes.setdefault(n, clnode)
684 if clrevorder[clnode] < clrevorder[fclnode]:
685 fclnodes[n] = clnode
687 686 return clnode
688 687
689 688 mfnodes = self.prune(ml, mfs, commonrevs)
690 689 for x in self._packmanifests(mfnodes, lookupmflinknode):
691 690 yield x
692 691
693 692 mfs.clear()
694 693 clrevs = set(cl.rev(x) for x in clnodes)
695 694
696 695 def linknodes(filerevlog, fname):
697 696 if fastpathlinkrev:
698 697 llr = filerevlog.linkrev
699 698 def genfilenodes():
700 699 for r in filerevlog:
701 700 linkrev = llr(r)
702 701 if linkrev in clrevs:
703 702 yield filerevlog.node(r), cl.node(linkrev)
704 703 return dict(genfilenodes())
705 704 return fnodes.get(fname, {})
706 705
707 706 changedfiles = set()
708 707 for x in mfchangedfiles.itervalues():
709 708 changedfiles.update(x)
710 709 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
711 710 source):
712 711 yield chunk
713 712
714 713 yield self.close()
715 714
716 715 if clnodes:
717 716 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
718 717
719 718 # The 'source' parameter is useful for extensions
720 719 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
721 720 repo = self._repo
722 721 progress = self._progress
723 722 msgbundling = _('bundling')
724 723
725 724 total = len(changedfiles)
726 725 # for progress output
727 726 msgfiles = _('files')
728 727 for i, fname in enumerate(sorted(changedfiles)):
729 728 filerevlog = repo.file(fname)
730 729 if not filerevlog:
731 730 raise error.Abort(_("empty or missing revlog for %s") % fname)
732 731
733 732 linkrevnodes = linknodes(filerevlog, fname)
734 733 # Lookup table for filenodes; we collected the linkrev nodes above in
735 734 # the fastpath case and with lookupmflinknode in the slowpath case.
736 735 def lookupfilelog(x):
737 736 return linkrevnodes[x]
738 737
739 738 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
740 739 if filenodes:
741 740 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
742 741 total=total)
743 742 h = self.fileheader(fname)
744 743 size = len(h)
745 744 yield h
746 745 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
747 746 size += len(chunk)
748 747 yield chunk
749 748 self._verbosenote(_('%8.i %s\n') % (size, fname))
750 749 progress(msgbundling, None)
751 750
752 751 def deltaparent(self, revlog, rev, p1, p2, prev):
753 752 return prev
754 753
755 754 def revchunk(self, revlog, rev, prev, linknode):
756 755 node = revlog.node(rev)
757 756 p1, p2 = revlog.parentrevs(rev)
758 757 base = self.deltaparent(revlog, rev, p1, p2, prev)
759 758
760 759 prefix = ''
761 760 if revlog.iscensored(base) or revlog.iscensored(rev):
762 761 try:
763 762 delta = revlog.revision(node)
764 763 except error.CensoredNodeError as e:
765 764 delta = e.tombstone
766 765 if base == nullrev:
767 766 prefix = mdiff.trivialdiffheader(len(delta))
768 767 else:
769 768 baselen = revlog.rawsize(base)
770 769 prefix = mdiff.replacediffheader(baselen, len(delta))
771 770 elif base == nullrev:
772 771 delta = revlog.revision(node)
773 772 prefix = mdiff.trivialdiffheader(len(delta))
774 773 else:
775 774 delta = revlog.revdiff(base, rev)
776 775 p1n, p2n = revlog.parents(node)
777 776 basenode = revlog.node(base)
778 777 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
779 778 meta += prefix
780 779 l = len(meta) + len(delta)
781 780 yield chunkheader(l)
782 781 yield meta
783 782 yield delta
784 783 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
785 784 # do nothing with basenode, it is implicitly the previous one in HG10
786 785 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
787 786
788 787 class cg2packer(cg1packer):
789 788 version = '02'
790 789 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
791 790
792 791 def __init__(self, repo, bundlecaps=None):
793 792 super(cg2packer, self).__init__(repo, bundlecaps)
794 793 if self._reorder is None:
795 794 # Since generaldelta is directly supported by cg2, reordering
796 795 # generally doesn't help, so we disable it by default (treating
797 796 # bundle.reorder=auto just like bundle.reorder=False).
798 797 self._reorder = False
799 798
800 799 def deltaparent(self, revlog, rev, p1, p2, prev):
801 800 dp = revlog.deltaparent(rev)
802 801 # avoid storing full revisions; pick prev in those cases
803 802 # also pick prev when we can't be sure remote has dp
804 803 if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
805 804 return prev
806 805 return dp
807 806
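# Editor's note on deltaparent above: a generaldelta revlog may store a
# delta against any earlier revision, but the receiver is only known to
# have p1, p2 and the previously sent revision. In short:
#
#     dp in (p1, p2, prev)  ->  reuse the stored delta as-is
#     dp == nullrev         ->  full text stored; delta against prev
#     anything else         ->  delta against prev to stay safe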
808 807 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
809 808 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
810 809
811 810 packermap = {'01': (cg1packer, cg1unpacker),
812 811 # cg2 adds support for exchanging generaldelta
813 812 '02': (cg2packer, cg2unpacker),
814 813 }
815 814
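# Editor's sketch: packermap is the dispatch point for changegroup
# versions; getsubset() below uses the same pattern. For example:
#
#     packer = packermap['02'][0](repo, bundlecaps=None)  # cg2packer
#     unpacker = packermap['02'][1](fh, 'UN')             # cg2unpacker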
816 815 def _changegroupinfo(repo, nodes, source):
817 816 if repo.ui.verbose or source == 'bundle':
818 817 repo.ui.status(_("%d changesets found\n") % len(nodes))
819 818 if repo.ui.debugflag:
820 819 repo.ui.debug("list of changesets:\n")
821 820 for node in nodes:
822 821 repo.ui.debug("%s\n" % hex(node))
823 822
824 823 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
825 824 repo = repo.unfiltered()
826 825 commonrevs = outgoing.common
827 826 csets = outgoing.missing
828 827 heads = outgoing.missingheads
829 828 # We go through the fast path if we get told to, or if all (unfiltered)
830 829 # heads have been requested (since we then know that all linkrevs will
831 830 # be pulled by the client).
832 831 heads.sort()
833 832 fastpathlinkrev = fastpath or (
834 833 repo.filtername is None and heads == sorted(repo.heads()))
835 834
836 835 repo.hook('preoutgoing', throw=True, source=source)
837 836 _changegroupinfo(repo, csets, source)
838 837 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
839 838
840 839 def getsubset(repo, outgoing, bundler, source, fastpath=False):
841 840 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
842 841 return packermap[bundler.version][1](util.chunkbuffer(gengroup), None)
843 842
844 843 def changegroupsubset(repo, roots, heads, source, version='01'):
845 844 """Compute a changegroup consisting of all the nodes that are
846 845 descendants of any of the roots and ancestors of any of the heads.
847 846 Return a chunkbuffer object whose read() method will return
848 847 successive changegroup chunks.
849 848
850 849 It is fairly complex as determining which filenodes and which
851 850 manifest nodes need to be included for the changeset to be complete
852 851 is non-trivial.
853 852
854 853 Another wrinkle is doing the reverse, figuring out which changeset in
855 854 the changegroup a particular filenode or manifestnode belongs to.
856 855 """
857 856 cl = repo.changelog
858 857 if not roots:
859 858 roots = [nullid]
860 859 discbases = []
861 860 for n in roots:
862 861 discbases.extend([p for p in cl.parents(n) if p != nullid])
863 862 # TODO: remove call to nodesbetween.
864 863 csets, roots, heads = cl.nodesbetween(roots, heads)
865 864 included = set(csets)
866 865 discbases = [n for n in discbases if n not in included]
867 866 outgoing = discovery.outgoing(cl, discbases, heads)
868 867 bundler = packermap[version][0](repo)
869 868 return getsubset(repo, outgoing, bundler, source)
870 869
871 870 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
872 871 version='01'):
873 872 """Like getbundle, but taking a discovery.outgoing as an argument.
874 873
875 874 This is only implemented for local repos and reuses potentially
876 875 precomputed sets in outgoing. Returns a raw changegroup generator."""
877 876 if not outgoing.missing:
878 877 return None
879 878 bundler = packermap[version][0](repo, bundlecaps)
880 879 return getsubsetraw(repo, outgoing, bundler, source)
881 880
882 881 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
883 882 version='01'):
884 883 """Like getbundle, but taking a discovery.outgoing as an argument.
885 884
886 885 This is only implemented for local repos and reuses potentially
887 886 precomputed sets in outgoing."""
888 887 if not outgoing.missing:
889 888 return None
890 889 bundler = packermap[version][0](repo, bundlecaps)
891 890 return getsubset(repo, outgoing, bundler, source)
892 891
893 892 def computeoutgoing(repo, heads, common):
894 893 """Computes which revs are outgoing given a set of common
895 894 and a set of heads.
896 895
897 896 This is a separate function so extensions can have access to
898 897 the logic.
899 898
900 899 Returns a discovery.outgoing object.
901 900 """
902 901 cl = repo.changelog
903 902 if common:
904 903 hasnode = cl.hasnode
905 904 common = [n for n in common if hasnode(n)]
906 905 else:
907 906 common = [nullid]
908 907 if not heads:
909 908 heads = cl.heads()
910 909 return discovery.outgoing(cl, common, heads)
911 910
912 911 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None,
913 912 version='01'):
914 913 """Like changegroupsubset, but returns the set difference between the
915 914 ancestors of heads and the ancestors of common.
916 915
917 916 If heads is None, use the local heads. If common is None, use [nullid].
918 917
919 918 The nodes in common might not all be known locally due to the way the
920 919 current discovery protocol works.
921 920 """
922 921 outgoing = computeoutgoing(repo, heads, common)
923 922 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps,
924 923 version=version)
925 924
926 925 def changegroup(repo, basenodes, source):
927 926 # to avoid a race we use changegroupsubset() (issue1320)
928 927 return changegroupsubset(repo, basenodes, repo.heads(), source)
929 928
930 929 def _addchangegroupfiles(repo, source, revmap, trp, pr, needfiles, wasempty):
931 930 revisions = 0
932 931 files = 0
933 932 while True:
934 933 chunkdata = source.filelogheader()
935 934 if not chunkdata:
936 935 break
937 936 f = chunkdata["filename"]
938 937 repo.ui.debug("adding %s revisions\n" % f)
939 938 pr()
940 939 fl = repo.file(f)
941 940 o = len(fl)
942 941 try:
943 942 if not fl.addgroup(source, revmap, trp):
944 943 raise error.Abort(_("received file revlog group is empty"))
945 944 except error.CensoredBaseError as e:
946 945 raise error.Abort(_("received delta base is censored: %s") % e)
947 946 revisions += len(fl) - o
948 947 files += 1
949 948 if f in needfiles:
950 949 needs = needfiles[f]
951 950 for new in xrange(o, len(fl)):
952 951 n = fl.node(new)
953 952 if n in needs:
954 953 needs.remove(n)
955 954 else:
956 955 raise error.Abort(
957 956 _("received spurious file revlog entry"))
958 957 if not needs:
959 958 del needfiles[f]
960 959 repo.ui.progress(_('files'), None)
961 960
962 961 for f, needs in needfiles.iteritems():
963 962 fl = repo.file(f)
964 963 for n in needs:
965 964 try:
966 965 fl.rev(n)
967 966 except error.LookupError:
968 967 raise error.Abort(
969 968 _('missing file data for %s:%s - run hg verify') %
970 969 (f, hex(n)))
971 970
972 971 return revisions, files