##// END OF EJS Templates
changegroup: use `iter(callable, sentinel)` instead of while True...
Augie Fackler -
r29724:4e7be6e3 default
parent child Browse files
Show More
@@ -1,1055 +1,1049 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullid,
19 19 nullrev,
20 20 short,
21 21 )
22 22
23 23 from . import (
24 24 branchmap,
25 25 dagutil,
26 26 discovery,
27 27 error,
28 28 mdiff,
29 29 phases,
30 30 util,
31 31 )
32 32
33 33 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
34 34 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
35 35 _CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
36 36
def readexactly(stream, n):
    '''read n bytes from stream.read and abort if less was available'''
    data = stream.read(n)
    if len(data) == n:
        return data
    raise error.Abort(_("stream ended unexpectedly"
                        " (got %d bytes, expected %d)")
                      % (len(data), n))

def getchunk(stream):
    """return the next chunk from stream as a string"""
    header = readexactly(stream, 4)
    length = struct.unpack(">l", header)[0]
    if length > 4:
        # length counts the 4-byte header itself
        return readexactly(stream, length - 4)
    if length:
        raise error.Abort(_("invalid chunk length %d") % length)
    return ""
55 55
def chunkheader(length):
    """Return the big-endian chunk header announcing *length* payload bytes.

    The on-the-wire length includes the 4 header bytes themselves.
    """
    total = length + 4
    return struct.pack(">l", total)

def closechunk():
    """Return the header of a zero-length chunk, which terminates a group."""
    return struct.pack(">l", 0)
63 63
def combineresults(results):
    """logic to combine 0 or more addchangegroup results into one

    Each result follows the addchangegroup convention: 0 for "nothing
    changed", 1 for "same number of heads", 2..n for "n-1 added heads",
    -2..-n for "n-1 removed heads".

    Returns 0 as soon as any result is 0; otherwise folds the head
    deltas of all results into a single value with the same convention.
    """
    changedheads = 0
    for ret in results:
        # If any changegroup result is 0, return 0.  (Returning directly
        # fixes the earlier behavior where head deltas accumulated before
        # the 0 could overwrite the 0 after the loop.)
        if ret == 0:
            return 0
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        return 1 + changedheads
    if changedheads < 0:
        return -1 + changedheads
    return 1
82 82
def writechunks(ui, chunks, filename, vfs=None):
    """Write chunks to a file and return its filename.

    The stream is assumed to be a bundle file.
    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    """
    fh = None
    cleanup = None
    try:
        if filename:
            fh = vfs.open(filename, "wb") if vfs else open(filename, "wb")
        else:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, "wb")
        # until the last chunk is written, an error must remove the file
        cleanup = filename
        for chunk in chunks:
            fh.write(chunk)
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)
114 114
class cg1unpacker(object):
    """Unpacker for cg1 changegroup streams.

    A changegroup unpacker handles the framing of the revision data in
    the wire format. Most consumers will want to use the apply()
    method to add the changes from the changegroup to a repository.

    If you're forwarding a changegroup unmodified to another consumer,
    use getchunks(), which returns an iterator of changegroup
    chunks. This is mostly useful for cases where you need to know the
    data stream has ended by observing the end of the changegroup.

    deltachunk() is useful only if you're applying delta data. Most
    consumers should prefer apply() instead.

    A few other public methods exist. Those are used only for
    bundlerepo and some debug commands - their use is discouraged.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    _grouplistcount = 1 # One list of files after the manifests

    def __init__(self, fh, alg, extras=None):
        """Wrap stream *fh*, decompressing with algorithm *alg*.

        alg is a two-letter compression id ('UN', 'BZ', 'GZ', ...);
        raises error.Abort for an unknown id.
        """
        if alg == 'UN':
            alg = None # get more modern without breaking too much
        # idiom fix: 'not alg in' -> 'alg not in'
        if alg not in util.decompressors:
            raise error.Abort(_('unknown stream compression type: %s')
                              % alg)
        if alg == 'BZ':
            # bundles carry a stripped BZ header; use the tolerant variant
            alg = '_truncatedBZ'
        self._stream = util.decompressors[alg](fh)
        self._type = alg
        self.extras = extras or {}
        # per-chunk progress callback, set by apply()/_unpackmanifests()
        self.callback = None
150 150
151 151 # These methods (compressed, read, seek, tell) all appear to only
152 152 # be used by bundlerepo, but it's a little hard to tell.
153 153 def compressed(self):
154 154 return self._type is not None
155 155 def read(self, l):
156 156 return self._stream.read(l)
157 157 def seek(self, pos):
158 158 return self._stream.seek(pos)
159 159 def tell(self):
160 160 return self._stream.tell()
161 161 def close(self):
162 162 return self._stream.close()
163 163
164 164 def _chunklength(self):
165 165 d = readexactly(self._stream, 4)
166 166 l = struct.unpack(">l", d)[0]
167 167 if l <= 4:
168 168 if l:
169 169 raise error.Abort(_("invalid chunk length %d") % l)
170 170 return 0
171 171 if self.callback:
172 172 self.callback()
173 173 return l - 4
174 174
175 175 def changelogheader(self):
176 176 """v10 does not have a changelog header chunk"""
177 177 return {}
178 178
179 179 def manifestheader(self):
180 180 """v10 does not have a manifest header chunk"""
181 181 return {}
182 182
183 183 def filelogheader(self):
184 184 """return the header of the filelogs chunk, v10 only has the filename"""
185 185 l = self._chunklength()
186 186 if not l:
187 187 return {}
188 188 fname = readexactly(self._stream, l)
189 189 return {'filename': fname}
190 190
191 191 def _deltaheader(self, headertuple, prevnode):
192 192 node, p1, p2, cs = headertuple
193 193 if prevnode is None:
194 194 deltabase = p1
195 195 else:
196 196 deltabase = prevnode
197 197 flags = 0
198 198 return node, p1, p2, deltabase, cs, flags
199 199
200 200 def deltachunk(self, prevnode):
201 201 l = self._chunklength()
202 202 if not l:
203 203 return {}
204 204 headerdata = readexactly(self._stream, self.deltaheadersize)
205 205 header = struct.unpack(self.deltaheader, headerdata)
206 206 delta = readexactly(self._stream, l - self.deltaheadersize)
207 207 node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
208 208 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
209 209 'deltabase': deltabase, 'delta': delta, 'flags': flags}
210 210
211 211 def getchunks(self):
212 212 """returns all the chunks contains in the bundle
213 213
214 214 Used when you need to forward the binary stream to a file or another
215 215 network API. To do so, it parse the changegroup data, otherwise it will
216 216 block in case of sshrepo because it don't know the end of the stream.
217 217 """
218 218 # an empty chunkgroup is the end of the changegroup
219 219 # a changegroup has at least 2 chunkgroups (changelog and manifest).
220 220 # after that, changegroup versions 1 and 2 have a series of groups
221 221 # with one group per file. changegroup 3 has a series of directory
222 222 # manifests before the files.
223 223 count = 0
224 224 emptycount = 0
225 225 while emptycount < self._grouplistcount:
226 226 empty = True
227 227 count += 1
228 228 while True:
229 229 chunk = getchunk(self)
230 230 if not chunk:
231 231 if empty and count > 2:
232 232 emptycount += 1
233 233 break
234 234 empty = False
235 235 yield chunkheader(len(chunk))
236 236 pos = 0
237 237 while pos < len(chunk):
238 238 next = pos + 2**20
239 239 yield chunk[pos:next]
240 240 pos = next
241 241 yield closechunk()
242 242
    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        """Consume the manifest group of the stream and add it to repo.

        prog is a progress-callback factory (see the `prog` class in
        apply()); numchanges bounds the progress total.
        """
        # We know that we'll never have more manifests than we had
        # changesets.
        self.callback = prog(_('manifests'), numchanges)
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        self.manifestheader()
        repo.manifest.addgroup(self, revmap, trp)
        repo.ui.progress(_('manifests'), None)
        self.callback = None
255 255
    def apply(self, repo, srctype, url, emptyok=False,
              targetphase=phases.draft, expectedtotal=None):
        """Add the changegroup returned by source.read() to this repo.
        srctype is a string like 'push', 'pull', or 'unbundle'. url is
        the URL of the repo where this changegroup is coming from.

        Return an integer summarizing the change to this repo:
        - nothing changed or no source: 0
        - more heads than before: 1+added heads (2..n)
        - fewer heads than before: -1-removed heads (-2..-n)
        - number of heads stays the same: 1
        """
        repo = repo.unfiltered()
        def csmap(x):
            repo.ui.debug("add changeset %s\n" % short(x))
            return len(cl)

        def revmap(x):
            return cl.rev(x)

        changesets = files = revisions = 0

        try:
            with repo.transaction("\n".join([srctype,
                                             util.hidepassword(url)])) as tr:
                # The transaction could have been created before and already
                # carries source information. In this case we use the top
                # level data. We overwrite the argument because we need to use
                # the top level value (if they exist) in this function.
                srctype = tr.hookargs.setdefault('source', srctype)
                url = tr.hookargs.setdefault('url', url)
                repo.hook('prechangegroup', throw=True, **tr.hookargs)

                # write changelog data to temp files so concurrent readers
                # will not see an inconsistent view
                cl = repo.changelog
                cl.delayupdate(tr)
                oldheads = cl.heads()

                trp = weakref.proxy(tr)
                # pull off the changeset group
                repo.ui.status(_("adding changesets\n"))
                clstart = len(cl)
                # per-chunk progress reporter, installed as self.callback so
                # _chunklength() ticks it for every chunk read
                class prog(object):
                    def __init__(self, step, total):
                        self._step = step
                        self._total = total
                        self._count = 1
                    def __call__(self):
                        repo.ui.progress(self._step, self._count,
                                         unit=_('chunks'), total=self._total)
                        self._count += 1
                self.callback = prog(_('changesets'), expectedtotal)

                efiles = set()
                def onchangelog(cl, node):
                    efiles.update(cl.readfiles(node))

                self.changelogheader()
                srccontent = cl.addgroup(self, csmap, trp,
                                         addrevisioncb=onchangelog)
                # rebind: from here on efiles is the *count* of files touched
                efiles = len(efiles)

                if not (srccontent or emptyok):
                    raise error.Abort(_("received changelog group is empty"))
                clend = len(cl)
                changesets = clend - clstart
                repo.ui.progress(_('changesets'), None)
                self.callback = None

                # pull off the manifest group
                repo.ui.status(_("adding manifests\n"))
                self._unpackmanifests(repo, revmap, trp, prog, changesets)

                needfiles = {}
                if repo.ui.configbool('server', 'validate', default=False):
                    # validate incoming csets have their manifests
                    for cset in xrange(clstart, clend):
                        mfnode = repo.changelog.read(
                            repo.changelog.node(cset))[0]
                        mfest = repo.manifest.readdelta(mfnode)
                        # store file nodes we must see
                        for f, n in mfest.iteritems():
                            needfiles.setdefault(f, set()).add(n)

                # process the files
                repo.ui.status(_("adding file changes\n"))
                newrevs, newfiles = _addchangegroupfiles(
                    repo, self, revmap, trp, efiles, needfiles)
                revisions += newrevs
                files += newfiles

                # dh = delta of head count; feeds the return value below
                dh = 0
                if oldheads:
                    heads = cl.heads()
                    dh = len(heads) - len(oldheads)
                    for h in heads:
                        # branch-closing heads don't count as added heads
                        if h not in oldheads and repo[h].closesbranch():
                            dh -= 1
                htext = ""
                if dh:
                    htext = _(" (%+d heads)") % dh

                repo.ui.status(_("added %d changesets"
                                 " with %d changes to %d files%s\n")
                               % (changesets, revisions, files, htext))
                repo.invalidatevolatilesets()

                if changesets > 0:
                    if 'node' not in tr.hookargs:
                        # first changegroup of this transaction: record the
                        # node range on the transaction itself
                        tr.hookargs['node'] = hex(cl.node(clstart))
                        tr.hookargs['node_last'] = hex(cl.node(clend - 1))
                        hookargs = dict(tr.hookargs)
                    else:
                        # later changegroup: keep tr.hookargs as-is, pass
                        # this group's range only to the hook
                        hookargs = dict(tr.hookargs)
                        hookargs['node'] = hex(cl.node(clstart))
                        hookargs['node_last'] = hex(cl.node(clend - 1))
                    repo.hook('pretxnchangegroup', throw=True, **hookargs)

                added = [cl.node(r) for r in xrange(clstart, clend)]
                publishing = repo.publishing()
                if srctype in ('push', 'serve'):
                    # Old servers can not push the boundary themselves.
                    # New servers won't push the boundary if changeset already
                    # exists locally as secret
                    #
                    # We should not use added here but the list of all change in
                    # the bundle
                    if publishing:
                        phases.advanceboundary(repo, tr, phases.public,
                                               srccontent)
                    else:
                        # Those changesets have been pushed from the
                        # outside, their phases are going to be pushed
                        # alongside. Therefor `targetphase` is
                        # ignored.
                        phases.advanceboundary(repo, tr, phases.draft,
                                               srccontent)
                        phases.retractboundary(repo, tr, phases.draft, added)
                elif srctype != 'strip':
                    # publishing only alter behavior during push
                    #
                    # strip should not touch boundary at all
                    phases.retractboundary(repo, tr, targetphase, added)

                if changesets > 0:
                    if srctype != 'strip':
                        # During strip, branchcache is invalid but
                        # coming call to `destroyed` will repair it.
                        # In other case we can safely update cache on
                        # disk.
                        branchmap.updatecache(repo.filtered('served'))

                    def runhooks():
                        # These hooks run when the lock releases, not when the
                        # transaction closes. So it's possible for the changelog
                        # to have changed since we last saw it.
                        if clstart >= len(repo):
                            return

                        # forcefully update the on-disk branch cache
                        repo.ui.debug("updating the branch cache\n")
                        repo.hook("changegroup", **hookargs)

                        for n in added:
                            args = hookargs.copy()
                            args['node'] = hex(n)
                            del args['node_last']
                            repo.hook("incoming", **args)

                        newheads = [h for h in repo.heads()
                                    if h not in oldheads]
                        repo.ui.log("incoming",
                                    "%s incoming changes - new heads: %s\n",
                                    len(added),
                                    ', '.join([hex(c[:6]) for c in newheads]))

                    tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                                    lambda tr: repo._afterlock(runhooks))
        finally:
            repo.ui.flush()
        # never return 0 here:
        # NOTE(review): dh is only assigned inside the transaction body, so
        # this is reached only on the success path — confirm no caller relies
        # on a return after a swallowed transaction failure.
        if dh < 0:
            return dh - 1
        else:
            return dh + 1
442 442
class cg2unpacker(cg1unpacker):
    """Unpacker for cg2 streams.

    cg2 streams add support for generaldelta, so the delta header
    format is slightly different. All other features about the data
    remain the same.
    """
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        # cg2 names its delta base explicitly, so prevnode is unused
        # and revlog flags are still implicitly zero.
        node, p1, p2, deltabase, cs = headertuple
        return node, p1, p2, deltabase, cs, 0
458 458
class cg3unpacker(cg2unpacker):
    """Unpacker for cg3 streams.

    cg3 streams add support for exchanging treemanifests and revlog
    flags. It adds the revlog flags to the delta header and an empty chunk
    separating manifests and files.
    """
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '03'
    _grouplistcount = 2 # One list of manifests and one list of files

    def _deltaheader(self, headertuple, prevnode):
        node, p1, p2, deltabase, cs, flags = headertuple
        return node, p1, p2, deltabase, cs, flags

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        """Consume the root manifest group, then any directory manifest
        groups that precede the filelogs."""
        super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
                                                  numchanges)
        # The source retained both the pre- and post-change loop bodies
        # from the diff; only the iter(callable, sentinel) form is kept.
        # filelogheader() returns {} at the end of the directory groups.
        for chunkdata in iter(self.filelogheader, {}):
            # If we get here, there are directory manifests in the changegroup
            d = chunkdata["filename"]
            repo.ui.debug("adding %s revisions\n" % d)
            dirlog = repo.manifest.dirlog(d)
            if not dirlog.addgroup(self, revmap, trp):
                raise error.Abort(_("received dir revlog group is empty"))
488 485
class headerlessfixup(object):
    """File-like shim replaying an already-consumed header *h* before
    the remaining content of stream *fh*."""
    def __init__(self, fh, h):
        self._h = h
        self._fh = fh

    def read(self, n):
        if not self._h:
            return readexactly(self._fh, n)
        buffered, self._h = self._h[:n], self._h[n:]
        if len(buffered) < n:
            # header exhausted mid-read; top up from the real stream
            buffered += readexactly(self._fh, n - len(buffered))
        return buffered
500 497
class cg1packer(object):
    """Packer producing cg1 changegroup streams (flat manifests only)."""
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    version = '01'
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        # experimental config: bundle.reorder
        reorder = repo.ui.config('bundle', 'reorder', 'auto')
        if reorder == 'auto':
            # None means "decide per revlog" in _sortgroup()
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
        # verbose (but not debug) runs report per-part uncompressed sizes
        if self._repo.ui.verbose and not self._repo.ui.debugflag:
            self._verbosenote = self._repo.ui.note
        else:
            self._verbosenote = lambda s: None
527 524
528 525 def close(self):
529 526 return closechunk()
530 527
531 528 def fileheader(self, fname):
532 529 return chunkheader(len(fname)) + fname
533 530
534 531 # Extracted both for clarity and for overriding in extensions.
535 532 def _sortgroup(self, revlog, nodelist, lookup):
536 533 """Sort nodes for change group and turn them into revnums."""
537 534 # for generaldelta revlogs, we linearize the revs; this will both be
538 535 # much quicker and generate a much smaller bundle
539 536 if (revlog._generaldelta and self._reorder is None) or self._reorder:
540 537 dag = dagutil.revlogdag(revlog)
541 538 return dag.linearize(set(revlog.rev(n) for n in nodelist))
542 539 else:
543 540 return sorted([revlog.rev(n) for n in nodelist])
544 541
    def group(self, nodelist, revlog, lookup, units=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        revs = self._sortgroup(revlog, nodelist, lookup)

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas: each chunk encodes revs[r+1] as a delta against
        # revs[r], so the prepended parent seeds the first delta
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        if units is not None:
            self._progress(msgbundling, None)
        yield self.close()
584 581
585 582 # filter any nodes that claim to be part of the known set
586 583 def prune(self, revlog, missing, commonrevs):
587 584 rr, rl = revlog.rev, revlog.linkrev
588 585 return [n for n in missing if rl(rr(n)) not in commonrevs]
589 586
590 587 def _packmanifests(self, dir, mfnodes, lookuplinknode):
591 588 """Pack flat manifests into a changegroup stream."""
592 589 assert not dir
593 590 for chunk in self.group(mfnodes, self._repo.manifest,
594 591 lookuplinknode, units=_('manifests')):
595 592 yield chunk
596 593
597 594 def _manifestsdone(self):
598 595 return ''
599 596
    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)

        Emits the changelog group, then the manifest group(s), then one
        group per changed file, and finally fires the 'outgoing' hook.
        '''
        repo = self._repo
        cl = repo.changelog

        clrevorder = {}
        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and manifest
        # nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            clrevorder[x] = len(clrevorder)
            n = c[0]
            # record the first changeset introducing this manifest version
            mfs.setdefault(n, x)
            # Record a complete list of potentially-changed files in
            # this manifest.
            changedfiles.update(c[3])
            return x

        self._verbosenote(_('uncompressed size of bundle content:\n'))
        size = 0
        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (changelog)\n') % size)

        # We need to make sure that the linkrev in the changegroup refers to
        # the first changeset that introduced the manifest or file revision.
        # The fastpath is usually safer than the slowpath, because the filelogs
        # are walked in revlog order.
        #
        # When taking the slowpath with reorder=None and the manifest revlog
        # uses generaldelta, the manifest may be walked in the "wrong" order.
        # Without 'clrevorder', we would get an incorrect linkrev (see fix in
        # cc0ff93d0c0c).
        #
        # When taking the fastpath, we are only vulnerable to reordering
        # of the changelog itself. The changelog never uses generaldelta, so
        # it is only reordered when reorder=True. To handle this case, we
        # simply take the slowpath, which already has the 'clrevorder' logic.
        # This was also fixed in cc0ff93d0c0c.
        fastpathlinkrev = fastpathlinkrev and not self._reorder
        # Treemanifests don't work correctly with fastpathlinkrev
        # either, because we don't discover which directory nodes to
        # send along with files. This could probably be fixed.
        fastpathlinkrev = fastpathlinkrev and (
            'treemanifest' not in repo.requirements)

        for chunk in self.generatemanifests(commonrevs, clrevorder,
                                            fastpathlinkrev, mfs, fnodes):
            yield chunk
        mfs.clear()
        clrevs = set(cl.rev(x) for x in clnodes)

        if not fastpathlinkrev:
            def linknodes(unused, fname):
                return fnodes.get(fname, {})
        else:
            cln = cl.node
            def linknodes(filerevlog, fname):
                llr = filerevlog.linkrev
                fln = filerevlog.node
                revs = ((r, llr(r)) for r in filerevlog)
                return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)
678 675
    def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
                          fnodes):
        """Yield the manifest group(s), root first then subdirectories.

        As a side effect, populates *fnodes* (and, for treemanifests,
        discovers further directory manifest nodes to send).
        """
        repo = self._repo
        dirlog = repo.manifest.dirlog
        # map of directory path -> {manifest node -> linkrev node};
        # '' is the root manifest, subdirectories are added as discovered
        tmfnodes = {'': mfs}

        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def makelookupmflinknode(dir):
            if fastpathlinkrev:
                assert not dir
                return mfs.__getitem__

            def lookupmflinknode(x):
                """Callback for looking up the linknode for manifests.

                Returns the linkrev node for the specified manifest.

                SIDE EFFECT:

                1) fclnodes gets populated with the list of relevant
                   file nodes if we're not using fastpathlinkrev
                2) When treemanifests are in use, collects treemanifest nodes
                   to send

                Note that this means manifests must be completely sent to
                the client before you can trust the list of files and
                treemanifests to send.
                """
                clnode = tmfnodes[dir][x]
                mdata = dirlog(dir).readshallowfast(x)
                for p, n, fl in mdata.iterentries():
                    if fl == 't': # subdirectory manifest
                        subdir = dir + p + '/'
                        tmfclnodes = tmfnodes.setdefault(subdir, {})
                        tmfclnode = tmfclnodes.setdefault(n, clnode)
                        # always keep the earliest introducing changeset
                        if clrevorder[clnode] < clrevorder[tmfclnode]:
                            tmfclnodes[n] = clnode
                    else:
                        f = dir + p
                        fclnodes = fnodes.setdefault(f, {})
                        fclnode = fclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[fclnode]:
                            fclnodes[n] = clnode
                return clnode
            return lookupmflinknode

        size = 0
        # iterate in sorted order so parents are sent before their
        # subdirectories (which lookupmflinknode may add on the fly)
        while tmfnodes:
            dir = min(tmfnodes)
            nodes = tmfnodes[dir]
            prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
            if not dir or prunednodes:
                for x in self._packmanifests(dir, prunednodes,
                                             makelookupmflinknode(dir)):
                    size += len(x)
                    yield x
            del tmfnodes[dir]
        self._verbosenote(_('%8.i (manifests)\n') % size)
        yield self._manifestsdone()
740 737
    # The 'source' parameter is useful for extensions
    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        """Yield one chunk group per changed file, in sorted-name order.

        linknodes(filerevlog, fname) must return {filenode: linkrev node};
        files whose every node is already known remotely are skipped.
        """
        repo = self._repo
        progress = self._progress
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise error.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                h = self.fileheader(fname)
                size = len(h)
                yield h
                for chunk in self.group(filenodes, filerevlog, lookupfilelog):
                    size += len(chunk)
                    yield chunk
                self._verbosenote(_('%8.i %s\n') % (size, fname))
        progress(msgbundling, None)
773 770
    def deltaparent(self, revlog, rev, p1, p2, prev):
        """Pick the revision to delta against; cg1 always uses the
        previous revision in the stream."""
        return prev

    def revchunk(self, revlog, rev, prev, linknode):
        """Yield the chunks (header, metadata, delta) encoding *rev*."""
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = self.deltaparent(revlog, rev, p1, p2, prev)

        prefix = ''
        if revlog.iscensored(base) or revlog.iscensored(rev):
            # censored revisions can't be diffed; ship the tombstone (or
            # full text) with a patch header replacing the whole base
            try:
                delta = revlog.revision(node)
            except error.CensoredNodeError as e:
                delta = e.tombstone
            if base == nullrev:
                prefix = mdiff.trivialdiffheader(len(delta))
            else:
                baselen = revlog.rawsize(base)
                prefix = mdiff.replacediffheader(baselen, len(delta))
        elif base == nullrev:
            # no base: full revision disguised as a trivial diff
            delta = revlog.revision(node)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        flags = revlog.flags(rev)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # do nothing with basenode, it is implicitly the previous one in HG10
        # do nothing with flags, it is implicitly 0 for cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
811 808
class cg2packer(cg1packer):
    """Packer producing cg2 streams (adds generaldelta support)."""
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        super(cg2packer, self).__init__(repo, bundlecaps)
        if self._reorder is None:
            # Since generaldelta is directly supported by cg2, reordering
            # generally doesn't help, so we disable it by default (treating
            # bundle.reorder=auto just like bundle.reorder=False).
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        dp = revlog.deltaparent(rev)
        # avoid storing full revisions; pick prev in those cases
        # also pick prev when we can't be sure remote has dp
        if dp != nullrev and dp in (p1, p2, prev):
            return dp
        return prev

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # Do nothing with flags, it is implicitly 0 in cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
835 832
class cg3packer(cg2packer):
    """Packer producing cg3 streams (adds treemanifests and revlog flags)."""
    version = '03'
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        # unlike cg1/cg2, a subdirectory manifest group is announced with
        # a file-style header carrying the directory name
        if dir:
            yield self.fileheader(dir)
        mflog = self._repo.manifest.dirlog(dir)
        for chunk in self.group(mfnodes, mflog, lookuplinknode,
                                units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg3 separates manifests from files with an empty chunk
        return self.close()

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        return struct.pack(
            self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
853 850
_packermap = {'01': (cg1packer, cg1unpacker),
              # cg2 adds support for exchanging generaldelta
              '02': (cg2packer, cg2unpacker),
              # cg3 adds support for exchanging revlog flags and treemanifests
              '03': (cg3packer, cg3unpacker),
             }

def allsupportedversions(ui):
    """Changegroup versions this client understands; '03' only when
    explicitly enabled by configuration."""
    versions = set(_packermap)
    versions.discard('03')
    if (ui.configbool('experimental', 'changegroup3') or
        ui.configbool('experimental', 'treemanifest')):
        versions.add('03')
    return versions
868 865
# Changegroup versions that can be applied to the repo
def supportedincomingversions(repo):
    """Return the changegroup versions ``repo`` can receive."""
    supported = allsupportedversions(repo.ui)
    if 'treemanifest' in repo.requirements:
        supported.add('03')
    return supported
875 872
# Changegroup versions that can be created from the repo
def supportedoutgoingversions(repo):
    """Return the changegroup versions ``repo`` can produce."""
    supported = allsupportedversions(repo.ui)
    if 'treemanifest' in repo.requirements:
        # Versions 01 and 02 support only flat manifests and it's just too
        # expensive to convert between the flat manifest and tree manifest on
        # the fly. Since tree manifests are hashed differently, all of history
        # would have to be converted. Instead, we simply don't even pretend to
        # support versions 01 and 02.
        supported.difference_update(['01', '02'])
        supported.add('03')
    return supported
889 886
def safeversion(repo):
    """Return the smallest changegroup version it is safe to assume all
    clients of ``repo`` support.

    For example, every hg version that supports generaldelta also
    supports changegroup 02, so '01' can be dropped in that case.
    """
    candidates = supportedoutgoingversions(repo)
    if 'generaldelta' in repo.requirements:
        candidates.discard('01')
    assert candidates
    return min(candidates)
899 896
def getbundler(version, repo, bundlecaps=None):
    """Instantiate the packer for ``version``, which must be producible
    by ``repo``."""
    assert version in supportedoutgoingversions(repo)
    packercls = _packermap[version][0]
    return packercls(repo, bundlecaps)
903 900
def getunbundler(version, fh, alg, extras=None):
    """Instantiate the unpacker for ``version`` over the stream ``fh``."""
    unpackercls = _packermap[version][1]
    return unpackercls(fh, alg, extras=extras)
906 903
def _changegroupinfo(repo, nodes, source):
    """Report how many (and, when debugging, which) changesets are sent."""
    if not (repo.ui.verbose or source == 'bundle'):
        return
    repo.ui.status(_("%d changesets found\n") % len(nodes))
    if repo.ui.debugflag:
        repo.ui.debug("list of changesets:\n")
        for node in nodes:
            repo.ui.debug("%s\n" % hex(node))
914 911
def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
    """Fire the preoutgoing hook and return a raw changegroup generator
    covering the revisions in ``outgoing``."""
    repo = repo.unfiltered()
    commonrevs = outgoing.common
    csets = outgoing.missing
    heads = outgoing.missingheads
    # We go through the fast path if we get told to, or if all unfiltered
    # heads have been requested (since we then know all linkrevs will
    # be pulled by the client).
    heads.sort()
    allheadsrequested = (repo.filtername is None
                         and heads == sorted(repo.heads()))
    fastpathlinkrev = fastpath or allheadsrequested

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, csets, source)
    return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
930 927
def getsubset(repo, outgoing, bundler, source, fastpath=False):
    """Wrap the raw changegroup stream in an unbundler for application."""
    gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
    extras = {'clcount': len(outgoing.missing)}
    return getunbundler(bundler.version, util.chunkbuffer(gengroup), None,
                        extras)
935 932
def changegroupsubset(repo, roots, heads, source, version='01'):
    """Compute a changegroup consisting of all the nodes that are
    descendants of any of the roots and ancestors of any of the heads.
    Return a chunkbuffer object whose read() method will return
    successive changegroup chunks.

    It is fairly complex as determining which filenodes and which
    manifest nodes need to be included for the changeset to be complete
    is non-trivial.

    Another wrinkle is doing the reverse, figuring out which changeset in
    the changegroup a particular filenode or manifestnode belongs to.
    """
    outgoing = discovery.outgoingbetween(repo, roots, heads)
    return getsubset(repo, outgoing, getbundler(version, repo), source)
952 949
def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
                           version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing. Returns a raw changegroup generator,
    or None when there is nothing to send."""
    if not outgoing.missing:
        return None
    packer = getbundler(version, repo, bundlecaps)
    return getsubsetraw(repo, outgoing, packer, source)
963 960
def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
                        version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing. Returns None when there is nothing
    to send."""
    if not outgoing.missing:
        return None
    packer = getbundler(version, repo, bundlecaps)
    return getsubset(repo, outgoing, packer, source)
974 971
def computeoutgoing(repo, heads, common):
    """Computes which revs are outgoing given a set of common
    and a set of heads.

    This is a separate function so extensions can have access to
    the logic.

    Returns a discovery.outgoing object.
    """
    cl = repo.changelog
    if common:
        # drop nodes the local changelog does not know about
        common = [n for n in common if cl.hasnode(n)]
    else:
        common = [nullid]
    heads = heads or cl.heads()
    return discovery.outgoing(cl, common, heads)
993 990
def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None,
                   version='01'):
    """Like changegroupsubset, but returns the set difference between the
    ancestors of heads and the ancestors common.

    If heads is None, use the local heads. If common is None, use [nullid].

    The nodes in common might not all be known locally due to the way the
    current discovery protocol works.
    """
    missing = computeoutgoing(repo, heads, common)
    return getlocalchangegroup(repo, source, missing,
                               bundlecaps=bundlecaps, version=version)
1007 1004
def changegroup(repo, basenodes, source):
    """Return a changegroup of all descendants of ``basenodes``.

    Goes through changegroupsubset() against the current heads to avoid
    a race (issue1320)."""
    currentheads = repo.heads()
    return changegroupsubset(repo, basenodes, currentheads, source)
1011 1008
def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
    """Apply the file-revision part of changegroup ``source`` to the repo.

    ``revmap`` maps changegroup nodes to linkrevs, ``trp`` is the
    transaction proxy, ``expectedfiles`` sizes the progress bar and
    ``needfiles`` maps filename -> set of nodes that must arrive.

    Returns a ``(revisions, files)`` tuple of how much was added.
    Raises error.Abort on an empty group, a censored delta base, a
    spurious entry, or when a needed file revision never arrived.
    """
    revisions = 0
    files = 0
    # filelogheader() returns {} once the file part is exhausted, so
    # iter(callable, sentinel) walks every per-file header.
    for chunkdata in iter(source.filelogheader, {}):
        files += 1
        f = chunkdata["filename"]
        repo.ui.debug("adding %s revisions\n" % f)
        repo.ui.progress(_('files'), files, unit=_('files'),
                         total=expectedfiles)
        fl = repo.file(f)
        o = len(fl)
        try:
            if not fl.addgroup(source, revmap, trp):
                raise error.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError as e:
            raise error.Abort(_("received delta base is censored: %s") % e)
        revisions += len(fl) - o
        if f in needfiles:
            needs = needfiles[f]
            # tick off every newly added node; anything unexpected aborts
            for new in xrange(o, len(fl)):
                n = fl.node(new)
                if n in needs:
                    needs.remove(n)
                else:
                    raise error.Abort(
                        _("received spurious file revlog entry"))
            if not needs:
                del needfiles[f]
    repo.ui.progress(_('files'), None)

    # anything still listed in needfiles must already exist locally
    for f, needs in needfiles.iteritems():
        fl = repo.file(f)
        for n in needs:
            try:
                fl.rev(n)
            except error.LookupError:
                raise error.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (f, hex(n)))

    return revisions, files
General Comments 0
You need to be logged in to leave comments. Login now