##// END OF EJS Templates
bundlerepo: move temp bundle creation to a separate function...
Durham Goode -
r33888:702a26fe default
parent child Browse files
Show More
@@ -1,558 +1,558
1 1 # bundlerepo.py - repository class for viewing uncompressed bundles
2 2 #
3 3 # Copyright 2006, 2007 Benoit Boissinot <bboissin@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """Repository class for viewing uncompressed bundles.
9 9
10 10 This provides a read-only repository interface to bundles as if they
11 11 were part of the actual repository.
12 12 """
13 13
14 14 from __future__ import absolute_import
15 15
16 16 import os
17 17 import shutil
18 18 import tempfile
19 19
20 20 from .i18n import _
21 21 from .node import nullid
22 22
23 23 from . import (
24 24 bundle2,
25 25 changegroup,
26 26 changelog,
27 27 cmdutil,
28 28 discovery,
29 29 error,
30 30 exchange,
31 31 filelog,
32 32 localrepo,
33 33 manifest,
34 34 mdiff,
35 35 node as nodemod,
36 36 pathutil,
37 37 phases,
38 38 pycompat,
39 39 revlog,
40 40 util,
41 41 vfs as vfsmod,
42 42 )
43 43
44 44 class bundlerevlog(revlog.revlog):
45 45 def __init__(self, opener, indexfile, bundle, linkmapper):
46 46 # How it works:
47 47 # To retrieve a revision, we need to know the offset of the revision in
48 48 # the bundle (an unbundle object). We store this offset in the index
49 49 # (start). The base of the delta is stored in the base field.
50 50 #
51 51 # To differentiate a rev in the bundle from a rev in the revlog, we
52 52 # check revision against repotiprev.
53 53 opener = vfsmod.readonlyvfs(opener)
54 54 revlog.revlog.__init__(self, opener, indexfile)
55 55 self.bundle = bundle
56 56 n = len(self)
57 57 self.repotiprev = n - 1
58 58 chain = None
59 59 self.bundlerevs = set() # used by 'bundle()' revset expression
60 60 getchunk = lambda: bundle.deltachunk(chain)
61 61 for chunkdata in iter(getchunk, {}):
62 62 node = chunkdata['node']
63 63 p1 = chunkdata['p1']
64 64 p2 = chunkdata['p2']
65 65 cs = chunkdata['cs']
66 66 deltabase = chunkdata['deltabase']
67 67 delta = chunkdata['delta']
68 68 flags = chunkdata['flags']
69 69
70 70 size = len(delta)
71 71 start = bundle.tell() - size
72 72
73 73 link = linkmapper(cs)
74 74 if node in self.nodemap:
75 75 # this can happen if two branches make the same change
76 76 chain = node
77 77 self.bundlerevs.add(self.nodemap[node])
78 78 continue
79 79
80 80 for p in (p1, p2):
81 81 if p not in self.nodemap:
82 82 raise error.LookupError(p, self.indexfile,
83 83 _("unknown parent"))
84 84
85 85 if deltabase not in self.nodemap:
86 86 raise LookupError(deltabase, self.indexfile,
87 87 _('unknown delta base'))
88 88
89 89 baserev = self.rev(deltabase)
90 90 # start, size, full unc. size, base (unused), link, p1, p2, node
91 91 e = (revlog.offset_type(start, flags), size, -1, baserev, link,
92 92 self.rev(p1), self.rev(p2), node)
93 93 self.index.insert(-1, e)
94 94 self.nodemap[node] = n
95 95 self.bundlerevs.add(n)
96 96 chain = node
97 97 n += 1
98 98
99 99 def _chunk(self, rev):
100 100 # Warning: in case of bundle, the diff is against what we stored as
101 101 # delta base, not against rev - 1
102 102 # XXX: could use some caching
103 103 if rev <= self.repotiprev:
104 104 return revlog.revlog._chunk(self, rev)
105 105 self.bundle.seek(self.start(rev))
106 106 return self.bundle.read(self.length(rev))
107 107
108 108 def revdiff(self, rev1, rev2):
109 109 """return or calculate a delta between two revisions"""
110 110 if rev1 > self.repotiprev and rev2 > self.repotiprev:
111 111 # hot path for bundle
112 112 revb = self.index[rev2][3]
113 113 if revb == rev1:
114 114 return self._chunk(rev2)
115 115 elif rev1 <= self.repotiprev and rev2 <= self.repotiprev:
116 116 return revlog.revlog.revdiff(self, rev1, rev2)
117 117
118 118 return mdiff.textdiff(self.revision(rev1, raw=True),
119 119 self.revision(rev2, raw=True))
120 120
121 121 def revision(self, nodeorrev, raw=False):
122 122 """return an uncompressed revision of a given node or revision
123 123 number.
124 124 """
125 125 if isinstance(nodeorrev, int):
126 126 rev = nodeorrev
127 127 node = self.node(rev)
128 128 else:
129 129 node = nodeorrev
130 130 rev = self.rev(node)
131 131
132 132 if node == nullid:
133 133 return ""
134 134
135 135 rawtext = None
136 136 chain = []
137 137 iterrev = rev
138 138 # reconstruct the revision if it is from a changegroup
139 139 while iterrev > self.repotiprev:
140 140 if self._cache and self._cache[1] == iterrev:
141 141 rawtext = self._cache[2]
142 142 break
143 143 chain.append(iterrev)
144 144 iterrev = self.index[iterrev][3]
145 145 if rawtext is None:
146 146 rawtext = self.baserevision(iterrev)
147 147
148 148 while chain:
149 149 delta = self._chunk(chain.pop())
150 150 rawtext = mdiff.patches(rawtext, [delta])
151 151
152 152 text, validatehash = self._processflags(rawtext, self.flags(rev),
153 153 'read', raw=raw)
154 154 if validatehash:
155 155 self.checkhash(text, node, rev=rev)
156 156 self._cache = (node, rev, rawtext)
157 157 return text
158 158
159 159 def baserevision(self, nodeorrev):
160 160 # Revlog subclasses may override 'revision' method to modify format of
161 161 # content retrieved from revlog. To use bundlerevlog with such class one
162 162 # needs to override 'baserevision' and make more specific call here.
163 163 return revlog.revlog.revision(self, nodeorrev, raw=True)
164 164
165 165 def addrevision(self, text, transaction, link, p1=None, p2=None, d=None):
166 166 raise NotImplementedError
167 167 def addgroup(self, revs, linkmapper, transaction):
168 168 raise NotImplementedError
169 169 def strip(self, rev, minlink):
170 170 raise NotImplementedError
171 171 def checksize(self):
172 172 raise NotImplementedError
173 173
174 174 class bundlechangelog(bundlerevlog, changelog.changelog):
175 175 def __init__(self, opener, bundle):
176 176 changelog.changelog.__init__(self, opener)
177 177 linkmapper = lambda x: x
178 178 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
179 179 linkmapper)
180 180
181 181 def baserevision(self, nodeorrev):
182 182 # Although changelog doesn't override 'revision' method, some extensions
183 183 # may replace this class with another that does. Same story with
184 184 # manifest and filelog classes.
185 185
186 186 # This bypasses filtering on changelog.node() and rev() because we need
187 187 # revision text of the bundle base even if it is hidden.
188 188 oldfilter = self.filteredrevs
189 189 try:
190 190 self.filteredrevs = ()
191 191 return changelog.changelog.revision(self, nodeorrev, raw=True)
192 192 finally:
193 193 self.filteredrevs = oldfilter
194 194
195 195 class bundlemanifest(bundlerevlog, manifest.manifestrevlog):
196 196 def __init__(self, opener, bundle, linkmapper, dirlogstarts=None, dir=''):
197 197 manifest.manifestrevlog.__init__(self, opener, dir=dir)
198 198 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
199 199 linkmapper)
200 200 if dirlogstarts is None:
201 201 dirlogstarts = {}
202 202 if self.bundle.version == "03":
203 203 dirlogstarts = _getfilestarts(self.bundle)
204 204 self._dirlogstarts = dirlogstarts
205 205 self._linkmapper = linkmapper
206 206
207 207 def baserevision(self, nodeorrev):
208 208 node = nodeorrev
209 209 if isinstance(node, int):
210 210 node = self.node(node)
211 211
212 212 if node in self.fulltextcache:
213 213 result = '%s' % self.fulltextcache[node]
214 214 else:
215 215 result = manifest.manifestrevlog.revision(self, nodeorrev, raw=True)
216 216 return result
217 217
218 218 def dirlog(self, d):
219 219 if d in self._dirlogstarts:
220 220 self.bundle.seek(self._dirlogstarts[d])
221 221 return bundlemanifest(
222 222 self.opener, self.bundle, self._linkmapper,
223 223 self._dirlogstarts, dir=d)
224 224 return super(bundlemanifest, self).dirlog(d)
225 225
226 226 class bundlefilelog(bundlerevlog, filelog.filelog):
227 227 def __init__(self, opener, path, bundle, linkmapper):
228 228 filelog.filelog.__init__(self, opener, path)
229 229 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
230 230 linkmapper)
231 231
232 232 def baserevision(self, nodeorrev):
233 233 return filelog.filelog.revision(self, nodeorrev, raw=True)
234 234
235 235 class bundlepeer(localrepo.localpeer):
236 236 def canpush(self):
237 237 return False
238 238
239 239 class bundlephasecache(phases.phasecache):
240 240 def __init__(self, *args, **kwargs):
241 241 super(bundlephasecache, self).__init__(*args, **kwargs)
242 242 if util.safehasattr(self, 'opener'):
243 243 self.opener = vfsmod.readonlyvfs(self.opener)
244 244
245 245 def write(self):
246 246 raise NotImplementedError
247 247
248 248 def _write(self, fp):
249 249 raise NotImplementedError
250 250
251 251 def _updateroots(self, phase, newroots, tr):
252 252 self.phaseroots[phase] = newroots
253 253 self.invalidate()
254 254 self.dirty = True
255 255
256 256 def _getfilestarts(bundle):
257 257 bundlefilespos = {}
258 258 for chunkdata in iter(bundle.filelogheader, {}):
259 259 fname = chunkdata['filename']
260 260 bundlefilespos[fname] = bundle.tell()
261 261 for chunk in iter(lambda: bundle.deltachunk(None), {}):
262 262 pass
263 263 return bundlefilespos
264 264
265 265 class bundlerepository(localrepo.localrepository):
266 266 def __init__(self, ui, path, bundlename):
267 def _writetempbundle(read, suffix, header=''):
268 """Write a temporary file to disk
269
270 This is closure because we need to make sure this tracked by
271 self.tempfile for cleanup purposes."""
272 fdtemp, temp = self.vfs.mkstemp(prefix="hg-bundle-",
273 suffix=".hg10un")
274 self.tempfile = temp
275
276 with os.fdopen(fdtemp, pycompat.sysstr('wb')) as fptemp:
277 fptemp.write(header)
278 while True:
279 chunk = read(2**18)
280 if not chunk:
281 break
282 fptemp.write(chunk)
283
284 return self.vfs.open(self.tempfile, mode="rb")
285 267 self._tempparent = None
286 268 try:
287 269 localrepo.localrepository.__init__(self, ui, path)
288 270 except error.RepoError:
289 271 self._tempparent = tempfile.mkdtemp()
290 272 localrepo.instance(ui, self._tempparent, 1)
291 273 localrepo.localrepository.__init__(self, ui, self._tempparent)
292 274 self.ui.setconfig('phases', 'publish', False, 'bundlerepo')
293 275
294 276 if path:
295 277 self._url = 'bundle:' + util.expandpath(path) + '+' + bundlename
296 278 else:
297 279 self._url = 'bundle:' + bundlename
298 280
299 281 self.tempfile = None
300 282 f = util.posixfile(bundlename, "rb")
301 283 self.bundlefile = self.bundle = exchange.readbundle(ui, f, bundlename)
302 284
303 285 if isinstance(self.bundle, bundle2.unbundle20):
304 286 cgstream = None
305 287 for part in self.bundle.iterparts():
306 288 if part.type == 'changegroup':
307 289 if cgstream is not None:
308 290 raise NotImplementedError("can't process "
309 291 "multiple changegroups")
310 292 cgstream = part
311 293 version = part.params.get('version', '01')
312 294 legalcgvers = changegroup.supportedincomingversions(self)
313 295 if version not in legalcgvers:
314 296 msg = _('Unsupported changegroup version: %s')
315 297 raise error.Abort(msg % version)
316 298 if self.bundle.compressed():
317 cgstream = _writetempbundle(part.read,
299 cgstream = self._writetempbundle(part.read,
318 300 ".cg%sun" % version)
319 301
320 302 if cgstream is None:
321 303 raise error.Abort(_('No changegroups found'))
322 304 cgstream.seek(0)
323 305
324 306 self.bundle = changegroup.getunbundler(version, cgstream, 'UN')
325 307
326 308 elif self.bundle.compressed():
327 f = _writetempbundle(self.bundle.read, '.hg10un', header='HG10UN')
309 f = self._writetempbundle(self.bundle.read, '.hg10un',
310 header='HG10UN')
328 311 self.bundlefile = self.bundle = exchange.readbundle(ui, f,
329 312 bundlename,
330 313 self.vfs)
331 314
332 315 # dict with the mapping 'filename' -> position in the bundle
333 316 self.bundlefilespos = {}
334 317
335 318 self.firstnewrev = self.changelog.repotiprev + 1
336 319 phases.retractboundary(self, None, phases.draft,
337 320 [ctx.node() for ctx in self[self.firstnewrev:]])
338 321
322 def _writetempbundle(self, readfn, suffix, header=''):
323 """Write a temporary file to disk
324 """
325 fdtemp, temp = self.vfs.mkstemp(prefix="hg-bundle-",
326 suffix=".hg10un")
327 self.tempfile = temp
328
329 with os.fdopen(fdtemp, pycompat.sysstr('wb')) as fptemp:
330 fptemp.write(header)
331 while True:
332 chunk = readfn(2**18)
333 if not chunk:
334 break
335 fptemp.write(chunk)
336
337 return self.vfs.open(self.tempfile, mode="rb")
338
339 339 @localrepo.unfilteredpropertycache
340 340 def _phasecache(self):
341 341 return bundlephasecache(self, self._phasedefaults)
342 342
343 343 @localrepo.unfilteredpropertycache
344 344 def changelog(self):
345 345 # consume the header if it exists
346 346 self.bundle.changelogheader()
347 347 c = bundlechangelog(self.svfs, self.bundle)
348 348 self.manstart = self.bundle.tell()
349 349 return c
350 350
351 351 def _constructmanifest(self):
352 352 self.bundle.seek(self.manstart)
353 353 # consume the header if it exists
354 354 self.bundle.manifestheader()
355 355 linkmapper = self.unfiltered().changelog.rev
356 356 m = bundlemanifest(self.svfs, self.bundle, linkmapper)
357 357 self.filestart = self.bundle.tell()
358 358 return m
359 359
360 360 @localrepo.unfilteredpropertycache
361 361 def manstart(self):
362 362 self.changelog
363 363 return self.manstart
364 364
365 365 @localrepo.unfilteredpropertycache
366 366 def filestart(self):
367 367 self.manifestlog
368 368 return self.filestart
369 369
370 370 def url(self):
371 371 return self._url
372 372
373 373 def file(self, f):
374 374 if not self.bundlefilespos:
375 375 self.bundle.seek(self.filestart)
376 376 self.bundlefilespos = _getfilestarts(self.bundle)
377 377
378 378 if f in self.bundlefilespos:
379 379 self.bundle.seek(self.bundlefilespos[f])
380 380 linkmapper = self.unfiltered().changelog.rev
381 381 return bundlefilelog(self.svfs, f, self.bundle, linkmapper)
382 382 else:
383 383 return filelog.filelog(self.svfs, f)
384 384
385 385 def close(self):
386 386 """Close assigned bundle file immediately."""
387 387 self.bundlefile.close()
388 388 if self.tempfile is not None:
389 389 self.vfs.unlink(self.tempfile)
390 390 if self._tempparent:
391 391 shutil.rmtree(self._tempparent, True)
392 392
393 393 def cancopy(self):
394 394 return False
395 395
396 396 def peer(self):
397 397 return bundlepeer(self)
398 398
399 399 def getcwd(self):
400 400 return pycompat.getcwd() # always outside the repo
401 401
402 402 # Check if parents exist in localrepo before setting
403 403 def setparents(self, p1, p2=nullid):
404 404 p1rev = self.changelog.rev(p1)
405 405 p2rev = self.changelog.rev(p2)
406 406 msg = _("setting parent to node %s that only exists in the bundle\n")
407 407 if self.changelog.repotiprev < p1rev:
408 408 self.ui.warn(msg % nodemod.hex(p1))
409 409 if self.changelog.repotiprev < p2rev:
410 410 self.ui.warn(msg % nodemod.hex(p2))
411 411 return super(bundlerepository, self).setparents(p1, p2)
412 412
413 413 def instance(ui, path, create):
414 414 if create:
415 415 raise error.Abort(_('cannot create new bundle repository'))
416 416 # internal config: bundle.mainreporoot
417 417 parentpath = ui.config("bundle", "mainreporoot")
418 418 if not parentpath:
419 419 # try to find the correct path to the working directory repo
420 420 parentpath = cmdutil.findrepo(pycompat.getcwd())
421 421 if parentpath is None:
422 422 parentpath = ''
423 423 if parentpath:
424 424 # Try to make the full path relative so we get a nice, short URL.
425 425 # In particular, we don't want temp dir names in test outputs.
426 426 cwd = pycompat.getcwd()
427 427 if parentpath == cwd:
428 428 parentpath = ''
429 429 else:
430 430 cwd = pathutil.normasprefix(cwd)
431 431 if parentpath.startswith(cwd):
432 432 parentpath = parentpath[len(cwd):]
433 433 u = util.url(path)
434 434 path = u.localpath()
435 435 if u.scheme == 'bundle':
436 436 s = path.split("+", 1)
437 437 if len(s) == 1:
438 438 repopath, bundlename = parentpath, s[0]
439 439 else:
440 440 repopath, bundlename = s
441 441 else:
442 442 repopath, bundlename = parentpath, path
443 443 return bundlerepository(ui, repopath, bundlename)
444 444
445 445 class bundletransactionmanager(object):
446 446 def transaction(self):
447 447 return None
448 448
449 449 def close(self):
450 450 raise NotImplementedError
451 451
452 452 def release(self):
453 453 raise NotImplementedError
454 454
455 455 def getremotechanges(ui, repo, other, onlyheads=None, bundlename=None,
456 456 force=False):
457 457 '''obtains a bundle of changes incoming from other
458 458
459 459 "onlyheads" restricts the returned changes to those reachable from the
460 460 specified heads.
461 461 "bundlename", if given, stores the bundle to this file path permanently;
462 462 otherwise it's stored to a temp file and gets deleted again when you call
463 463 the returned "cleanupfn".
464 464 "force" indicates whether to proceed on unrelated repos.
465 465
466 466 Returns a tuple (local, csets, cleanupfn):
467 467
468 468 "local" is a local repo from which to obtain the actual incoming
469 469 changesets; it is a bundlerepo for the obtained bundle when the
470 470 original "other" is remote.
471 471 "csets" lists the incoming changeset node ids.
472 472 "cleanupfn" must be called without arguments when you're done processing
473 473 the changes; it closes both the original "other" and the one returned
474 474 here.
475 475 '''
476 476 tmp = discovery.findcommonincoming(repo, other, heads=onlyheads,
477 477 force=force)
478 478 common, incoming, rheads = tmp
479 479 if not incoming:
480 480 try:
481 481 if bundlename:
482 482 os.unlink(bundlename)
483 483 except OSError:
484 484 pass
485 485 return repo, [], other.close
486 486
487 487 commonset = set(common)
488 488 rheads = [x for x in rheads if x not in commonset]
489 489
490 490 bundle = None
491 491 bundlerepo = None
492 492 localrepo = other.local()
493 493 if bundlename or not localrepo:
494 494 # create a bundle (uncompressed if other repo is not local)
495 495
496 496 # developer config: devel.legacy.exchange
497 497 legexc = ui.configlist('devel', 'legacy.exchange')
498 498 forcebundle1 = 'bundle2' not in legexc and 'bundle1' in legexc
499 499 canbundle2 = (not forcebundle1
500 500 and other.capable('getbundle')
501 501 and other.capable('bundle2'))
502 502 if canbundle2:
503 503 kwargs = {}
504 504 kwargs['common'] = common
505 505 kwargs['heads'] = rheads
506 506 kwargs['bundlecaps'] = exchange.caps20to10(repo)
507 507 kwargs['cg'] = True
508 508 b2 = other.getbundle('incoming', **kwargs)
509 509 fname = bundle = changegroup.writechunks(ui, b2._forwardchunks(),
510 510 bundlename)
511 511 else:
512 512 if other.capable('getbundle'):
513 513 cg = other.getbundle('incoming', common=common, heads=rheads)
514 514 elif onlyheads is None and not other.capable('changegroupsubset'):
515 515 # compat with older servers when pulling all remote heads
516 516 cg = other.changegroup(incoming, "incoming")
517 517 rheads = None
518 518 else:
519 519 cg = other.changegroupsubset(incoming, rheads, 'incoming')
520 520 if localrepo:
521 521 bundletype = "HG10BZ"
522 522 else:
523 523 bundletype = "HG10UN"
524 524 fname = bundle = bundle2.writebundle(ui, cg, bundlename,
525 525 bundletype)
526 526 # keep written bundle?
527 527 if bundlename:
528 528 bundle = None
529 529 if not localrepo:
530 530 # use the created uncompressed bundlerepo
531 531 localrepo = bundlerepo = bundlerepository(repo.baseui, repo.root,
532 532 fname)
533 533 # this repo contains local and other now, so filter out local again
534 534 common = repo.heads()
535 535 if localrepo:
536 536 # Part of common may be remotely filtered
537 537 # So use an unfiltered version
538 538 # The discovery process probably need cleanup to avoid that
539 539 localrepo = localrepo.unfiltered()
540 540
541 541 csets = localrepo.changelog.findmissing(common, rheads)
542 542
543 543 if bundlerepo:
544 544 reponodes = [ctx.node() for ctx in bundlerepo[bundlerepo.firstnewrev:]]
545 545 remotephases = other.listkeys('phases')
546 546
547 547 pullop = exchange.pulloperation(bundlerepo, other, heads=reponodes)
548 548 pullop.trmanager = bundletransactionmanager()
549 549 exchange._pullapplyphases(pullop, remotephases)
550 550
551 551 def cleanup():
552 552 if bundlerepo:
553 553 bundlerepo.close()
554 554 if bundle:
555 555 os.unlink(bundle)
556 556 other.close()
557 557
558 558 return (localrepo, csets, cleanup)
General Comments 0
You need to be logged in to leave comments. Login now