changegroup: progress for added files is not measured in "chunks"...
Martin von Zweigbergk
r28361:277a22cd default
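In brief: _addchangegroupfiles() previously received a pre-built `pr` callback (an instance of the `prog` helper class, whose unit is hard-coded to 'chunks') and fired it once per file, so the progress output counted files while labeling them chunks. After this change it receives the expected number of files and calls repo.ui.progress() itself with unit 'files'. A minimal standalone sketch of the two styles, where progress() is a hypothetical print-based stand-in for repo.ui.progress() and filenames is sample data:

def progress(topic, pos, unit='', total=None):
    # hypothetical stand-in for repo.ui.progress(), for illustration only
    print('%s: %d/%d %s' % (topic, pos, total, unit))

filenames = ['a.txt', 'b.txt', 'c.txt']

# before r28361: a stateful callback whose unit is hard-coded to 'chunks',
# invoked once per *file* by _addchangegroupfiles()
class prog(object):
    def __init__(self, step, total):
        self._step = step
        self._total = total
        self._count = 1
    def __call__(self):
        progress(self._step, self._count, unit='chunks', total=self._total)
        self._count += 1

pr = prog('files', len(filenames))
for f in filenames:
    pr()                     # prints "files: n/3 chunks" -- misleading unit

# after r28361: the file loop counts files itself and reports them with the
# matching unit, as in the new _addchangegroupfiles() below
files = 0
for f in filenames:
    files += 1
    progress('files', files, unit='files', total=len(filenames))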
@@ -1,1113 +1,1113 @@
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullid,
19 19 nullrev,
20 20 short,
21 21 )
22 22
23 23 from . import (
24 24 branchmap,
25 25 dagutil,
26 26 discovery,
27 27 error,
28 28 mdiff,
29 29 phases,
30 30 util,
31 31 )
32 32
33 33 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
34 34 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
35 35 _CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
36 36
37 37 def readexactly(stream, n):
38 38 '''read n bytes from stream.read and abort if less was available'''
39 39 s = stream.read(n)
40 40 if len(s) < n:
41 41 raise error.Abort(_("stream ended unexpectedly"
42 42 " (got %d bytes, expected %d)")
43 43 % (len(s), n))
44 44 return s
45 45
46 46 def getchunk(stream):
47 47 """return the next chunk from stream as a string"""
48 48 d = readexactly(stream, 4)
49 49 l = struct.unpack(">l", d)[0]
50 50 if l <= 4:
51 51 if l:
52 52 raise error.Abort(_("invalid chunk length %d") % l)
53 53 return ""
54 54 return readexactly(stream, l - 4)
55 55
56 56 def chunkheader(length):
57 57 """return a changegroup chunk header (string)"""
58 58 return struct.pack(">l", length + 4)
59 59
60 60 def closechunk():
61 61 """return a changegroup chunk header (string) for a zero-length chunk"""
62 62 return struct.pack(">l", 0)
63 63
64 64 def combineresults(results):
65 65 """logic to combine 0 or more addchangegroup results into one"""
66 66 changedheads = 0
67 67 result = 1
68 68 for ret in results:
69 69 # If any changegroup result is 0, return 0
70 70 if ret == 0:
71 71 result = 0
72 72 break
73 73 if ret < -1:
74 74 changedheads += ret + 1
75 75 elif ret > 1:
76 76 changedheads += ret - 1
77 77 if changedheads > 0:
78 78 result = 1 + changedheads
79 79 elif changedheads < 0:
80 80 result = -1 + changedheads
81 81 return result
82 82
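# Worked examples of the convention implemented above (and documented in
# apply() below: 1 means the head count is unchanged, 1+n means n heads
# were added, -1-n means n heads were removed, 0 means nothing changed):
#
#   combineresults([1, 3])  -> 3    (two heads added overall)
#   combineresults([1, -2]) -> -2   (one head removed overall)
#   combineresults([0, 3])  -> 0    (any zero result wins)
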
83 83 bundletypes = {
84 84 "": ("", None), # only when using unbundle on ssh and old http servers
85 85 # since the unification ssh accepts a header but there
86 86 # is no capability signaling it.
87 87 "HG20": (), # special-cased below
88 88 "HG10UN": ("HG10UN", None),
89 89 "HG10BZ": ("HG10", 'BZ'),
90 90 "HG10GZ": ("HG10GZ", 'GZ'),
91 91 }
92 92
93 93 # hgweb uses this list to communicate its preferred type
94 94 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
95 95
96 96 def writechunks(ui, chunks, filename, vfs=None):
97 97 """Write chunks to a file and return its filename.
98 98
99 99 The stream is assumed to be a bundle file.
100 100 Existing files will not be overwritten.
101 101 If no filename is specified, a temporary file is created.
102 102 """
103 103 fh = None
104 104 cleanup = None
105 105 try:
106 106 if filename:
107 107 if vfs:
108 108 fh = vfs.open(filename, "wb")
109 109 else:
110 110 fh = open(filename, "wb")
111 111 else:
112 112 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
113 113 fh = os.fdopen(fd, "wb")
114 114 cleanup = filename
115 115 for c in chunks:
116 116 fh.write(c)
117 117 cleanup = None
118 118 return filename
119 119 finally:
120 120 if fh is not None:
121 121 fh.close()
122 122 if cleanup is not None:
123 123 if filename and vfs:
124 124 vfs.unlink(cleanup)
125 125 else:
126 126 os.unlink(cleanup)
127 127
128 128 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
129 129 """Write a bundle file and return its filename.
130 130
131 131 Existing files will not be overwritten.
132 132 If no filename is specified, a temporary file is created.
133 133 bz2 compression can be turned off.
134 134 The bundle file will be deleted in case of errors.
135 135 """
136 136
137 137 if bundletype == "HG20":
138 138 from . import bundle2
139 139 bundle = bundle2.bundle20(ui)
140 140 bundle.setcompression(compression)
141 141 part = bundle.newpart('changegroup', data=cg.getchunks())
142 142 part.addparam('version', cg.version)
143 143 chunkiter = bundle.getchunks()
144 144 else:
145 145 # compression argument is only for the bundle2 case
146 146 assert compression is None
147 147 if cg.version != '01':
148 148 raise error.Abort(_('old bundle types only support v1 '
149 149 'changegroups'))
150 150 header, comp = bundletypes[bundletype]
151 151 if comp not in util.compressors:
152 152 raise error.Abort(_('unknown stream compression type: %s')
153 153 % comp)
154 154 z = util.compressors[comp]()
155 155 subchunkiter = cg.getchunks()
156 156 def chunkiter():
157 157 yield header
158 158 for chunk in subchunkiter:
159 159 yield z.compress(chunk)
160 160 yield z.flush()
161 161 chunkiter = chunkiter()
162 162
163 163 # parse the changegroup data, otherwise we will block
164 164 # in case of sshrepo because we don't know the end of the stream
165 165
166 166 # an empty chunkgroup is the end of the changegroup
167 167 # a changegroup has at least 2 chunkgroups (changelog and manifest).
168 168 # after that, an empty chunkgroup is the end of the changegroup
169 169 return writechunks(ui, chunkiter, filename, vfs=vfs)
170 170
171 171 class cg1unpacker(object):
172 172 """Unpacker for cg1 changegroup streams.
173 173
174 174 A changegroup unpacker handles the framing of the revision data in
175 175 the wire format. Most consumers will want to use the apply()
176 176 method to add the changes from the changegroup to a repository.
177 177
178 178 If you're forwarding a changegroup unmodified to another consumer,
179 179 use getchunks(), which returns an iterator of changegroup
180 180 chunks. This is mostly useful for cases where you need to know the
181 181 data stream has ended by observing the end of the changegroup.
182 182
183 183 deltachunk() is useful only if you're applying delta data. Most
184 184 consumers should prefer apply() instead.
185 185
186 186 A few other public methods exist. Those are used only for
187 187 bundlerepo and some debug commands - their use is discouraged.
188 188 """
189 189 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
190 190 deltaheadersize = struct.calcsize(deltaheader)
191 191 version = '01'
192 192 _grouplistcount = 1 # One list of files after the manifests
193 193
194 194 def __init__(self, fh, alg):
195 195 if alg == 'UN':
196 196 alg = None # get more modern without breaking too much
197 197 if alg not in util.decompressors:
198 198 raise error.Abort(_('unknown stream compression type: %s')
199 199 % alg)
200 200 if alg == 'BZ':
201 201 alg = '_truncatedBZ'
202 202 self._stream = util.decompressors[alg](fh)
203 203 self._type = alg
204 204 self.callback = None
205 205
206 206 # These methods (compressed, read, seek, tell) all appear to only
207 207 # be used by bundlerepo, but it's a little hard to tell.
208 208 def compressed(self):
209 209 return self._type is not None
210 210 def read(self, l):
211 211 return self._stream.read(l)
212 212 def seek(self, pos):
213 213 return self._stream.seek(pos)
214 214 def tell(self):
215 215 return self._stream.tell()
216 216 def close(self):
217 217 return self._stream.close()
218 218
219 219 def _chunklength(self):
220 220 d = readexactly(self._stream, 4)
221 221 l = struct.unpack(">l", d)[0]
222 222 if l <= 4:
223 223 if l:
224 224 raise error.Abort(_("invalid chunk length %d") % l)
225 225 return 0
226 226 if self.callback:
227 227 self.callback()
228 228 return l - 4
229 229
230 230 def changelogheader(self):
231 231 """v10 does not have a changelog header chunk"""
232 232 return {}
233 233
234 234 def manifestheader(self):
235 235 """v10 does not have a manifest header chunk"""
236 236 return {}
237 237
238 238 def filelogheader(self):
239 239 """return the header of the filelogs chunk, v10 only has the filename"""
240 240 l = self._chunklength()
241 241 if not l:
242 242 return {}
243 243 fname = readexactly(self._stream, l)
244 244 return {'filename': fname}
245 245
246 246 def _deltaheader(self, headertuple, prevnode):
247 247 node, p1, p2, cs = headertuple
248 248 if prevnode is None:
249 249 deltabase = p1
250 250 else:
251 251 deltabase = prevnode
252 252 flags = 0
253 253 return node, p1, p2, deltabase, cs, flags
254 254
255 255 def deltachunk(self, prevnode):
256 256 l = self._chunklength()
257 257 if not l:
258 258 return {}
259 259 headerdata = readexactly(self._stream, self.deltaheadersize)
260 260 header = struct.unpack(self.deltaheader, headerdata)
261 261 delta = readexactly(self._stream, l - self.deltaheadersize)
262 262 node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
263 263 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
264 264 'deltabase': deltabase, 'delta': delta, 'flags': flags}
265 265
266 266 def getchunks(self):
267 267 """returns all the chunks contains in the bundle
268 268
269 269 Used when you need to forward the binary stream to a file or another
270 270 network API. To do so, it parse the changegroup data, otherwise it will
271 271 block in case of sshrepo because it don't know the end of the stream.
272 272 """
273 273 # an empty chunkgroup is the end of the changegroup
274 274 # a changegroup has at least 2 chunkgroups (changelog and manifest).
275 275 # after that, changegroup versions 1 and 2 have a series of groups
276 276 # with one group per file. changegroup 3 has a series of directory
277 277 # manifests before the files.
278 278 count = 0
279 279 emptycount = 0
280 280 while emptycount < self._grouplistcount:
281 281 empty = True
282 282 count += 1
283 283 while True:
284 284 chunk = getchunk(self)
285 285 if not chunk:
286 286 if empty and count > 2:
287 287 emptycount += 1
288 288 break
289 289 empty = False
290 290 yield chunkheader(len(chunk))
291 291 pos = 0
292 292 while pos < len(chunk):
293 293 next = pos + 2**20
294 294 yield chunk[pos:next]
295 295 pos = next
296 296 yield closechunk()
297 297
298 298 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
299 299 # We know that we'll never have more manifests than we had
300 300 # changesets.
301 301 self.callback = prog(_('manifests'), numchanges)
302 302 # no need to check for empty manifest group here:
303 303 # if the result of the merge of 1 and 2 is the same in 3 and 4,
304 304 # no new manifest will be created and the manifest group will
305 305 # be empty during the pull
306 306 self.manifestheader()
307 307 repo.manifest.addgroup(self, revmap, trp)
308 308 repo.ui.progress(_('manifests'), None)
309 309 self.callback = None
310 310
311 311 def apply(self, repo, srctype, url, emptyok=False,
312 312 targetphase=phases.draft, expectedtotal=None):
313 313 """Add the changegroup returned by source.read() to this repo.
314 314 srctype is a string like 'push', 'pull', or 'unbundle'. url is
315 315 the URL of the repo where this changegroup is coming from.
316 316
317 317 Return an integer summarizing the change to this repo:
318 318 - nothing changed or no source: 0
319 319 - more heads than before: 1+added heads (2..n)
320 320 - fewer heads than before: -1-removed heads (-2..-n)
321 321 - number of heads stays the same: 1
322 322 """
323 323 repo = repo.unfiltered()
324 324 def csmap(x):
325 325 repo.ui.debug("add changeset %s\n" % short(x))
326 326 return len(cl)
327 327
328 328 def revmap(x):
329 329 return cl.rev(x)
330 330
331 331 changesets = files = revisions = 0
332 332
333 333 try:
334 334 with repo.transaction("\n".join([srctype,
335 335 util.hidepassword(url)])) as tr:
336 336 # The transaction could have been created before and already
337 337 # carries source information. In this case we use the top
338 338 # level data. We overwrite the argument because we need to use
340 340 the top level values (if they exist) in this function.
340 340 srctype = tr.hookargs.setdefault('source', srctype)
341 341 url = tr.hookargs.setdefault('url', url)
342 342 repo.hook('prechangegroup', throw=True, **tr.hookargs)
343 343
344 344 # write changelog data to temp files so concurrent readers
345 345 # will not see an inconsistent view
346 346 cl = repo.changelog
347 347 cl.delayupdate(tr)
348 348 oldheads = cl.heads()
349 349
350 350 trp = weakref.proxy(tr)
351 351 # pull off the changeset group
352 352 repo.ui.status(_("adding changesets\n"))
353 353 clstart = len(cl)
354 354 class prog(object):
355 355 def __init__(self, step, total):
356 356 self._step = step
357 357 self._total = total
358 358 self._count = 1
359 359 def __call__(self):
360 360 repo.ui.progress(self._step, self._count,
361 361 unit=_('chunks'), total=self._total)
362 362 self._count += 1
363 363 self.callback = prog(_('changesets'), expectedtotal)
364 364
365 365 efiles = set()
366 366 def onchangelog(cl, node):
367 367 efiles.update(cl.readfiles(node))
368 368
369 369 self.changelogheader()
370 370 srccontent = cl.addgroup(self, csmap, trp,
371 371 addrevisioncb=onchangelog)
372 372 efiles = len(efiles)
373 373
374 374 if not (srccontent or emptyok):
375 375 raise error.Abort(_("received changelog group is empty"))
376 376 clend = len(cl)
377 377 changesets = clend - clstart
378 378 repo.ui.progress(_('changesets'), None)
379 379
380 380 # pull off the manifest group
381 381 repo.ui.status(_("adding manifests\n"))
382 382 self._unpackmanifests(repo, revmap, trp, prog, changesets)
383 383
384 384 needfiles = {}
385 385 if repo.ui.configbool('server', 'validate', default=False):
386 386 # validate incoming csets have their manifests
387 387 for cset in xrange(clstart, clend):
388 388 mfnode = repo.changelog.read(
389 389 repo.changelog.node(cset))[0]
390 390 mfest = repo.manifest.readdelta(mfnode)
391 391 # store file nodes we must see
392 392 for f, n in mfest.iteritems():
393 393 needfiles.setdefault(f, set()).add(n)
394 394
395 395 # process the files
396 396 repo.ui.status(_("adding file changes\n"))
397 pr = prog(_('files'), efiles)
398 397 newrevs, newfiles = _addchangegroupfiles(
399 repo, self, revmap, trp, pr, needfiles)
398 repo, self, revmap, trp, efiles, needfiles)
400 399 revisions += newrevs
401 400 files += newfiles
402 401
403 402 dh = 0
404 403 if oldheads:
405 404 heads = cl.heads()
406 405 dh = len(heads) - len(oldheads)
407 406 for h in heads:
408 407 if h not in oldheads and repo[h].closesbranch():
409 408 dh -= 1
410 409 htext = ""
411 410 if dh:
412 411 htext = _(" (%+d heads)") % dh
413 412
414 413 repo.ui.status(_("added %d changesets"
415 414 " with %d changes to %d files%s\n")
416 415 % (changesets, revisions, files, htext))
417 416 repo.invalidatevolatilesets()
418 417
419 418 if changesets > 0:
420 419 if 'node' not in tr.hookargs:
421 420 tr.hookargs['node'] = hex(cl.node(clstart))
422 421 tr.hookargs['node_last'] = hex(cl.node(clend - 1))
423 422 hookargs = dict(tr.hookargs)
424 423 else:
425 424 hookargs = dict(tr.hookargs)
426 425 hookargs['node'] = hex(cl.node(clstart))
427 426 hookargs['node_last'] = hex(cl.node(clend - 1))
428 427 repo.hook('pretxnchangegroup', throw=True, **hookargs)
429 428
430 429 added = [cl.node(r) for r in xrange(clstart, clend)]
431 430 publishing = repo.publishing()
432 431 if srctype in ('push', 'serve'):
433 432 # Old servers can not push the boundary themselves.
434 433 # New servers won't push the boundary if changeset already
435 434 # exists locally as secret
436 435 #
437 436 # We should not use added here but the list of all changes in
438 437 # the bundle
439 438 if publishing:
440 439 phases.advanceboundary(repo, tr, phases.public,
441 440 srccontent)
442 441 else:
443 442 # Those changesets have been pushed from the
444 443 # outside, their phases are going to be pushed
445 444 # alongside. Therefore `targetphase` is
446 445 # ignored.
447 446 phases.advanceboundary(repo, tr, phases.draft,
448 447 srccontent)
449 448 phases.retractboundary(repo, tr, phases.draft, added)
450 449 elif srctype != 'strip':
451 450 # publishing only alters behavior during push
452 451 #
453 452 # strip should not touch boundary at all
454 453 phases.retractboundary(repo, tr, targetphase, added)
455 454
456 455 if changesets > 0:
457 456 if srctype != 'strip':
458 457 # During strip, branchcache is invalid but
459 458 # the coming call to `destroyed` will repair it.
460 459 # In other cases we can safely update the cache on
461 460 # disk.
462 461 branchmap.updatecache(repo.filtered('served'))
463 462
464 463 def runhooks():
465 464 # These hooks run when the lock releases, not when the
466 465 # transaction closes. So it's possible for the changelog
467 466 # to have changed since we last saw it.
468 467 if clstart >= len(repo):
469 468 return
470 469
471 470 # forcefully update the on-disk branch cache
472 471 repo.ui.debug("updating the branch cache\n")
473 472 repo.hook("changegroup", **hookargs)
474 473
475 474 for n in added:
476 475 args = hookargs.copy()
477 476 args['node'] = hex(n)
478 477 del args['node_last']
479 478 repo.hook("incoming", **args)
480 479
481 480 newheads = [h for h in repo.heads()
482 481 if h not in oldheads]
483 482 repo.ui.log("incoming",
484 483 "%s incoming changes - new heads: %s\n",
485 484 len(added),
486 485 ', '.join([hex(c[:6]) for c in newheads]))
487 486
488 487 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
489 488 lambda tr: repo._afterlock(runhooks))
490 489 finally:
491 490 repo.ui.flush()
492 491 # never return 0 here:
493 492 if dh < 0:
494 493 return dh - 1
495 494 else:
496 495 return dh + 1
497 496
498 497 class cg2unpacker(cg1unpacker):
499 498 """Unpacker for cg2 streams.
500 499
501 500 cg2 streams add support for generaldelta, so the delta header
502 501 format is slightly different. All other features about the data
503 502 remain the same.
504 503 """
505 504 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
506 505 deltaheadersize = struct.calcsize(deltaheader)
507 506 version = '02'
508 507
509 508 def _deltaheader(self, headertuple, prevnode):
510 509 node, p1, p2, deltabase, cs = headertuple
511 510 flags = 0
512 511 return node, p1, p2, deltabase, cs, flags
513 512
514 513 class cg3unpacker(cg2unpacker):
515 514 """Unpacker for cg3 streams.
516 515
517 516 cg3 streams add support for exchanging treemanifests and revlog
518 517 flags. It adds the revlog flags to the delta header and an empty chunk
519 518 separating manifests and files.
520 519 """
521 520 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
522 521 deltaheadersize = struct.calcsize(deltaheader)
523 522 version = '03'
524 523 _grouplistcount = 2 # One list of manifests and one list of files
525 524
526 525 def _deltaheader(self, headertuple, prevnode):
527 526 node, p1, p2, deltabase, cs, flags = headertuple
528 527 return node, p1, p2, deltabase, cs, flags
529 528
530 529 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
531 530 super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
532 531 numchanges)
533 532 while True:
534 533 chunkdata = self.filelogheader()
535 534 if not chunkdata:
536 535 break
537 536 # If we get here, there are directory manifests in the changegroup
538 537 d = chunkdata["filename"]
539 538 repo.ui.debug("adding %s revisions\n" % d)
540 539 dirlog = repo.manifest.dirlog(d)
541 540 if not dirlog.addgroup(self, revmap, trp):
542 541 raise error.Abort(_("received dir revlog group is empty"))
543 542
544 543 class headerlessfixup(object):
545 544 def __init__(self, fh, h):
546 545 self._h = h
547 546 self._fh = fh
548 547 def read(self, n):
549 548 if self._h:
550 549 d, self._h = self._h[:n], self._h[n:]
551 550 if len(d) < n:
552 551 d += readexactly(self._fh, n - len(d))
553 552 return d
554 553 return readexactly(self._fh, n)
555 554
556 555 class cg1packer(object):
557 556 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
558 557 version = '01'
559 558 def __init__(self, repo, bundlecaps=None):
560 559 """Given a source repo, construct a bundler.
561 560
562 561 bundlecaps is optional and can be used to specify the set of
563 562 capabilities which can be used to build the bundle.
564 563 """
565 564 # Set of capabilities we can use to build the bundle.
566 565 if bundlecaps is None:
567 566 bundlecaps = set()
568 567 self._bundlecaps = bundlecaps
569 568 # experimental config: bundle.reorder
570 569 reorder = repo.ui.config('bundle', 'reorder', 'auto')
571 570 if reorder == 'auto':
572 571 reorder = None
573 572 else:
574 573 reorder = util.parsebool(reorder)
575 574 self._repo = repo
576 575 self._reorder = reorder
577 576 self._progress = repo.ui.progress
578 577 if self._repo.ui.verbose and not self._repo.ui.debugflag:
579 578 self._verbosenote = self._repo.ui.note
580 579 else:
581 580 self._verbosenote = lambda s: None
582 581
583 582 def close(self):
584 583 return closechunk()
585 584
586 585 def fileheader(self, fname):
587 586 return chunkheader(len(fname)) + fname
588 587
589 588 def group(self, nodelist, revlog, lookup, units=None):
590 589 """Calculate a delta group, yielding a sequence of changegroup chunks
591 590 (strings).
592 591
593 592 Given a list of changeset revs, return a set of deltas and
594 593 metadata corresponding to nodes. The first delta is
595 594 first parent(nodelist[0]) -> nodelist[0]; the receiver is
596 595 guaranteed to have this parent as it has all history before
597 596 these changesets. If firstparent is nullrev, the
598 597 changegroup starts with a full revision.
599 598
600 599 If units is not None, progress detail will be generated; units specifies
601 600 the type of revlog that is touched (changelog, manifest, etc.).
602 601 """
603 602 # if we don't have any revisions touched by these changesets, bail
604 603 if len(nodelist) == 0:
605 604 yield self.close()
606 605 return
607 606
608 607 # for generaldelta revlogs, we linearize the revs; this will both be
609 608 # much quicker and generate a much smaller bundle
610 609 if (revlog._generaldelta and self._reorder is None) or self._reorder:
611 610 dag = dagutil.revlogdag(revlog)
612 611 revs = set(revlog.rev(n) for n in nodelist)
613 612 revs = dag.linearize(revs)
614 613 else:
615 614 revs = sorted([revlog.rev(n) for n in nodelist])
616 615
617 616 # add the parent of the first rev
618 617 p = revlog.parentrevs(revs[0])[0]
619 618 revs.insert(0, p)
620 619
621 620 # build deltas
622 621 total = len(revs) - 1
623 622 msgbundling = _('bundling')
624 623 for r in xrange(len(revs) - 1):
625 624 if units is not None:
626 625 self._progress(msgbundling, r + 1, unit=units, total=total)
627 626 prev, curr = revs[r], revs[r + 1]
628 627 linknode = lookup(revlog.node(curr))
629 628 for c in self.revchunk(revlog, curr, prev, linknode):
630 629 yield c
631 630
632 631 if units is not None:
633 632 self._progress(msgbundling, None)
634 633 yield self.close()
635 634
636 635 # filter any nodes that claim to be part of the known set
637 636 def prune(self, revlog, missing, commonrevs):
638 637 rr, rl = revlog.rev, revlog.linkrev
639 638 return [n for n in missing if rl(rr(n)) not in commonrevs]
640 639
641 640 def _packmanifests(self, dir, mfnodes, lookuplinknode):
642 641 """Pack flat manifests into a changegroup stream."""
643 642 assert not dir
644 643 for chunk in self.group(mfnodes, self._repo.manifest,
645 644 lookuplinknode, units=_('manifests')):
646 645 yield chunk
647 646
648 647 def _manifestsdone(self):
649 648 return ''
650 649
651 650 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
652 651 '''yield a sequence of changegroup chunks (strings)'''
653 652 repo = self._repo
654 653 cl = repo.changelog
655 654
656 655 clrevorder = {}
657 656 mfs = {} # needed manifests
658 657 fnodes = {} # needed file nodes
659 658 changedfiles = set()
660 659
661 660 # Callback for the changelog, used to collect changed files and manifest
662 661 # nodes.
663 662 # Returns the linkrev node (identity in the changelog case).
664 663 def lookupcl(x):
665 664 c = cl.read(x)
666 665 clrevorder[x] = len(clrevorder)
667 666 n = c[0]
668 667 # record the first changeset introducing this manifest version
669 668 mfs.setdefault(n, x)
670 669 # Record a complete list of potentially-changed files in
671 670 # this manifest.
672 671 changedfiles.update(c[3])
673 672 return x
674 673
675 674 self._verbosenote(_('uncompressed size of bundle content:\n'))
676 675 size = 0
677 676 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
678 677 size += len(chunk)
679 678 yield chunk
680 679 self._verbosenote(_('%8.i (changelog)\n') % size)
681 680
682 681 # We need to make sure that the linkrev in the changegroup refers to
683 682 # the first changeset that introduced the manifest or file revision.
684 683 # The fastpath is usually safer than the slowpath, because the filelogs
685 684 # are walked in revlog order.
686 685 #
687 686 # When taking the slowpath with reorder=None and the manifest revlog
688 687 # uses generaldelta, the manifest may be walked in the "wrong" order.
689 688 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
690 689 # cc0ff93d0c0c).
691 690 #
692 691 # When taking the fastpath, we are only vulnerable to reordering
693 692 # of the changelog itself. The changelog never uses generaldelta, so
694 693 # it is only reordered when reorder=True. To handle this case, we
695 694 # simply take the slowpath, which already has the 'clrevorder' logic.
696 695 # This was also fixed in cc0ff93d0c0c.
697 696 fastpathlinkrev = fastpathlinkrev and not self._reorder
698 697 # Treemanifests don't work correctly with fastpathlinkrev
699 698 # either, because we don't discover which directory nodes to
700 699 # send along with files. This could probably be fixed.
701 700 fastpathlinkrev = fastpathlinkrev and (
702 701 'treemanifest' not in repo.requirements)
703 702
704 703 for chunk in self.generatemanifests(commonrevs, clrevorder,
705 704 fastpathlinkrev, mfs, fnodes):
706 705 yield chunk
707 706 mfs.clear()
708 707 clrevs = set(cl.rev(x) for x in clnodes)
709 708
710 709 if not fastpathlinkrev:
711 710 def linknodes(unused, fname):
712 711 return fnodes.get(fname, {})
713 712 else:
714 713 cln = cl.node
715 714 def linknodes(filerevlog, fname):
716 715 llr = filerevlog.linkrev
717 716 fln = filerevlog.node
718 717 revs = ((r, llr(r)) for r in filerevlog)
719 718 return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)
720 719
721 720 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
722 721 source):
723 722 yield chunk
724 723
725 724 yield self.close()
726 725
727 726 if clnodes:
728 727 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
729 728
730 729 def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
731 730 fnodes):
732 731 repo = self._repo
733 732 dirlog = repo.manifest.dirlog
734 733 tmfnodes = {'': mfs}
735 734
736 735 # Callback for the manifest, used to collect linkrevs for filelog
737 736 # revisions.
738 737 # Returns the linkrev node (collected in lookupcl).
739 738 def makelookupmflinknode(dir):
740 739 if fastpathlinkrev:
741 740 assert not dir
742 741 return mfs.__getitem__
743 742
744 743 def lookupmflinknode(x):
745 744 """Callback for looking up the linknode for manifests.
746 745
747 746 Returns the linkrev node for the specified manifest.
748 747
749 748 SIDE EFFECT:
750 749
751 750 1) fclnodes gets populated with the list of relevant
752 751 file nodes if we're not using fastpathlinkrev
753 752 2) When treemanifests are in use, collects treemanifest nodes
754 753 to send
755 754
756 755 Note that this means manifests must be completely sent to
757 756 the client before you can trust the list of files and
758 757 treemanifests to send.
759 758 """
760 759 clnode = tmfnodes[dir][x]
761 760 mdata = dirlog(dir).readshallowfast(x)
762 761 for p, n, fl in mdata.iterentries():
763 762 if fl == 't': # subdirectory manifest
764 763 subdir = dir + p + '/'
765 764 tmfclnodes = tmfnodes.setdefault(subdir, {})
766 765 tmfclnode = tmfclnodes.setdefault(n, clnode)
767 766 if clrevorder[clnode] < clrevorder[tmfclnode]:
768 767 tmfclnodes[n] = clnode
769 768 else:
770 769 f = dir + p
771 770 fclnodes = fnodes.setdefault(f, {})
772 771 fclnode = fclnodes.setdefault(n, clnode)
773 772 if clrevorder[clnode] < clrevorder[fclnode]:
774 773 fclnodes[n] = clnode
775 774 return clnode
776 775 return lookupmflinknode
777 776
778 777 size = 0
779 778 while tmfnodes:
780 779 dir = min(tmfnodes)
781 780 nodes = tmfnodes[dir]
782 781 prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
783 782 for x in self._packmanifests(dir, prunednodes,
784 783 makelookupmflinknode(dir)):
785 784 size += len(x)
786 785 yield x
787 786 del tmfnodes[dir]
788 787 self._verbosenote(_('%8.i (manifests)\n') % size)
789 788 yield self._manifestsdone()
790 789
791 790 # The 'source' parameter is useful for extensions
792 791 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
793 792 repo = self._repo
794 793 progress = self._progress
795 794 msgbundling = _('bundling')
796 795
797 796 total = len(changedfiles)
798 797 # for progress output
799 798 msgfiles = _('files')
800 799 for i, fname in enumerate(sorted(changedfiles)):
801 800 filerevlog = repo.file(fname)
802 801 if not filerevlog:
803 802 raise error.Abort(_("empty or missing revlog for %s") % fname)
804 803
805 804 linkrevnodes = linknodes(filerevlog, fname)
806 805 # Lookup for filenodes, we collected the linkrev nodes above in the
807 806 # fastpath case and with lookupmf in the slowpath case.
808 807 def lookupfilelog(x):
809 808 return linkrevnodes[x]
810 809
811 810 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
812 811 if filenodes:
813 812 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
814 813 total=total)
815 814 h = self.fileheader(fname)
816 815 size = len(h)
817 816 yield h
818 817 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
819 818 size += len(chunk)
820 819 yield chunk
821 820 self._verbosenote(_('%8.i %s\n') % (size, fname))
822 821 progress(msgbundling, None)
823 822
824 823 def deltaparent(self, revlog, rev, p1, p2, prev):
825 824 return prev
826 825
827 826 def revchunk(self, revlog, rev, prev, linknode):
828 827 node = revlog.node(rev)
829 828 p1, p2 = revlog.parentrevs(rev)
830 829 base = self.deltaparent(revlog, rev, p1, p2, prev)
831 830
832 831 prefix = ''
833 832 if revlog.iscensored(base) or revlog.iscensored(rev):
834 833 try:
835 834 delta = revlog.revision(node)
836 835 except error.CensoredNodeError as e:
837 836 delta = e.tombstone
838 837 if base == nullrev:
839 838 prefix = mdiff.trivialdiffheader(len(delta))
840 839 else:
841 840 baselen = revlog.rawsize(base)
842 841 prefix = mdiff.replacediffheader(baselen, len(delta))
843 842 elif base == nullrev:
844 843 delta = revlog.revision(node)
845 844 prefix = mdiff.trivialdiffheader(len(delta))
846 845 else:
847 846 delta = revlog.revdiff(base, rev)
848 847 p1n, p2n = revlog.parents(node)
849 848 basenode = revlog.node(base)
850 849 flags = revlog.flags(rev)
851 850 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
852 851 meta += prefix
853 852 l = len(meta) + len(delta)
854 853 yield chunkheader(l)
855 854 yield meta
856 855 yield delta
857 856 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
858 857 # do nothing with basenode, it is implicitly the previous one in HG10
859 858 # do nothing with flags, it is implicitly 0 for cg1 and cg2
860 859 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
861 860
862 861 class cg2packer(cg1packer):
863 862 version = '02'
864 863 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
865 864
866 865 def __init__(self, repo, bundlecaps=None):
867 866 super(cg2packer, self).__init__(repo, bundlecaps)
868 867 if self._reorder is None:
869 868 # Since generaldelta is directly supported by cg2, reordering
870 869 # generally doesn't help, so we disable it by default (treating
871 870 # bundle.reorder=auto just like bundle.reorder=False).
872 871 self._reorder = False
873 872
874 873 def deltaparent(self, revlog, rev, p1, p2, prev):
875 874 dp = revlog.deltaparent(rev)
876 875 # avoid storing full revisions; pick prev in those cases
877 876 # also pick prev when we can't be sure remote has dp
878 877 if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
879 878 return prev
880 879 return dp
881 880
882 881 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
883 882 # Do nothing with flags, it is implicitly 0 in cg1 and cg2
884 883 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
885 884
886 885 class cg3packer(cg2packer):
887 886 version = '03'
888 887 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
889 888
890 889 def _packmanifests(self, dir, mfnodes, lookuplinknode):
891 890 if dir:
892 891 yield self.fileheader(dir)
893 892 for chunk in self.group(mfnodes, self._repo.manifest.dirlog(dir),
894 893 lookuplinknode, units=_('manifests')):
895 894 yield chunk
896 895
897 896 def _manifestsdone(self):
898 897 return self.close()
899 898
900 899 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
901 900 return struct.pack(
902 901 self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
903 902
904 903 _packermap = {'01': (cg1packer, cg1unpacker),
905 904 # cg2 adds support for exchanging generaldelta
906 905 '02': (cg2packer, cg2unpacker),
907 906 # cg3 adds support for exchanging revlog flags and treemanifests
908 907 '03': (cg3packer, cg3unpacker),
909 908 }
910 909
911 910 def allsupportedversions(ui):
912 911 versions = set(_packermap.keys())
913 912 versions.discard('03')
914 913 if (ui.configbool('experimental', 'changegroup3') or
915 914 ui.configbool('experimental', 'treemanifest')):
916 915 versions.add('03')
917 916 return versions
918 917
919 918 # Changegroup versions that can be applied to the repo
920 919 def supportedincomingversions(repo):
921 920 versions = allsupportedversions(repo.ui)
922 921 if 'treemanifest' in repo.requirements:
923 922 versions.add('03')
924 923 return versions
925 924
926 925 # Changegroup versions that can be created from the repo
927 926 def supportedoutgoingversions(repo):
928 927 versions = allsupportedversions(repo.ui)
929 928 if 'treemanifest' in repo.requirements:
930 929 # Versions 01 and 02 support only flat manifests and it's just too
931 930 # expensive to convert between the flat manifest and tree manifest on
932 931 # the fly. Since tree manifests are hashed differently, all of history
933 932 # would have to be converted. Instead, we simply don't even pretend to
934 933 # support versions 01 and 02.
935 934 versions.discard('01')
936 935 versions.discard('02')
937 936 versions.add('03')
938 937 return versions
939 938
940 939 def safeversion(repo):
941 940 # Finds the smallest version that it's safe to assume clients of the repo
942 941 # will support. For example, all hg versions that support generaldelta also
943 942 # support changegroup 02.
944 943 versions = supportedoutgoingversions(repo)
945 944 if 'generaldelta' in repo.requirements:
946 945 versions.discard('01')
947 946 assert versions
948 947 return min(versions)
949 948
950 949 def getbundler(version, repo, bundlecaps=None):
951 950 assert version in supportedoutgoingversions(repo)
952 951 return _packermap[version][0](repo, bundlecaps)
953 952
954 953 def getunbundler(version, fh, alg):
955 954 return _packermap[version][1](fh, alg)
956 955
957 956 def _changegroupinfo(repo, nodes, source):
958 957 if repo.ui.verbose or source == 'bundle':
959 958 repo.ui.status(_("%d changesets found\n") % len(nodes))
960 959 if repo.ui.debugflag:
961 960 repo.ui.debug("list of changesets:\n")
962 961 for node in nodes:
963 962 repo.ui.debug("%s\n" % hex(node))
964 963
965 964 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
966 965 repo = repo.unfiltered()
967 966 commonrevs = outgoing.common
968 967 csets = outgoing.missing
969 968 heads = outgoing.missingheads
970 969 # We go through the fast path if we get told to, or if all (unfiltered)
971 970 # heads have been requested (since we then know all linkrevs will
972 971 # be pulled by the client).
973 972 heads.sort()
974 973 fastpathlinkrev = fastpath or (
975 974 repo.filtername is None and heads == sorted(repo.heads()))
976 975
977 976 repo.hook('preoutgoing', throw=True, source=source)
978 977 _changegroupinfo(repo, csets, source)
979 978 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
980 979
981 980 def getsubset(repo, outgoing, bundler, source, fastpath=False):
982 981 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
983 982 return getunbundler(bundler.version, util.chunkbuffer(gengroup), None)
984 983
985 984 def changegroupsubset(repo, roots, heads, source, version='01'):
986 985 """Compute a changegroup consisting of all the nodes that are
987 986 descendants of any of the roots and ancestors of any of the heads.
988 987 Return a chunkbuffer object whose read() method will return
989 988 successive changegroup chunks.
990 989
991 990 It is fairly complex as determining which filenodes and which
992 991 manifest nodes need to be included for the changeset to be complete
993 992 is non-trivial.
994 993
995 994 Another wrinkle is doing the reverse, figuring out which changeset in
996 995 the changegroup a particular filenode or manifestnode belongs to.
997 996 """
998 997 cl = repo.changelog
999 998 if not roots:
1000 999 roots = [nullid]
1001 1000 discbases = []
1002 1001 for n in roots:
1003 1002 discbases.extend([p for p in cl.parents(n) if p != nullid])
1004 1003 # TODO: remove call to nodesbetween.
1005 1004 csets, roots, heads = cl.nodesbetween(roots, heads)
1006 1005 included = set(csets)
1007 1006 discbases = [n for n in discbases if n not in included]
1008 1007 outgoing = discovery.outgoing(cl, discbases, heads)
1009 1008 bundler = getbundler(version, repo)
1010 1009 return getsubset(repo, outgoing, bundler, source)
1011 1010
1012 1011 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
1013 1012 version='01'):
1014 1013 """Like getbundle, but taking a discovery.outgoing as an argument.
1015 1014
1016 1015 This is only implemented for local repos and reuses potentially
1017 1016 precomputed sets in outgoing. Returns a raw changegroup generator."""
1018 1017 if not outgoing.missing:
1019 1018 return None
1020 1019 bundler = getbundler(version, repo, bundlecaps)
1021 1020 return getsubsetraw(repo, outgoing, bundler, source)
1022 1021
1023 1022 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
1024 1023 version='01'):
1025 1024 """Like getbundle, but taking a discovery.outgoing as an argument.
1026 1025
1027 1026 This is only implemented for local repos and reuses potentially
1028 1027 precomputed sets in outgoing."""
1029 1028 if not outgoing.missing:
1030 1029 return None
1031 1030 bundler = getbundler(version, repo, bundlecaps)
1032 1031 return getsubset(repo, outgoing, bundler, source)
1033 1032
1034 1033 def computeoutgoing(repo, heads, common):
1035 1034 """Computes which revs are outgoing given a set of common
1036 1035 and a set of heads.
1037 1036
1038 1037 This is a separate function so extensions can have access to
1039 1038 the logic.
1040 1039
1041 1040 Returns a discovery.outgoing object.
1042 1041 """
1043 1042 cl = repo.changelog
1044 1043 if common:
1045 1044 hasnode = cl.hasnode
1046 1045 common = [n for n in common if hasnode(n)]
1047 1046 else:
1048 1047 common = [nullid]
1049 1048 if not heads:
1050 1049 heads = cl.heads()
1051 1050 return discovery.outgoing(cl, common, heads)
1052 1051
1053 1052 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None,
1054 1053 version='01'):
1055 1054 """Like changegroupsubset, but returns the set difference between the
1056 1055 ancestors of heads and the ancestors of common.
1057 1056
1058 1057 If heads is None, use the local heads. If common is None, use [nullid].
1059 1058
1060 1059 The nodes in common might not all be known locally due to the way the
1061 1060 current discovery protocol works.
1062 1061 """
1063 1062 outgoing = computeoutgoing(repo, heads, common)
1064 1063 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps,
1065 1064 version=version)
1066 1065
1067 1066 def changegroup(repo, basenodes, source):
1068 1067 # to avoid a race we use changegroupsubset() (issue1320)
1069 1068 return changegroupsubset(repo, basenodes, repo.heads(), source)
1070 1069
1071 def _addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
1070 def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
1072 1071 revisions = 0
1073 1072 files = 0
1074 1073 while True:
1075 1074 chunkdata = source.filelogheader()
1076 1075 if not chunkdata:
1077 1076 break
1077 files += 1
1078 1078 f = chunkdata["filename"]
1079 1079 repo.ui.debug("adding %s revisions\n" % f)
1080 pr()
1080 repo.ui.progress(_('files'), files, unit=_('files'),
1081 total=expectedfiles)
1081 1082 fl = repo.file(f)
1082 1083 o = len(fl)
1083 1084 try:
1084 1085 if not fl.addgroup(source, revmap, trp):
1085 1086 raise error.Abort(_("received file revlog group is empty"))
1086 1087 except error.CensoredBaseError as e:
1087 1088 raise error.Abort(_("received delta base is censored: %s") % e)
1088 1089 revisions += len(fl) - o
1089 files += 1
1090 1090 if f in needfiles:
1091 1091 needs = needfiles[f]
1092 1092 for new in xrange(o, len(fl)):
1093 1093 n = fl.node(new)
1094 1094 if n in needs:
1095 1095 needs.remove(n)
1096 1096 else:
1097 1097 raise error.Abort(
1098 1098 _("received spurious file revlog entry"))
1099 1099 if not needs:
1100 1100 del needfiles[f]
1101 1101 repo.ui.progress(_('files'), None)
1102 1102
1103 1103 for f, needs in needfiles.iteritems():
1104 1104 fl = repo.file(f)
1105 1105 for n in needs:
1106 1106 try:
1107 1107 fl.rev(n)
1108 1108 except error.LookupError:
1109 1109 raise error.Abort(
1110 1110 _('missing file data for %s:%s - run hg verify') %
1111 1111 (f, hex(n)))
1112 1112
1113 1113 return revisions, files
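For reference, the 'chunks' named in the old progress unit are the length-prefixed frames handled by chunkheader(), getchunk() and closechunk() near the top of this file: each chunk carries a 4-byte big-endian length that includes the prefix itself, and a zero length terminates a chunk group. A standalone sketch of that framing, where frame() and unframe() are illustrative helpers (not Mercurial APIs) mirroring the struct calls above:

import struct

def frame(data):
    # like chunkheader() + payload: the length counts the 4 prefix bytes too
    return struct.pack(">l", len(data) + 4) + data

def unframe(buf, pos):
    # like getchunk(): a length <= 4 is only valid as the zero terminator
    l = struct.unpack(">l", buf[pos:pos + 4])[0]
    if l <= 4:
        if l:
            raise ValueError("invalid chunk length %d" % l)
        return b"", pos + 4
    return buf[pos + 4:pos + l], pos + l

stream = frame(b"hello") + frame(b"world") + struct.pack(">l", 0)
chunk, pos = unframe(stream, 0)
assert chunk == b"hello"
chunk, pos = unframe(stream, pos)
assert chunk == b"world"
chunk, pos = unframe(stream, pos)
assert chunk == b""   # empty chunk: end of the chunk group (closechunk())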