##// END OF EJS Templates
changegroup: remove option to allow empty changegroup (API)...
Martin von Zweigbergk -
r33308:248d5890 default
parent child Browse files
Show More
@@ -1,1007 +1,1007
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullrev,
19 19 short,
20 20 )
21 21
22 22 from . import (
23 23 dagutil,
24 24 discovery,
25 25 error,
26 26 mdiff,
27 27 phases,
28 28 pycompat,
29 29 util,
30 30 )
31 31
# struct formats of the per-revision delta header for each changegroup
# version. Every "20s" is one 20-byte field.
# cg1: node, p1, p2, cs (see cg1unpacker._deltaheader).
_CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
# cg2: node, p1, p2, deltabase, cs (see cg2unpacker._deltaheader).
_CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
# cg3: the cg2 fields followed by a big-endian unsigned short of flags
# (see cg3unpacker._deltaheader).
_CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
35 35
def readexactly(stream, n):
    """Read exactly n bytes from stream.read().

    Raises error.Abort when the stream yields fewer than n bytes.
    """
    data = stream.read(n)
    got = len(data)
    if got >= n:
        return data
    raise error.Abort(_("stream ended unexpectedly"
                        " (got %d bytes, expected %d)")
                      % (got, n))
44 44
def getchunk(stream):
    """Return the payload of the next chunk in stream as a string.

    A chunk starts with a 4-byte big-endian signed length that counts the
    length field itself. A length of 0 yields the empty string; any other
    length of 4 or less raises error.Abort.
    """
    (length,) = struct.unpack(">l", readexactly(stream, 4))
    if length > 4:
        return readexactly(stream, length - 4)
    if length != 0:
        raise error.Abort(_("invalid chunk length %d") % length)
    return ""
54 54
def chunkheader(length):
    """Return the changegroup chunk header (string) for a payload of
    `length` bytes.

    The encoded value includes the 4 bytes of the header itself.
    """
    framed = length + 4
    return struct.pack(">l", framed)
58 58
def closechunk():
    """Return the chunk header (string) of a zero-length chunk."""
    terminator = struct.pack(">l", 0)
    return terminator
62 62
def writechunks(ui, chunks, filename, vfs=None):
    """Write chunks to a file and return its filename.

    The stream is assumed to be a bundle file.
    If no filename is specified, a temporary file is created with
    tempfile.mkstemp() (outside of any vfs). Otherwise the file is opened
    in "wb" mode, through vfs when one is given.

    If writing fails, the partially written file is removed before the
    exception propagates.

    NOTE(review): an earlier docstring claimed existing files are not
    overwritten; the non-vfs path opens with "wb", so confirm that claim
    against the vfs implementation before relying on it.
    """
    fh = None
    cleanup = None
    # Function used to remove a partially written file. It must match the
    # way the file was created: only files opened through vfs are removed
    # through vfs. (Testing "filename and vfs" at cleanup time is wrong
    # because filename is rebound to the temporary path below.)
    unlink = os.unlink
    try:
        if filename:
            if vfs:
                fh = vfs.open(filename, "wb")
                unlink = vfs.unlink
            else:
                # Increase default buffer size because default is usually
                # small (4k is common on Linux).
                fh = open(filename, "wb", 131072)
        else:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, pycompat.sysstr("wb"))
        cleanup = filename
        for c in chunks:
            fh.write(c)
        # Everything was written: keep the file.
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            unlink(cleanup)
96 96
class cg1unpacker(object):
    """Unpacker for cg1 changegroup streams.

    A changegroup unpacker handles the framing of the revision data in
    the wire format. Most consumers will want to use the apply()
    method to add the changes from the changegroup to a repository.

    If you're forwarding a changegroup unmodified to another consumer,
    use getchunks(), which returns an iterator of changegroup
    chunks. This is mostly useful for cases where you need to know the
    data stream has ended by observing the end of the changegroup.

    deltachunk() is useful only if you're applying delta data. Most
    consumers should prefer apply() instead.

    A few other public methods exist. Those are used only for
    bundlerepo and some debug commands - their use is discouraged.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    _grouplistcount = 1 # One list of files after the manifests

    def __init__(self, fh, alg, extras=None):
        """Wrap fh in a decompressing reader for bundle type alg.

        alg of None means 'UN'. An alg that is not a supported bundle
        type raises error.Abort. extras defaults to an empty dict.
        """
        if alg is None:
            alg = 'UN'
        if alg not in util.compengines.supportedbundletypes:
            raise error.Abort(_('unknown stream compression type: %s')
                              % alg)
        if alg == 'BZ':
            alg = '_truncatedBZ'

        compengine = util.compengines.forbundletype(alg)
        self._stream = compengine.decompressorreader(fh)
        self._type = alg
        self.extras = extras or {}
        # Optional callable invoked once per non-empty chunk header read
        # by _chunklength(); apply() uses it to report progress.
        self.callback = None

    # These methods (compressed, read, seek, tell) all appear to only
    # be used by bundlerepo, but it's a little hard to tell.
    def compressed(self):
        return self._type is not None and self._type != 'UN'
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def _chunklength(self):
        """Read a chunk header and return the payload length.

        Returns 0 for a zero-length (terminating) chunk and raises
        error.Abort for any other encoded length of 4 or less. The
        progress callback, if set, is invoked for non-empty chunks.
        """
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise error.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self._chunklength()
        if not l:
            return {}
        fname = readexactly(self._stream, l)
        return {'filename': fname}

    def _deltaheader(self, headertuple, prevnode):
        """Expand an unpacked cg1 delta header to the common 6-tuple.

        cg1 has no explicit delta base: the base is p1 for the first
        chunk of a group (prevnode is None) and the previous node
        otherwise. Flags are always 0.
        """
        node, p1, p2, cs = headertuple
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        flags = 0
        return node, p1, p2, deltabase, cs, flags

    def deltachunk(self, prevnode):
        """Read the next delta chunk and return it as a dict.

        Returns an empty dict when the terminating chunk is reached.
        """
        l = self._chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
        return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
                'deltabase': deltabase, 'delta': delta, 'flags': flags}

    def getchunks(self):
        """Return all the chunks contained in the bundle.

        Used when you need to forward the binary stream to a file or another
        network API. To do so, it parses the changegroup data; otherwise it
        would block in the case of sshrepo because it doesn't know where the
        stream ends.
        """
        # an empty chunkgroup is the end of the changegroup
        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, changegroup versions 1 and 2 have a series of groups
        # with one group per file. changegroup 3 has a series of directory
        # manifests before the files.
        count = 0
        emptycount = 0
        while emptycount < self._grouplistcount:
            empty = True
            count += 1
            while True:
                chunk = getchunk(self)
                if not chunk:
                    if empty and count > 2:
                        emptycount += 1
                    break
                empty = False
                yield chunkheader(len(chunk))
                # re-emit the payload in pieces of at most 1 MiB
                pos = 0
                while pos < len(chunk):
                    end = pos + 2**20
                    yield chunk[pos:end]
                    pos = end
            yield closechunk()

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        """Add the manifest group of the stream to the manifest revlog."""
        # We know that we'll never have more manifests than we had
        # changesets.
        self.callback = prog(_('manifests'), numchanges)
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        self.manifestheader()
        repo.manifestlog._revlog.addgroup(self, revmap, trp)
        repo.ui.progress(_('manifests'), None)
        self.callback = None

    def apply(self, repo, tr, srctype, url, targetphase=phases.draft,
              expectedtotal=None):
        """Add the changegroup returned by source.read() to this repo.
        srctype is a string like 'push', 'pull', or 'unbundle'.  url is
        the URL of the repo where this changegroup is coming from.

        Raises error.Abort if the changelog group is empty.

        Return a tuple (ret, added). added is the list of changelog nodes
        added by this changegroup. ret is a non-zero integer summarizing
        the change to this repo:
        - more heads than before: 1+added heads (2..n)
        - fewer heads than before: -1-removed heads (-2..-n)
        - number of heads stays the same: 1
        """
        repo = repo.unfiltered()
        def csmap(x):
            repo.ui.debug("add changeset %s\n" % short(x))
            return len(cl)

        def revmap(x):
            return cl.rev(x)

        changesets = files = revisions = 0

        try:
            # The transaction may already carry source information. In this
            # case we use the top level data. We overwrite the argument
            # because we need to use the top level value (if they exist)
            # in this function.
            srctype = tr.hookargs.setdefault('source', srctype)
            url = tr.hookargs.setdefault('url', url)
            repo.hook('prechangegroup', throw=True, **tr.hookargs)

            # write changelog data to temp files so concurrent readers
            # will not see an inconsistent view
            cl = repo.changelog
            cl.delayupdate(tr)
            oldheads = set(cl.heads())

            trp = weakref.proxy(tr)
            # pull off the changeset group
            repo.ui.status(_("adding changesets\n"))
            clstart = len(cl)
            class prog(object):
                def __init__(self, step, total):
                    self._step = step
                    self._total = total
                    self._count = 1
                def __call__(self):
                    repo.ui.progress(self._step, self._count, unit=_('chunks'),
                                     total=self._total)
                    self._count += 1
            self.callback = prog(_('changesets'), expectedtotal)

            efiles = set()
            def onchangelog(cl, node):
                efiles.update(cl.readfiles(node))

            self.changelogheader()
            cgnodes = cl.addgroup(self, csmap, trp, addrevisioncb=onchangelog)
            efiles = len(efiles)

            if not cgnodes:
                raise error.Abort(_("received changelog group is empty"))
            clend = len(cl)
            changesets = clend - clstart
            repo.ui.progress(_('changesets'), None)
            self.callback = None

            # pull off the manifest group
            repo.ui.status(_("adding manifests\n"))
            self._unpackmanifests(repo, revmap, trp, prog, changesets)

            needfiles = {}
            if repo.ui.configbool('server', 'validate'):
                cl = repo.changelog
                ml = repo.manifestlog
                # validate incoming csets have their manifests
                for cset in xrange(clstart, clend):
                    mfnode = cl.changelogrevision(cset).manifest
                    mfest = ml[mfnode].readdelta()
                    # store file cgnodes we must see
                    for f, n in mfest.iteritems():
                        needfiles.setdefault(f, set()).add(n)

            # process the files
            repo.ui.status(_("adding file changes\n"))
            newrevs, newfiles = _addchangegroupfiles(
                repo, self, revmap, trp, efiles, needfiles)
            revisions += newrevs
            files += newfiles

            deltaheads = 0
            if oldheads:
                heads = cl.heads()
                deltaheads = len(heads) - len(oldheads)
                for h in heads:
                    if h not in oldheads and repo[h].closesbranch():
                        deltaheads -= 1
            htext = ""
            if deltaheads:
                htext = _(" (%+d heads)") % deltaheads

            repo.ui.status(_("added %d changesets"
                             " with %d changes to %d files%s\n")
                             % (changesets, revisions, files, htext))
            repo.invalidatevolatilesets()

            if changesets > 0:
                if 'node' not in tr.hookargs:
                    tr.hookargs['node'] = hex(cl.node(clstart))
                    tr.hookargs['node_last'] = hex(cl.node(clend - 1))
                    hookargs = dict(tr.hookargs)
                else:
                    hookargs = dict(tr.hookargs)
                    hookargs['node'] = hex(cl.node(clstart))
                    hookargs['node_last'] = hex(cl.node(clend - 1))
                repo.hook('pretxnchangegroup', throw=True, **hookargs)

            added = [cl.node(r) for r in xrange(clstart, clend)]
            if srctype in ('push', 'serve'):
                # Old servers can not push the boundary themselves.
                # New servers won't push the boundary if changeset already
                # exists locally as secret
                #
                # We should not use added here but the list of all change in
                # the bundle
                if repo.publishing():
                    phases.advanceboundary(repo, tr, phases.public, cgnodes)
                else:
                    # Those changesets have been pushed from the
                    # outside, their phases are going to be pushed
                    # alongside. Therefore `targetphase` is
                    # ignored.
                    phases.advanceboundary(repo, tr, phases.draft, cgnodes)
                    phases.retractboundary(repo, tr, phases.draft, added)
            elif srctype != 'strip':
                # publishing only alter behavior during push
                #
                # strip should not touch boundary at all
                phases.retractboundary(repo, tr, targetphase, added)

            if changesets > 0:

                def runhooks():
                    # These hooks run when the lock releases, not when the
                    # transaction closes. So it's possible for the changelog
                    # to have changed since we last saw it.
                    if clstart >= len(repo):
                        return

                    repo.hook("changegroup", **hookargs)

                    for n in added:
                        args = hookargs.copy()
                        args['node'] = hex(n)
                        del args['node_last']
                        repo.hook("incoming", **args)

                    newheads = [h for h in repo.heads()
                                if h not in oldheads]
                    repo.ui.log("incoming",
                                "%s incoming changes - new heads: %s\n",
                                len(added),
                                ', '.join([hex(c[:6]) for c in newheads]))

                tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                                lambda tr: repo._afterlock(runhooks))
        finally:
            repo.ui.flush()
        # never return 0 here:
        if deltaheads < 0:
            ret = deltaheads - 1
        else:
            ret = deltaheads + 1
        return ret, added
414 414
class cg2unpacker(cg1unpacker):
    """Unpacker for cg2 streams.

    cg2 streams add support for generaldelta, so the delta header
    format is slightly different. All other features about the data
    remain the same.
    """
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)

    def _deltaheader(self, headertuple, prevnode):
        # The cg2 header carries the delta base explicitly, so prevnode is
        # not consulted. Flags are always 0.
        (node, p1, p2, deltabase, cs) = headertuple
        return node, p1, p2, deltabase, cs, 0
430 430
class cg3unpacker(cg2unpacker):
    """Unpacker for cg3 streams.

    cg3 streams add support for exchanging treemanifests and revlog
    flags. It adds the revlog flags to the delta header and an empty chunk
    separating manifests and files.
    """
    version = '03'
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    _grouplistcount = 2 # One list of manifests and one list of files

    def _deltaheader(self, headertuple, prevnode):
        # The cg3 header already has all six fields; prevnode is unused.
        (node, p1, p2, deltabase, cs, flags) = headertuple
        return (node, p1, p2, deltabase, cs, flags)

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        """Add the root manifest group, then any directory manifest groups.

        Raises error.Abort if a directory manifest group is empty.
        """
        super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
                                                  numchanges)
        while True:
            chunkdata = self.filelogheader()
            if chunkdata == {}:
                # empty header: no more directory manifests
                break
            # If we get here, there are directory manifests in the changegroup
            dirname = chunkdata["filename"]
            repo.ui.debug("adding %s revisions\n" % dirname)
            dirlog = repo.manifestlog._revlog.dirlog(dirname)
            if not dirlog.addgroup(self, revmap, trp):
                raise error.Abort(_("received dir revlog group is empty"))
457 457
class headerlessfixup(object):
    """File-like reader that serves the bytes in h before reading from fh."""

    def __init__(self, fh, h):
        self._fh = fh
        self._h = h

    def read(self, n):
        """Return exactly n bytes, taken from the pending header first."""
        if not self._h:
            return readexactly(self._fh, n)
        head = self._h[:n]
        self._h = self._h[n:]
        missing = n - len(head)
        if missing > 0:
            # header exhausted mid-read: top up from the underlying stream
            head += readexactly(self._fh, missing)
        return head
469 469
class cg1packer(object):
    # struct format used by builddeltaheader()
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    version = '01'
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle. While bundlecaps is
        unused in core Mercurial, extensions rely on this feature to communicate
        capabilities to customize the changegroup packer.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        # experimental config: bundle.reorder
        reorder = repo.ui.config('bundle', 'reorder')
        # 'auto' is stored as None; anything else goes through parsebool
        if reorder == 'auto':
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
        # size notes are emitted only when verbose and not in debug mode
        if self._repo.ui.verbose and not self._repo.ui.debugflag:
            self._verbosenote = self._repo.ui.note
        else:
            self._verbosenote = lambda s: None

    def close(self):
        """Return the chunk that terminates a group."""
        return closechunk()

    def fileheader(self, fname):
        """Return the chunk introducing the group for fname."""
        return chunkheader(len(fname)) + fname

    # Extracted both for clarity and for overriding in extensions.
    def _sortgroup(self, revlog, nodelist, lookup):
        """Sort nodes for change group and turn them into revnums."""
        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and self._reorder is None) or self._reorder:
            dag = dagutil.revlogdag(revlog)
            return dag.linearize(set(revlog.rev(n) for n in nodelist))
        else:
            return sorted([revlog.rev(n) for n in nodelist])

    def group(self, nodelist, revlog, lookup, units=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        revs = self._sortgroup(revlog, nodelist, lookup)

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        if units is not None:
            self._progress(msgbundling, None)
        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs):
        """Return the nodes of missing whose linkrev is not in commonrevs."""
        rr, rl = revlog.rev, revlog.linkrev
        return [n for n in missing if rl(rr(n)) not in commonrevs]

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        """Pack flat manifests into a changegroup stream."""
        assert not dir
        for chunk in self.group(mfnodes, self._repo.manifestlog._revlog,
                                lookuplinknode, units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        """Return the data emitted after all manifest groups (nothing here)."""
        return ''

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        cl = repo.changelog

        clrevorder = {}
        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and manifest
        # nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            clrevorder[x] = len(clrevorder)
            n = c[0]
            # record the first changeset introducing this manifest version
            mfs.setdefault(n, x)
            # Record a complete list of potentially-changed files in
            # this manifest.
            changedfiles.update(c[3])
            return x

        self._verbosenote(_('uncompressed size of bundle content:\n'))
        size = 0
        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (changelog)\n') % size)

        # We need to make sure that the linkrev in the changegroup refers to
        # the first changeset that introduced the manifest or file revision.
        # The fastpath is usually safer than the slowpath, because the filelogs
        # are walked in revlog order.
        #
        # When taking the slowpath with reorder=None and the manifest revlog
        # uses generaldelta, the manifest may be walked in the "wrong" order.
        # Without 'clrevorder', we would get an incorrect linkrev (see fix in
        # cc0ff93d0c0c).
        #
        # When taking the fastpath, we are only vulnerable to reordering
        # of the changelog itself. The changelog never uses generaldelta, so
        # it is only reordered when reorder=True. To handle this case, we
        # simply take the slowpath, which already has the 'clrevorder' logic.
        # This was also fixed in cc0ff93d0c0c.
        fastpathlinkrev = fastpathlinkrev and not self._reorder
        # Treemanifests don't work correctly with fastpathlinkrev
        # either, because we don't discover which directory nodes to
        # send along with files. This could probably be fixed.
        fastpathlinkrev = fastpathlinkrev and (
            'treemanifest' not in repo.requirements)

        for chunk in self.generatemanifests(commonrevs, clrevorder,
                fastpathlinkrev, mfs, fnodes):
            yield chunk
        mfs.clear()
        clrevs = set(cl.rev(x) for x in clnodes)

        if not fastpathlinkrev:
            # slowpath: linknodes were collected into fnodes while the
            # manifests were generated
            def linknodes(unused, fname):
                return fnodes.get(fname, {})
        else:
            cln = cl.node
            # fastpath: derive linknodes from the filelog's own linkrevs
            def linknodes(filerevlog, fname):
                llr = filerevlog.linkrev
                fln = filerevlog.node
                revs = ((r, llr(r)) for r in filerevlog)
                return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
                          fnodes):
        """Yield the manifest chunks, root manifest first.

        tmfnodes maps a directory ('' for the root) to a dict of manifest
        node -> changelog node; directories discovered while packing are
        added to it and processed in sorted order.
        """
        repo = self._repo
        mfl = repo.manifestlog
        dirlog = mfl._revlog.dirlog
        tmfnodes = {'': mfs}

        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def makelookupmflinknode(dir):
            if fastpathlinkrev:
                assert not dir
                return mfs.__getitem__

            def lookupmflinknode(x):
                """Callback for looking up the linknode for manifests.

                Returns the linkrev node for the specified manifest.

                SIDE EFFECT:

                1) fclnodes gets populated with the list of relevant
                   file nodes if we're not using fastpathlinkrev
                2) When treemanifests are in use, collects treemanifest nodes
                   to send

                Note that this means manifests must be completely sent to
                the client before you can trust the list of files and
                treemanifests to send.
                """
                clnode = tmfnodes[dir][x]
                mdata = mfl.get(dir, x).readfast(shallow=True)
                for p, n, fl in mdata.iterentries():
                    if fl == 't': # subdirectory manifest
                        subdir = dir + p + '/'
                        tmfclnodes = tmfnodes.setdefault(subdir, {})
                        tmfclnode = tmfclnodes.setdefault(n, clnode)
                        # keep the changeset that comes first in clrevorder
                        if clrevorder[clnode] < clrevorder[tmfclnode]:
                            tmfclnodes[n] = clnode
                    else:
                        f = dir + p
                        fclnodes = fnodes.setdefault(f, {})
                        fclnode = fclnodes.setdefault(n, clnode)
                        # keep the changeset that comes first in clrevorder
                        if clrevorder[clnode] < clrevorder[fclnode]:
                            fclnodes[n] = clnode
                return clnode
            return lookupmflinknode

        size = 0
        while tmfnodes:
            dir = min(tmfnodes)
            nodes = tmfnodes[dir]
            prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
            # the root manifest group is always emitted, even when empty
            if not dir or prunednodes:
                for x in self._packmanifests(dir, prunednodes,
                                             makelookupmflinknode(dir)):
                    size += len(x)
                    yield x
            del tmfnodes[dir]
        self._verbosenote(_('%8.i (manifests)\n') % size)
        yield self._manifestsdone()

    # The 'source' parameter is useful for extensions
    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        """Yield the file chunks for changedfiles, in sorted filename order.

        Raises error.Abort when the revlog of a changed file is empty or
        missing.
        """
        repo = self._repo
        progress = self._progress
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise error.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                h = self.fileheader(fname)
                size = len(h)
                yield h
                for chunk in self.group(filenodes, filerevlog, lookupfilelog):
                    size += len(chunk)
                    yield chunk
                self._verbosenote(_('%8.i %s\n') % (size, fname))
        progress(msgbundling, None)

    def deltaparent(self, revlog, rev, p1, p2, prev):
        """Return the rev to delta against; cg1 always uses prev."""
        return prev

    def revchunk(self, revlog, rev, prev, linknode):
        """Yield the chunk header, delta header and delta for rev."""
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = self.deltaparent(revlog, rev, p1, p2, prev)

        prefix = ''
        if revlog.iscensored(base) or revlog.iscensored(rev):
            # censored base or rev: send the raw text (or the tombstone)
            # behind a diff header instead of a computed delta
            try:
                delta = revlog.revision(node, raw=True)
            except error.CensoredNodeError as e:
                delta = e.tombstone
            if base == nullrev:
                prefix = mdiff.trivialdiffheader(len(delta))
            else:
                baselen = revlog.rawsize(base)
                prefix = mdiff.replacediffheader(baselen, len(delta))
        elif base == nullrev:
            # no base: send the full raw revision
            delta = revlog.revision(node, raw=True)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        flags = revlog.flags(rev)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta
    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        """Pack the cg1 delta header for one revision."""
        # do nothing with basenode, it is implicitly the previous one in HG10
        # do nothing with flags, it is implicitly 0 for cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
783 783
class cg2packer(cg1packer):
    """Packer for cg2 streams, whose delta header names the delta base."""
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    version = '02'

    def __init__(self, repo, bundlecaps=None):
        super(cg2packer, self).__init__(repo, bundlecaps)
        # Since generaldelta is directly supported by cg2, reordering
        # generally doesn't help, so we disable it by default (treating
        # bundle.reorder=auto just like bundle.reorder=False).
        if self._reorder is None:
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        """Choose the rev that the delta for rev is computed against."""
        storedbase = revlog.deltaparent(rev)
        if storedbase == nullrev:
            if revlog.storedeltachains:
                # Avoid sending full revisions when delta parent is null.
                # Pick prev in that case. It's tempting to pick p1 in this
                # case, as p1 will be smaller in the common case. However,
                # computing a delta against p1 may require resolving the raw
                # text of p1, which could be expensive. The revlog caches
                # should have prev cached, meaning less CPU for changegroup
                # generation. There is likely room to add a flag and/or
                # config option to control this behavior.
                return prev
            # revlog is configured to use full snapshot for a reason,
            # stick to full snapshot.
            return nullrev
        if storedbase in (p1, p2, prev):
            return storedbase
        # Pick prev when we can't be sure remote has the base revision.
        return prev

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # Do nothing with flags, it is implicitly 0 in cg1 and cg2
        fields = (node, p1n, p2n, basenode, linknode)
        return struct.pack(self.deltaheader, *fields)
820 820
class cg3packer(cg2packer):
    """Packer for cg3 streams: directory manifests and revlog flags."""
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER
    version = '03'

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        """Yield the manifest group for dir, preceded by a header chunk
        naming dir when dir is not the root ('')."""
        if dir:
            yield self.fileheader(dir)

        manifestrevlog = self._repo.manifestlog._revlog.dirlog(dir)
        chunks = self.group(mfnodes, manifestrevlog, lookuplinknode,
                            units=_('manifests'))
        for piece in chunks:
            yield piece

    def _manifestsdone(self):
        # cg3 ends the list of manifest groups with an empty chunk
        return self.close()

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        fields = (node, p1n, p2n, basenode, linknode, flags)
        return struct.pack(self.deltaheader, *fields)
840 840
# Maps a changegroup version string to its (packer class, unpacker class)
# pair; getbundler() uses index 0 and getunbundler() index 1.
_packermap = {'01': (cg1packer, cg1unpacker),
             # cg2 adds support for exchanging generaldelta
             '02': (cg2packer, cg2unpacker),
             # cg3 adds support for exchanging revlog flags and treemanifests
             '03': (cg3packer, cg3unpacker),
}
847 847
def allsupportedversions(repo):
    """Return the set of changegroup versions known for repo.

    Version '03' is included only when experimental.changegroup3 or
    experimental.treemanifest is set, or when the repo requires
    'treemanifest'.
    """
    versions = set(_packermap)
    wantscg3 = (repo.ui.configbool('experimental', 'changegroup3') or
                repo.ui.configbool('experimental', 'treemanifest') or
                'treemanifest' in repo.requirements)
    if not wantscg3:
        versions.discard('03')
    return versions
855 855
def supportedincomingversions(repo):
    """Return the changegroup versions that can be applied to the repo."""
    versions = allsupportedversions(repo)
    return versions
859 859
def supportedoutgoingversions(repo):
    """Return the changegroup versions that can be created from the repo."""
    versions = allsupportedversions(repo)
    if 'treemanifest' not in repo.requirements:
        return versions
    # Versions 01 and 02 support only flat manifests, and converting between
    # flat and tree manifests on the fly is just too expensive. Since tree
    # manifests are hashed differently, all of history would have to be
    # converted. Instead, we simply don't even pretend to support versions
    # 01 and 02.
    for flatonly in ('01', '02'):
        versions.discard(flatonly)
    return versions
872 872
def safeversion(repo):
    """Return the smallest version clients of the repo can be assumed to
    support.

    For example, all hg versions that support generaldelta also support
    changegroup 02, so '01' is not considered for generaldelta repos.
    """
    candidates = supportedoutgoingversions(repo)
    if 'generaldelta' in repo.requirements:
        candidates.discard('01')
    assert candidates
    return min(candidates)
882 882
def getbundler(version, repo, bundlecaps=None):
    """Instantiate the packer registered for ``version``.

    ``version`` must be one of the outgoing versions supported by ``repo``.
    """
    assert version in supportedoutgoingversions(repo)
    packercls = _packermap[version][0]
    return packercls(repo, bundlecaps)
886 886
def getunbundler(version, fh, alg, extras=None):
    """Instantiate the unpacker registered for ``version``."""
    unpackercls = _packermap[version][1]
    return unpackercls(fh, alg, extras=extras)
889 889
890 890 def _changegroupinfo(repo, nodes, source):
891 891 if repo.ui.verbose or source == 'bundle':
892 892 repo.ui.status(_("%d changesets found\n") % len(nodes))
893 893 if repo.ui.debugflag:
894 894 repo.ui.debug("list of changesets:\n")
895 895 for node in nodes:
896 896 repo.ui.debug("%s\n" % hex(node))
897 897
def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
    """Return what ``bundler.generate()`` yields for the ``outgoing`` set.

    Runs the 'preoutgoing' hook and reports the changesets before
    delegating to the bundler. Note that ``outgoing.missingheads`` is
    sorted in place.
    """
    repo = repo.unfiltered()
    commonrevs = outgoing.common
    csets = outgoing.missing
    heads = outgoing.missingheads
    heads.sort()
    # We go through the fast path if we are told to, or if all (unfiltered)
    # heads have been requested (since we then know that all linkrevs will
    # be pulled by the client).
    if fastpath:
        fastpathlinkrev = fastpath
    else:
        fastpathlinkrev = (repo.filtername is None
                           and heads == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, csets, source)
    return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
913 913
def getsubset(repo, outgoing, bundler, source, fastpath=False):
    """Generate the raw changegroup and wrap it in a matching unbundler.

    The unbundler is given a 'clcount' extra holding the number of
    missing changesets.
    """
    gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
    version = bundler.version
    stream = util.chunkbuffer(gengroup)
    extras = {'clcount': len(outgoing.missing)}
    return getunbundler(version, stream, None, extras)
918 918
def changegroupsubset(repo, roots, heads, source, version='01'):
    """Compute a changegroup of all nodes that descend from any of the
    roots and are ancestors of any of the heads.

    The result is whatever getsubset() returns for the computed outgoing
    set: an unbundler reading the generated changegroup chunks.

    This is fairly complex because working out which filenodes and
    manifest nodes must be included for the changesets to be complete is
    non-trivial. A further wrinkle is the reverse problem: figuring out
    which changeset in the changegroup a particular filenode or
    manifestnode belongs to.
    """
    bundler = getbundler(version, repo)
    outgoing = discovery.outgoing(repo, missingroots=roots,
                                  missingheads=heads)
    return getsubset(repo, outgoing, bundler, source)
935 935
def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
                           version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing. Returns a raw changegroup generator, or
    None when nothing is missing."""
    if outgoing.missing:
        bundler = getbundler(version, repo, bundlecaps)
        return getsubsetraw(repo, outgoing, bundler, source)
    return None
946 946
def getchangegroup(repo, source, outgoing, bundlecaps=None,
                   version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing. Returns None when nothing is missing."""
    if outgoing.missing:
        bundler = getbundler(version, repo, bundlecaps)
        return getsubset(repo, outgoing, bundler, source)
    return None
957 957
def getlocalchangegroup(repo, *args, **kwargs):
    """Deprecated alias: warn, then forward everything to getchangegroup()."""
    message = 'getlocalchangegroup is deprecated, use getchangegroup'
    repo.ui.deprecwarn(message, '4.3')
    return getchangegroup(repo, *args, **kwargs)
962 962
def changegroup(repo, basenodes, source):
    """Return a changegroup from ``basenodes`` up to the repo's heads."""
    # changegroupsubset() is used here to avoid a race (issue1320)
    heads = repo.heads()
    return changegroupsubset(repo, basenodes, heads, source)
966 966
def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
    """Add the file revision groups read from ``source`` to ``repo``.

    ``needfiles`` maps a filename to the collection of nodes expected for
    it. Nodes are removed from it as they arrive, and a filename is dropped
    once all of its nodes have been seen. Whatever remains afterwards must
    already exist in the corresponding filelog.

    Returns a ``(revisions, files)`` tuple: the number of file revisions
    added and the number of file groups processed.

    Raises error.Abort on an empty group, a censored delta base, an
    unexpected revision for a tracked file, or missing file data.
    """
    ui = repo.ui
    revisions = 0
    files = 0
    # keep calling filelogheader() until it hands back an empty dict
    for header in iter(source.filelogheader, {}):
        files += 1
        fname = header["filename"]
        ui.debug("adding %s revisions\n" % fname)
        ui.progress(_('files'), files, unit=_('files'),
                    total=expectedfiles)
        filelog = repo.file(fname)
        oldlen = len(filelog)
        try:
            added = filelog.addgroup(source, revmap, trp)
            if not added:
                raise error.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError as e:
            raise error.Abort(_("received delta base is censored: %s") % e)
        newlen = len(filelog)
        revisions += newlen - oldlen
        if fname not in needfiles:
            continue
        needs = needfiles[fname]
        # every revision just added must be one that was expected
        for rev in xrange(oldlen, newlen):
            node = filelog.node(rev)
            if node not in needs:
                raise error.Abort(
                    _("received spurious file revlog entry"))
            needs.remove(node)
        if not needs:
            del needfiles[fname]
    ui.progress(_('files'), None)

    # anything still listed was not received, so it must already be stored
    for fname, needs in needfiles.iteritems():
        filelog = repo.file(fname)
        for node in needs:
            try:
                filelog.rev(node)
            except error.LookupError:
                raise error.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (fname, hex(node)))

    return revisions, files
General Comments 0
You need to be logged in to leave comments. Login now