##// END OF EJS Templates
changegroup: extract method that sorts nodes to send...
Augie Fackler -
r29236:1b7d907e default
parent child Browse files
Show More
@@ -1,1058 +1,1062 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullid,
19 19 nullrev,
20 20 short,
21 21 )
22 22
23 23 from . import (
24 24 branchmap,
25 25 dagutil,
26 26 discovery,
27 27 error,
28 28 mdiff,
29 29 phases,
30 30 util,
31 31 )
32 32
33 33 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
34 34 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
35 35 _CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
36 36
def readexactly(stream, n):
    '''read n bytes from stream.read and abort if less was available'''
    data = stream.read(n)
    if len(data) < n:
        raise error.Abort(_("stream ended unexpectedly"
                            " (got %d bytes, expected %d)")
                          % (len(data), n))
    return data
45 45
def getchunk(stream):
    """return the next chunk from stream as a string"""
    lengthdata = readexactly(stream, 4)
    length = struct.unpack(">l", lengthdata)[0]
    # the length field counts itself; anything larger than 4 has a payload
    if length > 4:
        return readexactly(stream, length - 4)
    if length:
        # 1..4 (or negative) cannot be a valid frame
        raise error.Abort(_("invalid chunk length %d") % length)
    # a zero length field is the group terminator
    return ""
55 55
def chunkheader(length):
    """return a changegroup chunk header (string)"""
    # on the wire the length field includes its own 4 bytes
    framedlength = length + 4
    return struct.pack(">l", framedlength)
59 59
def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk"""
    # a zero length field terminates a chunk group
    return struct.pack(">l", 0)
63 63
def combineresults(results):
    """logic to combine 0 or more addchangegroup results into one

    Each element of results uses the addchangegroup() return encoding:
      - 0: nothing changed or no source
      - 1: head count unchanged
      - 2..n: 1 + number of added heads
      - -2..-n: -1 - number of removed heads

    Returns one combined value with the same encoding.
    """
    changedheads = 0
    result = 1
    for ret in results:
        # If any changegroup result is 0, return 0. Return immediately:
        # previously the loop only break'd here, so the post-loop
        # head-count adjustment could overwrite the 0 failure marker when
        # an earlier result had already changed the head count.
        if ret == 0:
            return 0
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        result = 1 + changedheads
    elif changedheads < 0:
        result = -1 + changedheads
    return result
82 82
def writechunks(ui, chunks, filename, vfs=None):
    """Write chunks to a file and return its filename.

    The stream is assumed to be a bundle file.
    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    """
    fh = None
    cleanup = None
    try:
        if not filename:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, "wb")
        elif vfs:
            fh = vfs.open(filename, "wb")
        else:
            fh = open(filename, "wb")
        # from here on, an error must remove the partially written file
        cleanup = filename
        for chunk in chunks:
            fh.write(chunk)
        # everything was written; disarm the cleanup
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)
114 114
class cg1unpacker(object):
    """Unpacker for cg1 changegroup streams.

    A changegroup unpacker handles the framing of the revision data in
    the wire format. Most consumers will want to use the apply()
    method to add the changes from the changegroup to a repository.

    If you're forwarding a changegroup unmodified to another consumer,
    use getchunks(), which returns an iterator of changegroup
    chunks. This is mostly useful for cases where you need to know the
    data stream has ended by observing the end of the changegroup.

    deltachunk() is useful only if you're applying delta data. Most
    consumers should prefer apply() instead.

    A few other public methods exist. Those are used only for
    bundlerepo and some debug commands - their use is discouraged.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    _grouplistcount = 1 # One list of files after the manifests

    def __init__(self, fh, alg):
        # fh is the raw input stream; alg is the compression marker used
        # to pick a stream-wrapping decompressor from util.decompressors.
        if alg == 'UN':
            alg = None # get more modern without breaking too much
        if not alg in util.decompressors:
            raise error.Abort(_('unknown stream compression type: %s')
                              % alg)
        if alg == 'BZ':
            # NOTE(review): '_truncatedBZ' appears to be a bzip2 variant
            # for streams whose leading magic was already consumed -
            # confirm against util.decompressors.
            alg = '_truncatedBZ'
        self._stream = util.decompressors[alg](fh)
        self._type = alg
        # optional per-chunk progress callback, installed by apply()
        self.callback = None

    # These methods (compressed, read, seek, tell) all appear to only
    # be used by bundlerepo, but it's a little hard to tell.
    def compressed(self):
        return self._type is not None
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def _chunklength(self):
        # The 4-byte big-endian length field counts itself, so the payload
        # is l - 4 bytes; a zero field marks the end of a chunk group.
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise error.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self._chunklength()
        if not l:
            return {}
        fname = readexactly(self._stream, l)
        return {'filename': fname}

    def _deltaheader(self, headertuple, prevnode):
        # cg1 carries no explicit delta base: the base is the previously
        # sent node, or p1 for the first chunk of a group.
        node, p1, p2, cs = headertuple
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        flags = 0
        return node, p1, p2, deltabase, cs, flags

    def deltachunk(self, prevnode):
        # Return {} at the end of a chunk group, otherwise a dict
        # describing one revision delta.
        l = self._chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
        return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
                'deltabase': deltabase, 'delta': delta, 'flags': flags}

    def getchunks(self):
        """returns all the chunks contains in the bundle

        Used when you need to forward the binary stream to a file or another
        network API. To do so, it parse the changegroup data, otherwise it will
        block in case of sshrepo because it don't know the end of the stream.
        """
        # an empty chunkgroup is the end of the changegroup
        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, changegroup versions 1 and 2 have a series of groups
        # with one group per file. changegroup 3 has a series of directory
        # manifests before the files.
        count = 0
        emptycount = 0
        while emptycount < self._grouplistcount:
            empty = True
            count += 1
            while True:
                chunk = getchunk(self)
                if not chunk:
                    if empty and count > 2:
                        emptycount += 1
                    break
                empty = False
                yield chunkheader(len(chunk))
                pos = 0
                # re-emit the chunk body in 1MB slices
                while pos < len(chunk):
                    next = pos + 2**20
                    yield chunk[pos:next]
                    pos = next
            yield closechunk()

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        # We know that we'll never have more manifests than we had
        # changesets.
        self.callback = prog(_('manifests'), numchanges)
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        self.manifestheader()
        repo.manifest.addgroup(self, revmap, trp)
        repo.ui.progress(_('manifests'), None)
        self.callback = None

    def apply(self, repo, srctype, url, emptyok=False,
              targetphase=phases.draft, expectedtotal=None):
        """Add the changegroup returned by source.read() to this repo.
        srctype is a string like 'push', 'pull', or 'unbundle'. url is
        the URL of the repo where this changegroup is coming from.

        Return an integer summarizing the change to this repo:
        - nothing changed or no source: 0
        - more heads than before: 1+added heads (2..n)
        - fewer heads than before: -1-removed heads (-2..-n)
        - number of heads stays the same: 1
        """
        repo = repo.unfiltered()
        def csmap(x):
            repo.ui.debug("add changeset %s\n" % short(x))
            return len(cl)

        def revmap(x):
            return cl.rev(x)

        changesets = files = revisions = 0

        try:
            with repo.transaction("\n".join([srctype,
                                             util.hidepassword(url)])) as tr:
                # The transaction could have been created before and already
                # carries source information. In this case we use the top
                # level data. We overwrite the argument because we need to use
                # the top level value (if they exist) in this function.
                srctype = tr.hookargs.setdefault('source', srctype)
                url = tr.hookargs.setdefault('url', url)
                repo.hook('prechangegroup', throw=True, **tr.hookargs)

                # write changelog data to temp files so concurrent readers
                # will not see an inconsistent view
                cl = repo.changelog
                cl.delayupdate(tr)
                oldheads = cl.heads()

                trp = weakref.proxy(tr)
                # pull off the changeset group
                repo.ui.status(_("adding changesets\n"))
                clstart = len(cl)
                class prog(object):
                    # stateful progress reporter, usable as the per-chunk
                    # callback consumed by _chunklength()
                    def __init__(self, step, total):
                        self._step = step
                        self._total = total
                        self._count = 1
                    def __call__(self):
                        repo.ui.progress(self._step, self._count,
                                         unit=_('chunks'), total=self._total)
                        self._count += 1
                self.callback = prog(_('changesets'), expectedtotal)

                efiles = set()
                def onchangelog(cl, node):
                    # collect every file touched by incoming changesets,
                    # used below to size the file progress meter
                    efiles.update(cl.readfiles(node))

                self.changelogheader()
                srccontent = cl.addgroup(self, csmap, trp,
                                         addrevisioncb=onchangelog)
                efiles = len(efiles)

                if not (srccontent or emptyok):
                    raise error.Abort(_("received changelog group is empty"))
                clend = len(cl)
                changesets = clend - clstart
                repo.ui.progress(_('changesets'), None)
                self.callback = None

                # pull off the manifest group
                repo.ui.status(_("adding manifests\n"))
                self._unpackmanifests(repo, revmap, trp, prog, changesets)

                needfiles = {}
                if repo.ui.configbool('server', 'validate', default=False):
                    # validate incoming csets have their manifests
                    for cset in xrange(clstart, clend):
                        mfnode = repo.changelog.read(
                            repo.changelog.node(cset))[0]
                        mfest = repo.manifest.readdelta(mfnode)
                        # store file nodes we must see
                        for f, n in mfest.iteritems():
                            needfiles.setdefault(f, set()).add(n)

                # process the files
                repo.ui.status(_("adding file changes\n"))
                newrevs, newfiles = _addchangegroupfiles(
                    repo, self, revmap, trp, efiles, needfiles)
                revisions += newrevs
                files += newfiles

                # dh = delta in head count, corrected for closed branch heads
                dh = 0
                if oldheads:
                    heads = cl.heads()
                    dh = len(heads) - len(oldheads)
                    for h in heads:
                        if h not in oldheads and repo[h].closesbranch():
                            dh -= 1
                htext = ""
                if dh:
                    htext = _(" (%+d heads)") % dh

                repo.ui.status(_("added %d changesets"
                                 " with %d changes to %d files%s\n")
                               % (changesets, revisions, files, htext))
                repo.invalidatevolatilesets()

                if changesets > 0:
                    # only fill in node/node_last if the transaction does
                    # not already carry them
                    if 'node' not in tr.hookargs:
                        tr.hookargs['node'] = hex(cl.node(clstart))
                        tr.hookargs['node_last'] = hex(cl.node(clend - 1))
                        hookargs = dict(tr.hookargs)
                    else:
                        hookargs = dict(tr.hookargs)
                        hookargs['node'] = hex(cl.node(clstart))
                        hookargs['node_last'] = hex(cl.node(clend - 1))
                    repo.hook('pretxnchangegroup', throw=True, **hookargs)

                added = [cl.node(r) for r in xrange(clstart, clend)]
                publishing = repo.publishing()
                if srctype in ('push', 'serve'):
                    # Old servers can not push the boundary themselves.
                    # New servers won't push the boundary if changeset already
                    # exists locally as secret
                    #
                    # We should not use added here but the list of all change in
                    # the bundle
                    if publishing:
                        phases.advanceboundary(repo, tr, phases.public,
                                               srccontent)
                    else:
                        # Those changesets have been pushed from the
                        # outside, their phases are going to be pushed
                        # alongside. Therefor `targetphase` is
                        # ignored.
                        phases.advanceboundary(repo, tr, phases.draft,
                                               srccontent)
                        phases.retractboundary(repo, tr, phases.draft, added)
                elif srctype != 'strip':
                    # publishing only alter behavior during push
                    #
                    # strip should not touch boundary at all
                    phases.retractboundary(repo, tr, targetphase, added)

                if changesets > 0:
                    if srctype != 'strip':
                        # During strip, branchcache is invalid but
                        # coming call to `destroyed` will repair it.
                        # In other case we can safely update cache on
                        # disk.
                        branchmap.updatecache(repo.filtered('served'))

                    def runhooks():
                        # These hooks run when the lock releases, not when the
                        # transaction closes. So it's possible for the changelog
                        # to have changed since we last saw it.
                        if clstart >= len(repo):
                            return

                        # forcefully update the on-disk branch cache
                        repo.ui.debug("updating the branch cache\n")
                        repo.hook("changegroup", **hookargs)

                        for n in added:
                            args = hookargs.copy()
                            args['node'] = hex(n)
                            del args['node_last']
                            repo.hook("incoming", **args)

                        newheads = [h for h in repo.heads()
                                    if h not in oldheads]
                        repo.ui.log("incoming",
                                    "%s incoming changes - new heads: %s\n",
                                    len(added),
                                    ', '.join([hex(c[:6]) for c in newheads]))

                    tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                                    lambda tr: repo._afterlock(runhooks))
        finally:
            repo.ui.flush()
        # never return 0 here:
        if dh < 0:
            return dh - 1
        else:
            return dh + 1
441 441
class cg2unpacker(cg1unpacker):
    """Unpacker for cg2 streams.

    cg2 streams add support for generaldelta, so the delta header
    format is slightly different. All other features about the data
    remain the same.
    """
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        # cg2 names its delta base explicitly in the header, so the
        # previously received node is irrelevant; flags are implicitly 0.
        node, p1, p2, deltabase, cs = headertuple
        return node, p1, p2, deltabase, cs, 0
457 457
class cg3unpacker(cg2unpacker):
    """Unpacker for cg3 streams.

    cg3 streams add support for exchanging treemanifests and revlog
    flags. It adds the revlog flags to the delta header and an empty chunk
    separating manifests and files.
    """
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '03'
    _grouplistcount = 2 # One list of manifests and one list of files

    def _deltaheader(self, headertuple, prevnode):
        # cg3 transmits the complete header, including the revlog flags
        node, p1, p2, deltabase, cs, flags = headertuple
        return node, p1, p2, deltabase, cs, flags

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        # consume the root manifest group exactly like cg2 ...
        super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
                                                  numchanges)
        # ... then the directory manifest groups that follow in cg3,
        # terminated by an empty (headerless) group
        while True:
            chunkdata = self.filelogheader()
            if not chunkdata:
                break
            # If we get here, there are directory manifests in the changegroup
            d = chunkdata["filename"]
            repo.ui.debug("adding %s revisions\n" % d)
            dirlog = repo.manifest.dirlog(d)
            if not dirlog.addgroup(self, revmap, trp):
                raise error.Abort(_("received dir revlog group is empty"))
487 487
class headerlessfixup(object):
    """File-like wrapper replaying already-consumed header bytes.

    Reads are served from the buffered header first; once it is
    exhausted, reads fall through to the underlying stream.
    """
    def __init__(self, fh, h):
        self._fh = fh
        self._h = h

    def read(self, n):
        buffered = self._h
        if not buffered:
            return readexactly(self._fh, n)
        d = buffered[:n]
        self._h = buffered[n:]
        if len(d) < n:
            # header ran out mid-read; top up from the real stream
            d += readexactly(self._fh, n - len(d))
        return d
499 499
class cg1packer(object):
    """Packer producing version '01' changegroup streams."""
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    version = '01'
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        # experimental config: bundle.reorder
        reorder = repo.ui.config('bundle', 'reorder', 'auto')
        if reorder == 'auto':
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        # None means "decide per revlog" - see _sortgroup()
        self._reorder = reorder
        self._progress = repo.ui.progress
        if self._repo.ui.verbose and not self._repo.ui.debugflag:
            self._verbosenote = self._repo.ui.note
        else:
            self._verbosenote = lambda s: None

    def close(self):
        # an empty chunk terminates the current group
        return closechunk()

    def fileheader(self, fname):
        # a filelog group opens with a chunk carrying just the filename
        return chunkheader(len(fname)) + fname

    # Extracted both for clarity and for overriding in extensions.
    def _sortgroup(self, revlog, nodelist, lookup):
        """Sort nodes for change group and turn them into revnums."""
        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and self._reorder is None) or self._reorder:
            dag = dagutil.revlogdag(revlog)
            return dag.linearize(set(revlog.rev(n) for n in nodelist))
        else:
            return sorted([revlog.rev(n) for n in nodelist])

    def group(self, nodelist, revlog, lookup, units=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        revs = self._sortgroup(revlog, nodelist, lookup)

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            # each rev is deltaed against the one sent just before it
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        if units is not None:
            self._progress(msgbundling, None)
        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs):
        rr, rl = revlog.rev, revlog.linkrev
        return [n for n in missing if rl(rr(n)) not in commonrevs]

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        """Pack flat manifests into a changegroup stream."""
        assert not dir
        for chunk in self.group(mfnodes, self._repo.manifest,
                                lookuplinknode, units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg1 has no explicit terminator after the manifest section
        return ''

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        cl = repo.changelog

        clrevorder = {}
        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and manifest
        # nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            clrevorder[x] = len(clrevorder)
            n = c[0]
            # record the first changeset introducing this manifest version
            mfs.setdefault(n, x)
            # Record a complete list of potentially-changed files in
            # this manifest.
            changedfiles.update(c[3])
            return x

        self._verbosenote(_('uncompressed size of bundle content:\n'))
        size = 0
        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (changelog)\n') % size)

        # We need to make sure that the linkrev in the changegroup refers to
        # the first changeset that introduced the manifest or file revision.
        # The fastpath is usually safer than the slowpath, because the filelogs
        # are walked in revlog order.
        #
        # When taking the slowpath with reorder=None and the manifest revlog
        # uses generaldelta, the manifest may be walked in the "wrong" order.
        # Without 'clrevorder', we would get an incorrect linkrev (see fix in
        # cc0ff93d0c0c).
        #
        # When taking the fastpath, we are only vulnerable to reordering
        # of the changelog itself. The changelog never uses generaldelta, so
        # it is only reordered when reorder=True. To handle this case, we
        # simply take the slowpath, which already has the 'clrevorder' logic.
        # This was also fixed in cc0ff93d0c0c.
        fastpathlinkrev = fastpathlinkrev and not self._reorder
        # Treemanifests don't work correctly with fastpathlinkrev
        # either, because we don't discover which directory nodes to
        # send along with files. This could probably be fixed.
        fastpathlinkrev = fastpathlinkrev and (
            'treemanifest' not in repo.requirements)

        for chunk in self.generatemanifests(commonrevs, clrevorder,
                                            fastpathlinkrev, mfs, fnodes):
            yield chunk
        mfs.clear()
        clrevs = set(cl.rev(x) for x in clnodes)

        if not fastpathlinkrev:
            # slowpath: linknodes were collected into fnodes by the
            # manifest callbacks above
            def linknodes(unused, fname):
                return fnodes.get(fname, {})
        else:
            # fastpath: trust each filelog's own linkrevs
            cln = cl.node
            def linknodes(filerevlog, fname):
                llr = filerevlog.linkrev
                fln = filerevlog.node
                revs = ((r, llr(r)) for r in filerevlog)
                return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
                          fnodes):
        # tmfnodes maps directory name -> {manifest node: linkrev node};
        # '' is the root manifest, other entries appear as tree manifests
        # are discovered by lookupmflinknode below.
        repo = self._repo
        dirlog = repo.manifest.dirlog
        tmfnodes = {'': mfs}

        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def makelookupmflinknode(dir):
            if fastpathlinkrev:
                assert not dir
                return mfs.__getitem__

            def lookupmflinknode(x):
                """Callback for looking up the linknode for manifests.

                Returns the linkrev node for the specified manifest.

                SIDE EFFECT:

                1) fclnodes gets populated with the list of relevant
                file nodes if we're not using fastpathlinkrev
                2) When treemanifests are in use, collects treemanifest nodes
                to send

                Note that this means manifests must be completely sent to
                the client before you can trust the list of files and
                treemanifests to send.
                """
                clnode = tmfnodes[dir][x]
                mdata = dirlog(dir).readshallowfast(x)
                for p, n, fl in mdata.iterentries():
                    if fl == 't': # subdirectory manifest
                        subdir = dir + p + '/'
                        tmfclnodes = tmfnodes.setdefault(subdir, {})
                        tmfclnode = tmfclnodes.setdefault(n, clnode)
                        # always keep the earliest introducing changeset
                        if clrevorder[clnode] < clrevorder[tmfclnode]:
                            tmfclnodes[n] = clnode
                    else:
                        f = dir + p
                        fclnodes = fnodes.setdefault(f, {})
                        fclnode = fclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[fclnode]:
                            fclnodes[n] = clnode
                return clnode
            return lookupmflinknode

        size = 0
        # drain tmfnodes in sorted directory order; packing one directory
        # may discover new subdirectories to pack
        while tmfnodes:
            dir = min(tmfnodes)
            nodes = tmfnodes[dir]
            prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
            for x in self._packmanifests(dir, prunednodes,
                                         makelookupmflinknode(dir)):
                size += len(x)
                yield x
            del tmfnodes[dir]
        self._verbosenote(_('%8.i (manifests)\n') % size)
        yield self._manifestsdone()

    # The 'source' parameter is useful for extensions
    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        repo = self._repo
        progress = self._progress
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise error.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                h = self.fileheader(fname)
                size = len(h)
                yield h
                for chunk in self.group(filenodes, filerevlog, lookupfilelog):
                    size += len(chunk)
                    yield chunk
                self._verbosenote(_('%8.i %s\n') % (size, fname))
        progress(msgbundling, None)

    def deltaparent(self, revlog, rev, p1, p2, prev):
        # cg1 can only delta against the previously sent revision
        return prev

    def revchunk(self, revlog, rev, prev, linknode):
        # Yield the wire chunks for one revision: length header, delta
        # header and delta body.
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = self.deltaparent(revlog, rev, p1, p2, prev)

        prefix = ''
        if revlog.iscensored(base) or revlog.iscensored(rev):
            try:
                delta = revlog.revision(node)
            except error.CensoredNodeError as e:
                # send the tombstone as a full-text replacement
                delta = e.tombstone
            if base == nullrev:
                prefix = mdiff.trivialdiffheader(len(delta))
            else:
                baselen = revlog.rawsize(base)
                prefix = mdiff.replacediffheader(baselen, len(delta))
        elif base == nullrev:
            # no usable base: send the full text with a trivial diff header
            delta = revlog.revision(node)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        flags = revlog.flags(rev)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta
    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # do nothing with basenode, it is implicitly the previous one in HG10
        # do nothing with flags, it is implicitly 0 for cg1 and cg2
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
805 809
class cg2packer(cg1packer):
    """Packer for cg2 streams (generaldelta-aware delta bases)."""
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        super(cg2packer, self).__init__(repo, bundlecaps)
        # Since generaldelta is directly supported by cg2, reordering
        # generally doesn't help, so bundle.reorder=auto is treated just
        # like bundle.reorder=False.
        if self._reorder is None:
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        """Choose the delta base for rev, keeping the storage delta parent
        only when the receiver is guaranteed to have it."""
        dp = revlog.deltaparent(rev)
        # Keep dp only when it is a real revision the remote certainly
        # has: one of the parents or the previously sent revision.
        # Otherwise fall back to prev, which avoids storing full revisions
        # and covers the case where we can't be sure the remote has dp.
        if dp != nullrev and dp in (p1, p2, prev):
            return dp
        return prev

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # flags stay implicit (0) in cg1 and cg2, so they are not encoded
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
829 833
class cg3packer(cg2packer):
    """Packer for cg3 streams (revlog flags and tree manifests)."""
    version = '03'
    deltaheader = _CHANGEGROUPV3_DELTA_HEADER

    def _packmanifests(self, dir, mfnodes, lookuplinknode):
        # A directory manifest group is announced the same way a filelog
        # group is: with a header chunk carrying the (directory) name.
        if dir:
            yield self.fileheader(dir)
        dlog = self._repo.manifest.dirlog(dir)
        for chunk in self.group(mfnodes, dlog, lookuplinknode,
                                units=_('manifests')):
            yield chunk

    def _manifestsdone(self):
        # cg3 ends the manifest section with an explicit empty chunk
        return self.close()

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
        # cg3 carries the revlog flags in the delta header
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode,
                           linknode, flags)
847 851
# Maps a changegroup wire-format version to its (packer, unpacker) classes.
_packermap = {'01': (cg1packer, cg1unpacker),
              # cg2 adds support for exchanging generaldelta
              '02': (cg2packer, cg2unpacker),
              # cg3 adds support for exchanging revlog flags and treemanifests
              '03': (cg3packer, cg3unpacker),
             }
854 858
def allsupportedversions(ui):
    """Return the set of changegroup versions this client may use.

    '03' is offered only when one of its experimental config knobs is on.
    """
    versions = set(_packermap)
    versions.discard('03')
    enabled = (ui.configbool('experimental', 'changegroup3')
               or ui.configbool('experimental', 'treemanifest'))
    if enabled:
        versions.add('03')
    return versions
862 866
def supportedincomingversions(repo):
    """Return the changegroup versions that can be applied to *repo*.

    A treemanifest repo must always accept '03', as that is the only
    format able to carry tree manifest data.
    """
    versions = allsupportedversions(repo.ui)
    if 'treemanifest' in repo.requirements:
        versions.add('03')
    return versions

def supportedoutgoingversions(repo):
    """Return the changegroup versions that can be created from *repo*."""
    versions = allsupportedversions(repo.ui)
    if 'treemanifest' not in repo.requirements:
        return versions
    # Versions 01 and 02 support only flat manifests and it's just too
    # expensive to convert between the flat manifest and tree manifest on
    # the fly. Since tree manifests are hashed differently, all of history
    # would have to be converted. Instead, we simply don't even pretend to
    # support versions 01 and 02.
    versions -= {'01', '02'}
    versions.add('03')
    return versions

def safeversion(repo):
    """Return the smallest version safe to assume all clients support.

    For example, every hg release that supports generaldelta also
    supports changegroup 02, so '01' can be dropped for such repos.
    """
    candidates = supportedoutgoingversions(repo)
    if 'generaldelta' in repo.requirements:
        candidates.discard('01')
    assert candidates
    return min(candidates)

def getbundler(version, repo, bundlecaps=None):
    """Instantiate and return the packer for the given wire version."""
    assert version in supportedoutgoingversions(repo)
    packercls = _packermap[version][0]
    return packercls(repo, bundlecaps)

def getunbundler(version, fh, alg):
    """Instantiate and return the unpacker for the given wire version."""
    unpackercls = _packermap[version][1]
    return unpackercls(fh, alg)

def _changegroupinfo(repo, nodes, source):
    """Report the number (and in debug mode, the list) of outgoing csets."""
    ui = repo.ui
    if ui.verbose or source == 'bundle':
        ui.status(_("%d changesets found\n") % len(nodes))
    if ui.debugflag:
        ui.debug("list of changesets:\n")
        for node in nodes:
            ui.debug("%s\n" % hex(node))

def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
    """Generate raw changegroup chunks for *outgoing* using *bundler*.

    Fires the 'preoutgoing' hook (which may abort) and prints changeset
    statistics before delegating to bundler.generate(). Returns the raw
    chunk generator produced by the bundler.
    """
    # Work on the unfiltered repo so linkrev computation sees all revs.
    repo = repo.unfiltered()
    commonrevs = outgoing.common
    csets = outgoing.missing
    heads = outgoing.missingheads
    # We go through the fast path if we get told to, or if all (unfiltered)
    # heads have been requested (since we then know all linkrevs will
    # be pulled by the client).
    # NOTE: this sort mutates outgoing.missingheads in place.
    heads.sort()
    fastpathlinkrev = fastpath or (
        repo.filtername is None and heads == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, csets, source)
    return bundler.generate(commonrevs, csets, fastpathlinkrev, source)

def getsubset(repo, outgoing, bundler, source, fastpath=False):
    """Like getsubsetraw(), but wrap the chunk stream in an unbundler."""
    raw = getsubsetraw(repo, outgoing, bundler, source, fastpath)
    return getunbundler(bundler.version, util.chunkbuffer(raw), None)

def changegroupsubset(repo, roots, heads, source, version='01'):
    """Compute a changegroup consisting of all the nodes that are
    descendants of any of the roots and ancestors of any of the heads.
    Return a chunkbuffer object whose read() method will return
    successive changegroup chunks.

    It is fairly complex as determining which filenodes and which
    manifest nodes need to be included for the changeset to be complete
    is non-trivial.

    Another wrinkle is doing the reverse, figuring out which changeset in
    the changegroup a particular filenode or manifestnode belongs to.
    """
    cl = repo.changelog
    if not roots:
        roots = [nullid]
    # Discovery bases are the non-null parents of the requested roots.
    discbases = [p for n in roots for p in cl.parents(n) if p != nullid]
    # TODO: remove call to nodesbetween.
    csets, roots, heads = cl.nodesbetween(roots, heads)
    included = set(csets)
    discbases = [n for n in discbases if n not in included]
    outgoing = discovery.outgoing(cl, discbases, heads)
    bundler = getbundler(version, repo)
    return getsubset(repo, outgoing, bundler, source)

def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
                           version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing. Returns a raw changegroup generator."""
    # Nothing to send: signal that with None rather than an empty stream.
    if not outgoing.missing:
        return None
    bundler = getbundler(version, repo, bundlecaps)
    return getsubsetraw(repo, outgoing, bundler, source)

def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
                        version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing."""
    # Nothing to send: signal that with None rather than an empty bundle.
    if not outgoing.missing:
        return None
    return getsubset(repo, outgoing, getbundler(version, repo, bundlecaps),
                     source)

def computeoutgoing(repo, heads, common):
    """Computes which revs are outgoing given a set of common
    and a set of heads.

    This is a separate function so extensions can have access to
    the logic.

    Returns a discovery.outgoing object.
    """
    cl = repo.changelog
    if not common:
        bases = [nullid]
    else:
        # Drop nodes the local repo doesn't actually have.
        hasnode = cl.hasnode
        bases = [n for n in common if hasnode(n)]
    return discovery.outgoing(cl, bases, heads or cl.heads())

def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None,
                   version='01'):
    """Like changegroupsubset, but returns the set difference between the
    ancestors of heads and the ancestors common.

    If heads is None, use the local heads. If common is None, use [nullid].

    The nodes in common might not all be known locally due to the way the
    current discovery protocol works.
    """
    return getlocalchangegroup(repo, source,
                               computeoutgoing(repo, heads, common),
                               bundlecaps=bundlecaps, version=version)

def changegroup(repo, basenodes, source):
    """Bundle everything between basenodes and the current heads.

    Delegates to changegroupsubset() so heads are resolved atomically,
    avoiding a race (issue1320).
    """
    return changegroupsubset(repo, basenodes, repo.heads(), source)

def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
    """Consume the filelog section of a changegroup stream.

    source - the unbundler being read
    revmap - maps changelog nodes to linkrevs for the new revisions
    trp - transaction proxy the filelog groups are added under
    expectedfiles - expected file count, used only for progress output
    needfiles - dict of filename -> set of filenodes that the incoming
        changesets reference; entries are checked off as they arrive and
        any leftovers must already exist locally

    Returns a (revisions, files) tuple of how many file revisions and
    how many files were added. Raises error.Abort on an empty group,
    a censored delta base, a spurious entry, or missing file data.
    """
    revisions = 0
    files = 0
    # One iteration per file section; an empty header ends the section.
    while True:
        chunkdata = source.filelogheader()
        if not chunkdata:
            break
        files += 1
        f = chunkdata["filename"]
        repo.ui.debug("adding %s revisions\n" % f)
        repo.ui.progress(_('files'), files, unit=_('files'),
                         total=expectedfiles)
        fl = repo.file(f)
        o = len(fl)  # revision count before the group, to count additions
        try:
            if not fl.addgroup(source, revmap, trp):
                raise error.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError as e:
            raise error.Abort(_("received delta base is censored: %s") % e)
        revisions += len(fl) - o
        if f in needfiles:
            needs = needfiles[f]
            # Tick off each newly added node; anything not in the needs
            # set was never referenced by the incoming changesets.
            for new in xrange(o, len(fl)):
                n = fl.node(new)
                if n in needs:
                    needs.remove(n)
                else:
                    raise error.Abort(
                        _("received spurious file revlog entry"))
            if not needs:
                del needfiles[f]
    repo.ui.progress(_('files'), None)

    # Whatever is still needed must already be present locally, or the
    # repository would be left incomplete.
    for f, needs in needfiles.iteritems():
        fl = repo.file(f)
        for n in needs:
            try:
                fl.rev(n)
            except error.LookupError:
                raise error.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (f, hex(n)))

    return revisions, files

General Comments 0
You need to be logged in to leave comments. Login now