##// END OF EJS Templates
changegroup: mark _addchangegroupfiles as module-private...
Augie Fackler -
r26704:d7e61451 default
parent child Browse files
Show More
@@ -1,911 +1,911
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullid,
19 19 nullrev,
20 20 short,
21 21 )
22 22
23 23 from . import (
24 24 branchmap,
25 25 dagutil,
26 26 discovery,
27 27 error,
28 28 mdiff,
29 29 phases,
30 30 util,
31 31 )
32 32
33 33 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
34 34 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
35 35
def readexactly(stream, n):
    '''Read exactly n bytes from stream.read(); abort on a short read.'''
    data = stream.read(n)
    received = len(data)
    if received >= n:
        return data
    raise error.Abort(_("stream ended unexpectedly"
                        " (got %d bytes, expected %d)")
                      % (received, n))
44 44
def getchunk(stream):
    """return the next chunk from stream as a string"""
    # the first four bytes carry the total chunk length, header included
    header = readexactly(stream, 4)
    length = struct.unpack(">l", header)[0]
    if length > 4:
        return readexactly(stream, length - 4)
    if length:
        raise error.Abort(_("invalid chunk length %d") % length)
    # a zero length marks the end of a chunk group
    return ""
54 54
def chunkheader(length):
    """return a changegroup chunk header (string)"""
    # the on-the-wire length includes the 4-byte header itself
    return struct.pack(">l", 4 + length)
58 58
def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk"""
    terminator = 0
    return struct.pack(">l", terminator)
62 62
def combineresults(results):
    """logic to combine 0 or more addchangegroup results into one

    Each result follows the addchangegroup convention: 0 means failure,
    1 means success with no head-count change, 1+n / -1-n mean n heads
    were added / removed.  The combined value follows the same
    convention.
    """
    changedheads = 0
    for ret in results:
        # If any changegroup result is 0, return 0.  Return immediately
        # so a head delta accumulated from earlier results cannot mask
        # the failure below (the old break-then-recompute version did).
        if ret == 0:
            return 0
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        return 1 + changedheads
    if changedheads < 0:
        return -1 + changedheads
    return 1
81 81
# Map of bundle on-disk type name -> (stream header, compression name).
# The compression name indexes util.compressors; None means uncompressed.
bundletypes = {
    "": ("", None), # only when using unbundle on ssh and old http servers
                    # since the unification ssh accepts a header but there
                    # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", None),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
94 94
def writechunks(ui, chunks, filename, vfs=None):
    """Write chunks to a file and return its filename.

    The stream is assumed to be a bundle file.
    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    """
    fh = None
    cleanup = None
    try:
        if not filename:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, "wb")
        elif vfs:
            fh = vfs.open(filename, "wb")
        else:
            fh = open(filename, "wb")
        # from here on, remove the (possibly partial) file on error
        cleanup = filename
        for chunk in chunks:
            fh.write(chunk)
        cleanup = None  # success: keep the file
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)
126 126
def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        # bundle2 manages its own header and compression
        from . import bundle2
        bundle = bundle2.bundle20(ui)
        bundle.setcompression(compression)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only supports v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compressors:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        z = util.compressors[comp]()
        subchunkiter = cg.getchunks()
        # wrap the changegroup chunks in the legacy header + compressor
        def chunkiter():
            yield header
            for chunk in subchunkiter:
                yield z.compress(chunk)
            yield z.flush()
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream

    # an empty chunkgroup is the end of the changegroup
    # a changegroup has at least 2 chunkgroups (changelog and manifest).
    # after that, an empty chunkgroup is the end of the changegroup
    return writechunks(ui, chunkiter, filename, vfs=vfs)
169 169
class cg1unpacker(object):
    """Unpacker for cg1 (version '01') changegroup streams.

    A changegroup is a sequence of chunk groups: changelog, manifest,
    then one group per file.  This class wraps the (possibly compressed)
    input stream, parses chunk headers and deltas, and knows how to
    apply the whole stream to a repository.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    def __init__(self, fh, alg):
        # 'UN' (uncompressed) is normalized to None, the decompressor
        # table's key for the identity transform.
        if alg == 'UN':
            alg = None # get more modern without breaking too much
        if alg not in util.decompressors:
            raise error.Abort(_('unknown stream compression type: %s')
                              % alg)
        if alg == 'BZ':
            alg = '_truncatedBZ'
        self._stream = util.decompressors[alg](fh)
        self._type = alg
        # optional per-chunk progress callback, invoked by chunklength()
        self.callback = None
    def compressed(self):
        return self._type is not None
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def chunklength(self):
        """Read a chunk header and return the payload length (0 at end)."""
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise error.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self.chunklength()
        if not l:
            return {}
        fname = readexactly(self._stream, l)
        return {'filename': fname}

    def _deltaheader(self, headertuple, prevnode):
        # cg1 carries no explicit delta base: it is implicitly the
        # previous node in the stream, or p1 for the group's first delta.
        node, p1, p2, cs = headertuple
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        return node, p1, p2, deltabase, cs

    def deltachunk(self, prevnode):
        """Read one delta chunk; return {} at the end of the group."""
        l = self.chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
        return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
                'deltabase': deltabase, 'delta': delta}

    def getchunks(self):
        """returns all the chunks contains in the bundle

        Used when you need to forward the binary stream to a file or another
        network API. To do so, it parse the changegroup data, otherwise it will
        block in case of sshrepo because it don't know the end of the stream.
        """
        # an empty chunkgroup is the end of the changegroup
        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, an empty chunkgroup is the end of the changegroup
        empty = False
        count = 0
        while not empty or count <= 2:
            empty = True
            count += 1
            while True:
                chunk = getchunk(self)
                if not chunk:
                    break
                empty = False
                yield chunkheader(len(chunk))
                pos = 0
                # re-emit payloads in at most 1MB slices
                while pos < len(chunk):
                    next = pos + 2**20
                    yield chunk[pos:next]
                    pos = next
            yield closechunk()

    def apply(self, repo, srctype, url, emptyok=False,
              targetphase=phases.draft, expectedtotal=None):
        """Add the changegroup returned by source.read() to this repo.
        srctype is a string like 'push', 'pull', or 'unbundle'. url is
        the URL of the repo where this changegroup is coming from.

        Return an integer summarizing the change to this repo:
        - nothing changed or no source: 0
        - more heads than before: 1+added heads (2..n)
        - fewer heads than before: -1-removed heads (-2..-n)
        - number of heads stays the same: 1
        """
        repo = repo.unfiltered()
        def csmap(x):
            repo.ui.debug("add changeset %s\n" % short(x))
            return len(cl)

        def revmap(x):
            return cl.rev(x)

        changesets = files = revisions = 0

        tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
        # The transaction could have been created before and already
        # carries source information. In this case we use the top
        # level data. We overwrite the argument because we need to use
        # the top level value (if they exist) in this function.
        srctype = tr.hookargs.setdefault('source', srctype)
        url = tr.hookargs.setdefault('url', url)

        # write changelog data to temp files so concurrent readers will not see
        # inconsistent view
        cl = repo.changelog
        cl.delayupdate(tr)
        oldheads = cl.heads()
        try:
            repo.hook('prechangegroup', throw=True, **tr.hookargs)

            trp = weakref.proxy(tr)
            # pull off the changeset group
            repo.ui.status(_("adding changesets\n"))
            clstart = len(cl)
            # small stateful progress reporter used as the addgroup callback
            class prog(object):
                def __init__(self, step, total):
                    self._step = step
                    self._total = total
                    self._count = 1
                def __call__(self):
                    repo.ui.progress(self._step, self._count, unit=_('chunks'),
                                     total=self._total)
                    self._count += 1
            self.callback = prog(_('changesets'), expectedtotal)

            efiles = set()
            def onchangelog(cl, node):
                efiles.update(cl.read(node)[3])

            self.changelogheader()
            srccontent = cl.addgroup(self, csmap, trp,
                                     addrevisioncb=onchangelog)
            efiles = len(efiles)

            if not (srccontent or emptyok):
                raise error.Abort(_("received changelog group is empty"))
            clend = len(cl)
            changesets = clend - clstart
            repo.ui.progress(_('changesets'), None)

            # pull off the manifest group
            repo.ui.status(_("adding manifests\n"))
            # manifests <= changesets
            self.callback = prog(_('manifests'), changesets)
            # no need to check for empty manifest group here:
            # if the result of the merge of 1 and 2 is the same in 3 and 4,
            # no new manifest will be created and the manifest group will
            # be empty during the pull
            self.manifestheader()
            repo.manifest.addgroup(self, revmap, trp)
            repo.ui.progress(_('manifests'), None)

            needfiles = {}
            if repo.ui.configbool('server', 'validate', default=False):
                # validate incoming csets have their manifests
                for cset in xrange(clstart, clend):
                    mfnode = repo.changelog.read(repo.changelog.node(cset))[0]
                    mfest = repo.manifest.readdelta(mfnode)
                    # store file nodes we must see
                    for f, n in mfest.iteritems():
                        needfiles.setdefault(f, set()).add(n)

            # process the files
            repo.ui.status(_("adding file changes\n"))
            self.callback = None
            pr = prog(_('files'), efiles)
            # NOTE: the pasted diff left both the pre- and post-rename call
            # lines here; only the renamed _addchangegroupfiles survives.
            newrevs, newfiles = _addchangegroupfiles(
                repo, self, revmap, trp, pr, needfiles)
            revisions += newrevs
            files += newfiles

            dh = 0
            if oldheads:
                heads = cl.heads()
                dh = len(heads) - len(oldheads)
                for h in heads:
                    if h not in oldheads and repo[h].closesbranch():
                        dh -= 1
            htext = ""
            if dh:
                htext = _(" (%+d heads)") % dh

            repo.ui.status(_("added %d changesets"
                             " with %d changes to %d files%s\n")
                           % (changesets, revisions, files, htext))
            repo.invalidatevolatilesets()

            if changesets > 0:
                p = lambda: tr.writepending() and repo.root or ""
                if 'node' not in tr.hookargs:
                    tr.hookargs['node'] = hex(cl.node(clstart))
                    hookargs = dict(tr.hookargs)
                else:
                    hookargs = dict(tr.hookargs)
                    hookargs['node'] = hex(cl.node(clstart))
                repo.hook('pretxnchangegroup', throw=True, pending=p,
                          **hookargs)

            added = [cl.node(r) for r in xrange(clstart, clend)]
            publishing = repo.publishing()
            if srctype in ('push', 'serve'):
                # Old servers can not push the boundary themselves.
                # New servers won't push the boundary if changeset already
                # exists locally as secret
                #
                # We should not use added here but the list of all change in
                # the bundle
                if publishing:
                    phases.advanceboundary(repo, tr, phases.public, srccontent)
                else:
                    # Those changesets have been pushed from the outside, their
                    # phases are going to be pushed alongside. Therefor
                    # `targetphase` is ignored.
                    phases.advanceboundary(repo, tr, phases.draft, srccontent)
                    phases.retractboundary(repo, tr, phases.draft, added)
            elif srctype != 'strip':
                # publishing only alter behavior during push
                #
                # strip should not touch boundary at all
                phases.retractboundary(repo, tr, targetphase, added)

            if changesets > 0:
                if srctype != 'strip':
                    # During strip, branchcache is invalid but coming call to
                    # `destroyed` will repair it.
                    # In other case we can safely update cache on disk.
                    branchmap.updatecache(repo.filtered('served'))

                def runhooks():
                    # These hooks run when the lock releases, not when the
                    # transaction closes. So it's possible for the changelog
                    # to have changed since we last saw it.
                    if clstart >= len(repo):
                        return

                    # forcefully update the on-disk branch cache
                    repo.ui.debug("updating the branch cache\n")
                    repo.hook("changegroup", **hookargs)

                    for n in added:
                        args = hookargs.copy()
                        args['node'] = hex(n)
                        repo.hook("incoming", **args)

                    newheads = [h for h in repo.heads() if h not in oldheads]
                    repo.ui.log("incoming",
                                "%s incoming changes - new heads: %s\n",
                                len(added),
                                ', '.join([hex(c[:6]) for c in newheads]))

                tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                                lambda tr: repo._afterlock(runhooks))

            tr.close()

        finally:
            tr.release()
            repo.ui.flush()
        # never return 0 here:
        if dh < 0:
            return dh - 1
        else:
            return dh + 1
461 461
class cg2unpacker(cg1unpacker):
    """Unpacker for cg2 (version '02') streams.

    cg2 differs from cg1 in that every delta header names its delta base
    explicitly instead of implying the previous node.
    """
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        # (node, p1, p2, deltabase, cs): the base is already explicit in
        # the header, so prevnode is irrelevant here
        return headertuple
470 470
class headerlessfixup(object):
    """File-like wrapper that re-attaches an already-consumed header.

    Reads drain the buffered header first, then fall through to the
    underlying file handle.
    """
    def __init__(self, fh, h):
        self._h = h
        self._fh = fh
    def read(self, n):
        if not self._h:
            return readexactly(self._fh, n)
        buffered = self._h[:n]
        self._h = self._h[n:]
        shortfall = n - len(buffered)
        if shortfall:
            buffered += readexactly(self._fh, shortfall)
        return buffered
482 482
class cg1packer(object):
    # Packer producing version '01' changegroup streams.
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    version = '01'
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        # experimental config: bundle.reorder
        reorder = repo.ui.config('bundle', 'reorder', 'auto')
        if reorder == 'auto':
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
        # only emit per-section size notes in verbose (non-debug) mode
        if self._repo.ui.verbose and not self._repo.ui.debugflag:
            self._verbosenote = self._repo.ui.note
        else:
            self._verbosenote = lambda s: None

    def close(self):
        """Return the chunk terminating the current chunk group."""
        return closechunk()

    def fileheader(self, fname):
        """Return the chunk announcing a filelog group for fname."""
        return chunkheader(len(fname)) + fname

    def group(self, nodelist, revlog, lookup, units=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and self._reorder is None) or self._reorder:
            dag = dagutil.revlogdag(revlog)
            revs = set(revlog.rev(n) for n in nodelist)
            revs = dag.linearize(revs)
        else:
            revs = sorted([revlog.rev(n) for n in nodelist])

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        if units is not None:
            self._progress(msgbundling, None)
        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs):
        rr, rl = revlog.rev, revlog.linkrev
        return [n for n in missing if rl(rr(n)) not in commonrevs]

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        cl = repo.changelog
        ml = repo.manifest

        clrevorder = {}
        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and manifest
        # nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            clrevorder[x] = len(clrevorder)
            changedfiles.update(c[3])
            # record the first changeset introducing this manifest version
            mfs.setdefault(c[0], x)
            return x

        self._verbosenote(_('uncompressed size of bundle content:\n'))
        size = 0
        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (changelog)\n') % size)

        # We need to make sure that the linkrev in the changegroup refers to
        # the first changeset that introduced the manifest or file revision.
        # The fastpath is usually safer than the slowpath, because the filelogs
        # are walked in revlog order.
        #
        # When taking the slowpath with reorder=None and the manifest revlog
        # uses generaldelta, the manifest may be walked in the "wrong" order.
        # Without 'clrevorder', we would get an incorrect linkrev (see fix in
        # cc0ff93d0c0c).
        #
        # When taking the fastpath, we are only vulnerable to reordering
        # of the changelog itself. The changelog never uses generaldelta, so
        # it is only reordered when reorder=True. To handle this case, we
        # simply take the slowpath, which already has the 'clrevorder' logic.
        # This was also fixed in cc0ff93d0c0c.
        fastpathlinkrev = fastpathlinkrev and not self._reorder
        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def lookupmf(x):
            clnode = mfs[x]
            if not fastpathlinkrev:
                mdata = ml.readfast(x)
                for f, n in mdata.iteritems():
                    if f in changedfiles:
                        # record the first changeset introducing this filelog
                        # version
                        fclnodes = fnodes.setdefault(f, {})
                        fclnode = fclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[fclnode]:
                            fclnodes[n] = clnode
            return clnode

        mfnodes = self.prune(ml, mfs, commonrevs)
        size = 0
        for chunk in self.group(mfnodes, ml, lookupmf, units=_('manifests')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (manifests)\n') % size)

        mfs.clear()
        clrevs = set(cl.rev(x) for x in clnodes)

        def linknodes(filerevlog, fname):
            if fastpathlinkrev:
                llr = filerevlog.linkrev
                def genfilenodes():
                    for r in filerevlog:
                        linkrev = llr(r)
                        if linkrev in clrevs:
                            yield filerevlog.node(r), cl.node(linkrev)
                return dict(genfilenodes())
            return fnodes.get(fname, {})

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    # The 'source' parameter is useful for extensions
    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        """Yield the filelog portion of the changegroup stream."""
        repo = self._repo
        progress = self._progress
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise error.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                h = self.fileheader(fname)
                size = len(h)
                yield h
                for chunk in self.group(filenodes, filerevlog, lookupfilelog):
                    size += len(chunk)
                    yield chunk
                self._verbosenote(_('%8.i %s\n') % (size, fname))
        progress(msgbundling, None)

    def deltaparent(self, revlog, rev, p1, p2, prev):
        # cg1 can only delta against the previous revision in the stream
        return prev

    def revchunk(self, revlog, rev, prev, linknode):
        """Yield the chunks (header, metadata, delta) for one revision."""
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = self.deltaparent(revlog, rev, p1, p2, prev)

        prefix = ''
        if revlog.iscensored(base) or revlog.iscensored(rev):
            try:
                delta = revlog.revision(node)
            except error.CensoredNodeError as e:
                delta = e.tombstone
            if base == nullrev:
                prefix = mdiff.trivialdiffheader(len(delta))
            else:
                baselen = revlog.rawsize(base)
                prefix = mdiff.replacediffheader(baselen, len(delta))
        elif base == nullrev:
            delta = revlog.revision(node)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta
    def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
        # do nothing with basenode, it is implicitly the previous one in HG10
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
728 728
class cg2packer(cg1packer):
    """Packer producing version '02' changegroups.

    cg2 records an explicit delta base in every delta header, so stored
    (generaldelta) deltas can be reused directly.
    """
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        super(cg2packer, self).__init__(repo, bundlecaps)
        # Since generaldelta is directly supported by cg2, reordering
        # generally doesn't help, so we disable it by default (treating
        # bundle.reorder=auto just like bundle.reorder=False).
        if self._reorder is None:
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        dp = revlog.deltaparent(rev)
        # Only reuse the stored base when it is usable: a null base would
        # mean shipping a full revision, and a base that is neither a
        # parent nor the previous rev may be missing on the remote.
        if dp != nullrev and dp in (p1, p2, prev):
            return dp
        return prev

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
        # unlike cg1, the delta base is part of the header
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode,
                           linknode)
751 751
# Map changegroup version string -> (packer class, unpacker class).
packermap = {'01': (cg1packer, cg1unpacker),
             '02': (cg2packer, cg2unpacker)}
754 754
755 755 def _changegroupinfo(repo, nodes, source):
756 756 if repo.ui.verbose or source == 'bundle':
757 757 repo.ui.status(_("%d changesets found\n") % len(nodes))
758 758 if repo.ui.debugflag:
759 759 repo.ui.debug("list of changesets:\n")
760 760 for node in nodes:
761 761 repo.ui.debug("%s\n" % hex(node))
762 762
def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
    """Run the preoutgoing hook and return the raw chunk generator for
    the given outgoing set."""
    repo = repo.unfiltered()
    csets = outgoing.missing
    heads = outgoing.missingheads
    # We go through the fast path if we get told to, or if all (unfiltered
    # heads have been requested (since we then know there all linkrevs will
    # be pulled by the client).
    heads.sort()
    fastpathlinkrev = fastpath or (
        repo.filtername is None and heads == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, csets, source)
    return bundler.generate(outgoing.common, csets, fastpathlinkrev, source)
778 778
def getsubset(repo, outgoing, bundler, source, fastpath=False):
    """Build a changegroup for outgoing and return it as an unpacker."""
    chunks = getsubsetraw(repo, outgoing, bundler, source, fastpath)
    unpackerclass = packermap[bundler.version][1]
    return unpackerclass(util.chunkbuffer(chunks), None)
782 782
def changegroupsubset(repo, roots, heads, source, version='01'):
    """Compute a changegroup consisting of all the nodes that are
    descendants of any of the roots and ancestors of any of the heads.
    Return a chunkbuffer object whose read() method will return
    successive changegroup chunks.

    It is fairly complex as determining which filenodes and which
    manifest nodes need to be included for the changeset to be complete
    is non-trivial.

    Another wrinkle is doing the reverse, figuring out which changeset in
    the changegroup a particular filenode or manifestnode belongs to.
    """
    cl = repo.changelog
    if not roots:
        roots = [nullid]
    # the discovery bases are the parents of the roots, minus any parent
    # that itself ends up inside the requested set
    discbases = []
    for n in roots:
        discbases.extend([p for p in cl.parents(n) if p != nullid])
    # TODO: remove call to nodesbetween.
    csets, roots, heads = cl.nodesbetween(roots, heads)
    included = set(csets)
    discbases = [n for n in discbases if n not in included]
    outgoing = discovery.outgoing(cl, discbases, heads)
    bundler = packermap[version][0](repo)
    return getsubset(repo, outgoing, bundler, source)
809 809
def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
                           version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing. Returns a raw changegroup generator."""
    if not outgoing.missing:
        # nothing would be transferred
        return None
    packerclass = packermap[version][0]
    return getsubsetraw(repo, outgoing, packerclass(repo, bundlecaps), source)
820 820
def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
                        version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing."""
    if not outgoing.missing:
        # nothing would be transferred
        return None
    packerclass = packermap[version][0]
    return getsubset(repo, outgoing, packerclass(repo, bundlecaps), source)
831 831
def computeoutgoing(repo, heads, common):
    """Computes which revs are outgoing given a set of common
    and a set of heads.

    This is a separate function so extensions can have access to
    the logic.

    Returns a discovery.outgoing object.
    """
    cl = repo.changelog
    if not common:
        common = [nullid]
    else:
        # drop nodes the local changelog does not know about
        common = [n for n in common if cl.hasnode(n)]
    if not heads:
        heads = cl.heads()
    return discovery.outgoing(cl, common, heads)
850 850
def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None,
                   version='01'):
    """Like changegroupsubset, but returns the set difference between the
    ancestors of heads and the ancestors common.

    If heads is None, use the local heads. If common is None, use [nullid].

    The nodes in common might not all be known locally due to the way the
    current discovery protocol works.
    """
    return getlocalchangegroup(repo, source,
                               computeoutgoing(repo, heads, common),
                               bundlecaps=bundlecaps, version=version)
864 864
def changegroup(repo, basenodes, source):
    """Return a changegroup of everything between basenodes and the
    current local heads."""
    # to avoid a race we use changegroupsubset() (issue1320)
    heads = repo.heads()
    return changegroupsubset(repo, basenodes, heads, source)
868 868
def _addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
    """Apply the filelog groups of a changegroup to repo.

    source is the unpacker being consumed, revmap maps linkrev nodes to
    local revisions, trp is the (weakref-proxied) transaction, pr is a
    progress callback, and needfiles maps filename -> set of file nodes
    that the incoming changesets require (server-side validation; may be
    empty).

    Returns a (revisions, files) tuple counting added file revisions and
    touched filelogs.  Raises error.Abort on empty, spurious, censored
    or missing file data.

    NOTE: the pasted diff left both the pre- and post-rename def lines
    here; only the module-private _addchangegroupfiles name survives.
    """
    revisions = 0
    files = 0
    while True:
        chunkdata = source.filelogheader()
        if not chunkdata:
            # an empty header terminates the sequence of filelog groups
            break
        f = chunkdata["filename"]
        repo.ui.debug("adding %s revisions\n" % f)
        pr()
        fl = repo.file(f)
        o = len(fl)
        try:
            if not fl.addgroup(source, revmap, trp):
                raise error.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError as e:
            raise error.Abort(_("received delta base is censored: %s") % e)
        revisions += len(fl) - o
        files += 1
        if f in needfiles:
            # tick off the nodes this group delivered; anything extra is
            # a spurious entry
            needs = needfiles[f]
            for new in xrange(o, len(fl)):
                n = fl.node(new)
                if n in needs:
                    needs.remove(n)
                else:
                    raise error.Abort(
                        _("received spurious file revlog entry"))
            if not needs:
                del needfiles[f]
    repo.ui.progress(_('files'), None)

    # whatever is left in needfiles must already exist locally
    for f, needs in needfiles.iteritems():
        fl = repo.file(f)
        for n in needs:
            try:
                fl.rev(n)
            except error.LookupError:
                raise error.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (f, hex(n)))

    return revisions, files
General Comments 0
You need to be logged in to leave comments. Login now