exchangev2: start to implement pull with wire protocol v2...
Gregory Szorc
r39665:a86d21e7 default
@@ -0,0 +1,55 @@
1 # exchangev2.py - repository exchange for wire protocol version 2
2 #
3 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
4 #
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
7
8 from __future__ import absolute_import
9
10 from .node import (
11 nullid,
12 )
13 from . import (
14 setdiscovery,
15 )
16
17 def pull(pullop):
18 """Pull using wire protocol version 2."""
19 repo = pullop.repo
20 remote = pullop.remote
21
22 # Figure out what needs to be fetched.
23 common, fetch, remoteheads = _pullchangesetdiscovery(
24 repo, remote, pullop.heads, abortwhenunrelated=pullop.force)
25
26 def _pullchangesetdiscovery(repo, remote, heads, abortwhenunrelated=True):
27 """Determine which changesets need to be pulled."""
28
29 if heads:
30 knownnode = repo.changelog.hasnode
31 if all(knownnode(head) for head in heads):
32 return heads, False, heads
33
34 # TODO wire protocol version 2 is capable of more efficient discovery
35 # than setdiscovery. Consider implementing something better.
36 common, fetch, remoteheads = setdiscovery.findcommonheads(
37 repo.ui, repo, remote, abortwhenunrelated=abortwhenunrelated)
38
39 common = set(common)
40 remoteheads = set(remoteheads)
41
42 # If a remote head is filtered locally, put it back in the common set.
43 # See the comment in exchange._pulldiscoverychangegroup() for more.
44
45 if fetch and remoteheads:
46 nodemap = repo.unfiltered().changelog.nodemap
47
48 common |= {head for head in remoteheads if head in nodemap}
49
50 if set(remoteheads).issubset(common):
51 fetch = []
52
53 common.discard(nullid)
54
55 return common, fetch, remoteheads
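A minimal usage sketch for the discovery helper above (hypothetical caller, not part of the change; note that `fetch` is an "any incoming" indicator in the style of setdiscovery.findcommonheads):

    def debugdiscovery(repo, remote):
        # hypothetical helper mirroring what pull() does with the triple
        common, fetch, remoteheads = _pullchangesetdiscovery(
            repo, remote, heads=None, abortwhenunrelated=True)
        if fetch:
            repo.ui.debug('%d common heads; changesets need fetching\n'
                          % len(common))
        else:
            repo.ui.debug('all %d remote heads already known locally\n'
                          % len(remoteheads))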
@@ -0,0 +1,53 @@
1 Tests for wire protocol version 2 exchange.
2 Tests in this file should be folded into existing tests once protocol
3 v2 has enough features that it can be enabled via #testcase in existing
4 tests.
5
6 $ . $TESTDIR/wireprotohelpers.sh
7 $ enablehttpv2client
8
9 $ hg init server-simple
10 $ enablehttpv2 server-simple
11 $ cd server-simple
12 $ cat >> .hg/hgrc << EOF
13 > [phases]
14 > publish = false
15 > EOF
16 $ echo a0 > a
17 $ echo b0 > b
18 $ hg -q commit -A -m 'commit 0'
19
20 $ echo a1 > a
21 $ hg commit -m 'commit 1'
22 $ hg phase --public -r .
23 $ echo a2 > a
24 $ hg commit -m 'commit 2'
25
26 $ hg -q up -r 0
27 $ echo b1 > b
28 $ hg -q commit -m 'head 2 commit 1'
29 $ echo b2 > b
30 $ hg -q commit -m 'head 2 commit 2'
31
32 $ hg serve -p $HGPORT -d --pid-file hg.pid -E error.log
33 $ cat hg.pid > $DAEMON_PIDS
34
35 $ cd ..
36
37 Test basic clone
38
39 $ hg --debug clone -U http://localhost:$HGPORT client-simple
40 using http://localhost:$HGPORT/
41 sending capabilities command
42 query 1; heads
43 sending 2 commands
44 sending command heads: {}
45 sending command known: {
46 'nodes': []
47 }
48 received frame(size=11; request=1; stream=2; streamflags=stream-begin; type=command-response; flags=continuation)
49 received frame(size=43; request=1; stream=2; streamflags=; type=command-response; flags=continuation)
50 received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
51 received frame(size=11; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
52 received frame(size=1; request=3; stream=2; streamflags=; type=command-response; flags=continuation)
53 received frame(size=0; request=3; stream=2; streamflags=; type=command-response; flags=eos)
@@ -1,2644 +1,2649 @@
1 1 # exchange.py - utility to exchange data between repos.
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import collections
11 11 import hashlib
12 12
13 13 from .i18n import _
14 14 from .node import (
15 15 bin,
16 16 hex,
17 17 nullid,
18 18 nullrev,
19 19 )
20 20 from .thirdparty import (
21 21 attr,
22 22 )
23 23 from . import (
24 24 bookmarks as bookmod,
25 25 bundle2,
26 26 changegroup,
27 27 discovery,
28 28 error,
29 exchangev2,
29 30 lock as lockmod,
30 31 logexchange,
31 32 narrowspec,
32 33 obsolete,
33 34 phases,
34 35 pushkey,
35 36 pycompat,
36 37 repository,
37 38 scmutil,
38 39 sslutil,
39 40 streamclone,
40 41 url as urlmod,
41 42 util,
42 43 )
43 44 from .utils import (
44 45 stringutil,
45 46 )
46 47
47 48 urlerr = util.urlerr
48 49 urlreq = util.urlreq
49 50
50 51 _NARROWACL_SECTION = 'narrowhgacl'
51 52
52 53 # Maps bundle version human names to changegroup versions.
53 54 _bundlespeccgversions = {'v1': '01',
54 55 'v2': '02',
55 56 'packed1': 's1',
56 57 'bundle2': '02', #legacy
57 58 }
58 59
59 60 # Maps bundle version to content opts, used to choose which parts to bundle
60 61 _bundlespeccontentopts = {
61 62 'v1': {
62 63 'changegroup': True,
63 64 'cg.version': '01',
64 65 'obsolescence': False,
65 66 'phases': False,
66 67 'tagsfnodescache': False,
67 68 'revbranchcache': False
68 69 },
69 70 'v2': {
70 71 'changegroup': True,
71 72 'cg.version': '02',
72 73 'obsolescence': False,
73 74 'phases': False,
74 75 'tagsfnodescache': True,
75 76 'revbranchcache': True
76 77 },
77 78 'packed1' : {
78 79 'cg.version': 's1'
79 80 }
80 81 }
81 82 _bundlespeccontentopts['bundle2'] = _bundlespeccontentopts['v2']
82 83
83 84 _bundlespecvariants = {"streamv2": {"changegroup": False, "streamv2": True,
84 85 "tagsfnodescache": False,
85 86 "revbranchcache": False}}
86 87
87 88 # Compression engines allowed in version 1. THIS SHOULD NEVER CHANGE.
88 89 _bundlespecv1compengines = {'gzip', 'bzip2', 'none'}
89 90
90 91 @attr.s
91 92 class bundlespec(object):
92 93 compression = attr.ib()
93 94 wirecompression = attr.ib()
94 95 version = attr.ib()
95 96 wireversion = attr.ib()
96 97 params = attr.ib()
97 98 contentopts = attr.ib()
98 99
99 100 def parsebundlespec(repo, spec, strict=True):
100 101 """Parse a bundle string specification into parts.
101 102
102 103 Bundle specifications denote a well-defined bundle/exchange format.
103 104 The content of a given specification should not change over time in
104 105 order to ensure that bundles produced by a newer version of Mercurial are
105 106 readable from an older version.
106 107
107 108 The string currently has the form:
108 109
109 110 <compression>-<type>[;<parameter0>[;<parameter1>]]
110 111
111 112 Where <compression> is one of the supported compression formats
112 113 and <type> is (currently) a version string. A ";" can follow the type and
113 114 all text afterwards is interpreted as URI encoded, ";" delimited key=value
114 115 pairs.
115 116
116 117 If ``strict`` is True (the default) <compression> is required. Otherwise,
117 118 it is optional.
118 119
119 120 Returns a bundlespec object of (compression, version, parameters).
120 121 Compression will be ``None`` if not in strict mode and a compression isn't
121 122 defined.
122 123
123 124 An ``InvalidBundleSpecification`` is raised when the specification is
124 125 not syntactically well formed.
125 126
126 127 An ``UnsupportedBundleSpecification`` is raised when the compression or
127 128 bundle type/version is not recognized.
128 129
129 130 Note: this function will likely eventually return a more complex data
130 131 structure, including bundle2 part information.
131 132 """
132 133 def parseparams(s):
133 134 if ';' not in s:
134 135 return s, {}
135 136
136 137 params = {}
137 138 version, paramstr = s.split(';', 1)
138 139
139 140 for p in paramstr.split(';'):
140 141 if '=' not in p:
141 142 raise error.InvalidBundleSpecification(
142 143 _('invalid bundle specification: '
143 144 'missing "=" in parameter: %s') % p)
144 145
145 146 key, value = p.split('=', 1)
146 147 key = urlreq.unquote(key)
147 148 value = urlreq.unquote(value)
148 149 params[key] = value
149 150
150 151 return version, params
151 152
152 153
153 154 if strict and '-' not in spec:
154 155 raise error.InvalidBundleSpecification(
155 156 _('invalid bundle specification; '
156 157 'must be prefixed with compression: %s') % spec)
157 158
158 159 if '-' in spec:
159 160 compression, version = spec.split('-', 1)
160 161
161 162 if compression not in util.compengines.supportedbundlenames:
162 163 raise error.UnsupportedBundleSpecification(
163 164 _('%s compression is not supported') % compression)
164 165
165 166 version, params = parseparams(version)
166 167
167 168 if version not in _bundlespeccgversions:
168 169 raise error.UnsupportedBundleSpecification(
169 170 _('%s is not a recognized bundle version') % version)
170 171 else:
171 172 # Value could be just the compression or just the version, in which
172 173 # case some defaults are assumed (but only when not in strict mode).
173 174 assert not strict
174 175
175 176 spec, params = parseparams(spec)
176 177
177 178 if spec in util.compengines.supportedbundlenames:
178 179 compression = spec
179 180 version = 'v1'
180 181 # Generaldelta repos require v2.
181 182 if 'generaldelta' in repo.requirements:
182 183 version = 'v2'
183 184 # Modern compression engines require v2.
184 185 if compression not in _bundlespecv1compengines:
185 186 version = 'v2'
186 187 elif spec in _bundlespeccgversions:
187 188 if spec == 'packed1':
188 189 compression = 'none'
189 190 else:
190 191 compression = 'bzip2'
191 192 version = spec
192 193 else:
193 194 raise error.UnsupportedBundleSpecification(
194 195 _('%s is not a recognized bundle specification') % spec)
195 196
196 197 # Bundle version 1 only supports a known set of compression engines.
197 198 if version == 'v1' and compression not in _bundlespecv1compengines:
198 199 raise error.UnsupportedBundleSpecification(
199 200 _('compression engine %s is not supported on v1 bundles') %
200 201 compression)
201 202
202 203 # The specification for packed1 can optionally declare the data formats
203 204 # required to apply it. If we see this metadata, compare against what the
204 205 # repo supports and error if the bundle isn't compatible.
205 206 if version == 'packed1' and 'requirements' in params:
206 207 requirements = set(params['requirements'].split(','))
207 208 missingreqs = requirements - repo.supportedformats
208 209 if missingreqs:
209 210 raise error.UnsupportedBundleSpecification(
210 211 _('missing support for repository features: %s') %
211 212 ', '.join(sorted(missingreqs)))
212 213
213 214 # Compute contentopts based on the version
214 215 contentopts = _bundlespeccontentopts.get(version, {}).copy()
215 216
216 217 # Process the variants
217 218 if "stream" in params and params["stream"] == "v2":
218 219 variant = _bundlespecvariants["streamv2"]
219 220 contentopts.update(variant)
220 221
221 222 engine = util.compengines.forbundlename(compression)
222 223 compression, wirecompression = engine.bundletype()
223 224 wireversion = _bundlespeccgversions[version]
224 225
225 226 return bundlespec(compression, wirecompression, version, wireversion,
226 227 params, contentopts)
227 228
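To make the grammar in the parsebundlespec() docstring concrete, a small sketch of how a specification string decomposes (hypothetical spec value; the real parser also URL-unquotes parameter keys and values):

    spec = 'gzip-v2;obsolescence=true'       # <compression>-<type>;<params>
    compression, rest = spec.split('-', 1)   # 'gzip', 'v2;obsolescence=true'
    version, paramstr = rest.split(';', 1)   # 'v2', 'obsolescence=true'
    params = dict(p.split('=', 1) for p in paramstr.split(';'))
    # -> compression 'gzip', version 'v2', params {'obsolescence': 'true'}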
228 229 def readbundle(ui, fh, fname, vfs=None):
229 230 header = changegroup.readexactly(fh, 4)
230 231
231 232 alg = None
232 233 if not fname:
233 234 fname = "stream"
234 235 if not header.startswith('HG') and header.startswith('\0'):
235 236 fh = changegroup.headerlessfixup(fh, header)
236 237 header = "HG10"
237 238 alg = 'UN'
238 239 elif vfs:
239 240 fname = vfs.join(fname)
240 241
241 242 magic, version = header[0:2], header[2:4]
242 243
243 244 if magic != 'HG':
244 245 raise error.Abort(_('%s: not a Mercurial bundle') % fname)
245 246 if version == '10':
246 247 if alg is None:
247 248 alg = changegroup.readexactly(fh, 2)
248 249 return changegroup.cg1unpacker(fh, alg)
249 250 elif version.startswith('2'):
250 251 return bundle2.getunbundler(ui, fh, magicstring=magic + version)
251 252 elif version == 'S1':
252 253 return streamclone.streamcloneapplier(fh)
253 254 else:
254 255 raise error.Abort(_('%s: unknown bundle version %s') % (fname, version))
255 256
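For reference, the four header bytes dispatched by readbundle() above map to unpackers as follows (a summary of the code, not an exhaustive format spec):

    # 'HG10' + 2-byte compression ('BZ', 'GZ' or 'UN') -> changegroup v1 unpacker
    # 'HG20'                                           -> bundle2 unbundler
    # 'HGS1' (version 'S1')                            -> stream clone applier
    # headerless data starting with '\0'               -> fixed up as 'HG10' + 'UN'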
256 257 def getbundlespec(ui, fh):
257 258 """Infer the bundlespec from a bundle file handle.
258 259
259 260 The input file handle is seeked and the original seek position is not
260 261 restored.
261 262 """
262 263 def speccompression(alg):
263 264 try:
264 265 return util.compengines.forbundletype(alg).bundletype()[0]
265 266 except KeyError:
266 267 return None
267 268
268 269 b = readbundle(ui, fh, None)
269 270 if isinstance(b, changegroup.cg1unpacker):
270 271 alg = b._type
271 272 if alg == '_truncatedBZ':
272 273 alg = 'BZ'
273 274 comp = speccompression(alg)
274 275 if not comp:
275 276 raise error.Abort(_('unknown compression algorithm: %s') % alg)
276 277 return '%s-v1' % comp
277 278 elif isinstance(b, bundle2.unbundle20):
278 279 if 'Compression' in b.params:
279 280 comp = speccompression(b.params['Compression'])
280 281 if not comp:
281 282 raise error.Abort(_('unknown compression algorithm: %s') % comp)
282 283 else:
283 284 comp = 'none'
284 285
285 286 version = None
286 287 for part in b.iterparts():
287 288 if part.type == 'changegroup':
288 289 version = part.params['version']
289 290 if version in ('01', '02'):
290 291 version = 'v2'
291 292 else:
292 293 raise error.Abort(_('changegroup version %s does not have '
293 294 'a known bundlespec') % version,
294 295 hint=_('try upgrading your Mercurial '
295 296 'client'))
296 297 elif part.type == 'stream2' and version is None:
297 298 # A stream2 part must be part of a v2 bundle
298 299 version = "v2"
299 300 requirements = urlreq.unquote(part.params['requirements'])
300 301 splitted = requirements.split()
301 302 params = bundle2._formatrequirementsparams(splitted)
302 303 return 'none-v2;stream=v2;%s' % params
303 304
304 305 if not version:
305 306 raise error.Abort(_('could not identify changegroup version in '
306 307 'bundle'))
307 308
308 309 return '%s-%s' % (comp, version)
309 310 elif isinstance(b, streamclone.streamcloneapplier):
310 311 requirements = streamclone.readbundle1header(fh)[2]
311 312 formatted = bundle2._formatrequirementsparams(requirements)
312 313 return 'none-packed1;%s' % formatted
313 314 else:
314 315 raise error.Abort(_('unknown bundle type: %s') % b)
315 316
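A short usage sketch for getbundlespec() (hypothetical file name and example outputs; the handle is consumed and not re-seeked, as the docstring warns):

    with open('dump.hg', 'rb') as fh:
        spec = getbundlespec(ui, fh)
    # e.g. 'bzip2-v1' for a changegroup v1 bundle, 'zstd-v2' for a
    # compressed bundle2, or 'none-packed1;requirements=...' for a
    # stream clone bundle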
316 317 def _computeoutgoing(repo, heads, common):
317 318 """Computes which revs are outgoing given a set of common
318 319 and a set of heads.
319 320
320 321 This is a separate function so extensions can have access to
321 322 the logic.
322 323
323 324 Returns a discovery.outgoing object.
324 325 """
325 326 cl = repo.changelog
326 327 if common:
327 328 hasnode = cl.hasnode
328 329 common = [n for n in common if hasnode(n)]
329 330 else:
330 331 common = [nullid]
331 332 if not heads:
332 333 heads = cl.heads()
333 334 return discovery.outgoing(repo, common, heads)
334 335
335 336 def _forcebundle1(op):
336 337 """return true if a pull/push must use bundle1
337 338
338 339 This function is used to allow testing of the older bundle version"""
339 340 ui = op.repo.ui
340 341 # The goal of this config is to allow developers to choose the bundle
341 342 # version used during exchange. This is especially handy during tests.
342 343 # Value is a list of bundle versions to be picked from; the highest
343 344 # version should be used.
344 345 #
345 346 # developer config: devel.legacy.exchange
346 347 exchange = ui.configlist('devel', 'legacy.exchange')
347 348 forcebundle1 = 'bundle2' not in exchange and 'bundle1' in exchange
348 349 return forcebundle1 or not op.remote.capable('bundle2')
349 350
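The developer config consulted by _forcebundle1() can be set from an hgrc; a testing-only sketch:

    [devel]
    # force bundle1 during exchange even when the peer supports bundle2
    legacy.exchange = bundle1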
350 351 class pushoperation(object):
351 352 """A object that represent a single push operation
352 353
353 354 Its purpose is to carry push related state and very common operations.
354 355
355 356 A new pushoperation should be created at the beginning of each push and
356 357 discarded afterward.
357 358 """
358 359
359 360 def __init__(self, repo, remote, force=False, revs=None, newbranch=False,
360 361 bookmarks=(), pushvars=None):
361 362 # repo we push from
362 363 self.repo = repo
363 364 self.ui = repo.ui
364 365 # repo we push to
365 366 self.remote = remote
366 367 # force option provided
367 368 self.force = force
368 369 # revs to be pushed (None is "all")
369 370 self.revs = revs
370 371 # bookmark explicitly pushed
371 372 self.bookmarks = bookmarks
372 373 # allow push of new branch
373 374 self.newbranch = newbranch
374 375 # steps already performed
375 376 # (used to check what steps have already been performed through bundle2)
376 377 self.stepsdone = set()
377 378 # Integer version of the changegroup push result
378 379 # - None means nothing to push
379 380 # - 0 means HTTP error
380 381 # - 1 means we pushed and remote head count is unchanged *or*
381 382 # we have outgoing changesets but refused to push
382 383 # - other values as described by addchangegroup()
383 384 self.cgresult = None
384 385 # Boolean value for the bookmark push
385 386 self.bkresult = None
387 388 # discovery.outgoing object (contains common and outgoing data)
387 388 self.outgoing = None
388 389 # all remote topological heads before the push
389 390 self.remoteheads = None
390 391 # Details of the remote branch pre and post push
391 392 #
392 393 # mapping: {'branch': ([remoteheads],
393 394 # [newheads],
394 395 # [unsyncedheads],
395 396 # [discardedheads])}
396 397 # - branch: the branch name
397 398 # - remoteheads: the list of remote heads known locally
398 399 # None if the branch is new
399 400 # - newheads: the new remote heads (known locally) with outgoing pushed
400 401 # - unsyncedheads: the list of remote heads unknown locally.
401 402 # - discardedheads: the list of remote heads made obsolete by the push
402 403 self.pushbranchmap = None
403 404 # testable as a boolean indicating if any nodes are missing locally.
404 405 self.incoming = None
405 406 # summary of the remote phase situation
406 407 self.remotephases = None
407 408 # phase changes that must be pushed alongside the changesets
408 409 self.outdatedphases = None
409 410 # phase changes that must be pushed if the changeset push fails
410 411 self.fallbackoutdatedphases = None
411 412 # outgoing obsmarkers
412 413 self.outobsmarkers = set()
413 414 # outgoing bookmarks
414 415 self.outbookmarks = []
415 416 # transaction manager
416 417 self.trmanager = None
417 418 # map { pushkey partid -> callback handling failure}
418 419 # used to handle exception from mandatory pushkey part failure
419 420 self.pkfailcb = {}
420 421 # an iterable of pushvars or None
421 422 self.pushvars = pushvars
422 423
423 424 @util.propertycache
424 425 def futureheads(self):
425 426 """future remote heads if the changeset push succeeds"""
426 427 return self.outgoing.missingheads
427 428
428 429 @util.propertycache
429 430 def fallbackheads(self):
430 431 """future remote heads if the changeset push fails"""
431 432 if self.revs is None:
432 433 # no target to push, all common heads are relevant
433 434 return self.outgoing.commonheads
434 435 unfi = self.repo.unfiltered()
435 436 # I want cheads = heads(::missingheads and ::commonheads)
436 437 # (missingheads is revs with secret changeset filtered out)
437 438 #
438 439 # This can be expressed as:
439 440 # cheads = ( (missingheads and ::commonheads)
440 441 # + (commonheads and ::missingheads))
441 442 # )
442 443 #
443 444 # while trying to push we already computed the following:
444 445 # common = (::commonheads)
445 446 # missing = ((commonheads::missingheads) - commonheads)
446 447 #
447 448 # We can pick:
448 449 # * missingheads part of common (::commonheads)
449 450 common = self.outgoing.common
450 451 nm = self.repo.changelog.nodemap
451 452 cheads = [node for node in self.revs if nm[node] in common]
452 453 # and
453 454 # * commonheads parents on missing
454 455 revset = unfi.set('%ln and parents(roots(%ln))',
455 456 self.outgoing.commonheads,
456 457 self.outgoing.missing)
457 458 cheads.extend(c.node() for c in revset)
458 459 return cheads
459 460
460 461 @property
461 462 def commonheads(self):
462 463 """set of all common heads after changeset bundle push"""
463 464 if self.cgresult:
464 465 return self.futureheads
465 466 else:
466 467 return self.fallbackheads
467 468
468 469 # mapping of messages used when pushing bookmarks
469 470 bookmsgmap = {'update': (_("updating bookmark %s\n"),
470 471 _('updating bookmark %s failed!\n')),
471 472 'export': (_("exporting bookmark %s\n"),
472 473 _('exporting bookmark %s failed!\n')),
473 474 'delete': (_("deleting remote bookmark %s\n"),
474 475 _('deleting remote bookmark %s failed!\n')),
475 476 }
476 477
477 478
478 479 def push(repo, remote, force=False, revs=None, newbranch=False, bookmarks=(),
479 480 opargs=None):
480 481 '''Push outgoing changesets (limited by revs) from a local
481 482 repository to remote. Return an integer:
482 483 - None means nothing to push
483 484 - 0 means HTTP error
484 485 - 1 means we pushed and remote head count is unchanged *or*
485 486 we have outgoing changesets but refused to push
486 487 - other values as described by addchangegroup()
487 488 '''
488 489 if opargs is None:
489 490 opargs = {}
490 491 pushop = pushoperation(repo, remote, force, revs, newbranch, bookmarks,
491 492 **pycompat.strkwargs(opargs))
492 493 if pushop.remote.local():
493 494 missing = (set(pushop.repo.requirements)
494 495 - pushop.remote.local().supported)
495 496 if missing:
496 497 msg = _("required features are not"
497 498 " supported in the destination:"
498 499 " %s") % (', '.join(sorted(missing)))
499 500 raise error.Abort(msg)
500 501
501 502 if not pushop.remote.canpush():
502 503 raise error.Abort(_("destination does not support push"))
503 504
504 505 if not pushop.remote.capable('unbundle'):
505 506 raise error.Abort(_('cannot push: destination does not support the '
506 507 'unbundle wire protocol command'))
507 508
508 509 # get lock as we might write phase data
509 510 wlock = lock = None
510 511 try:
511 512 # bundle2 push may receive a reply bundle touching bookmarks or other
512 513 # things requiring the wlock. Take it now to ensure proper ordering.
513 514 maypushback = pushop.ui.configbool('experimental', 'bundle2.pushback')
514 515 if (not _forcebundle1(pushop)) and maypushback:
515 516 wlock = pushop.repo.wlock()
516 517 lock = pushop.repo.lock()
517 518 pushop.trmanager = transactionmanager(pushop.repo,
518 519 'push-response',
519 520 pushop.remote.url())
520 521 except error.LockUnavailable as err:
521 522 # source repo cannot be locked.
522 523 # We do not abort the push, but just disable the local phase
523 524 # synchronisation.
524 525 msg = 'cannot lock source repository: %s\n' % err
525 526 pushop.ui.debug(msg)
526 527
527 528 with wlock or util.nullcontextmanager(), \
528 529 lock or util.nullcontextmanager(), \
529 530 pushop.trmanager or util.nullcontextmanager():
530 531 pushop.repo.checkpush(pushop)
531 532 _pushdiscovery(pushop)
532 533 if not _forcebundle1(pushop):
533 534 _pushbundle2(pushop)
534 535 _pushchangeset(pushop)
535 536 _pushsyncphase(pushop)
536 537 _pushobsolete(pushop)
537 538 _pushbookmark(pushop)
538 539
539 540 if repo.ui.configbool('experimental', 'remotenames'):
540 541 logexchange.pullremotenames(repo, remote)
541 542
542 543 return pushop
543 544
544 545 # list of steps to perform discovery before push
545 546 pushdiscoveryorder = []
546 547
547 548 # Mapping between step name and function
548 549 #
549 550 # This exists to help extensions wrap steps if necessary
550 551 pushdiscoverymapping = {}
551 552
552 553 def pushdiscovery(stepname):
553 554 """decorator for function performing discovery before push
554 555
555 556 The function is added to the step -> function mapping and appended to the
556 557 list of steps. Beware that decorated functions will be added in order (this
557 558 may matter).
558 559
559 560 You can only use this decorator for a new step; if you want to wrap a step
560 561 from an extension, change the pushdiscovery dictionary directly."""
561 562 def dec(func):
562 563 assert stepname not in pushdiscoverymapping
563 564 pushdiscoverymapping[stepname] = func
564 565 pushdiscoveryorder.append(stepname)
565 566 return func
566 567 return dec
567 568
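A hypothetical sketch of registering an extra discovery step with the decorator above (illustrative name and body; it assumes it is defined after the built-in 'changeset' step so that pushop.outgoing is populated when it runs):

    @pushdiscovery('debug-summary')
    def _pushdiscoverydebugsummary(pushop):
        # runs after previously registered steps, in registration order
        pushop.ui.debug('outgoing: %d changesets\n'
                        % len(pushop.outgoing.missing))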
568 569 def _pushdiscovery(pushop):
569 570 """Run all discovery steps"""
570 571 for stepname in pushdiscoveryorder:
571 572 step = pushdiscoverymapping[stepname]
572 573 step(pushop)
573 574
574 575 @pushdiscovery('changeset')
575 576 def _pushdiscoverychangeset(pushop):
576 577 """discover the changeset that need to be pushed"""
577 578 fci = discovery.findcommonincoming
578 579 if pushop.revs:
579 580 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force,
580 581 ancestorsof=pushop.revs)
581 582 else:
582 583 commoninc = fci(pushop.repo, pushop.remote, force=pushop.force)
583 584 common, inc, remoteheads = commoninc
584 585 fco = discovery.findcommonoutgoing
585 586 outgoing = fco(pushop.repo, pushop.remote, onlyheads=pushop.revs,
586 587 commoninc=commoninc, force=pushop.force)
587 588 pushop.outgoing = outgoing
588 589 pushop.remoteheads = remoteheads
589 590 pushop.incoming = inc
590 591
591 592 @pushdiscovery('phase')
592 593 def _pushdiscoveryphase(pushop):
593 594 """discover the phase that needs to be pushed
594 595
595 596 (computed for both the success and failure cases of the changeset push)"""
596 597 outgoing = pushop.outgoing
597 598 unfi = pushop.repo.unfiltered()
598 599 remotephases = listkeys(pushop.remote, 'phases')
599 600
600 601 if (pushop.ui.configbool('ui', '_usedassubrepo')
601 602 and remotephases # server supports phases
602 603 and not pushop.outgoing.missing # no changesets to be pushed
603 604 and remotephases.get('publishing', False)):
604 605 # When:
605 606 # - this is a subrepo push
606 607 # - and remote supports phases
607 608 # - and no changesets are to be pushed
608 609 # - and remote is publishing
609 610 # We may be in issue 3871 case!
610 611 # We drop the phase synchronisation that would normally be done as
611 612 # a courtesy; it could publish changesets that are still draft
612 613 # locally on the remote.
613 614 pushop.outdatedphases = []
614 615 pushop.fallbackoutdatedphases = []
615 616 return
616 617
617 618 pushop.remotephases = phases.remotephasessummary(pushop.repo,
618 619 pushop.fallbackheads,
619 620 remotephases)
620 621 droots = pushop.remotephases.draftroots
621 622
622 623 extracond = ''
623 624 if not pushop.remotephases.publishing:
624 625 extracond = ' and public()'
625 626 revset = 'heads((%%ln::%%ln) %s)' % extracond
626 627 # Get the list of all revs that are draft on the remote but public here.
627 628 # XXX Beware that the revset breaks if droots is not strictly made of
628 629 # XXX roots; we may want to ensure it is, but that is costly.
629 630 fallback = list(unfi.set(revset, droots, pushop.fallbackheads))
630 631 if not outgoing.missing:
631 632 future = fallback
632 633 else:
633 634 # add the changesets we are going to push as draft
634 635 #
635 636 # should not be necessary for a publishing server, but because of an
636 637 # issue fixed in xxxxx we have to do it anyway.
637 638 fdroots = list(unfi.set('roots(%ln + %ln::)',
638 639 outgoing.missing, droots))
639 640 fdroots = [f.node() for f in fdroots]
640 641 future = list(unfi.set(revset, fdroots, pushop.futureheads))
641 642 pushop.outdatedphases = future
642 643 pushop.fallbackoutdatedphases = fallback
643 644
644 645 @pushdiscovery('obsmarker')
645 646 def _pushdiscoveryobsmarkers(pushop):
646 647 if not obsolete.isenabled(pushop.repo, obsolete.exchangeopt):
647 648 return
648 649
649 650 if not pushop.repo.obsstore:
650 651 return
651 652
652 653 if 'obsolete' not in listkeys(pushop.remote, 'namespaces'):
653 654 return
654 655
655 656 repo = pushop.repo
656 657 # very naive computation that can be quite expensive on a big repo.
657 658 # However, evolution is currently slow on them anyway.
658 659 nodes = (c.node() for c in repo.set('::%ln', pushop.futureheads))
659 660 pushop.outobsmarkers = pushop.repo.obsstore.relevantmarkers(nodes)
660 661
661 662 @pushdiscovery('bookmarks')
662 663 def _pushdiscoverybookmarks(pushop):
663 664 ui = pushop.ui
664 665 repo = pushop.repo.unfiltered()
665 666 remote = pushop.remote
666 667 ui.debug("checking for updated bookmarks\n")
667 668 ancestors = ()
668 669 if pushop.revs:
669 670 revnums = pycompat.maplist(repo.changelog.rev, pushop.revs)
670 671 ancestors = repo.changelog.ancestors(revnums, inclusive=True)
671 672
672 673 remotebookmark = listkeys(remote, 'bookmarks')
673 674
674 675 explicit = set([repo._bookmarks.expandname(bookmark)
675 676 for bookmark in pushop.bookmarks])
676 677
677 678 remotebookmark = bookmod.unhexlifybookmarks(remotebookmark)
678 679 comp = bookmod.comparebookmarks(repo, repo._bookmarks, remotebookmark)
679 680
680 681 def safehex(x):
681 682 if x is None:
682 683 return x
683 684 return hex(x)
684 685
685 686 def hexifycompbookmarks(bookmarks):
686 687 return [(b, safehex(scid), safehex(dcid))
687 688 for (b, scid, dcid) in bookmarks]
688 689
689 690 comp = [hexifycompbookmarks(marks) for marks in comp]
690 691 return _processcompared(pushop, ancestors, explicit, remotebookmark, comp)
691 692
692 693 def _processcompared(pushop, pushed, explicit, remotebms, comp):
693 694 """take decision on bookmark to pull from the remote bookmark
694 695
695 696 Exist to help extensions who want to alter this behavior.
696 697 """
697 698 addsrc, adddst, advsrc, advdst, diverge, differ, invalid, same = comp
698 699
699 700 repo = pushop.repo
700 701
701 702 for b, scid, dcid in advsrc:
702 703 if b in explicit:
703 704 explicit.remove(b)
704 705 if not pushed or repo[scid].rev() in pushed:
705 706 pushop.outbookmarks.append((b, dcid, scid))
706 707 # search added bookmark
707 708 for b, scid, dcid in addsrc:
708 709 if b in explicit:
709 710 explicit.remove(b)
710 711 pushop.outbookmarks.append((b, '', scid))
711 712 # search for overwritten bookmark
712 713 for b, scid, dcid in list(advdst) + list(diverge) + list(differ):
713 714 if b in explicit:
714 715 explicit.remove(b)
715 716 pushop.outbookmarks.append((b, dcid, scid))
716 717 # search for bookmark to delete
717 718 for b, scid, dcid in adddst:
718 719 if b in explicit:
719 720 explicit.remove(b)
720 721 # treat as "deleted locally"
721 722 pushop.outbookmarks.append((b, dcid, ''))
722 723 # identical bookmarks shouldn't get reported
723 724 for b, scid, dcid in same:
724 725 if b in explicit:
725 726 explicit.remove(b)
726 727
727 728 if explicit:
728 729 explicit = sorted(explicit)
729 730 # we should probably list all of them
730 731 pushop.ui.warn(_('bookmark %s does not exist on the local '
731 732 'or remote repository!\n') % explicit[0])
732 733 pushop.bkresult = 2
733 734
734 735 pushop.outbookmarks.sort()
735 736
736 737 def _pushcheckoutgoing(pushop):
737 738 outgoing = pushop.outgoing
738 739 unfi = pushop.repo.unfiltered()
739 740 if not outgoing.missing:
740 741 # nothing to push
741 742 scmutil.nochangesfound(unfi.ui, unfi, outgoing.excluded)
742 743 return False
743 744 # something to push
744 745 if not pushop.force:
745 746 # if repo.obsstore is false --> no obsolete markers,
746 747 # so we can save the iteration
747 748 if unfi.obsstore:
748 749 # these messages are here because of the 80-char line limit
749 750 mso = _("push includes obsolete changeset: %s!")
750 751 mspd = _("push includes phase-divergent changeset: %s!")
751 752 mscd = _("push includes content-divergent changeset: %s!")
752 753 mst = {"orphan": _("push includes orphan changeset: %s!"),
753 754 "phase-divergent": mspd,
754 755 "content-divergent": mscd}
755 756 # If we are to push and there is at least one
756 757 # obsolete or unstable changeset in missing, at
757 758 # least one of the missing heads will be obsolete or
758 759 # unstable. So checking heads only is ok.
759 760 for node in outgoing.missingheads:
760 761 ctx = unfi[node]
761 762 if ctx.obsolete():
762 763 raise error.Abort(mso % ctx)
763 764 elif ctx.isunstable():
764 765 # TODO print more than one instability in the abort
765 766 # message
766 767 raise error.Abort(mst[ctx.instabilities()[0]] % ctx)
767 768
768 769 discovery.checkheads(pushop)
769 770 return True
770 771
771 772 # List of names of steps to perform for an outgoing bundle2, order matters.
772 773 b2partsgenorder = []
773 774
774 775 # Mapping between step name and function
775 776 #
776 777 # This exists to help extensions wrap steps if necessary
777 778 b2partsgenmapping = {}
778 779
779 780 def b2partsgenerator(stepname, idx=None):
780 781 """decorator for function generating bundle2 part
781 782
782 783 The function is added to the step -> function mapping and appended to the
783 784 list of steps. Beware that decorated functions will be added in order
784 785 (this may matter).
785 786
786 787 You can only use this decorator for new steps; if you want to wrap a step
787 788 from an extension, change the b2partsgenmapping dictionary directly."""
788 789 def dec(func):
789 790 assert stepname not in b2partsgenmapping
790 791 b2partsgenmapping[stepname] = func
791 792 if idx is None:
792 793 b2partsgenorder.append(stepname)
793 794 else:
794 795 b2partsgenorder.insert(idx, stepname)
795 796 return func
796 797 return dec
797 798
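Similarly, a hypothetical bundle2 part generator registered via b2partsgenerator() (illustrative names; passing idx=0 would put the step first in b2partsgenorder, as the pushvars part below does):

    @b2partsgenerator('debug-note')
    def _pushb2debugnote(pushop, bundler):
        if 'debug-note' in pushop.stepsdone:
            return
        pushop.stepsdone.add('debug-note')
        # a generator may return a reply handler taking the bundle
        # operation, or None when no reply processing is needed
        return None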
798 799 def _pushb2ctxcheckheads(pushop, bundler):
799 800 """Generate race condition checking parts
800 801
801 802 Exists as an independent function to aid extensions
802 803 """
803 804 # * 'force' does not check for push races,
804 805 # * if we don't push anything, there is nothing to check.
805 806 if not pushop.force and pushop.outgoing.missingheads:
806 807 allowunrelated = 'related' in bundler.capabilities.get('checkheads', ())
807 808 emptyremote = pushop.pushbranchmap is None
808 809 if not allowunrelated or emptyremote:
809 810 bundler.newpart('check:heads', data=iter(pushop.remoteheads))
810 811 else:
811 812 affected = set()
812 813 for branch, heads in pushop.pushbranchmap.iteritems():
813 814 remoteheads, newheads, unsyncedheads, discardedheads = heads
814 815 if remoteheads is not None:
815 816 remote = set(remoteheads)
816 817 affected |= set(discardedheads) & remote
817 818 affected |= remote - set(newheads)
818 819 if affected:
819 820 data = iter(sorted(affected))
820 821 bundler.newpart('check:updated-heads', data=data)
821 822
822 823 def _pushing(pushop):
823 824 """return True if we are pushing anything"""
824 825 return bool(pushop.outgoing.missing
825 826 or pushop.outdatedphases
826 827 or pushop.outobsmarkers
827 828 or pushop.outbookmarks)
828 829
829 830 @b2partsgenerator('check-bookmarks')
830 831 def _pushb2checkbookmarks(pushop, bundler):
831 832 """insert bookmark move checking"""
832 833 if not _pushing(pushop) or pushop.force:
833 834 return
834 835 b2caps = bundle2.bundle2caps(pushop.remote)
835 836 hasbookmarkcheck = 'bookmarks' in b2caps
836 837 if not (pushop.outbookmarks and hasbookmarkcheck):
837 838 return
838 839 data = []
839 840 for book, old, new in pushop.outbookmarks:
840 841 old = bin(old)
841 842 data.append((book, old))
842 843 checkdata = bookmod.binaryencode(data)
843 844 bundler.newpart('check:bookmarks', data=checkdata)
844 845
845 846 @b2partsgenerator('check-phases')
846 847 def _pushb2checkphases(pushop, bundler):
847 848 """insert phase move checking"""
848 849 if not _pushing(pushop) or pushop.force:
849 850 return
850 851 b2caps = bundle2.bundle2caps(pushop.remote)
851 852 hasphaseheads = 'heads' in b2caps.get('phases', ())
852 853 if pushop.remotephases is not None and hasphaseheads:
853 854 # check that the remote phase has not changed
854 855 checks = [[] for p in phases.allphases]
855 856 checks[phases.public].extend(pushop.remotephases.publicheads)
856 857 checks[phases.draft].extend(pushop.remotephases.draftroots)
857 858 if any(checks):
858 859 for nodes in checks:
859 860 nodes.sort()
860 861 checkdata = phases.binaryencode(checks)
861 862 bundler.newpart('check:phases', data=checkdata)
862 863
863 864 @b2partsgenerator('changeset')
864 865 def _pushb2ctx(pushop, bundler):
865 866 """handle changegroup push through bundle2
866 867
867 868 addchangegroup result is stored in the ``pushop.cgresult`` attribute.
868 869 """
869 870 if 'changesets' in pushop.stepsdone:
870 871 return
871 872 pushop.stepsdone.add('changesets')
872 873 # Send known heads to the server for race detection.
873 874 if not _pushcheckoutgoing(pushop):
874 875 return
875 876 pushop.repo.prepushoutgoinghooks(pushop)
876 877
877 878 _pushb2ctxcheckheads(pushop, bundler)
878 879
879 880 b2caps = bundle2.bundle2caps(pushop.remote)
880 881 version = '01'
881 882 cgversions = b2caps.get('changegroup')
882 883 if cgversions: # 3.1 and 3.2 ship with an empty value
883 884 cgversions = [v for v in cgversions
884 885 if v in changegroup.supportedoutgoingversions(
885 886 pushop.repo)]
886 887 if not cgversions:
887 888 raise ValueError(_('no common changegroup version'))
888 889 version = max(cgversions)
889 890 cgstream = changegroup.makestream(pushop.repo, pushop.outgoing, version,
890 891 'push')
891 892 cgpart = bundler.newpart('changegroup', data=cgstream)
892 893 if cgversions:
893 894 cgpart.addparam('version', version)
894 895 if 'treemanifest' in pushop.repo.requirements:
895 896 cgpart.addparam('treemanifest', '1')
896 897 def handlereply(op):
897 898 """extract addchangegroup returns from server reply"""
898 899 cgreplies = op.records.getreplies(cgpart.id)
899 900 assert len(cgreplies['changegroup']) == 1
900 901 pushop.cgresult = cgreplies['changegroup'][0]['return']
901 902 return handlereply
902 903
903 904 @b2partsgenerator('phase')
904 905 def _pushb2phases(pushop, bundler):
905 906 """handle phase push through bundle2"""
906 907 if 'phases' in pushop.stepsdone:
907 908 return
908 909 b2caps = bundle2.bundle2caps(pushop.remote)
909 910 ui = pushop.repo.ui
910 911
911 912 legacyphase = 'phases' in ui.configlist('devel', 'legacy.exchange')
912 913 haspushkey = 'pushkey' in b2caps
913 914 hasphaseheads = 'heads' in b2caps.get('phases', ())
914 915
915 916 if hasphaseheads and not legacyphase:
916 917 return _pushb2phaseheads(pushop, bundler)
917 918 elif haspushkey:
918 919 return _pushb2phasespushkey(pushop, bundler)
919 920
920 921 def _pushb2phaseheads(pushop, bundler):
921 922 """push phase information through a bundle2 - binary part"""
922 923 pushop.stepsdone.add('phases')
923 924 if pushop.outdatedphases:
924 925 updates = [[] for p in phases.allphases]
925 926 updates[0].extend(h.node() for h in pushop.outdatedphases)
926 927 phasedata = phases.binaryencode(updates)
927 928 bundler.newpart('phase-heads', data=phasedata)
928 929
929 930 def _pushb2phasespushkey(pushop, bundler):
930 931 """push phase information through a bundle2 - pushkey part"""
931 932 pushop.stepsdone.add('phases')
932 933 part2node = []
933 934
934 935 def handlefailure(pushop, exc):
935 936 targetid = int(exc.partid)
936 937 for partid, node in part2node:
937 938 if partid == targetid:
938 939 raise error.Abort(_('updating %s to public failed') % node)
939 940
940 941 enc = pushkey.encode
941 942 for newremotehead in pushop.outdatedphases:
942 943 part = bundler.newpart('pushkey')
943 944 part.addparam('namespace', enc('phases'))
944 945 part.addparam('key', enc(newremotehead.hex()))
945 946 part.addparam('old', enc('%d' % phases.draft))
946 947 part.addparam('new', enc('%d' % phases.public))
947 948 part2node.append((part.id, newremotehead))
948 949 pushop.pkfailcb[part.id] = handlefailure
949 950
950 951 def handlereply(op):
951 952 for partid, node in part2node:
952 953 partrep = op.records.getreplies(partid)
953 954 results = partrep['pushkey']
954 955 assert len(results) <= 1
955 956 msg = None
956 957 if not results:
957 958 msg = _('server ignored update of %s to public!\n') % node
958 959 elif not int(results[0]['return']):
959 960 msg = _('updating %s to public failed!\n') % node
960 961 if msg is not None:
961 962 pushop.ui.warn(msg)
962 963 return handlereply
963 964
964 965 @b2partsgenerator('obsmarkers')
965 966 def _pushb2obsmarkers(pushop, bundler):
966 967 if 'obsmarkers' in pushop.stepsdone:
967 968 return
968 969 remoteversions = bundle2.obsmarkersversion(bundler.capabilities)
969 970 if obsolete.commonversion(remoteversions) is None:
970 971 return
971 972 pushop.stepsdone.add('obsmarkers')
972 973 if pushop.outobsmarkers:
973 974 markers = sorted(pushop.outobsmarkers)
974 975 bundle2.buildobsmarkerspart(bundler, markers)
975 976
976 977 @b2partsgenerator('bookmarks')
977 978 def _pushb2bookmarks(pushop, bundler):
978 979 """handle bookmark push through bundle2"""
979 980 if 'bookmarks' in pushop.stepsdone:
980 981 return
981 982 b2caps = bundle2.bundle2caps(pushop.remote)
982 983
983 984 legacy = pushop.repo.ui.configlist('devel', 'legacy.exchange')
984 985 legacybooks = 'bookmarks' in legacy
985 986
986 987 if not legacybooks and 'bookmarks' in b2caps:
987 988 return _pushb2bookmarkspart(pushop, bundler)
988 989 elif 'pushkey' in b2caps:
989 990 return _pushb2bookmarkspushkey(pushop, bundler)
990 991
991 992 def _bmaction(old, new):
992 993 """small utility for bookmark pushing"""
993 994 if not old:
994 995 return 'export'
995 996 elif not new:
996 997 return 'delete'
997 998 return 'update'
998 999
999 1000 def _pushb2bookmarkspart(pushop, bundler):
1000 1001 pushop.stepsdone.add('bookmarks')
1001 1002 if not pushop.outbookmarks:
1002 1003 return
1003 1004
1004 1005 allactions = []
1005 1006 data = []
1006 1007 for book, old, new in pushop.outbookmarks:
1007 1008 new = bin(new)
1008 1009 data.append((book, new))
1009 1010 allactions.append((book, _bmaction(old, new)))
1010 1011 checkdata = bookmod.binaryencode(data)
1011 1012 bundler.newpart('bookmarks', data=checkdata)
1012 1013
1013 1014 def handlereply(op):
1014 1015 ui = pushop.ui
1015 1016 # if success
1016 1017 for book, action in allactions:
1017 1018 ui.status(bookmsgmap[action][0] % book)
1018 1019
1019 1020 return handlereply
1020 1021
1021 1022 def _pushb2bookmarkspushkey(pushop, bundler):
1022 1023 pushop.stepsdone.add('bookmarks')
1023 1024 part2book = []
1024 1025 enc = pushkey.encode
1025 1026
1026 1027 def handlefailure(pushop, exc):
1027 1028 targetid = int(exc.partid)
1028 1029 for partid, book, action in part2book:
1029 1030 if partid == targetid:
1030 1031 raise error.Abort(bookmsgmap[action][1].rstrip() % book)
1031 1032 # we should not be called for parts we did not generate
1032 1033 assert False
1033 1034
1034 1035 for book, old, new in pushop.outbookmarks:
1035 1036 part = bundler.newpart('pushkey')
1036 1037 part.addparam('namespace', enc('bookmarks'))
1037 1038 part.addparam('key', enc(book))
1038 1039 part.addparam('old', enc(old))
1039 1040 part.addparam('new', enc(new))
1040 1041 action = 'update'
1041 1042 if not old:
1042 1043 action = 'export'
1043 1044 elif not new:
1044 1045 action = 'delete'
1045 1046 part2book.append((part.id, book, action))
1046 1047 pushop.pkfailcb[part.id] = handlefailure
1047 1048
1048 1049 def handlereply(op):
1049 1050 ui = pushop.ui
1050 1051 for partid, book, action in part2book:
1051 1052 partrep = op.records.getreplies(partid)
1052 1053 results = partrep['pushkey']
1053 1054 assert len(results) <= 1
1054 1055 if not results:
1055 1056 pushop.ui.warn(_('server ignored bookmark %s update\n') % book)
1056 1057 else:
1057 1058 ret = int(results[0]['return'])
1058 1059 if ret:
1059 1060 ui.status(bookmsgmap[action][0] % book)
1060 1061 else:
1061 1062 ui.warn(bookmsgmap[action][1] % book)
1062 1063 if pushop.bkresult is not None:
1063 1064 pushop.bkresult = 1
1064 1065 return handlereply
1065 1066
1066 1067 @b2partsgenerator('pushvars', idx=0)
1067 1068 def _getbundlesendvars(pushop, bundler):
1068 1069 '''send shellvars via bundle2'''
1069 1070 pushvars = pushop.pushvars
1070 1071 if pushvars:
1071 1072 shellvars = {}
1072 1073 for raw in pushvars:
1073 1074 if '=' not in raw:
1074 1075 msg = ("unable to parse variable '%s', should follow "
1075 1076 "'KEY=VALUE' or 'KEY=' format")
1076 1077 raise error.Abort(msg % raw)
1077 1078 k, v = raw.split('=', 1)
1078 1079 shellvars[k] = v
1079 1080
1080 1081 part = bundler.newpart('pushvars')
1081 1082
1082 1083 for key, value in shellvars.iteritems():
1083 1084 part.addparam(key, value, mandatory=False)
1084 1085
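The variables consumed by _getbundlesendvars() come from the --pushvars option on hg push; a usage sketch in the style of the tests above (hypothetical variable names; the receiving server decides how to interpret them):

  $ hg push --pushvars "DEBUG=1" --pushvars "BYPASS_REVIEW="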
1085 1086 def _pushbundle2(pushop):
1086 1087 """push data to the remote using bundle2
1087 1088
1088 1089 The only currently supported type of data is changegroup but this will
1089 1090 evolve in the future."""
1090 1091 bundler = bundle2.bundle20(pushop.ui, bundle2.bundle2caps(pushop.remote))
1091 1092 pushback = (pushop.trmanager
1092 1093 and pushop.ui.configbool('experimental', 'bundle2.pushback'))
1093 1094
1094 1095 # create reply capability
1095 1096 capsblob = bundle2.encodecaps(bundle2.getrepocaps(pushop.repo,
1096 1097 allowpushback=pushback,
1097 1098 role='client'))
1098 1099 bundler.newpart('replycaps', data=capsblob)
1099 1100 replyhandlers = []
1100 1101 for partgenname in b2partsgenorder:
1101 1102 partgen = b2partsgenmapping[partgenname]
1102 1103 ret = partgen(pushop, bundler)
1103 1104 if callable(ret):
1104 1105 replyhandlers.append(ret)
1105 1106 # do not push if nothing to push
1106 1107 if bundler.nbparts <= 1:
1107 1108 return
1108 1109 stream = util.chunkbuffer(bundler.getchunks())
1109 1110 try:
1110 1111 try:
1111 1112 with pushop.remote.commandexecutor() as e:
1112 1113 reply = e.callcommand('unbundle', {
1113 1114 'bundle': stream,
1114 1115 'heads': ['force'],
1115 1116 'url': pushop.remote.url(),
1116 1117 }).result()
1117 1118 except error.BundleValueError as exc:
1118 1119 raise error.Abort(_('missing support for %s') % exc)
1119 1120 try:
1120 1121 trgetter = None
1121 1122 if pushback:
1122 1123 trgetter = pushop.trmanager.transaction
1123 1124 op = bundle2.processbundle(pushop.repo, reply, trgetter)
1124 1125 except error.BundleValueError as exc:
1125 1126 raise error.Abort(_('missing support for %s') % exc)
1126 1127 except bundle2.AbortFromPart as exc:
1127 1128 pushop.ui.status(_('remote: %s\n') % exc)
1128 1129 if exc.hint is not None:
1129 1130 pushop.ui.status(_('remote: %s\n') % ('(%s)' % exc.hint))
1130 1131 raise error.Abort(_('push failed on remote'))
1131 1132 except error.PushkeyFailed as exc:
1132 1133 partid = int(exc.partid)
1133 1134 if partid not in pushop.pkfailcb:
1134 1135 raise
1135 1136 pushop.pkfailcb[partid](pushop, exc)
1136 1137 for rephand in replyhandlers:
1137 1138 rephand(op)
1138 1139
1139 1140 def _pushchangeset(pushop):
1140 1141 """Make the actual push of changeset bundle to remote repo"""
1141 1142 if 'changesets' in pushop.stepsdone:
1142 1143 return
1143 1144 pushop.stepsdone.add('changesets')
1144 1145 if not _pushcheckoutgoing(pushop):
1145 1146 return
1146 1147
1147 1148 # Should have verified this in push().
1148 1149 assert pushop.remote.capable('unbundle')
1149 1150
1150 1151 pushop.repo.prepushoutgoinghooks(pushop)
1151 1152 outgoing = pushop.outgoing
1152 1153 # TODO: get bundlecaps from remote
1153 1154 bundlecaps = None
1154 1155 # create a changegroup from local
1155 1156 if pushop.revs is None and not (outgoing.excluded
1156 1157 or pushop.repo.changelog.filteredrevs):
1157 1158 # push everything,
1158 1159 # use the fast path, no race possible on push
1159 1160 cg = changegroup.makechangegroup(pushop.repo, outgoing, '01', 'push',
1160 1161 fastpath=True, bundlecaps=bundlecaps)
1161 1162 else:
1162 1163 cg = changegroup.makechangegroup(pushop.repo, outgoing, '01',
1163 1164 'push', bundlecaps=bundlecaps)
1164 1165
1165 1166 # apply changegroup to remote
1166 1167 # local repo finds heads on server, finds out what
1167 1168 # revs it must push. once revs transferred, if server
1168 1169 # finds it has different heads (someone else won
1169 1170 # commit/push race), server aborts.
1170 1171 if pushop.force:
1171 1172 remoteheads = ['force']
1172 1173 else:
1173 1174 remoteheads = pushop.remoteheads
1174 1175 # ssh: return remote's addchangegroup()
1175 1176 # http: return remote's addchangegroup() or 0 for error
1176 1177 pushop.cgresult = pushop.remote.unbundle(cg, remoteheads,
1177 1178 pushop.repo.url())
1178 1179
1179 1180 def _pushsyncphase(pushop):
1180 1181 """synchronise phase information locally and remotely"""
1181 1182 cheads = pushop.commonheads
1182 1183 # even when we don't push, exchanging phase data is useful
1183 1184 remotephases = listkeys(pushop.remote, 'phases')
1184 1185 if (pushop.ui.configbool('ui', '_usedassubrepo')
1185 1186 and remotephases # server supports phases
1186 1187 and pushop.cgresult is None # nothing was pushed
1187 1188 and remotephases.get('publishing', False)):
1188 1189 # When:
1189 1190 # - this is a subrepo push
1190 1191 # - and remote supports phases
1191 1192 # - and no changeset was pushed
1192 1193 # - and remote is publishing
1193 1194 # We may be in issue 3871 case!
1194 1195 # We drop the phase synchronisation that would normally be
1195 1196 # done as a courtesy; it could publish changesets that are
1196 1197 # still draft locally on the remote.
1197 1198 remotephases = {'publishing': 'True'}
1198 1199 if not remotephases: # old server or public only reply from non-publishing
1199 1200 _localphasemove(pushop, cheads)
1200 1201 # don't push any phase data as there is nothing to push
1201 1202 else:
1202 1203 ana = phases.analyzeremotephases(pushop.repo, cheads,
1203 1204 remotephases)
1204 1205 pheads, droots = ana
1205 1206 ### Apply remote phase on local
1206 1207 if remotephases.get('publishing', False):
1207 1208 _localphasemove(pushop, cheads)
1208 1209 else: # publish = False
1209 1210 _localphasemove(pushop, pheads)
1210 1211 _localphasemove(pushop, cheads, phases.draft)
1211 1212 ### Apply local phase on remote
1212 1213
1213 1214 if pushop.cgresult:
1214 1215 if 'phases' in pushop.stepsdone:
1215 1216 # phases already pushed through bundle2
1216 1217 return
1217 1218 outdated = pushop.outdatedphases
1218 1219 else:
1219 1220 outdated = pushop.fallbackoutdatedphases
1220 1221
1221 1222 pushop.stepsdone.add('phases')
1222 1223
1223 1224 # filter heads already turned public by the push
1224 1225 outdated = [c for c in outdated if c.node() not in pheads]
1225 1226 # fallback to independent pushkey command
1226 1227 for newremotehead in outdated:
1227 1228 with pushop.remote.commandexecutor() as e:
1228 1229 r = e.callcommand('pushkey', {
1229 1230 'namespace': 'phases',
1230 1231 'key': newremotehead.hex(),
1231 1232 'old': '%d' % phases.draft,
1232 1233 'new': '%d' % phases.public
1233 1234 }).result()
1234 1235
1235 1236 if not r:
1236 1237 pushop.ui.warn(_('updating %s to public failed!\n')
1237 1238 % newremotehead)
1238 1239
1239 1240 def _localphasemove(pushop, nodes, phase=phases.public):
1240 1241 """move <nodes> to <phase> in the local source repo"""
1241 1242 if pushop.trmanager:
1242 1243 phases.advanceboundary(pushop.repo,
1243 1244 pushop.trmanager.transaction(),
1244 1245 phase,
1245 1246 nodes)
1246 1247 else:
1247 1248 # repo is not locked, do not change any phases!
1248 1249 # Informs the user that phases should have been moved when
1249 1250 # applicable.
1250 1251 actualmoves = [n for n in nodes if phase < pushop.repo[n].phase()]
1251 1252 phasestr = phases.phasenames[phase]
1252 1253 if actualmoves:
1253 1254 pushop.ui.status(_('cannot lock source repo, skipping '
1254 1255 'local %s phase update\n') % phasestr)
1255 1256
1256 1257 def _pushobsolete(pushop):
1257 1258 """utility function to push obsolete markers to a remote"""
1258 1259 if 'obsmarkers' in pushop.stepsdone:
1259 1260 return
1260 1261 repo = pushop.repo
1261 1262 remote = pushop.remote
1262 1263 pushop.stepsdone.add('obsmarkers')
1263 1264 if pushop.outobsmarkers:
1264 1265 pushop.ui.debug('try to push obsolete markers to remote\n')
1265 1266 rslts = []
1266 1267 remotedata = obsolete._pushkeyescape(sorted(pushop.outobsmarkers))
1267 1268 for key in sorted(remotedata, reverse=True):
1268 1269 # reverse sort to ensure we end with dump0
1269 1270 data = remotedata[key]
1270 1271 rslts.append(remote.pushkey('obsolete', key, '', data))
1271 1272 if [r for r in rslts if not r]:
1272 1273 msg = _('failed to push some obsolete markers!\n')
1273 1274 repo.ui.warn(msg)
1274 1275
1275 1276 def _pushbookmark(pushop):
1276 1277 """Update bookmark position on remote"""
1277 1278 if pushop.cgresult == 0 or 'bookmarks' in pushop.stepsdone:
1278 1279 return
1279 1280 pushop.stepsdone.add('bookmarks')
1280 1281 ui = pushop.ui
1281 1282 remote = pushop.remote
1282 1283
1283 1284 for b, old, new in pushop.outbookmarks:
1284 1285 action = 'update'
1285 1286 if not old:
1286 1287 action = 'export'
1287 1288 elif not new:
1288 1289 action = 'delete'
1289 1290
1290 1291 with remote.commandexecutor() as e:
1291 1292 r = e.callcommand('pushkey', {
1292 1293 'namespace': 'bookmarks',
1293 1294 'key': b,
1294 1295 'old': old,
1295 1296 'new': new,
1296 1297 }).result()
1297 1298
1298 1299 if r:
1299 1300 ui.status(bookmsgmap[action][0] % b)
1300 1301 else:
1301 1302 ui.warn(bookmsgmap[action][1] % b)
1302 1303 # discovery can have set the value from an invalid entry
1303 1304 if pushop.bkresult is not None:
1304 1305 pushop.bkresult = 1
1305 1306
1306 1307 class pulloperation(object):
1307 1308 """A object that represent a single pull operation
1308 1309
1309 1310 It purpose is to carry pull related state and very common operation.
1310 1311
1311 1312 A new should be created at the beginning of each pull and discarded
1312 1313 afterward.
1313 1314 """
1314 1315
1315 1316 def __init__(self, repo, remote, heads=None, force=False, bookmarks=(),
1316 1317 remotebookmarks=None, streamclonerequested=None,
1317 1318 includepats=None, excludepats=None):
1318 1319 # repo we pull into
1319 1320 self.repo = repo
1320 1321 # repo we pull from
1321 1322 self.remote = remote
1322 1323 # revision we try to pull (None is "all")
1323 1324 self.heads = heads
1324 1325 # bookmark pulled explicitly
1325 1326 self.explicitbookmarks = [repo._bookmarks.expandname(bookmark)
1326 1327 for bookmark in bookmarks]
1327 1328 # do we force pull?
1328 1329 self.force = force
1329 1330 # whether a streaming clone was requested
1330 1331 self.streamclonerequested = streamclonerequested
1331 1332 # transaction manager
1332 1333 self.trmanager = None
1334 1335 # set of common changesets between local and remote before pull
1335 1336 self.common = None
1336 1337 # set of pulled heads
1337 1338 self.rheads = None
1338 1339 # list of missing changesets to fetch remotely
1338 1339 self.fetch = None
1339 1340 # remote bookmarks data
1340 1341 self.remotebookmarks = remotebookmarks
1341 1342 # result of changegroup pulling (used as return code by pull)
1342 1343 self.cgresult = None
1343 1344 # list of steps already done
1344 1345 self.stepsdone = set()
1345 1346 # Whether we attempted a clone from pre-generated bundles.
1346 1347 self.clonebundleattempted = False
1347 1348 # Set of file patterns to include.
1348 1349 self.includepats = includepats
1349 1350 # Set of file patterns to exclude.
1350 1351 self.excludepats = excludepats
1351 1352
1352 1353 @util.propertycache
1353 1354 def pulledsubset(self):
1354 1355 """heads of the set of changeset target by the pull"""
1355 1356 # compute target subset
1356 1357 if self.heads is None:
1357 1358 # We pulled every thing possible
1358 1359 # sync on everything common
1359 1360 c = set(self.common)
1360 1361 ret = list(self.common)
1361 1362 for n in self.rheads:
1362 1363 if n not in c:
1363 1364 ret.append(n)
1364 1365 return ret
1365 1366 else:
1366 1367 # We pulled a specific subset
1367 1368 # sync on this subset
1368 1369 return self.heads
1369 1370
1370 1371 @util.propertycache
1371 1372 def canusebundle2(self):
1372 1373 return not _forcebundle1(self)
1373 1374
1374 1375 @util.propertycache
1375 1376 def remotebundle2caps(self):
1376 1377 return bundle2.bundle2caps(self.remote)
1377 1378
1378 1379 def gettransaction(self):
1379 1380 # deprecated; talk to trmanager directly
1380 1381 return self.trmanager.transaction()
1381 1382
1382 1383 class transactionmanager(util.transactional):
1383 1384 """An object to manage the life cycle of a transaction
1384 1385
1385 1386 It creates the transaction on demand and calls the appropriate hooks when
1386 1387 closing the transaction."""
1387 1388 def __init__(self, repo, source, url):
1388 1389 self.repo = repo
1389 1390 self.source = source
1390 1391 self.url = url
1391 1392 self._tr = None
1392 1393
1393 1394 def transaction(self):
1394 1395 """Return an open transaction object, constructing if necessary"""
1395 1396 if not self._tr:
1396 1397 trname = '%s\n%s' % (self.source, util.hidepassword(self.url))
1397 1398 self._tr = self.repo.transaction(trname)
1398 1399 self._tr.hookargs['source'] = self.source
1399 1400 self._tr.hookargs['url'] = self.url
1400 1401 return self._tr
1401 1402
1402 1403 def close(self):
1403 1404 """close transaction if created"""
1404 1405 if self._tr is not None:
1405 1406 self._tr.close()
1406 1407
1407 1408 def release(self):
1408 1409 """release transaction if created"""
1409 1410 if self._tr is not None:
1410 1411 self._tr.release()
1411 1412
1412 1413 def listkeys(remote, namespace):
1413 1414 with remote.commandexecutor() as e:
1414 1415 return e.callcommand('listkeys', {'namespace': namespace}).result()
1415 1416
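A hedged usage sketch of the listkeys() helper above; ``remote`` is assumed
to be an already-opened peer:

    books = listkeys(remote, 'bookmarks')
    # -> {bookmark name: hex node} mapping from the remote's pushkey
    #    'bookmarks' namespace.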
1416 1417 def _fullpullbundle2(repo, pullop):
1417 1418 # The server may send a partial reply, i.e. when inlining
1418 1419 # pre-computed bundles. In that case, update the common
1419 1420 # set based on the results and pull another bundle.
1420 1421 #
1421 1422 # There are two indicators that the process is finished:
1422 1423 # - no changeset has been added, or
1423 1424 # - all remote heads are known locally.
1424 1425 # The head check must use the unfiltered view as obsoletion
1425 1426 # markers can hide heads.
1426 1427 unfi = repo.unfiltered()
1427 1428 unficl = unfi.changelog
1428 1429 def headsofdiff(h1, h2):
1429 1430 """Returns heads(h1 % h2)"""
1430 1431 res = unfi.set('heads(%ln %% %ln)', h1, h2)
1431 1432 return set(ctx.node() for ctx in res)
1432 1433 def headsofunion(h1, h2):
1433 1434 """Returns heads((h1 + h2) - null)"""
1434 1435 res = unfi.set('heads((%ln + %ln - null))', h1, h2)
1435 1436 return set(ctx.node() for ctx in res)
1436 1437 while True:
1437 1438 old_heads = unficl.heads()
1438 1439 clstart = len(unficl)
1439 1440 _pullbundle2(pullop)
1440 1441 if repository.NARROW_REQUIREMENT in repo.requirements:
1441 1442 # XXX narrow clones filter the heads on the server side during
1442 1443 # XXX getbundle and result in partial replies as well.
1443 1444 # XXX Disable pull bundles in this case as a band aid to avoid
1444 1445 # XXX extra round trips.
1445 1446 break
1446 1447 if clstart == len(unficl):
1447 1448 break
1448 1449 if all(unficl.hasnode(n) for n in pullop.rheads):
1449 1450 break
1450 1451 new_heads = headsofdiff(unficl.heads(), old_heads)
1451 1452 pullop.common = headsofunion(new_heads, pullop.common)
1452 1453 pullop.rheads = set(pullop.rheads) - pullop.common
1453 1454
1454 1455 def pull(repo, remote, heads=None, force=False, bookmarks=(), opargs=None,
1455 1456 streamclonerequested=None, includepats=None, excludepats=None):
1456 1457 """Fetch repository data from a remote.
1457 1458
1458 1459 This is the main function used to retrieve data from a remote repository.
1459 1460
1460 1461 ``repo`` is the local repository to clone into.
1461 1462 ``remote`` is a peer instance.
1462 1463 ``heads`` is an iterable of revisions we want to pull. ``None`` (the
1463 1464 default) means to pull everything from the remote.
1464 1465 ``bookmarks`` is an iterable of bookmarks requested to be pulled. By
1465 1466 default, all remote bookmarks are pulled.
1466 1467 ``opargs`` are additional keyword arguments to pass to ``pulloperation``
1467 1468 initialization.
1468 1469 ``streamclonerequested`` is a boolean indicating whether a "streaming
1469 1470 clone" is requested. A "streaming clone" is essentially a raw file copy
1470 1471 of revlogs from the server. This only works when the local repository is
1471 1472 empty. The default value of ``None`` means to respect the server
1472 1473 configuration for preferring stream clones.
1473 1474 ``includepats`` and ``excludepats`` define explicit file patterns to
1474 1475 include and exclude in storage, respectively. If not defined, narrow
1475 1476 patterns from the repo instance are used, if available.
1476 1477
1477 1478 Returns the ``pulloperation`` created for this pull.
1478 1479 """
1479 1480 if opargs is None:
1480 1481 opargs = {}
1481 1482
1482 1483 # We allow the narrow patterns to be passed in explicitly to provide more
1483 1484 # flexibility for API consumers.
1484 1485 if includepats or excludepats:
1485 1486 includepats = includepats or set()
1486 1487 excludepats = excludepats or set()
1487 1488 else:
1488 1489 includepats, excludepats = repo.narrowpats
1489 1490
1490 1491 narrowspec.validatepatterns(includepats)
1491 1492 narrowspec.validatepatterns(excludepats)
1492 1493
1493 1494 pullop = pulloperation(repo, remote, heads, force, bookmarks=bookmarks,
1494 1495 streamclonerequested=streamclonerequested,
1495 1496 includepats=includepats, excludepats=excludepats,
1496 1497 **pycompat.strkwargs(opargs))
1497 1498
1498 1499 peerlocal = pullop.remote.local()
1499 1500 if peerlocal:
1500 1501 missing = set(peerlocal.requirements) - pullop.repo.supported
1501 1502 if missing:
1502 1503 msg = _("required features are not"
1503 1504 " supported in the destination:"
1504 1505 " %s") % (', '.join(sorted(missing)))
1505 1506 raise error.Abort(msg)
1506 1507
1507 1508 pullop.trmanager = transactionmanager(repo, 'pull', remote.url())
1508 1509 with repo.wlock(), repo.lock(), pullop.trmanager:
1509 # This should ideally be in _pullbundle2(). However, it needs to run
1510 # before discovery to avoid extra work.
1511 _maybeapplyclonebundle(pullop)
1512 streamclone.maybeperformlegacystreamclone(pullop)
1513 _pulldiscovery(pullop)
1514 if pullop.canusebundle2:
1515 _fullpullbundle2(repo, pullop)
1516 _pullchangeset(pullop)
1517 _pullphase(pullop)
1518 _pullbookmarks(pullop)
1519 _pullobsolete(pullop)
1510 # Use the modern wire protocol, if available.
1511 if remote.capable('exchangev2'):
1512 exchangev2.pull(pullop)
1513 else:
1514 # This should ideally be in _pullbundle2(). However, it needs to run
1515 # before discovery to avoid extra work.
1516 _maybeapplyclonebundle(pullop)
1517 streamclone.maybeperformlegacystreamclone(pullop)
1518 _pulldiscovery(pullop)
1519 if pullop.canusebundle2:
1520 _fullpullbundle2(repo, pullop)
1521 _pullchangeset(pullop)
1522 _pullphase(pullop)
1523 _pullbookmarks(pullop)
1524 _pullobsolete(pullop)
1520 1525
1521 1526 # storing remotenames
1522 1527 if repo.ui.configbool('experimental', 'remotenames'):
1523 1528 logexchange.pullremotenames(repo, remote)
1524 1529
1525 1530 return pullop
1526 1531
1527 1532 # list of steps to perform discovery before pull
1528 1533 pulldiscoveryorder = []
1529 1534
1530 1535 # Mapping between step name and function
1531 1536 #
1532 1537 # This exists to help extensions wrap steps if necessary
1533 1538 pulldiscoverymapping = {}
1534 1539
1535 1540 def pulldiscovery(stepname):
1536 1541 """decorator for function performing discovery before pull
1537 1542
1538 1543 The function is added to the step -> function mapping and appended to the
1539 1544 list of steps. Beware that decorated functions will be added in order (this
1540 1545 may matter).
1541 1546
1542 1547 You can only use this decorator for a new step; if you want to wrap a step
1543 1548 from an extension, change the pulldiscoverymapping dictionary directly."""
1544 1549 def dec(func):
1545 1550 assert stepname not in pulldiscoverymapping
1546 1551 pulldiscoverymapping[stepname] = func
1547 1552 pulldiscoveryorder.append(stepname)
1548 1553 return func
1549 1554 return dec
1550 1555
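For illustration, a hedged sketch of how an extension could register its own
step with the decorator above (step name and body are hypothetical); steps
run in registration order when _pulldiscovery() below iterates them:

    @pulldiscovery('myext:example')
    def _pulldiscoveryexample(pullop):
        # Runs once per pull, after previously registered steps.
        pullop.repo.ui.debug('myext: example discovery step\n')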
1551 1556 def _pulldiscovery(pullop):
1552 1557 """Run all discovery steps"""
1553 1558 for stepname in pulldiscoveryorder:
1554 1559 step = pulldiscoverymapping[stepname]
1555 1560 step(pullop)
1556 1561
1557 1562 @pulldiscovery('b1:bookmarks')
1558 1563 def _pullbookmarkbundle1(pullop):
1559 1564 """fetch bookmark data in bundle1 case
1560 1565
1561 1566 If not using bundle2, we have to fetch bookmarks before changeset
1562 1567 discovery to reduce the chance and impact of race conditions."""
1563 1568 if pullop.remotebookmarks is not None:
1564 1569 return
1565 1570 if pullop.canusebundle2 and 'listkeys' in pullop.remotebundle2caps:
1566 1571 # all known bundle2 servers now support listkeys, but let's be nice with
1567 1572 # new implementations.
1568 1573 return
1569 1574 books = listkeys(pullop.remote, 'bookmarks')
1570 1575 pullop.remotebookmarks = bookmod.unhexlifybookmarks(books)
1571 1576
1572 1577
1573 1578 @pulldiscovery('changegroup')
1574 1579 def _pulldiscoverychangegroup(pullop):
1575 1580 """discovery phase for the pull
1576 1581
1577 1582 Currently handles changeset discovery only; will change to handle all
1578 1583 discovery at some point.
1579 1584 tmp = discovery.findcommonincoming(pullop.repo,
1580 1585 pullop.remote,
1581 1586 heads=pullop.heads,
1582 1587 force=pullop.force)
1583 1588 common, fetch, rheads = tmp
1584 1589 nm = pullop.repo.unfiltered().changelog.nodemap
1585 1590 if fetch and rheads:
1586 1591 # If a remote head is filtered locally, put it back in common.
1587 1592 #
1588 1593 # This is a hackish solution to catch most of "common but locally
1589 1594 # hidden" situations. We do not perform discovery on the unfiltered
1590 1595 # repository because it ends up doing a pathological number of round
1591 1596 # trips for a huge number of changesets we do not care about.
1592 1597 #
1593 1598 # If a set of such "common but filtered" changesets exists on the server
1594 1599 # but does not include a remote head, we will not be able to detect it.
1595 1600 scommon = set(common)
1596 1601 for n in rheads:
1597 1602 if n in nm:
1598 1603 if n not in scommon:
1599 1604 common.append(n)
1600 1605 if set(rheads).issubset(set(common)):
1601 1606 fetch = []
1602 1607 pullop.common = common
1603 1608 pullop.fetch = fetch
1604 1609 pullop.rheads = rheads
1605 1610
1606 1611 def _pullbundle2(pullop):
1607 1612 """pull data using bundle2
1608 1613
1609 1614 For now, the only supported data is the changegroup."""
1610 1615 kwargs = {'bundlecaps': caps20to10(pullop.repo, role='client')}
1611 1616
1612 1617 # make ui easier to access
1613 1618 ui = pullop.repo.ui
1614 1619
1615 1620 # At the moment we don't do stream clones over bundle2. If that is
1616 1621 # implemented then here's where the check for that will go.
1617 1622 streaming = streamclone.canperformstreamclone(pullop, bundle2=True)[0]
1618 1623
1619 1624 # declare pull perimeters
1620 1625 kwargs['common'] = pullop.common
1621 1626 kwargs['heads'] = pullop.heads or pullop.rheads
1622 1627
1623 1628 if streaming:
1624 1629 kwargs['cg'] = False
1625 1630 kwargs['stream'] = True
1626 1631 pullop.stepsdone.add('changegroup')
1627 1632 pullop.stepsdone.add('phases')
1628 1633
1629 1634 else:
1630 1635 # pulling changegroup
1631 1636 pullop.stepsdone.add('changegroup')
1632 1637
1633 1638 kwargs['cg'] = pullop.fetch
1634 1639
1635 1640 legacyphase = 'phases' in ui.configlist('devel', 'legacy.exchange')
1636 1641 hasbinaryphase = 'heads' in pullop.remotebundle2caps.get('phases', ())
1637 1642 if (not legacyphase and hasbinaryphase):
1638 1643 kwargs['phases'] = True
1639 1644 pullop.stepsdone.add('phases')
1640 1645
1641 1646 if 'listkeys' in pullop.remotebundle2caps:
1642 1647 if 'phases' not in pullop.stepsdone:
1643 1648 kwargs['listkeys'] = ['phases']
1644 1649
1645 1650 bookmarksrequested = False
1646 1651 legacybookmark = 'bookmarks' in ui.configlist('devel', 'legacy.exchange')
1647 1652 hasbinarybook = 'bookmarks' in pullop.remotebundle2caps
1648 1653
1649 1654 if pullop.remotebookmarks is not None:
1650 1655 pullop.stepsdone.add('request-bookmarks')
1651 1656
1652 1657 if ('request-bookmarks' not in pullop.stepsdone
1653 1658 and pullop.remotebookmarks is None
1654 1659 and not legacybookmark and hasbinarybook):
1655 1660 kwargs['bookmarks'] = True
1656 1661 bookmarksrequested = True
1657 1662
1658 1663 if 'listkeys' in pullop.remotebundle2caps:
1659 1664 if 'request-bookmarks' not in pullop.stepsdone:
1660 1665 # make sure to always include bookmark data when migrating
1661 1666 # `hg incoming --bundle` to using this function.
1662 1667 pullop.stepsdone.add('request-bookmarks')
1663 1668 kwargs.setdefault('listkeys', []).append('bookmarks')
1664 1669
1665 1670 # If this is a full pull / clone and the server supports the clone bundles
1666 1671 # feature, tell the server whether we attempted a clone bundle. The
1667 1672 # presence of this flag indicates the client supports clone bundles. This
1668 1673 # will enable the server to treat clients that support clone bundles
1669 1674 # differently from those that don't.
1670 1675 if (pullop.remote.capable('clonebundles')
1671 1676 and pullop.heads is None and list(pullop.common) == [nullid]):
1672 1677 kwargs['cbattempted'] = pullop.clonebundleattempted
1673 1678
1674 1679 if streaming:
1675 1680 pullop.repo.ui.status(_('streaming all changes\n'))
1676 1681 elif not pullop.fetch:
1677 1682 pullop.repo.ui.status(_("no changes found\n"))
1678 1683 pullop.cgresult = 0
1679 1684 else:
1680 1685 if pullop.heads is None and list(pullop.common) == [nullid]:
1681 1686 pullop.repo.ui.status(_("requesting all changes\n"))
1682 1687 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1683 1688 remoteversions = bundle2.obsmarkersversion(pullop.remotebundle2caps)
1684 1689 if obsolete.commonversion(remoteversions) is not None:
1685 1690 kwargs['obsmarkers'] = True
1686 1691 pullop.stepsdone.add('obsmarkers')
1687 1692 _pullbundle2extraprepare(pullop, kwargs)
1688 1693
1689 1694 with pullop.remote.commandexecutor() as e:
1690 1695 args = dict(kwargs)
1691 1696 args['source'] = 'pull'
1692 1697 bundle = e.callcommand('getbundle', args).result()
1693 1698
1694 1699 try:
1695 1700 op = bundle2.bundleoperation(pullop.repo, pullop.gettransaction,
1696 1701 source='pull')
1697 1702 op.modes['bookmarks'] = 'records'
1698 1703 bundle2.processbundle(pullop.repo, bundle, op=op)
1699 1704 except bundle2.AbortFromPart as exc:
1700 1705 pullop.repo.ui.status(_('remote: abort: %s\n') % exc)
1701 1706 raise error.Abort(_('pull failed on remote'), hint=exc.hint)
1702 1707 except error.BundleValueError as exc:
1703 1708 raise error.Abort(_('missing support for %s') % exc)
1704 1709
1705 1710 if pullop.fetch:
1706 1711 pullop.cgresult = bundle2.combinechangegroupresults(op)
1707 1712
1708 1713 # processing phases change
1709 1714 for namespace, value in op.records['listkeys']:
1710 1715 if namespace == 'phases':
1711 1716 _pullapplyphases(pullop, value)
1712 1717
1713 1718 # processing bookmark update
1714 1719 if bookmarksrequested:
1715 1720 books = {}
1716 1721 for record in op.records['bookmarks']:
1717 1722 books[record['bookmark']] = record["node"]
1718 1723 pullop.remotebookmarks = books
1719 1724 else:
1720 1725 for namespace, value in op.records['listkeys']:
1721 1726 if namespace == 'bookmarks':
1722 1727 pullop.remotebookmarks = bookmod.unhexlifybookmarks(value)
1723 1728
1724 1729 # bookmark data were either already there or pulled in the bundle
1725 1730 if pullop.remotebookmarks is not None:
1726 1731 _pullbookmarks(pullop)
1727 1732
1728 1733 def _pullbundle2extraprepare(pullop, kwargs):
1729 1734 """hook function so that extensions can extend the getbundle call"""
1730 1735
1731 1736 def _pullchangeset(pullop):
1732 1737 """pull changeset from unbundle into the local repo"""
1733 1738 # We delay the open of the transaction as late as possible so we
1734 1739 # don't open transaction for nothing or you break future useful
1735 1740 # rollback call
1736 1741 if 'changegroup' in pullop.stepsdone:
1737 1742 return
1738 1743 pullop.stepsdone.add('changegroup')
1739 1744 if not pullop.fetch:
1740 1745 pullop.repo.ui.status(_("no changes found\n"))
1741 1746 pullop.cgresult = 0
1742 1747 return
1743 1748 tr = pullop.gettransaction()
1744 1749 if pullop.heads is None and list(pullop.common) == [nullid]:
1745 1750 pullop.repo.ui.status(_("requesting all changes\n"))
1746 1751 elif pullop.heads is None and pullop.remote.capable('changegroupsubset'):
1747 1752 # issue1320, avoid a race if remote changed after discovery
1748 1753 pullop.heads = pullop.rheads
1749 1754
1750 1755 if pullop.remote.capable('getbundle'):
1751 1756 # TODO: get bundlecaps from remote
1752 1757 cg = pullop.remote.getbundle('pull', common=pullop.common,
1753 1758 heads=pullop.heads or pullop.rheads)
1754 1759 elif pullop.heads is None:
1755 1760 with pullop.remote.commandexecutor() as e:
1756 1761 cg = e.callcommand('changegroup', {
1757 1762 'nodes': pullop.fetch,
1758 1763 'source': 'pull',
1759 1764 }).result()
1760 1765
1761 1766 elif not pullop.remote.capable('changegroupsubset'):
1762 1767 raise error.Abort(_("partial pull cannot be done because "
1763 1768 "other repository doesn't support "
1764 1769 "changegroupsubset."))
1765 1770 else:
1766 1771 with pullop.remote.commandexecutor() as e:
1767 1772 cg = e.callcommand('changegroupsubset', {
1768 1773 'bases': pullop.fetch,
1769 1774 'heads': pullop.heads,
1770 1775 'source': 'pull',
1771 1776 }).result()
1772 1777
1773 1778 bundleop = bundle2.applybundle(pullop.repo, cg, tr, 'pull',
1774 1779 pullop.remote.url())
1775 1780 pullop.cgresult = bundle2.combinechangegroupresults(bundleop)
1776 1781
1777 1782 def _pullphase(pullop):
1778 1783 # Get phases data from the remote
1779 1784 if 'phases' in pullop.stepsdone:
1780 1785 return
1781 1786 remotephases = listkeys(pullop.remote, 'phases')
1782 1787 _pullapplyphases(pullop, remotephases)
1783 1788
1784 1789 def _pullapplyphases(pullop, remotephases):
1785 1790 """apply phase movement from observed remote state"""
1786 1791 if 'phases' in pullop.stepsdone:
1787 1792 return
1788 1793 pullop.stepsdone.add('phases')
1789 1794 publishing = bool(remotephases.get('publishing', False))
1790 1795 if remotephases and not publishing:
1791 1796 # remote is new and non-publishing
1792 1797 pheads, _dr = phases.analyzeremotephases(pullop.repo,
1793 1798 pullop.pulledsubset,
1794 1799 remotephases)
1795 1800 dheads = pullop.pulledsubset
1796 1801 else:
1797 1802 # Remote is old or publishing: all common changesets
1798 1803 # should be seen as public
1799 1804 pheads = pullop.pulledsubset
1800 1805 dheads = []
1801 1806 unfi = pullop.repo.unfiltered()
1802 1807 phase = unfi._phasecache.phase
1803 1808 rev = unfi.changelog.nodemap.get
1804 1809 public = phases.public
1805 1810 draft = phases.draft
1806 1811
1807 1812 # exclude changesets already public locally and update the others
1808 1813 pheads = [pn for pn in pheads if phase(unfi, rev(pn)) > public]
1809 1814 if pheads:
1810 1815 tr = pullop.gettransaction()
1811 1816 phases.advanceboundary(pullop.repo, tr, public, pheads)
1812 1817
1813 1818 # exclude changesets already draft locally and update the others
1814 1819 dheads = [pn for pn in dheads if phase(unfi, rev(pn)) > draft]
1815 1820 if dheads:
1816 1821 tr = pullop.gettransaction()
1817 1822 phases.advanceboundary(pullop.repo, tr, draft, dheads)
1818 1823
1819 1824 def _pullbookmarks(pullop):
1820 1825 """process the remote bookmark information to update the local one"""
1821 1826 if 'bookmarks' in pullop.stepsdone:
1822 1827 return
1823 1828 pullop.stepsdone.add('bookmarks')
1824 1829 repo = pullop.repo
1825 1830 remotebookmarks = pullop.remotebookmarks
1826 1831 bookmod.updatefromremote(repo.ui, repo, remotebookmarks,
1827 1832 pullop.remote.url(),
1828 1833 pullop.gettransaction,
1829 1834 explicit=pullop.explicitbookmarks)
1830 1835
1831 1836 def _pullobsolete(pullop):
1832 1837 """utility function to pull obsolete markers from a remote
1833 1838
1834 1839 The `gettransaction` is a function that returns the pull transaction, creating
1835 1840 one if necessary. We return the transaction to inform the calling code that
1836 1841 a new transaction has been created (when applicable).
1837 1842
1838 1843 Exists mostly to allow overriding for experimentation purposes."""
1839 1844 if 'obsmarkers' in pullop.stepsdone:
1840 1845 return
1841 1846 pullop.stepsdone.add('obsmarkers')
1842 1847 tr = None
1843 1848 if obsolete.isenabled(pullop.repo, obsolete.exchangeopt):
1844 1849 pullop.repo.ui.debug('fetching remote obsolete markers\n')
1845 1850 remoteobs = listkeys(pullop.remote, 'obsolete')
1846 1851 if 'dump0' in remoteobs:
1847 1852 tr = pullop.gettransaction()
1848 1853 markers = []
1849 1854 for key in sorted(remoteobs, reverse=True):
1850 1855 if key.startswith('dump'):
1851 1856 data = util.b85decode(remoteobs[key])
1852 1857 version, newmarks = obsolete._readmarkers(data)
1853 1858 markers += newmarks
1854 1859 if markers:
1855 1860 pullop.repo.obsstore.add(tr, markers)
1856 1861 pullop.repo.invalidatevolatilesets()
1857 1862 return tr
1858 1863
1859 1864 def applynarrowacl(repo, kwargs):
1860 1865 """Apply narrow fetch access control.
1861 1866
1862 1867 This massages the named arguments for getbundle wire protocol commands
1863 1868 so requested data is filtered through access control rules.
1864 1869 """
1865 1870 ui = repo.ui
1866 1871 # TODO this assumes existence of HTTP and is a layering violation.
1867 1872 username = ui.shortuser(ui.environ.get('REMOTE_USER') or ui.username())
1868 1873 user_includes = ui.configlist(
1869 1874 _NARROWACL_SECTION, username + '.includes',
1870 1875 ui.configlist(_NARROWACL_SECTION, 'default.includes'))
1871 1876 user_excludes = ui.configlist(
1872 1877 _NARROWACL_SECTION, username + '.excludes',
1873 1878 ui.configlist(_NARROWACL_SECTION, 'default.excludes'))
1874 1879 if not user_includes:
1875 1880 raise error.Abort(_("{} configuration for user {} is empty")
1876 1881 .format(_NARROWACL_SECTION, username))
1877 1882
1878 1883 user_includes = [
1879 1884 'path:.' if p == '*' else 'path:' + p for p in user_includes]
1880 1885 user_excludes = [
1881 1886 'path:.' if p == '*' else 'path:' + p for p in user_excludes]
1882 1887
1883 1888 req_includes = set(kwargs.get(r'includepats', []))
1884 1889 req_excludes = set(kwargs.get(r'excludepats', []))
1885 1890
1886 1891 req_includes, req_excludes, invalid_includes = narrowspec.restrictpatterns(
1887 1892 req_includes, req_excludes, user_includes, user_excludes)
1888 1893
1889 1894 if invalid_includes:
1890 1895 raise error.Abort(
1891 1896 _("The following includes are not accessible for {}: {}")
1892 1897 .format(username, invalid_includes))
1893 1898
1894 1899 new_args = {}
1895 1900 new_args.update(kwargs)
1896 1901 new_args[r'narrow'] = True
1897 1902 new_args[r'includepats'] = req_includes
1898 1903 if req_excludes:
1899 1904 new_args[r'excludepats'] = req_excludes
1900 1905
1901 1906 return new_args
1902 1907
1903 1908 def _computeellipsis(repo, common, heads, known, match, depth=None):
1904 1909 """Compute the shape of a narrowed DAG.
1905 1910
1906 1911 Args:
1907 1912 repo: The repository we're transferring.
1908 1913 common: The roots of the DAG range we're transferring.
1909 1914 May be just [nullid], which means all ancestors of heads.
1910 1915 heads: The heads of the DAG range we're transferring.
1911 1916 match: The narrowmatcher that allows us to identify relevant changes.
1912 1917 depth: If not None, only consider nodes to be full nodes if they are at
1913 1918 most depth changesets away from one of heads.
1914 1919
1915 1920 Returns:
1916 1921 A tuple of (visitnodes, relevant_nodes, ellipsisroots) where:
1917 1922
1918 1923 visitnodes: The list of nodes (either full or ellipsis) which
1919 1924 need to be sent to the client.
1920 1925 relevant_nodes: The set of changelog nodes which change a file inside
1921 1926 the narrowspec. The client needs these as non-ellipsis nodes.
1922 1927 ellipsisroots: A dict of {rev: parents} that is used in
1923 1928 narrowchangegroup to produce ellipsis nodes with the
1924 1929 correct parents.
1925 1930 """
1926 1931 cl = repo.changelog
1927 1932 mfl = repo.manifestlog
1928 1933
1929 1934 clrev = cl.rev
1930 1935
1931 1936 commonrevs = {clrev(n) for n in common} | {nullrev}
1932 1937 headsrevs = {clrev(n) for n in heads}
1933 1938
1934 1939 if depth:
1935 1940 revdepth = {h: 0 for h in headsrevs}
1936 1941
1937 1942 ellipsisheads = collections.defaultdict(set)
1938 1943 ellipsisroots = collections.defaultdict(set)
1939 1944
1940 1945 def addroot(head, curchange):
1941 1946 """Add a root to an ellipsis head, splitting heads with 3 roots."""
1942 1947 ellipsisroots[head].add(curchange)
1943 1948 # Recursively split ellipsis heads with 3 roots by finding the
1944 1949 # roots' youngest common descendant which is an elided merge commit.
1945 1950 # That descendant takes 2 of the 3 roots as its own, and becomes a
1946 1951 # root of the head.
1947 1952 while len(ellipsisroots[head]) > 2:
1948 1953 child, roots = splithead(head)
1949 1954 splitroots(head, child, roots)
1950 1955 head = child # Recurse in case we just added a 3rd root
1951 1956
1952 1957 def splitroots(head, child, roots):
1953 1958 ellipsisroots[head].difference_update(roots)
1954 1959 ellipsisroots[head].add(child)
1955 1960 ellipsisroots[child].update(roots)
1956 1961 ellipsisroots[child].discard(child)
1957 1962
1958 1963 def splithead(head):
1959 1964 r1, r2, r3 = sorted(ellipsisroots[head])
1960 1965 for nr1, nr2 in ((r2, r3), (r1, r3), (r1, r2)):
1961 1966 mid = repo.revs('sort(merge() & %d::%d & %d::%d, -rev)',
1962 1967 nr1, head, nr2, head)
1963 1968 for j in mid:
1964 1969 if j == nr2:
1965 1970 return nr2, (nr1, nr2)
1966 1971 if j not in ellipsisroots or len(ellipsisroots[j]) < 2:
1967 1972 return j, (nr1, nr2)
1968 1973 raise error.Abort(_('Failed to split up ellipsis node! head: %d, '
1969 1974 'roots: %d %d %d') % (head, r1, r2, r3))
1970 1975
1971 1976 missing = list(cl.findmissingrevs(common=commonrevs, heads=headsrevs))
1972 1977 visit = reversed(missing)
1973 1978 relevant_nodes = set()
1974 1979 visitnodes = [cl.node(m) for m in missing]
1975 1980 required = set(headsrevs) | known
1976 1981 for rev in visit:
1977 1982 clrev = cl.changelogrevision(rev)
1978 1983 ps = [prev for prev in cl.parentrevs(rev) if prev != nullrev]
1979 1984 if depth is not None:
1980 1985 curdepth = revdepth[rev]
1981 1986 for p in ps:
1982 1987 revdepth[p] = min(curdepth + 1, revdepth.get(p, depth + 1))
1983 1988 needed = False
1984 1989 shallow_enough = depth is None or revdepth[rev] <= depth
1985 1990 if shallow_enough:
1986 1991 curmf = mfl[clrev.manifest].read()
1987 1992 if ps:
1988 1993 # We choose to not trust the changed files list in
1989 1994 # changesets because it's not always correct. TODO: could
1990 1995 # we trust it for the non-merge case?
1991 1996 p1mf = mfl[cl.changelogrevision(ps[0]).manifest].read()
1992 1997 needed = bool(curmf.diff(p1mf, match))
1993 1998 if not needed and len(ps) > 1:
1994 1999 # For merge changes, the list of changed files is not
1995 2000 # helpful, since we need to emit the merge if a file
1996 2001 # in the narrow spec has changed on either side of the
1997 2002 # merge. As a result, we do a manifest diff to check.
1998 2003 p2mf = mfl[cl.changelogrevision(ps[1]).manifest].read()
1999 2004 needed = bool(curmf.diff(p2mf, match))
2000 2005 else:
2001 2006 # For a root node, we need to include the node if any
2002 2007 # files in the node match the narrowspec.
2003 2008 needed = any(curmf.walk(match))
2004 2009
2005 2010 if needed:
2006 2011 for head in ellipsisheads[rev]:
2007 2012 addroot(head, rev)
2008 2013 for p in ps:
2009 2014 required.add(p)
2010 2015 relevant_nodes.add(cl.node(rev))
2011 2016 else:
2012 2017 if not ps:
2013 2018 ps = [nullrev]
2014 2019 if rev in required:
2015 2020 for head in ellipsisheads[rev]:
2016 2021 addroot(head, rev)
2017 2022 for p in ps:
2018 2023 ellipsisheads[p].add(rev)
2019 2024 else:
2020 2025 for p in ps:
2021 2026 ellipsisheads[p] |= ellipsisheads[rev]
2022 2027
2023 2028 # add common changesets as roots of their reachable ellipsis heads
2024 2029 for c in commonrevs:
2025 2030 for head in ellipsisheads[c]:
2026 2031 addroot(head, c)
2027 2032 return visitnodes, relevant_nodes, ellipsisroots
2028 2033
2029 2034 def caps20to10(repo, role):
2030 2035 """return a set with appropriate options to use bundle20 during getbundle"""
2031 2036 caps = {'HG20'}
2032 2037 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role=role))
2033 2038 caps.add('bundle2=' + urlreq.quote(capsblob))
2034 2039 return caps
2035 2040
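A hedged sketch of the result (capability blob abbreviated; the exact
contents depend on the repo's bundle2 capabilities):

    caps = caps20to10(repo, role='client')
    # -> {'HG20', 'bundle2=HG20%0Achangegroup%3D01%2C02...'}
    # i.e. the HG20 marker plus the percent-encoded bundle2 capabilities,
    # suitable for the legacy ``bundlecaps`` wire argument.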
2036 2041 # List of names of steps to perform for a bundle2 for getbundle, order matters.
2037 2042 getbundle2partsorder = []
2038 2043
2039 2044 # Mapping between step name and function
2040 2045 #
2041 2046 # This exists to help extensions wrap steps if necessary
2042 2047 getbundle2partsmapping = {}
2043 2048
2044 2049 def getbundle2partsgenerator(stepname, idx=None):
2045 2050 """decorator for function generating bundle2 part for getbundle
2046 2051
2047 2052 The function is added to the step -> function mapping and appended to the
2048 2053 list of steps. Beware that decorated functions will be added in order
2049 2054 (this may matter).
2050 2055
2051 2056 You can only use this decorator for new steps; if you want to wrap a step
2052 2057 from an extension, change the getbundle2partsmapping dictionary directly."""
2053 2058 def dec(func):
2054 2059 assert stepname not in getbundle2partsmapping
2055 2060 getbundle2partsmapping[stepname] = func
2056 2061 if idx is None:
2057 2062 getbundle2partsorder.append(stepname)
2058 2063 else:
2059 2064 getbundle2partsorder.insert(idx, stepname)
2060 2065 return func
2061 2066 return dec
2062 2067
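A hedged sketch of registering a hypothetical part generator with the
decorator above; passing ``idx`` splices the step into a fixed position
instead of appending it:

    @getbundle2partsgenerator('myext:example')
    def _getbundleexamplepart(bundler, repo, source, bundlecaps=None,
                              b2caps=None, **kwargs):
        # Only emit the (hypothetical) part if the client advertised it.
        if 'myext:example' in b2caps:
            bundler.newpart('myext:example', data=b'example payload')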
2063 2068 def bundle2requested(bundlecaps):
2064 2069 if bundlecaps is not None:
2065 2070 return any(cap.startswith('HG2') for cap in bundlecaps)
2066 2071 return False
2067 2072
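Only an HG2* marker in ``bundlecaps`` flags a bundle2 request; a couple of
illustrative checks:

    assert bundle2requested(['HG20'])          # bundle2 requested
    assert not bundle2requested(['HG10GZ'])    # bundle1 spec
    assert not bundle2requested(None)          # no bundlecaps at all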
2068 2073 def getbundlechunks(repo, source, heads=None, common=None, bundlecaps=None,
2069 2074 **kwargs):
2070 2075 """Return chunks constituting a bundle's raw data.
2071 2076
2072 2077 Could be a bundle HG10 or a bundle HG20 depending on bundlecaps
2073 2078 passed.
2074 2079
2075 2080 Returns a 2-tuple of a dict with metadata about the generated bundle
2076 2081 and an iterator over raw chunks (of varying sizes).
2077 2082 """
2078 2083 kwargs = pycompat.byteskwargs(kwargs)
2079 2084 info = {}
2080 2085 usebundle2 = bundle2requested(bundlecaps)
2081 2086 # bundle10 case
2082 2087 if not usebundle2:
2083 2088 if bundlecaps and not kwargs.get('cg', True):
2084 2089 raise ValueError(_('request for bundle10 must include changegroup'))
2085 2090
2086 2091 if kwargs:
2087 2092 raise ValueError(_('unsupported getbundle arguments: %s')
2088 2093 % ', '.join(sorted(kwargs.keys())))
2089 2094 outgoing = _computeoutgoing(repo, heads, common)
2090 2095 info['bundleversion'] = 1
2091 2096 return info, changegroup.makestream(repo, outgoing, '01', source,
2092 2097 bundlecaps=bundlecaps)
2093 2098
2094 2099 # bundle20 case
2095 2100 info['bundleversion'] = 2
2096 2101 b2caps = {}
2097 2102 for bcaps in bundlecaps:
2098 2103 if bcaps.startswith('bundle2='):
2099 2104 blob = urlreq.unquote(bcaps[len('bundle2='):])
2100 2105 b2caps.update(bundle2.decodecaps(blob))
2101 2106 bundler = bundle2.bundle20(repo.ui, b2caps)
2102 2107
2103 2108 kwargs['heads'] = heads
2104 2109 kwargs['common'] = common
2105 2110
2106 2111 for name in getbundle2partsorder:
2107 2112 func = getbundle2partsmapping[name]
2108 2113 func(bundler, repo, source, bundlecaps=bundlecaps, b2caps=b2caps,
2109 2114 **pycompat.strkwargs(kwargs))
2110 2115
2111 2116 info['prefercompressed'] = bundler.prefercompressed
2112 2117
2113 2118 return info, bundler.getchunks()
2114 2119
2115 2120 @getbundle2partsgenerator('stream2')
2116 2121 def _getbundlestream2(bundler, repo, *args, **kwargs):
2117 2122 return bundle2.addpartbundlestream2(bundler, repo, **kwargs)
2118 2123
2119 2124 @getbundle2partsgenerator('changegroup')
2120 2125 def _getbundlechangegrouppart(bundler, repo, source, bundlecaps=None,
2121 2126 b2caps=None, heads=None, common=None, **kwargs):
2122 2127 """add a changegroup part to the requested bundle"""
2123 2128 if not kwargs.get(r'cg', True):
2124 2129 return
2125 2130
2126 2131 version = '01'
2127 2132 cgversions = b2caps.get('changegroup')
2128 2133 if cgversions: # 3.1 and 3.2 ship with an empty value
2129 2134 cgversions = [v for v in cgversions
2130 2135 if v in changegroup.supportedoutgoingversions(repo)]
2131 2136 if not cgversions:
2132 2137 raise ValueError(_('no common changegroup version'))
2133 2138 version = max(cgversions)
2134 2139
2135 2140 outgoing = _computeoutgoing(repo, heads, common)
2136 2141 if not outgoing.missing:
2137 2142 return
2138 2143
2139 2144 if kwargs.get(r'narrow', False):
2140 2145 include = sorted(filter(bool, kwargs.get(r'includepats', [])))
2141 2146 exclude = sorted(filter(bool, kwargs.get(r'excludepats', [])))
2142 2147 filematcher = narrowspec.match(repo.root, include=include,
2143 2148 exclude=exclude)
2144 2149 else:
2145 2150 filematcher = None
2146 2151
2147 2152 cgstream = changegroup.makestream(repo, outgoing, version, source,
2148 2153 bundlecaps=bundlecaps,
2149 2154 filematcher=filematcher)
2150 2155
2151 2156 part = bundler.newpart('changegroup', data=cgstream)
2152 2157 if cgversions:
2153 2158 part.addparam('version', version)
2154 2159
2155 2160 part.addparam('nbchanges', '%d' % len(outgoing.missing),
2156 2161 mandatory=False)
2157 2162
2158 2163 if 'treemanifest' in repo.requirements:
2159 2164 part.addparam('treemanifest', '1')
2160 2165
2161 2166 if kwargs.get(r'narrow', False) and (include or exclude):
2162 2167 narrowspecpart = bundler.newpart('narrow:spec')
2163 2168 if include:
2164 2169 narrowspecpart.addparam(
2165 2170 'include', '\n'.join(include), mandatory=True)
2166 2171 if exclude:
2167 2172 narrowspecpart.addparam(
2168 2173 'exclude', '\n'.join(exclude), mandatory=True)
2169 2174
2170 2175 @getbundle2partsgenerator('bookmarks')
2171 2176 def _getbundlebookmarkpart(bundler, repo, source, bundlecaps=None,
2172 2177 b2caps=None, **kwargs):
2173 2178 """add a bookmark part to the requested bundle"""
2174 2179 if not kwargs.get(r'bookmarks', False):
2175 2180 return
2176 2181 if 'bookmarks' not in b2caps:
2177 2182 raise ValueError(_('no common bookmarks exchange method'))
2178 2183 books = bookmod.listbinbookmarks(repo)
2179 2184 data = bookmod.binaryencode(books)
2180 2185 if data:
2181 2186 bundler.newpart('bookmarks', data=data)
2182 2187
2183 2188 @getbundle2partsgenerator('listkeys')
2184 2189 def _getbundlelistkeysparts(bundler, repo, source, bundlecaps=None,
2185 2190 b2caps=None, **kwargs):
2186 2191 """add parts containing listkeys namespaces to the requested bundle"""
2187 2192 listkeys = kwargs.get(r'listkeys', ())
2188 2193 for namespace in listkeys:
2189 2194 part = bundler.newpart('listkeys')
2190 2195 part.addparam('namespace', namespace)
2191 2196 keys = repo.listkeys(namespace).items()
2192 2197 part.data = pushkey.encodekeys(keys)
2193 2198
2194 2199 @getbundle2partsgenerator('obsmarkers')
2195 2200 def _getbundleobsmarkerpart(bundler, repo, source, bundlecaps=None,
2196 2201 b2caps=None, heads=None, **kwargs):
2197 2202 """add an obsolescence markers part to the requested bundle"""
2198 2203 if kwargs.get(r'obsmarkers', False):
2199 2204 if heads is None:
2200 2205 heads = repo.heads()
2201 2206 subset = [c.node() for c in repo.set('::%ln', heads)]
2202 2207 markers = repo.obsstore.relevantmarkers(subset)
2203 2208 markers = sorted(markers)
2204 2209 bundle2.buildobsmarkerspart(bundler, markers)
2205 2210
2206 2211 @getbundle2partsgenerator('phases')
2207 2212 def _getbundlephasespart(bundler, repo, source, bundlecaps=None,
2208 2213 b2caps=None, heads=None, **kwargs):
2209 2214 """add phase heads part to the requested bundle"""
2210 2215 if kwargs.get(r'phases', False):
2211 2216 if 'heads' not in b2caps.get('phases'):
2212 2217 raise ValueError(_('no common phases exchange method'))
2213 2218 if heads is None:
2214 2219 heads = repo.heads()
2215 2220
2216 2221 headsbyphase = collections.defaultdict(set)
2217 2222 if repo.publishing():
2218 2223 headsbyphase[phases.public] = heads
2219 2224 else:
2220 2225 # find the appropriate heads to move
2221 2226
2222 2227 phase = repo._phasecache.phase
2223 2228 node = repo.changelog.node
2224 2229 rev = repo.changelog.rev
2225 2230 for h in heads:
2226 2231 headsbyphase[phase(repo, rev(h))].add(h)
2227 2232 seenphases = list(headsbyphase.keys())
2228 2233
2229 2234 # We do not handle anything but public and draft phases for now
2230 2235 if seenphases:
2231 2236 assert max(seenphases) <= phases.draft
2232 2237
2233 2238 # if client is pulling non-public changesets, we need to find
2234 2239 # intermediate public heads.
2235 2240 draftheads = headsbyphase.get(phases.draft, set())
2236 2241 if draftheads:
2237 2242 publicheads = headsbyphase.get(phases.public, set())
2238 2243
2239 2244 revset = 'heads(only(%ln, %ln) and public())'
2240 2245 extraheads = repo.revs(revset, draftheads, publicheads)
2241 2246 for r in extraheads:
2242 2247 headsbyphase[phases.public].add(node(r))
2243 2248
2244 2249 # transform data in a format used by the encoding function
2245 2250 phasemapping = []
2246 2251 for phase in phases.allphases:
2247 2252 phasemapping.append(sorted(headsbyphase[phase]))
2248 2253
2249 2254 # generate the actual part
2250 2255 phasedata = phases.binaryencode(phasemapping)
2251 2256 bundler.newpart('phase-heads', data=phasedata)
2252 2257
2253 2258 @getbundle2partsgenerator('hgtagsfnodes')
2254 2259 def _getbundletagsfnodes(bundler, repo, source, bundlecaps=None,
2255 2260 b2caps=None, heads=None, common=None,
2256 2261 **kwargs):
2257 2262 """Transfer the .hgtags filenodes mapping.
2258 2263
2259 2264 Only values for heads in this bundle will be transferred.
2260 2265
2261 2266 The part data consists of pairs of 20 byte changeset node and .hgtags
2262 2267 filenodes raw values.
2263 2268 """
2264 2269 # Don't send unless:
2265 2270 # - changesets are being exchanged,
2266 2271 # - the client supports it.
2267 2272 if not (kwargs.get(r'cg', True) and 'hgtagsfnodes' in b2caps):
2268 2273 return
2269 2274
2270 2275 outgoing = _computeoutgoing(repo, heads, common)
2271 2276 bundle2.addparttagsfnodescache(repo, bundler, outgoing)
2272 2277
2273 2278 @getbundle2partsgenerator('cache:rev-branch-cache')
2274 2279 def _getbundlerevbranchcache(bundler, repo, source, bundlecaps=None,
2275 2280 b2caps=None, heads=None, common=None,
2276 2281 **kwargs):
2277 2282 """Transfer the rev-branch-cache mapping
2278 2283
2279 2284 The payload is a series of records, one per branch:
2280 2285
2281 2286 1) branch name length
2282 2287 2) number of open heads
2283 2288 3) number of closed heads
2284 2289 4) open heads nodes
2285 2290 5) closed heads nodes
2286 2291 """
2287 2292 # Don't send unless:
2288 2293 # - changesets are being exchanged,
2289 2294 # - the client supports it.
2290 2295 # - narrow bundle isn't in play (not currently compatible).
2291 2296 if (not kwargs.get(r'cg', True)
2292 2297 or 'rev-branch-cache' not in b2caps
2293 2298 or kwargs.get(r'narrow', False)
2294 2299 or repo.ui.has_section(_NARROWACL_SECTION)):
2295 2300 return
2296 2301
2297 2302 outgoing = _computeoutgoing(repo, heads, common)
2298 2303 bundle2.addpartrevbranchcache(repo, bundler, outgoing)
2299 2304
2300 2305 def check_heads(repo, their_heads, context):
2301 2306 """check if the heads of a repo have been modified
2302 2307
2303 2308 Used by peer for unbundling.
2304 2309 """
2305 2310 heads = repo.heads()
2306 2311 heads_hash = hashlib.sha1(''.join(sorted(heads))).digest()
2307 2312 if not (their_heads == ['force'] or their_heads == heads or
2308 2313 their_heads == ['hashed', heads_hash]):
2309 2314 # someone else committed/pushed/unbundled while we
2310 2315 # were transferring data
2311 2316 raise error.PushRaced('repository changed while %s - '
2312 2317 'please try again' % context)
2313 2318
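A hedged sketch of the client side of this race check: instead of shipping
every head node, a client may send the sha1 of the concatenated sorted
heads (py2-style bytes-as-str, as in the surrounding code):

    import hashlib

    def hashedheads(repo):
        # Matches the digest computed in check_heads() above.
        return ['hashed',
                hashlib.sha1(''.join(sorted(repo.heads()))).digest()]

    # check_heads(repo, hashedheads(repo), 'uploading changes') passes as
    # long as nobody committed, pushed or unbundled in between.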
2314 2319 def unbundle(repo, cg, heads, source, url):
2315 2320 """Apply a bundle to a repo.
2316 2321
2317 2322 This function makes sure the repo is locked during the application and has a
2318 2323 mechanism to check that no push race occurred between the creation of the
2319 2324 bundle and its application.
2320 2325
2321 2326 If the push was raced, a PushRaced exception is raised."""
2322 2327 r = 0
2323 2328 # need a transaction when processing a bundle2 stream
2324 2329 # [wlock, lock, tr] - needs to be an array so nested functions can modify it
2325 2330 lockandtr = [None, None, None]
2326 2331 recordout = None
2327 2332 # quick fix for output mismatch with bundle2 in 3.4
2328 2333 captureoutput = repo.ui.configbool('experimental', 'bundle2-output-capture')
2329 2334 if url.startswith('remote:http:') or url.startswith('remote:https:'):
2330 2335 captureoutput = True
2331 2336 try:
2332 2337 # note: outside bundle1, 'heads' is expected to be empty and this
2333 2338 # 'check_heads' call will be a no-op
2334 2339 check_heads(repo, heads, 'uploading changes')
2335 2340 # push can proceed
2336 2341 if not isinstance(cg, bundle2.unbundle20):
2337 2342 # legacy case: bundle1 (changegroup 01)
2338 2343 txnname = "\n".join([source, util.hidepassword(url)])
2339 2344 with repo.lock(), repo.transaction(txnname) as tr:
2340 2345 op = bundle2.applybundle(repo, cg, tr, source, url)
2341 2346 r = bundle2.combinechangegroupresults(op)
2342 2347 else:
2343 2348 r = None
2344 2349 try:
2345 2350 def gettransaction():
2346 2351 if not lockandtr[2]:
2347 2352 lockandtr[0] = repo.wlock()
2348 2353 lockandtr[1] = repo.lock()
2349 2354 lockandtr[2] = repo.transaction(source)
2350 2355 lockandtr[2].hookargs['source'] = source
2351 2356 lockandtr[2].hookargs['url'] = url
2352 2357 lockandtr[2].hookargs['bundle2'] = '1'
2353 2358 return lockandtr[2]
2354 2359
2355 2360 # Do greedy locking by default until we're satisfied with lazy
2356 2361 # locking.
2357 2362 if not repo.ui.configbool('experimental', 'bundle2lazylocking'):
2358 2363 gettransaction()
2359 2364
2360 2365 op = bundle2.bundleoperation(repo, gettransaction,
2361 2366 captureoutput=captureoutput,
2362 2367 source='push')
2363 2368 try:
2364 2369 op = bundle2.processbundle(repo, cg, op=op)
2365 2370 finally:
2366 2371 r = op.reply
2367 2372 if captureoutput and r is not None:
2368 2373 repo.ui.pushbuffer(error=True, subproc=True)
2369 2374 def recordout(output):
2370 2375 r.newpart('output', data=output, mandatory=False)
2371 2376 if lockandtr[2] is not None:
2372 2377 lockandtr[2].close()
2373 2378 except BaseException as exc:
2374 2379 exc.duringunbundle2 = True
2375 2380 if captureoutput and r is not None:
2376 2381 parts = exc._bundle2salvagedoutput = r.salvageoutput()
2377 2382 def recordout(output):
2378 2383 part = bundle2.bundlepart('output', data=output,
2379 2384 mandatory=False)
2380 2385 parts.append(part)
2381 2386 raise
2382 2387 finally:
2383 2388 lockmod.release(lockandtr[2], lockandtr[1], lockandtr[0])
2384 2389 if recordout is not None:
2385 2390 recordout(repo.ui.popbuffer())
2386 2391 return r
2387 2392
2388 2393 def _maybeapplyclonebundle(pullop):
2389 2394 """Apply a clone bundle from a remote, if possible."""
2390 2395
2391 2396 repo = pullop.repo
2392 2397 remote = pullop.remote
2393 2398
2394 2399 if not repo.ui.configbool('ui', 'clonebundles'):
2395 2400 return
2396 2401
2397 2402 # Only run if local repo is empty.
2398 2403 if len(repo):
2399 2404 return
2400 2405
2401 2406 if pullop.heads:
2402 2407 return
2403 2408
2404 2409 if not remote.capable('clonebundles'):
2405 2410 return
2406 2411
2407 2412 with remote.commandexecutor() as e:
2408 2413 res = e.callcommand('clonebundles', {}).result()
2409 2414
2410 2415 # If we call the wire protocol command, that's good enough to record the
2411 2416 # attempt.
2412 2417 pullop.clonebundleattempted = True
2413 2418
2414 2419 entries = parseclonebundlesmanifest(repo, res)
2415 2420 if not entries:
2416 2421 repo.ui.note(_('no clone bundles available on remote; '
2417 2422 'falling back to regular clone\n'))
2418 2423 return
2419 2424
2420 2425 entries = filterclonebundleentries(
2421 2426 repo, entries, streamclonerequested=pullop.streamclonerequested)
2422 2427
2423 2428 if not entries:
2424 2429 # There is a thundering herd concern here. However, if a server
2425 2430 # operator doesn't advertise bundles appropriate for its clients,
2426 2431 # they deserve what's coming. Furthermore, from a client's
2427 2432 # perspective, no automatic fallback would mean not being able to
2428 2433 # clone!
2429 2434 repo.ui.warn(_('no compatible clone bundles available on server; '
2430 2435 'falling back to regular clone\n'))
2431 2436 repo.ui.warn(_('(you may want to report this to the server '
2432 2437 'operator)\n'))
2433 2438 return
2434 2439
2435 2440 entries = sortclonebundleentries(repo.ui, entries)
2436 2441
2437 2442 url = entries[0]['URL']
2438 2443 repo.ui.status(_('applying clone bundle from %s\n') % url)
2439 2444 if trypullbundlefromurl(repo.ui, repo, url):
2440 2445 repo.ui.status(_('finished applying clone bundle\n'))
2441 2446 # Bundle failed.
2442 2447 #
2443 2448 # We abort by default to avoid the thundering herd of
2444 2449 # clients flooding a server that was expecting expensive
2445 2450 # clone load to be offloaded.
2446 2451 elif repo.ui.configbool('ui', 'clonebundlefallback'):
2447 2452 repo.ui.warn(_('falling back to normal clone\n'))
2448 2453 else:
2449 2454 raise error.Abort(_('error applying bundle'),
2450 2455 hint=_('if this error persists, consider contacting '
2451 2456 'the server operator or disable clone '
2452 2457 'bundles via '
2453 2458 '"--config ui.clonebundles=false"'))
2454 2459
2455 2460 def parseclonebundlesmanifest(repo, s):
2456 2461 """Parses the raw text of a clone bundles manifest.
2457 2462
2458 2463 Returns a list of dicts. The dicts have a ``URL`` key corresponding
2459 2464 to the URL and other keys are the attributes for the entry.
2460 2465 """
2461 2466 m = []
2462 2467 for line in s.splitlines():
2463 2468 fields = line.split()
2464 2469 if not fields:
2465 2470 continue
2466 2471 attrs = {'URL': fields[0]}
2467 2472 for rawattr in fields[1:]:
2468 2473 key, value = rawattr.split('=', 1)
2469 2474 key = urlreq.unquote(key)
2470 2475 value = urlreq.unquote(value)
2471 2476 attrs[key] = value
2472 2477
2473 2478 # Parse BUNDLESPEC into components. This makes client-side
2474 2479 # preferences easier to specify since you can prefer a single
2475 2480 # component of the BUNDLESPEC.
2476 2481 if key == 'BUNDLESPEC':
2477 2482 try:
2478 2483 bundlespec = parsebundlespec(repo, value)
2479 2484 attrs['COMPRESSION'] = bundlespec.compression
2480 2485 attrs['VERSION'] = bundlespec.version
2481 2486 except error.InvalidBundleSpecification:
2482 2487 pass
2483 2488 except error.UnsupportedBundleSpecification:
2484 2489 pass
2485 2490
2486 2491 m.append(attrs)
2487 2492
2488 2493 return m
2489 2494
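A hedged example of the parsing above (URLs hypothetical, ``repo`` assumed).
'gzip-v2' is a well-formed bundle spec, so it is also split into
COMPRESSION and VERSION:

    manifest = (b'https://example.com/full.hg BUNDLESPEC=gzip-v2\n'
                b'https://example.com/pack.hg BUNDLESPEC=none-packed1 REQUIRESNI=true\n')
    entries = parseclonebundlesmanifest(repo, manifest)
    # entries[0] -> {'URL': 'https://example.com/full.hg',
    #                'BUNDLESPEC': 'gzip-v2',
    #                'COMPRESSION': 'gzip',
    #                'VERSION': 'v2'}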
2490 2495 def isstreamclonespec(bundlespec):
2491 2496 # Stream clone v1
2492 2497 if (bundlespec.wirecompression == 'UN' and bundlespec.wireversion == 's1'):
2493 2498 return True
2494 2499
2495 2500 # Stream clone v2
2496 2501 if (bundlespec.wirecompression == 'UN' and
2497 2502 bundlespec.wireversion == '02' and
2498 2503 bundlespec.contentopts.get('streamv2')):
2499 2504 return True
2500 2505
2501 2506 return False
2502 2507
2503 2508 def filterclonebundleentries(repo, entries, streamclonerequested=False):
2504 2509 """Remove incompatible clone bundle manifest entries.
2505 2510
2506 2511 Accepts a list of entries parsed with ``parseclonebundlesmanifest``
2507 2512 and returns a new list consisting of only the entries that this client
2508 2513 should be able to apply.
2509 2514
2510 2515 There is no guarantee we'll be able to apply all returned entries because
2511 2516 the metadata we use to filter on may be missing or wrong.
2512 2517 """
2513 2518 newentries = []
2514 2519 for entry in entries:
2515 2520 spec = entry.get('BUNDLESPEC')
2516 2521 if spec:
2517 2522 try:
2518 2523 bundlespec = parsebundlespec(repo, spec, strict=True)
2519 2524
2520 2525 # If a stream clone was requested, filter out non-streamclone
2521 2526 # entries.
2522 2527 if streamclonerequested and not isstreamclonespec(bundlespec):
2523 2528 repo.ui.debug('filtering %s because not a stream clone\n' %
2524 2529 entry['URL'])
2525 2530 continue
2526 2531
2527 2532 except error.InvalidBundleSpecification as e:
2528 2533 repo.ui.debug(stringutil.forcebytestr(e) + '\n')
2529 2534 continue
2530 2535 except error.UnsupportedBundleSpecification as e:
2531 2536 repo.ui.debug('filtering %s because unsupported bundle '
2532 2537 'spec: %s\n' % (
2533 2538 entry['URL'], stringutil.forcebytestr(e)))
2534 2539 continue
2535 2540 # If we don't have a spec and requested a stream clone, we don't know
2536 2541 # what the entry is so don't attempt to apply it.
2537 2542 elif streamclonerequested:
2538 2543 repo.ui.debug('filtering %s because cannot determine if a stream '
2539 2544 'clone bundle\n' % entry['URL'])
2540 2545 continue
2541 2546
2542 2547 if 'REQUIRESNI' in entry and not sslutil.hassni:
2543 2548 repo.ui.debug('filtering %s because SNI not supported\n' %
2544 2549 entry['URL'])
2545 2550 continue
2546 2551
2547 2552 newentries.append(entry)
2548 2553
2549 2554 return newentries
2550 2555
2551 2556 class clonebundleentry(object):
2552 2557 """Represents an item in a clone bundles manifest.
2553 2558
2554 2559 This rich class is needed to support sorting since sorted() in Python 3
2555 2560 doesn't support ``cmp`` and our comparison is complex enough that ``key=``
2556 2561 won't work.
2557 2562 """
2558 2563
2559 2564 def __init__(self, value, prefers):
2560 2565 self.value = value
2561 2566 self.prefers = prefers
2562 2567
2563 2568 def _cmp(self, other):
2564 2569 for prefkey, prefvalue in self.prefers:
2565 2570 avalue = self.value.get(prefkey)
2566 2571 bvalue = other.value.get(prefkey)
2567 2572
2568 2573 # Special case for b missing attribute and a matches exactly.
2569 2574 if avalue is not None and bvalue is None and avalue == prefvalue:
2570 2575 return -1
2571 2576
2572 2577 # Special case for a missing attribute and b matches exactly.
2573 2578 if bvalue is not None and avalue is None and bvalue == prefvalue:
2574 2579 return 1
2575 2580
2576 2581 # We can't compare unless attribute present on both.
2577 2582 if avalue is None or bvalue is None:
2578 2583 continue
2579 2584
2580 2585 # Same values should fall back to next attribute.
2581 2586 if avalue == bvalue:
2582 2587 continue
2583 2588
2584 2589 # Exact matches come first.
2585 2590 if avalue == prefvalue:
2586 2591 return -1
2587 2592 if bvalue == prefvalue:
2588 2593 return 1
2589 2594
2590 2595 # Fall back to next attribute.
2591 2596 continue
2592 2597
2593 2598 # If we got here we couldn't sort by attributes and prefers. Fall
2594 2599 # back to index order.
2595 2600 return 0
2596 2601
2597 2602 def __lt__(self, other):
2598 2603 return self._cmp(other) < 0
2599 2604
2600 2605 def __gt__(self, other):
2601 2606 return self._cmp(other) > 0
2602 2607
2603 2608 def __eq__(self, other):
2604 2609 return self._cmp(other) == 0
2605 2610
2606 2611 def __le__(self, other):
2607 2612 return self._cmp(other) <= 0
2608 2613
2609 2614 def __ge__(self, other):
2610 2615 return self._cmp(other) >= 0
2611 2616
2612 2617 def __ne__(self, other):
2613 2618 return self._cmp(other) != 0
2614 2619
2615 2620 def sortclonebundleentries(ui, entries):
2616 2621 prefers = ui.configlist('ui', 'clonebundleprefers')
2617 2622 if not prefers:
2618 2623 return list(entries)
2619 2624
2620 2625 prefers = [p.split('=', 1) for p in prefers]
2621 2626
2622 2627 items = sorted(clonebundleentry(v, prefers) for v in entries)
2623 2628 return [i.value for i in items]
2624 2629
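A hedged sketch of the preference machinery: with
``ui.clonebundleprefers=COMPRESSION=zstd,VERSION=v2`` the parsed prefers
list looks like the one below; an exact match on an earlier preference
wins, and entries missing the attribute keep their manifest order:

    prefers = [('COMPRESSION', 'zstd'), ('VERSION', 'v2')]
    a = clonebundleentry({'URL': 'a', 'COMPRESSION': 'zstd'}, prefers)
    b = clonebundleentry({'URL': 'b', 'COMPRESSION': 'gzip'}, prefers)
    c = clonebundleentry({'URL': 'c'}, prefers)  # attribute missing
    assert a < b             # exact match on the first preference
    assert a < c             # present-and-matching beats missing
    assert sorted([b, c, a])[0] is a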
2625 2630 def trypullbundlefromurl(ui, repo, url):
2626 2631 """Attempt to apply a bundle from a URL."""
2627 2632 with repo.lock(), repo.transaction('bundleurl') as tr:
2628 2633 try:
2629 2634 fh = urlmod.open(ui, url)
2630 2635 cg = readbundle(ui, fh, 'stream')
2631 2636
2632 2637 if isinstance(cg, streamclone.streamcloneapplier):
2633 2638 cg.apply(repo)
2634 2639 else:
2635 2640 bundle2.applybundle(repo, cg, tr, 'clonebundles', url)
2636 2641 return True
2637 2642 except urlerr.httperror as e:
2638 2643 ui.warn(_('HTTP error fetching bundle: %s\n') %
2639 2644 stringutil.forcebytestr(e))
2640 2645 except urlerr.urlerror as e:
2641 2646 ui.warn(_('error fetching bundle: %s\n') %
2642 2647 stringutil.forcebytestr(e.reason))
2643 2648
2644 2649 return False
@@ -1,1006 +1,1007
1 1 # httppeer.py - HTTP repository proxy classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import errno
12 12 import io
13 13 import os
14 14 import socket
15 15 import struct
16 16 import weakref
17 17
18 18 from .i18n import _
19 19 from . import (
20 20 bundle2,
21 21 error,
22 22 httpconnection,
23 23 pycompat,
24 24 repository,
25 25 statichttprepo,
26 26 url as urlmod,
27 27 util,
28 28 wireprotoframing,
29 29 wireprototypes,
30 30 wireprotov1peer,
31 31 wireprotov2peer,
32 32 wireprotov2server,
33 33 )
34 34 from .utils import (
35 35 cborutil,
36 36 interfaceutil,
37 37 stringutil,
38 38 )
39 39
40 40 httplib = util.httplib
41 41 urlerr = util.urlerr
42 42 urlreq = util.urlreq
43 43
44 44 def encodevalueinheaders(value, header, limit):
45 45 """Encode a string value into multiple HTTP headers.
46 46
47 47 ``value`` will be encoded into 1 or more HTTP headers with the names
48 48 ``header-<N>`` where ``<N>`` is an integer starting at 1. Each header
49 49 name + value will be at most ``limit`` bytes long.
50 50
51 51 Returns an iterable of 2-tuples consisting of header names and
52 52 values as native strings.
53 53 """
54 54 # HTTP Headers are ASCII. Python 3 requires them to be unicodes,
55 55 # not bytes. This function always takes bytes in as arguments.
56 56 fmt = pycompat.strurl(header) + r'-%s'
57 57 # Note: it is *NOT* a bug that the last bit here is a bytestring
58 58 # and not a unicode: we're just getting the encoded length anyway,
59 59 # and using an r-string to make it portable between Python 2 and 3
60 60 # doesn't work because then the \r is a literal backslash-r
61 61 # instead of a carriage return.
62 62 valuelen = limit - len(fmt % r'000') - len(': \r\n')
63 63 result = []
64 64
65 65 n = 0
66 66 for i in pycompat.xrange(0, len(value), valuelen):
67 67 n += 1
68 68 result.append((fmt % str(n), pycompat.strurl(value[i:i + valuelen])))
69 69
70 70 return result
71 71
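A worked example of the chunking above: with header ``X-HgArg`` and
limit 30, the usable value length per header is
30 - len('X-HgArg-000') - len(': \r\n') = 15 bytes:

    hdrs = encodevalueinheaders(b'cmd=heads&key=longvalue', b'X-HgArg', 30)
    assert hdrs == [('X-HgArg-1', 'cmd=heads&key=l'),
                    ('X-HgArg-2', 'ongvalue')]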
72 72 def _wraphttpresponse(resp):
73 73 """Wrap an HTTPResponse with common error handlers.
74 74
75 75 This ensures that any I/O from any consumer raises the appropriate
76 76 error and messaging.
77 77 """
78 78 origread = resp.read
79 79
80 80 class readerproxy(resp.__class__):
81 81 def read(self, size=None):
82 82 try:
83 83 return origread(size)
84 84 except httplib.IncompleteRead as e:
85 85 # e.expected is an integer if length known or None otherwise.
86 86 if e.expected:
87 87 got = len(e.partial)
88 88 total = e.expected + got
89 89 msg = _('HTTP request error (incomplete response; '
90 90 'expected %d bytes got %d)') % (total, got)
91 91 else:
92 92 msg = _('HTTP request error (incomplete response)')
93 93
94 94 raise error.PeerTransportError(
95 95 msg,
96 96 hint=_('this may be an intermittent network failure; '
97 97 'if the error persists, consider contacting the '
98 98 'network or server operator'))
99 99 except httplib.HTTPException as e:
100 100 raise error.PeerTransportError(
101 101 _('HTTP request error (%s)') % e,
102 102 hint=_('this may be an intermittent network failure; '
103 103 'if the error persists, consider contacting the '
104 104 'network or server operator'))
105 105
106 106 resp.__class__ = readerproxy
107 107
108 108 class _multifile(object):
109 109 def __init__(self, *fileobjs):
110 110 for f in fileobjs:
111 111 if not util.safehasattr(f, 'length'):
112 112 raise ValueError(
113 113 '_multifile only supports file objects that '
114 114 'have a length but this one does not:', type(f), f)
115 115 self._fileobjs = fileobjs
116 116 self._index = 0
117 117
118 118 @property
119 119 def length(self):
120 120 return sum(f.length for f in self._fileobjs)
121 121
122 122 def read(self, amt=None):
123 123 if amt <= 0:
124 124 return ''.join(f.read() for f in self._fileobjs)
125 125 parts = []
126 126 while amt and self._index < len(self._fileobjs):
127 127 parts.append(self._fileobjs[self._index].read(amt))
128 128 got = len(parts[-1])
129 129 if got < amt:
130 130 self._index += 1
131 131 amt -= got
132 132 return ''.join(parts)
133 133
134 134 def seek(self, offset, whence=os.SEEK_SET):
135 135 if whence != os.SEEK_SET:
136 136 raise NotImplementedError(
137 137 '_multifile does not support anything other'
138 138 ' than os.SEEK_SET for whence on seek()')
139 139 if offset != 0:
140 140 raise NotImplementedError(
141 141 '_multifile only supports seeking to start, but that '
142 142 'could be fixed if you need it')
143 143 for f in self._fileobjs:
144 144 f.seek(0)
145 145 self._index = 0
146 146
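A hedged usage sketch, using the same trick the request code below uses
(io.BytesIO objects given an explicit .length attribute; py2-style
bytes-as-str semantics as in the surrounding code):

    import io

    part1 = io.BytesIO(b'cmd=heads&')
    part1.length = 10
    part2 = io.BytesIO(b'key=value')
    part2.length = 9

    mf = _multifile(part1, part2)
    assert mf.length == 19
    assert mf.read(12) == b'cmd=heads&ke'  # reads cross file boundaries
    mf.seek(0)                             # only seeking back to 0 works
    assert mf.read(19) == b'cmd=heads&key=value'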
147 147 def makev1commandrequest(ui, requestbuilder, caps, capablefn,
148 148 repobaseurl, cmd, args):
149 149 """Make an HTTP request to run a command for a version 1 client.
150 150
151 151 ``caps`` is a set of known server capabilities. The value may be
152 152 None if capabilities are not yet known.
153 153
154 154 ``capablefn`` is a function to evaluate a capability.
155 155
156 156 ``cmd`` and ``args`` define the command and its arguments. Raw request
157 157 data, if any, is supplied via a ``data`` key in ``args``.
158 158 """
159 159 if cmd == 'pushkey':
160 160 args['data'] = ''
161 161 data = args.pop('data', None)
162 162 headers = args.pop('headers', {})
163 163
164 164 ui.debug("sending %s command\n" % cmd)
165 165 q = [('cmd', cmd)]
166 166 headersize = 0
167 167 # Important: don't use self.capable() here or else you end up
168 168 # with infinite recursion when trying to look up capabilities
169 169 # for the first time.
170 170 postargsok = caps is not None and 'httppostargs' in caps
171 171
172 172 # Send arguments via POST.
173 173 if postargsok and args:
174 174 strargs = urlreq.urlencode(sorted(args.items()))
175 175 if not data:
176 176 data = strargs
177 177 else:
178 178 if isinstance(data, bytes):
179 179 i = io.BytesIO(data)
180 180 i.length = len(data)
181 181 data = i
182 182 argsio = io.BytesIO(strargs)
183 183 argsio.length = len(strargs)
184 184 data = _multifile(argsio, data)
185 185 headers[r'X-HgArgs-Post'] = len(strargs)
186 186 elif args:
187 187 # Calling self.capable() can cause an infinite loop if we are calling
188 188 # "capabilities". But that command should never accept wire
189 189 # protocol arguments. So this should never happen.
190 190 assert cmd != 'capabilities'
191 191 httpheader = capablefn('httpheader')
192 192 if httpheader:
193 193 headersize = int(httpheader.split(',', 1)[0])
194 194
195 195 # Send arguments via HTTP headers.
196 196 if headersize > 0:
197 197 # The headers can typically carry more data than the URL.
198 198 encargs = urlreq.urlencode(sorted(args.items()))
199 199 for header, value in encodevalueinheaders(encargs, 'X-HgArg',
200 200 headersize):
201 201 headers[header] = value
202 202 # Send arguments via query string (Mercurial <1.9).
203 203 else:
204 204 q += sorted(args.items())
205 205
206 206 qs = '?%s' % urlreq.urlencode(q)
207 207 cu = "%s%s" % (repobaseurl, qs)
208 208 size = 0
209 209 if util.safehasattr(data, 'length'):
210 210 size = data.length
211 211 elif data is not None:
212 212 size = len(data)
213 213 if data is not None and r'Content-Type' not in headers:
214 214 headers[r'Content-Type'] = r'application/mercurial-0.1'
215 215
216 216 # Tell the server we accept application/mercurial-0.2 and multiple
217 217 # compression formats if the server is capable of emitting those
218 218 # payloads.
219 219 # Note: Keep this set empty by default, as client advertisement of
220 220 # protocol parameters should only occur after the handshake.
221 221 protoparams = set()
222 222
223 223 mediatypes = set()
224 224 if caps is not None:
225 225 mt = capablefn('httpmediatype')
226 226 if mt:
227 227 protoparams.add('0.1')
228 228 mediatypes = set(mt.split(','))
229 229
230 230 protoparams.add('partial-pull')
231 231
232 232 if '0.2tx' in mediatypes:
233 233 protoparams.add('0.2')
234 234
235 235 if '0.2tx' in mediatypes and capablefn('compression'):
236 236 # We /could/ compare supported compression formats and prune
237 237 # non-mutually supported or error if nothing is mutually supported.
238 238 # For now, send the full list to the server and have it error.
239 239 comps = [e.wireprotosupport().name for e in
240 240 util.compengines.supportedwireengines(util.CLIENTROLE)]
241 241 protoparams.add('comp=%s' % ','.join(comps))
242 242
243 243 if protoparams:
244 244 protoheaders = encodevalueinheaders(' '.join(sorted(protoparams)),
245 245 'X-HgProto',
246 246 headersize or 1024)
247 247 for header, value in protoheaders:
248 248 headers[header] = value
249 249
250 250 varyheaders = []
251 251 for header in headers:
252 252 if header.lower().startswith(r'x-hg'):
253 253 varyheaders.append(header)
254 254
255 255 if varyheaders:
256 256 headers[r'Vary'] = r','.join(sorted(varyheaders))
257 257
258 258 req = requestbuilder(pycompat.strurl(cu), data, headers)
259 259
260 260 if data is not None:
261 261 ui.debug("sending %d bytes\n" % size)
262 262 req.add_unredirected_header(r'Content-Length', r'%d' % size)
263 263
264 264 return req, cu, qs
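To summarize the three argument channels selected above (request shapes are illustrative, not captured traffic):

    # 1. Server has 'httppostargs': arguments travel in the POST body,
    #    with X-HgArgs-Post recording their length.
    # 2. Server advertises 'httpheader': arguments are urlencoded and
    #    spread across X-HgArg-1, X-HgArg-2, ... request headers.
    # 3. Neither (Mercurial < 1.9): arguments are appended to the
    #    ?cmd=... query string.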
265 265
266 266 def _reqdata(req):
267 267 """Get request data, if any. If no data, returns None."""
268 268 if pycompat.ispy3:
269 269 return req.data
270 270 if not req.has_data():
271 271 return None
272 272 return req.get_data()
273 273
274 274 def sendrequest(ui, opener, req):
275 275 """Send a prepared HTTP request.
276 276
277 277 Returns the response object.
278 278 """
279 279 dbg = ui.debug
280 280 if (ui.debugflag
281 281 and ui.configbool('devel', 'debug.peer-request')):
282 282 line = 'devel-peer-request: %s\n'
283 283 dbg(line % '%s %s' % (pycompat.bytesurl(req.get_method()),
284 284 pycompat.bytesurl(req.get_full_url())))
285 285 hgargssize = None
286 286
287 287 for header, value in sorted(req.header_items()):
288 288 header = pycompat.bytesurl(header)
289 289 value = pycompat.bytesurl(value)
290 290 if header.startswith('X-hgarg-'):
291 291 if hgargssize is None:
292 292 hgargssize = 0
293 293 hgargssize += len(value)
294 294 else:
295 295 dbg(line % ' %s %s' % (header, value))
296 296
297 297 if hgargssize is not None:
298 298 dbg(line % ' %d bytes of command arguments in headers'
299 299 % hgargssize)
300 300 data = _reqdata(req)
301 301 if data is not None:
302 302 length = getattr(data, 'length', None)
303 303 if length is None:
304 304 length = len(data)
305 305 dbg(line % ' %d bytes of data' % length)
306 306
307 307 start = util.timer()
308 308
309 309 res = None
310 310 try:
311 311 res = opener.open(req)
312 312 except urlerr.httperror as inst:
313 313 if inst.code == 401:
314 314 raise error.Abort(_('authorization failed'))
315 315 raise
316 316 except httplib.HTTPException as inst:
317 317 ui.debug('http error requesting %s\n' %
318 318 util.hidepassword(req.get_full_url()))
319 319 ui.traceback()
320 320 raise IOError(None, inst)
321 321 finally:
322 322 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
323 323 code = res.code if res else -1
324 324 dbg(line % ' finished in %.4f seconds (%d)'
325 325 % (util.timer() - start, code))
326 326
327 327 # Insert error handlers for common I/O failures.
328 328 _wraphttpresponse(res)
329 329
330 330 return res
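As a usage note, the detailed per-request log above is emitted only when both ``--debug`` and the ``devel.debug.peer-request`` config option are enabled, per the guard at the top of ``sendrequest()``.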
331 331
332 332 class RedirectedRepoError(error.RepoError):
333 333 def __init__(self, msg, respurl):
334 334 super(RedirectedRepoError, self).__init__(msg)
335 335 self.respurl = respurl
336 336
337 337 def parsev1commandresponse(ui, baseurl, requrl, qs, resp, compressible,
338 338 allowcbor=False):
339 339 # record the url we got redirected to
340 340 redirected = False
341 341 respurl = pycompat.bytesurl(resp.geturl())
342 342 if respurl.endswith(qs):
343 343 respurl = respurl[:-len(qs)]
344 344 qsdropped = False
345 345 else:
346 346 qsdropped = True
347 347
348 348 if baseurl.rstrip('/') != respurl.rstrip('/'):
349 349 redirected = True
350 350 if not ui.quiet:
351 351 ui.warn(_('real URL is %s\n') % respurl)
352 352
353 353 try:
354 354 proto = pycompat.bytesurl(resp.getheader(r'content-type', r''))
355 355 except AttributeError:
356 356 proto = pycompat.bytesurl(resp.headers.get(r'content-type', r''))
357 357
358 358 safeurl = util.hidepassword(baseurl)
359 359 if proto.startswith('application/hg-error'):
360 360 raise error.OutOfBandError(resp.read())
361 361
362 362 # Pre 1.0 versions of Mercurial used text/plain and
363 363 # application/hg-changegroup. We don't support such old servers.
364 364 if not proto.startswith('application/mercurial-'):
365 365 ui.debug("requested URL: '%s'\n" % util.hidepassword(requrl))
366 366 msg = _("'%s' does not appear to be an hg repository:\n"
367 367 "---%%<--- (%s)\n%s\n---%%<---\n") % (
368 368 safeurl, proto or 'no content-type', resp.read(1024))
369 369
370 370 # Some servers may strip the query string from the redirect. We
371 371 # raise a special error type so callers can react to this specially.
372 372 if redirected and qsdropped:
373 373 raise RedirectedRepoError(msg, respurl)
374 374 else:
375 375 raise error.RepoError(msg)
376 376
377 377 try:
378 378 subtype = proto.split('-', 1)[1]
379 379
380 380 # Unless we end up supporting CBOR in the legacy wire protocol,
381 381 # this should ONLY be encountered for the initial capabilities
382 382 # request during handshake.
383 383 if subtype == 'cbor':
384 384 if allowcbor:
385 385 return respurl, proto, resp
386 386 else:
387 387 raise error.RepoError(_('unexpected CBOR response from '
388 388 'server'))
389 389
390 390 version_info = tuple([int(n) for n in subtype.split('.')])
391 391 except ValueError:
392 392 raise error.RepoError(_("'%s' sent a broken Content-Type "
393 393 "header (%s)") % (safeurl, proto))
394 394
395 395 # TODO consider switching to a decompression reader that uses
396 396 # generators.
397 397 if version_info == (0, 1):
398 398 if compressible:
399 399 resp = util.compengines['zlib'].decompressorreader(resp)
400 400
401 401 elif version_info == (0, 2):
402 402 # application/mercurial-0.2 always identifies the compression
403 403 # engine in the payload header.
404 404 elen = struct.unpack('B', util.readexactly(resp, 1))[0]
405 405 ename = util.readexactly(resp, elen)
406 406 engine = util.compengines.forwiretype(ename)
407 407
408 408 resp = engine.decompressorreader(resp)
409 409 else:
410 410 raise error.RepoError(_("'%s' uses newer protocol %s") %
411 411 (safeurl, subtype))
412 412
413 413 return respurl, proto, resp
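For reference, a hedged sketch of building an ``application/mercurial-0.2`` payload matching the parsing above (``zlib`` is an example engine name):

    import struct
    import zlib

    # 1-byte length of the engine's wire name, the name itself, then the
    # compressed body, mirroring the readexactly() calls above.
    ename = b'zlib'
    payload = struct.pack('B', len(ename)) + ename + zlib.compress(b'...')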
414 414
415 415 class httppeer(wireprotov1peer.wirepeer):
416 416 def __init__(self, ui, path, url, opener, requestbuilder, caps):
417 417 self.ui = ui
418 418 self._path = path
419 419 self._url = url
420 420 self._caps = caps
421 421 self._urlopener = opener
422 422 self._requestbuilder = requestbuilder
423 423
424 424 def __del__(self):
425 425 for h in self._urlopener.handlers:
426 426 h.close()
427 427 getattr(h, "close_all", lambda: None)()
428 428
429 429 # Begin of ipeerconnection interface.
430 430
431 431 def url(self):
432 432 return self._path
433 433
434 434 def local(self):
435 435 return None
436 436
437 437 def peer(self):
438 438 return self
439 439
440 440 def canpush(self):
441 441 return True
442 442
443 443 def close(self):
444 444 pass
445 445
446 446 # End of ipeerconnection interface.
447 447
448 448 # Begin of ipeercommands interface.
449 449
450 450 def capabilities(self):
451 451 return self._caps
452 452
453 453 # End of ipeercommands interface.
454 454
455 455 def _callstream(self, cmd, _compressible=False, **args):
456 456 args = pycompat.byteskwargs(args)
457 457
458 458 req, cu, qs = makev1commandrequest(self.ui, self._requestbuilder,
459 459 self._caps, self.capable,
460 460 self._url, cmd, args)
461 461
462 462 resp = sendrequest(self.ui, self._urlopener, req)
463 463
464 464 self._url, ct, resp = parsev1commandresponse(self.ui, self._url, cu, qs,
465 465 resp, _compressible)
466 466
467 467 return resp
468 468
469 469 def _call(self, cmd, **args):
470 470 fp = self._callstream(cmd, **args)
471 471 try:
472 472 return fp.read()
473 473 finally:
474 474 # if using keepalive, allow connection to be reused
475 475 fp.close()
476 476
477 477 def _callpush(self, cmd, cg, **args):
478 478 # have to stream bundle to a temp file because we do not have
479 479 # http 1.1 chunked transfer.
480 480
481 481 types = self.capable('unbundle')
482 482 try:
483 483 types = types.split(',')
484 484 except AttributeError:
485 485 # servers older than d1b16a746db6 will send 'unbundle' as a
486 486 # boolean capability. They only support headerless/uncompressed
487 487 # bundles.
488 488 types = [""]
489 489 for x in types:
490 490 if x in bundle2.bundletypes:
491 491 type = x
492 492 break
493 493
494 494 tempname = bundle2.writebundle(self.ui, cg, None, type)
495 495 fp = httpconnection.httpsendfile(self.ui, tempname, "rb")
496 496 headers = {r'Content-Type': r'application/mercurial-0.1'}
497 497
498 498 try:
499 499 r = self._call(cmd, data=fp, headers=headers, **args)
500 500 vals = r.split('\n', 1)
501 501 if len(vals) < 2:
502 502 raise error.ResponseError(_("unexpected response:"), r)
503 503 return vals
504 504 except urlerr.httperror:
505 505 # Catch and re-raise these so we don't try and treat them
506 506 # like generic socket errors. They lack any values in
507 507 # .args on Python 3 which breaks our socket.error block.
508 508 raise
509 509 except socket.error as err:
510 510 if err.args[0] in (errno.ECONNRESET, errno.EPIPE):
511 511 raise error.Abort(_('push failed: %s') % err.args[1])
512 512 raise error.Abort(err.args[1])
513 513 finally:
514 514 fp.close()
515 515 os.unlink(tempname)
516 516
517 517 def _calltwowaystream(self, cmd, fp, **args):
518 518 fh = None
519 519 fp_ = None
520 520 filename = None
521 521 try:
522 522 # dump bundle to disk
523 523 fd, filename = pycompat.mkstemp(prefix="hg-bundle-", suffix=".hg")
524 524 fh = os.fdopen(fd, r"wb")
525 525 d = fp.read(4096)
526 526 while d:
527 527 fh.write(d)
528 528 d = fp.read(4096)
529 529 fh.close()
530 530 # start http push
531 531 fp_ = httpconnection.httpsendfile(self.ui, filename, "rb")
532 532 headers = {r'Content-Type': r'application/mercurial-0.1'}
533 533 return self._callstream(cmd, data=fp_, headers=headers, **args)
534 534 finally:
535 535 if fp_ is not None:
536 536 fp_.close()
537 537 if fh is not None:
538 538 fh.close()
539 539 os.unlink(filename)
540 540
541 541 def _callcompressable(self, cmd, **args):
542 542 return self._callstream(cmd, _compressible=True, **args)
543 543
544 544 def _abort(self, exception):
545 545 raise exception
546 546
547 547 def sendv2request(ui, opener, requestbuilder, apiurl, permission, requests):
548 548 reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
549 549 buffersends=True)
550 550
551 551 handler = wireprotov2peer.clienthandler(ui, reactor)
552 552
553 553 url = '%s/%s' % (apiurl, permission)
554 554
555 555 if len(requests) > 1:
556 556 url += '/multirequest'
557 557 else:
558 558 url += '/%s' % requests[0][0]
559 559
560 560 ui.debug('sending %d commands\n' % len(requests))
561 561 for command, args, f in requests:
562 562 ui.debug('sending command %s: %s\n' % (
563 563 command, stringutil.pprint(args, indent=2)))
564 564 assert not list(handler.callcommand(command, args, f))
565 565
566 566 # TODO stream this.
567 567 body = b''.join(map(bytes, handler.flushcommands()))
568 568
569 569 # TODO modify user-agent to reflect v2
570 570 headers = {
571 571 r'Accept': wireprotov2server.FRAMINGTYPE,
572 572 r'Content-Type': wireprotov2server.FRAMINGTYPE,
573 573 }
574 574
575 575 req = requestbuilder(pycompat.strurl(url), body, headers)
576 576 req.add_unredirected_header(r'Content-Length', r'%d' % len(body))
577 577
578 578 try:
579 579 res = opener.open(req)
580 580 except urlerr.httperror as e:
581 581 if e.code == 401:
582 582 raise error.Abort(_('authorization failed'))
583 583
584 584 raise
585 585 except httplib.HTTPException as e:
586 586 ui.traceback()
587 587 raise IOError(None, e)
588 588
589 589 return handler, res
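The URLs built above take one of two shapes (shown with the experimental API name from the test helpers; illustrative only):

    # <api-url>/<permission>/<command>       for a single command, e.g.
    #     .../api/exp-http-v2-0001/ro/heads
    # <api-url>/<permission>/multirequest    when batching commands, e.g.
    #     .../api/exp-http-v2-0001/ro/multirequest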
590 590
591 591 class queuedcommandfuture(pycompat.futures.Future):
592 592 """Wraps result() on command futures to trigger submission on call."""
593 593
594 594 def result(self, timeout=None):
595 595 if self.done():
596 596 return pycompat.futures.Future.result(self, timeout)
597 597
598 598 self._peerexecutor.sendcommands()
599 599
600 600 # sendcommands() will restore the original __class__ and self.result
601 601 # will resolve to Future.result.
602 602 return self.result(timeout)
603 603
604 604 @interfaceutil.implementer(repository.ipeercommandexecutor)
605 605 class httpv2executor(object):
606 606 def __init__(self, ui, opener, requestbuilder, apiurl, descriptor):
607 607 self._ui = ui
608 608 self._opener = opener
609 609 self._requestbuilder = requestbuilder
610 610 self._apiurl = apiurl
611 611 self._descriptor = descriptor
612 612 self._sent = False
613 613 self._closed = False
614 614 self._neededpermissions = set()
615 615 self._calls = []
616 616 self._futures = weakref.WeakSet()
617 617 self._responseexecutor = None
618 618 self._responsef = None
619 619
620 620 def __enter__(self):
621 621 return self
622 622
623 623 def __exit__(self, exctype, excvalue, exctb):
624 624 self.close()
625 625
626 626 def callcommand(self, command, args):
627 627 if self._sent:
628 628 raise error.ProgrammingError('callcommand() cannot be used after '
629 629 'commands are sent')
630 630
631 631 if self._closed:
632 632 raise error.ProgrammingError('callcommand() cannot be used after '
633 633 'close()')
634 634
635 635 # The service advertises which commands are available. So if we attempt
636 636 # to call an unknown command or pass an unknown argument, we can screen
637 637 # for this.
638 638 if command not in self._descriptor['commands']:
639 639 raise error.ProgrammingError(
640 640 'wire protocol command %s is not available' % command)
641 641
642 642 cmdinfo = self._descriptor['commands'][command]
643 643 unknownargs = set(args.keys()) - set(cmdinfo.get('args', {}))
644 644
645 645 if unknownargs:
646 646 raise error.ProgrammingError(
647 647 'wire protocol command %s does not accept argument: %s' % (
648 648 command, ', '.join(sorted(unknownargs))))
649 649
650 650 self._neededpermissions |= set(cmdinfo['permissions'])
651 651
652 652 # TODO we /could/ also validate types here, since the API descriptor
653 653 # includes types...
654 654
655 655 f = pycompat.futures.Future()
656 656
657 657 # Monkeypatch it so result() triggers sendcommands(), otherwise result()
658 658 # could deadlock.
659 659 f.__class__ = queuedcommandfuture
660 660 f._peerexecutor = self
661 661
662 662 self._futures.add(f)
663 663 self._calls.append((command, args, f))
664 664
665 665 return f
666 666
667 667 def sendcommands(self):
668 668 if self._sent:
669 669 return
670 670
671 671 if not self._calls:
672 672 return
673 673
674 674 self._sent = True
675 675
676 676 # Unhack any future types so caller sees a clean type and so we
677 677 # break reference cycle.
678 678 for f in self._futures:
679 679 if isinstance(f, queuedcommandfuture):
680 680 f.__class__ = pycompat.futures.Future
681 681 f._peerexecutor = None
682 682
683 683 # Mark the future as running and filter out cancelled futures.
684 684 calls = [(command, args, f)
685 685 for command, args, f in self._calls
686 686 if f.set_running_or_notify_cancel()]
687 687
688 688 # Clear out references, prevent improper object usage.
689 689 self._calls = None
690 690
691 691 if not calls:
692 692 return
693 693
694 694 permissions = set(self._neededpermissions)
695 695
696 696 if 'push' in permissions and 'pull' in permissions:
697 697 permissions.remove('pull')
698 698
699 699 if len(permissions) > 1:
700 700 raise error.RepoError(_('cannot make request requiring multiple '
701 701 'permissions: %s') %
702 702 _(', ').join(sorted(permissions)))
703 703
704 704 permission = {
705 705 'push': 'rw',
706 706 'pull': 'ro',
707 707 }[permissions.pop()]
708 708
709 709 handler, resp = sendv2request(
710 710 self._ui, self._opener, self._requestbuilder, self._apiurl,
711 711 permission, calls)
712 712
713 713 # TODO we probably want to validate the HTTP code, media type, etc.
714 714
715 715 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
716 716 self._responsef = self._responseexecutor.submit(self._handleresponse,
717 717 handler, resp)
718 718
719 719 def close(self):
720 720 if self._closed:
721 721 return
722 722
723 723 self.sendcommands()
724 724
725 725 self._closed = True
726 726
727 727 if not self._responsef:
728 728 return
729 729
730 730 # TODO ^C here may not result in immediate program termination.
731 731
732 732 try:
733 733 self._responsef.result()
734 734 finally:
735 735 self._responseexecutor.shutdown(wait=True)
736 736 self._responsef = None
737 737 self._responseexecutor = None
738 738
739 739 # If any of our futures are still in progress, mark them as
740 740 # errored, otherwise a result() could wait indefinitely.
741 741 for f in self._futures:
742 742 if not f.done():
743 743 f.set_exception(error.ResponseError(
744 744 _('unfulfilled command response')))
745 745
746 746 self._futures = None
747 747
748 748 def _handleresponse(self, handler, resp):
749 749 # Called in a thread to read the response.
750 750
751 751 while handler.readframe(resp):
752 752 pass
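A hedged usage sketch of the executor through the standard ``commandexecutor()`` interface (assumes ``peer`` is an ``httpv2peer`` whose descriptor advertises both commands):

    with peer.commandexecutor() as e:
        fheads = e.callcommand(b'heads', {})
        fknown = e.callcommand(b'known', {b'nodes': []})

        # result() on a queued future triggers sendcommands(), so both
        # commands go out in a single multirequest round trip.
        heads = fheads.result()
        known = fknown.result()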
753 753
754 754 # TODO implement interface for version 2 peers
755 755 @interfaceutil.implementer(repository.ipeerconnection,
756 756 repository.ipeercapabilities,
757 757 repository.ipeerrequests)
758 758 class httpv2peer(object):
759 759 def __init__(self, ui, repourl, apipath, opener, requestbuilder,
760 760 apidescriptor):
761 761 self.ui = ui
762 762
763 763 if repourl.endswith('/'):
764 764 repourl = repourl[:-1]
765 765
766 766 self._url = repourl
767 767 self._apipath = apipath
768 768 self._apiurl = '%s/%s' % (repourl, apipath)
769 769 self._opener = opener
770 770 self._requestbuilder = requestbuilder
771 771 self._descriptor = apidescriptor
772 772
773 773 # Start of ipeerconnection.
774 774
775 775 def url(self):
776 776 return self._url
777 777
778 778 def local(self):
779 779 return None
780 780
781 781 def peer(self):
782 782 return self
783 783
784 784 def canpush(self):
785 785 # TODO change once implemented.
786 786 return False
787 787
788 788 def close(self):
789 789 pass
790 790
791 791 # End of ipeerconnection.
792 792
793 793 # Start of ipeercapabilities.
794 794
795 795 def capable(self, name):
796 796 # The capabilities used internally historically map to capabilities
797 797 # advertised from the "capabilities" wire protocol command. However,
798 798 # version 2 of that command works differently.
799 799
800 800 # Maps to commands that are available.
801 801 if name in ('branchmap', 'getbundle', 'known', 'lookup', 'pushkey'):
802 802 return True
803 803
804 804 # Other concepts.
805 if name in ('bundle2',):
805 # TODO remove exchangev2 once we have a command implemented.
806 if name in ('bundle2', 'exchangev2'):
806 807 return True
807 808
808 809 # Alias command-* to presence of command of that name.
809 810 if name.startswith('command-'):
810 811 return name[len('command-'):] in self._descriptor['commands']
811 812
812 813 return False
813 814
814 815 def requirecap(self, name, purpose):
815 816 if self.capable(name):
816 817 return
817 818
818 819 raise error.CapabilityError(
819 820 _('cannot %s; client or remote repository does not support the %r '
820 821 'capability') % (purpose, name))
821 822
822 823 # End of ipeercapabilities.
823 824
824 825 def _call(self, name, **args):
825 826 with self.commandexecutor() as e:
826 827 return e.callcommand(name, args).result()
827 828
828 829 def commandexecutor(self):
829 830 return httpv2executor(self.ui, self._opener, self._requestbuilder,
830 831 self._apiurl, self._descriptor)
831 832
832 833 # Registry of API service names to metadata about peers that handle it.
833 834 #
834 835 # The following keys are meaningful:
835 836 #
836 837 # init
837 838 # Callable receiving (ui, repourl, servicepath, opener, requestbuilder,
838 839 # apidescriptor) to create a peer.
839 840 #
840 841 # priority
841 842 # Integer priority for the service. If we could choose from multiple
842 843 # services, we choose the one with the highest priority.
843 844 API_PEERS = {
844 845 wireprototypes.HTTP_WIREPROTO_V2: {
845 846 'init': httpv2peer,
846 847 'priority': 50,
847 848 },
848 849 }
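Because ``API_PEERS`` is a module-level mapping, an extension could in principle register an additional service; a hypothetical sketch (the service name and peer class are invented):

    from mercurial import httppeer

    class customapipeer(httppeer.httpv2peer):
        """Invented peer type for illustration only."""

    httppeer.API_PEERS[b'exp-custom-api'] = {
        'init': customapipeer,   # follows the (ui, repourl, ...) contract
        'priority': 10,          # below HTTP v2's priority of 50
    }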
849 850
850 851 def performhandshake(ui, url, opener, requestbuilder):
851 852 # The handshake is a request to the capabilities command.
852 853
853 854 caps = None
854 855 def capable(x):
855 856 raise error.ProgrammingError('should not be called')
856 857
857 858 args = {}
858 859
859 860 # The client advertises support for newer protocols by adding an
860 861 # X-HgUpgrade-* header with a list of supported APIs and an
861 862 # X-HgProto-* header advertising which serializing formats it supports.
862 863 # We only support the HTTP version 2 transport and CBOR responses for
863 864 # now.
864 865 advertisev2 = ui.configbool('experimental', 'httppeer.advertise-v2')
865 866
866 867 if advertisev2:
867 868 args['headers'] = {
868 869 r'X-HgProto-1': r'cbor',
869 870 }
870 871
871 872 args['headers'].update(
872 873 encodevalueinheaders(' '.join(sorted(API_PEERS)),
873 874 'X-HgUpgrade',
874 875 # We don't know the header limit this early.
875 876 # So make it small.
876 877 1024))
877 878
878 879 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
879 880 capable, url, 'capabilities',
880 881 args)
881 882 resp = sendrequest(ui, opener, req)
882 883
883 884 # The server may redirect us to the repo root, stripping the
884 885 # ?cmd=capabilities query string from the URL. The server would likely
885 886 # return HTML in this case and ``parsev1commandresponse()`` would raise.
886 887 # We catch this special case and re-issue the capabilities request against
887 888 # the new URL.
888 889 #
889 890 # We should ideally not do this, as a redirect that drops the query
890 891 # string from the URL is arguably a server bug. (Garbage in, garbage out).
891 892 # However, Mercurial clients for several years appeared to handle this
892 893 # issue without behavior degradation. And according to issue 5860, it may
893 894 # be a longstanding bug in some server implementations. So we allow a
894 895 # redirect that drops the query string to "just work."
895 896 try:
896 897 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
897 898 compressible=False,
898 899 allowcbor=advertisev2)
899 900 except RedirectedRepoError as e:
900 901 req, requrl, qs = makev1commandrequest(ui, requestbuilder, caps,
901 902 capable, e.respurl,
902 903 'capabilities', args)
903 904 resp = sendrequest(ui, opener, req)
904 905 respurl, ct, resp = parsev1commandresponse(ui, url, requrl, qs, resp,
905 906 compressible=False,
906 907 allowcbor=advertisev2)
907 908
908 909 try:
909 910 rawdata = resp.read()
910 911 finally:
911 912 resp.close()
912 913
913 914 if not ct.startswith('application/mercurial-'):
914 915 raise error.ProgrammingError('unexpected content-type: %s' % ct)
915 916
916 917 if advertisev2:
917 918 if ct == 'application/mercurial-cbor':
918 919 try:
919 920 info = cborutil.decodeall(rawdata)[0]
920 921 except cborutil.CBORDecodeError:
921 922 raise error.Abort(_('error decoding CBOR from remote server'),
922 923 hint=_('try again and consider contacting '
923 924 'the server operator'))
924 925
925 926 # We got a legacy response. That's fine.
926 927 elif ct in ('application/mercurial-0.1', 'application/mercurial-0.2'):
927 928 info = {
928 929 'v1capabilities': set(rawdata.split())
929 930 }
930 931
931 932 else:
932 933 raise error.RepoError(
933 934 _('unexpected response type from server: %s') % ct)
934 935 else:
935 936 info = {
936 937 'v1capabilities': set(rawdata.split())
937 938 }
938 939
939 940 return respurl, info
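Concretely, with ``experimental.httppeer.advertise-v2`` enabled, the handshake request carries headers shaped like the following (a sketch, not captured traffic; the service name matches the ``HTTPV2`` constant in the test helpers):

    headers = {
        r'X-HgProto-1': r'cbor',
        r'X-HgUpgrade-1': r'exp-http-v2-0001',
    }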
940 941
941 942 def makepeer(ui, path, opener=None, requestbuilder=urlreq.request):
942 943 """Construct an appropriate HTTP peer instance.
943 944
944 945 ``opener`` is an ``url.opener`` that should be used to establish
945 946 connections and perform HTTP requests.
946 947
947 948 ``requestbuilder`` is the type used for constructing HTTP requests.
948 949 It exists as an argument so extensions can override the default.
949 950 """
950 951 u = util.url(path)
951 952 if u.query or u.fragment:
952 953 raise error.Abort(_('unsupported URL component: "%s"') %
953 954 (u.query or u.fragment))
954 955
955 956 # urllib cannot handle URLs with embedded user or passwd.
956 957 url, authinfo = u.authinfo()
957 958 ui.debug('using %s\n' % url)
958 959
959 960 opener = opener or urlmod.opener(ui, authinfo)
960 961
961 962 respurl, info = performhandshake(ui, url, opener, requestbuilder)
962 963
963 964 # Given the intersection of APIs that both we and the server support,
964 965 # sort by their advertised priority and pick the first one.
965 966 #
966 967 # TODO consider making this request-based and interface driven. For
967 968 # example, the caller could say "I want a peer that does X." It's quite
968 969 # possible that not all peers would do that. Since we know the service
969 970 # capabilities, we could filter out services not meeting the
970 971 # requirements. Possibly by consulting the interfaces defined by the
971 972 # peer type.
972 973 apipeerchoices = set(info.get('apis', {}).keys()) & set(API_PEERS.keys())
973 974
974 975 preferredchoices = sorted(apipeerchoices,
975 976 key=lambda x: API_PEERS[x]['priority'],
976 977 reverse=True)
977 978
978 979 for service in preferredchoices:
979 980 apipath = '%s/%s' % (info['apibase'].rstrip('/'), service)
980 981
981 982 return API_PEERS[service]['init'](ui, respurl, apipath, opener,
982 983 requestbuilder,
983 984 info['apis'][service])
984 985
985 986 # Failed to construct an API peer. Fall back to legacy.
986 987 return httppeer(ui, path, respurl, opener, requestbuilder,
987 988 info['v1capabilities'])
988 989
989 990 def instance(ui, path, create, intents=None, createopts=None):
990 991 if create:
991 992 raise error.Abort(_('cannot create new http repository'))
992 993 try:
993 994 if path.startswith('https:') and not urlmod.has_https:
994 995 raise error.Abort(_('Python support for SSL and HTTPS '
995 996 'is not installed'))
996 997
997 998 inst = makepeer(ui, path)
998 999
999 1000 return inst
1000 1001 except error.RepoError as httpexception:
1001 1002 try:
1002 1003 r = statichttprepo.instance(ui, "static-" + path, create)
1003 1004 ui.note(_('(falling back to static-http)\n'))
1004 1005 return r
1005 1006 except error.RepoError:
1006 1007 raise httpexception # use the original http RepoError instead
@@ -1,58 +1,65
1 1 HTTPV2=exp-http-v2-0001
2 2 MEDIATYPE=application/mercurial-exp-framing-0005
3 3
4 4 sendhttpraw() {
5 5 hg --verbose debugwireproto --peer raw http://$LOCALIP:$HGPORT/
6 6 }
7 7
8 8 sendhttpv2peer() {
9 9 hg --verbose debugwireproto --nologhandshake --peer http2 http://$LOCALIP:$HGPORT/
10 10 }
11 11
12 12 sendhttpv2peerhandshake() {
13 13 hg --verbose debugwireproto --peer http2 http://$LOCALIP:$HGPORT/
14 14 }
15 15
16 16 cat > dummycommands.py << EOF
17 17 from mercurial import (
18 18 wireprototypes,
19 19 wireprotov1server,
20 20 wireprotov2server,
21 21 )
22 22
23 23 @wireprotov1server.wireprotocommand(b'customreadonly', permission=b'pull')
24 24 def customreadonlyv1(repo, proto):
25 25 return wireprototypes.bytesresponse(b'customreadonly bytes response')
26 26
27 27 @wireprotov2server.wireprotocommand(b'customreadonly', permission=b'pull')
28 28 def customreadonlyv2(repo, proto):
29 29 yield b'customreadonly bytes response'
30 30
31 31 @wireprotov1server.wireprotocommand(b'customreadwrite', permission=b'push')
32 32 def customreadwrite(repo, proto):
33 33 return wireprototypes.bytesresponse(b'customreadwrite bytes response')
34 34
35 35 @wireprotov2server.wireprotocommand(b'customreadwrite', permission=b'push')
36 36 def customreadwritev2(repo, proto):
37 37 yield b'customreadwrite bytes response'
38 38 EOF
39 39
40 40 cat >> $HGRCPATH << EOF
41 41 [extensions]
42 42 drawdag = $TESTDIR/drawdag.py
43 43 EOF
44 44
45 45 enabledummycommands() {
46 46 cat >> $HGRCPATH << EOF
47 47 [extensions]
48 48 dummycommands = $TESTTMP/dummycommands.py
49 49 EOF
50 50 }
51 51
52 52 enablehttpv2() {
53 53 cat >> $1/.hg/hgrc << EOF
54 54 [experimental]
55 55 web.apiserver = true
56 56 web.api.http-v2 = true
57 57 EOF
58 58 }
59
60 enablehttpv2client() {
61 cat >> $HGRCPATH << EOF
62 [experimental]
63 httppeer.advertise-v2 = true
64 EOF
65 }