changegroup: note during bundle apply if the repo was empty...
Augie Fackler
r27218:de3335b4 default
@@ -1,948 +1,949 @@
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullid,
19 19 nullrev,
20 20 short,
21 21 )
22 22
23 23 from . import (
24 24 branchmap,
25 25 dagutil,
26 26 discovery,
27 27 error,
28 28 mdiff,
29 29 phases,
30 30 util,
31 31 )
32 32
33 33 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
34 34 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
35 35
36 36 def readexactly(stream, n):
37 37 '''read n bytes from stream.read and abort if less was available'''
38 38 s = stream.read(n)
39 39 if len(s) < n:
40 40 raise error.Abort(_("stream ended unexpectedly"
41 41 " (got %d bytes, expected %d)")
42 42 % (len(s), n))
43 43 return s
44 44
45 45 def getchunk(stream):
46 46 """return the next chunk from stream as a string"""
47 47 d = readexactly(stream, 4)
48 48 l = struct.unpack(">l", d)[0]
49 49 if l <= 4:
50 50 if l:
51 51 raise error.Abort(_("invalid chunk length %d") % l)
52 52 return ""
53 53 return readexactly(stream, l - 4)
54 54
55 55 def chunkheader(length):
56 56 """return a changegroup chunk header (string)"""
57 57 return struct.pack(">l", length + 4)
58 58
59 59 def closechunk():
60 60 """return a changegroup chunk header (string) for a zero-length chunk"""
61 61 return struct.pack(">l", 0)
62 62
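The three framing helpers above (getchunk, chunkheader, closechunk) define the wire format used throughout this file: every chunk is a 4-byte big-endian length that counts itself plus the payload, and a bare zero length terminates a sequence. A minimal, self-contained sketch of that round trip (the frame helper is illustrative, not part of the module):

    import struct
    from io import BytesIO

    def frame(payload):
        # the length field counts itself (4 bytes) plus the payload
        return struct.pack(">l", len(payload) + 4) + payload

    stream = BytesIO(frame(b"abc") + frame(b"de") + struct.pack(">l", 0))

    chunks = []
    while True:
        l = struct.unpack(">l", stream.read(4))[0]
        if l <= 4:
            break  # a zero-length chunk ends the sequence
        chunks.append(stream.read(l - 4))

    assert chunks == [b"abc", b"de"]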
63 63 def combineresults(results):
64 64 """logic to combine 0 or more addchangegroup results into one"""
65 65 changedheads = 0
66 66 result = 1
67 67 for ret in results:
68 68 # If any changegroup result is 0, return 0
69 69 if ret == 0:
70 70 result = 0
71 71 break
72 72 if ret < -1:
73 73 changedheads += ret + 1
74 74 elif ret > 1:
75 75 changedheads += ret - 1
76 76 if changedheads > 0:
77 77 result = 1 + changedheads
78 78 elif changedheads < 0:
79 79 result = -1 + changedheads
80 80 return result
81 81
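combineresults folds together several results that follow the convention documented on cg1unpacker.apply() below: 0 for no change, 1 for an unchanged head count, 1+n for n added heads, -1-n for n removed heads. A quick illustration, assuming the module is importable as mercurial.changegroup:

    from mercurial import changegroup

    # two applies that each added one head combine to two added heads
    assert changegroup.combineresults([2, 2]) == 3
    # any result of 0 (nothing changed) wins outright
    assert changegroup.combineresults([2, 0, 2]) == 0
    # one head added, one removed: head count unchanged overall
    assert changegroup.combineresults([2, -2]) == 1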
82 82 bundletypes = {
83 83 "": ("", None), # only when using unbundle on ssh and old http servers
84 84 # since the unification ssh accepts a header but there
85 85 # is no capability signaling it.
86 86 "HG20": (), # special-cased below
87 87 "HG10UN": ("HG10UN", None),
88 88 "HG10BZ": ("HG10", 'BZ'),
89 89 "HG10GZ": ("HG10GZ", 'GZ'),
90 90 }
91 91
92 92 # hgweb uses this list to communicate its preferred type
93 93 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
94 94
95 95 def writechunks(ui, chunks, filename, vfs=None):
96 96 """Write chunks to a file and return its filename.
97 97
98 98 The stream is assumed to be a bundle file.
99 99 Existing files will not be overwritten.
100 100 If no filename is specified, a temporary file is created.
101 101 """
102 102 fh = None
103 103 cleanup = None
104 104 try:
105 105 if filename:
106 106 if vfs:
107 107 fh = vfs.open(filename, "wb")
108 108 else:
109 109 fh = open(filename, "wb")
110 110 else:
111 111 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
112 112 fh = os.fdopen(fd, "wb")
113 113 cleanup = filename
114 114 for c in chunks:
115 115 fh.write(c)
116 116 cleanup = None
117 117 return filename
118 118 finally:
119 119 if fh is not None:
120 120 fh.close()
121 121 if cleanup is not None:
122 122 if filename and vfs:
123 123 vfs.unlink(cleanup)
124 124 else:
125 125 os.unlink(cleanup)
126 126
127 127 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
128 128 """Write a bundle file and return its filename.
129 129
130 130 Existing files will not be overwritten.
131 131 If no filename is specified, a temporary file is created.
132 132 bz2 compression can be turned off.
133 133 The bundle file will be deleted in case of errors.
134 134 """
135 135
136 136 if bundletype == "HG20":
137 137 from . import bundle2
138 138 bundle = bundle2.bundle20(ui)
139 139 bundle.setcompression(compression)
140 140 part = bundle.newpart('changegroup', data=cg.getchunks())
141 141 part.addparam('version', cg.version)
142 142 chunkiter = bundle.getchunks()
143 143 else:
144 144 # compression argument is only for the bundle2 case
145 145 assert compression is None
146 146 if cg.version != '01':
147 147 raise error.Abort(_('old bundle types only support v1 '
148 148 'changegroups'))
149 149 header, comp = bundletypes[bundletype]
150 150 if comp not in util.compressors:
151 151 raise error.Abort(_('unknown stream compression type: %s')
152 152 % comp)
153 153 z = util.compressors[comp]()
154 154 subchunkiter = cg.getchunks()
155 155 def chunkiter():
156 156 yield header
157 157 for chunk in subchunkiter:
158 158 yield z.compress(chunk)
159 159 yield z.flush()
160 160 chunkiter = chunkiter()
161 161
162 162 # parse the changegroup data, otherwise we will block
163 163 # in case of sshrepo because we don't know the end of the stream
164 164
165 165 # a changegroup has at least 2 chunkgroups (changelog and manifest);
166 166 # only after those does an empty chunkgroup mark the end of the
167 167 # changegroup
168 168 return writechunks(ui, chunkiter, filename, vfs=vfs)
169 169
170 170 class cg1unpacker(object):
171 171 """Unpacker for cg1 changegroup streams.
172 172
173 173 A changegroup unpacker handles the framing of the revision data in
174 174 the wire format. Most consumers will want to use the apply()
175 175 method to add the changes from the changegroup to a repository.
176 176
177 177 If you're forwarding a changegroup unmodified to another consumer,
178 178 use getchunks(), which returns an iterator of changegroup
179 179 chunks. This is mostly useful for cases where you need to know the
180 180 data stream has ended by observing the end of the changegroup.
181 181
182 182 deltachunk() is useful only if you're applying delta data. Most
183 183 consumers should prefer apply() instead.
184 184
185 185 A few other public methods exist. Those are used only for
186 186 bundlerepo and some debug commands - their use is discouraged.
187 187 """
188 188 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
189 189 deltaheadersize = struct.calcsize(deltaheader)
190 190 version = '01'
191 191 def __init__(self, fh, alg):
192 192 if alg == 'UN':
193 193 alg = None # get more modern without breaking too much
194 194 if alg not in util.decompressors:
195 195 raise error.Abort(_('unknown stream compression type: %s')
196 196 % alg)
197 197 if alg == 'BZ':
198 198 alg = '_truncatedBZ'
199 199 self._stream = util.decompressors[alg](fh)
200 200 self._type = alg
201 201 self.callback = None
202 202
203 203 # These methods (compressed, read, seek, tell) all appear to only
204 204 # be used by bundlerepo, but it's a little hard to tell.
205 205 def compressed(self):
206 206 return self._type is not None
207 207 def read(self, l):
208 208 return self._stream.read(l)
209 209 def seek(self, pos):
210 210 return self._stream.seek(pos)
211 211 def tell(self):
212 212 return self._stream.tell()
213 213 def close(self):
214 214 return self._stream.close()
215 215
216 216 def _chunklength(self):
217 217 d = readexactly(self._stream, 4)
218 218 l = struct.unpack(">l", d)[0]
219 219 if l <= 4:
220 220 if l:
221 221 raise error.Abort(_("invalid chunk length %d") % l)
222 222 return 0
223 223 if self.callback:
224 224 self.callback()
225 225 return l - 4
226 226
227 227 def changelogheader(self):
228 228 """v10 does not have a changelog header chunk"""
229 229 return {}
230 230
231 231 def manifestheader(self):
232 232 """v10 does not have a manifest header chunk"""
233 233 return {}
234 234
235 235 def filelogheader(self):
236 236 """return the header of the filelogs chunk, v10 only has the filename"""
237 237 l = self._chunklength()
238 238 if not l:
239 239 return {}
240 240 fname = readexactly(self._stream, l)
241 241 return {'filename': fname}
242 242
243 243 def _deltaheader(self, headertuple, prevnode):
244 244 node, p1, p2, cs = headertuple
245 245 if prevnode is None:
246 246 deltabase = p1
247 247 else:
248 248 deltabase = prevnode
249 249 return node, p1, p2, deltabase, cs
250 250
251 251 def deltachunk(self, prevnode):
252 252 l = self._chunklength()
253 253 if not l:
254 254 return {}
255 255 headerdata = readexactly(self._stream, self.deltaheadersize)
256 256 header = struct.unpack(self.deltaheader, headerdata)
257 257 delta = readexactly(self._stream, l - self.deltaheadersize)
258 258 node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
259 259 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
260 260 'deltabase': deltabase, 'delta': delta}
261 261
262 262 def getchunks(self):
263 263 """returns all the chunks contains in the bundle
264 264
265 265 Used when you need to forward the binary stream to a file or another
266 266 network API. To do so, it parses the changegroup data; otherwise it would
267 267 block in case of sshrepo because it doesn't know the end of the stream.
268 268 """
269 269 # a changegroup has at least 2 chunkgroups (changelog and manifest);
270 270 # only after those does an empty chunkgroup mark the end of the
271 271 # changegroup
272 272 empty = False
273 273 count = 0
274 274 while not empty or count <= 2:
275 275 empty = True
276 276 count += 1
277 277 while True:
278 278 chunk = getchunk(self)
279 279 if not chunk:
280 280 break
281 281 empty = False
282 282 yield chunkheader(len(chunk))
283 283 pos = 0
284 284 while pos < len(chunk):
285 285 next = pos + 2**20
286 286 yield chunk[pos:next]
287 287 pos = next
288 288 yield closechunk()
289 289
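Because getchunks() re-emits the framing it consumes, the whole changegroup can be spooled to a file or socket without interpreting the deltas, and iteration stops exactly at the end-of-changegroup marker. A minimal sketch (spool is a hypothetical name; cg is assumed to be an already-constructed cg1unpacker):

    def spool(cg, fh):
        # copy the raw changegroup to fh; any data that follows the
        # changegroup in the underlying stream is left unread
        for chunk in cg.getchunks():
            fh.write(chunk)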
290 290 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
291 291 # We know that we'll never have more manifests than we had
292 292 # changesets.
293 293 self.callback = prog(_('manifests'), numchanges)
294 294 # no need to check for empty manifest group here:
295 295 # if the result of the merge of 1 and 2 is the same in 3 and 4,
296 296 # no new manifest will be created and the manifest group will
297 297 # be empty during the pull
298 298 self.manifestheader()
299 299 repo.manifest.addgroup(self, revmap, trp)
300 300 repo.ui.progress(_('manifests'), None)
301 301
302 302 def apply(self, repo, srctype, url, emptyok=False,
303 303 targetphase=phases.draft, expectedtotal=None):
304 304 """Add the changegroup returned by source.read() to this repo.
305 305 srctype is a string like 'push', 'pull', or 'unbundle'. url is
306 306 the URL of the repo where this changegroup is coming from.
307 307
308 308 Return an integer summarizing the change to this repo:
309 309 - nothing changed or no source: 0
310 310 - more heads than before: 1+added heads (2..n)
311 311 - fewer heads than before: -1-removed heads (-2..-n)
312 312 - number of heads stays the same: 1
313 313 """
314 314 repo = repo.unfiltered()
315 wasempty = (len(repo.changelog) == 0)
315 316 def csmap(x):
316 317 repo.ui.debug("add changeset %s\n" % short(x))
317 318 return len(cl)
318 319
319 320 def revmap(x):
320 321 return cl.rev(x)
321 322
322 323 changesets = files = revisions = 0
323 324
324 325 tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
325 326 try:
326 327 # The transaction could have been created before and already
327 328 # carries source information. In this case we use the top
328 329 # level data. We overwrite the argument because we need to use
329 330 # the top level value (if they exist) in this function.
330 331 srctype = tr.hookargs.setdefault('source', srctype)
331 332 url = tr.hookargs.setdefault('url', url)
332 333 repo.hook('prechangegroup', throw=True, **tr.hookargs)
333 334
334 335 # write changelog data to temp files so concurrent readers
335 336 # will not see an inconsistent view
336 337 cl = repo.changelog
337 338 cl.delayupdate(tr)
338 339 oldheads = cl.heads()
339 340
340 341 trp = weakref.proxy(tr)
341 342 # pull off the changeset group
342 343 repo.ui.status(_("adding changesets\n"))
343 344 clstart = len(cl)
344 345 class prog(object):
345 346 def __init__(self, step, total):
346 347 self._step = step
347 348 self._total = total
348 349 self._count = 1
349 350 def __call__(self):
350 351 repo.ui.progress(self._step, self._count, unit=_('chunks'),
351 352 total=self._total)
352 353 self._count += 1
353 354 self.callback = prog(_('changesets'), expectedtotal)
354 355
355 356 efiles = set()
356 357 def onchangelog(cl, node):
357 358 efiles.update(cl.read(node)[3])
358 359
359 360 self.changelogheader()
360 361 srccontent = cl.addgroup(self, csmap, trp,
361 362 addrevisioncb=onchangelog)
362 363 efiles = len(efiles)
363 364
364 365 if not (srccontent or emptyok):
365 366 raise error.Abort(_("received changelog group is empty"))
366 367 clend = len(cl)
367 368 changesets = clend - clstart
368 369 repo.ui.progress(_('changesets'), None)
369 370
370 371 # pull off the manifest group
371 372 repo.ui.status(_("adding manifests\n"))
372 373 self._unpackmanifests(repo, revmap, trp, prog, changesets)
373 374
374 375 needfiles = {}
375 376 if repo.ui.configbool('server', 'validate', default=False):
376 377 # validate incoming csets have their manifests
377 378 for cset in xrange(clstart, clend):
378 379 mfnode = repo.changelog.read(repo.changelog.node(cset))[0]
379 380 mfest = repo.manifest.readdelta(mfnode)
380 381 # store file nodes we must see
381 382 for f, n in mfest.iteritems():
382 383 needfiles.setdefault(f, set()).add(n)
383 384
384 385 # process the files
385 386 repo.ui.status(_("adding file changes\n"))
386 387 self.callback = None
387 388 pr = prog(_('files'), efiles)
388 389 newrevs, newfiles = _addchangegroupfiles(
389 repo, self, revmap, trp, pr, needfiles)
390 repo, self, revmap, trp, pr, needfiles, wasempty)
390 391 revisions += newrevs
391 392 files += newfiles
392 393
393 394 dh = 0
394 395 if oldheads:
395 396 heads = cl.heads()
396 397 dh = len(heads) - len(oldheads)
397 398 for h in heads:
398 399 if h not in oldheads and repo[h].closesbranch():
399 400 dh -= 1
400 401 htext = ""
401 402 if dh:
402 403 htext = _(" (%+d heads)") % dh
403 404
404 405 repo.ui.status(_("added %d changesets"
405 406 " with %d changes to %d files%s\n")
406 407 % (changesets, revisions, files, htext))
407 408 repo.invalidatevolatilesets()
408 409
409 410 if changesets > 0:
410 411 if 'node' not in tr.hookargs:
411 412 tr.hookargs['node'] = hex(cl.node(clstart))
412 413 hookargs = dict(tr.hookargs)
413 414 else:
414 415 hookargs = dict(tr.hookargs)
415 416 hookargs['node'] = hex(cl.node(clstart))
416 417 repo.hook('pretxnchangegroup', throw=True, **hookargs)
417 418
418 419 added = [cl.node(r) for r in xrange(clstart, clend)]
419 420 publishing = repo.publishing()
420 421 if srctype in ('push', 'serve'):
421 422 # Old servers can not push the boundary themselves.
422 423 # New servers won't push the boundary if the changeset already
423 424 # exists locally as secret
424 425 #
425 426 # We should not use 'added' here but the list of all changes in
426 427 # the bundle
427 428 if publishing:
428 429 phases.advanceboundary(repo, tr, phases.public, srccontent)
429 430 else:
430 431 # Those changesets have been pushed from the outside, their
431 432 # phases are going to be pushed alongside. Therefore
432 433 # `targetphase` is ignored.
433 434 phases.advanceboundary(repo, tr, phases.draft, srccontent)
434 435 phases.retractboundary(repo, tr, phases.draft, added)
435 436 elif srctype != 'strip':
436 437 # publishing only alters behavior during push
437 438 #
438 439 # strip should not touch boundary at all
439 440 phases.retractboundary(repo, tr, targetphase, added)
440 441
441 442 if changesets > 0:
442 443 if srctype != 'strip':
443 444 # During strip, branchcache is invalid but the coming call to
444 445 # `destroyed` will repair it.
445 446 # In other cases we can safely update the cache on disk.
446 447 branchmap.updatecache(repo.filtered('served'))
447 448
448 449 def runhooks():
449 450 # These hooks run when the lock releases, not when the
450 451 # transaction closes. So it's possible for the changelog
451 452 # to have changed since we last saw it.
452 453 if clstart >= len(repo):
453 454 return
454 455
455 456 # forcefully update the on-disk branch cache
456 457 repo.ui.debug("updating the branch cache\n")
457 458 repo.hook("changegroup", **hookargs)
458 459
459 460 for n in added:
460 461 args = hookargs.copy()
461 462 args['node'] = hex(n)
462 463 repo.hook("incoming", **args)
463 464
464 465 newheads = [h for h in repo.heads() if h not in oldheads]
465 466 repo.ui.log("incoming",
466 467 "%s incoming changes - new heads: %s\n",
467 468 len(added),
468 469 ', '.join([hex(c[:6]) for c in newheads]))
469 470
470 471 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
471 472 lambda tr: repo._afterlock(runhooks))
472 473
473 474 tr.close()
474 475
475 476 finally:
476 477 tr.release()
477 478 repo.ui.flush()
478 479 # never return 0 here:
479 480 if dh < 0:
480 481 return dh - 1
481 482 else:
482 483 return dh + 1
483 484
484 485 class cg2unpacker(cg1unpacker):
485 486 """Unpacker for cg2 streams.
486 487
487 488 cg2 streams add support for generaldelta, so the delta header
488 489 format is slightly different. All other features about the data
489 490 remain the same.
490 491 """
491 492 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
492 493 deltaheadersize = struct.calcsize(deltaheader)
493 494 version = '02'
494 495
495 496 def _deltaheader(self, headertuple, prevnode):
496 497 node, p1, p2, deltabase, cs = headertuple
497 498 return node, p1, p2, deltabase, cs
498 499
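The wire-format difference between the two versions comes down to the header structs at the top of the file: cg1 sends (node, p1, p2, linknode) and derives the delta base, while cg2 carries an explicit deltabase field. A sketch of decoding both, using the same struct formats (parseheader is a hypothetical name, not part of the module):

    import struct

    _V1 = "20s20s20s20s"      # node, p1, p2, linknode
    _V2 = "20s20s20s20s20s"   # node, p1, p2, deltabase, linknode

    def parseheader(data, version, prevnode=None):
        if version == '01':
            node, p1, p2, cs = struct.unpack(_V1, data)
            # base is implicit: p1 for the first chunk of a group,
            # the previous node otherwise
            deltabase = p1 if prevnode is None else prevnode
        else:
            node, p1, p2, deltabase, cs = struct.unpack(_V2, data)
        return node, p1, p2, deltabase, cs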
499 500 class headerlessfixup(object):
500 501 def __init__(self, fh, h):
501 502 self._h = h
502 503 self._fh = fh
503 504 def read(self, n):
504 505 if self._h:
505 506 d, self._h = self._h[:n], self._h[n:]
506 507 if len(d) < n:
507 508 d += readexactly(self._fh, n - len(d))
508 509 return d
509 510 return readexactly(self._fh, n)
510 511
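headerlessfixup replays bytes that were already consumed from a stream, which lets callers that sniffed the bundle header hand the rest of the file to an unpacker without seeking. A small illustration with made-up stream contents:

    from io import BytesIO
    from mercurial.changegroup import headerlessfixup

    raw = BytesIO(b"HG10UNrest-of-stream")
    magic = raw.read(6)  # consumed while sniffing the bundle type
    fixed = headerlessfixup(raw, magic)
    assert fixed.read(4) == b"HG10"      # replayed from the saved header
    assert fixed.read(8) == b"UNrest-o"  # continues into the real stream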
511 512 class cg1packer(object):
512 513 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
513 514 version = '01'
514 515 def __init__(self, repo, bundlecaps=None):
515 516 """Given a source repo, construct a bundler.
516 517
517 518 bundlecaps is optional and can be used to specify the set of
518 519 capabilities which can be used to build the bundle.
519 520 """
520 521 # Set of capabilities we can use to build the bundle.
521 522 if bundlecaps is None:
522 523 bundlecaps = set()
523 524 self._bundlecaps = bundlecaps
524 525 # experimental config: bundle.reorder
525 526 reorder = repo.ui.config('bundle', 'reorder', 'auto')
526 527 if reorder == 'auto':
527 528 reorder = None
528 529 else:
529 530 reorder = util.parsebool(reorder)
530 531 self._repo = repo
531 532 self._reorder = reorder
532 533 self._progress = repo.ui.progress
533 534 if self._repo.ui.verbose and not self._repo.ui.debugflag:
534 535 self._verbosenote = self._repo.ui.note
535 536 else:
536 537 self._verbosenote = lambda s: None
537 538
538 539 def close(self):
539 540 return closechunk()
540 541
541 542 def fileheader(self, fname):
542 543 return chunkheader(len(fname)) + fname
543 544
544 545 def group(self, nodelist, revlog, lookup, units=None):
545 546 """Calculate a delta group, yielding a sequence of changegroup chunks
546 547 (strings).
547 548
548 549 Given a list of changeset revs, return a set of deltas and
549 550 metadata corresponding to nodes. The first delta is
550 551 first parent(nodelist[0]) -> nodelist[0]; the receiver is
551 552 guaranteed to have this parent as it has all history before
552 553 these changesets. In the case where firstparent is nullrev the
553 554 changegroup starts with a full revision.
554 555
555 556 If units is not None, progress detail will be generated; units specifies
556 557 the type of revlog that is touched (changelog, manifest, etc.).
557 558 """
558 559 # if we don't have any revisions touched by these changesets, bail
559 560 if len(nodelist) == 0:
560 561 yield self.close()
561 562 return
562 563
563 564 # for generaldelta revlogs, we linearize the revs; this will both be
564 565 # much quicker and generate a much smaller bundle
565 566 if (revlog._generaldelta and self._reorder is None) or self._reorder:
566 567 dag = dagutil.revlogdag(revlog)
567 568 revs = set(revlog.rev(n) for n in nodelist)
568 569 revs = dag.linearize(revs)
569 570 else:
570 571 revs = sorted([revlog.rev(n) for n in nodelist])
571 572
572 573 # add the parent of the first rev
573 574 p = revlog.parentrevs(revs[0])[0]
574 575 revs.insert(0, p)
575 576
576 577 # build deltas
577 578 total = len(revs) - 1
578 579 msgbundling = _('bundling')
579 580 for r in xrange(len(revs) - 1):
580 581 if units is not None:
581 582 self._progress(msgbundling, r + 1, unit=units, total=total)
582 583 prev, curr = revs[r], revs[r + 1]
583 584 linknode = lookup(revlog.node(curr))
584 585 for c in self.revchunk(revlog, curr, prev, linknode):
585 586 yield c
586 587
587 588 if units is not None:
588 589 self._progress(msgbundling, None)
589 590 yield self.close()
590 591
591 592 # filter any nodes that claim to be part of the known set
592 593 def prune(self, revlog, missing, commonrevs):
593 594 rr, rl = revlog.rev, revlog.linkrev
594 595 return [n for n in missing if rl(rr(n)) not in commonrevs]
595 596
596 597 def _packmanifests(self, mfnodes, lookuplinknode):
597 598 """Pack flat manifests into a changegroup stream."""
598 599 ml = self._repo.manifest
599 600 size = 0
600 601 for chunk in self.group(
601 602 mfnodes, ml, lookuplinknode, units=_('manifests')):
602 603 size += len(chunk)
603 604 yield chunk
604 605 self._verbosenote(_('%8.i (manifests)\n') % size)
605 606
606 607 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
607 608 '''yield a sequence of changegroup chunks (strings)'''
608 609 repo = self._repo
609 610 cl = repo.changelog
610 611 ml = repo.manifest
611 612
612 613 clrevorder = {}
613 614 mfs = {} # needed manifests
614 615 fnodes = {} # needed file nodes
615 616 changedfiles = set()
616 617
617 618 # Callback for the changelog, used to collect changed files and manifest
618 619 # nodes.
619 620 # Returns the linkrev node (identity in the changelog case).
620 621 def lookupcl(x):
621 622 c = cl.read(x)
622 623 clrevorder[x] = len(clrevorder)
623 624 changedfiles.update(c[3])
624 625 # record the first changeset introducing this manifest version
625 626 mfs.setdefault(c[0], x)
626 627 return x
627 628
628 629 self._verbosenote(_('uncompressed size of bundle content:\n'))
629 630 size = 0
630 631 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
631 632 size += len(chunk)
632 633 yield chunk
633 634 self._verbosenote(_('%8.i (changelog)\n') % size)
634 635
635 636 # We need to make sure that the linkrev in the changegroup refers to
636 637 # the first changeset that introduced the manifest or file revision.
637 638 # The fastpath is usually safer than the slowpath, because the filelogs
638 639 # are walked in revlog order.
639 640 #
640 641 # When taking the slowpath with reorder=None and the manifest revlog
641 642 # uses generaldelta, the manifest may be walked in the "wrong" order.
642 643 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
643 644 # cc0ff93d0c0c).
644 645 #
645 646 # When taking the fastpath, we are only vulnerable to reordering
646 647 # of the changelog itself. The changelog never uses generaldelta, so
647 648 # it is only reordered when reorder=True. To handle this case, we
648 649 # simply take the slowpath, which already has the 'clrevorder' logic.
649 650 # This was also fixed in cc0ff93d0c0c.
650 651 fastpathlinkrev = fastpathlinkrev and not self._reorder
651 652 # Callback for the manifest, used to collect linkrevs for filelog
652 653 # revisions.
653 654 # Returns the linkrev node (collected in lookupcl).
654 655 def lookupmflinknode(x):
655 656 clnode = mfs[x]
656 657 if not fastpathlinkrev:
657 658 mdata = ml.readfast(x)
658 659 for f, n in mdata.iteritems():
659 660 if f in changedfiles:
660 661 # record the first changeset introducing this filelog
661 662 # version
662 663 fclnodes = fnodes.setdefault(f, {})
663 664 fclnode = fclnodes.setdefault(n, clnode)
664 665 if clrevorder[clnode] < clrevorder[fclnode]:
665 666 fclnodes[n] = clnode
666 667 return clnode
667 668
668 669 mfnodes = self.prune(ml, mfs, commonrevs)
669 670 for x in self._packmanifests(mfnodes, lookupmflinknode):
670 671 yield x
671 672
672 673 mfs.clear()
673 674 clrevs = set(cl.rev(x) for x in clnodes)
674 675
675 676 def linknodes(filerevlog, fname):
676 677 if fastpathlinkrev:
677 678 llr = filerevlog.linkrev
678 679 def genfilenodes():
679 680 for r in filerevlog:
680 681 linkrev = llr(r)
681 682 if linkrev in clrevs:
682 683 yield filerevlog.node(r), cl.node(linkrev)
683 684 return dict(genfilenodes())
684 685 return fnodes.get(fname, {})
685 686
686 687 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
687 688 source):
688 689 yield chunk
689 690
690 691 yield self.close()
691 692
692 693 if clnodes:
693 694 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
694 695
695 696 # The 'source' parameter is useful for extensions
696 697 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
697 698 repo = self._repo
698 699 progress = self._progress
699 700 msgbundling = _('bundling')
700 701
701 702 total = len(changedfiles)
702 703 # for progress output
703 704 msgfiles = _('files')
704 705 for i, fname in enumerate(sorted(changedfiles)):
705 706 filerevlog = repo.file(fname)
706 707 if not filerevlog:
707 708 raise error.Abort(_("empty or missing revlog for %s") % fname)
708 709
709 710 linkrevnodes = linknodes(filerevlog, fname)
710 711 # Lookup table for filenodes; we collected the linkrev nodes above in
711 712 # the fastpath case and with lookupmflinknode in the slowpath case.
712 713 def lookupfilelog(x):
713 714 return linkrevnodes[x]
714 715
715 716 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
716 717 if filenodes:
717 718 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
718 719 total=total)
719 720 h = self.fileheader(fname)
720 721 size = len(h)
721 722 yield h
722 723 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
723 724 size += len(chunk)
724 725 yield chunk
725 726 self._verbosenote(_('%8.i %s\n') % (size, fname))
726 727 progress(msgbundling, None)
727 728
728 729 def deltaparent(self, revlog, rev, p1, p2, prev):
729 730 return prev
730 731
731 732 def revchunk(self, revlog, rev, prev, linknode):
732 733 node = revlog.node(rev)
733 734 p1, p2 = revlog.parentrevs(rev)
734 735 base = self.deltaparent(revlog, rev, p1, p2, prev)
735 736
736 737 prefix = ''
737 738 if revlog.iscensored(base) or revlog.iscensored(rev):
738 739 try:
739 740 delta = revlog.revision(node)
740 741 except error.CensoredNodeError as e:
741 742 delta = e.tombstone
742 743 if base == nullrev:
743 744 prefix = mdiff.trivialdiffheader(len(delta))
744 745 else:
745 746 baselen = revlog.rawsize(base)
746 747 prefix = mdiff.replacediffheader(baselen, len(delta))
747 748 elif base == nullrev:
748 749 delta = revlog.revision(node)
749 750 prefix = mdiff.trivialdiffheader(len(delta))
750 751 else:
751 752 delta = revlog.revdiff(base, rev)
752 753 p1n, p2n = revlog.parents(node)
753 754 basenode = revlog.node(base)
754 755 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
755 756 meta += prefix
756 757 l = len(meta) + len(delta)
757 758 yield chunkheader(l)
758 759 yield meta
759 760 yield delta
760 761 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
761 762 # do nothing with basenode, it is implicitly the previous one in HG10
762 763 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
763 764
764 765 class cg2packer(cg1packer):
765 766 version = '02'
766 767 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
767 768
768 769 def __init__(self, repo, bundlecaps=None):
769 770 super(cg2packer, self).__init__(repo, bundlecaps)
770 771 if self._reorder is None:
771 772 # Since generaldelta is directly supported by cg2, reordering
772 773 # generally doesn't help, so we disable it by default (treating
773 774 # bundle.reorder=auto just like bundle.reorder=False).
774 775 self._reorder = False
775 776
776 777 def deltaparent(self, revlog, rev, p1, p2, prev):
777 778 dp = revlog.deltaparent(rev)
778 779 # avoid storing full revisions; pick prev in those cases
779 780 # also pick prev when we can't be sure remote has dp
780 781 if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
781 782 return prev
782 783 return dp
783 784
784 785 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
785 786 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
786 787
787 788 packermap = {'01': (cg1packer, cg1unpacker),
788 789 # cg2 adds support for exchanging generaldelta
789 790 '02': (cg2packer, cg2unpacker),
790 791 }
791 792
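packermap is the single registry mapping a changegroup version to its (packer, unpacker) pair; getsubset() below indexes it the same way. A short sketch of selecting a pair by version, assuming mercurial.changegroup is importable:

    from mercurial import changegroup

    packer, unpacker = changegroup.packermap['02']
    # packers are built with (repo, bundlecaps=None);
    # unpackers with (fh, alg)
    assert unpacker.version == '02'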
792 793 def _changegroupinfo(repo, nodes, source):
793 794 if repo.ui.verbose or source == 'bundle':
794 795 repo.ui.status(_("%d changesets found\n") % len(nodes))
795 796 if repo.ui.debugflag:
796 797 repo.ui.debug("list of changesets:\n")
797 798 for node in nodes:
798 799 repo.ui.debug("%s\n" % hex(node))
799 800
800 801 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
801 802 repo = repo.unfiltered()
802 803 commonrevs = outgoing.common
803 804 csets = outgoing.missing
804 805 heads = outgoing.missingheads
805 806 # We go through the fast path if we get told to, or if all (unfiltered)
806 807 # heads have been requested (since we then know all linkrevs will
807 808 # be pulled by the client).
808 809 heads.sort()
809 810 fastpathlinkrev = fastpath or (
810 811 repo.filtername is None and heads == sorted(repo.heads()))
811 812
812 813 repo.hook('preoutgoing', throw=True, source=source)
813 814 _changegroupinfo(repo, csets, source)
814 815 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
815 816
816 817 def getsubset(repo, outgoing, bundler, source, fastpath=False):
817 818 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
818 819 return packermap[bundler.version][1](util.chunkbuffer(gengroup), None)
819 820
820 821 def changegroupsubset(repo, roots, heads, source, version='01'):
821 822 """Compute a changegroup consisting of all the nodes that are
822 823 descendants of any of the roots and ancestors of any of the heads.
823 824 Return a chunkbuffer object whose read() method will return
824 825 successive changegroup chunks.
825 826
826 827 It is fairly complex as determining which filenodes and which
827 828 manifest nodes need to be included for the changeset to be complete
828 829 is non-trivial.
829 830
830 831 Another wrinkle is doing the reverse, figuring out which changeset in
831 832 the changegroup a particular filenode or manifestnode belongs to.
832 833 """
833 834 cl = repo.changelog
834 835 if not roots:
835 836 roots = [nullid]
836 837 discbases = []
837 838 for n in roots:
838 839 discbases.extend([p for p in cl.parents(n) if p != nullid])
839 840 # TODO: remove call to nodesbetween.
840 841 csets, roots, heads = cl.nodesbetween(roots, heads)
841 842 included = set(csets)
842 843 discbases = [n for n in discbases if n not in included]
843 844 outgoing = discovery.outgoing(cl, discbases, heads)
844 845 bundler = packermap[version][0](repo)
845 846 return getsubset(repo, outgoing, bundler, source)
846 847
847 848 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
848 849 version='01'):
849 850 """Like getbundle, but taking a discovery.outgoing as an argument.
850 851
851 852 This is only implemented for local repos and reuses potentially
852 853 precomputed sets in outgoing. Returns a raw changegroup generator."""
853 854 if not outgoing.missing:
854 855 return None
855 856 bundler = packermap[version][0](repo, bundlecaps)
856 857 return getsubsetraw(repo, outgoing, bundler, source)
857 858
858 859 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
859 860 version='01'):
860 861 """Like getbundle, but taking a discovery.outgoing as an argument.
861 862
862 863 This is only implemented for local repos and reuses potentially
863 864 precomputed sets in outgoing."""
864 865 if not outgoing.missing:
865 866 return None
866 867 bundler = packermap[version][0](repo, bundlecaps)
867 868 return getsubset(repo, outgoing, bundler, source)
868 869
869 870 def computeoutgoing(repo, heads, common):
870 871 """Computes which revs are outgoing given a set of common
871 872 and a set of heads.
872 873
873 874 This is a separate function so extensions can have access to
874 875 the logic.
875 876
876 877 Returns a discovery.outgoing object.
877 878 """
878 879 cl = repo.changelog
879 880 if common:
880 881 hasnode = cl.hasnode
881 882 common = [n for n in common if hasnode(n)]
882 883 else:
883 884 common = [nullid]
884 885 if not heads:
885 886 heads = cl.heads()
886 887 return discovery.outgoing(cl, common, heads)
887 888
888 889 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None,
889 890 version='01'):
890 891 """Like changegroupsubset, but returns the set difference between the
891 892 ancestors of heads and the ancestors of common.
892 893
893 894 If heads is None, use the local heads. If common is None, use [nullid].
894 895
895 896 The nodes in common might not all be known locally due to the way the
896 897 current discovery protocol works.
897 898 """
898 899 outgoing = computeoutgoing(repo, heads, common)
899 900 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps,
900 901 version=version)
901 902
902 903 def changegroup(repo, basenodes, source):
903 904 # to avoid a race we use changegroupsubset() (issue1320)
904 905 return changegroupsubset(repo, basenodes, repo.heads(), source)
905 906
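Putting the pieces together, the typical local flow is getchangegroup() to compute and wrap the outgoing stream, then writebundle() to serialize it. A minimal sketch, assuming a local repository in the current directory and a Mercurial of this vintage (error handling omitted):

    from mercurial import changegroup, hg, ui as uimod

    u = uimod.ui()
    repo = hg.repository(u, '.')

    # all local changesets: heads defaults to repo heads, common to [nullid]
    cg = changegroup.getchangegroup(repo, 'bundle')
    if cg is not None:
        # 'HG10BZ' selects a v1 changegroup with bzip2 compression
        changegroup.writebundle(u, cg, 'all.hg', 'HG10BZ')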
906 def _addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
907 def _addchangegroupfiles(repo, source, revmap, trp, pr, needfiles, wasempty):
907 908 revisions = 0
908 909 files = 0
909 910 while True:
910 911 chunkdata = source.filelogheader()
911 912 if not chunkdata:
912 913 break
913 914 f = chunkdata["filename"]
914 915 repo.ui.debug("adding %s revisions\n" % f)
915 916 pr()
916 917 fl = repo.file(f)
917 918 o = len(fl)
918 919 try:
919 920 if not fl.addgroup(source, revmap, trp):
920 921 raise error.Abort(_("received file revlog group is empty"))
921 922 except error.CensoredBaseError as e:
922 923 raise error.Abort(_("received delta base is censored: %s") % e)
923 924 revisions += len(fl) - o
924 925 files += 1
925 926 if f in needfiles:
926 927 needs = needfiles[f]
927 928 for new in xrange(o, len(fl)):
928 929 n = fl.node(new)
929 930 if n in needs:
930 931 needs.remove(n)
931 932 else:
932 933 raise error.Abort(
933 934 _("received spurious file revlog entry"))
934 935 if not needs:
935 936 del needfiles[f]
936 937 repo.ui.progress(_('files'), None)
937 938
938 939 for f, needs in needfiles.iteritems():
939 940 fl = repo.file(f)
940 941 for n in needs:
941 942 try:
942 943 fl.rev(n)
943 944 except error.LookupError:
944 945 raise error.Abort(
945 946 _('missing file data for %s:%s - run hg verify') %
946 947 (f, hex(n)))
947 948
948 949 return revisions, files