##// END OF EJS Templates
bundlerepo: handle changegroup induced phase movement in the associated method...
marmoute -
r51094:21f87689 default
parent child Browse files
Show More
@@ -1,714 +1,722 b''
1 1 # bundlerepo.py - repository class for viewing uncompressed bundles
2 2 #
3 3 # Copyright 2006, 2007 Benoit Boissinot <bboissin@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 """Repository class for viewing uncompressed bundles.
9 9
10 10 This provides a read-only repository interface to bundles as if they
11 11 were part of the actual repository.
12 12 """
13 13
14 14
15 15 import os
16 16 import shutil
17 17
18 18 from .i18n import _
19 19 from .node import (
20 20 hex,
21 21 nullrev,
22 22 )
23 23
24 24 from . import (
25 25 bundle2,
26 26 changegroup,
27 27 changelog,
28 28 cmdutil,
29 29 discovery,
30 30 encoding,
31 31 error,
32 32 exchange,
33 33 filelog,
34 34 localrepo,
35 35 manifest,
36 36 mdiff,
37 37 pathutil,
38 38 phases,
39 39 pycompat,
40 40 revlog,
41 41 revlogutils,
42 42 util,
43 43 vfs as vfsmod,
44 44 )
45 45 from .utils import (
46 46 urlutil,
47 47 )
48 48
49 49 from .revlogutils import (
50 50 constants as revlog_constants,
51 51 )
52 52
53 53
54 54 class bundlerevlog(revlog.revlog):
55 55 def __init__(self, opener, target, radix, cgunpacker, linkmapper):
56 56 # How it works:
57 57 # To retrieve a revision, we need to know the offset of the revision in
58 58 # the bundle (an unbundle object). We store this offset in the index
59 59 # (start). The base of the delta is stored in the base field.
60 60 #
61 61 # To differentiate a rev in the bundle from a rev in the revlog, we
62 62 # check revision against repotiprev.
63 63 opener = vfsmod.readonlyvfs(opener)
64 64 revlog.revlog.__init__(self, opener, target=target, radix=radix)
65 65 self.bundle = cgunpacker
66 66 n = len(self)
67 67 self.repotiprev = n - 1
68 68 self.bundlerevs = set() # used by 'bundle()' revset expression
69 69 for deltadata in cgunpacker.deltaiter():
70 70 node, p1, p2, cs, deltabase, delta, flags, sidedata = deltadata
71 71
72 72 size = len(delta)
73 73 start = cgunpacker.tell() - size
74 74
75 75 if self.index.has_node(node):
76 76 # this can happen if two branches make the same change
77 77 self.bundlerevs.add(self.index.rev(node))
78 78 continue
79 79 if cs == node:
80 80 linkrev = nullrev
81 81 else:
82 82 linkrev = linkmapper(cs)
83 83
84 84 for p in (p1, p2):
85 85 if not self.index.has_node(p):
86 86 raise error.LookupError(
87 87 p, self.display_id, _(b"unknown parent")
88 88 )
89 89
90 90 if not self.index.has_node(deltabase):
91 91 raise error.LookupError(
92 92 deltabase, self.display_id, _(b'unknown delta base')
93 93 )
94 94
95 95 baserev = self.rev(deltabase)
96 96 # start, size, full unc. size, base (unused), link, p1, p2, node, sidedata_offset (unused), sidedata_size (unused)
97 97 e = revlogutils.entry(
98 98 flags=flags,
99 99 data_offset=start,
100 100 data_compressed_length=size,
101 101 data_delta_base=baserev,
102 102 link_rev=linkrev,
103 103 parent_rev_1=self.rev(p1),
104 104 parent_rev_2=self.rev(p2),
105 105 node_id=node,
106 106 )
107 107 self.index.append(e)
108 108 self.bundlerevs.add(n)
109 109 n += 1
110 110
111 111 def _chunk(self, rev, df=None):
112 112 # Warning: in case of bundle, the diff is against what we stored as
113 113 # delta base, not against rev - 1
114 114 # XXX: could use some caching
115 115 if rev <= self.repotiprev:
116 116 return revlog.revlog._chunk(self, rev)
117 117 self.bundle.seek(self.start(rev))
118 118 return self.bundle.read(self.length(rev))
119 119
120 120 def revdiff(self, rev1, rev2):
121 121 """return or calculate a delta between two revisions"""
122 122 if rev1 > self.repotiprev and rev2 > self.repotiprev:
123 123 # hot path for bundle
124 124 revb = self.index[rev2][3]
125 125 if revb == rev1:
126 126 return self._chunk(rev2)
127 127 elif rev1 <= self.repotiprev and rev2 <= self.repotiprev:
128 128 return revlog.revlog.revdiff(self, rev1, rev2)
129 129
130 130 return mdiff.textdiff(self.rawdata(rev1), self.rawdata(rev2))
131 131
132 132 def _rawtext(self, node, rev, _df=None):
133 133 if rev is None:
134 134 rev = self.rev(node)
135 135 validated = False
136 136 rawtext = None
137 137 chain = []
138 138 iterrev = rev
139 139 # reconstruct the revision if it is from a changegroup
140 140 while iterrev > self.repotiprev:
141 141 if self._revisioncache and self._revisioncache[1] == iterrev:
142 142 rawtext = self._revisioncache[2]
143 143 break
144 144 chain.append(iterrev)
145 145 iterrev = self.index[iterrev][3]
146 146 if iterrev == nullrev:
147 147 rawtext = b''
148 148 elif rawtext is None:
149 149 r = super(bundlerevlog, self)._rawtext(
150 150 self.node(iterrev), iterrev, _df=_df
151 151 )
152 152 __, rawtext, validated = r
153 153 if chain:
154 154 validated = False
155 155 while chain:
156 156 delta = self._chunk(chain.pop())
157 157 rawtext = mdiff.patches(rawtext, [delta])
158 158 return rev, rawtext, validated
159 159
160 160 def addrevision(self, *args, **kwargs):
161 161 raise NotImplementedError
162 162
163 163 def addgroup(self, *args, **kwargs):
164 164 raise NotImplementedError
165 165
166 166 def strip(self, *args, **kwargs):
167 167 raise NotImplementedError
168 168
169 169 def checksize(self):
170 170 raise NotImplementedError
171 171
172 172
173 173 class bundlechangelog(bundlerevlog, changelog.changelog):
174 174 def __init__(self, opener, cgunpacker):
175 175 changelog.changelog.__init__(self, opener)
176 176 linkmapper = lambda x: x
177 177 bundlerevlog.__init__(
178 178 self,
179 179 opener,
180 180 (revlog_constants.KIND_CHANGELOG, None),
181 181 self.radix,
182 182 cgunpacker,
183 183 linkmapper,
184 184 )
185 185
186 186
187 187 class bundlemanifest(bundlerevlog, manifest.manifestrevlog):
188 188 def __init__(
189 189 self,
190 190 nodeconstants,
191 191 opener,
192 192 cgunpacker,
193 193 linkmapper,
194 194 dirlogstarts=None,
195 195 dir=b'',
196 196 ):
197 197 manifest.manifestrevlog.__init__(self, nodeconstants, opener, tree=dir)
198 198 bundlerevlog.__init__(
199 199 self,
200 200 opener,
201 201 (revlog_constants.KIND_MANIFESTLOG, dir),
202 202 self._revlog.radix,
203 203 cgunpacker,
204 204 linkmapper,
205 205 )
206 206 if dirlogstarts is None:
207 207 dirlogstarts = {}
208 208 if self.bundle.version == b"03":
209 209 dirlogstarts = _getfilestarts(self.bundle)
210 210 self._dirlogstarts = dirlogstarts
211 211 self._linkmapper = linkmapper
212 212
213 213 def dirlog(self, d):
214 214 if d in self._dirlogstarts:
215 215 self.bundle.seek(self._dirlogstarts[d])
216 216 return bundlemanifest(
217 217 self.nodeconstants,
218 218 self.opener,
219 219 self.bundle,
220 220 self._linkmapper,
221 221 self._dirlogstarts,
222 222 dir=d,
223 223 )
224 224 return super(bundlemanifest, self).dirlog(d)
225 225
226 226
227 227 class bundlefilelog(filelog.filelog):
228 228 def __init__(self, opener, path, cgunpacker, linkmapper):
229 229 filelog.filelog.__init__(self, opener, path)
230 230 self._revlog = bundlerevlog(
231 231 opener,
232 232 # XXX should use the unencoded path
233 233 target=(revlog_constants.KIND_FILELOG, path),
234 234 radix=self._revlog.radix,
235 235 cgunpacker=cgunpacker,
236 236 linkmapper=linkmapper,
237 237 )
238 238
239 239
240 240 class bundlepeer(localrepo.localpeer):
241 241 def canpush(self):
242 242 return False
243 243
244 244
245 245 class bundlephasecache(phases.phasecache):
246 246 def __init__(self, *args, **kwargs):
247 247 super(bundlephasecache, self).__init__(*args, **kwargs)
248 248 if util.safehasattr(self, 'opener'):
249 249 self.opener = vfsmod.readonlyvfs(self.opener)
250 250
251 251 def write(self):
252 252 raise NotImplementedError
253 253
254 254 def _write(self, fp):
255 255 raise NotImplementedError
256 256
257 257 def _updateroots(self, phase, newroots, tr):
258 258 self.phaseroots[phase] = newroots
259 259 self.invalidate()
260 260 self.dirty = True
261 261
262 262
263 263 def _getfilestarts(cgunpacker):
264 264 filespos = {}
265 265 for chunkdata in iter(cgunpacker.filelogheader, {}):
266 266 fname = chunkdata[b'filename']
267 267 filespos[fname] = cgunpacker.tell()
268 268 for chunk in iter(lambda: cgunpacker.deltachunk(None), {}):
269 269 pass
270 270 return filespos
271 271
272 272
273 273 class bundlerepository:
274 274 """A repository instance that is a union of a local repo and a bundle.
275 275
276 276 Instances represent a read-only repository composed of a local repository
277 277 with the contents of a bundle file applied. The repository instance is
278 278 conceptually similar to the state of a repository after an
279 279 ``hg unbundle`` operation. However, the contents of the bundle are never
280 280 applied to the actual base repository.
281 281
282 282 Instances constructed directly are not usable as repository objects.
283 283 Use instance() or makebundlerepository() to create instances.
284 284 """
285 285
286 286 def __init__(self, bundlepath, url, tempparent):
287 287 self._tempparent = tempparent
288 288 self._url = url
289 289
290 290 self.ui.setconfig(b'phases', b'publish', False, b'bundlerepo')
291 291
292 292 # dict with the mapping 'filename' -> position in the changegroup.
293 293 self._cgfilespos = {}
294 294 self._bundlefile = None
295 295 self._cgunpacker = None
296 296 self.tempfile = None
297 297 f = util.posixfile(bundlepath, b"rb")
298 298 bundle = exchange.readbundle(self.ui, f, bundlepath)
299 299
300 300 if isinstance(bundle, bundle2.unbundle20):
301 301 self._bundlefile = bundle
302 302
303 303 cgpart = None
304 304 for part in bundle.iterparts(seekable=True):
305 305 if part.type == b'changegroup':
306 306 if cgpart:
307 307 raise NotImplementedError(
308 308 b"can't process multiple changegroups"
309 309 )
310 310 cgpart = part
311 311 self._handle_bundle2_cg_part(bundle, part)
312 312
313 313 if not cgpart:
314 314 raise error.Abort(_(b"No changegroups found"))
315 315
316 316 # This is required to placate a later consumer, which expects
317 317 # the payload offset to be at the beginning of the changegroup.
318 318 # We need to do this after the iterparts() generator advances
319 319 # because iterparts() will seek to end of payload after the
320 320 # generator returns control to iterparts().
321 321 cgpart.seek(0, os.SEEK_SET)
322 322
323 323 elif isinstance(bundle, changegroup.cg1unpacker):
324 324 self._handle_bundle1(bundle, bundlepath)
325 325 else:
326 326 raise error.Abort(
327 327 _(b'bundle type %s cannot be read') % type(bundle)
328 328 )
329 329
330 def _handle_bundle1(self, bundle, bundlepath):
331 if bundle.compressed():
332 f = self._writetempbundle(bundle.read, b'.hg10un', header=b'HG10UN')
333 bundle = exchange.readbundle(self.ui, f, bundlepath, self.vfs)
334
335 self._bundlefile = bundle
336 self._cgunpacker = bundle
337
330 338 self.firstnewrev = self.changelog.repotiprev + 1
331 339 phases.retractboundary(
332 340 self,
333 341 None,
334 342 phases.draft,
335 343 [ctx.node() for ctx in self[self.firstnewrev :]],
336 344 )
337 345
338 def _handle_bundle1(self, bundle, bundlepath):
339 if bundle.compressed():
340 f = self._writetempbundle(bundle.read, b'.hg10un', header=b'HG10UN')
341 bundle = exchange.readbundle(self.ui, f, bundlepath, self.vfs)
342
343 self._bundlefile = bundle
344 self._cgunpacker = bundle
345
346 346 def _handle_bundle2_cg_part(self, bundle, part):
347 347 assert part.type == b'changegroup'
348 348 cgstream = part
349 349 version = part.params.get(b'version', b'01')
350 350 legalcgvers = changegroup.supportedincomingversions(self)
351 351 if version not in legalcgvers:
352 352 msg = _(b'Unsupported changegroup version: %s')
353 353 raise error.Abort(msg % version)
354 354 if bundle.compressed():
355 355 cgstream = self._writetempbundle(part.read, b'.cg%sun' % version)
356 356
357 357 self._cgunpacker = changegroup.getunbundler(version, cgstream, b'UN')
358 358
359 self.firstnewrev = self.changelog.repotiprev + 1
360 phases.retractboundary(
361 self,
362 None,
363 phases.draft,
364 [ctx.node() for ctx in self[self.firstnewrev :]],
365 )
366
359 367 def _writetempbundle(self, readfn, suffix, header=b''):
360 368 """Write a temporary file to disk"""
361 369 fdtemp, temp = self.vfs.mkstemp(prefix=b"hg-bundle-", suffix=suffix)
362 370 self.tempfile = temp
363 371
364 372 with os.fdopen(fdtemp, 'wb') as fptemp:
365 373 fptemp.write(header)
366 374 while True:
367 375 chunk = readfn(2 ** 18)
368 376 if not chunk:
369 377 break
370 378 fptemp.write(chunk)
371 379
372 380 return self.vfs.open(self.tempfile, mode=b"rb")
373 381
374 382 @localrepo.unfilteredpropertycache
375 383 def _phasecache(self):
376 384 return bundlephasecache(self, self._phasedefaults)
377 385
378 386 @localrepo.unfilteredpropertycache
379 387 def changelog(self):
380 388 # consume the header if it exists
381 389 self._cgunpacker.changelogheader()
382 390 c = bundlechangelog(self.svfs, self._cgunpacker)
383 391 self.manstart = self._cgunpacker.tell()
384 392 return c
385 393
386 394 def _refreshchangelog(self):
387 395 # changelog for bundle repo are not filecache, this method is not
388 396 # applicable.
389 397 pass
390 398
391 399 @localrepo.unfilteredpropertycache
392 400 def manifestlog(self):
393 401 self._cgunpacker.seek(self.manstart)
394 402 # consume the header if it exists
395 403 self._cgunpacker.manifestheader()
396 404 linkmapper = self.unfiltered().changelog.rev
397 405 rootstore = bundlemanifest(
398 406 self.nodeconstants, self.svfs, self._cgunpacker, linkmapper
399 407 )
400 408 self.filestart = self._cgunpacker.tell()
401 409
402 410 return manifest.manifestlog(
403 411 self.svfs, self, rootstore, self.narrowmatch()
404 412 )
405 413
406 414 def _consumemanifest(self):
407 415 """Consumes the manifest portion of the bundle, setting filestart so the
408 416 file portion can be read."""
409 417 self._cgunpacker.seek(self.manstart)
410 418 self._cgunpacker.manifestheader()
411 419 for delta in self._cgunpacker.deltaiter():
412 420 pass
413 421 self.filestart = self._cgunpacker.tell()
414 422
415 423 @localrepo.unfilteredpropertycache
416 424 def manstart(self):
417 425 self.changelog
418 426 return self.manstart
419 427
420 428 @localrepo.unfilteredpropertycache
421 429 def filestart(self):
422 430 self.manifestlog
423 431
424 432 # If filestart was not set by self.manifestlog, that means the
425 433 # manifestlog implementation did not consume the manifests from the
426 434 # changegroup (ex: it might be consuming trees from a separate bundle2
427 435 # part instead). So we need to manually consume it.
428 436 if 'filestart' not in self.__dict__:
429 437 self._consumemanifest()
430 438
431 439 return self.filestart
432 440
433 441 def url(self):
434 442 return self._url
435 443
436 444 def file(self, f):
437 445 if not self._cgfilespos:
438 446 self._cgunpacker.seek(self.filestart)
439 447 self._cgfilespos = _getfilestarts(self._cgunpacker)
440 448
441 449 if f in self._cgfilespos:
442 450 self._cgunpacker.seek(self._cgfilespos[f])
443 451 linkmapper = self.unfiltered().changelog.rev
444 452 return bundlefilelog(self.svfs, f, self._cgunpacker, linkmapper)
445 453 else:
446 454 return super(bundlerepository, self).file(f)
447 455
448 456 def close(self):
449 457 """Close assigned bundle file immediately."""
450 458 self._bundlefile.close()
451 459 if self.tempfile is not None:
452 460 self.vfs.unlink(self.tempfile)
453 461 if self._tempparent:
454 462 shutil.rmtree(self._tempparent, True)
455 463
456 464 def cancopy(self):
457 465 return False
458 466
459 467 def peer(self, path=None):
460 468 return bundlepeer(self, path=path)
461 469
462 470 def getcwd(self):
463 471 return encoding.getcwd() # always outside the repo
464 472
465 473 # Check if parents exist in localrepo before setting
466 474 def setparents(self, p1, p2=None):
467 475 if p2 is None:
468 476 p2 = self.nullid
469 477 p1rev = self.changelog.rev(p1)
470 478 p2rev = self.changelog.rev(p2)
471 479 msg = _(b"setting parent to node %s that only exists in the bundle\n")
472 480 if self.changelog.repotiprev < p1rev:
473 481 self.ui.warn(msg % hex(p1))
474 482 if self.changelog.repotiprev < p2rev:
475 483 self.ui.warn(msg % hex(p2))
476 484 return super(bundlerepository, self).setparents(p1, p2)
477 485
478 486
479 487 def instance(ui, path, create, intents=None, createopts=None):
480 488 if create:
481 489 raise error.Abort(_(b'cannot create new bundle repository'))
482 490 # internal config: bundle.mainreporoot
483 491 parentpath = ui.config(b"bundle", b"mainreporoot")
484 492 if not parentpath:
485 493 # try to find the correct path to the working directory repo
486 494 parentpath = cmdutil.findrepo(encoding.getcwd())
487 495 if parentpath is None:
488 496 parentpath = b''
489 497 if parentpath:
490 498 # Try to make the full path relative so we get a nice, short URL.
491 499 # In particular, we don't want temp dir names in test outputs.
492 500 cwd = encoding.getcwd()
493 501 if parentpath == cwd:
494 502 parentpath = b''
495 503 else:
496 504 cwd = pathutil.normasprefix(cwd)
497 505 if parentpath.startswith(cwd):
498 506 parentpath = parentpath[len(cwd) :]
499 507 u = urlutil.url(path)
500 508 path = u.localpath()
501 509 if u.scheme == b'bundle':
502 510 s = path.split(b"+", 1)
503 511 if len(s) == 1:
504 512 repopath, bundlename = parentpath, s[0]
505 513 else:
506 514 repopath, bundlename = s
507 515 else:
508 516 repopath, bundlename = parentpath, path
509 517
510 518 return makebundlerepository(ui, repopath, bundlename)
511 519
512 520
513 521 def makebundlerepository(ui, repopath, bundlepath):
514 522 """Make a bundle repository object based on repo and bundle paths."""
515 523 if repopath:
516 524 url = b'bundle:%s+%s' % (util.expandpath(repopath), bundlepath)
517 525 else:
518 526 url = b'bundle:%s' % bundlepath
519 527
520 528 # Because we can't make any guarantees about the type of the base
521 529 # repository, we can't have a static class representing the bundle
522 530 # repository. We also can't make any guarantees about how to even
523 531 # call the base repository's constructor!
524 532 #
525 533 # So, our strategy is to go through ``localrepo.instance()`` to construct
526 534 # a repo instance. Then, we dynamically create a new type derived from
527 535 # both it and our ``bundlerepository`` class which overrides some
528 536 # functionality. We then change the type of the constructed repository
529 537 # to this new type and initialize the bundle-specific bits of it.
530 538
531 539 try:
532 540 repo = localrepo.instance(ui, repopath, create=False)
533 541 tempparent = None
534 542 except error.RequirementError:
535 543 raise # no fallback if the backing repo is unsupported
536 544 except error.RepoError:
537 545 tempparent = pycompat.mkdtemp()
538 546 try:
539 547 repo = localrepo.instance(ui, tempparent, create=True)
540 548 except Exception:
541 549 shutil.rmtree(tempparent)
542 550 raise
543 551
544 552 class derivedbundlerepository(bundlerepository, repo.__class__):
545 553 pass
546 554
547 555 repo.__class__ = derivedbundlerepository
548 556 bundlerepository.__init__(repo, bundlepath, url, tempparent)
549 557
550 558 return repo
551 559
552 560
553 561 class bundletransactionmanager:
554 562 def transaction(self):
555 563 return None
556 564
557 565 def close(self):
558 566 raise NotImplementedError
559 567
560 568 def release(self):
561 569 raise NotImplementedError
562 570
563 571
564 572 def getremotechanges(
565 573 ui, repo, peer, onlyheads=None, bundlename=None, force=False
566 574 ):
567 575 """obtains a bundle of changes incoming from peer
568 576
569 577 "onlyheads" restricts the returned changes to those reachable from the
570 578 specified heads.
571 579 "bundlename", if given, stores the bundle to this file path permanently;
572 580 otherwise it's stored to a temp file and gets deleted again when you call
573 581 the returned "cleanupfn".
574 582 "force" indicates whether to proceed on unrelated repos.
575 583
576 584 Returns a tuple (local, csets, cleanupfn):
577 585
578 586 "local" is a local repo from which to obtain the actual incoming
579 587 changesets; it is a bundlerepo for the obtained bundle when the
580 588 original "peer" is remote.
581 589 "csets" lists the incoming changeset node ids.
582 590 "cleanupfn" must be called without arguments when you're done processing
583 591 the changes; it closes both the original "peer" and the one returned
584 592 here.
585 593 """
586 594 tmp = discovery.findcommonincoming(repo, peer, heads=onlyheads, force=force)
587 595 common, incoming, rheads = tmp
588 596 if not incoming:
589 597 try:
590 598 if bundlename:
591 599 os.unlink(bundlename)
592 600 except OSError:
593 601 pass
594 602 return repo, [], peer.close
595 603
596 604 commonset = set(common)
597 605 rheads = [x for x in rheads if x not in commonset]
598 606
599 607 bundle = None
600 608 bundlerepo = None
601 609 localrepo = peer.local()
602 610 if bundlename or not localrepo:
603 611 # create a bundle (uncompressed if peer repo is not local)
604 612
605 613 # developer config: devel.legacy.exchange
606 614 legexc = ui.configlist(b'devel', b'legacy.exchange')
607 615 forcebundle1 = b'bundle2' not in legexc and b'bundle1' in legexc
608 616 canbundle2 = (
609 617 not forcebundle1
610 618 and peer.capable(b'getbundle')
611 619 and peer.capable(b'bundle2')
612 620 )
613 621 if canbundle2:
614 622 with peer.commandexecutor() as e:
615 623 b2 = e.callcommand(
616 624 b'getbundle',
617 625 {
618 626 b'source': b'incoming',
619 627 b'common': common,
620 628 b'heads': rheads,
621 629 b'bundlecaps': exchange.caps20to10(
622 630 repo, role=b'client'
623 631 ),
624 632 b'cg': True,
625 633 },
626 634 ).result()
627 635
628 636 fname = bundle = changegroup.writechunks(
629 637 ui, b2._forwardchunks(), bundlename
630 638 )
631 639 else:
632 640 if peer.capable(b'getbundle'):
633 641 with peer.commandexecutor() as e:
634 642 cg = e.callcommand(
635 643 b'getbundle',
636 644 {
637 645 b'source': b'incoming',
638 646 b'common': common,
639 647 b'heads': rheads,
640 648 },
641 649 ).result()
642 650 elif onlyheads is None and not peer.capable(b'changegroupsubset'):
643 651 # compat with older servers when pulling all remote heads
644 652
645 653 with peer.commandexecutor() as e:
646 654 cg = e.callcommand(
647 655 b'changegroup',
648 656 {
649 657 b'nodes': incoming,
650 658 b'source': b'incoming',
651 659 },
652 660 ).result()
653 661
654 662 rheads = None
655 663 else:
656 664 with peer.commandexecutor() as e:
657 665 cg = e.callcommand(
658 666 b'changegroupsubset',
659 667 {
660 668 b'bases': incoming,
661 669 b'heads': rheads,
662 670 b'source': b'incoming',
663 671 },
664 672 ).result()
665 673
666 674 if localrepo:
667 675 bundletype = b"HG10BZ"
668 676 else:
669 677 bundletype = b"HG10UN"
670 678 fname = bundle = bundle2.writebundle(ui, cg, bundlename, bundletype)
671 679 # keep written bundle?
672 680 if bundlename:
673 681 bundle = None
674 682 if not localrepo:
675 683 # use the created uncompressed bundlerepo
676 684 localrepo = bundlerepo = makebundlerepository(
677 685 repo.baseui, repo.root, fname
678 686 )
679 687
680 688 # this repo contains local and peer now, so filter out local again
681 689 common = repo.heads()
682 690 if localrepo:
683 691 # Part of common may be remotely filtered
684 692 # So use an unfiltered version
685 693 # The discovery process probably need cleanup to avoid that
686 694 localrepo = localrepo.unfiltered()
687 695
688 696 csets = localrepo.changelog.findmissing(common, rheads)
689 697
690 698 if bundlerepo:
691 699 reponodes = [ctx.node() for ctx in bundlerepo[bundlerepo.firstnewrev :]]
692 700
693 701 with peer.commandexecutor() as e:
694 702 remotephases = e.callcommand(
695 703 b'listkeys',
696 704 {
697 705 b'namespace': b'phases',
698 706 },
699 707 ).result()
700 708
701 709 pullop = exchange.pulloperation(
702 710 bundlerepo, peer, path=None, heads=reponodes
703 711 )
704 712 pullop.trmanager = bundletransactionmanager()
705 713 exchange._pullapplyphases(pullop, remotephases)
706 714
707 715 def cleanup():
708 716 if bundlerepo:
709 717 bundlerepo.close()
710 718 if bundle:
711 719 os.unlink(bundle)
712 720 peer.close()
713 721
714 722 return (localrepo, csets, cleanup)
General Comments 0
You need to be logged in to leave comments. Login now