changegroup3: add empty chunk separating directories and files...
Martin von Zweigbergk
r27753:d4071cc7 default
@@ -1,528 +1,532 b''
1 1 # bundlerepo.py - repository class for viewing uncompressed bundles
2 2 #
3 3 # Copyright 2006, 2007 Benoit Boissinot <bboissin@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """Repository class for viewing uncompressed bundles.
9 9
10 10 This provides a read-only repository interface to bundles as if they
11 11 were part of the actual repository.
12 12 """
13 13
14 14 from __future__ import absolute_import
15 15
16 16 import os
17 17 import shutil
18 18 import tempfile
19 19
20 20 from .i18n import _
21 21 from .node import nullid
22 22
23 23 from . import (
24 24 bundle2,
25 25 changegroup,
26 26 changelog,
27 27 cmdutil,
28 28 discovery,
29 29 error,
30 30 exchange,
31 31 filelog,
32 32 localrepo,
33 33 manifest,
34 34 mdiff,
35 35 pathutil,
36 36 phases,
37 37 revlog,
38 38 scmutil,
39 39 util,
40 40 )
41 41
42 42 class bundlerevlog(revlog.revlog):
43 43 def __init__(self, opener, indexfile, bundle, linkmapper):
44 44 # How it works:
45 45 # To retrieve a revision, we need to know the offset of the revision in
46 46 # the bundle (an unbundle object). We store this offset in the index
47 47 # (start). The base of the delta is stored in the base field.
48 48 #
49 49 # To differentiate a rev in the bundle from a rev in the revlog, we
50 50 # check revision against repotiprev.
51 51 opener = scmutil.readonlyvfs(opener)
52 52 revlog.revlog.__init__(self, opener, indexfile)
53 53 self.bundle = bundle
54 54 n = len(self)
55 55 self.repotiprev = n - 1
56 56 chain = None
57 57 self.bundlerevs = set() # used by 'bundle()' revset expression
58 58 while True:
59 59 chunkdata = bundle.deltachunk(chain)
60 60 if not chunkdata:
61 61 break
62 62 node = chunkdata['node']
63 63 p1 = chunkdata['p1']
64 64 p2 = chunkdata['p2']
65 65 cs = chunkdata['cs']
66 66 deltabase = chunkdata['deltabase']
67 67 delta = chunkdata['delta']
68 68
69 69 size = len(delta)
70 70 start = bundle.tell() - size
71 71
72 72 link = linkmapper(cs)
73 73 if node in self.nodemap:
74 74 # this can happen if two branches make the same change
75 75 chain = node
76 76 self.bundlerevs.add(self.nodemap[node])
77 77 continue
78 78
79 79 for p in (p1, p2):
80 80 if p not in self.nodemap:
81 81 raise error.LookupError(p, self.indexfile,
82 82 _("unknown parent"))
83 83
84 84 if deltabase not in self.nodemap:
85 85 raise error.LookupError(deltabase, self.indexfile,
86 86 _('unknown delta base'))
87 87
88 88 baserev = self.rev(deltabase)
89 89 # start, size, full unc. size, base (unused), link, p1, p2, node
90 90 e = (revlog.offset_type(start, 0), size, -1, baserev, link,
91 91 self.rev(p1), self.rev(p2), node)
92 92 self.index.insert(-1, e)
93 93 self.nodemap[node] = n
94 94 self.bundlerevs.add(n)
95 95 chain = node
96 96 n += 1
97 97
98 98 def _chunk(self, rev):
99 99 # Warning: in case of bundle, the diff is against what we stored as
100 100 # delta base, not against rev - 1
101 101 # XXX: could use some caching
102 102 if rev <= self.repotiprev:
103 103 return revlog.revlog._chunk(self, rev)
104 104 self.bundle.seek(self.start(rev))
105 105 return self.bundle.read(self.length(rev))
106 106
107 107 def revdiff(self, rev1, rev2):
108 108 """return or calculate a delta between two revisions"""
109 109 if rev1 > self.repotiprev and rev2 > self.repotiprev:
110 110 # hot path for bundle
111 111 revb = self.index[rev2][3]
112 112 if revb == rev1:
113 113 return self._chunk(rev2)
114 114 elif rev1 <= self.repotiprev and rev2 <= self.repotiprev:
115 115 return revlog.revlog.revdiff(self, rev1, rev2)
116 116
117 117 return mdiff.textdiff(self.revision(self.node(rev1)),
118 118 self.revision(self.node(rev2)))
119 119
120 120 def revision(self, nodeorrev):
121 121 """return an uncompressed revision of a given node or revision
122 122 number.
123 123 """
124 124 if isinstance(nodeorrev, int):
125 125 rev = nodeorrev
126 126 node = self.node(rev)
127 127 else:
128 128 node = nodeorrev
129 129 rev = self.rev(node)
130 130
131 131 if node == nullid:
132 132 return ""
133 133
134 134 text = None
135 135 chain = []
136 136 iterrev = rev
137 137 # reconstruct the revision if it is from a changegroup
138 138 while iterrev > self.repotiprev:
139 139 if self._cache and self._cache[1] == iterrev:
140 140 text = self._cache[2]
141 141 break
142 142 chain.append(iterrev)
143 143 iterrev = self.index[iterrev][3]
144 144 if text is None:
145 145 text = self.baserevision(iterrev)
146 146
147 147 while chain:
148 148 delta = self._chunk(chain.pop())
149 149 text = mdiff.patches(text, [delta])
150 150
151 151 self._checkhash(text, node, rev)
152 152 self._cache = (node, rev, text)
153 153 return text
154 154
155 155 def baserevision(self, nodeorrev):
156 156 # Revlog subclasses may override 'revision' method to modify format of
157 157 # content retrieved from revlog. To use bundlerevlog with such a class,
158 158 # one needs to override 'baserevision' and make a more specific call here.
159 159 return revlog.revlog.revision(self, nodeorrev)
160 160
161 161 def addrevision(self, text, transaction, link, p1=None, p2=None, d=None):
162 162 raise NotImplementedError
163 163 def addgroup(self, revs, linkmapper, transaction):
164 164 raise NotImplementedError
165 165 def strip(self, rev, minlink):
166 166 raise NotImplementedError
167 167 def checksize(self):
168 168 raise NotImplementedError
169 169
170 170 class bundlechangelog(bundlerevlog, changelog.changelog):
171 171 def __init__(self, opener, bundle):
172 172 changelog.changelog.__init__(self, opener)
173 173 linkmapper = lambda x: x
174 174 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
175 175 linkmapper)
176 176
177 177 def baserevision(self, nodeorrev):
178 178 # Although changelog doesn't override 'revision' method, some extensions
179 179 # may replace this class with another that does. Same story with
180 180 # manifest and filelog classes.
181 181
182 182 # This bypasses filtering on changelog.node() and rev() because we need
183 183 # revision text of the bundle base even if it is hidden.
184 184 oldfilter = self.filteredrevs
185 185 try:
186 186 self.filteredrevs = ()
187 187 return changelog.changelog.revision(self, nodeorrev)
188 188 finally:
189 189 self.filteredrevs = oldfilter
190 190
191 191 class bundlemanifest(bundlerevlog, manifest.manifest):
192 192 def __init__(self, opener, bundle, linkmapper):
193 193 manifest.manifest.__init__(self, opener)
194 194 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
195 195 linkmapper)
196 196
197 197 def baserevision(self, nodeorrev):
198 198 node = nodeorrev
199 199 if isinstance(node, int):
200 200 node = self.node(node)
201 201
202 202 if node in self._mancache:
203 203 result = self._mancache[node][0].text()
204 204 else:
205 205 result = manifest.manifest.revision(self, nodeorrev)
206 206 return result
207 207
208 208 class bundlefilelog(bundlerevlog, filelog.filelog):
209 209 def __init__(self, opener, path, bundle, linkmapper):
210 210 filelog.filelog.__init__(self, opener, path)
211 211 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
212 212 linkmapper)
213 213
214 214 def baserevision(self, nodeorrev):
215 215 return filelog.filelog.revision(self, nodeorrev)
216 216
217 217 class bundlepeer(localrepo.localpeer):
218 218 def canpush(self):
219 219 return False
220 220
221 221 class bundlephasecache(phases.phasecache):
222 222 def __init__(self, *args, **kwargs):
223 223 super(bundlephasecache, self).__init__(*args, **kwargs)
224 224 if util.safehasattr(self, 'opener'):
225 225 self.opener = scmutil.readonlyvfs(self.opener)
226 226
227 227 def write(self):
228 228 raise NotImplementedError
229 229
230 230 def _write(self, fp):
231 231 raise NotImplementedError
232 232
233 233 def _updateroots(self, phase, newroots, tr):
234 234 self.phaseroots[phase] = newroots
235 235 self.invalidate()
236 236 self.dirty = True
237 237
238 238 class bundlerepository(localrepo.localrepository):
239 239 def __init__(self, ui, path, bundlename):
240 240 def _writetempbundle(read, suffix, header=''):
241 241 """Write a temporary file to disk
242 242
243 243 This is a closure because we need to make sure this is tracked by
244 244 self.tempfile for cleanup purposes."""
245 245 fdtemp, temp = self.vfs.mkstemp(prefix="hg-bundle-",
246 246 suffix=".hg10un")
247 247 self.tempfile = temp
248 248 fptemp = os.fdopen(fdtemp, 'wb')
249 249
250 250 try:
251 251 fptemp.write(header)
252 252 while True:
253 253 chunk = read(2**18)
254 254 if not chunk:
255 255 break
256 256 fptemp.write(chunk)
257 257 finally:
258 258 fptemp.close()
259 259
260 260 return self.vfs.open(self.tempfile, mode="rb")
261 261 self._tempparent = None
262 262 try:
263 263 localrepo.localrepository.__init__(self, ui, path)
264 264 except error.RepoError:
265 265 self._tempparent = tempfile.mkdtemp()
266 266 localrepo.instance(ui, self._tempparent, 1)
267 267 localrepo.localrepository.__init__(self, ui, self._tempparent)
268 268 self.ui.setconfig('phases', 'publish', False, 'bundlerepo')
269 269
270 270 if path:
271 271 self._url = 'bundle:' + util.expandpath(path) + '+' + bundlename
272 272 else:
273 273 self._url = 'bundle:' + bundlename
274 274
275 275 self.tempfile = None
276 276 f = util.posixfile(bundlename, "rb")
277 277 self.bundlefile = self.bundle = exchange.readbundle(ui, f, bundlename)
278 278
279 279 if isinstance(self.bundle, bundle2.unbundle20):
280 280 cgstream = None
281 281 for part in self.bundle.iterparts():
282 282 if part.type == 'changegroup':
283 283 if cgstream is not None:
284 284 raise NotImplementedError("can't process "
285 285 "multiple changegroups")
286 286 cgstream = part
287 287 version = part.params.get('version', '01')
288 288 if version not in changegroup.supportedversions(self):
289 289 msg = _('Unsupported changegroup version: %s')
290 290 raise error.Abort(msg % version)
291 291 if self.bundle.compressed():
292 292 cgstream = _writetempbundle(part.read,
293 293 ".cg%sun" % version)
294 294
295 295 if cgstream is None:
296 296 raise error.Abort('No changegroups found')
297 297 cgstream.seek(0)
298 298
299 299 self.bundle = changegroup.getunbundler(version, cgstream, 'UN')
300 300
301 301 elif self.bundle.compressed():
302 302 f = _writetempbundle(self.bundle.read, '.hg10un', header='HG10UN')
303 303 self.bundlefile = self.bundle = exchange.readbundle(ui, f,
304 304 bundlename,
305 305 self.vfs)
306 306
307 307 # dict with the mapping 'filename' -> position in the bundle
308 308 self.bundlefilespos = {}
309 309
310 310 self.firstnewrev = self.changelog.repotiprev + 1
311 311 phases.retractboundary(self, None, phases.draft,
312 312 [ctx.node() for ctx in self[self.firstnewrev:]])
313 313
314 314 @localrepo.unfilteredpropertycache
315 315 def _phasecache(self):
316 316 return bundlephasecache(self, self._phasedefaults)
317 317
318 318 @localrepo.unfilteredpropertycache
319 319 def changelog(self):
320 320 # consume the header if it exists
321 321 self.bundle.changelogheader()
322 322 c = bundlechangelog(self.svfs, self.bundle)
323 323 self.manstart = self.bundle.tell()
324 324 return c
325 325
326 326 @localrepo.unfilteredpropertycache
327 327 def manifest(self):
328 328 self.bundle.seek(self.manstart)
329 329 # consume the header if it exists
330 330 self.bundle.manifestheader()
331 331 m = bundlemanifest(self.svfs, self.bundle, self.changelog.rev)
332 # XXX: hack to work with changegroup3, but we still don't handle
333 # tree manifests correctly
334 if self.bundle.version == "03":
335 self.bundle.filelogheader()
332 336 self.filestart = self.bundle.tell()
333 337 return m
334 338
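
The version check added above is the reader-side counterpart of this change: a "03" bundle now carries an empty chunk after the manifest group, and bundlerepository consumes it (filelogheader() returns {} for an empty chunk) so that self.filestart points at the real start of the file section. A minimal sketch of skipping one delta-chunk group with the unbundler API used in this class (the helper name is mine, not part of the module):

    def skipgroup(bundle):
        # Advance an unbundler past one delta-chunk group, using only the
        # deltachunk() API shown in this diff; deltachunk() returns an
        # empty dict once it reads the group's closing empty chunk.
        chain = None
        while True:
            chunkdata = bundle.deltachunk(chain)
            if not chunkdata:
                return
            chain = chunkdata['node']
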
335 339 @localrepo.unfilteredpropertycache
336 340 def manstart(self):
337 341 self.changelog
338 342 return self.manstart
339 343
340 344 @localrepo.unfilteredpropertycache
341 345 def filestart(self):
342 346 self.manifest
343 347 return self.filestart
344 348
345 349 def url(self):
346 350 return self._url
347 351
348 352 def file(self, f):
349 353 if not self.bundlefilespos:
350 354 self.bundle.seek(self.filestart)
351 355 while True:
352 356 chunkdata = self.bundle.filelogheader()
353 357 if not chunkdata:
354 358 break
355 359 fname = chunkdata['filename']
356 360 self.bundlefilespos[fname] = self.bundle.tell()
357 361 while True:
358 362 c = self.bundle.deltachunk(None)
359 363 if not c:
360 364 break
361 365
362 366 if f in self.bundlefilespos:
363 367 self.bundle.seek(self.bundlefilespos[f])
364 368 return bundlefilelog(self.svfs, f, self.bundle, self.changelog.rev)
365 369 else:
366 370 return filelog.filelog(self.svfs, f)
367 371
368 372 def close(self):
369 373 """Close assigned bundle file immediately."""
370 374 self.bundlefile.close()
371 375 if self.tempfile is not None:
372 376 self.vfs.unlink(self.tempfile)
373 377 if self._tempparent:
374 378 shutil.rmtree(self._tempparent, True)
375 379
376 380 def cancopy(self):
377 381 return False
378 382
379 383 def peer(self):
380 384 return bundlepeer(self)
381 385
382 386 def getcwd(self):
383 387 return os.getcwd() # always outside the repo
384 388
385 389
386 390 def instance(ui, path, create):
387 391 if create:
388 392 raise error.Abort(_('cannot create new bundle repository'))
389 393 # internal config: bundle.mainreporoot
390 394 parentpath = ui.config("bundle", "mainreporoot", "")
391 395 if not parentpath:
392 396 # try to find the correct path to the working directory repo
393 397 parentpath = cmdutil.findrepo(os.getcwd())
394 398 if parentpath is None:
395 399 parentpath = ''
396 400 if parentpath:
397 401 # Try to make the full path relative so we get a nice, short URL.
398 402 # In particular, we don't want temp dir names in test outputs.
399 403 cwd = os.getcwd()
400 404 if parentpath == cwd:
401 405 parentpath = ''
402 406 else:
403 407 cwd = pathutil.normasprefix(cwd)
404 408 if parentpath.startswith(cwd):
405 409 parentpath = parentpath[len(cwd):]
406 410 u = util.url(path)
407 411 path = u.localpath()
408 412 if u.scheme == 'bundle':
409 413 s = path.split("+", 1)
410 414 if len(s) == 1:
411 415 repopath, bundlename = parentpath, s[0]
412 416 else:
413 417 repopath, bundlename = s
414 418 else:
415 419 repopath, bundlename = parentpath, path
416 420 return bundlerepository(ui, repopath, bundlename)
417 421
418 422 class bundletransactionmanager(object):
419 423 def transaction(self):
420 424 return None
421 425
422 426 def close(self):
423 427 raise NotImplementedError
424 428
425 429 def release(self):
426 430 raise NotImplementedError
427 431
428 432 def getremotechanges(ui, repo, other, onlyheads=None, bundlename=None,
429 433 force=False):
430 434 '''obtains a bundle of changes incoming from other
431 435
432 436 "onlyheads" restricts the returned changes to those reachable from the
433 437 specified heads.
434 438 "bundlename", if given, stores the bundle to this file path permanently;
435 439 otherwise it's stored to a temp file and gets deleted again when you call
436 440 the returned "cleanupfn".
437 441 "force" indicates whether to proceed on unrelated repos.
438 442
439 443 Returns a tuple (local, csets, cleanupfn):
440 444
441 445 "local" is a local repo from which to obtain the actual incoming
442 446 changesets; it is a bundlerepo for the obtained bundle when the
443 447 original "other" is remote.
444 448 "csets" lists the incoming changeset node ids.
445 449 "cleanupfn" must be called without arguments when you're done processing
446 450 the changes; it closes both the original "other" and the one returned
447 451 here.
448 452 '''
449 453 tmp = discovery.findcommonincoming(repo, other, heads=onlyheads,
450 454 force=force)
451 455 common, incoming, rheads = tmp
452 456 if not incoming:
453 457 try:
454 458 if bundlename:
455 459 os.unlink(bundlename)
456 460 except OSError:
457 461 pass
458 462 return repo, [], other.close
459 463
460 464 commonset = set(common)
461 465 rheads = [x for x in rheads if x not in commonset]
462 466
463 467 bundle = None
464 468 bundlerepo = None
465 469 localrepo = other.local()
466 470 if bundlename or not localrepo:
467 471 # create a bundle (uncompressed if other repo is not local)
468 472
469 473 canbundle2 = (ui.configbool('experimental', 'bundle2-exp', True)
470 474 and other.capable('getbundle')
471 475 and other.capable('bundle2'))
472 476 if canbundle2:
473 477 kwargs = {}
474 478 kwargs['common'] = common
475 479 kwargs['heads'] = rheads
476 480 kwargs['bundlecaps'] = exchange.caps20to10(repo)
477 481 kwargs['cg'] = True
478 482 b2 = other.getbundle('incoming', **kwargs)
479 483 fname = bundle = changegroup.writechunks(ui, b2._forwardchunks(),
480 484 bundlename)
481 485 else:
482 486 if other.capable('getbundle'):
483 487 cg = other.getbundle('incoming', common=common, heads=rheads)
484 488 elif onlyheads is None and not other.capable('changegroupsubset'):
485 489 # compat with older servers when pulling all remote heads
486 490 cg = other.changegroup(incoming, "incoming")
487 491 rheads = None
488 492 else:
489 493 cg = other.changegroupsubset(incoming, rheads, 'incoming')
490 494 if localrepo:
491 495 bundletype = "HG10BZ"
492 496 else:
493 497 bundletype = "HG10UN"
494 498 fname = bundle = changegroup.writebundle(ui, cg, bundlename,
495 499 bundletype)
496 500 # keep written bundle?
497 501 if bundlename:
498 502 bundle = None
499 503 if not localrepo:
500 504 # use the created uncompressed bundlerepo
501 505 localrepo = bundlerepo = bundlerepository(repo.baseui, repo.root,
502 506 fname)
503 507 # this repo contains local and other now, so filter out local again
504 508 common = repo.heads()
505 509 if localrepo:
506 510 # Part of common may be remotely filtered
507 511 # So use an unfiltered version
508 512 # The discovery process probably need cleanup to avoid that
509 513 localrepo = localrepo.unfiltered()
510 514
511 515 csets = localrepo.changelog.findmissing(common, rheads)
512 516
513 517 if bundlerepo:
514 518 reponodes = [ctx.node() for ctx in bundlerepo[bundlerepo.firstnewrev:]]
515 519 remotephases = other.listkeys('phases')
516 520
517 521 pullop = exchange.pulloperation(bundlerepo, other, heads=reponodes)
518 522 pullop.trmanager = bundletransactionmanager()
519 523 exchange._pullapplyphases(pullop, remotephases)
520 524
521 525 def cleanup():
522 526 if bundlerepo:
523 527 bundlerepo.close()
524 528 if bundle:
525 529 os.unlink(bundle)
526 530 other.close()
527 531
528 532 return (localrepo, csets, cleanup)
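
Since getremotechanges() is the public entry point of this module, here is a hedged usage sketch of the (local, csets, cleanupfn) contract its docstring describes; the repo/peer setup and the URL are illustrative only, not taken from this changeset:

    from mercurial import hg, ui as uimod
    from mercurial.bundlerepo import getremotechanges
    from mercurial.node import hex

    myui = uimod.ui()
    repo = hg.repository(myui, '.')                  # local repo (assumed cwd)
    other = hg.peer(myui, {}, 'http://example.com/repo')  # illustrative URL
    local, csets, cleanupfn = getremotechanges(myui, repo, other)
    try:
        for node in csets:                           # incoming changeset ids
            myui.write('incoming: %s\n' % hex(node))
    finally:
        cleanupfn()            # closes 'other' and any temporary bundlerepo
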
@@ -1,1102 +1,1107 b''
1 1 # changegroup.py - Mercurial changegroup manipulation functions
2 2 #
3 3 # Copyright 2006 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import struct
12 12 import tempfile
13 13 import weakref
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 hex,
18 18 nullid,
19 19 nullrev,
20 20 short,
21 21 )
22 22
23 23 from . import (
24 24 branchmap,
25 25 dagutil,
26 26 discovery,
27 27 error,
28 28 mdiff,
29 29 phases,
30 30 util,
31 31 )
32 32
33 33 _CHANGEGROUPV1_DELTA_HEADER = "20s20s20s20s"
34 34 _CHANGEGROUPV2_DELTA_HEADER = "20s20s20s20s20s"
35 35 _CHANGEGROUPV3_DELTA_HEADER = ">20s20s20s20s20sH"
36 36
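
For orientation, the three per-revision delta header formats above only grow: cg1 carries (node, p1, p2, cs) and deltas implicitly against the previous revision, cg2 adds an explicit deltabase, and cg3 appends a 16-bit flags field (hence the explicit '>' byte-order marker). A self-contained sketch with fabricated values:

    import struct

    CG1 = "20s20s20s20s"        # node, p1, p2, cs
    CG2 = "20s20s20s20s20s"     # node, p1, p2, deltabase, cs
    CG3 = ">20s20s20s20s20sH"   # node, p1, p2, deltabase, cs, flags

    # Wire sizes of the delta headers: 80, 100 and 102 bytes respectively.
    assert struct.calcsize(CG1) == 80
    assert struct.calcsize(CG2) == 100
    assert struct.calcsize(CG3) == 102

    # Unpacking a fabricated cg3 header yields the flags as an integer.
    raw = b'\x11' * 100 + struct.pack('>H', 1)
    node, p1, p2, deltabase, cs, flags = struct.unpack(CG3, raw)
    assert flags == 1
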
37 37 def readexactly(stream, n):
38 38 '''read n bytes from stream.read and abort if less was available'''
39 39 s = stream.read(n)
40 40 if len(s) < n:
41 41 raise error.Abort(_("stream ended unexpectedly"
42 42 " (got %d bytes, expected %d)")
43 43 % (len(s), n))
44 44 return s
45 45
46 46 def getchunk(stream):
47 47 """return the next chunk from stream as a string"""
48 48 d = readexactly(stream, 4)
49 49 l = struct.unpack(">l", d)[0]
50 50 if l <= 4:
51 51 if l:
52 52 raise error.Abort(_("invalid chunk length %d") % l)
53 53 return ""
54 54 return readexactly(stream, l - 4)
55 55
56 56 def chunkheader(length):
57 57 """return a changegroup chunk header (string)"""
58 58 return struct.pack(">l", length + 4)
59 59
60 60 def closechunk():
61 61 """return a changegroup chunk header (string) for a zero-length chunk"""
62 62 return struct.pack(">l", 0)
63 63
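
getchunk(), chunkheader() and closechunk() define the framing used by every changegroup stream: a 4-byte big-endian length that counts the four length bytes themselves, with a length of 0 acting as the group terminator. A small round-trip sketch mirroring those functions (standalone; the helper name is mine):

    import struct
    from io import BytesIO

    def frame(payload):
        # the length prefix counts itself, exactly like chunkheader() above
        return struct.pack(">l", len(payload) + 4) + payload

    stream = BytesIO(frame(b'hello') + struct.pack(">l", 0))
    l = struct.unpack(">l", stream.read(4))[0]
    assert stream.read(l - 4) == b'hello'  # payload, as getchunk() returns it
    l = struct.unpack(">l", stream.read(4))[0]
    assert l == 0                          # the empty chunk from closechunk()
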
64 64 def combineresults(results):
65 65 """logic to combine 0 or more addchangegroup results into one"""
66 66 changedheads = 0
67 67 result = 1
68 68 for ret in results:
69 69 # If any changegroup result is 0, return 0
70 70 if ret == 0:
71 71 result = 0
72 72 break
73 73 if ret < -1:
74 74 changedheads += ret + 1
75 75 elif ret > 1:
76 76 changedheads += ret - 1
77 77 if changedheads > 0:
78 78 result = 1 + changedheads
79 79 elif changedheads < 0:
80 80 result = -1 + changedheads
81 81 return result
82 82
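
The encoding combined here is the one apply() documents further down: 0 means failure, 1 means heads unchanged, n > 1 means n - 1 heads added, n < -1 means -(n + 1) heads removed. A few concrete cases (these assertions hold for the function as written):

    assert combineresults([1, 1]) == 1      # two no-op groups: heads unchanged
    assert combineresults([3, 2]) == 4      # 2 + 1 added heads -> 1 + 3
    assert combineresults([-2, -3]) == -4   # 1 + 2 removed heads -> -1 - 3
    assert combineresults([0, 5]) == 0      # any failing group wins
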
83 83 bundletypes = {
84 84 "": ("", None), # only when using unbundle on ssh and old http servers
85 85 # since the unification ssh accepts a header but there
86 86 # is no capability signaling it.
87 87 "HG20": (), # special-cased below
88 88 "HG10UN": ("HG10UN", None),
89 89 "HG10BZ": ("HG10", 'BZ'),
90 90 "HG10GZ": ("HG10GZ", 'GZ'),
91 91 }
92 92
93 93 # hgweb uses this list to communicate its preferred type
94 94 bundlepriority = ['HG10GZ', 'HG10BZ', 'HG10UN']
95 95
96 96 def writechunks(ui, chunks, filename, vfs=None):
97 97 """Write chunks to a file and return its filename.
98 98
99 99 The stream is assumed to be a bundle file.
100 100 Existing files will not be overwritten.
101 101 If no filename is specified, a temporary file is created.
102 102 """
103 103 fh = None
104 104 cleanup = None
105 105 try:
106 106 if filename:
107 107 if vfs:
108 108 fh = vfs.open(filename, "wb")
109 109 else:
110 110 fh = open(filename, "wb")
111 111 else:
112 112 fd, filename = tempfile.mkstemp(prefix="hg-bundle-", suffix=".hg")
113 113 fh = os.fdopen(fd, "wb")
114 114 cleanup = filename
115 115 for c in chunks:
116 116 fh.write(c)
117 117 cleanup = None
118 118 return filename
119 119 finally:
120 120 if fh is not None:
121 121 fh.close()
122 122 if cleanup is not None:
123 123 if filename and vfs:
124 124 vfs.unlink(cleanup)
125 125 else:
126 126 os.unlink(cleanup)
127 127
128 128 def writebundle(ui, cg, filename, bundletype, vfs=None, compression=None):
129 129 """Write a bundle file and return its filename.
130 130
131 131 Existing files will not be overwritten.
132 132 If no filename is specified, a temporary file is created.
133 133 bz2 compression can be turned off.
134 134 The bundle file will be deleted in case of errors.
135 135 """
136 136
137 137 if bundletype == "HG20":
138 138 from . import bundle2
139 139 bundle = bundle2.bundle20(ui)
140 140 bundle.setcompression(compression)
141 141 part = bundle.newpart('changegroup', data=cg.getchunks())
142 142 part.addparam('version', cg.version)
143 143 chunkiter = bundle.getchunks()
144 144 else:
145 145 # compression argument is only for the bundle2 case
146 146 assert compression is None
147 147 if cg.version != '01':
148 148 raise error.Abort(_('old bundle types only support v1 '
149 149 'changegroups'))
150 150 header, comp = bundletypes[bundletype]
151 151 if comp not in util.compressors:
152 152 raise error.Abort(_('unknown stream compression type: %s')
153 153 % comp)
154 154 z = util.compressors[comp]()
155 155 subchunkiter = cg.getchunks()
156 156 def chunkiter():
157 157 yield header
158 158 for chunk in subchunkiter:
159 159 yield z.compress(chunk)
160 160 yield z.flush()
161 161 chunkiter = chunkiter()
162 162
163 163 # parse the changegroup data, otherwise we will block
164 164 # in case of sshrepo because we don't know the end of the stream
165 165
166 166 # an empty chunkgroup is the end of the changegroup
167 167 # a changegroup has at least 2 chunkgroups (changelog and manifest).
168 168 # after that, an empty chunkgroup is the end of the changegroup
169 169 return writechunks(ui, chunkiter, filename, vfs=vfs)
170 170
171 171 class cg1unpacker(object):
172 172 """Unpacker for cg1 changegroup streams.
173 173
174 174 A changegroup unpacker handles the framing of the revision data in
175 175 the wire format. Most consumers will want to use the apply()
176 176 method to add the changes from the changegroup to a repository.
177 177
178 178 If you're forwarding a changegroup unmodified to another consumer,
179 179 use getchunks(), which returns an iterator of changegroup
180 180 chunks. This is mostly useful for cases where you need to know the
181 181 data stream has ended by observing the end of the changegroup.
182 182
183 183 deltachunk() is useful only if you're applying delta data. Most
184 184 consumers should prefer apply() instead.
185 185
186 186 A few other public methods exist. Those are used only for
187 187 bundlerepo and some debug commands - their use is discouraged.
188 188 """
189 189 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
190 190 deltaheadersize = struct.calcsize(deltaheader)
191 191 version = '01'
192 192 def __init__(self, fh, alg):
193 193 if alg == 'UN':
194 194 alg = None # get more modern without breaking too much
195 195 if not alg in util.decompressors:
196 196 raise error.Abort(_('unknown stream compression type: %s')
197 197 % alg)
198 198 if alg == 'BZ':
199 199 alg = '_truncatedBZ'
200 200 self._stream = util.decompressors[alg](fh)
201 201 self._type = alg
202 202 self.callback = None
203 203
204 204 # These methods (compressed, read, seek, tell) all appear to only
205 205 # be used by bundlerepo, but it's a little hard to tell.
206 206 def compressed(self):
207 207 return self._type is not None
208 208 def read(self, l):
209 209 return self._stream.read(l)
210 210 def seek(self, pos):
211 211 return self._stream.seek(pos)
212 212 def tell(self):
213 213 return self._stream.tell()
214 214 def close(self):
215 215 return self._stream.close()
216 216
217 217 def _chunklength(self):
218 218 d = readexactly(self._stream, 4)
219 219 l = struct.unpack(">l", d)[0]
220 220 if l <= 4:
221 221 if l:
222 222 raise error.Abort(_("invalid chunk length %d") % l)
223 223 return 0
224 224 if self.callback:
225 225 self.callback()
226 226 return l - 4
227 227
228 228 def changelogheader(self):
229 229 """v10 does not have a changelog header chunk"""
230 230 return {}
231 231
232 232 def manifestheader(self):
233 233 """v10 does not have a manifest header chunk"""
234 234 return {}
235 235
236 236 def filelogheader(self):
237 237 """return the header of the filelogs chunk, v10 only has the filename"""
238 238 l = self._chunklength()
239 239 if not l:
240 240 return {}
241 241 fname = readexactly(self._stream, l)
242 242 return {'filename': fname}
243 243
244 244 def _deltaheader(self, headertuple, prevnode):
245 245 node, p1, p2, cs = headertuple
246 246 if prevnode is None:
247 247 deltabase = p1
248 248 else:
249 249 deltabase = prevnode
250 250 flags = 0
251 251 return node, p1, p2, deltabase, cs, flags
252 252
253 253 def deltachunk(self, prevnode):
254 254 l = self._chunklength()
255 255 if not l:
256 256 return {}
257 257 headerdata = readexactly(self._stream, self.deltaheadersize)
258 258 header = struct.unpack(self.deltaheader, headerdata)
259 259 delta = readexactly(self._stream, l - self.deltaheadersize)
260 260 node, p1, p2, deltabase, cs, flags = self._deltaheader(header, prevnode)
261 261 return {'node': node, 'p1': p1, 'p2': p2, 'cs': cs,
262 262 'deltabase': deltabase, 'delta': delta, 'flags': flags}
263 263
264 264 def getchunks(self):
265 265 """returns all the chunks contains in the bundle
266 266
267 267 Used when you need to forward the binary stream to a file or another
268 268 network API. To do so, it parses the changegroup data; otherwise it would
269 269 block in case of sshrepo because it doesn't know the end of the stream.
270 270 """
271 271 # an empty chunkgroup is the end of the changegroup
272 272 # a changegroup has at least 2 chunkgroups (changelog and manifest).
273 273 # after that, an empty chunkgroup is the end of the changegroup
274 274 empty = False
275 275 count = 0
276 276 while not empty or count <= 2:
277 277 empty = True
278 278 count += 1
279 279 while True:
280 280 chunk = getchunk(self)
281 281 if not chunk:
282 282 break
283 283 empty = False
284 284 yield chunkheader(len(chunk))
285 285 pos = 0
286 286 while pos < len(chunk):
287 287 next = pos + 2**20
288 288 yield chunk[pos:next]
289 289 pos = next
290 290 yield closechunk()
291 291
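
Putting the framing together: a cg1/cg2 stream is the changelog group, the manifest group, then one (filename header + delta chunks) group per file, where an empty filename header ends the changegroup. With this change, cg3 inserts one extra empty chunk between the tree-manifest (directory) groups and the file groups. A self-contained schematic consumer of that section (function names are mine; framing as defined above):

    import struct

    def readchunk(stream):
        # Same framing as getchunk() above: b'' signals an empty chunk.
        l = struct.unpack(">l", stream.read(4))[0]
        return stream.read(l - 4) if l > 4 else b''

    def readfilesection(stream, version='03'):
        """Schematic consumer of the file section of a changegroup."""
        sawseparator = False
        while True:
            name = readchunk(stream)        # filename (or 'dir/') header
            if not name:
                if version == '03' and not sawseparator:
                    # cg3 (this change): the first empty header is the
                    # separator between directory groups and file groups
                    sawseparator = True
                    continue
                break                       # end of the changegroup
            while readchunk(stream):        # drain this group's delta chunks
                pass
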
292 292 def _unpackmanifests(self, repo, revmap, trp, prog, numchanges):
293 293 # We know that we'll never have more manifests than we had
294 294 # changesets.
295 295 self.callback = prog(_('manifests'), numchanges)
296 296 # no need to check for empty manifest group here:
297 297 # if the result of the merge of 1 and 2 is the same in 3 and 4,
298 298 # no new manifest will be created and the manifest group will
299 299 # be empty during the pull
300 300 self.manifestheader()
301 301 repo.manifest.addgroup(self, revmap, trp)
302 302 repo.ui.progress(_('manifests'), None)
303 303
304 304 def apply(self, repo, srctype, url, emptyok=False,
305 305 targetphase=phases.draft, expectedtotal=None):
306 306 """Add the changegroup returned by source.read() to this repo.
307 307 srctype is a string like 'push', 'pull', or 'unbundle'. url is
308 308 the URL of the repo where this changegroup is coming from.
309 309
310 310 Return an integer summarizing the change to this repo:
311 311 - nothing changed or no source: 0
312 312 - more heads than before: 1+added heads (2..n)
313 313 - fewer heads than before: -1-removed heads (-2..-n)
314 314 - number of heads stays the same: 1
315 315 """
316 316 repo = repo.unfiltered()
317 317 def csmap(x):
318 318 repo.ui.debug("add changeset %s\n" % short(x))
319 319 return len(cl)
320 320
321 321 def revmap(x):
322 322 return cl.rev(x)
323 323
324 324 changesets = files = revisions = 0
325 325
326 326 tr = repo.transaction("\n".join([srctype, util.hidepassword(url)]))
327 327 try:
328 328 # The transaction could have been created before and already
329 329 # carries source information. In this case we use the top
330 330 # level data. We overwrite the argument because we need to use
331 331 # the top level value (if they exist) in this function.
332 332 srctype = tr.hookargs.setdefault('source', srctype)
333 333 url = tr.hookargs.setdefault('url', url)
334 334 repo.hook('prechangegroup', throw=True, **tr.hookargs)
335 335
336 336 # write changelog data to temp files so concurrent readers
337 337 # will not see an inconsistent view
338 338 cl = repo.changelog
339 339 cl.delayupdate(tr)
340 340 oldheads = cl.heads()
341 341
342 342 trp = weakref.proxy(tr)
343 343 # pull off the changeset group
344 344 repo.ui.status(_("adding changesets\n"))
345 345 clstart = len(cl)
346 346 class prog(object):
347 347 def __init__(self, step, total):
348 348 self._step = step
349 349 self._total = total
350 350 self._count = 1
351 351 def __call__(self):
352 352 repo.ui.progress(self._step, self._count, unit=_('chunks'),
353 353 total=self._total)
354 354 self._count += 1
355 355 self.callback = prog(_('changesets'), expectedtotal)
356 356
357 357 efiles = set()
358 358 def onchangelog(cl, node):
359 359 efiles.update(cl.read(node)[3])
360 360
361 361 self.changelogheader()
362 362 srccontent = cl.addgroup(self, csmap, trp,
363 363 addrevisioncb=onchangelog)
364 364 efiles = len(efiles)
365 365
366 366 if not (srccontent or emptyok):
367 367 raise error.Abort(_("received changelog group is empty"))
368 368 clend = len(cl)
369 369 changesets = clend - clstart
370 370 repo.ui.progress(_('changesets'), None)
371 371
372 372 # pull off the manifest group
373 373 repo.ui.status(_("adding manifests\n"))
374 374 self._unpackmanifests(repo, revmap, trp, prog, changesets)
375 375
376 376 needfiles = {}
377 377 if repo.ui.configbool('server', 'validate', default=False):
378 378 # validate incoming csets have their manifests
379 379 for cset in xrange(clstart, clend):
380 380 mfnode = repo.changelog.read(repo.changelog.node(cset))[0]
381 381 mfest = repo.manifest.readdelta(mfnode)
382 382 # store file nodes we must see
383 383 for f, n in mfest.iteritems():
384 384 needfiles.setdefault(f, set()).add(n)
385 385
386 386 # process the files
387 387 repo.ui.status(_("adding file changes\n"))
388 388 self.callback = None
389 389 pr = prog(_('files'), efiles)
390 390 newrevs, newfiles = _addchangegroupfiles(
391 391 repo, self, revmap, trp, pr, needfiles)
392 392 revisions += newrevs
393 393 files += newfiles
394 394
395 395 dh = 0
396 396 if oldheads:
397 397 heads = cl.heads()
398 398 dh = len(heads) - len(oldheads)
399 399 for h in heads:
400 400 if h not in oldheads and repo[h].closesbranch():
401 401 dh -= 1
402 402 htext = ""
403 403 if dh:
404 404 htext = _(" (%+d heads)") % dh
405 405
406 406 repo.ui.status(_("added %d changesets"
407 407 " with %d changes to %d files%s\n")
408 408 % (changesets, revisions, files, htext))
409 409 repo.invalidatevolatilesets()
410 410
411 411 if changesets > 0:
412 412 if 'node' not in tr.hookargs:
413 413 tr.hookargs['node'] = hex(cl.node(clstart))
414 414 tr.hookargs['node_last'] = hex(cl.node(clend - 1))
415 415 hookargs = dict(tr.hookargs)
416 416 else:
417 417 hookargs = dict(tr.hookargs)
418 418 hookargs['node'] = hex(cl.node(clstart))
419 419 hookargs['node_last'] = hex(cl.node(clend - 1))
420 420 repo.hook('pretxnchangegroup', throw=True, **hookargs)
421 421
422 422 added = [cl.node(r) for r in xrange(clstart, clend)]
423 423 publishing = repo.publishing()
424 424 if srctype in ('push', 'serve'):
425 425 # Old servers can not push the boundary themselves.
426 426 # New servers won't push the boundary if changeset already
427 427 # exists locally as secret
428 428 #
429 429 # We should not use added here but the list of all change in
430 430 # the bundle
431 431 if publishing:
432 432 phases.advanceboundary(repo, tr, phases.public, srccontent)
433 433 else:
434 434 # Those changesets have been pushed from the outside, their
435 435 # phases are going to be pushed alongside. Therefore
436 436 # `targetphase` is ignored.
437 437 phases.advanceboundary(repo, tr, phases.draft, srccontent)
438 438 phases.retractboundary(repo, tr, phases.draft, added)
439 439 elif srctype != 'strip':
440 440 # publishing only alter behavior during push
441 441 #
442 442 # strip should not touch boundary at all
443 443 phases.retractboundary(repo, tr, targetphase, added)
444 444
445 445 if changesets > 0:
446 446 if srctype != 'strip':
447 447 # During strip, branchcache is invalid but coming call to
448 448 # `destroyed` will repair it.
449 449 # In other case we can safely update cache on disk.
450 450 branchmap.updatecache(repo.filtered('served'))
451 451
452 452 def runhooks():
453 453 # These hooks run when the lock releases, not when the
454 454 # transaction closes. So it's possible for the changelog
455 455 # to have changed since we last saw it.
456 456 if clstart >= len(repo):
457 457 return
458 458
459 459 # forcefully update the on-disk branch cache
460 460 repo.ui.debug("updating the branch cache\n")
461 461 repo.hook("changegroup", **hookargs)
462 462
463 463 for n in added:
464 464 args = hookargs.copy()
465 465 args['node'] = hex(n)
466 466 del args['node_last']
467 467 repo.hook("incoming", **args)
468 468
469 469 newheads = [h for h in repo.heads() if h not in oldheads]
470 470 repo.ui.log("incoming",
471 471 "%s incoming changes - new heads: %s\n",
472 472 len(added),
473 473 ', '.join([hex(c[:6]) for c in newheads]))
474 474
475 475 tr.addpostclose('changegroup-runhooks-%020i' % clstart,
476 476 lambda tr: repo._afterlock(runhooks))
477 477
478 478 tr.close()
479 479
480 480 finally:
481 481 tr.release()
482 482 repo.ui.flush()
483 483 # never return 0 here:
484 484 if dh < 0:
485 485 return dh - 1
486 486 else:
487 487 return dh + 1
488 488
489 489 class cg2unpacker(cg1unpacker):
490 490 """Unpacker for cg2 streams.
491 491
492 492 cg2 streams add support for generaldelta, so the delta header
493 493 format is slightly different. All other features about the data
494 494 remain the same.
495 495 """
496 496 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
497 497 deltaheadersize = struct.calcsize(deltaheader)
498 498 version = '02'
499 499
500 500 def _deltaheader(self, headertuple, prevnode):
501 501 node, p1, p2, deltabase, cs = headertuple
502 502 flags = 0
503 503 return node, p1, p2, deltabase, cs, flags
504 504
505 505 class cg3unpacker(cg2unpacker):
506 506 """Unpacker for cg3 streams.
507 507
508 508 cg3 streams add support for exchanging treemanifests and revlog
509 flags, so the only changes from cg2 are the delta header and
510 version number.
509 flags. It adds the revlog flags to the delta header and an empty chunk
510 separating manifests and files.
511 511 """
512 512 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
513 513 deltaheadersize = struct.calcsize(deltaheader)
514 514 version = '03'
515 515
516 516 def _deltaheader(self, headertuple, prevnode):
517 517 node, p1, p2, deltabase, cs, flags = headertuple
518 518 return node, p1, p2, deltabase, cs, flags
519 519
520 520 class headerlessfixup(object):
521 521 def __init__(self, fh, h):
522 522 self._h = h
523 523 self._fh = fh
524 524 def read(self, n):
525 525 if self._h:
526 526 d, self._h = self._h[:n], self._h[n:]
527 527 if len(d) < n:
528 528 d += readexactly(self._fh, n - len(d))
529 529 return d
530 530 return readexactly(self._fh, n)
531 531
532 532 def _moddirs(files):
533 533 """Given a set of modified files, find the list of modified directories.
534 534
535 535 This returns a list of (path to changed dir, changed dir) tuples,
536 536 as that's what the one client needs anyway.
537 537
538 538 >>> _moddirs(['a/b/c.py', 'a/b/c.txt', 'a/d/e/f/g.txt', 'i.txt', ])
539 539 [('/', 'a/'), ('a/', 'b/'), ('a/', 'd/'), ('a/d/', 'e/'), ('a/d/e/', 'f/')]
540 540
541 541 """
542 542 alldirs = set()
543 543 for f in files:
544 544 path = f.split('/')[:-1]
545 545 for i in xrange(len(path) - 1, -1, -1):
546 546 dn = '/'.join(path[:i])
547 547 current = dn + '/', path[i] + '/'
548 548 if current in alldirs:
549 549 break
550 550 alldirs.add(current)
551 551 return sorted(alldirs)
552 552
553 553 class cg1packer(object):
554 554 deltaheader = _CHANGEGROUPV1_DELTA_HEADER
555 555 version = '01'
556 556 def __init__(self, repo, bundlecaps=None):
557 557 """Given a source repo, construct a bundler.
558 558
559 559 bundlecaps is optional and can be used to specify the set of
560 560 capabilities which can be used to build the bundle.
561 561 """
562 562 # Set of capabilities we can use to build the bundle.
563 563 if bundlecaps is None:
564 564 bundlecaps = set()
565 565 self._bundlecaps = bundlecaps
566 566 # experimental config: bundle.reorder
567 567 reorder = repo.ui.config('bundle', 'reorder', 'auto')
568 568 if reorder == 'auto':
569 569 reorder = None
570 570 else:
571 571 reorder = util.parsebool(reorder)
572 572 self._repo = repo
573 573 self._reorder = reorder
574 574 self._progress = repo.ui.progress
575 575 if self._repo.ui.verbose and not self._repo.ui.debugflag:
576 576 self._verbosenote = self._repo.ui.note
577 577 else:
578 578 self._verbosenote = lambda s: None
579 579
580 580 def close(self):
581 581 return closechunk()
582 582
583 583 def fileheader(self, fname):
584 584 return chunkheader(len(fname)) + fname
585 585
586 586 def group(self, nodelist, revlog, lookup, units=None):
587 587 """Calculate a delta group, yielding a sequence of changegroup chunks
588 588 (strings).
589 589
590 590 Given a list of changeset revs, return a set of deltas and
591 591 metadata corresponding to nodes. The first delta is
592 592 first parent(nodelist[0]) -> nodelist[0], the receiver is
593 593 guaranteed to have this parent as it has all history before
594 594 these changesets. In the case firstparent is nullrev the
595 595 changegroup starts with a full revision.
596 596
597 597 If units is not None, progress detail will be generated, units specifies
598 598 the type of revlog that is touched (changelog, manifest, etc.).
599 599 """
600 600 # if we don't have any revisions touched by these changesets, bail
601 601 if len(nodelist) == 0:
602 602 yield self.close()
603 603 return
604 604
605 605 # for generaldelta revlogs, we linearize the revs; this will both be
606 606 # much quicker and generate a much smaller bundle
607 607 if (revlog._generaldelta and self._reorder is None) or self._reorder:
608 608 dag = dagutil.revlogdag(revlog)
609 609 revs = set(revlog.rev(n) for n in nodelist)
610 610 revs = dag.linearize(revs)
611 611 else:
612 612 revs = sorted([revlog.rev(n) for n in nodelist])
613 613
614 614 # add the parent of the first rev
615 615 p = revlog.parentrevs(revs[0])[0]
616 616 revs.insert(0, p)
617 617
618 618 # build deltas
619 619 total = len(revs) - 1
620 620 msgbundling = _('bundling')
621 621 for r in xrange(len(revs) - 1):
622 622 if units is not None:
623 623 self._progress(msgbundling, r + 1, unit=units, total=total)
624 624 prev, curr = revs[r], revs[r + 1]
625 625 linknode = lookup(revlog.node(curr))
626 626 for c in self.revchunk(revlog, curr, prev, linknode):
627 627 yield c
628 628
629 629 if units is not None:
630 630 self._progress(msgbundling, None)
631 631 yield self.close()
632 632
633 633 # filter any nodes that claim to be part of the known set
634 634 def prune(self, revlog, missing, commonrevs):
635 635 rr, rl = revlog.rev, revlog.linkrev
636 636 return [n for n in missing if rl(rr(n)) not in commonrevs]
637 637
638 638 def _packmanifests(self, mfnodes, tmfnodes, lookuplinknode):
639 639 """Pack flat manifests into a changegroup stream."""
640 640 ml = self._repo.manifest
641 641 size = 0
642 642 for chunk in self.group(
643 643 mfnodes, ml, lookuplinknode, units=_('manifests')):
644 644 size += len(chunk)
645 645 yield chunk
646 646 self._verbosenote(_('%8.i (manifests)\n') % size)
647 647 # It looks odd to assert this here, but tmfnodes doesn't get
648 648 # filled in until after we've called lookuplinknode for
649 649 # sending root manifests, so the only way to tell the streams
650 650 # got crossed is to check after we've done all the work.
651 651 assert not tmfnodes
652 652
653 653 def generate(self, commonrevs, clnodes, fastpathlinkrev, source):
654 654 '''yield a sequence of changegroup chunks (strings)'''
655 655 repo = self._repo
656 656 cl = repo.changelog
657 657 ml = repo.manifest
658 658
659 659 clrevorder = {}
660 660 mfs = {} # needed manifests
661 661 tmfnodes = {}
662 662 fnodes = {} # needed file nodes
663 663 # maps manifest node id -> set(changed files)
664 664 mfchangedfiles = {}
665 665
666 666 # Callback for the changelog, used to collect changed files and manifest
667 667 # nodes.
668 668 # Returns the linkrev node (identity in the changelog case).
669 669 def lookupcl(x):
670 670 c = cl.read(x)
671 671 clrevorder[x] = len(clrevorder)
672 672 n = c[0]
673 673 # record the first changeset introducing this manifest version
674 674 mfs.setdefault(n, x)
675 675 # Record a complete list of potentially-changed files in
676 676 # this manifest.
677 677 mfchangedfiles.setdefault(n, set()).update(c[3])
678 678 return x
679 679
680 680 self._verbosenote(_('uncompressed size of bundle content:\n'))
681 681 size = 0
682 682 for chunk in self.group(clnodes, cl, lookupcl, units=_('changesets')):
683 683 size += len(chunk)
684 684 yield chunk
685 685 self._verbosenote(_('%8.i (changelog)\n') % size)
686 686
687 687 # We need to make sure that the linkrev in the changegroup refers to
688 688 # the first changeset that introduced the manifest or file revision.
689 689 # The fastpath is usually safer than the slowpath, because the filelogs
690 690 # are walked in revlog order.
691 691 #
692 692 # When taking the slowpath with reorder=None and the manifest revlog
693 693 # uses generaldelta, the manifest may be walked in the "wrong" order.
694 694 # Without 'clrevorder', we would get an incorrect linkrev (see fix in
695 695 # cc0ff93d0c0c).
696 696 #
697 697 # When taking the fastpath, we are only vulnerable to reordering
698 698 # of the changelog itself. The changelog never uses generaldelta, so
699 699 # it is only reordered when reorder=True. To handle this case, we
700 700 # simply take the slowpath, which already has the 'clrevorder' logic.
701 701 # This was also fixed in cc0ff93d0c0c.
702 702 fastpathlinkrev = fastpathlinkrev and not self._reorder
703 703 # Treemanifests don't work correctly with fastpathlinkrev
704 704 # either, because we don't discover which directory nodes to
705 705 # send along with files. This could probably be fixed.
706 706 fastpathlinkrev = fastpathlinkrev and (
707 707 'treemanifest' not in repo.requirements)
708 708 # Callback for the manifest, used to collect linkrevs for filelog
709 709 # revisions.
710 710 # Returns the linkrev node (collected in lookupcl).
711 711 if fastpathlinkrev:
712 712 lookupmflinknode = mfs.__getitem__
713 713 else:
714 714 def lookupmflinknode(x):
715 715 """Callback for looking up the linknode for manifests.
716 716
717 717 Returns the linkrev node for the specified manifest.
718 718
719 719 SIDE EFFECT:
720 720
721 721 1) fclnodes gets populated with the list of relevant
722 722 file nodes if we're not using fastpathlinkrev
723 723 2) When treemanifests are in use, collects treemanifest nodes
724 724 to send
725 725
726 726 Note that this means manifests must be completely sent to
727 727 the client before you can trust the list of files and
728 728 treemanifests to send.
729 729 """
730 730 clnode = mfs[x]
731 731 # We no longer actually care about reading deltas of
732 732 # the manifest here, because we already know the list
733 733 # of changed files, so for treemanifests (which
734 734 # lazily-load anyway to *generate* a readdelta) we can
735 735 # just load them with read() and then we'll actually
736 736 # be able to correctly load node IDs from the
737 737 # submanifest entries.
738 738 if 'treemanifest' in repo.requirements:
739 739 mdata = ml.read(x)
740 740 else:
741 741 mdata = ml.readfast(x)
742 742 for f in mfchangedfiles[x]:
743 743 try:
744 744 n = mdata[f]
745 745 except KeyError:
746 746 continue
747 747 # record the first changeset introducing this filelog
748 748 # version
749 749 fclnodes = fnodes.setdefault(f, {})
750 750 fclnode = fclnodes.setdefault(n, clnode)
751 751 if clrevorder[clnode] < clrevorder[fclnode]:
752 752 fclnodes[n] = clnode
753 753 # gather list of changed treemanifest nodes
754 754 if 'treemanifest' in repo.requirements:
755 755 submfs = {'/': mdata}
756 756 for dn, bn in _moddirs(mfchangedfiles[x]):
757 757 submf = submfs[dn]
758 758 submf = submf._dirs[bn]
759 759 submfs[submf.dir()] = submf
760 760 tmfclnodes = tmfnodes.setdefault(submf.dir(), {})
761 761 tmfclnode = tmfclnodes.setdefault(submf._node, clnode)
762 762 if clrevorder[clnode] < clrevorder[tmfclnode]:
763 763 tmfclnodes[submf._node] = clnode
764 764 return clnode
765 765
766 766 mfnodes = self.prune(ml, mfs, commonrevs)
767 767 for x in self._packmanifests(
768 768 mfnodes, tmfnodes, lookupmflinknode):
769 769 yield x
770 770
771 771 mfs.clear()
772 772 clrevs = set(cl.rev(x) for x in clnodes)
773 773
774 774 if not fastpathlinkrev:
775 775 def linknodes(unused, fname):
776 776 return fnodes.get(fname, {})
777 777 else:
778 778 cln = cl.node
779 779 def linknodes(filerevlog, fname):
780 780 llr = filerevlog.linkrev
781 781 fln = filerevlog.node
782 782 revs = ((r, llr(r)) for r in filerevlog)
783 783 return dict((fln(r), cln(lr)) for r, lr in revs if lr in clrevs)
784 784
785 785 changedfiles = set()
786 786 for x in mfchangedfiles.itervalues():
787 787 changedfiles.update(x)
788 788 for chunk in self.generatefiles(changedfiles, linknodes, commonrevs,
789 789 source):
790 790 yield chunk
791 791
792 792 yield self.close()
793 793
794 794 if clnodes:
795 795 repo.hook('outgoing', node=hex(clnodes[0]), source=source)
796 796
797 797 # The 'source' parameter is useful for extensions
798 798 def generatefiles(self, changedfiles, linknodes, commonrevs, source):
799 799 repo = self._repo
800 800 progress = self._progress
801 801 msgbundling = _('bundling')
802 802
803 803 total = len(changedfiles)
804 804 # for progress output
805 805 msgfiles = _('files')
806 806 for i, fname in enumerate(sorted(changedfiles)):
807 807 filerevlog = repo.file(fname)
808 808 if not filerevlog:
809 809 raise error.Abort(_("empty or missing revlog for %s") % fname)
810 810
811 811 linkrevnodes = linknodes(filerevlog, fname)
812 812 # Lookup for filenodes, we collected the linkrev nodes above in the
813 813 # fastpath case and with lookupmf in the slowpath case.
814 814 def lookupfilelog(x):
815 815 return linkrevnodes[x]
816 816
817 817 filenodes = self.prune(filerevlog, linkrevnodes, commonrevs)
818 818 if filenodes:
819 819 progress(msgbundling, i + 1, item=fname, unit=msgfiles,
820 820 total=total)
821 821 h = self.fileheader(fname)
822 822 size = len(h)
823 823 yield h
824 824 for chunk in self.group(filenodes, filerevlog, lookupfilelog):
825 825 size += len(chunk)
826 826 yield chunk
827 827 self._verbosenote(_('%8.i %s\n') % (size, fname))
828 828 progress(msgbundling, None)
829 829
830 830 def deltaparent(self, revlog, rev, p1, p2, prev):
831 831 return prev
832 832
833 833 def revchunk(self, revlog, rev, prev, linknode):
834 834 node = revlog.node(rev)
835 835 p1, p2 = revlog.parentrevs(rev)
836 836 base = self.deltaparent(revlog, rev, p1, p2, prev)
837 837
838 838 prefix = ''
839 839 if revlog.iscensored(base) or revlog.iscensored(rev):
840 840 try:
841 841 delta = revlog.revision(node)
842 842 except error.CensoredNodeError as e:
843 843 delta = e.tombstone
844 844 if base == nullrev:
845 845 prefix = mdiff.trivialdiffheader(len(delta))
846 846 else:
847 847 baselen = revlog.rawsize(base)
848 848 prefix = mdiff.replacediffheader(baselen, len(delta))
849 849 elif base == nullrev:
850 850 delta = revlog.revision(node)
851 851 prefix = mdiff.trivialdiffheader(len(delta))
852 852 else:
853 853 delta = revlog.revdiff(base, rev)
854 854 p1n, p2n = revlog.parents(node)
855 855 basenode = revlog.node(base)
856 856 flags = revlog.flags(rev)
857 857 meta = self.builddeltaheader(node, p1n, p2n, basenode, linknode, flags)
858 858 meta += prefix
859 859 l = len(meta) + len(delta)
860 860 yield chunkheader(l)
861 861 yield meta
862 862 yield delta
863 863 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
864 864 # do nothing with basenode, it is implicitly the previous one in HG10
865 865 # do nothing with flags, it is implicitly 0 for cg1 and cg2
866 866 return struct.pack(self.deltaheader, node, p1n, p2n, linknode)
867 867
868 868 class cg2packer(cg1packer):
869 869 version = '02'
870 870 deltaheader = _CHANGEGROUPV2_DELTA_HEADER
871 871
872 872 def __init__(self, repo, bundlecaps=None):
873 873 super(cg2packer, self).__init__(repo, bundlecaps)
874 874 if self._reorder is None:
875 875 # Since generaldelta is directly supported by cg2, reordering
876 876 # generally doesn't help, so we disable it by default (treating
877 877 # bundle.reorder=auto just like bundle.reorder=False).
878 878 self._reorder = False
879 879
880 880 def deltaparent(self, revlog, rev, p1, p2, prev):
881 881 dp = revlog.deltaparent(rev)
882 882 # avoid storing full revisions; pick prev in those cases
883 883 # also pick prev when we can't be sure remote has dp
884 884 if dp == nullrev or (dp != p1 and dp != p2 and dp != prev):
885 885 return prev
886 886 return dp
887 887
888 888 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
889 889 # Do nothing with flags, it is implicitly 0 in cg1 and cg2
890 890 return struct.pack(self.deltaheader, node, p1n, p2n, basenode, linknode)
891 891
892 892 class cg3packer(cg2packer):
893 893 version = '03'
894 894 deltaheader = _CHANGEGROUPV3_DELTA_HEADER
895 895
896 896 def _packmanifests(self, mfnodes, tmfnodes, lookuplinknode):
897 897 # Note that debug prints are super confusing in this code, as
898 898 # tmfnodes gets populated by the calls to lookuplinknode in
899 899 # the superclass's manifest packer. In the future we should
900 900 # probably see if we can refactor this somehow to be less
901 901 # confusing.
902 902 for x in super(cg3packer, self)._packmanifests(
903 903 mfnodes, {}, lookuplinknode):
904 904 yield x
905 905 dirlog = self._repo.manifest.dirlog
906 906 for name, nodes in tmfnodes.iteritems():
907 907 # For now, directory headers are simply file headers with
908 908 # a trailing '/' on the path (already in the name).
909 909 yield self.fileheader(name)
910 910 for chunk in self.group(nodes, dirlog(name), nodes.get):
911 911 yield chunk
912 yield self.close()
912 913
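
The `yield self.close()` added above is the sending half of this commit: after the root-manifest group and the per-directory groups, cg3packer now emits one extra empty chunk so a receiver can tell where tree manifests end and filelogs begin. Roughly, the manifest section of a cg3 stream now serializes as follows (a sketch of the order using this class's own helpers, not the real generator):

    def manifestsection(packer, mfnodes, tmfnodes, lookup):
        # ml is the manifest revlog, as in _packmanifests above.
        ml = packer._repo.manifest
        for chunk in packer.group(mfnodes, ml, lookup):
            yield chunk                     # root manifests + closing chunk
        for name, nodes in tmfnodes.iteritems():
            yield packer.fileheader(name)   # directory path, trailing '/'
            for chunk in packer.group(nodes, ml.dirlog(name), nodes.get):
                yield chunk                 # directory deltas + closing chunk
        yield packer.close()                # NEW: empty separator chunk
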
913 914 def builddeltaheader(self, node, p1n, p2n, basenode, linknode, flags):
914 915 return struct.pack(
915 916 self.deltaheader, node, p1n, p2n, basenode, linknode, flags)
916 917
917 918 _packermap = {'01': (cg1packer, cg1unpacker),
918 919 # cg2 adds support for exchanging generaldelta
919 920 '02': (cg2packer, cg2unpacker),
920 # cg3 adds support for exchanging treemanifests
921 # cg3 adds support for exchanging revlog flags and treemanifests
921 922 '03': (cg3packer, cg3unpacker),
922 923 }
923 924
924 925 def supportedversions(repo):
925 926 versions = _packermap.keys()
926 927 cg3 = ('treemanifest' in repo.requirements or
927 928 repo.ui.configbool('experimental', 'changegroup3') or
928 929 repo.ui.configbool('experimental', 'treemanifest'))
929 930 if not cg3:
930 931 versions.remove('03')
931 932 return versions
932 933
933 934 def getbundler(version, repo, bundlecaps=None):
934 935 assert version in supportedversions(repo)
935 936 return _packermap[version][0](repo, bundlecaps)
936 937
937 938 def getunbundler(version, fh, alg):
938 939 return _packermap[version][1](fh, alg)
939 940
940 941 def _changegroupinfo(repo, nodes, source):
941 942 if repo.ui.verbose or source == 'bundle':
942 943 repo.ui.status(_("%d changesets found\n") % len(nodes))
943 944 if repo.ui.debugflag:
944 945 repo.ui.debug("list of changesets:\n")
945 946 for node in nodes:
946 947 repo.ui.debug("%s\n" % hex(node))
947 948
948 949 def getsubsetraw(repo, outgoing, bundler, source, fastpath=False):
949 950 repo = repo.unfiltered()
950 951 commonrevs = outgoing.common
951 952 csets = outgoing.missing
952 953 heads = outgoing.missingheads
953 954 # We go through the fast path if we get told to, or if all (unfiltered)
954 955 # heads have been requested (since we then know that all linkrevs will
955 956 # be pulled by the client).
956 957 heads.sort()
957 958 fastpathlinkrev = fastpath or (
958 959 repo.filtername is None and heads == sorted(repo.heads()))
959 960
960 961 repo.hook('preoutgoing', throw=True, source=source)
961 962 _changegroupinfo(repo, csets, source)
962 963 return bundler.generate(commonrevs, csets, fastpathlinkrev, source)
963 964
964 965 def getsubset(repo, outgoing, bundler, source, fastpath=False):
965 966 gengroup = getsubsetraw(repo, outgoing, bundler, source, fastpath)
966 967 return getunbundler(bundler.version, util.chunkbuffer(gengroup), None)
967 968
968 969 def changegroupsubset(repo, roots, heads, source, version='01'):
969 970 """Compute a changegroup consisting of all the nodes that are
970 971 descendants of any of the roots and ancestors of any of the heads.
971 972 Return a chunkbuffer object whose read() method will return
972 973 successive changegroup chunks.
973 974
974 975 It is fairly complex as determining which filenodes and which
975 976 manifest nodes need to be included for the changeset to be complete
976 977 is non-trivial.
977 978
978 979 Another wrinkle is doing the reverse, figuring out which changeset in
979 980 the changegroup a particular filenode or manifestnode belongs to.
980 981 """
981 982 cl = repo.changelog
982 983 if not roots:
983 984 roots = [nullid]
984 985 discbases = []
985 986 for n in roots:
986 987 discbases.extend([p for p in cl.parents(n) if p != nullid])
987 988 # TODO: remove call to nodesbetween.
988 989 csets, roots, heads = cl.nodesbetween(roots, heads)
989 990 included = set(csets)
990 991 discbases = [n for n in discbases if n not in included]
991 992 outgoing = discovery.outgoing(cl, discbases, heads)
992 993 bundler = getbundler(version, repo)
993 994 return getsubset(repo, outgoing, bundler, source)
994 995
995 996 def getlocalchangegroupraw(repo, source, outgoing, bundlecaps=None,
996 997 version='01'):
997 998 """Like getbundle, but taking a discovery.outgoing as an argument.
998 999
999 1000 This is only implemented for local repos and reuses potentially
1000 1001 precomputed sets in outgoing. Returns a raw changegroup generator."""
1001 1002 if not outgoing.missing:
1002 1003 return None
1003 1004 bundler = getbundler(version, repo, bundlecaps)
1004 1005 return getsubsetraw(repo, outgoing, bundler, source)
1005 1006
1006 1007 def getlocalchangegroup(repo, source, outgoing, bundlecaps=None,
1007 1008 version='01'):
1008 1009 """Like getbundle, but taking a discovery.outgoing as an argument.
1009 1010
1010 1011 This is only implemented for local repos and reuses potentially
1011 1012 precomputed sets in outgoing."""
1012 1013 if not outgoing.missing:
1013 1014 return None
1014 1015 bundler = getbundler(version, repo, bundlecaps)
1015 1016 return getsubset(repo, outgoing, bundler, source)
1016 1017
1017 1018 def computeoutgoing(repo, heads, common):
1018 1019 """Computes which revs are outgoing given a set of common
1019 1020 and a set of heads.
1020 1021
1021 1022 This is a separate function so extensions can have access to
1022 1023 the logic.
1023 1024
1024 1025 Returns a discovery.outgoing object.
1025 1026 """
1026 1027 cl = repo.changelog
1027 1028 if common:
1028 1029 hasnode = cl.hasnode
1029 1030 common = [n for n in common if hasnode(n)]
1030 1031 else:
1031 1032 common = [nullid]
1032 1033 if not heads:
1033 1034 heads = cl.heads()
1034 1035 return discovery.outgoing(cl, common, heads)
1035 1036
1036 1037 def getchangegroup(repo, source, heads=None, common=None, bundlecaps=None,
1037 1038 version='01'):
1038 1039 """Like changegroupsubset, but returns the set difference between the
1039 1040 ancestors of heads and the ancestors common.
1040 1041
1041 1042 If heads is None, use the local heads. If common is None, use [nullid].
1042 1043
1043 1044 The nodes in common might not all be known locally due to the way the
1044 1045 current discovery protocol works.
1045 1046 """
1046 1047 outgoing = computeoutgoing(repo, heads, common)
1047 1048 return getlocalchangegroup(repo, source, outgoing, bundlecaps=bundlecaps,
1048 1049 version=version)
1049 1050
1050 1051 def changegroup(repo, basenodes, source):
1051 1052 # to avoid a race we use changegroupsubset() (issue1320)
1052 1053 return changegroupsubset(repo, basenodes, repo.heads(), source)
1053 1054
1054 1055 def _addchangegroupfiles(repo, source, revmap, trp, pr, needfiles):
1055 1056 revisions = 0
1056 1057 files = 0
1058 submfsdone = False
1057 1059 while True:
1058 1060 chunkdata = source.filelogheader()
1059 1061 if not chunkdata:
1062 if source.version == "03" and not submfsdone:
1063 submfsdone = True
1064 continue
1060 1065 break
1061 1066 f = chunkdata["filename"]
1062 1067 repo.ui.debug("adding %s revisions\n" % f)
1063 1068 pr()
1064 1069 directory = (f[-1] == '/')
1065 1070 if directory:
1066 1071 # a directory using treemanifests
1067 1072 fl = repo.manifest.dirlog(f)
1068 1073 else:
1069 1074 fl = repo.file(f)
1070 1075 o = len(fl)
1071 1076 try:
1072 1077 if not fl.addgroup(source, revmap, trp):
1073 1078 raise error.Abort(_("received file revlog group is empty"))
1074 1079 except error.CensoredBaseError as e:
1075 1080 raise error.Abort(_("received delta base is censored: %s") % e)
1076 1081 if not directory:
1077 1082 revisions += len(fl) - o
1078 1083 files += 1
1079 1084 if f in needfiles:
1080 1085 needs = needfiles[f]
1081 1086 for new in xrange(o, len(fl)):
1082 1087 n = fl.node(new)
1083 1088 if n in needs:
1084 1089 needs.remove(n)
1085 1090 else:
1086 1091 raise error.Abort(
1087 1092 _("received spurious file revlog entry"))
1088 1093 if not needs:
1089 1094 del needfiles[f]
1090 1095 repo.ui.progress(_('files'), None)
1091 1096
1092 1097 for f, needs in needfiles.iteritems():
1093 1098 fl = repo.file(f)
1094 1099 for n in needs:
1095 1100 try:
1096 1101 fl.rev(n)
1097 1102 except error.LookupError:
1098 1103 raise error.Abort(
1099 1104 _('missing file data for %s:%s - run hg verify') %
1100 1105 (f, hex(n)))
1101 1106
1102 1107 return revisions, files
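
On the apply side, _addchangegroupfiles above is the matching consumer: for a "03" stream the first empty filelog header is skipped exactly once (tracked by submfsdone) instead of ending the loop, and everything after it is routed purely by name, directories being distinguished by their trailing '/'. A minimal sketch of that routing decision (repo APIs as used in the function; the helper name is mine):

    def grouptarget(repo, fname):
        # Where a chunk group lands: directory groups reuse the filelog
        # framing with a trailing '/' on the name (see cg3packer above).
        if fname.endswith('/'):
            return repo.manifest.dirlog(fname)   # tree manifest revlog
        return repo.file(fname)                  # regular filelog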