##// END OF EJS Templates
bundlerepo: drop the custom `rawdata` implementation...
marmoute -
r43091:5ba8c328 default
parent child Browse files
Show More
@@ -1,627 +1,624 b''
1 1 # bundlerepo.py - repository class for viewing uncompressed bundles
2 2 #
3 3 # Copyright 2006, 2007 Benoit Boissinot <bboissin@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """Repository class for viewing uncompressed bundles.
9 9
10 10 This provides a read-only repository interface to bundles as if they
11 11 were part of the actual repository.
12 12 """
13 13
14 14 from __future__ import absolute_import
15 15
16 16 import os
17 17 import shutil
18 18
19 19 from .i18n import _
20 20 from .node import (
21 21 nullid,
22 22 nullrev
23 23 )
24 24
25 25 from . import (
26 26 bundle2,
27 27 changegroup,
28 28 changelog,
29 29 cmdutil,
30 30 discovery,
31 31 encoding,
32 32 error,
33 33 exchange,
34 34 filelog,
35 35 localrepo,
36 36 manifest,
37 37 mdiff,
38 38 node as nodemod,
39 39 pathutil,
40 40 phases,
41 41 pycompat,
42 42 revlog,
43 43 util,
44 44 vfs as vfsmod,
45 45 )
46 46
47 47 class bundlerevlog(revlog.revlog):
48 48 def __init__(self, opener, indexfile, cgunpacker, linkmapper):
49 49 # How it works:
50 50 # To retrieve a revision, we need to know the offset of the revision in
51 51 # the bundle (an unbundle object). We store this offset in the index
52 52 # (start). The base of the delta is stored in the base field.
53 53 #
54 54 # To differentiate a rev in the bundle from a rev in the revlog, we
55 55 # check revision against repotiprev.
56 56 opener = vfsmod.readonlyvfs(opener)
57 57 revlog.revlog.__init__(self, opener, indexfile)
58 58 self.bundle = cgunpacker
59 59 n = len(self)
60 60 self.repotiprev = n - 1
61 61 self.bundlerevs = set() # used by 'bundle()' revset expression
62 62 for deltadata in cgunpacker.deltaiter():
63 63 node, p1, p2, cs, deltabase, delta, flags = deltadata
64 64
65 65 size = len(delta)
66 66 start = cgunpacker.tell() - size
67 67
68 68 link = linkmapper(cs)
69 69 if node in self.nodemap:
70 70 # this can happen if two branches make the same change
71 71 self.bundlerevs.add(self.nodemap[node])
72 72 continue
73 73
74 74 for p in (p1, p2):
75 75 if p not in self.nodemap:
76 76 raise error.LookupError(p, self.indexfile,
77 77 _("unknown parent"))
78 78
79 79 if deltabase not in self.nodemap:
80 80 raise LookupError(deltabase, self.indexfile,
81 81 _('unknown delta base'))
82 82
83 83 baserev = self.rev(deltabase)
84 84 # start, size, full unc. size, base (unused), link, p1, p2, node
85 85 e = (revlog.offset_type(start, flags), size, -1, baserev, link,
86 86 self.rev(p1), self.rev(p2), node)
87 87 self.index.append(e)
88 88 self.nodemap[node] = n
89 89 self.bundlerevs.add(n)
90 90 n += 1
91 91
92 92 def _chunk(self, rev, df=None):
93 93 # Warning: in case of bundle, the diff is against what we stored as
94 94 # delta base, not against rev - 1
95 95 # XXX: could use some caching
96 96 if rev <= self.repotiprev:
97 97 return revlog.revlog._chunk(self, rev)
98 98 self.bundle.seek(self.start(rev))
99 99 return self.bundle.read(self.length(rev))
100 100
101 101 def revdiff(self, rev1, rev2):
102 102 """return or calculate a delta between two revisions"""
103 103 if rev1 > self.repotiprev and rev2 > self.repotiprev:
104 104 # hot path for bundle
105 105 revb = self.index[rev2][3]
106 106 if revb == rev1:
107 107 return self._chunk(rev2)
108 108 elif rev1 <= self.repotiprev and rev2 <= self.repotiprev:
109 109 return revlog.revlog.revdiff(self, rev1, rev2)
110 110
111 111 return mdiff.textdiff(self.rawdata(rev1),
112 112 self.rawdata(rev2))
113 113
114 114 def _rawtext(self, node, rev, _df=None):
115 115 if rev is None:
116 116 rev = self.rev(node)
117 117 validated = False
118 118 rawtext = None
119 119 chain = []
120 120 iterrev = rev
121 121 # reconstruct the revision if it is from a changegroup
122 122 while iterrev > self.repotiprev:
123 123 if self._revisioncache and self._revisioncache[1] == iterrev:
124 124 rawtext = self._revisioncache[2]
125 125 break
126 126 chain.append(iterrev)
127 127 iterrev = self.index[iterrev][3]
128 128 if iterrev == nullrev:
129 129 rawtext = ''
130 130 elif rawtext is None:
131 131 r = super(bundlerevlog, self)._rawtext(self.node(iterrev),
132 132 iterrev,
133 133 _df=_df)
134 134 __, rawtext, validated = r
135 135 if chain:
136 136 validated = False
137 137 while chain:
138 138 delta = self._chunk(chain.pop())
139 139 rawtext = mdiff.patches(rawtext, [delta])
140 140 return rev, rawtext, validated
141 141
142 def rawdata(self, nodeorrev, _df=None):
143 return self.revision(nodeorrev, _df=_df, raw=True)
144
145 142 def addrevision(self, *args, **kwargs):
146 143 raise NotImplementedError
147 144
148 145 def addgroup(self, *args, **kwargs):
149 146 raise NotImplementedError
150 147
151 148 def strip(self, *args, **kwargs):
152 149 raise NotImplementedError
153 150
154 151 def checksize(self):
155 152 raise NotImplementedError
156 153
157 154 class bundlechangelog(bundlerevlog, changelog.changelog):
158 155 def __init__(self, opener, cgunpacker):
159 156 changelog.changelog.__init__(self, opener)
160 157 linkmapper = lambda x: x
161 158 bundlerevlog.__init__(self, opener, self.indexfile, cgunpacker,
162 159 linkmapper)
163 160
164 161 class bundlemanifest(bundlerevlog, manifest.manifestrevlog):
165 162 def __init__(self, opener, cgunpacker, linkmapper, dirlogstarts=None,
166 163 dir=''):
167 164 manifest.manifestrevlog.__init__(self, opener, tree=dir)
168 165 bundlerevlog.__init__(self, opener, self.indexfile, cgunpacker,
169 166 linkmapper)
170 167 if dirlogstarts is None:
171 168 dirlogstarts = {}
172 169 if self.bundle.version == "03":
173 170 dirlogstarts = _getfilestarts(self.bundle)
174 171 self._dirlogstarts = dirlogstarts
175 172 self._linkmapper = linkmapper
176 173
177 174 def dirlog(self, d):
178 175 if d in self._dirlogstarts:
179 176 self.bundle.seek(self._dirlogstarts[d])
180 177 return bundlemanifest(
181 178 self.opener, self.bundle, self._linkmapper,
182 179 self._dirlogstarts, dir=d)
183 180 return super(bundlemanifest, self).dirlog(d)
184 181
185 182 class bundlefilelog(filelog.filelog):
186 183 def __init__(self, opener, path, cgunpacker, linkmapper):
187 184 filelog.filelog.__init__(self, opener, path)
188 185 self._revlog = bundlerevlog(opener, self.indexfile,
189 186 cgunpacker, linkmapper)
190 187
191 188 class bundlepeer(localrepo.localpeer):
192 189 def canpush(self):
193 190 return False
194 191
195 192 class bundlephasecache(phases.phasecache):
196 193 def __init__(self, *args, **kwargs):
197 194 super(bundlephasecache, self).__init__(*args, **kwargs)
198 195 if util.safehasattr(self, 'opener'):
199 196 self.opener = vfsmod.readonlyvfs(self.opener)
200 197
201 198 def write(self):
202 199 raise NotImplementedError
203 200
204 201 def _write(self, fp):
205 202 raise NotImplementedError
206 203
207 204 def _updateroots(self, phase, newroots, tr):
208 205 self.phaseroots[phase] = newroots
209 206 self.invalidate()
210 207 self.dirty = True
211 208
212 209 def _getfilestarts(cgunpacker):
213 210 filespos = {}
214 211 for chunkdata in iter(cgunpacker.filelogheader, {}):
215 212 fname = chunkdata['filename']
216 213 filespos[fname] = cgunpacker.tell()
217 214 for chunk in iter(lambda: cgunpacker.deltachunk(None), {}):
218 215 pass
219 216 return filespos
220 217
221 218 class bundlerepository(object):
222 219 """A repository instance that is a union of a local repo and a bundle.
223 220
224 221 Instances represent a read-only repository composed of a local repository
225 222 with the contents of a bundle file applied. The repository instance is
226 223 conceptually similar to the state of a repository after an
227 224 ``hg unbundle`` operation. However, the contents of the bundle are never
228 225 applied to the actual base repository.
229 226
230 227 Instances constructed directly are not usable as repository objects.
231 228 Use instance() or makebundlerepository() to create instances.
232 229 """
233 230 def __init__(self, bundlepath, url, tempparent):
234 231 self._tempparent = tempparent
235 232 self._url = url
236 233
237 234 self.ui.setconfig('phases', 'publish', False, 'bundlerepo')
238 235
239 236 self.tempfile = None
240 237 f = util.posixfile(bundlepath, "rb")
241 238 bundle = exchange.readbundle(self.ui, f, bundlepath)
242 239
243 240 if isinstance(bundle, bundle2.unbundle20):
244 241 self._bundlefile = bundle
245 242 self._cgunpacker = None
246 243
247 244 cgpart = None
248 245 for part in bundle.iterparts(seekable=True):
249 246 if part.type == 'changegroup':
250 247 if cgpart:
251 248 raise NotImplementedError("can't process "
252 249 "multiple changegroups")
253 250 cgpart = part
254 251
255 252 self._handlebundle2part(bundle, part)
256 253
257 254 if not cgpart:
258 255 raise error.Abort(_("No changegroups found"))
259 256
260 257 # This is required to placate a later consumer, which expects
261 258 # the payload offset to be at the beginning of the changegroup.
262 259 # We need to do this after the iterparts() generator advances
263 260 # because iterparts() will seek to end of payload after the
264 261 # generator returns control to iterparts().
265 262 cgpart.seek(0, os.SEEK_SET)
266 263
267 264 elif isinstance(bundle, changegroup.cg1unpacker):
268 265 if bundle.compressed():
269 266 f = self._writetempbundle(bundle.read, '.hg10un',
270 267 header='HG10UN')
271 268 bundle = exchange.readbundle(self.ui, f, bundlepath, self.vfs)
272 269
273 270 self._bundlefile = bundle
274 271 self._cgunpacker = bundle
275 272 else:
276 273 raise error.Abort(_('bundle type %s cannot be read') %
277 274 type(bundle))
278 275
279 276 # dict with the mapping 'filename' -> position in the changegroup.
280 277 self._cgfilespos = {}
281 278
282 279 self.firstnewrev = self.changelog.repotiprev + 1
283 280 phases.retractboundary(self, None, phases.draft,
284 281 [ctx.node() for ctx in self[self.firstnewrev:]])
285 282
286 283 def _handlebundle2part(self, bundle, part):
287 284 if part.type != 'changegroup':
288 285 return
289 286
290 287 cgstream = part
291 288 version = part.params.get('version', '01')
292 289 legalcgvers = changegroup.supportedincomingversions(self)
293 290 if version not in legalcgvers:
294 291 msg = _('Unsupported changegroup version: %s')
295 292 raise error.Abort(msg % version)
296 293 if bundle.compressed():
297 294 cgstream = self._writetempbundle(part.read, '.cg%sun' % version)
298 295
299 296 self._cgunpacker = changegroup.getunbundler(version, cgstream, 'UN')
300 297
301 298 def _writetempbundle(self, readfn, suffix, header=''):
302 299 """Write a temporary file to disk
303 300 """
304 301 fdtemp, temp = self.vfs.mkstemp(prefix="hg-bundle-",
305 302 suffix=suffix)
306 303 self.tempfile = temp
307 304
308 305 with os.fdopen(fdtemp, r'wb') as fptemp:
309 306 fptemp.write(header)
310 307 while True:
311 308 chunk = readfn(2**18)
312 309 if not chunk:
313 310 break
314 311 fptemp.write(chunk)
315 312
316 313 return self.vfs.open(self.tempfile, mode="rb")
317 314
318 315 @localrepo.unfilteredpropertycache
319 316 def _phasecache(self):
320 317 return bundlephasecache(self, self._phasedefaults)
321 318
322 319 @localrepo.unfilteredpropertycache
323 320 def changelog(self):
324 321 # consume the header if it exists
325 322 self._cgunpacker.changelogheader()
326 323 c = bundlechangelog(self.svfs, self._cgunpacker)
327 324 self.manstart = self._cgunpacker.tell()
328 325 return c
329 326
330 327 def _refreshchangelog(self):
331 328 # changelog for bundle repo are not filecache, this method is not
332 329 # applicable.
333 330 pass
334 331
335 332 @localrepo.unfilteredpropertycache
336 333 def manifestlog(self):
337 334 self._cgunpacker.seek(self.manstart)
338 335 # consume the header if it exists
339 336 self._cgunpacker.manifestheader()
340 337 linkmapper = self.unfiltered().changelog.rev
341 338 rootstore = bundlemanifest(self.svfs, self._cgunpacker, linkmapper)
342 339 self.filestart = self._cgunpacker.tell()
343 340
344 341 return manifest.manifestlog(self.svfs, self, rootstore,
345 342 self.narrowmatch())
346 343
347 344 def _consumemanifest(self):
348 345 """Consumes the manifest portion of the bundle, setting filestart so the
349 346 file portion can be read."""
350 347 self._cgunpacker.seek(self.manstart)
351 348 self._cgunpacker.manifestheader()
352 349 for delta in self._cgunpacker.deltaiter():
353 350 pass
354 351 self.filestart = self._cgunpacker.tell()
355 352
356 353 @localrepo.unfilteredpropertycache
357 354 def manstart(self):
358 355 self.changelog
359 356 return self.manstart
360 357
361 358 @localrepo.unfilteredpropertycache
362 359 def filestart(self):
363 360 self.manifestlog
364 361
365 362 # If filestart was not set by self.manifestlog, that means the
366 363 # manifestlog implementation did not consume the manifests from the
367 364 # changegroup (ex: it might be consuming trees from a separate bundle2
368 365 # part instead). So we need to manually consume it.
369 366 if r'filestart' not in self.__dict__:
370 367 self._consumemanifest()
371 368
372 369 return self.filestart
373 370
374 371 def url(self):
375 372 return self._url
376 373
377 374 def file(self, f):
378 375 if not self._cgfilespos:
379 376 self._cgunpacker.seek(self.filestart)
380 377 self._cgfilespos = _getfilestarts(self._cgunpacker)
381 378
382 379 if f in self._cgfilespos:
383 380 self._cgunpacker.seek(self._cgfilespos[f])
384 381 linkmapper = self.unfiltered().changelog.rev
385 382 return bundlefilelog(self.svfs, f, self._cgunpacker, linkmapper)
386 383 else:
387 384 return super(bundlerepository, self).file(f)
388 385
389 386 def close(self):
390 387 """Close assigned bundle file immediately."""
391 388 self._bundlefile.close()
392 389 if self.tempfile is not None:
393 390 self.vfs.unlink(self.tempfile)
394 391 if self._tempparent:
395 392 shutil.rmtree(self._tempparent, True)
396 393
397 394 def cancopy(self):
398 395 return False
399 396
400 397 def peer(self):
401 398 return bundlepeer(self)
402 399
403 400 def getcwd(self):
404 401 return encoding.getcwd() # always outside the repo
405 402
406 403 # Check if parents exist in localrepo before setting
407 404 def setparents(self, p1, p2=nullid):
408 405 p1rev = self.changelog.rev(p1)
409 406 p2rev = self.changelog.rev(p2)
410 407 msg = _("setting parent to node %s that only exists in the bundle\n")
411 408 if self.changelog.repotiprev < p1rev:
412 409 self.ui.warn(msg % nodemod.hex(p1))
413 410 if self.changelog.repotiprev < p2rev:
414 411 self.ui.warn(msg % nodemod.hex(p2))
415 412 return super(bundlerepository, self).setparents(p1, p2)
416 413
417 414 def instance(ui, path, create, intents=None, createopts=None):
418 415 if create:
419 416 raise error.Abort(_('cannot create new bundle repository'))
420 417 # internal config: bundle.mainreporoot
421 418 parentpath = ui.config("bundle", "mainreporoot")
422 419 if not parentpath:
423 420 # try to find the correct path to the working directory repo
424 421 parentpath = cmdutil.findrepo(encoding.getcwd())
425 422 if parentpath is None:
426 423 parentpath = ''
427 424 if parentpath:
428 425 # Try to make the full path relative so we get a nice, short URL.
429 426 # In particular, we don't want temp dir names in test outputs.
430 427 cwd = encoding.getcwd()
431 428 if parentpath == cwd:
432 429 parentpath = ''
433 430 else:
434 431 cwd = pathutil.normasprefix(cwd)
435 432 if parentpath.startswith(cwd):
436 433 parentpath = parentpath[len(cwd):]
437 434 u = util.url(path)
438 435 path = u.localpath()
439 436 if u.scheme == 'bundle':
440 437 s = path.split("+", 1)
441 438 if len(s) == 1:
442 439 repopath, bundlename = parentpath, s[0]
443 440 else:
444 441 repopath, bundlename = s
445 442 else:
446 443 repopath, bundlename = parentpath, path
447 444
448 445 return makebundlerepository(ui, repopath, bundlename)
449 446
450 447 def makebundlerepository(ui, repopath, bundlepath):
451 448 """Make a bundle repository object based on repo and bundle paths."""
452 449 if repopath:
453 450 url = 'bundle:%s+%s' % (util.expandpath(repopath), bundlepath)
454 451 else:
455 452 url = 'bundle:%s' % bundlepath
456 453
457 454 # Because we can't make any guarantees about the type of the base
458 455 # repository, we can't have a static class representing the bundle
459 456 # repository. We also can't make any guarantees about how to even
460 457 # call the base repository's constructor!
461 458 #
462 459 # So, our strategy is to go through ``localrepo.instance()`` to construct
463 460 # a repo instance. Then, we dynamically create a new type derived from
464 461 # both it and our ``bundlerepository`` class which overrides some
465 462 # functionality. We then change the type of the constructed repository
466 463 # to this new type and initialize the bundle-specific bits of it.
467 464
468 465 try:
469 466 repo = localrepo.instance(ui, repopath, create=False)
470 467 tempparent = None
471 468 except error.RepoError:
472 469 tempparent = pycompat.mkdtemp()
473 470 try:
474 471 repo = localrepo.instance(ui, tempparent, create=True)
475 472 except Exception:
476 473 shutil.rmtree(tempparent)
477 474 raise
478 475
479 476 class derivedbundlerepository(bundlerepository, repo.__class__):
480 477 pass
481 478
482 479 repo.__class__ = derivedbundlerepository
483 480 bundlerepository.__init__(repo, bundlepath, url, tempparent)
484 481
485 482 return repo
486 483
487 484 class bundletransactionmanager(object):
488 485 def transaction(self):
489 486 return None
490 487
491 488 def close(self):
492 489 raise NotImplementedError
493 490
494 491 def release(self):
495 492 raise NotImplementedError
496 493
497 494 def getremotechanges(ui, repo, peer, onlyheads=None, bundlename=None,
498 495 force=False):
499 496 '''obtains a bundle of changes incoming from peer
500 497
501 498 "onlyheads" restricts the returned changes to those reachable from the
502 499 specified heads.
503 500 "bundlename", if given, stores the bundle to this file path permanently;
504 501 otherwise it's stored to a temp file and gets deleted again when you call
505 502 the returned "cleanupfn".
506 503 "force" indicates whether to proceed on unrelated repos.
507 504
508 505 Returns a tuple (local, csets, cleanupfn):
509 506
510 507 "local" is a local repo from which to obtain the actual incoming
511 508 changesets; it is a bundlerepo for the obtained bundle when the
512 509 original "peer" is remote.
513 510 "csets" lists the incoming changeset node ids.
514 511 "cleanupfn" must be called without arguments when you're done processing
515 512 the changes; it closes both the original "peer" and the one returned
516 513 here.
517 514 '''
518 515 tmp = discovery.findcommonincoming(repo, peer, heads=onlyheads,
519 516 force=force)
520 517 common, incoming, rheads = tmp
521 518 if not incoming:
522 519 try:
523 520 if bundlename:
524 521 os.unlink(bundlename)
525 522 except OSError:
526 523 pass
527 524 return repo, [], peer.close
528 525
529 526 commonset = set(common)
530 527 rheads = [x for x in rheads if x not in commonset]
531 528
532 529 bundle = None
533 530 bundlerepo = None
534 531 localrepo = peer.local()
535 532 if bundlename or not localrepo:
536 533 # create a bundle (uncompressed if peer repo is not local)
537 534
538 535 # developer config: devel.legacy.exchange
539 536 legexc = ui.configlist('devel', 'legacy.exchange')
540 537 forcebundle1 = 'bundle2' not in legexc and 'bundle1' in legexc
541 538 canbundle2 = (not forcebundle1
542 539 and peer.capable('getbundle')
543 540 and peer.capable('bundle2'))
544 541 if canbundle2:
545 542 with peer.commandexecutor() as e:
546 543 b2 = e.callcommand('getbundle', {
547 544 'source': 'incoming',
548 545 'common': common,
549 546 'heads': rheads,
550 547 'bundlecaps': exchange.caps20to10(repo, role='client'),
551 548 'cg': True,
552 549 }).result()
553 550
554 551 fname = bundle = changegroup.writechunks(ui,
555 552 b2._forwardchunks(),
556 553 bundlename)
557 554 else:
558 555 if peer.capable('getbundle'):
559 556 with peer.commandexecutor() as e:
560 557 cg = e.callcommand('getbundle', {
561 558 'source': 'incoming',
562 559 'common': common,
563 560 'heads': rheads,
564 561 }).result()
565 562 elif onlyheads is None and not peer.capable('changegroupsubset'):
566 563 # compat with older servers when pulling all remote heads
567 564
568 565 with peer.commandexecutor() as e:
569 566 cg = e.callcommand('changegroup', {
570 567 'nodes': incoming,
571 568 'source': 'incoming',
572 569 }).result()
573 570
574 571 rheads = None
575 572 else:
576 573 with peer.commandexecutor() as e:
577 574 cg = e.callcommand('changegroupsubset', {
578 575 'bases': incoming,
579 576 'heads': rheads,
580 577 'source': 'incoming',
581 578 }).result()
582 579
583 580 if localrepo:
584 581 bundletype = "HG10BZ"
585 582 else:
586 583 bundletype = "HG10UN"
587 584 fname = bundle = bundle2.writebundle(ui, cg, bundlename,
588 585 bundletype)
589 586 # keep written bundle?
590 587 if bundlename:
591 588 bundle = None
592 589 if not localrepo:
593 590 # use the created uncompressed bundlerepo
594 591 localrepo = bundlerepo = makebundlerepository(repo. baseui,
595 592 repo.root,
596 593 fname)
597 594
598 595 # this repo contains local and peer now, so filter out local again
599 596 common = repo.heads()
600 597 if localrepo:
601 598 # Part of common may be remotely filtered
602 599 # So use an unfiltered version
603 600 # The discovery process probably need cleanup to avoid that
604 601 localrepo = localrepo.unfiltered()
605 602
606 603 csets = localrepo.changelog.findmissing(common, rheads)
607 604
608 605 if bundlerepo:
609 606 reponodes = [ctx.node() for ctx in bundlerepo[bundlerepo.firstnewrev:]]
610 607
611 608 with peer.commandexecutor() as e:
612 609 remotephases = e.callcommand('listkeys', {
613 610 'namespace': 'phases',
614 611 }).result()
615 612
616 613 pullop = exchange.pulloperation(bundlerepo, peer, heads=reponodes)
617 614 pullop.trmanager = bundletransactionmanager()
618 615 exchange._pullapplyphases(pullop, remotephases)
619 616
620 617 def cleanup():
621 618 if bundlerepo:
622 619 bundlerepo.close()
623 620 if bundle:
624 621 os.unlink(bundle)
625 622 peer.close()
626 623
627 624 return (localrepo, csets, cleanup)
General Comments 0
You need to be logged in to leave comments. Login now