changegroup: mark cg1unpacker.chunklength as private
Augie Fackler -
r26707:5ee6bd52 default
@@ -1,914 +1,914 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullid,
19 19 nullrev,
20 20 short,
21 21 )
22 22
23 23 from . import (
24 24 branchmap,
25 25 dagutil,
26 26 discovery,
27 27 error,
28 28 mdiff,
29 29 phases,
30 30 util,
31 31 )
32 32
33 33 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
34 34 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
35 35
36 36 def readexactly(stream, n):
37 37 '''read n bytes from stream.read and abort if less was available'''
38 38 s = stream.read(n)
39 39 if len(s) < n:
40 40 raise error.Abort(_("stream ended unexpectedly"
41 41 " (got %d bytes, expected %d)")
42 42 % (len(s), n))
43 43 return s
44 44
45 45 def getchunk(stream):
46 46 """return the next chunk from stream as a string"""
47 47 d = readexactly(stream, 4)
48 48 l = struct.unpack(">l", d)[0]
49 49 if l <= 4:
50 50 if l:
51 51 raise error.Abort(_("invalid chunk length %d") % l)
52 52 return ""
53 53 return readexactly(stream, l - 4)
54 54
55 55 def chunkheader(length):
56 56 """return a changegroup chunk header (string)"""
57 57 return struct.pack(">l", length + 4)
58 58
59 59 def closechunk():
60 60 """return a changegroup chunk header (string) for a zero-length chunk"""
61 61 return struct.pack(">l", 0)
62 62
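
Editor's note: the three helpers above define the on-the-wire chunk framing. A self-contained sketch of the same round trip, with mkchunk and readchunk as illustrative stand-ins for chunkheader/getchunk (not part of this module):

    import io
    import struct

    def mkchunk(data):
        # the length prefix counts itself (4 bytes) plus the payload
        return struct.pack(">l", len(data) + 4) + data

    def readchunk(stream):
        l = struct.unpack(">l", stream.read(4))[0]
        if l <= 4:
            return b""          # a zero-length chunk closes the group
        return stream.read(l - 4)

    stream = io.BytesIO(mkchunk(b"hello") + struct.pack(">l", 0))
    assert readchunk(stream) == b"hello"
    assert readchunk(stream) == b""     # the closechunk() terminator
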
63 63 def combineresults(results):
64 64 """logic to combine 0 or more addchangegroup results into one"""
65 65 changedheads = 0
66 66 result = 1
67 67 for ret in results:
68 68 # If any changegroup result is 0, return 0
69 69 if ret == 0:
70 70 result = 0
71 71 break
72 72 if ret < -1:
73 73 changedheads += ret + 1
74 74 elif ret > 1:
75 75 changedheads += ret - 1
76 76 if changedheads > 0:
77 77 result = 1 + changedheads
78 78 elif changedheads < 0:
79 79 result = -1 + changedheads
80 80 return result
81 81
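
Editor's note: a worked example of the return convention documented for apply() further down (1 = heads unchanged, 1+n = n heads added, -1-n = n heads removed, 0 = failure), assuming the combineresults above is in scope:

    assert combineresults([]) == 1        # nothing ran, nothing changed
    assert combineresults([1, 1]) == 1    # two groups, no head changes
    assert combineresults([3, 2]) == 4    # +2 heads and +1 head -> 1 + 3
    assert combineresults([3, -2]) == 2   # +2 heads and -1 head -> 1 + 1
    assert combineresults([0, 5]) == 0    # any failed group wins
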
82 82 bundletypes = {
83 83 "": ("", None), # only when using unbundle on ssh and old http servers
84 84 # since the unification, ssh accepts a header but there
85 85 # is no capability signaling it.
86 86 "HG20": (), # special-cased below
87 87 "HG10UN": ("HG10UN", None),
88 88 "HG10BZ": ("HG10", 'BZ'),
89 89 "HG10GZ": ("HG10GZ", 'GZ'),
90 90 }
91 91
92 92 # hgweb uses this list to communicate its preferred type
93 93 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
94 94
95 95 def writechunks(ui, chunks, filename, vfs=None):
96 96 """Write chunks to a file and return its filename.
97 97
98 98 The stream is assumed to be a bundle file.
99 99 Existing files will not be overwritten.
100 100 If no filename is specified, a temporary file is created.
101 101 """
102 102 fh = None
103 103 cleanup = None
104 104 try:
105 105 if filename:
106 106 if vfs:
107 107 fh = vfs.open(filename, "wb")
108 108 else:
109 109 fh = open(filename, "wb")
110 110 else:
111 111 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
112 112 fh = os.fdopen(fd, "wb")
113 113 cleanup = filename
114 114 for c in chunks:
115 115 fh.write(c)
116 116 cleanup = None
117 117 return filename
118 118 finally:
119 119 if fh is not None:
120 120 fh.close()
121 121 if cleanup is not None:
122 122 if filename and vfs:
123 123 vfs.unlink(cleanup)
124 124 else:
125 125 os.unlink(cleanup)
126 126
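
Editor's note: a usage sketch, assuming writechunks above is in scope. ui is not consulted by this function, so None suffices here, and passing filename=None exercises the tempfile branch:

    import os

    fname = writechunks(None, [b"part one ", b"part two"], None)
    try:
        with open(fname, "rb") as fh:
            assert fh.read() == b"part one part two"
    finally:
        os.unlink(fname)
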
127 127 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
128 128 """Write a bundle file and return its filename.
129 129
130 130 Existing files will not be overwritten.
131 131 If no filename is specified, a temporary file is created.
132 132 bz2 compression can be turned off.
133 133 The bundle file will be deleted in case of errors.
134 134 """
135 135
136 136 if bundletype == "HG20":
137 137 from . import bundle2
138 138 bundle = bundle2.bundle20(ui)
139 139 bundle.setcompression(compression)
140 140 part = bundle.newpart('changegroup', data=cg.getchunks())
141 141 part.addparam('version', cg.version)
142 142 chunkiter = bundle.getchunks()
143 143 else:
144 144 # compression argument is only for the bundle2 case
145 145 assert compression is None
146 146 if cg.version != '01':
147 147 raise error.Abort(_('old bundle types only support v1 '
148 148 'changegroups'))
149 149 header, comp = bundletypes[bundletype]
150 150 if comp not in util.compressors:
151 151 raise error.Abort(_('unknown stream compression type: %s')
152 152 % comp)
153 153 z = util.compressors[comp]()
154 154 subchunkiter = cg.getchunks()
155 155 def chunkiter():
156 156 yield header
157 157 for chunk in subchunkiter:
158 158 yield z.compress(chunk)
159 159 yield z.flush()
160 160 chunkiter = chunkiter()
161 161
162 162 # parse the changegroup data, otherwise we will block
163 163 # in case of sshrepo because we don't know the end of the stream
164 164
165 165 # an empty chunkgroup is the end of the changegroup
166 166 # a changegroup has at least 2 chunkgroups (changelog and manifest).
167 167 # after that, an empty chunkgroup is the end of the changegroup
168 168 return writechunks(ui, chunkiter, filename, vfs=vfs)
169 169
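
Editor's note: in the non-bundle2 branch, the nested chunkiter() emits the magic header followed by a compressed body. A minimal sketch of that shape, with bz2 standing in for util.compressors['BZ'] (an assumption; the real table also handles 'GZ' and uncompressed streams):

    import bz2

    def compresschunks(header, chunks):
        # emit the on-disk magic, then the compressed payload
        z = bz2.BZ2Compressor()
        yield header
        for chunk in chunks:
            yield z.compress(chunk)
        yield z.flush()

    out = b"".join(compresschunks(b"HG10", [b"chunk1", b"chunk2"]))
    assert out.startswith(b"HG10")
    assert bz2.decompress(out[4:]) == b"chunk1chunk2"
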
170 170 class cg1unpacker(object):
171 171 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
172 172 deltaheadersize = struct.calcsize(deltaheader)
173 173 version = '01'
174 174 def __init__(self, fh, alg):
175 175 if alg == 'UN':
176 176 alg = None # get more modern without breaking too much
177 177 if alg not in util.decompressors:
178 178 raise error.Abort(_('unknown stream compression type: %s')
179 179 % alg)
180 180 if alg == 'BZ':
181 181 alg = '_truncatedBZ'
182 182 self._stream = util.decompressors[alg](fh)
183 183 self._type = alg
184 184 self.callback = None
185 185
186 186 # These methods (compressed, read, seek, tell) all appear to only
187 187 # be used by bundlerepo, but it's a little hard to tell.
188 188 def compressed(self):
189 189 return self._type is not None
190 190 def read(self, l):
191 191 return self._stream.read(l)
192 192 def seek(self, pos):
193 193 return self._stream.seek(pos)
194 194 def tell(self):
195 195 return self._stream.tell()
196 196 def close(self):
197 197 return self._stream.close()
198 198
199 def chunklength(self):
199 def _chunklength(self):
200 200 d = readexactly(self._stream, 4)
201 201 l = struct.unpack(">l", d)[0]
202 202 if l <= 4:
203 203 if l:
204 204 raise error.Abort(_("invalid chunk length %d") % l)
205 205 return 0
206 206 if self.callback:
207 207 self.callback()
208 208 return l - 4
209 209
210 210 def changelogheader(self):
211 211 """v10 does not have a changelog header chunk"""
212 212 return {}
213 213
214 214 def manifestheader(self):
215 215 """v10 does not have a manifest header chunk"""
216 216 return {}
217 217
218 218 def filelogheader(self):
219 219 """return the header of the filelogs chunk, v10 only has the filename"""
220 l = self.chunklength()
220 l = self._chunklength()
221 221 if not l:
222 222 return {}
223 223 fname = readexactly(self._stream, l)
224 224 return {'filename': fname}
225 225
226 226 def _deltaheader(self, headertuple, prevnode):
227 227 node, p1, p2, cs = headertuple
228 228 if prevnode is None:
229 229 deltabase = p1
230 230 else:
231 231 deltabase = prevnode
232 232 return node, p1, p2, deltabase, cs
233 233
234 234 def deltachunk(self, prevnode):
235 l = self.chunklength()
235 l = self._chunklength()
236 236 if not l:
237 237 return {}
238 238 headerdata = readexactly(self._stream, self.deltaheadersize)
239 239 header = struct.unpack(self.deltaheader, headerdata)
240 240 delta = readexactly(self._stream, l - self.deltaheadersize)
241 241 node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
242 242 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
243 243 'deltabase': deltabase, 'delta': delta}
244 244
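
Editor's note: a sketch of what deltachunk() unpacks for v1, using hypothetical 20-byte node ids:

    import struct

    header = "20s20s20s20s"              # node, p1, p2, linknode (cs)
    assert struct.calcsize(header) == 80
    node, p1, p2, cs = b"N" * 20, b"P" * 20, b"Q" * 20, b"L" * 20
    raw = struct.pack(header, node, p1, p2, cs)
    assert struct.unpack(header, raw) == (node, p1, p2, cs)
    # v1 carries no explicit delta base: _deltaheader() substitutes p1 for
    # the first chunk of a group and the previous node for later chunks.
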
245 245 def getchunks(self):
246 246 """returns all the chunks contains in the bundle
247 247
248 248 Used when you need to forward the binary stream to a file or another
249 249 network API. To do so, it parses the changegroup data, otherwise it will
250 250 block in the sshrepo case because it doesn't know the end of the stream.
251 251 """
252 252 # an empty chunkgroup is the end of the changegroup
253 253 # a changegroup has at least 2 chunkgroups (changelog and manifest).
254 254 # after that, an empty chunkgroup is the end of the changegroup
255 255 empty = False
256 256 count = 0
257 257 while not empty or count <= 2:
258 258 empty = True
259 259 count += 1
260 260 while True:
261 261 chunk = getchunk(self)
262 262 if not chunk:
263 263 break
264 264 empty = False
265 265 yield chunkheader(len(chunk))
266 266 pos = 0
267 267 while pos < len(chunk):
268 268 next = pos + 2**20
269 269 yield chunk[pos:next]
270 270 pos = next
271 271 yield closechunk()
272 272
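
Editor's note: the inner loop above re-yields each payload in pieces of at most 2**20 bytes; the slicing step in isolation, on a hypothetical payload:

    chunk = b"x" * (3 * 2**20 + 5)   # 3 MiB plus a remainder
    pieces = [chunk[pos:pos + 2**20] for pos in range(0, len(chunk), 2**20)]
    assert len(pieces) == 4
    assert b"".join(pieces) == chunk
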
273 273 def apply(self, repo, srctype, url, emptyok=False,
274 274 targetphase=phases.draft, expectedtotal=None):
275 275 """Add the changegroup returned by source.read() to this repo.
276 276 srctype is a string like 'push', 'pull', or 'unbundle'. url is
277 277 the URL of the repo where this changegroup is coming from.
278 278
279 279 Return an integer summarizing the change to this repo:
280 280 - nothing changed or no source: 0
281 281 - more heads than before: 1+added heads (2..n)
282 282 - fewer heads than before: -1-removed heads (-2..-n)
283 283 - number of heads stays the same: 1
284 284 """
285 285 repo = repo.unfiltered()
286 286 def csmap(x):
287 287 repo.ui.debug("add changeset %s\n" % short(x))
288 288 return len(cl)
289 289
290 290 def revmap(x):
291 291 return cl.rev(x)
292 292
293 293 changesets = files = revisions = 0
294 294
295 295 tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
296 296 # The transaction could have been created before and already
297 297 # carries source information. In this case we use the top
298 298 # level data. We overwrite the argument because we need to use
299 299 # the top level value (if they exist) in this function.
300 300 srctype = tr.hookargs.setdefault('source', srctype)
301 301 url = tr.hookargs.setdefault('url', url)
302 302
303 303 # write changelog data to temp files so concurrent readers will not see
304 304 # inconsistent view
305 305 cl = repo.changelog
306 306 cl.delayupdate(tr)
307 307 oldheads = cl.heads()
308 308 try:
309 309 repo.hook('prechangegroup', throw=True, **tr.hookargs)
310 310
311 311 trp = weakref.proxy(tr)
312 312 # pull off the changeset group
313 313 repo.ui.status(_("adding changesets\n"))
314 314 clstart = len(cl)
315 315 class prog(object):
316 316 def __init__(self, step, total):
317 317 self._step = step
318 318 self._total = total
319 319 self._count = 1
320 320 def __call__(self):
321 321 repo.ui.progress(self._step, self._count, unit=_('chunks'),
322 322 total=self._total)
323 323 self._count += 1
324 324 self.callback = prog(_('changesets'), expectedtotal)
325 325
326 326 efiles = set()
327 327 def onchangelog(cl, node):
328 328 efiles.update(cl.read(node)[3])
329 329
330 330 self.changelogheader()
331 331 srccontent = cl.addgroup(self, csmap, trp,
332 332 addrevisioncb=onchangelog)
333 333 efiles = len(efiles)
334 334
335 335 if not (srccontent or emptyok):
336 336 raise error.Abort(_("received changelog group is empty"))
337 337 clend = len(cl)
338 338 changesets = clend - clstart
339 339 repo.ui.progress(_('changesets'), None)
340 340
341 341 # pull off the manifest group
342 342 repo.ui.status(_("adding manifests\n"))
343 343 # manifests <= changesets
344 344 self.callback = prog(_('manifests'), changesets)
345 345 # no need to check for empty manifest group here:
346 346 # if the result of the merge of 1 and 2 is the same in 3 and 4,
347 347 # no new manifest will be created and the manifest group will
348 348 # be empty during the pull
349 349 self.manifestheader()
350 350 repo.manifest.addgroup(self, revmap, trp)
351 351 repo.ui.progress(_('manifests'), None)
352 352
353 353 needfiles = {}
354 354 if repo.ui.configbool('server', 'validate', default=False):
355 355 # validate incoming csets have their manifests
356 356 for cset in xrange(clstart, clend):
357 357 mfnode = repo.changelog.read(repo.changelog.node(cset))[0]
358 358 mfest = repo.manifest.readdelta(mfnode)
359 359 # store file nodes we must see
360 360 for f, n in mfest.iteritems():
361 361 needfiles.setdefault(f, set()).add(n)
362 362
363 363 # process the files
364 364 repo.ui.status(_("adding file changes\n"))
365 365 self.callback = None
366 366 pr = prog(_('files'), efiles)
367 367 newrevs, newfiles = _addchangegroupfiles(
368 368 repo, self, revmap, trp, pr, needfiles)
369 369 revisions += newrevs
370 370 files += newfiles
371 371
372 372 dh = 0
373 373 if oldheads:
374 374 heads = cl.heads()
375 375 dh = len(heads) - len(oldheads)
376 376 for h in heads:
377 377 if h not in oldheads and repo[h].closesbranch():
378 378 dh -= 1
379 379 htext = ""
380 380 if dh:
381 381 htext = _(" (%+d heads)") % dh
382 382
383 383 repo.ui.status(_("added %d changesets"
384 384 " with %d changes to %d files%s\n")
385 385 % (changesets, revisions, files, htext))
386 386 repo.invalidatevolatilesets()
387 387
388 388 if changesets > 0:
389 389 p = lambda: tr.writepending() and repo.root or ""
390 390 if 'node' not in tr.hookargs:
391 391 tr.hookargs['node'] = hex(cl.node(clstart))
392 392 hookargs = dict(tr.hookargs)
393 393 else:
394 394 hookargs = dict(tr.hookargs)
395 395 hookargs['node'] = hex(cl.node(clstart))
396 396 repo.hook('pretxnchangegroup', throw=True, pending=p,
397 397 **hookargs)
398 398
399 399 added = [cl.node(r) for r in xrange(clstart, clend)]
400 400 publishing = repo.publishing()
401 401 if srctype in ('push', 'serve'):
402 402 # Old servers can not push the boundary themselves.
403 403 # New servers won't push the boundary if changeset already
404 404 # exists locally as secret
405 405 #
406 406 # We should not use added here but the list of all changes in
407 407 # the bundle
408 408 if publishing:
409 409 phases.advanceboundary(repo, tr, phases.public, srccontent)
410 410 else:
411 411 # Those changesets have been pushed from the outside, their
412 412 # phases are going to be pushed alongside. Therefore
413 413 # `targetphase` is ignored.
414 414 phases.advanceboundary(repo, tr, phases.draft, srccontent)
415 415 phases.retractboundary(repo, tr, phases.draft, added)
416 416 elif srctype != 'strip':
417 417 # publishing only alter behavior during push
418 418 #
419 419 # strip should not touch boundary at all
420 420 phases.retractboundary(repo, tr, targetphase, added)
421 421
422 422 if changesets > 0:
423 423 if srctype != 'strip':
424 424 # During strip, branchcache is invalid but the upcoming call to
425 425 # `destroyed` will repair it.
426 426 # In other cases we can safely update the cache on disk.
427 427 branchmap.updatecache(repo.filtered('served'))
428 428
429 429 def runhooks():
430 430 # These hooks run when the lock releases, not when the
431 431 # transaction closes. So it's possible for the changelog
432 432 # to have changed since we last saw it.
433 433 if clstart >= len(repo):
434 434 return
435 435
436 436 # forcefully update the on-disk branch cache
437 437 repo.ui.debug("updating the branch cache\n")
438 438 repo.hook("changegroup", **hookargs)
439 439
440 440 for n in added:
441 441 args = hookargs.copy()
442 442 args['node'] = hex(n)
443 443 repo.hook("incoming", **args)
444 444
445 445 newheads = [h for h in repo.heads() if h not in oldheads]
446 446 repo.ui.log("incoming",
447 447 "%s incoming changes - new heads: %s\n",
448 448 len(added),
449 449 ', '.join([hex(c[:6]) for c in newheads]))
450 450
451 451 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
452 452 lambda tr: repo._afterlock(runhooks))
453 453
454 454 tr.close()
455 455
456 456 finally:
457 457 tr.release()
458 458 repo.ui.flush()
459 459 # never return 0 here:
460 460 if dh < 0:
461 461 return dh - 1
462 462 else:
463 463 return dh + 1
464 464
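
Editor's note: the final lines of apply() encode the head delta so that 0 is never returned (0 is reserved for failure); a sketch of that mapping:

    def retcode(dh):
        # mirrors the "never return 0 here" branch at the end of apply()
        return dh - 1 if dh < 0 else dh + 1

    assert retcode(0) == 1     # head count unchanged
    assert retcode(2) == 3     # two heads added
    assert retcode(-1) == -2   # one head removed
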
465 465 class cg2unpacker(cg1unpacker):
466 466 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
467 467 deltaheadersize = struct.calcsize(deltaheader)
468 468 version = '02'
469 469
470 470 def _deltaheader(self, headertuple, prevnode):
471 471 node, p1, p2, deltabase, cs = headertuple
472 472 return node, p1, p2, deltabase, cs
473 473
474 474 class headerlessfixup(object):
475 475 def __init__(self, fh, h):
476 476 self._h = h
477 477 self._fh = fh
478 478 def read(self, n):
479 479 if self._h:
480 480 d, self._h = self._h[:n], self._h[n:]
481 481 if len(d) < n:
482 482 d += readexactly(self._fh, n - len(d))
483 483 return d
484 484 return readexactly(self._fh, n)
485 485
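
Editor's note: headerlessfixup re-attaches bytes a caller already consumed while sniffing the stream type. A usage sketch on an in-memory stream, assuming the class and readexactly above are in scope (the magic string is hypothetical):

    import io

    fh = io.BytesIO(b"HG10UNrest-of-stream")
    magic = fh.read(6)                    # consumed while probing the type
    fixed = headerlessfixup(fh, magic)
    assert fixed.read(4) == b"HG10"       # served from the saved header
    assert fixed.read(8) == b"UNrest-o"   # spans header remainder and stream
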
486 486 class cg1packer(object):
487 487 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
488 488 version = '01'
489 489 def __init__(self, repo, bundlecaps=None):
490 490 """Given a source repo, construct a bundler.
491 491
492 492 bundlecaps is optional and can be used to specify the set of
493 493 capabilities which can be used to build the bundle.
494 494 """
495 495 # Set of capabilities we can use to build the bundle.
496 496 if bundlecaps is None:
497 497 bundlecaps = set()
498 498 self._bundlecaps = bundlecaps
499 499 # experimental config: bundle.reorder
500 500 reorder = repo.ui.config('bundle', 'reorder', 'auto')
501 501 if reorder == 'auto':
502 502 reorder = None
503 503 else:
504 504 reorder = util.parsebool(reorder)
505 505 self._repo = repo
506 506 self._reorder = reorder
507 507 self._progress = repo.ui.progress
508 508 if self._repo.ui.verbose and not self._repo.ui.debugflag:
509 509 self._verbosenote = self._repo.ui.note
510 510 else:
511 511 self._verbosenote = lambda s: None
512 512
513 513 def close(self):
514 514 return closechunk()
515 515
516 516 def fileheader(self, fname):
517 517 return chunkheader(len(fname)) + fname
518 518
519 519 def group(self, nodelist, revlog, lookup, units=None):
520 520 """Calculate a delta group, yielding a sequence of changegroup chunks
521 521 (strings).
522 522
523 523 Given a list of changeset revs, return a set of deltas and
524 524 metadata corresponding to nodes. The first delta is
525 525 first parent(nodelist[0]) -> nodelist[0], the receiver is
526 526 guaranteed to have this parent as it has all history before
527 527 these changesets. In the case firstparent is nullrev the
528 528 changegroup starts with a full revision.
529 529
530 530 If units is not None, progress detail will be generated; units specifies
531 531 the type of revlog that is touched (changelog, manifest, etc.).
532 532 """
533 533 # if we don't have any revisions touched by these changesets, bail
534 534 if len(nodelist) == 0:
535 535 yield self.close()
536 536 return
537 537
538 538 # for generaldelta revlogs, we linearize the revs; this will both be
539 539 # much quicker and generate a much smaller bundle
540 540 if (revlog._generaldelta and self._reorder is None) or self._reorder:
541 541 dag = dagutil.revlogdag(revlog)
542 542 revs = set(revlog.rev(n) for n in nodelist)
543 543 revs = dag.linearize(revs)
544 544 else:
545 545 revs = sorted([revlog.rev(n) for n in nodelist])
546 546
547 547 # add the parent of the first rev
548 548 p = revlog.parentrevs(revs[0])[0]
549 549 revs.insert(0, p)
550 550
551 551 # build deltas
552 552 total = len(revs) - 1
553 553 msgbundling = _('bundling')
554 554 for r in xrange(len(revs) - 1):
555 555 if units is not None:
556 556 self._progress(msgbundling, r + 1, unit=units, total=total)
557 557 prev, curr = revs[r], revs[r + 1]
558 558 linknode = lookup(revlog.node(curr))
559 559 for c in self.revchunk(revlog, curr, prev, linknode):
560 560 yield c
561 561
562 562 if units is not None:
563 563 self._progress(msgbundling, None)
564 564 yield self.close()
565 565
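
Editor's note: after the first parent is prepended, consecutive entries of revs become the (delta base, target) pairs that revchunk() receives; a sketch with hypothetical rev numbers:

    revs = [7, 8, 9]     # linearized revs to bundle
    revs.insert(0, 6)    # first parent of revs[0], already on the receiver
    pairs = [(revs[r], revs[r + 1]) for r in range(len(revs) - 1)]
    assert pairs == [(6, 7), (7, 8), (8, 9)]
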
566 566 # filter any nodes that claim to be part of the known set
567 567 def prune(self, revlog, missing, commonrevs):
568 568 rr, rl = revlog.rev, revlog.linkrev
569 569 return [n for n in missing if rl(rr(n)) not in commonrevs]
570 570
571 571 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
572 572 '''yield a sequence of changegroup chunks (strings)'''
573 573 repo = self._repo
574 574 cl = repo.changelog
575 575 ml = repo.manifest
576 576
577 577 clrevorder = {}
578 578 mfs = {} # needed manifests
579 579 fnodes = {} # needed file nodes
580 580 changedfiles = set()
581 581
582 582 # Callback for the changelog, used to collect changed files and manifest
583 583 # nodes.
584 584 # Returns the linkrev node (identity in the changelog case).
585 585 def lookupcl(x):
586 586 c = cl.read(x)
587 587 clrevorder[x] = len(clrevorder)
588 588 changedfiles.update(c[3])
589 589 # record the first changeset introducing this manifest version
590 590 mfs.setdefault(c[0], x)
591 591 return x
592 592
593 593 self._verbosenote(_('uncompressed size of bundle content:\n'))
594 594 size = 0
595 595 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
596 596 size += len(chunk)
597 597 yield chunk
598 598 self._verbosenote(_('%8.i (changelog)\n') % size)
599 599
600 600 # We need to make sure that the linkrev in the changegroup refers to
601 601 # the first changeset that introduced the manifest or file revision.
602 602 # The fastpath is usually safer than the slowpath, because the filelogs
603 603 # are walked in revlog order.
604 604 #
605 605 # When taking the slowpath with reorder=None and the manifest revlog
606 606 # uses generaldelta, the manifest may be walked in the "wrong" order.
607 607 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
608 608 # cc0ff93d0c0c).
609 609 #
610 610 # When taking the fastpath, we are only vulnerable to reordering
611 611 # of the changelog itself. The changelog never uses generaldelta, so
612 612 # it is only reordered when reorder=True. To handle this case, we
613 613 # simply take the slowpath, which already has the 'clrevorder' logic.
614 614 # This was also fixed in cc0ff93d0c0c.
615 615 fastpathlinkrev = fastpathlinkrev and not self._reorder
616 616 # Callback for the manifest, used to collect linkrevs for filelog
617 617 # revisions.
618 618 # Returns the linkrev node (collected in lookupcl).
619 619 def lookupmf(x):
620 620 clnode = mfs[x]
621 621 if not fastpathlinkrev:
622 622 mdata = ml.readfast(x)
623 623 for f, n in mdata.iteritems():
624 624 if f in changedfiles:
625 625 # record the first changeset introducing this filelog
626 626 # version
627 627 fclnodes = fnodes.setdefault(f, {})
628 628 fclnode = fclnodes.setdefault(n, clnode)
629 629 if clrevorder[clnode] < clrevorder[fclnode]:
630 630 fclnodes[n] = clnode
631 631 return clnode
632 632
633 633 mfnodes = self.prune(ml, mfs, commonrevs)
634 634 size = 0
635 635 for chunk in self.group(mfnodes, ml, lookupmf, units=_('manifests')):
636 636 size += len(chunk)
637 637 yield chunk
638 638 self._verbosenote(_('%8.i (manifests)\n') % size)
639 639
640 640 mfs.clear()
641 641 clrevs = set(cl.rev(x) for x in clnodes)
642 642
643 643 def linknodes(filerevlog, fname):
644 644 if fastpathlinkrev:
645 645 llr = filerevlog.linkrev
646 646 def genfilenodes():
647 647 for r in filerevlog:
648 648 linkrev = llr(r)
649 649 if linkrev in clrevs:
650 650 yield filerevlog.node(r), cl.node(linkrev)
651 651 return dict(genfilenodes())
652 652 return fnodes.get(fname, {})
653 653
654 654 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
655 655 source):
656 656 yield chunk
657 657
658 658 yield self.close()
659 659
660 660 if clnodes:
661 661 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
662 662
663 663 # The 'source' parameter is useful for extensions
664 664 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
665 665 repo = self._repo
666 666 progress = self._progress
667 667 msgbundling = _('bundling')
668 668
669 669 total = len(changedfiles)
670 670 # for progress output
671 671 msgfiles = _('files')
672 672 for i, fname in enumerate(sorted(changedfiles)):
673 673 filerevlog = repo.file(fname)
674 674 if not filerevlog:
675 675 raise error.Abort(_("empty or missing revlog for %s") % fname)
676 676
677 677 linkrevnodes = linknodes(filerevlog, fname)
678 678 # Lookup for filenodes: we collected the linkrev nodes above in the
679 679 # fastpath case and with lookupmf in the slowpath case.
680 680 def lookupfilelog(x):
681 681 return linkrevnodes[x]
682 682
683 683 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
684 684 if filenodes:
685 685 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
686 686 total=total)
687 687 h = self.fileheader(fname)
688 688 size = len(h)
689 689 yield h
690 690 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
691 691 size += len(chunk)
692 692 yield chunk
693 693 self._verbosenote(_('%8.i %s\n') % (size, fname))
694 694 progress(msgbundling, None)
695 695
696 696 def deltaparent(self, revlog, rev, p1, p2, prev):
697 697 return prev
698 698
699 699 def revchunk(self, revlog, rev, prev, linknode):
700 700 node = revlog.node(rev)
701 701 p1, p2 = revlog.parentrevs(rev)
702 702 base = self.deltaparent(revlog, rev, p1, p2, prev)
703 703
704 704 prefix = ''
705 705 if revlog.iscensored(base) or revlog.iscensored(rev):
706 706 try:
707 707 delta = revlog.revision(node)
708 708 except error.CensoredNodeError as e:
709 709 delta = e.tombstone
710 710 if base == nullrev:
711 711 prefix = mdiff.trivialdiffheader(len(delta))
712 712 else:
713 713 baselen = revlog.rawsize(base)
714 714 prefix = mdiff.replacediffheader(baselen, len(delta))
715 715 elif base == nullrev:
716 716 delta = revlog.revision(node)
717 717 prefix = mdiff.trivialdiffheader(len(delta))
718 718 else:
719 719 delta = revlog.revdiff(base, rev)
720 720 p1n, p2n = revlog.parents(node)
721 721 basenode = revlog.node(base)
722 722 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
723 723 meta += prefix
724 724 l = len(meta) + len(delta)
725 725 yield chunkheader(l)
726 726 yield meta
727 727 yield delta
728 728 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
729 729 # do nothing with basenode, it is implicitly the previous one in HG10
730 730 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
731 731
732 732 class cg2packer(cg1packer):
733 733 version = '02'
734 734 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
735 735
736 736 def __init__(self, repo, bundlecaps=None):
737 737 super(cg2packer, self).__init__(repo, bundlecaps)
738 738 if self._reorder is None:
739 739 # Since generaldelta is directly supported by cg2, reordering
740 740 # generally doesn't help, so we disable it by default (treating
741 741 # bundle.reorder=auto just like bundle.reorder=False).
742 742 self._reorder = False
743 743
744 744 def deltaparent(self, revlog, rev, p1, p2, prev):
745 745 dp = revlog.deltaparent(rev)
746 746 # avoid storing full revisions; pick prev in those cases
747 747 # also pick prev when we can't be sure remote has dp
748 748 if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
749 749 return prev
750 750 return dp
751 751
752 752 def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
753 753 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
754 754
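
Editor's note: the only wire-format difference between the two packers is the delta header; the 20 extra bytes in v2 name the delta base explicitly, which is what lets cg2packer.deltaparent() reuse generaldelta bases:

    import struct

    v1 = struct.calcsize("20s20s20s20s")      # node, p1, p2, linknode
    v2 = struct.calcsize("20s20s20s20s20s")   # + explicit deltabase
    assert (v1, v2) == (80, 100)
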
755 755 packermap = {'01': (cg1packer, cg1unpacker),
756 756 '02': (cg2packer, cg2unpacker)}
757 757
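
Editor's note: getsubset() and the changegroup constructors below resolve classes through this table; a dispatch sketch, assuming packermap above is in scope:

    version = '02'
    packercls, unpackercls = packermap[version]
    assert packercls.version == unpackercls.version == version
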
758 758 def _changegroupinfo(repo, nodes, source):
759 759 if repo.ui.verbose or source == 'bundle':
760 760 repo.ui.status(_("%d changesets found\n") % len(nodes))
761 761 if repo.ui.debugflag:
762 762 repo.ui.debug("list of changesets:\n")
763 763 for node in nodes:
764 764 repo.ui.debug("%s\n" % hex(node))
765 765
766 766 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
767 767 repo = repo.unfiltered()
768 768 commonrevs = outgoing.common
769 769 csets = outgoing.missing
770 770 heads = outgoing.missingheads
771 771 # We go through the fast path if we get told to, or if all (unfiltered)
772 772 # heads have been requested (since we then know that all linkrevs will
773 773 # be pulled by the client).
774 774 heads.sort()
775 775 fastpathlinkrev = fastpath or (
776 776 repo.filtername is None and heads == sorted(repo.heads()))
777 777
778 778 repo.hook('preoutgoing', throw=True, source=source)
779 779 _changegroupinfo(repo, csets, source)
780 780 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
781 781
782 782 def getsubset(repo, outgoing, bundler, source, fastpath=False):
783 783 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
784 784 return packermap[bundler.version][1](util.chunkbuffer(gengroup), None)
785 785
786 786 def changegroupsubset(repo, roots, heads, source, version='01'):
787 787 """Compute a changegroup consisting of all the nodes that are
788 788 descendants of any of the roots and ancestors of any of the heads.
789 789 Return a chunkbuffer object whose read() method will return
790 790 successive changegroup chunks.
791 791
792 792 It is fairly complex as determining which filenodes and which
793 793 manifest nodes need to be included for the changeset to be complete
794 794 is non-trivial.
795 795
796 796 Another wrinkle is doing the reverse, figuring out which changeset in
797 797 the changegroup a particular filenode or manifestnode belongs to.
798 798 """
799 799 cl = repo.changelog
800 800 if not roots:
801 801 roots = [nullid]
802 802 discbases = []
803 803 for n in roots:
804 804 discbases.extend([p for p in cl.parents(n) if p != nullid])
805 805 # TODO: remove call to nodesbetween.
806 806 csets, roots, heads = cl.nodesbetween(roots, heads)
807 807 included = set(csets)
808 808 discbases = [n for n in discbases if n not in included]
809 809 outgoing = discovery.outgoing(cl, discbases, heads)
810 810 bundler = packermap[version][0](repo)
811 811 return getsubset(repo, outgoing, bundler, source)
812 812
813 813 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
814 814 version='01'):
815 815 """Like getbundle, but taking a discovery.outgoing as an argument.
816 816
817 817 This is only implemented for local repos and reuses potentially
818 818 precomputed sets in outgoing. Returns a raw changegroup generator."""
819 819 if not outgoing.missing:
820 820 return None
821 821 bundler = packermap[version][0](repo, bundlecaps)
822 822 return getsubsetraw(repo, outgoing, bundler, source)
823 823
824 824 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
825 825 version='01'):
826 826 """Like getbundle, but taking a discovery.outgoing as an argument.
827 827
828 828 This is only implemented for local repos and reuses potentially
829 829 precomputed sets in outgoing."""
830 830 if not outgoing.missing:
831 831 return None
832 832 bundler = packermap[version][0](repo, bundlecaps)
833 833 return getsubset(repo, outgoing, bundler, source)
834 834
835 835 def computeoutgoing(repo, heads, common):
836 836 """Computes which revs are outgoing given a set of common
837 837 and a set of heads.
838 838
839 839 This is a separate function so extensions can have access to
840 840 the logic.
841 841
842 842 Returns a discovery.outgoing object.
843 843 """
844 844 cl = repo.changelog
845 845 if common:
846 846 hasnode = cl.hasnode
847 847 common = [n for n in common if hasnode(n)]
848 848 else:
849 849 common = [nullid]
850 850 if not heads:
851 851 heads = cl.heads()
852 852 return discovery.outgoing(cl, common, heads)
853 853
854 854 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None,
855 855 version='01'):
856 856 """Like changegroupsubset, but returns the set difference between the
857 857 ancestors of heads and the ancestors of common.
858 858
859 859 If heads is None, use the local heads. If common is None, use [nullid].
860 860
861 861 The nodes in common might not all be known locally due to the way the
862 862 current discovery protocol works.
863 863 """
864 864 outgoing = computeoutgoing(repo, heads, common)
865 865 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps,
866 866 version=version)
867 867
868 868 def changegroup(repo, basenodes, source):
869 869 # to avoid a race we use changegroupsubset() (issue1320)
870 870 return changegroupsubset(repo, basenodes, repo.heads(), source)
871 871
872 872 def _addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
873 873 revisions = 0
874 874 files = 0
875 875 while True:
876 876 chunkdata = source.filelogheader()
877 877 if not chunkdata:
878 878 break
879 879 f = chunkdata["filename"]
880 880 repo.ui.debug("adding %s revisions\n" % f)
881 881 pr()
882 882 fl = repo.file(f)
883 883 o = len(fl)
884 884 try:
885 885 if not fl.addgroup(source, revmap, trp):
886 886 raise error.Abort(_("received file revlog group is empty"))
887 887 except error.CensoredBaseError as e:
888 888 raise error.Abort(_("received delta base is censored: %s") % e)
889 889 revisions += len(fl) - o
890 890 files += 1
891 891 if f in needfiles:
892 892 needs = needfiles[f]
893 893 for new in xrange(o, len(fl)):
894 894 n = fl.node(new)
895 895 if n in needs:
896 896 needs.remove(n)
897 897 else:
898 898 raise error.Abort(
899 899 _("received spurious file revlog entry"))
900 900 if not needs:
901 901 del needfiles[f]
902 902 repo.ui.progress(_('files'), None)
903 903
904 904 for f, needs in needfiles.iteritems():
905 905 fl = repo.file(f)
906 906 for n in needs:
907 907 try:
908 908 fl.rev(n)
909 909 except error.LookupError:
910 910 raise error.Abort(
911 911 _('missing file data for %s:%s - run hg verify') %
912 912 (f, hex(n)))
913 913
914 914 return revisions, files
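
Editor's note: a sketch of the needfiles bookkeeping the two loops above perform, with hypothetical 20-byte nodes:

    needfiles = {'foo.txt': {b'\x01' * 20, b'\x02' * 20}}  # promised by manifests
    received = [b'\x01' * 20, b'\x02' * 20]                # nodes actually added
    for n in received:
        needfiles['foo.txt'].remove(n)   # a miss here would be a spurious entry
    if not needfiles['foo.txt']:
        del needfiles['foo.txt']
    assert not needfiles                 # anything left means missing file data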