cg1packer: fix `compressed` method...
Stanislau Hlebik
r30589:182cacaa default
@@ -1,1048 +1,1048 @@
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullrev,
19 19 short,
20 20 )
21 21
22 22 from . import (
23 23 branchmap,
24 24 dagutil,
25 25 discovery,
26 26 error,
27 27 mdiff,
28 28 phases,
29 29 util,
30 30 )
31 31
32 32 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
33 33 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
34 34 _CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
35 35
36 36 def readexactly(stream, n):
37 37 '''read n bytes from stream.read and abort if fewer are available'''
38 38 s = stream.read(n)
39 39 if len(s) < n:
40 40 raise error.Abort(_("stream ended unexpectedly"
41 41 " (got %d bytes, expected %d)")
42 42 % (len(s), n))
43 43 return s
44 44
45 45 def getchunk(stream):
46 46 """return the next chunk from stream as a string"""
47 47 d = readexactly(stream, 4)
48 48 l = struct.unpack(">l", d)[0]
49 49 if l <= 4:
50 50 if l:
51 51 raise error.Abort(_("invalid chunk length %d") % l)
52 52 return ""
53 53 return readexactly(stream, l - 4)
54 54
55 55 def chunkheader(length):
56 56 """return a changegroup chunk header (string)"""
57 57 return struct.pack(">l", length + 4)
58 58
59 59 def closechunk():
60 60 """return a changegroup chunk header (string) for a zero-length chunk"""
61 61 return struct.pack(">l", 0)
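# Illustrative sketch (editorial, not part of this change): the framing used
# by chunkheader/getchunk is a 4-byte big-endian signed length that counts
# itself, and a zero length (closechunk) terminates a group:
#   >>> import io, struct
#   >>> framed = struct.pack(">l", 4 + 5) + 'hello' + struct.pack(">l", 0)
#   >>> stream = io.BytesIO(framed)
#   >>> struct.unpack(">l", stream.read(4))[0] - 4  # payload length
#   5
#   >>> stream.read(5)
#   'hello'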
62 62
63 63 def combineresults(results):
64 64 """logic to combine 0 or more addchangegroup results into one"""
65 65 changedheads = 0
66 66 result = 1
67 67 for ret in results:
68 68 # If any changegroup result is 0, return 0
69 69 if ret == 0:
70 70 result = 0
71 71 break
72 72 if ret < -1:
73 73 changedheads += ret + 1
74 74 elif ret > 1:
75 75 changedheads += ret - 1
76 76 if changedheads > 0:
77 77 result = 1 + changedheads
78 78 elif changedheads < 0:
79 79 result = -1 + changedheads
80 80 return result
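# Illustrative sketch (editorial, not part of this change): results of 2 and
# 3 encode +1 and +2 heads, so combined they report 1 + 3 added heads:
#   >>> combineresults([2, 3])
#   4
#   >>> combineresults([2, -2])  # +1 and -1 head cancel out
#   1
#   >>> combineresults([0, 5])  # any zero result short-circuits to 0
#   0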
81 81
82 82 def writechunks(ui, chunks, filename, vfs=None):
83 83 """Write chunks to a file and return its filename.
84 84
85 85 The stream is assumed to be a bundle file.
86 86 Existing files will not be overwritten.
87 87 If no filename is specified, a temporary file is created.
88 88 """
89 89 fh = None
90 90 cleanup = None
91 91 try:
92 92 if filename:
93 93 if vfs:
94 94 fh = vfs.open(filename, "wb")
95 95 else:
96 96 # Increase the default buffer size because the default is usually
97 97 # small (4k is common on Linux).
98 98 fh = open(filename, "wb", 131072)
99 99 else:
100 100 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
101 101 fh = os.fdopen(fd, "wb")
102 102 cleanup = filename
103 103 for c in chunks:
104 104 fh.write(c)
105 105 cleanup = None
106 106 return filename
107 107 finally:
108 108 if fh is not None:
109 109 fh.close()
110 110 if cleanup is not None:
111 111 if filename and vfs:
112 112 vfs.unlink(cleanup)
113 113 else:
114 114 os.unlink(cleanup)
115 115
116 116 class cg1unpacker(object):
117 117 """Unpacker for cg1 changegroup streams.
118 118
119 119 A changegroup unpacker handles the framing of the revision data in
120 120 the wire format. Most consumers will want to use the apply()
121 121 method to add the changes from the changegroup to a repository.
122 122
123 123 If you're forwarding a changegroup unmodified to another consumer,
124 124 use getchunks(), which returns an iterator of changegroup
125 125 chunks. This is mostly useful for cases where you need to know the
126 126 data stream has ended by observing the end of the changegroup.
127 127
128 128 deltachunk() is useful only if you're applying delta data. Most
129 129 consumers should prefer apply() instead.
130 130
131 131 A few other public methods exist. Those are used only for
132 132 bundlerepo and some debug commands - their use is discouraged.
133 133 """
134 134 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
135 135 deltaheadersize = struct.calcsize(deltaheader)
136 136 version = '01'
137 137 _grouplistcount = 1 # One list of files after the manifests
138 138
139 139 def __init__(self, fh, alg, extras=None):
140 140 if alg is None:
141 141 alg = 'UN'
142 142 if alg not in util.compengines.supportedbundletypes:
143 143 raise error.Abort(_('unknown stream compression type: %s')
144 144 % alg)
145 145 if alg == 'BZ':
146 146 alg = '_truncatedBZ'
147 147
148 148 compengine = util.compengines.forbundletype(alg)
149 149 self._stream = compengine.decompressorreader(fh)
150 150 self._type = alg
151 151 self.extras = extras or {}
152 152 self.callback = None
153 153
154 154 # These methods (compressed, read, seek, tell) all appear to only
155 155 # be used by bundlerepo, but it's a little hard to tell.
156 156 def compressed(self):
157 - return self._type is not None
157 + return self._type is not None and self._type != 'UN'
158 158 def read(self, l):
159 159 return self._stream.read(l)
160 160 def seek(self, pos):
161 161 return self._stream.seek(pos)
162 162 def tell(self):
163 163 return self._stream.tell()
164 164 def close(self):
165 165 return self._stream.close()
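# Illustrative sketch (editorial, not part of this change): with the fix
# above, an explicitly uncompressed ('UN') stream no longer reports itself
# as compressed:
#   >>> import io
#   >>> from mercurial import changegroup
#   >>> changegroup.cg1unpacker(io.BytesIO(''), 'UN').compressed()
#   False
#   >>> changegroup.cg1unpacker(io.BytesIO(''), 'GZ').compressed()
#   True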
166 166
167 167 def _chunklength(self):
168 168 d = readexactly(self._stream, 4)
169 169 l = struct.unpack(">l", d)[0]
170 170 if l <= 4:
171 171 if l:
172 172 raise error.Abort(_("invalid chunk length %d") % l)
173 173 return 0
174 174 if self.callback:
175 175 self.callback()
176 176 return l - 4
177 177
178 178 def changelogheader(self):
179 179 """v10 does not have a changelog header chunk"""
180 180 return {}
181 181
182 182 def manifestheader(self):
183 183 """v10 does not have a manifest header chunk"""
184 184 return {}
185 185
186 186 def filelogheader(self):
187 187 """return the header of the filelogs chunk, v10 only has the filename"""
188 188 l = self._chunklength()
189 189 if not l:
190 190 return {}
191 191 fname = readexactly(self._stream, l)
192 192 return {'filename': fname}
193 193
194 194 def _deltaheader(self, headertuple, prevnode):
195 195 node, p1, p2, cs = headertuple
196 196 if prevnode is None:
197 197 deltabase = p1
198 198 else:
199 199 deltabase = prevnode
200 200 flags = 0
201 201 return node, p1, p2, deltabase, cs, flags
202 202
203 203 def deltachunk(self, prevnode):
204 204 l = self._chunklength()
205 205 if not l:
206 206 return {}
207 207 headerdata = readexactly(self._stream, self.deltaheadersize)
208 208 header = struct.unpack(self.deltaheader, headerdata)
209 209 delta = readexactly(self._stream, l - self.deltaheadersize)
210 210 node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
211 211 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
212 212 'deltabase': deltabase, 'delta': delta, 'flags': flags}
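# Illustrative sketch (editorial, not part of this change): the cg1 delta
# header is four 20-byte nodes (node, p1, p2, cs) packed back to back, and
# the delta base stays implicit (p1 for the first chunk, prevnode after):
#   >>> import struct
#   >>> struct.calcsize("20s20s20s20s")  # == deltaheadersize for cg1
#   80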
213 213
214 214 def getchunks(self):
215 215 """returns all the chunks contains in the bundle
216 216
217 217 Used when you need to forward the binary stream to a file or another
218 218 network API. To do so, it parse the changegroup data, otherwise it will
219 219 block in case of sshrepo because it don't know the end of the stream.
220 220 """
221 221 # an empty chunkgroup is the end of the changegroup
222 222 # a changegroup has at least 2 chunkgroups (changelog and manifest).
223 223 # after that, changegroup versions 1 and 2 have a series of groups
224 224 # with one group per file. changegroup 3 has a series of directory
225 225 # manifests before the files.
226 226 count = 0
227 227 emptycount = 0
228 228 while emptycount < self._grouplistcount:
229 229 empty = True
230 230 count += 1
231 231 while True:
232 232 chunk = getchunk(self)
233 233 if not chunk:
234 234 if empty and count > 2:
235 235 emptycount += 1
236 236 break
237 237 empty = False
238 238 yield chunkheader(len(chunk))
239 239 pos = 0
240 240 while pos < len(chunk):
241 241 next = pos + 2**20
242 242 yield chunk[pos:next]
243 243 pos = next
244 244 yield closechunk()
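# Illustrative sketch (editorial, not part of this change): getchunks() is
# what to iterate when forwarding a changegroup verbatim, e.g. spooling it
# to disk with the writechunks() helper defined earlier (here 'unpacker' is
# a hypothetical cg1unpacker instance and 'ui' a ui object):
#   filename = writechunks(ui, unpacker.getchunks(), None)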
245 245
246 246 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
247 247 # We know that we'll never have more manifests than we had
248 248 # changesets.
249 249 self.callback = prog(_('manifests'), numchanges)
250 250 # no need to check for empty manifest group here:
251 251 # if the result of the merge of 1 and 2 is the same in 3 and 4,
252 252 # no new manifest will be created and the manifest group will
253 253 # be empty during the pull
254 254 self.manifestheader()
255 255 repo.manifestlog._revlog.addgroup(self, revmap, trp)
256 256 repo.ui.progress(_('manifests'), None)
257 257 self.callback = None
258 258
259 259 def apply(self, repo, srctype, url, emptyok=False,
260 260 targetphase=phases.draft, expectedtotal=None):
261 261 """Add the changegroup returned by source.read() to this repo.
262 262 srctype is a string like 'push', 'pull', or 'unbundle'. url is
263 263 the URL of the repo where this changegroup is coming from.
264 264
265 265 Return an integer summarizing the change to this repo:
266 266 - nothing changed or no source: 0
267 267 - more heads than before: 1+added heads (2..n)
268 268 - fewer heads than before: -1-removed heads (-2..-n)
269 269 - number of heads stays the same: 1
270 270 """
271 271 repo = repo.unfiltered()
272 272 def csmap(x):
273 273 repo.ui.debug("add changeset %s\n" % short(x))
274 274 return len(cl)
275 275
276 276 def revmap(x):
277 277 return cl.rev(x)
278 278
279 279 changesets = files = revisions = 0
280 280
281 281 try:
282 282 with repo.transaction("\n".join([srctype,
283 283 util.hidepassword(url)])) as tr:
284 284 # The transaction could have been created before and already
285 285 # carries source information. In this case we use the top
286 286 # level data. We overwrite the argument because we need to use
287 287 # the top level values (if they exist) in this function.
288 288 srctype = tr.hookargs.setdefault('source', srctype)
289 289 url = tr.hookargs.setdefault('url', url)
290 290 repo.hook('prechangegroup', throw=True, **tr.hookargs)
291 291
292 292 # write changelog data to temp files so concurrent readers
293 293 # will not see an inconsistent view
294 294 cl = repo.changelog
295 295 cl.delayupdate(tr)
296 296 oldheads = cl.heads()
297 297
298 298 trp = weakref.proxy(tr)
299 299 # pull off the changeset group
300 300 repo.ui.status(_("adding changesets\n"))
301 301 clstart = len(cl)
302 302 class prog(object):
303 303 def __init__(self, step, total):
304 304 self._step = step
305 305 self._total = total
306 306 self._count = 1
307 307 def __call__(self):
308 308 repo.ui.progress(self._step, self._count,
309 309 unit=_('chunks'), total=self._total)
310 310 self._count += 1
311 311 self.callback = prog(_('changesets'), expectedtotal)
312 312
313 313 efiles = set()
314 314 def onchangelog(cl, node):
315 315 efiles.update(cl.readfiles(node))
316 316
317 317 self.changelogheader()
318 318 srccontent = cl.addgroup(self, csmap, trp,
319 319 addrevisioncb=onchangelog)
320 320 efiles = len(efiles)
321 321
322 322 if not (srccontent or emptyok):
323 323 raise error.Abort(_("received changelog group is empty"))
324 324 clend = len(cl)
325 325 changesets = clend - clstart
326 326 repo.ui.progress(_('changesets'), None)
327 327 self.callback = None
328 328
329 329 # pull off the manifest group
330 330 repo.ui.status(_("adding manifests\n"))
331 331 self._unpackmanifests(repo, revmap, trp, prog, changesets)
332 332
333 333 needfiles = {}
334 334 if repo.ui.configbool('server', 'validate', default=False):
335 335 cl = repo.changelog
336 336 ml = repo.manifestlog
337 337 # validate incoming csets have their manifests
338 338 for cset in xrange(clstart, clend):
339 339 mfnode = cl.changelogrevision(cset).manifest
340 340 mfest = ml[mfnode].readdelta()
341 341 # store file nodes we must see
342 342 for f, n in mfest.iteritems():
343 343 needfiles.setdefault(f, set()).add(n)
344 344
345 345 # process the files
346 346 repo.ui.status(_("adding file changes\n"))
347 347 newrevs, newfiles = _addchangegroupfiles(
348 348 repo, self, revmap, trp, efiles, needfiles)
349 349 revisions += newrevs
350 350 files += newfiles
351 351
352 352 dh = 0
353 353 if oldheads:
354 354 heads = cl.heads()
355 355 dh = len(heads) - len(oldheads)
356 356 for h in heads:
357 357 if h not in oldheads and repo[h].closesbranch():
358 358 dh -= 1
359 359 htext = ""
360 360 if dh:
361 361 htext = _(" (%+d heads)") % dh
362 362
363 363 repo.ui.status(_("added %d changesets"
364 364 " with %d changes to %d files%s\n")
365 365 % (changesets, revisions, files, htext))
366 366 repo.invalidatevolatilesets()
367 367
368 368 if changesets > 0:
369 369 if 'node' not in tr.hookargs:
370 370 tr.hookargs['node'] = hex(cl.node(clstart))
371 371 tr.hookargs['node_last'] = hex(cl.node(clend - 1))
372 372 hookargs = dict(tr.hookargs)
373 373 else:
374 374 hookargs = dict(tr.hookargs)
375 375 hookargs['node'] = hex(cl.node(clstart))
376 376 hookargs['node_last'] = hex(cl.node(clend - 1))
377 377 repo.hook('pretxnchangegroup', throw=True, **hookargs)
378 378
379 379 added = [cl.node(r) for r in xrange(clstart, clend)]
380 380 publishing = repo.publishing()
381 381 if srctype in ('push', 'serve'):
382 382 # Old servers can not push the boundary themselves.
383 383 # New servers won't push the boundary if the changeset already
384 384 # exists locally as secret
385 385 #
386 386 # We should not use 'added' here but the list of all changes in
387 387 # the bundle
388 388 if publishing:
389 389 phases.advanceboundary(repo, tr, phases.public,
390 390 srccontent)
391 391 else:
392 392 # Those changesets have been pushed from the
393 393 # outside, their phases are going to be pushed
394 394 # alongside. Therefore `targetphase` is
395 395 # ignored.
396 396 phases.advanceboundary(repo, tr, phases.draft,
397 397 srccontent)
398 398 phases.retractboundary(repo, tr, phases.draft, added)
399 399 elif srctype != 'strip':
400 400 # publishing only alters behavior during push
401 401 #
402 402 # strip should not touch boundary at all
403 403 phases.retractboundary(repo, tr, targetphase, added)
404 404
405 405 if changesets > 0:
406 406 if srctype != 'strip':
407 407 # During strip, branchcache is invalid but
408 408 # the upcoming call to `destroyed` will repair it.
409 409 # In other cases we can safely update the cache on
410 410 # disk.
411 411 repo.ui.debug('updating the branch cache\n')
412 412 branchmap.updatecache(repo.filtered('served'))
413 413
414 414 def runhooks():
415 415 # These hooks run when the lock releases, not when the
416 416 # transaction closes. So it's possible for the changelog
417 417 # to have changed since we last saw it.
418 418 if clstart >= len(repo):
419 419 return
420 420
421 421 repo.hook("changegroup", **hookargs)
422 422
423 423 for n in added:
424 424 args = hookargs.copy()
425 425 args['node'] = hex(n)
426 426 del args['node_last']
427 427 repo.hook("incoming", **args)
428 428
429 429 newheads = [h for h in repo.heads()
430 430 if h not in oldheads]
431 431 repo.ui.log("incoming",
432 432 "%s incoming changes - new heads: %s\n",
433 433 len(added),
434 434 ', '.join([hex(c[:6]) for c in newheads]))
435 435
436 436 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
437 437 lambda tr: repo._afterlock(runhooks))
438 438 finally:
439 439 repo.ui.flush()
440 440 # never return 0 here:
441 441 if dh < 0:
442 442 return dh - 1
443 443 else:
444 444 return dh + 1
445 445
446 446 class cg2unpacker(cg1unpacker):
447 447 """Unpacker for cg2 streams.
448 448
449 449 cg2 streams add support for generaldelta, so the delta header
450 450 format is slightly different. All other features about the data
451 451 remain the same.
452 452 """
453 453 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
454 454 deltaheadersize = struct.calcsize(deltaheader)
455 455 version = '02'
456 456
457 457 def _deltaheader(self, headertuple, prevnode):
458 458 node, p1, p2, deltabase, cs = headertuple
459 459 flags = 0
460 460 return node, p1, p2, deltabase, cs, flags
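# Illustrative sketch (editorial, not part of this change): cg2 adds an
# explicit 20-byte deltabase field, growing the header from 80 to 100 bytes:
#   >>> import struct
#   >>> struct.calcsize("20s20s20s20s20s")  # == deltaheadersize for cg2
#   100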
461 461
462 462 class cg3unpacker(cg2unpacker):
463 463 """Unpacker for cg3 streams.
464 464
465 465 cg3 streams add support for exchanging treemanifests and revlog
466 466 flags. It adds the revlog flags to the delta header and an empty chunk
467 467 separating manifests and files.
468 468 """
469 469 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
470 470 deltaheadersize = struct.calcsize(deltaheader)
471 471 version = '03'
472 472 _grouplistcount = 2 # One list of manifests and one list of files
473 473
474 474 def _deltaheader(self, headertuple, prevnode):
475 475 node, p1, p2, deltabase, cs, flags = headertuple
476 476 return node, p1, p2, deltabase, cs, flags
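# Illustrative sketch (editorial, not part of this change): cg3 appends a
# 16-bit flags field (note the explicit big-endian '>' in the format):
#   >>> import struct
#   >>> struct.calcsize(">20s20s20s20s20sH")  # == deltaheadersize for cg3
#   102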
477 477
478 478 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
479 479 super(cg3unpacker, self)._unpackmanifests(repo, revmap, trp, prog,
480 480 numchanges)
481 481 for chunkdata in iter(self.filelogheader, {}):
482 482 # If we get here, there are directory manifests in the changegroup
483 483 d = chunkdata["filename"]
484 484 repo.ui.debug("adding %s revisions\n" % d)
485 485 dirlog = repo.manifestlog._revlog.dirlog(d)
486 486 if not dirlog.addgroup(self, revmap, trp):
487 487 raise error.Abort(_("received dir revlog group is empty"))
488 488
489 489 class headerlessfixup(object):
490 490 def __init__(self, fh, h):
491 491 self._h = h
492 492 self._fh = fh
493 493 def read(self, n):
494 494 if self._h:
495 495 d, self._h = self._h[:n], self._h[n:]
496 496 if len(d) < n:
497 497 d += readexactly(self._fh, n - len(d))
498 498 return d
499 499 return readexactly(self._fh, n)
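# Illustrative sketch (editorial, not part of this change): headerlessfixup
# re-prepends header bytes that were already consumed, e.g. while sniffing
# the stream type:
#   >>> import io
#   >>> fixed = headerlessfixup(io.BytesIO('LOAD'), 'HG10UN')
#   >>> fixed.read(8)
#   'HG10UNLO'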
500 500
501 501 class cg1packer(object):
502 502 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
503 503 version = '01'
504 504 def __init__(self, repo, bundlecaps=None):
505 505 """Given a source repo, construct a bundler.
506 506
507 507 bundlecaps is optional and can be used to specify the set of
508 508 capabilities which can be used to build the bundle.
509 509 """
510 510 # Set of capabilities we can use to build the bundle.
511 511 if bundlecaps is None:
512 512 bundlecaps = set()
513 513 self._bundlecaps = bundlecaps
514 514 # experimental config: bundle.reorder
515 515 reorder = repo.ui.config('bundle', 'reorder', 'auto')
516 516 if reorder == 'auto':
517 517 reorder = None
518 518 else:
519 519 reorder = util.parsebool(reorder)
520 520 self._repo = repo
521 521 self._reorder = reorder
522 522 self._progress = repo.ui.progress
523 523 if self._repo.ui.verbose and not self._repo.ui.debugflag:
524 524 self._verbosenote = self._repo.ui.note
525 525 else:
526 526 self._verbosenote = lambda s: None
527 527
528 528 def close(self):
529 529 return closechunk()
530 530
531 531 def fileheader(self, fname):
532 532 return chunkheader(len(fname)) + fname
533 533
534 534 # Extracted both for clarity and for overriding in extensions.
535 535 def _sortgroup(self, revlog, nodelist, lookup):
536 536 """Sort nodes for change group and turn them into revnums."""
537 537 # for generaldelta revlogs, we linearize the revs; this will both be
538 538 # much quicker and generate a much smaller bundle
539 539 if (revlog._generaldelta and self._reorder is None) or self._reorder:
540 540 dag = dagutil.revlogdag(revlog)
541 541 return dag.linearize(set(revlog.rev(n) for n in nodelist))
542 542 else:
543 543 return sorted([revlog.rev(n) for n in nodelist])
544 544
545 545 def group(self, nodelist, revlog, lookup, units=None):
546 546 """Calculate a delta group, yielding a sequence of changegroup chunks
547 547 (strings).
548 548
549 549 Given a list of changeset revs, return a set of deltas and
550 550 metadata corresponding to nodes. The first delta is
551 551 first parent(nodelist[0]) -> nodelist[0]; the receiver is
552 552 guaranteed to have this parent, as it has all history before
553 553 these changesets. In the case where firstparent is nullrev, the
554 554 changegroup starts with a full revision.
555 555
556 556 If units is not None, progress detail will be generated, and units
557 557 specifies the type of revlog that is touched (changelog, manifest, etc.).
558 558 """
559 559 # if we don't have any revisions touched by these changesets, bail
560 560 if len(nodelist) == 0:
561 561 yield self.close()
562 562 return
563 563
564 564 revs = self._sortgroup(revlog, nodelist, lookup)
565 565
566 566 # add the parent of the first rev
567 567 p = revlog.parentrevs(revs[0])[0]
568 568 revs.insert(0, p)
569 569
570 570 # build deltas
571 571 total = len(revs) - 1
572 572 msgbundling = _('bundling')
573 573 for r in xrange(len(revs) - 1):
574 574 if units is not None:
575 575 self._progress(msgbundling, r + 1, unit=units, total=total)
576 576 prev, curr = revs[r], revs[r + 1]
577 577 linknode = lookup(revlog.node(curr))
578 578 for c in self.revchunk(revlog, curr, prev, linknode):
579 579 yield c
580 580
581 581 if units is not None:
582 582 self._progress(msgbundling, None)
583 583 yield self.close()
584 584
585 585 # filter any nodes that claim to be part of the known set
586 586 def prune(self, revlog, missing, commonrevs):
587 587 rr, rl = revlog.rev, revlog.linkrev
588 588 return [n for n in missing if rl(rr(n)) not in commonrevs]
589 589
590 590 def _packmanifests(self, dir, mfnodes, lookuplinknode):
591 591 """Pack flat manifests into a changegroup stream."""
592 592 assert not dir
593 593 for chunk in self.group(mfnodes, self._repo.manifestlog._revlog,
594 594 lookuplinknode, units=_('manifests')):
595 595 yield chunk
596 596
597 597 def _manifestsdone(self):
598 598 return ''
599 599
600 600 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
601 601 '''yield a sequence of changegroup chunks (strings)'''
602 602 repo = self._repo
603 603 cl = repo.changelog
604 604
605 605 clrevorder = {}
606 606 mfs = {} # needed manifests
607 607 fnodes = {} # needed file nodes
608 608 changedfiles = set()
609 609
610 610 # Callback for the changelog, used to collect changed files and manifest
611 611 # nodes.
612 612 # Returns the linkrev node (identity in the changelog case).
613 613 def lookupcl(x):
614 614 c = cl.read(x)
615 615 clrevorder[x] = len(clrevorder)
616 616 n = c[0]
617 617 # record the first changeset introducing this manifest version
618 618 mfs.setdefault(n, x)
619 619 # Record a complete list of potentially-changed files in
620 620 # this manifest.
621 621 changedfiles.update(c[3])
622 622 return x
623 623
624 624 self._verbosenote(_('uncompressed size of bundle content:\n'))
625 625 size = 0
626 626 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
627 627 size += len(chunk)
628 628 yield chunk
629 629 self._verbosenote(_('%8.i (changelog)\n') % size)
630 630
631 631 # We need to make sure that the linkrev in the changegroup refers to
632 632 # the first changeset that introduced the manifest or file revision.
633 633 # The fastpath is usually safer than the slowpath, because the filelogs
634 634 # are walked in revlog order.
635 635 #
636 636 # When taking the slowpath with reorder=None and the manifest revlog
637 637 # uses generaldelta, the manifest may be walked in the "wrong" order.
638 638 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
639 639 # cc0ff93d0c0c).
640 640 #
641 641 # When taking the fastpath, we are only vulnerable to reordering
642 642 # of the changelog itself. The changelog never uses generaldelta, so
643 643 # it is only reordered when reorder=True. To handle this case, we
644 644 # simply take the slowpath, which already has the 'clrevorder' logic.
645 645 # This was also fixed in cc0ff93d0c0c.
646 646 fastpathlinkrev = fastpathlinkrev and not self._reorder
647 647 # Treemanifests don't work correctly with fastpathlinkrev
648 648 # either, because we don't discover which directory nodes to
649 649 # send along with files. This could probably be fixed.
650 650 fastpathlinkrev = fastpathlinkrev and (
651 651 'treemanifest' not in repo.requirements)
652 652
653 653 for chunk in self.generatemanifests(commonrevs, clrevorder,
654 654 fastpathlinkrev, mfs, fnodes):
655 655 yield chunk
656 656 mfs.clear()
657 657 clrevs = set(cl.rev(x) for x in clnodes)
658 658
659 659 if not fastpathlinkrev:
660 660 def linknodes(unused, fname):
661 661 return fnodes.get(fname, {})
662 662 else:
663 663 cln = cl.node
664 664 def linknodes(filerevlog, fname):
665 665 llr = filerevlog.linkrev
666 666 fln = filerevlog.node
667 667 revs = ((r, llr(r)) for r in filerevlog)
668 668 return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)
669 669
670 670 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
671 671 source):
672 672 yield chunk
673 673
674 674 yield self.close()
675 675
676 676 if clnodes:
677 677 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
678 678
679 679 def generatemanifests(self, commonrevs, clrevorder, fastpathlinkrev, mfs,
680 680 fnodes):
681 681 repo = self._repo
682 682 mfl = repo.manifestlog
683 683 dirlog = mfl._revlog.dirlog
684 684 tmfnodes = {'': mfs}
685 685
686 686 # Callback for the manifest, used to collect linkrevs for filelog
687 687 # revisions.
688 688 # Returns the linkrev node (collected in lookupcl).
689 689 def makelookupmflinknode(dir):
690 690 if fastpathlinkrev:
691 691 assert not dir
692 692 return mfs.__getitem__
693 693
694 694 def lookupmflinknode(x):
695 695 """Callback for looking up the linknode for manifests.
696 696
697 697 Returns the linkrev node for the specified manifest.
698 698
699 699 SIDE EFFECT:
700 700
701 701 1) fclnodes gets populated with the list of relevant
702 702 file nodes if we're not using fastpathlinkrev
703 703 2) When treemanifests are in use, collects treemanifest nodes
704 704 to send
705 705
706 706 Note that this means manifests must be completely sent to
707 707 the client before you can trust the list of files and
708 708 treemanifests to send.
709 709 """
710 710 clnode = tmfnodes[dir][x]
711 711 mdata = mfl.get(dir, x).readfast(shallow=True)
712 712 for p, n, fl in mdata.iterentries():
713 713 if fl == 't': # subdirectory manifest
714 714 subdir = dir + p + '/'
715 715 tmfclnodes = tmfnodes.setdefault(subdir, {})
716 716 tmfclnode = tmfclnodes.setdefault(n, clnode)
717 717 if clrevorder[clnode] < clrevorder[tmfclnode]:
718 718 tmfclnodes[n] = clnode
719 719 else:
720 720 f = dir + p
721 721 fclnodes = fnodes.setdefault(f, {})
722 722 fclnode = fclnodes.setdefault(n, clnode)
723 723 if clrevorder[clnode] < clrevorder[fclnode]:
724 724 fclnodes[n] = clnode
725 725 return clnode
726 726 return lookupmflinknode
727 727
728 728 size = 0
729 729 while tmfnodes:
730 730 dir = min(tmfnodes)
731 731 nodes = tmfnodes[dir]
732 732 prunednodes = self.prune(dirlog(dir), nodes, commonrevs)
733 733 if not dir or prunednodes:
734 734 for x in self._packmanifests(dir, prunednodes,
735 735 makelookupmflinknode(dir)):
736 736 size += len(x)
737 737 yield x
738 738 del tmfnodes[dir]
739 739 self._verbosenote(_('%8.i (manifests)\n') % size)
740 740 yield self._manifestsdone()
741 741
742 742 # The 'source' parameter is useful for extensions
743 743 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
744 744 repo = self._repo
745 745 progress = self._progress
746 746 msgbundling = _('bundling')
747 747
748 748 total = len(changedfiles)
749 749 # for progress output
750 750 msgfiles = _('files')
751 751 for i, fname in enumerate(sorted(changedfiles)):
752 752 filerevlog = repo.file(fname)
753 753 if not filerevlog:
754 754 raise error.Abort(_("empty or missing revlog for %s") % fname)
755 755
756 756 linkrevnodes = linknodes(filerevlog, fname)
757 757 # Lookup for filenodes; we collected the linkrev nodes above in the
758 758 # fastpath case and with lookupmf in the slowpath case.
759 759 def lookupfilelog(x):
760 760 return linkrevnodes[x]
761 761
762 762 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
763 763 if filenodes:
764 764 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
765 765 total=total)
766 766 h = self.fileheader(fname)
767 767 size = len(h)
768 768 yield h
769 769 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
770 770 size += len(chunk)
771 771 yield chunk
772 772 self._verbosenote(_('%8.i %s\n') % (size, fname))
773 773 progress(msgbundling, None)
774 774
775 775 def deltaparent(self, revlog, rev, p1, p2, prev):
776 776 return prev
777 777
778 778 def revchunk(self, revlog, rev, prev, linknode):
779 779 node = revlog.node(rev)
780 780 p1, p2 = revlog.parentrevs(rev)
781 781 base = self.deltaparent(revlog, rev, p1, p2, prev)
782 782
783 783 prefix = ''
784 784 if revlog.iscensored(base) or revlog.iscensored(rev):
785 785 try:
786 786 delta = revlog.revision(node)
787 787 except error.CensoredNodeError as e:
788 788 delta = e.tombstone
789 789 if base == nullrev:
790 790 prefix = mdiff.trivialdiffheader(len(delta))
791 791 else:
792 792 baselen = revlog.rawsize(base)
793 793 prefix = mdiff.replacediffheader(baselen, len(delta))
794 794 elif base == nullrev:
795 795 delta = revlog.revision(node)
796 796 prefix = mdiff.trivialdiffheader(len(delta))
797 797 else:
798 798 delta = revlog.revdiff(base, rev)
799 799 p1n, p2n = revlog.parents(node)
800 800 basenode = revlog.node(base)
801 801 flags = revlog.flags(rev)
802 802 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
803 803 meta += prefix
804 804 l = len(meta) + len(delta)
805 805 yield chunkheader(l)
806 806 yield meta
807 807 yield delta
808 808 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
809 809 # do nothing with basenode, it is implicitly the previous one in HG10
810 810 # do nothing with flags, it is implicitly 0 for cg1 and cg2
811 811 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
812 812
813 813 class cg2packer(cg1packer):
814 814 version = '02'
815 815 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
816 816
817 817 def __init__(self, repo, bundlecaps=None):
818 818 super(cg2packer, self).__init__(repo, bundlecaps)
819 819 if self._reorder is None:
820 820 # Since generaldelta is directly supported by cg2, reordering
821 821 # generally doesn't help, so we disable it by default (treating
822 822 # bundle.reorder=auto just like bundle.reorder=False).
823 823 self._reorder = False
824 824
825 825 def deltaparent(self, revlog, rev, p1, p2, prev):
826 826 dp = revlog.deltaparent(rev)
827 827 if dp == nullrev and revlog.storedeltachains:
828 828 # Avoid sending full revisions when delta parent is null. Pick prev
829 829 # in that case. It's tempting to pick p1 in this case, as p1 will
830 830 # be smaller in the common case. However, computing a delta against
831 831 # p1 may require resolving the raw text of p1, which could be
832 832 # expensive. The revlog caches should have prev cached, meaning
833 833 # less CPU for changegroup generation. There is likely room to add
834 834 # a flag and/or config option to control this behavior.
835 835 return prev
836 836 elif dp == nullrev:
837 837 # revlog is configured to use full snapshot for a reason,
838 838 # stick to full snapshot.
839 839 return nullrev
840 840 elif dp not in (p1, p2, prev):
841 841 # Pick prev when we can't be sure remote has the base revision.
842 842 return prev
843 843 else:
844 844 return dp
845 845
846 846 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
847 847 # Do nothing with flags, it is implicitly 0 in cg1 and cg2
848 848 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
849 849
850 850 class cg3packer(cg2packer):
851 851 version = '03'
852 852 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
853 853
854 854 def _packmanifests(self, dir, mfnodes, lookuplinknode):
855 855 if dir:
856 856 yield self.fileheader(dir)
857 857
858 858 dirlog = self._repo.manifestlog._revlog.dirlog(dir)
859 859 for chunk in self.group(mfnodes, dirlog, lookuplinknode,
860 860 units=_('manifests')):
861 861 yield chunk
862 862
863 863 def _manifestsdone(self):
864 864 return self.close()
865 865
866 866 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
867 867 return struct.pack(
868 868 self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
869 869
870 870 _packermap = {'01': (cg1packer, cg1unpacker),
871 871 # cg2 adds support for exchanging generaldelta
872 872 '02': (cg2packer, cg2unpacker),
873 873 # cg3 adds support for exchanging revlog flags and treemanifests
874 874 '03': (cg3packer, cg3unpacker),
875 875 }
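# Illustrative sketch (editorial, not part of this change): this map is what
# getbundler/getunbundler below index into:
#   >>> _packermap['02'][0] is cg2packer and _packermap['02'][1] is cg2unpacker
#   True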
876 876
877 877 def allsupportedversions(ui):
878 878 versions = set(_packermap.keys())
879 879 versions.discard('03')
880 880 if (ui.configbool('experimental', 'changegroup3') or
881 881 ui.configbool('experimental', 'treemanifest')):
882 882 versions.add('03')
883 883 return versions
884 884
885 885 # Changegroup versions that can be applied to the repo
886 886 def supportedincomingversions(repo):
887 887 versions = allsupportedversions(repo.ui)
888 888 if 'treemanifest' in repo.requirements:
889 889 versions.add('03')
890 890 return versions
891 891
892 892 # Changegroup versions that can be created from the repo
893 893 def supportedoutgoingversions(repo):
894 894 versions = allsupportedversions(repo.ui)
895 895 if 'treemanifest' in repo.requirements:
896 896 # Versions 01 and 02 support only flat manifests and it's just too
897 897 # expensive to convert between the flat manifest and tree manifest on
898 898 # the fly. Since tree manifests are hashed differently, all of history
899 899 # would have to be converted. Instead, we simply don't even pretend to
900 900 # support versions 01 and 02.
901 901 versions.discard('01')
902 902 versions.discard('02')
903 903 versions.add('03')
904 904 return versions
905 905
906 906 def safeversion(repo):
907 907 # Finds the smallest version that it's safe to assume clients of the repo
908 908 # will support. For example, all hg versions that support generaldelta also
909 909 # support changegroup 02.
910 910 versions = supportedoutgoingversions(repo)
911 911 if 'generaldelta' in repo.requirements:
912 912 versions.discard('01')
913 913 assert versions
914 914 return min(versions)
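# Illustrative sketch (editorial, not part of this change): on a repository
# whose requirements include 'generaldelta' (with changegroup3 off and no
# treemanifest), the candidates are {'01', '02'}, '01' is discarded, and:
#   >>> min({'01', '02'} - {'01'})
#   '02'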
915 915
916 916 def getbundler(version, repo, bundlecaps=None):
917 917 assert version in supportedoutgoingversions(repo)
918 918 return _packermap[version][0](repo, bundlecaps)
919 919
920 920 def getunbundler(version, fh, alg, extras=None):
921 921 return _packermap[version][1](fh, alg, extras=extras)
922 922
923 923 def _changegroupinfo(repo, nodes, source):
924 924 if repo.ui.verbose or source == 'bundle':
925 925 repo.ui.status(_("%d changesets found\n") % len(nodes))
926 926 if repo.ui.debugflag:
927 927 repo.ui.debug("list of changesets:\n")
928 928 for node in nodes:
929 929 repo.ui.debug("%s\n" % hex(node))
930 930
931 931 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
932 932 repo = repo.unfiltered()
933 933 commonrevs = outgoing.common
934 934 csets = outgoing.missing
935 935 heads = outgoing.missingheads
936 936 # We go through the fast path if we are told to, or if all (unfiltered)
937 937 # heads have been requested (since we then know that all linkrevs will
938 938 # be pulled by the client).
939 939 heads.sort()
940 940 fastpathlinkrev = fastpath or (
941 941 repo.filtername is None and heads == sorted(repo.heads()))
942 942
943 943 repo.hook('preoutgoing', throw=True, source=source)
944 944 _changegroupinfo(repo, csets, source)
945 945 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
946 946
947 947 def getsubset(repo, outgoing, bundler, source, fastpath=False):
948 948 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
949 949 return getunbundler(bundler.version, util.chunkbuffer(gengroup), None,
950 950 {'clcount': len(outgoing.missing)})
951 951
952 952 def changegroupsubset(repo, roots, heads, source, version='01'):
953 953 """Compute a changegroup consisting of all the nodes that are
954 954 descendants of any of the roots and ancestors of any of the heads.
955 955 Return a chunkbuffer object whose read() method will return
956 956 successive changegroup chunks.
957 957
958 958 It is fairly complex as determining which filenodes and which
959 959 manifest nodes need to be included for the changeset to be complete
960 960 is non-trivial.
961 961
962 962 Another wrinkle is doing the reverse, figuring out which changeset in
963 963 the changegroup a particular filenode or manifestnode belongs to.
964 964 """
965 965 outgoing = discovery.outgoing(repo, missingroots=roots, missingheads=heads)
966 966 bundler = getbundler(version, repo)
967 967 return getsubset(repo, outgoing, bundler, source)
968 968
969 969 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
970 970 version='01'):
971 971 """Like getbundle, but taking a discovery.outgoing as an argument.
972 972
973 973 This is only implemented for local repos and reuses potentially
974 974 precomputed sets in outgoing. Returns a raw changegroup generator."""
975 975 if not outgoing.missing:
976 976 return None
977 977 bundler = getbundler(version, repo, bundlecaps)
978 978 return getsubsetraw(repo, outgoing, bundler, source)
979 979
980 980 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
981 981 version='01'):
982 982 """Like getbundle, but taking a discovery.outgoing as an argument.
983 983
984 984 This is only implemented for local repos and reuses potentially
985 985 precomputed sets in outgoing."""
986 986 if not outgoing.missing:
987 987 return None
988 988 bundler = getbundler(version, repo, bundlecaps)
989 989 return getsubset(repo, outgoing, bundler, source)
990 990
991 991 def getchangegroup(repo, source, outgoing, bundlecaps=None,
992 992 version='01'):
993 993 """Like changegroupsubset, but returns the set difference between the
994 994 ancestors of heads and the ancestors of common.
995 995
996 996 If heads is None, use the local heads. If common is None, use [nullid].
997 997
998 998 The nodes in common might not all be known locally due to the way the
999 999 current discovery protocol works.
1000 1000 """
1001 1001 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps,
1002 1002 version=version)
1003 1003
1004 1004 def changegroup(repo, basenodes, source):
1005 1005 # to avoid a race we use changegroupsubset() (issue1320)
1006 1006 return changegroupsubset(repo, basenodes, repo.heads(), source)
1007 1007
1008 1008 def _addchangegroupfiles(repo, source, revmap, trp, expectedfiles, needfiles):
1009 1009 revisions = 0
1010 1010 files = 0
1011 1011 for chunkdata in iter(source.filelogheader, {}):
1012 1012 files += 1
1013 1013 f = chunkdata["filename"]
1014 1014 repo.ui.debug("adding %s revisions\n" % f)
1015 1015 repo.ui.progress(_('files'), files, unit=_('files'),
1016 1016 total=expectedfiles)
1017 1017 fl = repo.file(f)
1018 1018 o = len(fl)
1019 1019 try:
1020 1020 if not fl.addgroup(source, revmap, trp):
1021 1021 raise error.Abort(_("received file revlog group is empty"))
1022 1022 except error.CensoredBaseError as e:
1023 1023 raise error.Abort(_("received delta base is censored: %s") % e)
1024 1024 revisions += len(fl) - o
1025 1025 if f in needfiles:
1026 1026 needs = needfiles[f]
1027 1027 for new in xrange(o, len(fl)):
1028 1028 n = fl.node(new)
1029 1029 if n in needs:
1030 1030 needs.remove(n)
1031 1031 else:
1032 1032 raise error.Abort(
1033 1033 _("received spurious file revlog entry"))
1034 1034 if not needs:
1035 1035 del needfiles[f]
1036 1036 repo.ui.progress(_('files'), None)
1037 1037
1038 1038 for f, needs in needfiles.iteritems():
1039 1039 fl = repo.file(f)
1040 1040 for n in needs:
1041 1041 try:
1042 1042 fl.rev(n)
1043 1043 except error.LookupError:
1044 1044 raise error.Abort(
1045 1045 _('missing file data for %s:%s - run hg verify') %
1046 1046 (f, hex(n)))
1047 1047
1048 1048 return revisions, files