changegroup: use changelogrevision()...
Gregory Szorc - r30268:dc7c4dbc (default branch)
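
The change in this revision replaces a full changelog entry parse with the
lighter changelogrevision() accessor when validating that incoming changesets
have their manifests. A minimal sketch of the before/after (assuming cl is a
repository changelog and cset is a revision number; changelogrevision()
returns a parsed entry whose manifest attribute is the manifest node):

    # before: decode the whole changelog entry and take field 0 (manifest node)
    mfnode = cl.read(cl.node(cset))[0]
    # after: lazily parse only the field that is needed
    mfnode = cl.changelogrevision(cset).manifest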
@@ -1,1043 +1,1043 @@
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullrev,
19 19 short,
20 20 )
21 21
22 22 from . import (
23 23 branchmap,
24 24 dagutil,
25 25 discovery,
26 26 error,
27 27 mdiff,
28 28 phases,
29 29 util,
30 30 )
31 31
32 32 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
33 33 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
34 34 _CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
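# Editorial sketch, not part of the original file: these struct formats
# describe the fixed-size per-revision delta headers. For example:
#
#   >>> struct.calcsize("20s20s20s20s")        # cg1: node, p1, p2, linknode
#   80
#   >>> struct.calcsize("20s20s20s20s20s")     # cg2 adds a deltabase node
#   100
#   >>> struct.calcsize(">20s20s20s20s20sH")   # cg3 adds a flags short
#   102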
35 35
36 36 def readexactly(stream, n):
37 37 '''read n bytes from stream.read and abort if less was available'''
38 38 s = stream.read(n)
39 39 if len(s) < n:
40 40 raise error.Abort(_("stream ended unexpectedly"
41 41 " (got %d bytes, expected %d)")
42 42 % (len(s), n))
43 43 return s
44 44
45 45 def getchunk(stream):
46 46 """return the next chunk from stream as a string"""
47 47 d = readexactly(stream, 4)
48 48 l = struct.unpack(">l", d)[0]
49 49 if l <= 4:
50 50 if l:
51 51 raise error.Abort(_("invalid chunk length %d") % l)
52 52 return ""
53 53 return readexactly(stream, l - 4)
54 54
55 55 def chunkheader(length):
56 56 """return a changegroup chunk header (string)"""
57 57 return struct.pack(">l", length + 4)
58 58
59 59 def closechunk():
60 60 """return a changegroup chunk header (string) for a zero-length chunk"""
61 61 return struct.pack(">l", 0)
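
# Editorial sketch, not part of the original file: the 4-byte big-endian
# length prefix counts itself, so a chunk carrying "data" and the stream
# terminator look like:
#
#   >>> chunkheader(4) + "data"   # struct.pack(">l", 4 + 4) + payload
#   '\x00\x00\x00\x08data'
#   >>> closechunk()
#   '\x00\x00\x00\x00'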
62 62
63 63 def combineresults(results):
64 64 """logic to combine 0 or more addchangegroup results into one"""
65 65 changedheads = 0
66 66 result = 1
67 67 for ret in results:
68 68 # If any changegroup result is 0, return 0
69 69 if ret == 0:
70 70 result = 0
71 71 break
72 72 if ret < -1:
73 73 changedheads += ret + 1
74 74 elif ret > 1:
75 75 changedheads += ret - 1
76 76 if changedheads > 0:
77 77 result = 1 + changedheads
78 78 elif changedheads < 0:
79 79 result = -1 + changedheads
80 80 return result
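
# Editorial doctest, not part of the original file (the inputs are
# hypothetical addchangegroup return values):
#
#   >>> combineresults([2, 3])    # one result added 1 head, one added 2
#   4
#   >>> combineresults([-2, 1])   # one removed a head, one was neutral
#   -2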
81 81
82 82 def writechunks(ui, chunks, filename, vfs=None):
83 83 """Write chunks to a file and return its filename.
84 84
85 85 The stream is assumed to be a bundle file.
86 86 Existing files will not be overwritten.
87 87 If no filename is specified, a temporary file is created.
88 88 """
89 89 fh = None
90 90 cleanup = None
91 91 try:
92 92 if filename:
93 93 if vfs:
94 94 fh = vfs.open(filename, "wb")
95 95 else:
96 96 # Increase default buffer size because default is usually
97 97 # small (4k is common on Linux).
98 98 fh = open(filename, "wb", 131072)
99 99 else:
100 100 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
101 101 fh = os.fdopen(fd, "wb")
102 102 cleanup = filename
103 103 for c in chunks:
104 104 fh.write(c)
105 105 cleanup = None
106 106 return filename
107 107 finally:
108 108 if fh is not None:
109 109 fh.close()
110 110 if cleanup is not None:
111 111 if filename and vfs:
112 112 vfs.unlink(cleanup)
113 113 else:
114 114 os.unlink(cleanup)
115 115
116 116 class cg1unpacker(object):
117 117 """Unpacker for cg1 changegroup streams.
118 118
119 119 A changegroup unpacker handles the framing of the revision data in
120 120 the wire format. Most consumers will want to use the apply()
121 121 method to add the changes from the changegroup to a repository.
122 122
123 123 If you're forwarding a changegroup unmodified to another consumer,
124 124 use getchunks(), which returns an iterator of changegroup
125 125 chunks. This is mostly useful for cases where you need to know the
126 126 data stream has ended by observing the end of the changegroup.
127 127
128 128 deltachunk() is useful only if you're applying delta data. Most
129 129 consumers should prefer apply() instead.
130 130
131 131 A few other public methods exist. Those are used only for
132 132 bundlerepo and some debug commands - their use is discouraged.
133 133 """
134 134 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
135 135 deltaheadersize = struct.calcsize(deltaheader)
136 136 version = '01'
137 137 _grouplistcount = 1 # One list of files after the manifests
138 138
139 139 def __init__(self, fh, alg, extras=None):
140 140 if alg == 'UN':
141 141 alg = None # get more modern without breaking too much
142 142 if alg not in util.decompressors:
143 143 raise error.Abort(_('unknown stream compression type: %s')
144 144 % alg)
145 145 if alg == 'BZ':
146 146 alg = '_truncatedBZ'
147 147 self._stream = util.decompressors[alg](fh)
148 148 self._type = alg
149 149 self.extras = extras or {}
150 150 self.callback = None
151 151
152 152 # These methods (compressed, read, seek, tell) all appear to only
153 153 # be used by bundlerepo, but it's a little hard to tell.
154 154 def compressed(self):
155 155 return self._type is not None
156 156 def read(self, l):
157 157 return self._stream.read(l)
158 158 def seek(self, pos):
159 159 return self._stream.seek(pos)
160 160 def tell(self):
161 161 return self._stream.tell()
162 162 def close(self):
163 163 return self._stream.close()
164 164
165 165 def _chunklength(self):
166 166 d = readexactly(self._stream, 4)
167 167 l = struct.unpack(">l", d)[0]
168 168 if l <= 4:
169 169 if l:
170 170 raise error.Abort(_("invalid chunk length %d") % l)
171 171 return 0
172 172 if self.callback:
173 173 self.callback()
174 174 return l - 4
175 175
176 176 def changelogheader(self):
177 177 """v10 does not have a changelog header chunk"""
178 178 return {}
179 179
180 180 def manifestheader(self):
181 181 """v10 does not have a manifest header chunk"""
182 182 return {}
183 183
184 184 def filelogheader(self):
185 185 """return the header of the filelogs chunk, v10 only has the filename"""
186 186 l = self._chunklength()
187 187 if not l:
188 188 return {}
189 189 fname = readexactly(self._stream, l)
190 190 return {'filename': fname}
191 191
192 192 def _deltaheader(self, headertuple, prevnode):
193 193 node, p1, p2, cs = headertuple
194 194 if prevnode is None:
195 195 deltabase = p1
196 196 else:
197 197 deltabase = prevnode
198 198 flags = 0
199 199 return node, p1, p2, deltabase, cs, flags
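
# Editorial note, not part of the original file: in cg1 the delta base is
# implicit -- the previous node in the stream, or p1 for the first chunk
# of a group -- whereas the cg2/cg3 headers below carry an explicit
# deltabase field.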
200 200
201 201 def deltachunk(self, prevnode):
202 202 l = self._chunklength()
203 203 if not l:
204 204 return {}
205 205 headerdata = readexactly(self._stream, self.deltaheadersize)
206 206 header = struct.unpack(self.deltaheader, headerdata)
207 207 delta = readexactly(self._stream, l - self.deltaheadersize)
208 208 node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
209 209 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
210 210 'deltabase': deltabase, 'delta': delta, 'flags': flags}
211 211
212 212 def getchunks(self):
213 213 """returns all the chunks contains in the bundle
214 214
215 215 Used when you need to forward the binary stream to a file or another
216 216 network API. To do so, it parse the changegroup data, otherwise it will
217 217 block in case of sshrepo because it don't know the end of the stream.
218 218 """
219 219 # an empty chunkgroup is the end of the changegroup
220 220 # a changegroup has at least 2 chunkgroups (changelog and manifest).
221 221 # after that, changegroup versions 1 and 2 have a series of groups
222 222 # with one group per file. changegroup 3 has a series of directory
223 223 # manifests before the files.
224 224 count = 0
225 225 emptycount = 0
226 226 while emptycount < self._grouplistcount:
227 227 empty = True
228 228 count += 1
229 229 while True:
230 230 chunk = getchunk(self)
231 231 if not chunk:
232 232 if empty and count > 2:
233 233 emptycount += 1
234 234 break
235 235 empty = False
236 236 yield chunkheader(len(chunk))
237 237 pos = 0
238 238 while pos < len(chunk):
239 239 next = pos + 2**20
240 240 yield chunk[pos:next]
241 241 pos = next
242 242 yield closechunk()
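
# Editorial note, not part of the original file: per the comments above, a
# cg1/cg2 stream is <changelog group><manifest group><one group per file>,
# each group ending with an empty chunk; cg3 adds a series of directory
# manifest groups before the files, hence its _grouplistcount of 2.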
243 243
244 244 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
245 245 # We know that we'll never have more manifests than we had
246 246 # changesets.
247 247 self.callback = prog(_('manifests'), numchanges)
248 248 # no need to check for empty manifest group here:
249 249 # if the result of the merge of 1 and 2 is the same in 3 and 4,
250 250 # no new manifest will be created and the manifest group will
251 251 # be empty during the pull
252 252 self.manifestheader()
253 253 repo.manifest.addgroup(self, revmap, trp)
254 254 repo.ui.progress(_('manifests'), None)
255 255 self.callback = None
256 256
257 257 def apply(self, repo, srctype, url, emptyok=False,
258 258 targetphase=phases.draft, expectedtotal=None):
259 259 """Add the changegroup returned by source.read() to this repo.
260 260 srctype is a string like 'push', 'pull', or 'unbundle'. url is
261 261 the URL of the repo where this changegroup is coming from.
262 262
263 263 Return an integer summarizing the change to this repo:
264 264 - nothing changed or no source: 0
265 265 - more heads than before: 1+added heads (2..n)
266 266 - fewer heads than before: -1-removed heads (-2..-n)
267 267 - number of heads stays the same: 1
268 268 """
269 269 repo = repo.unfiltered()
270 270 def csmap(x):
271 271 repo.ui.debug("add changeset %s\n" % short(x))
272 272 return len(cl)
273 273
274 274 def revmap(x):
275 275 return cl.rev(x)
276 276
277 277 changesets = files = revisions = 0
278 278
279 279 try:
280 280 with repo.transaction("\n".join([srctype,
281 281 util.hidepassword(url)])) as tr:
282 282 # The transaction could have been created before and already
283 283 # carries source information. In this case we use the top
284 284 # level data. We overwrite the argument because we need to use
285 285 # the top level values (if they exist) in this function.
286 286 srctype = tr.hookargs.setdefault('source', srctype)
287 287 url = tr.hookargs.setdefault('url', url)
288 288 repo.hook('prechangegroup', throw=True, **tr.hookargs)
289 289
290 290 # write changelog data to temp files so concurrent readers
291 291 # will not see an inconsistent view
292 292 cl = repo.changelog
293 293 cl.delayupdate(tr)
294 294 oldheads = cl.heads()
295 295
296 296 trp = weakref.proxy(tr)
297 297 # pull off the changeset group
298 298 repo.ui.status(_("adding changesets\n"))
299 299 clstart = len(cl)
300 300 class prog(object):
301 301 def __init__(self, step, total):
302 302 self._step = step
303 303 self._total = total
304 304 self._count = 1
305 305 def __call__(self):
306 306 repo.ui.progress(self._step, self._count,
307 307 unit=_('chunks'), total=self._total)
308 308 self._count += 1
309 309 self.callback = prog(_('changesets'), expectedtotal)
310 310
311 311 efiles = set()
312 312 def onchangelog(cl, node):
313 313 efiles.update(cl.readfiles(node))
314 314
315 315 self.changelogheader()
316 316 srccontent = cl.addgroup(self, csmap, trp,
317 317 addrevisioncb=onchangelog)
318 318 efiles = len(efiles)
319 319
320 320 if not (srccontent or emptyok):
321 321 raise error.Abort(_("received changelog group is empty"))
322 322 clend = len(cl)
323 323 changesets = clend - clstart
324 324 repo.ui.progress(_('changesets'), None)
325 325 self.callback = None
326 326
327 327 # pull off the manifest group
328 328 repo.ui.status(_("adding manifests\n"))
329 329 self._unpackmanifests(repo, revmap, trp, prog, changesets)
330 330
331 331 needfiles = {}
332 332 if repo.ui.configbool('server', 'validate', default=False):
333 333 cl = repo.changelog
334 334 ml = repo.manifestlog
335 335 # validate incoming csets have their manifests
336 336 for cset in xrange(clstart, clend):
- 337 mfnode = cl.read(cl.node(cset))[0]
+ 337 mfnode = cl.changelogrevision(cset).manifest
338 338 mfest = ml[mfnode].readdelta()
339 339 # store file nodes we must see
340 340 for f, n in mfest.iteritems():
341 341 needfiles.setdefault(f, set()).add(n)
342 342
343 343 # process the files
344 344 repo.ui.status(_("adding file changes\n"))
345 345 newrevs, newfiles = _addchangegroupfiles(
346 346 repo, self, revmap, trp, efiles, needfiles)
347 347 revisions += newrevs
348 348 files += newfiles
349 349
350 350 dh = 0
351 351 if oldheads:
352 352 heads = cl.heads()
353 353 dh = len(heads) - len(oldheads)
354 354 for h in heads:
355 355 if h not in oldheads and repo[h].closesbranch():
356 356 dh -= 1
357 357 htext = ""
358 358 if dh:
359 359 htext = _(" (%+d heads)") % dh
360 360
361 361 repo.ui.status(_("added %d changesets"
362 362 " with %d changes to %d files%s\n")
363 363 % (changesets, revisions, files, htext))
364 364 repo.invalidatevolatilesets()
365 365
366 366 if changesets > 0:
367 367 if 'node' not in tr.hookargs:
368 368 tr.hookargs['node'] = hex(cl.node(clstart))
369 369 tr.hookargs['node_last'] = hex(cl.node(clend - 1))
370 370 hookargs = dict(tr.hookargs)
371 371 else:
372 372 hookargs = dict(tr.hookargs)
373 373 hookargs['node'] = hex(cl.node(clstart))
374 374 hookargs['node_last'] = hex(cl.node(clend - 1))
375 375 repo.hook('pretxnchangegroup', throw=True, **hookargs)
376 376
377 377 added = [cl.node(r) for r in xrange(clstart, clend)]
378 378 publishing = repo.publishing()
379 379 if srctype in ('push', 'serve'):
380 380 # Old servers cannot push the boundary themselves.
381 381 # New servers won't push the boundary if changeset already
382 382 # exists locally as secret
383 383 #
384 384 # We should not use added here but the list of all changes in
385 385 # the bundle
386 386 if publishing:
387 387 phases.advanceboundary(repo, tr, phases.public,
388 388 srccontent)
389 389 else:
390 390 # Those changesets have been pushed from the
391 391 # outside, their phases are going to be pushed
392 392 # alongside. Therefore `targetphase` is
393 393 # ignored.
394 394 phases.advanceboundary(repo, tr, phases.draft,
395 395 srccontent)
396 396 phases.retractboundary(repo, tr, phases.draft, added)
397 397 elif srctype != 'strip':
398 398 # publishing only alters behavior during push
399 399 #
400 400 # strip should not touch boundary at all
401 401 phases.retractboundary(repo, tr, targetphase, added)
402 402
403 403 if changesets > 0:
404 404 if srctype != 'strip':
405 405 # During strip, branchcache is invalid but
406 406 # coming call to `destroyed` will repair it.
407 407 # In other cases we can safely update the cache on
408 408 # disk.
409 409 repo.ui.debug('updating the branch cache\n')
410 410 branchmap.updatecache(repo.filtered('served'))
411 411
412 412 def runhooks():
413 413 # These hooks run when the lock releases, not when the
414 414 # transaction closes. So it's possible for the changelog
415 415 # to have changed since we last saw it.
416 416 if clstart >= len(repo):
417 417 return
418 418
419 419 repo.hook("changegroup", **hookargs)
420 420
421 421 for n in added:
422 422 args = hookargs.copy()
423 423 args['node'] = hex(n)
424 424 del args['node_last']
425 425 repo.hook("incoming", **args)
426 426
427 427 newheads = [h for h in repo.heads()
428 428 if h not in oldheads]
429 429 repo.ui.log("incoming",
430 430 "%s incoming changes - new heads: %s\n",
431 431 len(added),
432 432 ', '.join([hex(c[:6]) for c in newheads]))
433 433
434 434 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
435 435 lambda tr: repo._afterlock(runhooks))
436 436 finally:
437 437 repo.ui.flush()
438 438 # never return 0 here:
439 439 if dh < 0:
440 440 return dh - 1
441 441 else:
442 442 return dh + 1
443 443
444 444 class cg2unpacker(cg1unpacker):
445 445 """Unpacker for cg2 streams.
446 446
447 447 cg2 streams add support for generaldelta, so the delta header
448 448 format is slightly different. All other features about the data
449 449 remain the same.
450 450 """
451 451 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
452 452 deltaheadersize = struct.calcsize(deltaheader)
453 453 version = '02'
454 454
455 455 def _deltaheader(self, headertuple, prevnode):
456 456 node, p1, p2, deltabase, cs = headertuple
457 457 flags = 0
458 458 return node, p1, p2, deltabase, cs, flags
459 459
460 460 class cg3unpacker(cg2unpacker):
461 461 """Unpacker for cg3 streams.
462 462
463 463 cg3 streams add support for exchanging treemanifests and revlog
464 464 flags. It adds the revlog flags to the delta header and an empty chunk
465 465 separating manifests and files.
466 466 """
467 467 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
468 468 deltaheadersize = struct.calcsize(deltaheader)
469 469 version = '03'
470 470 _grouplistcount = 2 # One list of manifests and one list of files
471 471
472 472 def _deltaheader(self, headertuple, prevnode):
473 473 node, p1, p2, deltabase, cs, flags = headertuple
474 474 return node, p1, p2, deltabase, cs, flags
475 475
476 476 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
477 477 super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
478 478 numchanges)
479 479 for chunkdata in iter(self.filelogheader, {}):
480 480 # If we get here, there are directory manifests in the changegroup
481 481 d = chunkdata["filename"]
482 482 repo.ui.debug("adding %s revisions\n" % d)
483 483 dirlog = repo.manifest.dirlog(d)
484 484 if not dirlog.addgroup(self, revmap, trp):
485 485 raise error.Abort(_("received dir revlog group is empty"))
486 486
487 487 class headerlessfixup(object):
488 488 def __init__(self, fh, h):
489 489 self._h = h
490 490 self._fh = fh
491 491 def read(self, n):
492 492 if self._h:
493 493 d, self._h = self._h[:n], self._h[n:]
494 494 if len(d) < n:
495 495 d += readexactly(self._fh, n - len(d))
496 496 return d
497 497 return readexactly(self._fh, n)
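
# Editorial sketch, not part of the original file (realfh is hypothetical):
# headerlessfixup re-prepends header bytes that were consumed while
# sniffing the stream type:
#
#   >>> fh = headerlessfixup(realfh, 'HG10UN')
#   >>> fh.read(4)   # served from the buffered header before realfh
#   'HG10'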
498 498
499 499 class cg1packer(object):
500 500 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
501 501 version = '01'
502 502 def __init__(self, repo, bundlecaps=None):
503 503 """Given a source repo, construct a bundler.
504 504
505 505 bundlecaps is optional and can be used to specify the set of
506 506 capabilities which can be used to build the bundle.
507 507 """
508 508 # Set of capabilities we can use to build the bundle.
509 509 if bundlecaps is None:
510 510 bundlecaps = set()
511 511 self._bundlecaps = bundlecaps
512 512 # experimental config: bundle.reorder
513 513 reorder = repo.ui.config('bundle', 'reorder', 'auto')
514 514 if reorder == 'auto':
515 515 reorder = None
516 516 else:
517 517 reorder = util.parsebool(reorder)
518 518 self._repo = repo
519 519 self._reorder = reorder
520 520 self._progress = repo.ui.progress
521 521 if self._repo.ui.verbose and not self._repo.ui.debugflag:
522 522 self._verbosenote = self._repo.ui.note
523 523 else:
524 524 self._verbosenote = lambda s: None
525 525
526 526 def close(self):
527 527 return closechunk()
528 528
529 529 def fileheader(self, fname):
530 530 return chunkheader(len(fname)) + fname
531 531
532 532 # Extracted both for clarity and for overriding in extensions.
533 533 def _sortgroup(self, revlog, nodelist, lookup):
534 534 """Sort nodes for change group and turn them into revnums."""
535 535 # for generaldelta revlogs, we linearize the revs; this will both be
536 536 # much quicker and generate a much smaller bundle
537 537 if (revlog._generaldelta and self._reorder is None) or self._reorder:
538 538 dag = dagutil.revlogdag(revlog)
539 539 return dag.linearize(set(revlog.rev(n) for n in nodelist))
540 540 else:
541 541 return sorted([revlog.rev(n) for n in nodelist])
542 542
543 543 def group(self, nodelist, revlog, lookup, units=None):
544 544 """Calculate a delta group, yielding a sequence of changegroup chunks
545 545 (strings).
546 546
547 547 Given a list of changeset revs, return a set of deltas and
548 548 metadata corresponding to nodes. The first delta is
549 549 first parent(nodelist[0]) -> nodelist[0], the receiver is
550 550 guaranteed to have this parent as it has all history before
551 551 these changesets. In the case where firstparent is nullrev, the
552 552 changegroup starts with a full revision.
553 553
554 554 If units is not None, progress detail will be generated; units specifies
555 555 the type of revlog that is touched (changelog, manifest, etc.).
556 556 """
557 557 # if we don't have any revisions touched by these changesets, bail
558 558 if len(nodelist) == 0:
559 559 yield self.close()
560 560 return
561 561
562 562 revs = self._sortgroup(revlog, nodelist, lookup)
563 563
564 564 # add the parent of the first rev
565 565 p = revlog.parentrevs(revs[0])[0]
566 566 revs.insert(0, p)
567 567
568 568 # build deltas
569 569 total = len(revs) - 1
570 570 msgbundling = _('bundling')
571 571 for r in xrange(len(revs) - 1):
572 572 if units is not None:
573 573 self._progress(msgbundling, r + 1, unit=units, total=total)
574 574 prev, curr = revs[r], revs[r + 1]
575 575 linknode = lookup(revlog.node(curr))
576 576 for c in self.revchunk(revlog, curr, prev, linknode):
577 577 yield c
578 578
579 579 if units is not None:
580 580 self._progress(msgbundling, None)
581 581 yield self.close()
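
# Editorial note, not part of the original file: group() therefore emits a
# sequence of framed delta chunks followed by closechunk(); an empty
# nodelist produces only the terminator.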
582 582
583 583 # filter any nodes that claim to be part of the known set
584 584 def prune(self, revlog, missing, commonrevs):
585 585 rr, rl = revlog.rev, revlog.linkrev
586 586 return [n for n in missing if rl(rr(n)) not in commonrevs]
587 587
588 588 def _packmanifests(self, dir, mfnodes, lookuplinknode):
589 589 """Pack flat manifests into a changegroup stream."""
590 590 assert not dir
591 591 for chunk in self.group(mfnodes, self._repo.manifest,
592 592 lookuplinknode, units=_('manifests')):
593 593 yield chunk
594 594
595 595 def _manifestsdone(self):
596 596 return ''
597 597
598 598 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
599 599 '''yield a sequence of changegroup chunks (strings)'''
600 600 repo = self._repo
601 601 cl = repo.changelog
602 602
603 603 clrevorder = {}
604 604 mfs = {} # needed manifests
605 605 fnodes = {} # needed file nodes
606 606 changedfiles = set()
607 607
608 608 # Callback for the changelog, used to collect changed files and manifest
609 609 # nodes.
610 610 # Returns the linkrev node (identity in the changelog case).
611 611 def lookupcl(x):
612 612 c = cl.read(x)
613 613 clrevorder[x] = len(clrevorder)
614 614 n = c[0]
615 615 # record the first changeset introducing this manifest version
616 616 mfs.setdefault(n, x)
617 617 # Record a complete list of potentially-changed files in
618 618 # this manifest.
619 619 changedfiles.update(c[3])
620 620 return x
621 621
622 622 self._verbosenote(_('uncompressed size of bundle content:\n'))
623 623 size = 0
624 624 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
625 625 size += len(chunk)
626 626 yield chunk
627 627 self._verbosenote(_('%8.i (changelog)\n') % size)
628 628
629 629 # We need to make sure that the linkrev in the changegroup refers to
630 630 # the first changeset that introduced the manifest or file revision.
631 631 # The fastpath is usually safer than the slowpath, because the filelogs
632 632 # are walked in revlog order.
633 633 #
634 634 # When taking the slowpath with reorder=None and the manifest revlog
635 635 # uses generaldelta, the manifest may be walked in the "wrong" order.
636 636 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
637 637 # cc0ff93d0c0c).
638 638 #
639 639 # When taking the fastpath, we are only vulnerable to reordering
640 640 # of the changelog itself. The changelog never uses generaldelta, so
641 641 # it is only reordered when reorder=True. To handle this case, we
642 642 # simply take the slowpath, which already has the 'clrevorder' logic.
643 643 # This was also fixed in cc0ff93d0c0c.
644 644 fastpathlinkrev = fastpathlinkrev and not self._reorder
645 645 # Treemanifests don't work correctly with fastpathlinkrev
646 646 # either, because we don't discover which directory nodes to
647 647 # send along with files. This could probably be fixed.
648 648 fastpathlinkrev = fastpathlinkrev and (
649 649 'treemanifest' not in repo.requirements)
650 650
651 651 for chunk in self.generatemanifests(commonrevs, clrevorder,
652 652 fastpathlinkrev, mfs, fnodes):
653 653 yield chunk
654 654 mfs.clear()
655 655 clrevs = set(cl.rev(x) for x in clnodes)
656 656
657 657 if not fastpathlinkrev:
658 658 def linknodes(unused, fname):
659 659 return fnodes.get(fname, {})
660 660 else:
661 661 cln = cl.node
662 662 def linknodes(filerevlog, fname):
663 663 llr = filerevlog.linkrev
664 664 fln = filerevlog.node
665 665 revs = ((r, llr(r)) for r in filerevlog)
666 666 return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)
667 667
668 668 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
669 669 source):
670 670 yield chunk
671 671
672 672 yield self.close()
673 673
674 674 if clnodes:
675 675 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
676 676
677 677 def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
678 678 fnodes):
679 679 repo = self._repo
680 680 dirlog = repo.manifest.dirlog
681 681 tmfnodes = {'': mfs}
682 682
683 683 # Callback for the manifest, used to collect linkrevs for filelog
684 684 # revisions.
685 685 # Returns the linkrev node (collected in lookupcl).
686 686 def makelookupmflinknode(dir):
687 687 if fastpathlinkrev:
688 688 assert not dir
689 689 return mfs.__getitem__
690 690
691 691 def lookupmflinknode(x):
692 692 """Callback for looking up the linknode for manifests.
693 693
694 694 Returns the linkrev node for the specified manifest.
695 695
696 696 SIDE EFFECT:
697 697
698 698 1) fclnodes gets populated with the list of relevant
699 699 file nodes if we're not using fastpathlinkrev
700 700 2) When treemanifests are in use, collects treemanifest nodes
701 701 to send
702 702
703 703 Note that this means manifests must be completely sent to
704 704 the client before you can trust the list of files and
705 705 treemanifests to send.
706 706 """
707 707 clnode = tmfnodes[dir][x]
708 708 mdata = dirlog(dir).readshallowfast(x)
709 709 for p, n, fl in mdata.iterentries():
710 710 if fl == 't': # subdirectory manifest
711 711 subdir = dir + p + '/'
712 712 tmfclnodes = tmfnodes.setdefault(subdir, {})
713 713 tmfclnode = tmfclnodes.setdefault(n, clnode)
714 714 if clrevorder[clnode] < clrevorder[tmfclnode]:
715 715 tmfclnodes[n] = clnode
716 716 else:
717 717 f = dir + p
718 718 fclnodes = fnodes.setdefault(f, {})
719 719 fclnode = fclnodes.setdefault(n, clnode)
720 720 if clrevorder[clnode] < clrevorder[fclnode]:
721 721 fclnodes[n] = clnode
722 722 return clnode
723 723 return lookupmflinknode
724 724
725 725 size = 0
726 726 while tmfnodes:
727 727 dir = min(tmfnodes)
728 728 nodes = tmfnodes[dir]
729 729 prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
730 730 if not dir or prunednodes:
731 731 for x in self._packmanifests(dir, prunednodes,
732 732 makelookupmflinknode(dir)):
733 733 size += len(x)
734 734 yield x
735 735 del tmfnodes[dir]
736 736 self._verbosenote(_('%8.i (manifests)\n') % size)
737 737 yield self._manifestsdone()
738 738
739 739 # The 'source' parameter is useful for extensions
740 740 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
741 741 repo = self._repo
742 742 progress = self._progress
743 743 msgbundling = _('bundling')
744 744
745 745 total = len(changedfiles)
746 746 # for progress output
747 747 msgfiles = _('files')
748 748 for i, fname in enumerate(sorted(changedfiles)):
749 749 filerevlog = repo.file(fname)
750 750 if not filerevlog:
751 751 raise error.Abort(_("empty or missing revlog for %s") % fname)
752 752
753 753 linkrevnodes = linknodes(filerevlog, fname)
754 754 # Lookup for filenodes, we collected the linkrev nodes above in the
755 755 # fastpath case and with lookupmf in the slowpath case.
756 756 def lookupfilelog(x):
757 757 return linkrevnodes[x]
758 758
759 759 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
760 760 if filenodes:
761 761 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
762 762 total=total)
763 763 h = self.fileheader(fname)
764 764 size = len(h)
765 765 yield h
766 766 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
767 767 size += len(chunk)
768 768 yield chunk
769 769 self._verbosenote(_('%8.i %s\n') % (size, fname))
770 770 progress(msgbundling, None)
771 771
772 772 def deltaparent(self, revlog, rev, p1, p2, prev):
773 773 return prev
774 774
775 775 def revchunk(self, revlog, rev, prev, linknode):
776 776 node = revlog.node(rev)
777 777 p1, p2 = revlog.parentrevs(rev)
778 778 base = self.deltaparent(revlog, rev, p1, p2, prev)
779 779
780 780 prefix = ''
781 781 if revlog.iscensored(base) or revlog.iscensored(rev):
782 782 try:
783 783 delta = revlog.revision(node)
784 784 except error.CensoredNodeError as e:
785 785 delta = e.tombstone
786 786 if base == nullrev:
787 787 prefix = mdiff.trivialdiffheader(len(delta))
788 788 else:
789 789 baselen = revlog.rawsize(base)
790 790 prefix = mdiff.replacediffheader(baselen, len(delta))
791 791 elif base == nullrev:
792 792 delta = revlog.revision(node)
793 793 prefix = mdiff.trivialdiffheader(len(delta))
794 794 else:
795 795 delta = revlog.revdiff(base, rev)
796 796 p1n, p2n = revlog.parents(node)
797 797 basenode = revlog.node(base)
798 798 flags = revlog.flags(rev)
799 799 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
800 800 meta += prefix
801 801 l = len(meta) + len(delta)
802 802 yield chunkheader(l)
803 803 yield meta
804 804 yield delta
805 805 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
806 806 # do nothing with basenode, it is implicitly the previous one in HG10
807 807 # do nothing with flags, it is implicitly 0 for cg1 and cg2
808 808 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
809 809
810 810 class cg2packer(cg1packer):
811 811 version = '02'
812 812 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
813 813
814 814 def __init__(self, repo, bundlecaps=None):
815 815 super(cg2packer, self).__init__(repo, bundlecaps)
816 816 if self._reorder is None:
817 817 # Since generaldelta is directly supported by cg2, reordering
818 818 # generally doesn't help, so we disable it by default (treating
819 819 # bundle.reorder=auto just like bundle.reorder=False).
820 820 self._reorder = False
821 821
822 822 def deltaparent(self, revlog, rev, p1, p2, prev):
823 823 dp = revlog.deltaparent(rev)
824 824 if dp == nullrev and revlog.storedeltachains:
825 825 # Avoid sending full revisions when delta parent is null. Pick prev
826 826 # in that case. It's tempting to pick p1 in this case, as p1 will
827 827 # be smaller in the common case. However, computing a delta against
828 828 # p1 may require resolving the raw text of p1, which could be
829 829 # expensive. The revlog caches should have prev cached, meaning
830 830 # less CPU for changegroup generation. There is likely room to add
831 831 # a flag and/or config option to control this behavior.
832 832 return prev
833 833 elif dp == nullrev:
834 834 # revlog is configured to use full snapshot for a reason,
835 835 # stick to full snapshot.
836 836 return nullrev
837 837 elif dp not in (p1, p2, prev):
838 838 # Pick prev when we can't be sure remote has the base revision.
839 839 return prev
840 840 else:
841 841 return dp
842 842
843 843 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
844 844 # Do nothing with flags, it is implicitly 0 in cg1 and cg2
845 845 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
846 846
847 847 class cg3packer(cg2packer):
848 848 version = '03'
849 849 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
850 850
851 851 def _packmanifests(self, dir, mfnodes, lookuplinknode):
852 852 if dir:
853 853 yield self.fileheader(dir)
854 854 for chunk in self.group(mfnodes, self._repo.manifest.dirlog(dir),
855 855 lookuplinknode, units=_('manifests')):
856 856 yield chunk
857 857
858 858 def _manifestsdone(self):
859 859 return self.close()
860 860
861 861 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
862 862 return struct.pack(
863 863 self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
864 864
865 865 _packermap = {'01': (cg1packer, cg1unpacker),
866 866 # cg2 adds support for exchanging generaldelta
867 867 '02': (cg2packer, cg2unpacker),
868 868 # cg3 adds support for exchanging revlog flags and treemanifests
869 869 '03': (cg3packer, cg3unpacker),
870 870 }
871 871
872 872 def allsupportedversions(ui):
873 873 versions = set(_packermap.keys())
874 874 versions.discard('03')
875 875 if (ui.configbool('experimental', 'changegroup3') or
876 876 ui.configbool('experimental', 'treemanifest')):
877 877 versions.add('03')
878 878 return versions
879 879
880 880 # Changegroup versions that can be applied to the repo
881 881 def supportedincomingversions(repo):
882 882 versions = allsupportedversions(repo.ui)
883 883 if 'treemanifest' in repo.requirements:
884 884 versions.add('03')
885 885 return versions
886 886
887 887 # Changegroup versions that can be created from the repo
888 888 def supportedoutgoingversions(repo):
889 889 versions = allsupportedversions(repo.ui)
890 890 if 'treemanifest' in repo.requirements:
891 891 # Versions 01 and 02 support only flat manifests and it's just too
892 892 # expensive to convert between the flat manifest and tree manifest on
893 893 # the fly. Since tree manifests are hashed differently, all of history
894 894 # would have to be converted. Instead, we simply don't even pretend to
895 895 # support versions 01 and 02.
896 896 versions.discard('01')
897 897 versions.discard('02')
898 898 versions.add('03')
899 899 return versions
900 900
901 901 def safeversion(repo):
902 902 # Finds the smallest version that it's safe to assume clients of the repo
903 903 # will support. For example, all hg versions that support generaldelta also
904 904 # support changegroup 02.
905 905 versions = supportedoutgoingversions(repo)
906 906 if 'generaldelta' in repo.requirements:
907 907 versions.discard('01')
908 908 assert versions
909 909 return min(versions)
910 910
911 911 def getbundler(version, repo, bundlecaps=None):
912 912 assert version in supportedoutgoingversions(repo)
913 913 return _packermap[version][0](repo, bundlecaps)
914 914
915 915 def getunbundler(version, fh, alg, extras=None):
916 916 return _packermap[version][1](fh, alg, extras=extras)
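
# Editorial sketch, not part of the original file (repo and fh are
# hypothetical), showing how the helpers above fit together:
#
#   >>> version = safeversion(repo)            # e.g. '02' with generaldelta
#   >>> packer = getbundler(version, repo)
#   >>> unbundler = getunbundler(version, fh, 'UN')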
917 917
918 918 def _changegroupinfo(repo, nodes, source):
919 919 if repo.ui.verbose or source == 'bundle':
920 920 repo.ui.status(_("%d changesets found\n") % len(nodes))
921 921 if repo.ui.debugflag:
922 922 repo.ui.debug("list of changesets:\n")
923 923 for node in nodes:
924 924 repo.ui.debug("%s\n" % hex(node))
925 925
926 926 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
927 927 repo = repo.unfiltered()
928 928 commonrevs = outgoing.common
929 929 csets = outgoing.missing
930 930 heads = outgoing.missingheads
931 931 # We go through the fast path if we get told to, or if all (unfiltered)
932 932 # heads have been requested (since we then know that all linkrevs will
933 933 # be pulled by the client).
934 934 heads.sort()
935 935 fastpathlinkrev = fastpath or (
936 936 repo.filtername is None and heads == sorted(repo.heads()))
937 937
938 938 repo.hook('preoutgoing', throw=True, source=source)
939 939 _changegroupinfo(repo, csets, source)
940 940 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
941 941
942 942 def getsubset(repo, outgoing, bundler, source, fastpath=False):
943 943 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
944 944 return getunbundler(bundler.version, util.chunkbuffer(gengroup), None,
945 945 {'clcount': len(outgoing.missing)})
946 946
947 947 def changegroupsubset(repo, roots, heads, source, version='01'):
948 948 """Compute a changegroup consisting of all the nodes that are
949 949 descendants of any of the roots and ancestors of any of the heads.
950 950 Return a chunkbuffer object whose read() method will return
951 951 successive changegroup chunks.
952 952
953 953 It is fairly complex as determining which filenodes and which
954 954 manifest nodes need to be included for the changeset to be complete
955 955 is non-trivial.
956 956
957 957 Another wrinkle is doing the reverse, figuring out which changeset in
958 958 the changegroup a particular filenode or manifestnode belongs to.
959 959 """
960 960 outgoing = discovery.outgoing(repo, missingroots=roots, missingheads=heads)
961 961 bundler = getbundler(version, repo)
962 962 return getsubset(repo, outgoing, bundler, source)
963 963
964 964 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
965 965 version='01'):
966 966 """Like getbundle, but taking a discovery.outgoing as an argument.
967 967
968 968 This is only implemented for local repos and reuses potentially
969 969 precomputed sets in outgoing. Returns a raw changegroup generator."""
970 970 if not outgoing.missing:
971 971 return None
972 972 bundler = getbundler(version, repo, bundlecaps)
973 973 return getsubsetraw(repo, outgoing, bundler, source)
974 974
975 975 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
976 976 version='01'):
977 977 """Like getbundle, but taking a discovery.outgoing as an argument.
978 978
979 979 This is only implemented for local repos and reuses potentially
980 980 precomputed sets in outgoing."""
981 981 if not outgoing.missing:
982 982 return None
983 983 bundler = getbundler(version, repo, bundlecaps)
984 984 return getsubset(repo, outgoing, bundler, source)
985 985
986 986 def getchangegroup(repo, source, outgoing, bundlecaps=None,
987 987 version='01'):
988 988 """Like changegroupsubset, but returns the set difference between the
989 989 ancestors of heads and the ancestors of common.
990 990
991 991 If heads is None, use the local heads. If common is None, use [nullid].
992 992
993 993 The nodes in common might not all be known locally due to the way the
994 994 current discovery protocol works.
995 995 """
996 996 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps,
997 997 version=version)
998 998
999 999 def changegroup(repo, basenodes, source):
1000 1000 # to avoid a race we use changegroupsubset() (issue1320)
1001 1001 return changegroupsubset(repo, basenodes, repo.heads(), source)
1002 1002
1003 1003 def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
1004 1004 revisions = 0
1005 1005 files = 0
1006 1006 for chunkdata in iter(source.filelogheader, {}):
1007 1007 files += 1
1008 1008 f = chunkdata["filename"]
1009 1009 repo.ui.debug("adding %s revisions\n" % f)
1010 1010 repo.ui.progress(_('files'), files, unit=_('files'),
1011 1011 total=expectedfiles)
1012 1012 fl = repo.file(f)
1013 1013 o = len(fl)
1014 1014 try:
1015 1015 if not fl.addgroup(source, revmap, trp):
1016 1016 raise error.Abort(_("received file revlog group is empty"))
1017 1017 except error.CensoredBaseError as e:
1018 1018 raise error.Abort(_("received delta base is censored: %s") % e)
1019 1019 revisions += len(fl) - o
1020 1020 if f in needfiles:
1021 1021 needs = needfiles[f]
1022 1022 for new in xrange(o, len(fl)):
1023 1023 n = fl.node(new)
1024 1024 if n in needs:
1025 1025 needs.remove(n)
1026 1026 else:
1027 1027 raise error.Abort(
1028 1028 _("received spurious file revlog entry"))
1029 1029 if not needs:
1030 1030 del needfiles[f]
1031 1031 repo.ui.progress(_('files'), None)
1032 1032
1033 1033 for f, needs in needfiles.iteritems():
1034 1034 fl = repo.file(f)
1035 1035 for n in needs:
1036 1036 try:
1037 1037 fl.rev(n)
1038 1038 except error.LookupError:
1039 1039 raise error.Abort(
1040 1040 _('missing file data for %s:%s - run hg verify') %
1041 1041 (f, hex(n)))
1042 1042
1043 1043 return revisions, files