##// END OF EJS Templates
changegroup: call 'prechangegroup' hook before setting up write delay...
Pierre-Yves David -
r26881:6b1ea696 stable
parent child Browse files
Show More
@@ -1,952 +1,952 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullid,
19 19 nullrev,
20 20 short,
21 21 )
22 22
23 23 from . import (
24 24 branchmap,
25 25 dagutil,
26 26 discovery,
27 27 error,
28 28 mdiff,
29 29 phases,
30 30 util,
31 31 )
32 32
33 33 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
34 34 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
35 35
def readexactly(stream, n):
    """Read exactly n bytes from stream.read; abort on a short read."""
    data = stream.read(n)
    if len(data) < n:
        raise error.Abort(_("stream ended unexpectedly"
                            " (got %d bytes, expected %d)")
                          % (len(data), n))
    return data
44 44
def getchunk(stream):
    """return the next chunk from stream as a string"""
    header = readexactly(stream, 4)
    length = struct.unpack(">l", header)[0]
    if length > 4:
        # payload length excludes the 4-byte length word itself
        return readexactly(stream, length - 4)
    if length:
        raise error.Abort(_("invalid chunk length %d") % length)
    # a zero-length chunk marks the end of a group
    return ""
54 54
def chunkheader(length):
    """return a changegroup chunk header (string)"""
    # on-the-wire length includes the 4-byte length word itself
    return struct.pack(">l", 4 + length)
58 58
def closechunk():
    """return a changegroup chunk header (string) for a zero-length chunk"""
    # a zero length word terminates a delta group
    return b"\x00\x00\x00\x00"
62 62
def combineresults(results):
    """logic to combine 0 or more addchangegroup results into one

    Result codes follow the apply() convention: 0 means nothing
    changed, 1 means same number of heads, >1 means heads were added,
    <-1 means heads were removed.

    Fix: a 0 result now short-circuits immediately; previously the
    `break` still fell through to the changed-heads adjustment, so a 0
    following a head-changing result was not reported as 0 despite the
    documented contract.
    """
    changedheads = 0
    for ret in results:
        # If any changegroup result is 0, return 0
        if ret == 0:
            return 0
        if ret < -1:
            changedheads += ret + 1
        elif ret > 1:
            changedheads += ret - 1
    if changedheads > 0:
        return 1 + changedheads
    if changedheads < 0:
        return -1 + changedheads
    return 1
81 81
# Maps a v1 bundle type name to the (header, compression) pair written at
# the start of the bundle file. "HG20" is handled separately by bundle2.
bundletypes = {
    "": ("", None),       # only when using unbundle on ssh and old http servers
                          # since the unification ssh accepts a header but there
                          # is no capability signaling it.
    "HG20": (), # special-cased below
    "HG10UN": ("HG10UN", None),
    "HG10BZ": ("HG10", 'BZ'),
    "HG10GZ": ("HG10GZ", 'GZ'),
}

# hgweb uses this list to communicate its preferred type
bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
94 94
def writechunks(ui, chunks, filename, vfs=None):
    """Write chunks to a file and return its filename.

    The stream is assumed to be a bundle file.
    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    On any error the partially-written file is removed.
    """
    fh = None
    cleanup = None
    try:
        if filename:
            fh = vfs.open(filename, "wb") if vfs else open(filename, "wb")
        else:
            fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
            fh = os.fdopen(fd, "wb")
        # arm cleanup; disarmed once every chunk was written successfully
        cleanup = filename
        for chunk in chunks:
            fh.write(chunk)
        cleanup = None
        return filename
    finally:
        if fh is not None:
            fh.close()
        if cleanup is not None:
            if filename and vfs:
                vfs.unlink(cleanup)
            else:
                os.unlink(cleanup)
126 126
def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
    """Write a bundle file and return its filename.

    Existing files will not be overwritten.
    If no filename is specified, a temporary file is created.
    bz2 compression can be turned off.
    The bundle file will be deleted in case of errors.
    """

    if bundletype == "HG20":
        # bundle2 does its own framing and compression; just wrap the
        # changegroup chunks in a 'changegroup' part.
        from . import bundle2
        bundle = bundle2.bundle20(ui)
        bundle.setcompression(compression)
        part = bundle.newpart('changegroup', data=cg.getchunks())
        part.addparam('version', cg.version)
        chunkiter = bundle.getchunks()
    else:
        # compression argument is only for the bundle2 case
        assert compression is None
        if cg.version != '01':
            raise error.Abort(_('old bundle types only supports v1 '
                                'changegroups'))
        header, comp = bundletypes[bundletype]
        if comp not in util.compressors:
            raise error.Abort(_('unknown stream compression type: %s')
                              % comp)
        z = util.compressors[comp]()
        subchunkiter = cg.getchunks()
        # v1 layout: magic header followed by the compressed chunk stream
        def chunkiter():
            yield header
            for chunk in subchunkiter:
                yield z.compress(chunk)
            yield z.flush()
        chunkiter = chunkiter()

    # parse the changegroup data, otherwise we will block
    # in case of sshrepo because we don't know the end of the stream

    # an empty chunkgroup is the end of the changegroup
    # a changegroup has at least 2 chunkgroups (changelog and manifest).
    # after that, an empty chunkgroup is the end of the changegroup
    return writechunks(ui, chunkiter, filename, vfs=vfs)
169 169
class cg1unpacker(object):
    """Unpacker for cg1 changegroup streams.

    A changegroup unpacker handles the framing of the revision data in
    the wire format. Most consumers will want to use the apply()
    method to add the changes from the changegroup to a repository.

    If you're forwarding a changegroup unmodified to another consumer,
    use getchunks(), which returns an iterator of changegroup
    chunks. This is mostly useful for cases where you need to know the
    data stream has ended by observing the end of the changegroup.

    deltachunk() is useful only if you're applying delta data. Most
    consumers should prefer apply() instead.

    A few other public methods exist. Those are used only for
    bundlerepo and some debug commands - their use is discouraged.
    """
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '01'
    def __init__(self, fh, alg):
        if alg == 'UN':
            alg = None # get more modern without breaking too much
        if alg not in util.decompressors:
            raise error.Abort(_('unknown stream compression type: %s')
                              % alg)
        if alg == 'BZ':
            alg = '_truncatedBZ'
        self._stream = util.decompressors[alg](fh)
        self._type = alg
        self.callback = None

    # These methods (compressed, read, seek, tell) all appear to only
    # be used by bundlerepo, but it's a little hard to tell.
    def compressed(self):
        return self._type is not None
    def read(self, l):
        return self._stream.read(l)
    def seek(self, pos):
        return self._stream.seek(pos)
    def tell(self):
        return self._stream.tell()
    def close(self):
        return self._stream.close()

    def _chunklength(self):
        """Read a chunk length word; return the payload size (0 at end)."""
        d = readexactly(self._stream, 4)
        l = struct.unpack(">l", d)[0]
        if l <= 4:
            if l:
                raise error.Abort(_("invalid chunk length %d") % l)
            return 0
        if self.callback:
            self.callback()
        return l - 4

    def changelogheader(self):
        """v10 does not have a changelog header chunk"""
        return {}

    def manifestheader(self):
        """v10 does not have a manifest header chunk"""
        return {}

    def filelogheader(self):
        """return the header of the filelogs chunk, v10 only has the filename"""
        l = self._chunklength()
        if not l:
            return {}
        fname = readexactly(self._stream, l)
        return {'filename': fname}

    def _deltaheader(self, headertuple, prevnode):
        # cg1 does not transmit the delta base: it is implicitly the
        # previously delivered node, or p1 for the first delta.
        node, p1, p2, cs = headertuple
        if prevnode is None:
            deltabase = p1
        else:
            deltabase = prevnode
        return node, p1, p2, deltabase, cs

    def deltachunk(self, prevnode):
        """Read one delta record; return {} at the end of the group."""
        l = self._chunklength()
        if not l:
            return {}
        headerdata = readexactly(self._stream, self.deltaheadersize)
        header = struct.unpack(self.deltaheader, headerdata)
        delta = readexactly(self._stream, l - self.deltaheadersize)
        node, p1, p2, deltabase, cs = self._deltaheader(header, prevnode)
        return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
                'deltabase': deltabase, 'delta': delta}

    def getchunks(self):
        """returns all the chunks contains in the bundle

        Used when you need to forward the binary stream to a file or another
        network API. To do so, it parse the changegroup data, otherwise it will
        block in case of sshrepo because it don't know the end of the stream.
        """
        # an empty chunkgroup is the end of the changegroup
        # a changegroup has at least 2 chunkgroups (changelog and manifest).
        # after that, an empty chunkgroup is the end of the changegroup
        empty = False
        count = 0
        while not empty or count <= 2:
            empty = True
            count += 1
            while True:
                chunk = getchunk(self)
                if not chunk:
                    break
                empty = False
                yield chunkheader(len(chunk))
                pos = 0
                # re-chunk large payloads in 1MB slices
                while pos < len(chunk):
                    next = pos + 2**20
                    yield chunk[pos:next]
                    pos = next
            yield closechunk()

    def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
        # We know that we'll never have more manifests than we had
        # changesets.
        self.callback = prog(_('manifests'), numchanges)
        # no need to check for empty manifest group here:
        # if the result of the merge of 1 and 2 is the same in 3 and 4,
        # no new manifest will be created and the manifest group will
        # be empty during the pull
        self.manifestheader()
        repo.manifest.addgroup(self, revmap, trp)
        repo.ui.progress(_('manifests'), None)

    def apply(self, repo, srctype, url, emptyok=False,
              targetphase=phases.draft, expectedtotal=None):
        """Add the changegroup returned by source.read() to this repo.
        srctype is a string like 'push', 'pull', or 'unbundle'. url is
        the URL of the repo where this changegroup is coming from.

        Return an integer summarizing the change to this repo:
        - nothing changed or no source: 0
        - more heads than before: 1+added heads (2..n)
        - fewer heads than before: -1-removed heads (-2..-n)
        - number of heads stays the same: 1
        """
        repo = repo.unfiltered()
        def csmap(x):
            repo.ui.debug("add changeset %s\n" % short(x))
            return len(cl)

        def revmap(x):
            return cl.rev(x)

        changesets = files = revisions = 0

        tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
        try:
            # The transaction could have been created before and already
            # carries source information. In this case we use the top
            # level data. We overwrite the argument because we need to use
            # the top level value (if they exist) in this function.
            srctype = tr.hookargs.setdefault('source', srctype)
            url = tr.hookargs.setdefault('url', url)

            # The 'prechangegroup' hook must run exactly once, and before
            # the changelog write delay is installed below, so a failing
            # hook aborts before any changelog data is buffered.
            repo.hook('prechangegroup', throw=True, **tr.hookargs)

            # write changelog data to temp files so concurrent readers
            # will not see an inconsistent view
            cl = repo.changelog
            cl.delayupdate(tr)
            oldheads = cl.heads()

            trp = weakref.proxy(tr)
            # pull off the changeset group
            repo.ui.status(_("adding changesets\n"))
            clstart = len(cl)
            class prog(object):
                def __init__(self, step, total):
                    self._step = step
                    self._total = total
                    self._count = 1
                def __call__(self):
                    repo.ui.progress(self._step, self._count, unit=_('chunks'),
                                     total=self._total)
                    self._count += 1
            self.callback = prog(_('changesets'), expectedtotal)

            efiles = set()
            def onchangelog(cl, node):
                efiles.update(cl.read(node)[3])

            self.changelogheader()
            srccontent = cl.addgroup(self, csmap, trp,
                                     addrevisioncb=onchangelog)
            efiles = len(efiles)

            if not (srccontent or emptyok):
                raise error.Abort(_("received changelog group is empty"))
            clend = len(cl)
            changesets = clend - clstart
            repo.ui.progress(_('changesets'), None)

            # pull off the manifest group
            repo.ui.status(_("adding manifests\n"))
            self._unpackmanifests(repo, revmap, trp, prog, changesets)

            needfiles = {}
            if repo.ui.configbool('server', 'validate', default=False):
                # validate incoming csets have their manifests
                for cset in xrange(clstart, clend):
                    mfnode = repo.changelog.read(repo.changelog.node(cset))[0]
                    mfest = repo.manifest.readdelta(mfnode)
                    # store file nodes we must see
                    for f, n in mfest.iteritems():
                        needfiles.setdefault(f, set()).add(n)

            # process the files
            repo.ui.status(_("adding file changes\n"))
            self.callback = None
            pr = prog(_('files'), efiles)
            newrevs, newfiles = _addchangegroupfiles(
                repo, self, revmap, trp, pr, needfiles)
            revisions += newrevs
            files += newfiles

            # compute the head delta, ignoring heads that close a branch
            dh = 0
            if oldheads:
                heads = cl.heads()
                dh = len(heads) - len(oldheads)
                for h in heads:
                    if h not in oldheads and repo[h].closesbranch():
                        dh -= 1
            htext = ""
            if dh:
                htext = _(" (%+d heads)") % dh

            repo.ui.status(_("added %d changesets"
                             " with %d changes to %d files%s\n")
                           % (changesets, revisions, files, htext))
            repo.invalidatevolatilesets()

            # Call delayupdate again to ensure the transaction writepending
            # subscriptions are still in place.
            cl.delayupdate(tr)

            if changesets > 0:
                if 'node' not in tr.hookargs:
                    tr.hookargs['node'] = hex(cl.node(clstart))
                    hookargs = dict(tr.hookargs)
                else:
                    hookargs = dict(tr.hookargs)
                    hookargs['node'] = hex(cl.node(clstart))
                repo.hook('pretxnchangegroup', throw=True, **hookargs)

            added = [cl.node(r) for r in xrange(clstart, clend)]
            publishing = repo.publishing()
            if srctype in ('push', 'serve'):
                # Old servers can not push the boundary themselves.
                # New servers won't push the boundary if changeset already
                # exists locally as secret
                #
                # We should not use added here but the list of all change in
                # the bundle
                if publishing:
                    phases.advanceboundary(repo, tr, phases.public, srccontent)
                else:
                    # Those changesets have been pushed from the outside, their
                    # phases are going to be pushed alongside. Therefor
                    # `targetphase` is ignored.
                    phases.advanceboundary(repo, tr, phases.draft, srccontent)
                    phases.retractboundary(repo, tr, phases.draft, added)
            elif srctype != 'strip':
                # publishing only alter behavior during push
                #
                # strip should not touch boundary at all
                phases.retractboundary(repo, tr, targetphase, added)

            if changesets > 0:
                if srctype != 'strip':
                    # During strip, branchcache is invalid but coming call to
                    # `destroyed` will repair it.
                    # In other case we can safely update cache on disk.
                    branchmap.updatecache(repo.filtered('served'))

                def runhooks():
                    # These hooks run when the lock releases, not when the
                    # transaction closes. So it's possible for the changelog
                    # to have changed since we last saw it.
                    if clstart >= len(repo):
                        return

                    # forcefully update the on-disk branch cache
                    repo.ui.debug("updating the branch cache\n")
                    repo.hook("changegroup", **hookargs)

                    for n in added:
                        args = hookargs.copy()
                        args['node'] = hex(n)
                        repo.hook("incoming", **args)

                    newheads = [h for h in repo.heads() if h not in oldheads]
                    repo.ui.log("incoming",
                                "%s incoming changes - new heads: %s\n",
                                len(added),
                                ', '.join([hex(c[:6]) for c in newheads]))

                tr.addpostclose('changegroup-runhooks-%020i' % clstart,
                                lambda tr: repo._afterlock(runhooks))

            tr.close()

        finally:
            tr.release()
            repo.ui.flush()
        # never return 0 here:
        if dh < 0:
            return dh - 1
        else:
            return dh + 1
487 487
class cg2unpacker(cg1unpacker):
    """Unpacker for cg2 changegroup streams.

    cg2 differs from cg1 only in its delta header: the delta base node
    is transmitted explicitly (generaldelta support). Everything else
    about the data layout is inherited unchanged from cg1unpacker.
    """
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER
    deltaheadersize = struct.calcsize(deltaheader)
    version = '02'

    def _deltaheader(self, headertuple, prevnode):
        # the header already carries (node, p1, p2, deltabase, cs) in
        # exactly the order our caller expects; prevnode is unused.
        return headertuple
502 502
class headerlessfixup(object):
    """File-like wrapper that replays an already-consumed header.

    ``h`` holds bytes that were already read off ``fh``; reads are
    served from that buffer first and then fall through to the
    underlying stream.
    """
    def __init__(self, fh, h):
        self._h = h
        self._fh = fh
    def read(self, n):
        buffered = self._h
        if not buffered:
            return readexactly(self._fh, n)
        d, self._h = buffered[:n], buffered[n:]
        if len(d) < n:
            # buffer exhausted mid-read: top up from the real stream
            d += readexactly(self._fh, n - len(d))
        return d
514 514
class cg1packer(object):
    """Packer producing version '01' changegroup streams."""
    deltaheader = _CHANGEGROUPV1_DELTA_HEADER
    version = '01'
    def __init__(self, repo, bundlecaps=None):
        """Given a source repo, construct a bundler.

        bundlecaps is optional and can be used to specify the set of
        capabilities which can be used to build the bundle.
        """
        # Set of capabilities we can use to build the bundle.
        if bundlecaps is None:
            bundlecaps = set()
        self._bundlecaps = bundlecaps
        # experimental config: bundle.reorder
        reorder = repo.ui.config('bundle', 'reorder', 'auto')
        if reorder == 'auto':
            reorder = None
        else:
            reorder = util.parsebool(reorder)
        self._repo = repo
        self._reorder = reorder
        self._progress = repo.ui.progress
        # size notes are shown in verbose mode only (suppressed in debug
        # mode, where they would drown in other output)
        if self._repo.ui.verbose and not self._repo.ui.debugflag:
            self._verbosenote = self._repo.ui.note
        else:
            self._verbosenote = lambda s: None

    def close(self):
        # a zero-length chunk terminates a delta group
        return closechunk()

    def fileheader(self, fname):
        # chunk introducing the filelog named fname
        return chunkheader(len(fname)) + fname

    def group(self, nodelist, revlog, lookup, units=None):
        """Calculate a delta group, yielding a sequence of changegroup chunks
        (strings).

        Given a list of changeset revs, return a set of deltas and
        metadata corresponding to nodes. The first delta is
        first parent(nodelist[0]) -> nodelist[0], the receiver is
        guaranteed to have this parent as it has all history before
        these changesets. In the case firstparent is nullrev the
        changegroup starts with a full revision.

        If units is not None, progress detail will be generated, units specifies
        the type of revlog that is touched (changelog, manifest, etc.).
        """
        # if we don't have any revisions touched by these changesets, bail
        if len(nodelist) == 0:
            yield self.close()
            return

        # for generaldelta revlogs, we linearize the revs; this will both be
        # much quicker and generate a much smaller bundle
        if (revlog._generaldelta and self._reorder is None) or self._reorder:
            dag = dagutil.revlogdag(revlog)
            revs = set(revlog.rev(n) for n in nodelist)
            revs = dag.linearize(revs)
        else:
            revs = sorted([revlog.rev(n) for n in nodelist])

        # add the parent of the first rev
        p = revlog.parentrevs(revs[0])[0]
        revs.insert(0, p)

        # build deltas
        total = len(revs) - 1
        msgbundling = _('bundling')
        for r in xrange(len(revs) - 1):
            if units is not None:
                self._progress(msgbundling, r + 1, unit=units, total=total)
            prev, curr = revs[r], revs[r + 1]
            linknode = lookup(revlog.node(curr))
            for c in self.revchunk(revlog, curr, prev, linknode):
                yield c

        if units is not None:
            self._progress(msgbundling, None)
        yield self.close()

    # filter any nodes that claim to be part of the known set
    def prune(self, revlog, missing, commonrevs):
        rr, rl = revlog.rev, revlog.linkrev
        return [n for n in missing if rl(rr(n)) not in commonrevs]

    def _packmanifests(self, mfnodes, lookuplinknode):
        """Pack flat manifests into a changegroup stream."""
        ml = self._repo.manifest
        size = 0
        for chunk in self.group(
            mfnodes, ml, lookuplinknode, units=_('manifests')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (manifests)\n') % size)

    def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
        '''yield a sequence of changegroup chunks (strings)'''
        repo = self._repo
        cl = repo.changelog
        ml = repo.manifest

        clrevorder = {}
        mfs = {} # needed manifests
        fnodes = {} # needed file nodes
        changedfiles = set()

        # Callback for the changelog, used to collect changed files and manifest
        # nodes.
        # Returns the linkrev node (identity in the changelog case).
        def lookupcl(x):
            c = cl.read(x)
            clrevorder[x] = len(clrevorder)
            changedfiles.update(c[3])
            # record the first changeset introducing this manifest version
            mfs.setdefault(c[0], x)
            return x

        self._verbosenote(_('uncompressed size of bundle content:\n'))
        size = 0
        for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
            size += len(chunk)
            yield chunk
        self._verbosenote(_('%8.i (changelog)\n') % size)

        # We need to make sure that the linkrev in the changegroup refers to
        # the first changeset that introduced the manifest or file revision.
        # The fastpath is usually safer than the slowpath, because the filelogs
        # are walked in revlog order.
        #
        # When taking the slowpath with reorder=None and the manifest revlog
        # uses generaldelta, the manifest may be walked in the "wrong" order.
        # Without 'clrevorder', we would get an incorrect linkrev (see fix in
        # cc0ff93d0c0c).
        #
        # When taking the fastpath, we are only vulnerable to reordering
        # of the changelog itself. The changelog never uses generaldelta, so
        # it is only reordered when reorder=True. To handle this case, we
        # simply take the slowpath, which already has the 'clrevorder' logic.
        # This was also fixed in cc0ff93d0c0c.
        fastpathlinkrev = fastpathlinkrev and not self._reorder
        # Callback for the manifest, used to collect linkrevs for filelog
        # revisions.
        # Returns the linkrev node (collected in lookupcl).
        def lookupmflinknode(x):
            clnode = mfs[x]
            if not fastpathlinkrev:
                mdata = ml.readfast(x)
                for f, n in mdata.iteritems():
                    if f in changedfiles:
                        # record the first changeset introducing this filelog
                        # version
                        fclnodes = fnodes.setdefault(f, {})
                        fclnode = fclnodes.setdefault(n, clnode)
                        if clrevorder[clnode] < clrevorder[fclnode]:
                            fclnodes[n] = clnode
            return clnode

        mfnodes = self.prune(ml, mfs, commonrevs)
        for x in self._packmanifests(mfnodes, lookupmflinknode):
            yield x

        # manifests are no longer needed; free the memory before walking files
        mfs.clear()
        clrevs = set(cl.rev(x) for x in clnodes)

        def linknodes(filerevlog, fname):
            if fastpathlinkrev:
                llr = filerevlog.linkrev
                def genfilenodes():
                    for r in filerevlog:
                        linkrev = llr(r)
                        if linkrev in clrevs:
                            yield filerevlog.node(r), cl.node(linkrev)
                return dict(genfilenodes())
            return fnodes.get(fname, {})

        for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
                                        source):
            yield chunk

        yield self.close()

        if clnodes:
            repo.hook('outgoing', node=hex(clnodes[0]), source=source)

    # The 'source' parameter is useful for extensions
    def generatefiles(self, changedfiles, linknodes, commonrevs, source):
        """Yield the filelog portion of the changegroup stream."""
        repo = self._repo
        progress = self._progress
        msgbundling = _('bundling')

        total = len(changedfiles)
        # for progress output
        msgfiles = _('files')
        for i, fname in enumerate(sorted(changedfiles)):
            filerevlog = repo.file(fname)
            if not filerevlog:
                raise error.Abort(_("empty or missing revlog for %s") % fname)

            linkrevnodes = linknodes(filerevlog, fname)
            # Lookup for filenodes, we collected the linkrev nodes above in the
            # fastpath case and with lookupmf in the slowpath case.
            def lookupfilelog(x):
                return linkrevnodes[x]

            filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
            if filenodes:
                progress(msgbundling, i + 1, item=fname, unit=msgfiles,
                         total=total)
                h = self.fileheader(fname)
                size = len(h)
                yield h
                for chunk in self.group(filenodes, filerevlog, lookupfilelog):
                    size += len(chunk)
                    yield chunk
                self._verbosenote(_('%8.i %s\n') % (size, fname))
        progress(msgbundling, None)

    def deltaparent(self, revlog, rev, p1, p2, prev):
        # cg1 always deltas against the previous rev in the stream
        return prev

    def revchunk(self, revlog, rev, prev, linknode):
        """Yield the chunks (header, meta, delta) for one revision."""
        node = revlog.node(rev)
        p1, p2 = revlog.parentrevs(rev)
        base = self.deltaparent(revlog, rev, p1, p2, prev)

        prefix = ''
        if revlog.iscensored(base) or revlog.iscensored(rev):
            try:
                delta = revlog.revision(node)
            except error.CensoredNodeError as e:
                # ship the tombstone as a full replacement text
                delta = e.tombstone
            if base == nullrev:
                prefix = mdiff.trivialdiffheader(len(delta))
            else:
                baselen = revlog.rawsize(base)
                prefix = mdiff.replacediffheader(baselen, len(delta))
        elif base == nullrev:
            # no usable base: send the full text as a trivial diff
            delta = revlog.revision(node)
            prefix = mdiff.trivialdiffheader(len(delta))
        else:
            delta = revlog.revdiff(base, rev)
        p1n, p2n = revlog.parents(node)
        basenode = revlog.node(base)
        meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode)
        meta += prefix
        l = len(meta) + len(delta)
        yield chunkheader(l)
        yield meta
        yield delta
    def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
        # do nothing with basenode, it is implicitly the previous one in HG10
        return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
767 767
class cg2packer(cg1packer):
    """Packer producing version '02' (generaldelta-aware) changegroups."""
    version = '02'
    deltaheader = _CHANGEGROUPV2_DELTA_HEADER

    def __init__(self, repo, bundlecaps=None):
        super(cg2packer, self).__init__(repo, bundlecaps)
        if self._reorder is None:
            # Since generaldelta is directly supported by cg2, reordering
            # generally doesn't help, so we disable it by default (treating
            # bundle.reorder=auto just like bundle.reorder=False).
            self._reorder = False

    def deltaparent(self, revlog, rev, p1, p2, prev):
        dp = revlog.deltaparent(rev)
        # Reuse the stored delta only when the receiver is guaranteed to
        # have its base (a parent of rev, or the previously bundled rev);
        # a full snapshot (nullrev base) is re-deltaed against prev.
        if dp != nullrev and dp in (p1, p2, prev):
            return dp
        return prev

    def builddeltaheader(self, node, p1n, p2n, basenode, linknode):
        # unlike cg1, the delta base node is sent explicitly
        return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
790 790
# Maps changegroup wire-format version -> (packer class, unpacker class).
packermap = {'01': (cg1packer, cg1unpacker),
             # cg2 adds support for exchanging generaldelta
             '02': (cg2packer, cg2unpacker),
}
795 795
def _changegroupinfo(repo, nodes, source):
    """Report how many changesets are being bundled (verbose/bundle only)."""
    ui = repo.ui
    if not (ui.verbose or source == 'bundle'):
        return
    ui.status(_("%d changesets found\n") % len(nodes))
    if ui.debugflag:
        ui.debug("list of changesets:\n")
        for node in nodes:
            ui.debug("%s\n" % hex(node))
803 803
def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
    """Generate the raw changegroup chunks for ``outgoing``.

    Fires the 'preoutgoing' hook, reports the changeset count, and
    delegates chunk generation to ``bundler``. Returns a generator of
    changegroup chunks (strings).
    """
    repo = repo.unfiltered()
    commonrevs = outgoing.common
    csets = outgoing.missing
    heads = outgoing.missingheads
    # We go through the fast path if we get told to, or if all (unfiltered)
    # heads have been requested (since we then know all the linkrevs will
    # be pulled by the client).
    heads.sort()
    fastpathlinkrev = fastpath or (
            repo.filtername is None and heads == sorted(repo.heads()))

    repo.hook('preoutgoing', throw=True, source=source)
    _changegroupinfo(repo, csets, source)
    return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
819 819
def getsubset(repo, outgoing, bundler, source, fastpath=False):
    """Return an unpacker wrapping the raw changegroup for ``outgoing``."""
    raw = getsubsetraw(repo, outgoing, bundler, source, fastpath)
    unpackerclass = packermap[bundler.version][1]
    return unpackerclass(util.chunkbuffer(raw), None)
823 823
def changegroupsubset(repo, roots, heads, source, version='01'):
    """Compute a changegroup consisting of all the nodes that are
    descendants of any of the roots and ancestors of any of the heads.
    Return a chunkbuffer object whose read() method will return
    successive changegroup chunks.

    It is fairly complex as determining which filenodes and which
    manifest nodes need to be included for the changeset to be complete
    is non-trivial.

    Another wrinkle is doing the reverse, figuring out which changeset in
    the changegroup a particular filenode or manifestnode belongs to.
    """
    cl = repo.changelog
    if not roots:
        roots = [nullid]
    # discovery bases are the parents of the roots that are outside the set
    discbases = []
    for n in roots:
        discbases.extend([p for p in cl.parents(n) if p != nullid])
    # TODO: remove call to nodesbetween.
    csets, roots, heads = cl.nodesbetween(roots, heads)
    included = set(csets)
    discbases = [n for n in discbases if n not in included]
    outgoing = discovery.outgoing(cl, discbases, heads)
    bundler = packermap[version][0](repo)
    return getsubset(repo, outgoing, bundler, source)
850 850
def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
                           version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing. Returns a raw changegroup generator."""
    if not outgoing.missing:
        return None
    packerclass = packermap[version][0]
    return getsubsetraw(repo, outgoing, packerclass(repo, bundlecaps), source)
861 861
def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
                        version='01'):
    """Like getbundle, but taking a discovery.outgoing as an argument.

    This is only implemented for local repos and reuses potentially
    precomputed sets in outgoing."""
    if not outgoing.missing:
        return None
    packerclass = packermap[version][0]
    return getsubset(repo, outgoing, packerclass(repo, bundlecaps), source)
872 872
def computeoutgoing(repo, heads, common):
    """Computes which revs are outgoing given a set of common
    and a set of heads.

    This is a separate function so extensions can have access to
    the logic.

    Returns a discovery.outgoing object.
    """
    cl = repo.changelog
    if not common:
        common = [nullid]
    else:
        # drop common nodes the local repo does not actually have
        hasnode = cl.hasnode
        common = [n for n in common if hasnode(n)]
    if not heads:
        heads = cl.heads()
    return discovery.outgoing(cl, common, heads)
891 891
def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None,
                   version='01'):
    """Like changegroupsubset, but returns the set difference between the
    ancestors of heads and the ancestors common.

    If heads is None, use the local heads. If common is None, use [nullid].

    The nodes in common might not all be known locally due to the way the
    current discovery protocol works.
    """
    return getlocalchangegroup(repo, source,
                               computeoutgoing(repo, heads, common),
                               bundlecaps=bundlecaps, version=version)
905 905
def changegroup(repo, basenodes, source):
    """Return a changegroup from ``basenodes`` up to the current heads."""
    # to avoid a race we use changegroupsubset() (issue1320)
    return changegroupsubset(repo, basenodes, repo.heads(), source)
909 909
def _addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
    """Apply the filelog groups of a changegroup to the repository.

    ``source`` is a changegroup unpacker positioned at the file data;
    ``needfiles`` maps filename -> set of expected filenodes (filled by
    server-side validation); entries are checked off as revisions
    arrive and any leftover aborts.

    Returns a (revisions, files) tuple of addition counts.
    """
    revisions = 0
    files = 0
    while True:
        chunkdata = source.filelogheader()
        if not chunkdata:
            # empty header chunk ends the file portion of the stream
            break
        f = chunkdata["filename"]
        repo.ui.debug("adding %s revisions\n" % f)
        pr()
        fl = repo.file(f)
        o = len(fl)
        try:
            if not fl.addgroup(source, revmap, trp):
                raise error.Abort(_("received file revlog group is empty"))
        except error.CensoredBaseError as e:
            raise error.Abort(_("received delta base is censored: %s") % e)
        revisions += len(fl) - o
        files += 1
        if f in needfiles:
            needs = needfiles[f]
            for new in xrange(o, len(fl)):
                n = fl.node(new)
                if n in needs:
                    needs.remove(n)
                else:
                    raise error.Abort(
                        _("received spurious file revlog entry"))
            if not needs:
                del needfiles[f]
    repo.ui.progress(_('files'), None)

    # every filenode promised by the validated manifests must now exist
    for f, needs in needfiles.iteritems():
        fl = repo.file(f)
        for n in needs:
            try:
                fl.rev(n)
            except error.LookupError:
                raise error.Abort(
                    _('missing file data for %s:%s - run hg verify') %
                    (f, hex(n)))

    return revisions, files
General Comments 0
You need to be logged in to leave comments. Login now