##// END OF EJS Templates
bundlerepo: move temp-bundle writing logic into a closure...
Pierre-Yves David -
r26800:7cac6ee4 default
parent child Browse files
Show More
@@ -1,517 +1,523 b''
1 1 # bundlerepo.py - repository class for viewing uncompressed bundles
2 2 #
3 3 # Copyright 2006, 2007 Benoit Boissinot <bboissin@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """Repository class for viewing uncompressed bundles.
9 9
10 10 This provides a read-only repository interface to bundles as if they
11 11 were part of the actual repository.
12 12 """
13 13
14 14 from __future__ import absolute_import
15 15
16 16 import os
17 17 import shutil
18 18 import tempfile
19 19
20 20 from .i18n import _
21 21 from .node import nullid
22 22
23 23 from . import (
24 24 bundle2,
25 25 changegroup,
26 26 changelog,
27 27 cmdutil,
28 28 discovery,
29 29 error,
30 30 exchange,
31 31 filelog,
32 32 localrepo,
33 33 manifest,
34 34 mdiff,
35 35 pathutil,
36 36 phases,
37 37 revlog,
38 38 scmutil,
39 39 util,
40 40 )
41 41
42 42 class bundlerevlog(revlog.revlog):
43 43 def __init__(self, opener, indexfile, bundle, linkmapper):
44 44 # How it works:
45 45 # To retrieve a revision, we need to know the offset of the revision in
46 46 # the bundle (an unbundle object). We store this offset in the index
47 47 # (start). The base of the delta is stored in the base field.
48 48 #
49 49 # To differentiate a rev in the bundle from a rev in the revlog, we
50 50 # check revision against repotiprev.
51 51 opener = scmutil.readonlyvfs(opener)
52 52 revlog.revlog.__init__(self, opener, indexfile)
53 53 self.bundle = bundle
54 54 n = len(self)
55 55 self.repotiprev = n - 1
56 56 chain = None
57 57 self.bundlerevs = set() # used by 'bundle()' revset expression
58 58 while True:
59 59 chunkdata = bundle.deltachunk(chain)
60 60 if not chunkdata:
61 61 break
62 62 node = chunkdata['node']
63 63 p1 = chunkdata['p1']
64 64 p2 = chunkdata['p2']
65 65 cs = chunkdata['cs']
66 66 deltabase = chunkdata['deltabase']
67 67 delta = chunkdata['delta']
68 68
69 69 size = len(delta)
70 70 start = bundle.tell() - size
71 71
72 72 link = linkmapper(cs)
73 73 if node in self.nodemap:
74 74 # this can happen if two branches make the same change
75 75 chain = node
76 76 self.bundlerevs.add(self.nodemap[node])
77 77 continue
78 78
79 79 for p in (p1, p2):
80 80 if p not in self.nodemap:
81 81 raise error.LookupError(p, self.indexfile,
82 82 _("unknown parent"))
83 83
84 84 if deltabase not in self.nodemap:
85 85 raise LookupError(deltabase, self.indexfile,
86 86 _('unknown delta base'))
87 87
88 88 baserev = self.rev(deltabase)
89 89 # start, size, full unc. size, base (unused), link, p1, p2, node
90 90 e = (revlog.offset_type(start, 0), size, -1, baserev, link,
91 91 self.rev(p1), self.rev(p2), node)
92 92 self.index.insert(-1, e)
93 93 self.nodemap[node] = n
94 94 self.bundlerevs.add(n)
95 95 chain = node
96 96 n += 1
97 97
98 98 def _chunk(self, rev):
99 99 # Warning: in case of bundle, the diff is against what we stored as
100 100 # delta base, not against rev - 1
101 101 # XXX: could use some caching
102 102 if rev <= self.repotiprev:
103 103 return revlog.revlog._chunk(self, rev)
104 104 self.bundle.seek(self.start(rev))
105 105 return self.bundle.read(self.length(rev))
106 106
107 107 def revdiff(self, rev1, rev2):
108 108 """return or calculate a delta between two revisions"""
109 109 if rev1 > self.repotiprev and rev2 > self.repotiprev:
110 110 # hot path for bundle
111 111 revb = self.index[rev2][3]
112 112 if revb == rev1:
113 113 return self._chunk(rev2)
114 114 elif rev1 <= self.repotiprev and rev2 <= self.repotiprev:
115 115 return revlog.revlog.revdiff(self, rev1, rev2)
116 116
117 117 return mdiff.textdiff(self.revision(self.node(rev1)),
118 118 self.revision(self.node(rev2)))
119 119
120 120 def revision(self, nodeorrev):
121 121 """return an uncompressed revision of a given node or revision
122 122 number.
123 123 """
124 124 if isinstance(nodeorrev, int):
125 125 rev = nodeorrev
126 126 node = self.node(rev)
127 127 else:
128 128 node = nodeorrev
129 129 rev = self.rev(node)
130 130
131 131 if node == nullid:
132 132 return ""
133 133
134 134 text = None
135 135 chain = []
136 136 iterrev = rev
137 137 # reconstruct the revision if it is from a changegroup
138 138 while iterrev > self.repotiprev:
139 139 if self._cache and self._cache[1] == iterrev:
140 140 text = self._cache[2]
141 141 break
142 142 chain.append(iterrev)
143 143 iterrev = self.index[iterrev][3]
144 144 if text is None:
145 145 text = self.baserevision(iterrev)
146 146
147 147 while chain:
148 148 delta = self._chunk(chain.pop())
149 149 text = mdiff.patches(text, [delta])
150 150
151 151 self._checkhash(text, node, rev)
152 152 self._cache = (node, rev, text)
153 153 return text
154 154
155 155 def baserevision(self, nodeorrev):
156 156 # Revlog subclasses may override 'revision' method to modify format of
157 157 # content retrieved from revlog. To use bundlerevlog with such class one
158 158 # needs to override 'baserevision' and make more specific call here.
159 159 return revlog.revlog.revision(self, nodeorrev)
160 160
161 161 def addrevision(self, text, transaction, link, p1=None, p2=None, d=None):
162 162 raise NotImplementedError
163 163 def addgroup(self, revs, linkmapper, transaction):
164 164 raise NotImplementedError
165 165 def strip(self, rev, minlink):
166 166 raise NotImplementedError
167 167 def checksize(self):
168 168 raise NotImplementedError
169 169
170 170 class bundlechangelog(bundlerevlog, changelog.changelog):
171 171 def __init__(self, opener, bundle):
172 172 changelog.changelog.__init__(self, opener)
173 173 linkmapper = lambda x: x
174 174 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
175 175 linkmapper)
176 176
177 177 def baserevision(self, nodeorrev):
178 178 # Although changelog doesn't override 'revision' method, some extensions
179 179 # may replace this class with another that does. Same story with
180 180 # manifest and filelog classes.
181 181
182 182 # This bypasses filtering on changelog.node() and rev() because we need
183 183 # revision text of the bundle base even if it is hidden.
184 184 oldfilter = self.filteredrevs
185 185 try:
186 186 self.filteredrevs = ()
187 187 return changelog.changelog.revision(self, nodeorrev)
188 188 finally:
189 189 self.filteredrevs = oldfilter
190 190
191 191 class bundlemanifest(bundlerevlog, manifest.manifest):
192 192 def __init__(self, opener, bundle, linkmapper):
193 193 manifest.manifest.__init__(self, opener)
194 194 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
195 195 linkmapper)
196 196
197 197 def baserevision(self, nodeorrev):
198 198 node = nodeorrev
199 199 if isinstance(node, int):
200 200 node = self.node(node)
201 201
202 202 if node in self._mancache:
203 203 result = self._mancache[node][0].text()
204 204 else:
205 205 result = manifest.manifest.revision(self, nodeorrev)
206 206 return result
207 207
208 208 class bundlefilelog(bundlerevlog, filelog.filelog):
209 209 def __init__(self, opener, path, bundle, linkmapper):
210 210 filelog.filelog.__init__(self, opener, path)
211 211 bundlerevlog.__init__(self, opener, self.indexfile, bundle,
212 212 linkmapper)
213 213
214 214 def baserevision(self, nodeorrev):
215 215 return filelog.filelog.revision(self, nodeorrev)
216 216
217 217 class bundlepeer(localrepo.localpeer):
218 218 def canpush(self):
219 219 return False
220 220
221 221 class bundlephasecache(phases.phasecache):
222 222 def __init__(self, *args, **kwargs):
223 223 super(bundlephasecache, self).__init__(*args, **kwargs)
224 224 if util.safehasattr(self, 'opener'):
225 225 self.opener = scmutil.readonlyvfs(self.opener)
226 226
227 227 def write(self):
228 228 raise NotImplementedError
229 229
230 230 def _write(self, fp):
231 231 raise NotImplementedError
232 232
233 233 def _updateroots(self, phase, newroots, tr):
234 234 self.phaseroots[phase] = newroots
235 235 self.invalidate()
236 236 self.dirty = True
237 237
238 238 class bundlerepository(localrepo.localrepository):
239 239 def __init__(self, ui, path, bundlename):
240 def _writetempbundle(read, suffix, header=''):
241 """Write a temporary file to disk
242
243 This is closure because we need to make sure this tracked by
244 self.tempfile for cleanup purposes."""
245 fdtemp, temp = self.vfs.mkstemp(prefix="hg-bundle-",
246 suffix=".hg10un")
247 self.tempfile = temp
248 fptemp = os.fdopen(fdtemp, 'wb')
249
250 try:
251 fptemp.write(header)
252 while True:
253 chunk = read(2**18)
254 if not chunk:
255 break
256 fptemp.write(chunk)
257 finally:
258 fptemp.close()
259
260 return self.vfs.open(self.tempfile, mode="rb")
240 261 self._tempparent = None
241 262 try:
242 263 localrepo.localrepository.__init__(self, ui, path)
243 264 except error.RepoError:
244 265 self._tempparent = tempfile.mkdtemp()
245 266 localrepo.instance(ui, self._tempparent, 1)
246 267 localrepo.localrepository.__init__(self, ui, self._tempparent)
247 268 self.ui.setconfig('phases', 'publish', False, 'bundlerepo')
248 269
249 270 if path:
250 271 self._url = 'bundle:' + util.expandpath(path) + '+' + bundlename
251 272 else:
252 273 self._url = 'bundle:' + bundlename
253 274
254 275 self.tempfile = None
255 276 f = util.posixfile(bundlename, "rb")
256 277 self.bundlefile = self.bundle = exchange.readbundle(ui, f, bundlename)
257 278 if self.bundle.compressed():
258 fdtemp, temp = self.vfs.mkstemp(prefix="hg-bundle-",
259 suffix=".hg10un")
260 self.tempfile = temp
261 fptemp = os.fdopen(fdtemp, 'wb')
262
263 try:
264 fptemp.write("HG10UN")
265 while True:
266 chunk = self.bundle.read(2**18)
267 if not chunk:
268 break
269 fptemp.write(chunk)
270 finally:
271 fptemp.close()
272
273 f = self.vfs.open(self.tempfile, mode="rb")
279 f = _writetempbundle(self.bundle.read, '.hg10un', header='HG10UN')
274 280 self.bundlefile = self.bundle = exchange.readbundle(ui, f,
275 281 bundlename,
276 282 self.vfs)
277 283
278 284 if isinstance(self.bundle, bundle2.unbundle20):
279 285 cgparts = [part for part in self.bundle.iterparts()
280 286 if (part.type == 'changegroup')
281 287 and (part.params.get('version', '01')
282 288 in changegroup.packermap)]
283 289
284 290 if not cgparts:
285 291 raise error.Abort('No changegroups found')
286 292 version = cgparts[0].params.get('version', '01')
287 293 cgparts = [p for p in cgparts
288 294 if p.params.get('version', '01') == version]
289 295 if len(cgparts) > 1:
290 296 raise NotImplementedError("Can't process multiple changegroups")
291 297 part = cgparts[0]
292 298
293 299 part.seek(0)
294 300 self.bundle = changegroup.packermap[version][1](part, 'UN')
295 301
296 302 # dict with the mapping 'filename' -> position in the bundle
297 303 self.bundlefilespos = {}
298 304
299 305 self.firstnewrev = self.changelog.repotiprev + 1
300 306 phases.retractboundary(self, None, phases.draft,
301 307 [ctx.node() for ctx in self[self.firstnewrev:]])
302 308
303 309 @localrepo.unfilteredpropertycache
304 310 def _phasecache(self):
305 311 return bundlephasecache(self, self._phasedefaults)
306 312
307 313 @localrepo.unfilteredpropertycache
308 314 def changelog(self):
309 315 # consume the header if it exists
310 316 self.bundle.changelogheader()
311 317 c = bundlechangelog(self.svfs, self.bundle)
312 318 self.manstart = self.bundle.tell()
313 319 return c
314 320
315 321 @localrepo.unfilteredpropertycache
316 322 def manifest(self):
317 323 self.bundle.seek(self.manstart)
318 324 # consume the header if it exists
319 325 self.bundle.manifestheader()
320 326 m = bundlemanifest(self.svfs, self.bundle, self.changelog.rev)
321 327 self.filestart = self.bundle.tell()
322 328 return m
323 329
324 330 @localrepo.unfilteredpropertycache
325 331 def manstart(self):
326 332 self.changelog
327 333 return self.manstart
328 334
329 335 @localrepo.unfilteredpropertycache
330 336 def filestart(self):
331 337 self.manifest
332 338 return self.filestart
333 339
334 340 def url(self):
335 341 return self._url
336 342
337 343 def file(self, f):
338 344 if not self.bundlefilespos:
339 345 self.bundle.seek(self.filestart)
340 346 while True:
341 347 chunkdata = self.bundle.filelogheader()
342 348 if not chunkdata:
343 349 break
344 350 fname = chunkdata['filename']
345 351 self.bundlefilespos[fname] = self.bundle.tell()
346 352 while True:
347 353 c = self.bundle.deltachunk(None)
348 354 if not c:
349 355 break
350 356
351 357 if f in self.bundlefilespos:
352 358 self.bundle.seek(self.bundlefilespos[f])
353 359 return bundlefilelog(self.svfs, f, self.bundle, self.changelog.rev)
354 360 else:
355 361 return filelog.filelog(self.svfs, f)
356 362
357 363 def close(self):
358 364 """Close assigned bundle file immediately."""
359 365 self.bundlefile.close()
360 366 if self.tempfile is not None:
361 367 self.vfs.unlink(self.tempfile)
362 368 if self._tempparent:
363 369 shutil.rmtree(self._tempparent, True)
364 370
365 371 def cancopy(self):
366 372 return False
367 373
368 374 def peer(self):
369 375 return bundlepeer(self)
370 376
371 377 def getcwd(self):
372 378 return os.getcwd() # always outside the repo
373 379
374 380
375 381 def instance(ui, path, create):
376 382 if create:
377 383 raise error.Abort(_('cannot create new bundle repository'))
378 384 # internal config: bundle.mainreporoot
379 385 parentpath = ui.config("bundle", "mainreporoot", "")
380 386 if not parentpath:
381 387 # try to find the correct path to the working directory repo
382 388 parentpath = cmdutil.findrepo(os.getcwd())
383 389 if parentpath is None:
384 390 parentpath = ''
385 391 if parentpath:
386 392 # Try to make the full path relative so we get a nice, short URL.
387 393 # In particular, we don't want temp dir names in test outputs.
388 394 cwd = os.getcwd()
389 395 if parentpath == cwd:
390 396 parentpath = ''
391 397 else:
392 398 cwd = pathutil.normasprefix(cwd)
393 399 if parentpath.startswith(cwd):
394 400 parentpath = parentpath[len(cwd):]
395 401 u = util.url(path)
396 402 path = u.localpath()
397 403 if u.scheme == 'bundle':
398 404 s = path.split("+", 1)
399 405 if len(s) == 1:
400 406 repopath, bundlename = parentpath, s[0]
401 407 else:
402 408 repopath, bundlename = s
403 409 else:
404 410 repopath, bundlename = parentpath, path
405 411 return bundlerepository(ui, repopath, bundlename)
406 412
407 413 class bundletransactionmanager(object):
408 414 def transaction(self):
409 415 return None
410 416
411 417 def close(self):
412 418 raise NotImplementedError
413 419
414 420 def release(self):
415 421 raise NotImplementedError
416 422
417 423 def getremotechanges(ui, repo, other, onlyheads=None, bundlename=None,
418 424 force=False):
419 425 '''obtains a bundle of changes incoming from other
420 426
421 427 "onlyheads" restricts the returned changes to those reachable from the
422 428 specified heads.
423 429 "bundlename", if given, stores the bundle to this file path permanently;
424 430 otherwise it's stored to a temp file and gets deleted again when you call
425 431 the returned "cleanupfn".
426 432 "force" indicates whether to proceed on unrelated repos.
427 433
428 434 Returns a tuple (local, csets, cleanupfn):
429 435
430 436 "local" is a local repo from which to obtain the actual incoming
431 437 changesets; it is a bundlerepo for the obtained bundle when the
432 438 original "other" is remote.
433 439 "csets" lists the incoming changeset node ids.
434 440 "cleanupfn" must be called without arguments when you're done processing
435 441 the changes; it closes both the original "other" and the one returned
436 442 here.
437 443 '''
438 444 tmp = discovery.findcommonincoming(repo, other, heads=onlyheads,
439 445 force=force)
440 446 common, incoming, rheads = tmp
441 447 if not incoming:
442 448 try:
443 449 if bundlename:
444 450 os.unlink(bundlename)
445 451 except OSError:
446 452 pass
447 453 return repo, [], other.close
448 454
449 455 commonset = set(common)
450 456 rheads = [x for x in rheads if x not in commonset]
451 457
452 458 bundle = None
453 459 bundlerepo = None
454 460 localrepo = other.local()
455 461 if bundlename or not localrepo:
456 462 # create a bundle (uncompressed if other repo is not local)
457 463
458 464 canbundle2 = (ui.configbool('experimental', 'bundle2-exp', True)
459 465 and other.capable('getbundle')
460 466 and other.capable('bundle2'))
461 467 if canbundle2:
462 468 kwargs = {}
463 469 kwargs['common'] = common
464 470 kwargs['heads'] = rheads
465 471 kwargs['bundlecaps'] = exchange.caps20to10(repo)
466 472 kwargs['cg'] = True
467 473 b2 = other.getbundle('incoming', **kwargs)
468 474 fname = bundle = changegroup.writechunks(ui, b2._forwardchunks(),
469 475 bundlename)
470 476 else:
471 477 if other.capable('getbundle'):
472 478 cg = other.getbundle('incoming', common=common, heads=rheads)
473 479 elif onlyheads is None and not other.capable('changegroupsubset'):
474 480 # compat with older servers when pulling all remote heads
475 481 cg = other.changegroup(incoming, "incoming")
476 482 rheads = None
477 483 else:
478 484 cg = other.changegroupsubset(incoming, rheads, 'incoming')
479 485 if localrepo:
480 486 bundletype = "HG10BZ"
481 487 else:
482 488 bundletype = "HG10UN"
483 489 fname = bundle = changegroup.writebundle(ui, cg, bundlename,
484 490 bundletype)
485 491 # keep written bundle?
486 492 if bundlename:
487 493 bundle = None
488 494 if not localrepo:
489 495 # use the created uncompressed bundlerepo
490 496 localrepo = bundlerepo = bundlerepository(repo.baseui, repo.root,
491 497 fname)
492 498 # this repo contains local and other now, so filter out local again
493 499 common = repo.heads()
494 500 if localrepo:
495 501 # Part of common may be remotely filtered
496 502 # So use an unfiltered version
497 503 # The discovery process probably need cleanup to avoid that
498 504 localrepo = localrepo.unfiltered()
499 505
500 506 csets = localrepo.changelog.findmissing(common, rheads)
501 507
502 508 if bundlerepo:
503 509 reponodes = [ctx.node() for ctx in bundlerepo[bundlerepo.firstnewrev:]]
504 510 remotephases = other.listkeys('phases')
505 511
506 512 pullop = exchange.pulloperation(bundlerepo, other, heads=reponodes)
507 513 pullop.trmanager = bundletransactionmanager()
508 514 exchange._pullapplyphases(pullop, remotephases)
509 515
510 516 def cleanup():
511 517 if bundlerepo:
512 518 bundlerepo.close()
513 519 if bundle:
514 520 os.unlink(bundle)
515 521 other.close()
516 522
517 523 return (localrepo, csets, cleanup)
General Comments 0
You need to be logged in to leave comments. Login now