##// END OF EJS Templates
bundlerepo: use context manager for file I/O in _writetempbundle
Bryan O'Sullivan -
r27776:6fe2da48 default
parent child Browse files
Show More
@@ -1,532 +1,529 b''
1 1 # bundlerepo.py - repository class for viewing uncompressed bundles
2 2 #
3 3 # Copyright 2006, 2007 Benoit Boissinot <bboissin@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """Repository class for viewing uncompressed bundles.
9 9
10 10 This provides a read-only repository interface to bundles as if they
11 11 were part of the actual repository.
12 12 """
13 13
14 14 from __future__ import absolute_import
15 15
16 16 import os
17 17 import shutil
18 18 import tempfile
19 19
20 20 from .i18n import _
21 21 from .node import nullid
22 22
23 23 from . import (
24 24 bundle2,
25 25 changegroup,
26 26 changelog,
27 27 cmdutil,
28 28 discovery,
29 29 error,
30 30 exchange,
31 31 filelog,
32 32 localrepo,
33 33 manifest,
34 34 mdiff,
35 35 pathutil,
36 36 phases,
37 37 revlog,
38 38 scmutil,
39 39 util,
40 40 )
41 41
42 42 class bundlerevlog(revlog.revlog):
43 43 def __init__(self, opener, indexfile, bundle, linkmapper):
44 44 # How it works:
45 45 # To retrieve a revision, we need to know the offset of the revision in
46 46 # the bundle (an unbundle object). We store this offset in the index
47 47 # (start). The base of the delta is stored in the base field.
48 48 #
49 49 # To differentiate a rev in the bundle from a rev in the revlog, we
50 50 # check revision against repotiprev.
51 51 opener = scmutil.readonlyvfs(opener)
52 52 revlog.revlog.__init__(self, opener, indexfile)
53 53 self.bundle = bundle
54 54 n = len(self)
55 55 self.repotiprev = n - 1
56 56 chain = None
57 57 self.bundlerevs = set() # used by 'bundle()' revset expression
58 58 while True:
59 59 chunkdata = bundle.deltachunk(chain)
60 60 if not chunkdata:
61 61 break
62 62 node = chunkdata['node']
63 63 p1 = chunkdata['p1']
64 64 p2 = chunkdata['p2']
65 65 cs = chunkdata['cs']
66 66 deltabase = chunkdata['deltabase']
67 67 delta = chunkdata['delta']
68 68
69 69 size = len(delta)
70 70 start = bundle.tell() - size
71 71
72 72 link = linkmapper(cs)
73 73 if node in self.nodemap:
74 74 # this can happen if two branches make the same change
75 75 chain = node
76 76 self.bundlerevs.add(self.nodemap[node])
77 77 continue
78 78
79 79 for p in (p1, p2):
80 80 if p not in self.nodemap:
81 81 raise error.LookupError(p, self.indexfile,
82 82 _("unknown parent"))
83 83
84 84 if deltabase not in self.nodemap:
85 85 raise LookupError(deltabase, self.indexfile,
86 86 _('unknown delta base'))
87 87
88 88 baserev = self.rev(deltabase)
89 89 # start, size, full unc. size, base (unused), link, p1, p2, node
90 90 e = (revlog.offset_type(start, 0), size, -1, baserev, link,
91 91 self.rev(p1), self.rev(p2), node)
92 92 self.index.insert(-1, e)
93 93 self.nodemap[node] = n
94 94 self.bundlerevs.add(n)
95 95 chain = node
96 96 n += 1
97 97
98 98 def _chunk(self, rev):
99 99 # Warning: in case of bundle, the diff is against what we stored as
100 100 # delta base, not against rev - 1
101 101 # XXX: could use some caching
102 102 if rev <= self.repotiprev:
103 103 return revlog.revlog._chunk(self, rev)
104 104 self.bundle.seek(self.start(rev))
105 105 return self.bundle.read(self.length(rev))
106 106
107 107 def revdiff(self, rev1, rev2):
108 108 """return or calculate a delta between two revisions"""
109 109 if rev1 > self.repotiprev and rev2 > self.repotiprev:
110 110 # hot path for bundle
111 111 revb = self.index[rev2][3]
112 112 if revb == rev1:
113 113 return self._chunk(rev2)
114 114 elif rev1 <= self.repotiprev and rev2 <= self.repotiprev:
115 115 return revlog.revlog.revdiff(self, rev1, rev2)
116 116
117 117 return mdiff.textdiff(self.revision(self.node(rev1)),
118 118 self.revision(self.node(rev2)))
119 119
120 120 def revision(self, nodeorrev):
121 121 """return an uncompressed revision of a given node or revision
122 122 number.
123 123 """
124 124 if isinstance(nodeorrev, int):
125 125 rev = nodeorrev
126 126 node = self.node(rev)
127 127 else:
128 128 node = nodeorrev
129 129 rev = self.rev(node)
130 130
131 131 if node == nullid:
132 132 return ""
133 133
134 134 text = None
135 135 chain = []
136 136 iterrev = rev
137 137 # reconstruct the revision if it is from a changegroup
138 138 while iterrev > self.repotiprev:
139 139 if self._cache and self._cache[1] == iterrev:
140 140 text = self._cache[2]
141 141 break
142 142 chain.append(iterrev)
143 143 iterrev = self.index[iterrev][3]
144 144 if text is None:
145 145 text = self.baserevision(iterrev)
146 146
147 147 while chain:
148 148 delta = self._chunk(chain.pop())
149 149 text = mdiff.patches(text, [delta])
150 150
151 151 self._checkhash(text, node, rev)
152 152 self._cache = (node, rev, text)
153 153 return text
154 154
155 155 def baserevision(self, nodeorrev):
156 156 # Revlog subclasses may override 'revision' method to modify format of
157 157 # content retrieved from revlog. To use bundlerevlog with such class one
158 158 # needs to override 'baserevision' and make more specific call here.
159 159 return revlog.revlog.revision(self, nodeorrev)
160 160
161 161 def addrevision(self, text, transaction, link, p1=None, p2=None, d=None):
162 162 raise NotImplementedError
163 163 def addgroup(self, revs, linkmapper, transaction):
164 164 raise NotImplementedError
165 165 def strip(self, rev, minlink):
166 166 raise NotImplementedError
167 167 def checksize(self):
168 168 raise NotImplementedError
169 169
170 170 class bundlechangelog(bundlerevlog, changelog.changelog):
171 171 def __init__(self, opener, bundle):
172 172 changelog.changelog.__init__(self, opener)
173 173 linkmapper = lambda x: x
174 174 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
175 175 linkmapper)
176 176
177 177 def baserevision(self, nodeorrev):
178 178 # Although changelog doesn't override 'revision' method, some extensions
179 179 # may replace this class with another that does. Same story with
180 180 # manifest and filelog classes.
181 181
182 182 # This bypasses filtering on changelog.node() and rev() because we need
183 183 # revision text of the bundle base even if it is hidden.
184 184 oldfilter = self.filteredrevs
185 185 try:
186 186 self.filteredrevs = ()
187 187 return changelog.changelog.revision(self, nodeorrev)
188 188 finally:
189 189 self.filteredrevs = oldfilter
190 190
191 191 class bundlemanifest(bundlerevlog, manifest.manifest):
192 192 def __init__(self, opener, bundle, linkmapper):
193 193 manifest.manifest.__init__(self, opener)
194 194 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
195 195 linkmapper)
196 196
197 197 def baserevision(self, nodeorrev):
198 198 node = nodeorrev
199 199 if isinstance(node, int):
200 200 node = self.node(node)
201 201
202 202 if node in self._mancache:
203 203 result = self._mancache[node][0].text()
204 204 else:
205 205 result = manifest.manifest.revision(self, nodeorrev)
206 206 return result
207 207
208 208 class bundlefilelog(bundlerevlog, filelog.filelog):
209 209 def __init__(self, opener, path, bundle, linkmapper):
210 210 filelog.filelog.__init__(self, opener, path)
211 211 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
212 212 linkmapper)
213 213
214 214 def baserevision(self, nodeorrev):
215 215 return filelog.filelog.revision(self, nodeorrev)
216 216
217 217 class bundlepeer(localrepo.localpeer):
218 218 def canpush(self):
219 219 return False
220 220
221 221 class bundlephasecache(phases.phasecache):
222 222 def __init__(self, *args, **kwargs):
223 223 super(bundlephasecache, self).__init__(*args, **kwargs)
224 224 if util.safehasattr(self, 'opener'):
225 225 self.opener = scmutil.readonlyvfs(self.opener)
226 226
227 227 def write(self):
228 228 raise NotImplementedError
229 229
230 230 def _write(self, fp):
231 231 raise NotImplementedError
232 232
233 233 def _updateroots(self, phase, newroots, tr):
234 234 self.phaseroots[phase] = newroots
235 235 self.invalidate()
236 236 self.dirty = True
237 237
238 238 class bundlerepository(localrepo.localrepository):
239 239 def __init__(self, ui, path, bundlename):
240 240 def _writetempbundle(read, suffix, header=''):
241 241 """Write a temporary file to disk
242 242
243 243 This is closure because we need to make sure this tracked by
244 244 self.tempfile for cleanup purposes."""
245 245 fdtemp, temp = self.vfs.mkstemp(prefix="hg-bundle-",
246 246 suffix=".hg10un")
247 247 self.tempfile = temp
248 fptemp = os.fdopen(fdtemp, 'wb')
249 248
250 try:
249 with os.fdopen(fdtemp, 'wb') as fptemp:
251 250 fptemp.write(header)
252 251 while True:
253 252 chunk = read(2**18)
254 253 if not chunk:
255 254 break
256 255 fptemp.write(chunk)
257 finally:
258 fptemp.close()
259 256
260 257 return self.vfs.open(self.tempfile, mode="rb")
261 258 self._tempparent = None
262 259 try:
263 260 localrepo.localrepository.__init__(self, ui, path)
264 261 except error.RepoError:
265 262 self._tempparent = tempfile.mkdtemp()
266 263 localrepo.instance(ui, self._tempparent, 1)
267 264 localrepo.localrepository.__init__(self, ui, self._tempparent)
268 265 self.ui.setconfig('phases', 'publish', False, 'bundlerepo')
269 266
270 267 if path:
271 268 self._url = 'bundle:' + util.expandpath(path) + '+' + bundlename
272 269 else:
273 270 self._url = 'bundle:' + bundlename
274 271
275 272 self.tempfile = None
276 273 f = util.posixfile(bundlename, "rb")
277 274 self.bundlefile = self.bundle = exchange.readbundle(ui, f, bundlename)
278 275
279 276 if isinstance(self.bundle, bundle2.unbundle20):
280 277 cgstream = None
281 278 for part in self.bundle.iterparts():
282 279 if part.type == 'changegroup':
283 280 if cgstream is not None:
284 281 raise NotImplementedError("can't process "
285 282 "multiple changegroups")
286 283 cgstream = part
287 284 version = part.params.get('version', '01')
288 285 if version not in changegroup.supportedversions(self):
289 286 msg = _('Unsupported changegroup version: %s')
290 287 raise error.Abort(msg % version)
291 288 if self.bundle.compressed():
292 289 cgstream = _writetempbundle(part.read,
293 290 ".cg%sun" % version)
294 291
295 292 if cgstream is None:
296 293 raise error.Abort('No changegroups found')
297 294 cgstream.seek(0)
298 295
299 296 self.bundle = changegroup.getunbundler(version, cgstream, 'UN')
300 297
301 298 elif self.bundle.compressed():
302 299 f = _writetempbundle(self.bundle.read, '.hg10un', header='HG10UN')
303 300 self.bundlefile = self.bundle = exchange.readbundle(ui, f,
304 301 bundlename,
305 302 self.vfs)
306 303
307 304 # dict with the mapping 'filename' -> position in the bundle
308 305 self.bundlefilespos = {}
309 306
310 307 self.firstnewrev = self.changelog.repotiprev + 1
311 308 phases.retractboundary(self, None, phases.draft,
312 309 [ctx.node() for ctx in self[self.firstnewrev:]])
313 310
314 311 @localrepo.unfilteredpropertycache
315 312 def _phasecache(self):
316 313 return bundlephasecache(self, self._phasedefaults)
317 314
318 315 @localrepo.unfilteredpropertycache
319 316 def changelog(self):
320 317 # consume the header if it exists
321 318 self.bundle.changelogheader()
322 319 c = bundlechangelog(self.svfs, self.bundle)
323 320 self.manstart = self.bundle.tell()
324 321 return c
325 322
326 323 @localrepo.unfilteredpropertycache
327 324 def manifest(self):
328 325 self.bundle.seek(self.manstart)
329 326 # consume the header if it exists
330 327 self.bundle.manifestheader()
331 328 m = bundlemanifest(self.svfs, self.bundle, self.changelog.rev)
332 329 # XXX: hack to work with changegroup3, but we still don't handle
333 330 # tree manifests correctly
334 331 if self.bundle.version == "03":
335 332 self.bundle.filelogheader()
336 333 self.filestart = self.bundle.tell()
337 334 return m
338 335
339 336 @localrepo.unfilteredpropertycache
340 337 def manstart(self):
341 338 self.changelog
342 339 return self.manstart
343 340
344 341 @localrepo.unfilteredpropertycache
345 342 def filestart(self):
346 343 self.manifest
347 344 return self.filestart
348 345
349 346 def url(self):
350 347 return self._url
351 348
352 349 def file(self, f):
353 350 if not self.bundlefilespos:
354 351 self.bundle.seek(self.filestart)
355 352 while True:
356 353 chunkdata = self.bundle.filelogheader()
357 354 if not chunkdata:
358 355 break
359 356 fname = chunkdata['filename']
360 357 self.bundlefilespos[fname] = self.bundle.tell()
361 358 while True:
362 359 c = self.bundle.deltachunk(None)
363 360 if not c:
364 361 break
365 362
366 363 if f in self.bundlefilespos:
367 364 self.bundle.seek(self.bundlefilespos[f])
368 365 return bundlefilelog(self.svfs, f, self.bundle, self.changelog.rev)
369 366 else:
370 367 return filelog.filelog(self.svfs, f)
371 368
372 369 def close(self):
373 370 """Close assigned bundle file immediately."""
374 371 self.bundlefile.close()
375 372 if self.tempfile is not None:
376 373 self.vfs.unlink(self.tempfile)
377 374 if self._tempparent:
378 375 shutil.rmtree(self._tempparent, True)
379 376
380 377 def cancopy(self):
381 378 return False
382 379
383 380 def peer(self):
384 381 return bundlepeer(self)
385 382
386 383 def getcwd(self):
387 384 return os.getcwd() # always outside the repo
388 385
389 386
390 387 def instance(ui, path, create):
391 388 if create:
392 389 raise error.Abort(_('cannot create new bundle repository'))
393 390 # internal config: bundle.mainreporoot
394 391 parentpath = ui.config("bundle", "mainreporoot", "")
395 392 if not parentpath:
396 393 # try to find the correct path to the working directory repo
397 394 parentpath = cmdutil.findrepo(os.getcwd())
398 395 if parentpath is None:
399 396 parentpath = ''
400 397 if parentpath:
401 398 # Try to make the full path relative so we get a nice, short URL.
402 399 # In particular, we don't want temp dir names in test outputs.
403 400 cwd = os.getcwd()
404 401 if parentpath == cwd:
405 402 parentpath = ''
406 403 else:
407 404 cwd = pathutil.normasprefix(cwd)
408 405 if parentpath.startswith(cwd):
409 406 parentpath = parentpath[len(cwd):]
410 407 u = util.url(path)
411 408 path = u.localpath()
412 409 if u.scheme == 'bundle':
413 410 s = path.split("+", 1)
414 411 if len(s) == 1:
415 412 repopath, bundlename = parentpath, s[0]
416 413 else:
417 414 repopath, bundlename = s
418 415 else:
419 416 repopath, bundlename = parentpath, path
420 417 return bundlerepository(ui, repopath, bundlename)
421 418
422 419 class bundletransactionmanager(object):
423 420 def transaction(self):
424 421 return None
425 422
426 423 def close(self):
427 424 raise NotImplementedError
428 425
429 426 def release(self):
430 427 raise NotImplementedError
431 428
432 429 def getremotechanges(ui, repo, other, onlyheads=None, bundlename=None,
433 430 force=False):
434 431 '''obtains a bundle of changes incoming from other
435 432
436 433 "onlyheads" restricts the returned changes to those reachable from the
437 434 specified heads.
438 435 "bundlename", if given, stores the bundle to this file path permanently;
439 436 otherwise it's stored to a temp file and gets deleted again when you call
440 437 the returned "cleanupfn".
441 438 "force" indicates whether to proceed on unrelated repos.
442 439
443 440 Returns a tuple (local, csets, cleanupfn):
444 441
445 442 "local" is a local repo from which to obtain the actual incoming
446 443 changesets; it is a bundlerepo for the obtained bundle when the
447 444 original "other" is remote.
448 445 "csets" lists the incoming changeset node ids.
449 446 "cleanupfn" must be called without arguments when you're done processing
450 447 the changes; it closes both the original "other" and the one returned
451 448 here.
452 449 '''
453 450 tmp = discovery.findcommonincoming(repo, other, heads=onlyheads,
454 451 force=force)
455 452 common, incoming, rheads = tmp
456 453 if not incoming:
457 454 try:
458 455 if bundlename:
459 456 os.unlink(bundlename)
460 457 except OSError:
461 458 pass
462 459 return repo, [], other.close
463 460
464 461 commonset = set(common)
465 462 rheads = [x for x in rheads if x not in commonset]
466 463
467 464 bundle = None
468 465 bundlerepo = None
469 466 localrepo = other.local()
470 467 if bundlename or not localrepo:
471 468 # create a bundle (uncompressed if other repo is not local)
472 469
473 470 canbundle2 = (ui.configbool('experimental', 'bundle2-exp', True)
474 471 and other.capable('getbundle')
475 472 and other.capable('bundle2'))
476 473 if canbundle2:
477 474 kwargs = {}
478 475 kwargs['common'] = common
479 476 kwargs['heads'] = rheads
480 477 kwargs['bundlecaps'] = exchange.caps20to10(repo)
481 478 kwargs['cg'] = True
482 479 b2 = other.getbundle('incoming', **kwargs)
483 480 fname = bundle = changegroup.writechunks(ui, b2._forwardchunks(),
484 481 bundlename)
485 482 else:
486 483 if other.capable('getbundle'):
487 484 cg = other.getbundle('incoming', common=common, heads=rheads)
488 485 elif onlyheads is None and not other.capable('changegroupsubset'):
489 486 # compat with older servers when pulling all remote heads
490 487 cg = other.changegroup(incoming, "incoming")
491 488 rheads = None
492 489 else:
493 490 cg = other.changegroupsubset(incoming, rheads, 'incoming')
494 491 if localrepo:
495 492 bundletype = "HG10BZ"
496 493 else:
497 494 bundletype = "HG10UN"
498 495 fname = bundle = changegroup.writebundle(ui, cg, bundlename,
499 496 bundletype)
500 497 # keep written bundle?
501 498 if bundlename:
502 499 bundle = None
503 500 if not localrepo:
504 501 # use the created uncompressed bundlerepo
505 502 localrepo = bundlerepo = bundlerepository(repo.baseui, repo.root,
506 503 fname)
507 504 # this repo contains local and other now, so filter out local again
508 505 common = repo.heads()
509 506 if localrepo:
510 507 # Part of common may be remotely filtered
511 508 # So use an unfiltered version
512 509 # The discovery process probably need cleanup to avoid that
513 510 localrepo = localrepo.unfiltered()
514 511
515 512 csets = localrepo.changelog.findmissing(common, rheads)
516 513
517 514 if bundlerepo:
518 515 reponodes = [ctx.node() for ctx in bundlerepo[bundlerepo.firstnewrev:]]
519 516 remotephases = other.listkeys('phases')
520 517
521 518 pullop = exchange.pulloperation(bundlerepo, other, heads=reponodes)
522 519 pullop.trmanager = bundletransactionmanager()
523 520 exchange._pullapplyphases(pullop, remotephases)
524 521
525 522 def cleanup():
526 523 if bundlerepo:
527 524 bundlerepo.close()
528 525 if bundle:
529 526 os.unlink(bundle)
530 527 other.close()
531 528
532 529 return (localrepo, csets, cleanup)
General Comments 0
You need to be logged in to leave comments. Login now