##// END OF EJS Templates
wireproto: move gboptsmap to wireprototypes and rename (API)...
Gregory Szorc -
r37631:96d73560 default
parent child Browse files
Show More
@@ -1,505 +1,507 b''
1 1 # narrowbundle2.py - bundle2 extensions for narrow repository support
2 2 #
3 3 # Copyright 2017 Google, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import collections
11 11 import errno
12 12 import struct
13 13
14 14 from mercurial.i18n import _
15 15 from mercurial.node import (
16 16 bin,
17 17 nullid,
18 18 nullrev,
19 19 )
20 20 from mercurial import (
21 21 bundle2,
22 22 changegroup,
23 23 dagutil,
24 24 error,
25 25 exchange,
26 26 extensions,
27 27 narrowspec,
28 28 repair,
29 29 util,
30 wireproto,
30 wireprototypes,
31 31 )
32 32 from mercurial.utils import (
33 33 stringutil,
34 34 )
35 35
36 36 NARROWCAP = 'narrow'
37 37 _NARROWACL_SECTION = 'narrowhgacl'
38 38 _CHANGESPECPART = NARROWCAP + ':changespec'
39 39 _SPECPART = NARROWCAP + ':spec'
40 40 _SPECPART_INCLUDE = 'include'
41 41 _SPECPART_EXCLUDE = 'exclude'
42 42 _KILLNODESIGNAL = 'KILL'
43 43 _DONESIGNAL = 'DONE'
44 44 _ELIDEDCSHEADER = '>20s20s20sl' # cset id, p1, p2, len(text)
45 45 _ELIDEDMFHEADER = '>20s20s20s20sl' # manifest id, p1, p2, link id, len(text)
46 46 _CSHEADERSIZE = struct.calcsize(_ELIDEDCSHEADER)
47 47 _MFHEADERSIZE = struct.calcsize(_ELIDEDMFHEADER)
48 48
49 49 # When advertising capabilities, always include narrow clone support.
50 50 def getrepocaps_narrow(orig, repo, **kwargs):
51 51 caps = orig(repo, **kwargs)
52 52 caps[NARROWCAP] = ['v0']
53 53 return caps
54 54
55 55 def _computeellipsis(repo, common, heads, known, match, depth=None):
56 56 """Compute the shape of a narrowed DAG.
57 57
58 58 Args:
59 59 repo: The repository we're transferring.
60 60 common: The roots of the DAG range we're transferring.
61 61 May be just [nullid], which means all ancestors of heads.
62 62 heads: The heads of the DAG range we're transferring.
63 63 match: The narrowmatcher that allows us to identify relevant changes.
64 64 depth: If not None, only consider nodes to be full nodes if they are at
65 65 most depth changesets away from one of heads.
66 66
67 67 Returns:
68 68 A tuple of (visitnodes, relevant_nodes, ellipsisroots) where:
69 69
70 70 visitnodes: The list of nodes (either full or ellipsis) which
71 71 need to be sent to the client.
72 72 relevant_nodes: The set of changelog nodes which change a file inside
73 73 the narrowspec. The client needs these as non-ellipsis nodes.
74 74 ellipsisroots: A dict of {rev: parents} that is used in
75 75 narrowchangegroup to produce ellipsis nodes with the
76 76 correct parents.
77 77 """
78 78 cl = repo.changelog
79 79 mfl = repo.manifestlog
80 80
81 81 cldag = dagutil.revlogdag(cl)
82 82 # dagutil does not like nullid/nullrev
83 83 commonrevs = cldag.internalizeall(common - set([nullid])) | set([nullrev])
84 84 headsrevs = cldag.internalizeall(heads)
85 85 if depth:
86 86 revdepth = {h: 0 for h in headsrevs}
87 87
88 88 ellipsisheads = collections.defaultdict(set)
89 89 ellipsisroots = collections.defaultdict(set)
90 90
91 91 def addroot(head, curchange):
92 92 """Add a root to an ellipsis head, splitting heads with 3 roots."""
93 93 ellipsisroots[head].add(curchange)
94 94 # Recursively split ellipsis heads with 3 roots by finding the
95 95 # roots' youngest common descendant which is an elided merge commit.
96 96 # That descendant takes 2 of the 3 roots as its own, and becomes a
97 97 # root of the head.
98 98 while len(ellipsisroots[head]) > 2:
99 99 child, roots = splithead(head)
100 100 splitroots(head, child, roots)
101 101 head = child # Recurse in case we just added a 3rd root
102 102
103 103 def splitroots(head, child, roots):
104 104 ellipsisroots[head].difference_update(roots)
105 105 ellipsisroots[head].add(child)
106 106 ellipsisroots[child].update(roots)
107 107 ellipsisroots[child].discard(child)
108 108
109 109 def splithead(head):
110 110 r1, r2, r3 = sorted(ellipsisroots[head])
111 111 for nr1, nr2 in ((r2, r3), (r1, r3), (r1, r2)):
112 112 mid = repo.revs('sort(merge() & %d::%d & %d::%d, -rev)',
113 113 nr1, head, nr2, head)
114 114 for j in mid:
115 115 if j == nr2:
116 116 return nr2, (nr1, nr2)
117 117 if j not in ellipsisroots or len(ellipsisroots[j]) < 2:
118 118 return j, (nr1, nr2)
119 119 raise error.Abort('Failed to split up ellipsis node! head: %d, '
120 120 'roots: %d %d %d' % (head, r1, r2, r3))
121 121
122 122 missing = list(cl.findmissingrevs(common=commonrevs, heads=headsrevs))
123 123 visit = reversed(missing)
124 124 relevant_nodes = set()
125 125 visitnodes = [cl.node(m) for m in missing]
126 126 required = set(headsrevs) | known
127 127 for rev in visit:
128 128 clrev = cl.changelogrevision(rev)
129 129 ps = cldag.parents(rev)
130 130 if depth is not None:
131 131 curdepth = revdepth[rev]
132 132 for p in ps:
133 133 revdepth[p] = min(curdepth + 1, revdepth.get(p, depth + 1))
134 134 needed = False
135 135 shallow_enough = depth is None or revdepth[rev] <= depth
136 136 if shallow_enough:
137 137 curmf = mfl[clrev.manifest].read()
138 138 if ps:
139 139 # We choose to not trust the changed files list in
140 140 # changesets because it's not always correct. TODO: could
141 141 # we trust it for the non-merge case?
142 142 p1mf = mfl[cl.changelogrevision(ps[0]).manifest].read()
143 143 needed = bool(curmf.diff(p1mf, match))
144 144 if not needed and len(ps) > 1:
145 145 # For merge changes, the list of changed files is not
146 146 # helpful, since we need to emit the merge if a file
147 147 # in the narrow spec has changed on either side of the
148 148 # merge. As a result, we do a manifest diff to check.
149 149 p2mf = mfl[cl.changelogrevision(ps[1]).manifest].read()
150 150 needed = bool(curmf.diff(p2mf, match))
151 151 else:
152 152 # For a root node, we need to include the node if any
153 153 # files in the node match the narrowspec.
154 154 needed = any(curmf.walk(match))
155 155
156 156 if needed:
157 157 for head in ellipsisheads[rev]:
158 158 addroot(head, rev)
159 159 for p in ps:
160 160 required.add(p)
161 161 relevant_nodes.add(cl.node(rev))
162 162 else:
163 163 if not ps:
164 164 ps = [nullrev]
165 165 if rev in required:
166 166 for head in ellipsisheads[rev]:
167 167 addroot(head, rev)
168 168 for p in ps:
169 169 ellipsisheads[p].add(rev)
170 170 else:
171 171 for p in ps:
172 172 ellipsisheads[p] |= ellipsisheads[rev]
173 173
174 174 # add common changesets as roots of their reachable ellipsis heads
175 175 for c in commonrevs:
176 176 for head in ellipsisheads[c]:
177 177 addroot(head, c)
178 178 return visitnodes, relevant_nodes, ellipsisroots
179 179
180 180 def _packellipsischangegroup(repo, common, match, relevant_nodes,
181 181 ellipsisroots, visitnodes, depth, source, version):
182 182 if version in ('01', '02'):
183 183 raise error.Abort(
184 184 'ellipsis nodes require at least cg3 on client and server, '
185 185 'but negotiated version %s' % version)
186 186 # We wrap cg1packer.revchunk, using a side channel to pass
187 187 # relevant_nodes into that area. Then if linknode isn't in the
188 188 # set, we know we have an ellipsis node and we should defer
189 189 # sending that node's data. We override close() to detect
190 190 # pending ellipsis nodes and flush them.
191 191 packer = changegroup.getbundler(version, repo)
192 192 # Let the packer have access to the narrow matcher so it can
193 193 # omit filelogs and dirlogs as needed
194 194 packer._narrow_matcher = lambda : match
195 195 # Give the packer the list of nodes which should not be
196 196 # ellipsis nodes. We store this rather than the set of nodes
197 197 # that should be an ellipsis because for very large histories
198 198 # we expect this to be significantly smaller.
199 199 packer.full_nodes = relevant_nodes
200 200 # Maps ellipsis revs to their roots at the changelog level.
201 201 packer.precomputed_ellipsis = ellipsisroots
202 202 # Maps CL revs to per-revlog revisions. Cleared in close() at
203 203 # the end of each group.
204 204 packer.clrev_to_localrev = {}
205 205 packer.next_clrev_to_localrev = {}
206 206 # Maps changelog nodes to changelog revs. Filled in once
207 207 # during changelog stage and then left unmodified.
208 208 packer.clnode_to_rev = {}
209 209 packer.changelog_done = False
210 210 # If true, informs the packer that it is serving shallow content and might
211 211 # need to pack file contents not introduced by the changes being packed.
212 212 packer.is_shallow = depth is not None
213 213
214 214 return packer.generate(common, visitnodes, False, source)
215 215
216 216 # Serve a changegroup for a client with a narrow clone.
217 217 def getbundlechangegrouppart_narrow(bundler, repo, source,
218 218 bundlecaps=None, b2caps=None, heads=None,
219 219 common=None, **kwargs):
220 220 cgversions = b2caps.get('changegroup')
221 221 if cgversions: # 3.1 and 3.2 ship with an empty value
222 222 cgversions = [v for v in cgversions
223 223 if v in changegroup.supportedoutgoingversions(repo)]
224 224 if not cgversions:
225 225 raise ValueError(_('no common changegroup version'))
226 226 version = max(cgversions)
227 227 else:
228 228 raise ValueError(_("server does not advertise changegroup version,"
229 229 " can't negotiate support for ellipsis nodes"))
230 230
231 231 include = sorted(filter(bool, kwargs.get(r'includepats', [])))
232 232 exclude = sorted(filter(bool, kwargs.get(r'excludepats', [])))
233 233 newmatch = narrowspec.match(repo.root, include=include, exclude=exclude)
234 234 if not repo.ui.configbool("experimental", "narrowservebrokenellipses"):
235 235 outgoing = exchange._computeoutgoing(repo, heads, common)
236 236 if not outgoing.missing:
237 237 return
238 238 def wrappedgetbundler(orig, *args, **kwargs):
239 239 bundler = orig(*args, **kwargs)
240 240 bundler._narrow_matcher = lambda : newmatch
241 241 return bundler
242 242 with extensions.wrappedfunction(changegroup, 'getbundler',
243 243 wrappedgetbundler):
244 244 cg = changegroup.makestream(repo, outgoing, version, source)
245 245 part = bundler.newpart('changegroup', data=cg)
246 246 part.addparam('version', version)
247 247 if 'treemanifest' in repo.requirements:
248 248 part.addparam('treemanifest', '1')
249 249
250 250 if include or exclude:
251 251 narrowspecpart = bundler.newpart(_SPECPART)
252 252 if include:
253 253 narrowspecpart.addparam(
254 254 _SPECPART_INCLUDE, '\n'.join(include), mandatory=True)
255 255 if exclude:
256 256 narrowspecpart.addparam(
257 257 _SPECPART_EXCLUDE, '\n'.join(exclude), mandatory=True)
258 258
259 259 return
260 260
261 261 depth = kwargs.get(r'depth', None)
262 262 if depth is not None:
263 263 depth = int(depth)
264 264 if depth < 1:
265 265 raise error.Abort(_('depth must be positive, got %d') % depth)
266 266
267 267 heads = set(heads or repo.heads())
268 268 common = set(common or [nullid])
269 269 oldinclude = sorted(filter(bool, kwargs.get(r'oldincludepats', [])))
270 270 oldexclude = sorted(filter(bool, kwargs.get(r'oldexcludepats', [])))
271 271 known = {bin(n) for n in kwargs.get(r'known', [])}
272 272 if known and (oldinclude != include or oldexclude != exclude):
273 273 # Steps:
274 274 # 1. Send kill for "$known & ::common"
275 275 #
276 276 # 2. Send changegroup for ::common
277 277 #
278 278 # 3. Proceed.
279 279 #
280 280 # In the future, we can send kills for only the specific
281 281 # nodes we know should go away or change shape, and then
282 282 # send a data stream that tells the client something like this:
283 283 #
284 284 # a) apply this changegroup
285 285 # b) apply nodes XXX, YYY, ZZZ that you already have
286 286 # c) goto a
287 287 #
288 288 # until they've built up the full new state.
289 289 # Convert to revnums and intersect with "common". The client should
290 290 # have made it a subset of "common" already, but let's be safe.
291 291 known = set(repo.revs("%ln & ::%ln", known, common))
292 292 # TODO: we could send only roots() of this set, and the
293 293 # list of nodes in common, and the client could work out
294 294 # what to strip, instead of us explicitly sending every
295 295 # single node.
296 296 deadrevs = known
297 297 def genkills():
298 298 for r in deadrevs:
299 299 yield _KILLNODESIGNAL
300 300 yield repo.changelog.node(r)
301 301 yield _DONESIGNAL
302 302 bundler.newpart(_CHANGESPECPART, data=genkills())
303 303 newvisit, newfull, newellipsis = _computeellipsis(
304 304 repo, set(), common, known, newmatch)
305 305 if newvisit:
306 306 cg = _packellipsischangegroup(
307 307 repo, common, newmatch, newfull, newellipsis,
308 308 newvisit, depth, source, version)
309 309 part = bundler.newpart('changegroup', data=cg)
310 310 part.addparam('version', version)
311 311 if 'treemanifest' in repo.requirements:
312 312 part.addparam('treemanifest', '1')
313 313
314 314 visitnodes, relevant_nodes, ellipsisroots = _computeellipsis(
315 315 repo, common, heads, set(), newmatch, depth=depth)
316 316
317 317 repo.ui.debug('Found %d relevant revs\n' % len(relevant_nodes))
318 318 if visitnodes:
319 319 cg = _packellipsischangegroup(
320 320 repo, common, newmatch, relevant_nodes, ellipsisroots,
321 321 visitnodes, depth, source, version)
322 322 part = bundler.newpart('changegroup', data=cg)
323 323 part.addparam('version', version)
324 324 if 'treemanifest' in repo.requirements:
325 325 part.addparam('treemanifest', '1')
326 326
327 327 def applyacl_narrow(repo, kwargs):
328 328 ui = repo.ui
329 329 username = ui.shortuser(ui.environ.get('REMOTE_USER') or ui.username())
330 330 user_includes = ui.configlist(
331 331 _NARROWACL_SECTION, username + '.includes',
332 332 ui.configlist(_NARROWACL_SECTION, 'default.includes'))
333 333 user_excludes = ui.configlist(
334 334 _NARROWACL_SECTION, username + '.excludes',
335 335 ui.configlist(_NARROWACL_SECTION, 'default.excludes'))
336 336 if not user_includes:
337 337 raise error.Abort(_("{} configuration for user {} is empty")
338 338 .format(_NARROWACL_SECTION, username))
339 339
340 340 user_includes = [
341 341 'path:.' if p == '*' else 'path:' + p for p in user_includes]
342 342 user_excludes = [
343 343 'path:.' if p == '*' else 'path:' + p for p in user_excludes]
344 344
345 345 req_includes = set(kwargs.get(r'includepats', []))
346 346 req_excludes = set(kwargs.get(r'excludepats', []))
347 347
348 348 req_includes, req_excludes, invalid_includes = narrowspec.restrictpatterns(
349 349 req_includes, req_excludes, user_includes, user_excludes)
350 350
351 351 if invalid_includes:
352 352 raise error.Abort(
353 353 _("The following includes are not accessible for {}: {}")
354 354 .format(username, invalid_includes))
355 355
356 356 new_args = {}
357 357 new_args.update(kwargs)
358 358 new_args['includepats'] = req_includes
359 359 if req_excludes:
360 360 new_args['excludepats'] = req_excludes
361 361 return new_args
362 362
363 363 @bundle2.parthandler(_SPECPART, (_SPECPART_INCLUDE, _SPECPART_EXCLUDE))
364 364 def _handlechangespec_2(op, inpart):
365 365 includepats = set(inpart.params.get(_SPECPART_INCLUDE, '').splitlines())
366 366 excludepats = set(inpart.params.get(_SPECPART_EXCLUDE, '').splitlines())
367 367 if not changegroup.NARROW_REQUIREMENT in op.repo.requirements:
368 368 op.repo.requirements.add(changegroup.NARROW_REQUIREMENT)
369 369 op.repo._writerequirements()
370 370 op.repo.setnarrowpats(includepats, excludepats)
371 371
372 372 @bundle2.parthandler(_CHANGESPECPART)
373 373 def _handlechangespec(op, inpart):
374 374 repo = op.repo
375 375 cl = repo.changelog
376 376
377 377 # changesets which need to be stripped entirely. either they're no longer
378 378 # needed in the new narrow spec, or the server is sending a replacement
379 379 # in the changegroup part.
380 380 clkills = set()
381 381
382 382 # A changespec part contains all the updates to ellipsis nodes
383 383 # that will happen as a result of widening or narrowing a
384 384 # repo. All the changes that this block encounters are ellipsis
385 385 # nodes or flags to kill an existing ellipsis.
386 386 chunksignal = changegroup.readexactly(inpart, 4)
387 387 while chunksignal != _DONESIGNAL:
388 388 if chunksignal == _KILLNODESIGNAL:
389 389 # a node used to be an ellipsis but isn't anymore
390 390 ck = changegroup.readexactly(inpart, 20)
391 391 if cl.hasnode(ck):
392 392 clkills.add(ck)
393 393 else:
394 394 raise error.Abort(
395 395 _('unexpected changespec node chunk type: %s') % chunksignal)
396 396 chunksignal = changegroup.readexactly(inpart, 4)
397 397
398 398 if clkills:
399 399 # preserve bookmarks that repair.strip() would otherwise strip
400 400 bmstore = repo._bookmarks
401 401 class dummybmstore(dict):
402 402 def applychanges(self, repo, tr, changes):
403 403 pass
404 404 def recordchange(self, tr): # legacy version
405 405 pass
406 406 repo._bookmarks = dummybmstore()
407 407 chgrpfile = repair.strip(op.ui, repo, list(clkills), backup=True,
408 408 topic='widen')
409 409 repo._bookmarks = bmstore
410 410 if chgrpfile:
411 411 # presence of _widen_bundle attribute activates widen handler later
412 412 op._widen_bundle = chgrpfile
413 413 # Set the new narrowspec if we're widening. The setnewnarrowpats() method
414 414 # will currently always be there when using the core+narrowhg server, but
415 415 # other servers may include a changespec part even when not widening (e.g.
416 416 # because we're deepening a shallow repo).
417 417 if util.safehasattr(repo, 'setnewnarrowpats'):
418 418 repo.setnewnarrowpats()
419 419
420 420 def handlechangegroup_widen(op, inpart):
421 421 """Changegroup exchange handler which restores temporarily-stripped nodes"""
422 422 # We saved a bundle with stripped node data we must now restore.
423 423 # This approach is based on mercurial/repair.py@6ee26a53c111.
424 424 repo = op.repo
425 425 ui = op.ui
426 426
427 427 chgrpfile = op._widen_bundle
428 428 del op._widen_bundle
429 429 vfs = repo.vfs
430 430
431 431 ui.note(_("adding branch\n"))
432 432 f = vfs.open(chgrpfile, "rb")
433 433 try:
434 434 gen = exchange.readbundle(ui, f, chgrpfile, vfs)
435 435 if not ui.verbose:
436 436 # silence internal shuffling chatter
437 437 ui.pushbuffer()
438 438 if isinstance(gen, bundle2.unbundle20):
439 439 with repo.transaction('strip') as tr:
440 440 bundle2.processbundle(repo, gen, lambda: tr)
441 441 else:
442 442 gen.apply(repo, 'strip', 'bundle:' + vfs.join(chgrpfile), True)
443 443 if not ui.verbose:
444 444 ui.popbuffer()
445 445 finally:
446 446 f.close()
447 447
448 448 # remove undo files
449 449 for undovfs, undofile in repo.undofiles():
450 450 try:
451 451 undovfs.unlink(undofile)
452 452 except OSError as e:
453 453 if e.errno != errno.ENOENT:
454 454 ui.warn(_('error removing %s: %s\n') %
455 455 (undovfs.join(undofile), stringutil.forcebytestr(e)))
456 456
457 457 # Remove partial backup only if there were no exceptions
458 458 vfs.unlink(chgrpfile)
459 459
460 460 def setup():
461 461 """Enable narrow repo support in bundle2-related extension points."""
462 462 extensions.wrapfunction(bundle2, 'getrepocaps', getrepocaps_narrow)
463 463
464 wireproto.gboptsmap['narrow'] = 'boolean'
465 wireproto.gboptsmap['depth'] = 'plain'
466 wireproto.gboptsmap['oldincludepats'] = 'csv'
467 wireproto.gboptsmap['oldexcludepats'] = 'csv'
468 wireproto.gboptsmap['includepats'] = 'csv'
469 wireproto.gboptsmap['excludepats'] = 'csv'
470 wireproto.gboptsmap['known'] = 'csv'
464 getbundleargs = wireprototypes.GETBUNDLE_ARGUMENTS
465
466 getbundleargs['narrow'] = 'boolean'
467 getbundleargs['depth'] = 'plain'
468 getbundleargs['oldincludepats'] = 'csv'
469 getbundleargs['oldexcludepats'] = 'csv'
470 getbundleargs['includepats'] = 'csv'
471 getbundleargs['excludepats'] = 'csv'
472 getbundleargs['known'] = 'csv'
471 473
472 474 # Extend changegroup serving to handle requests from narrow clients.
473 475 origcgfn = exchange.getbundle2partsmapping['changegroup']
474 476 def wrappedcgfn(*args, **kwargs):
475 477 repo = args[1]
476 478 if repo.ui.has_section(_NARROWACL_SECTION):
477 479 getbundlechangegrouppart_narrow(
478 480 *args, **applyacl_narrow(repo, kwargs))
479 481 elif kwargs.get(r'narrow', False):
480 482 getbundlechangegrouppart_narrow(*args, **kwargs)
481 483 else:
482 484 origcgfn(*args, **kwargs)
483 485 exchange.getbundle2partsmapping['changegroup'] = wrappedcgfn
484 486
485 487 # disable rev branch cache exchange when serving a narrow bundle
486 488 # (currently incompatible with that part)
487 489 origrbcfn = exchange.getbundle2partsmapping['cache:rev-branch-cache']
488 490 def wrappedcgfn(*args, **kwargs):
489 491 repo = args[1]
490 492 if repo.ui.has_section(_NARROWACL_SECTION):
491 493 return
492 494 elif kwargs.get(r'narrow', False):
493 495 return
494 496 else:
495 497 origrbcfn(*args, **kwargs)
496 498 exchange.getbundle2partsmapping['cache:rev-branch-cache'] = wrappedcgfn
497 499
498 500 # Extend changegroup receiver so client can fixup after widen requests.
499 501 origcghandler = bundle2.parthandlermapping['changegroup']
500 502 def wrappedcghandler(op, inpart):
501 503 origcghandler(op, inpart)
502 504 if util.safehasattr(op, '_widen_bundle'):
503 505 handlechangegroup_widen(op, inpart)
504 506 wrappedcghandler.params = origcghandler.params
505 507 bundle2.parthandlermapping['changegroup'] = wrappedcghandler
@@ -1,1287 +1,1265 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 34 wireprototypes,
35 35 )
36 36
37 37 from .utils import (
38 38 procutil,
39 39 stringutil,
40 40 )
41 41
42 42 urlerr = util.urlerr
43 43 urlreq = util.urlreq
44 44
45 45 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
46 46 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
47 47 'IncompatibleClient')
48 48 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
49 49
50 50 class remoteiterbatcher(peer.iterbatcher):
51 51 def __init__(self, remote):
52 52 super(remoteiterbatcher, self).__init__()
53 53 self._remote = remote
54 54
55 55 def __getattr__(self, name):
56 56 # Validate this method is batchable, since submit() only supports
57 57 # batchable methods.
58 58 fn = getattr(self._remote, name)
59 59 if not getattr(fn, 'batchable', None):
60 60 raise error.ProgrammingError('Attempted to batch a non-batchable '
61 61 'call to %r' % name)
62 62
63 63 return super(remoteiterbatcher, self).__getattr__(name)
64 64
65 65 def submit(self):
66 66 """Break the batch request into many patch calls and pipeline them.
67 67
68 68 This is mostly valuable over http where request sizes can be
69 69 limited, but can be used in other places as well.
70 70 """
71 71 # 2-tuple of (command, arguments) that represents what will be
72 72 # sent over the wire.
73 73 requests = []
74 74
75 75 # 4-tuple of (command, final future, @batchable generator, remote
76 76 # future).
77 77 results = []
78 78
79 79 for command, args, opts, finalfuture in self.calls:
80 80 mtd = getattr(self._remote, command)
81 81 batchable = mtd.batchable(mtd.__self__, *args, **opts)
82 82
83 83 commandargs, fremote = next(batchable)
84 84 assert fremote
85 85 requests.append((command, commandargs))
86 86 results.append((command, finalfuture, batchable, fremote))
87 87
88 88 if requests:
89 89 self._resultiter = self._remote._submitbatch(requests)
90 90
91 91 self._results = results
92 92
93 93 def results(self):
94 94 for command, finalfuture, batchable, remotefuture in self._results:
95 95 # Get the raw result, set it in the remote future, feed it
96 96 # back into the @batchable generator so it can be decoded, and
97 97 # set the result on the final future to this value.
98 98 remoteresult = next(self._resultiter)
99 99 remotefuture.set(remoteresult)
100 100 finalfuture.set(next(batchable))
101 101
102 102 # Verify our @batchable generators only emit 2 values.
103 103 try:
104 104 next(batchable)
105 105 except StopIteration:
106 106 pass
107 107 else:
108 108 raise error.ProgrammingError('%s @batchable generator emitted '
109 109 'unexpected value count' % command)
110 110
111 111 yield finalfuture.value
112 112
113 113 # Forward a couple of names from peer to make wireproto interactions
114 114 # slightly more sensible.
115 115 batchable = peer.batchable
116 116 future = peer.future
117 117
118 118
119 119 def encodebatchcmds(req):
120 120 """Return a ``cmds`` argument value for the ``batch`` command."""
121 121 escapearg = wireprototypes.escapebatcharg
122 122
123 123 cmds = []
124 124 for op, argsdict in req:
125 125 # Old servers didn't properly unescape argument names. So prevent
126 126 # the sending of argument names that may not be decoded properly by
127 127 # servers.
128 128 assert all(escapearg(k) == k for k in argsdict)
129 129
130 130 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
131 131 for k, v in argsdict.iteritems())
132 132 cmds.append('%s %s' % (op, args))
133 133
134 134 return ';'.join(cmds)
135 135
136 136 def clientcompressionsupport(proto):
137 137 """Returns a list of compression methods supported by the client.
138 138
139 139 Returns a list of the compression methods supported by the client
140 140 according to the protocol capabilities. If no such capability has
141 141 been announced, fallback to the default of zlib and uncompressed.
142 142 """
143 143 for cap in proto.getprotocaps():
144 144 if cap.startswith('comp='):
145 145 return cap[5:].split(',')
146 146 return ['zlib', 'none']
147 147
148 # mapping of options accepted by getbundle and their types
149 #
150 # Meant to be extended by extensions. It is extensions responsibility to ensure
151 # such options are properly processed in exchange.getbundle.
152 #
153 # supported types are:
154 #
155 # :nodes: list of binary nodes
156 # :csv: list of comma-separated values
157 # :scsv: list of comma-separated values return as set
158 # :plain: string with no transformation needed.
159 gboptsmap = {'heads': 'nodes',
160 'bookmarks': 'boolean',
161 'common': 'nodes',
162 'obsmarkers': 'boolean',
163 'phases': 'boolean',
164 'bundlecaps': 'scsv',
165 'listkeys': 'csv',
166 'cg': 'boolean',
167 'cbattempted': 'boolean',
168 'stream': 'boolean',
169 }
170
171 148 # client side
172 149
173 150 class wirepeer(repository.legacypeer):
174 151 """Client-side interface for communicating with a peer repository.
175 152
176 153 Methods commonly call wire protocol commands of the same name.
177 154
178 155 See also httppeer.py and sshpeer.py for protocol-specific
179 156 implementations of this interface.
180 157 """
181 158 # Begin of ipeercommands interface.
182 159
183 160 def iterbatch(self):
184 161 return remoteiterbatcher(self)
185 162
186 163 @batchable
187 164 def lookup(self, key):
188 165 self.requirecap('lookup', _('look up remote revision'))
189 166 f = future()
190 167 yield {'key': encoding.fromlocal(key)}, f
191 168 d = f.value
192 169 success, data = d[:-1].split(" ", 1)
193 170 if int(success):
194 171 yield bin(data)
195 172 else:
196 173 self._abort(error.RepoError(data))
197 174
198 175 @batchable
199 176 def heads(self):
200 177 f = future()
201 178 yield {}, f
202 179 d = f.value
203 180 try:
204 181 yield wireprototypes.decodelist(d[:-1])
205 182 except ValueError:
206 183 self._abort(error.ResponseError(_("unexpected response:"), d))
207 184
208 185 @batchable
209 186 def known(self, nodes):
210 187 f = future()
211 188 yield {'nodes': wireprototypes.encodelist(nodes)}, f
212 189 d = f.value
213 190 try:
214 191 yield [bool(int(b)) for b in d]
215 192 except ValueError:
216 193 self._abort(error.ResponseError(_("unexpected response:"), d))
217 194
218 195 @batchable
219 196 def branchmap(self):
220 197 f = future()
221 198 yield {}, f
222 199 d = f.value
223 200 try:
224 201 branchmap = {}
225 202 for branchpart in d.splitlines():
226 203 branchname, branchheads = branchpart.split(' ', 1)
227 204 branchname = encoding.tolocal(urlreq.unquote(branchname))
228 205 branchheads = wireprototypes.decodelist(branchheads)
229 206 branchmap[branchname] = branchheads
230 207 yield branchmap
231 208 except TypeError:
232 209 self._abort(error.ResponseError(_("unexpected response:"), d))
233 210
234 211 @batchable
235 212 def listkeys(self, namespace):
236 213 if not self.capable('pushkey'):
237 214 yield {}, None
238 215 f = future()
239 216 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
240 217 yield {'namespace': encoding.fromlocal(namespace)}, f
241 218 d = f.value
242 219 self.ui.debug('received listkey for "%s": %i bytes\n'
243 220 % (namespace, len(d)))
244 221 yield pushkeymod.decodekeys(d)
245 222
246 223 @batchable
247 224 def pushkey(self, namespace, key, old, new):
248 225 if not self.capable('pushkey'):
249 226 yield False, None
250 227 f = future()
251 228 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
252 229 yield {'namespace': encoding.fromlocal(namespace),
253 230 'key': encoding.fromlocal(key),
254 231 'old': encoding.fromlocal(old),
255 232 'new': encoding.fromlocal(new)}, f
256 233 d = f.value
257 234 d, output = d.split('\n', 1)
258 235 try:
259 236 d = bool(int(d))
260 237 except ValueError:
261 238 raise error.ResponseError(
262 239 _('push failed (unexpected response):'), d)
263 240 for l in output.splitlines(True):
264 241 self.ui.status(_('remote: '), l)
265 242 yield d
266 243
267 244 def stream_out(self):
268 245 return self._callstream('stream_out')
269 246
270 247 def getbundle(self, source, **kwargs):
271 248 kwargs = pycompat.byteskwargs(kwargs)
272 249 self.requirecap('getbundle', _('look up remote changes'))
273 250 opts = {}
274 251 bundlecaps = kwargs.get('bundlecaps') or set()
275 252 for key, value in kwargs.iteritems():
276 253 if value is None:
277 254 continue
278 keytype = gboptsmap.get(key)
255 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
279 256 if keytype is None:
280 257 raise error.ProgrammingError(
281 258 'Unexpectedly None keytype for key %s' % key)
282 259 elif keytype == 'nodes':
283 260 value = wireprototypes.encodelist(value)
284 261 elif keytype == 'csv':
285 262 value = ','.join(value)
286 263 elif keytype == 'scsv':
287 264 value = ','.join(sorted(value))
288 265 elif keytype == 'boolean':
289 266 value = '%i' % bool(value)
290 267 elif keytype != 'plain':
291 268 raise KeyError('unknown getbundle option type %s'
292 269 % keytype)
293 270 opts[key] = value
294 271 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
295 272 if any((cap.startswith('HG2') for cap in bundlecaps)):
296 273 return bundle2.getunbundler(self.ui, f)
297 274 else:
298 275 return changegroupmod.cg1unpacker(f, 'UN')
299 276
300 277 def unbundle(self, cg, heads, url):
301 278 '''Send cg (a readable file-like object representing the
302 279 changegroup to push, typically a chunkbuffer object) to the
303 280 remote server as a bundle.
304 281
305 282 When pushing a bundle10 stream, return an integer indicating the
306 283 result of the push (see changegroup.apply()).
307 284
308 285 When pushing a bundle20 stream, return a bundle20 stream.
309 286
310 287 `url` is the url the client thinks it's pushing to, which is
311 288 visible to hooks.
312 289 '''
313 290
314 291 if heads != ['force'] and self.capable('unbundlehash'):
315 292 heads = wireprototypes.encodelist(
316 293 ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
317 294 else:
318 295 heads = wireprototypes.encodelist(heads)
319 296
320 297 if util.safehasattr(cg, 'deltaheader'):
321 298 # this a bundle10, do the old style call sequence
322 299 ret, output = self._callpush("unbundle", cg, heads=heads)
323 300 if ret == "":
324 301 raise error.ResponseError(
325 302 _('push failed:'), output)
326 303 try:
327 304 ret = int(ret)
328 305 except ValueError:
329 306 raise error.ResponseError(
330 307 _('push failed (unexpected response):'), ret)
331 308
332 309 for l in output.splitlines(True):
333 310 self.ui.status(_('remote: '), l)
334 311 else:
335 312 # bundle2 push. Send a stream, fetch a stream.
336 313 stream = self._calltwowaystream('unbundle', cg, heads=heads)
337 314 ret = bundle2.getunbundler(self.ui, stream)
338 315 return ret
339 316
340 317 # End of ipeercommands interface.
341 318
342 319 # Begin of ipeerlegacycommands interface.
343 320
344 321 def branches(self, nodes):
345 322 n = wireprototypes.encodelist(nodes)
346 323 d = self._call("branches", nodes=n)
347 324 try:
348 325 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
349 326 return br
350 327 except ValueError:
351 328 self._abort(error.ResponseError(_("unexpected response:"), d))
352 329
353 330 def between(self, pairs):
354 331 batch = 8 # avoid giant requests
355 332 r = []
356 333 for i in xrange(0, len(pairs), batch):
357 334 n = " ".join([wireprototypes.encodelist(p, '-')
358 335 for p in pairs[i:i + batch]])
359 336 d = self._call("between", pairs=n)
360 337 try:
361 338 r.extend(l and wireprototypes.decodelist(l) or []
362 339 for l in d.splitlines())
363 340 except ValueError:
364 341 self._abort(error.ResponseError(_("unexpected response:"), d))
365 342 return r
366 343
367 344 def changegroup(self, nodes, kind):
368 345 n = wireprototypes.encodelist(nodes)
369 346 f = self._callcompressable("changegroup", roots=n)
370 347 return changegroupmod.cg1unpacker(f, 'UN')
371 348
372 349 def changegroupsubset(self, bases, heads, kind):
373 350 self.requirecap('changegroupsubset', _('look up remote changes'))
374 351 bases = wireprototypes.encodelist(bases)
375 352 heads = wireprototypes.encodelist(heads)
376 353 f = self._callcompressable("changegroupsubset",
377 354 bases=bases, heads=heads)
378 355 return changegroupmod.cg1unpacker(f, 'UN')
379 356
380 357 # End of ipeerlegacycommands interface.
381 358
382 359 def _submitbatch(self, req):
383 360 """run batch request <req> on the server
384 361
385 362 Returns an iterator of the raw responses from the server.
386 363 """
387 364 ui = self.ui
388 365 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
389 366 ui.debug('devel-peer-request: batched-content\n')
390 367 for op, args in req:
391 368 msg = 'devel-peer-request: - %s (%d arguments)\n'
392 369 ui.debug(msg % (op, len(args)))
393 370
394 371 unescapearg = wireprototypes.unescapebatcharg
395 372
396 373 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
397 374 chunk = rsp.read(1024)
398 375 work = [chunk]
399 376 while chunk:
400 377 while ';' not in chunk and chunk:
401 378 chunk = rsp.read(1024)
402 379 work.append(chunk)
403 380 merged = ''.join(work)
404 381 while ';' in merged:
405 382 one, merged = merged.split(';', 1)
406 383 yield unescapearg(one)
407 384 chunk = rsp.read(1024)
408 385 work = [merged, chunk]
409 386 yield unescapearg(''.join(work))
410 387
411 388 def _submitone(self, op, args):
412 389 return self._call(op, **pycompat.strkwargs(args))
413 390
414 391 def debugwireargs(self, one, two, three=None, four=None, five=None):
415 392 # don't pass optional arguments left at their default value
416 393 opts = {}
417 394 if three is not None:
418 395 opts[r'three'] = three
419 396 if four is not None:
420 397 opts[r'four'] = four
421 398 return self._call('debugwireargs', one=one, two=two, **opts)
422 399
423 400 def _call(self, cmd, **args):
424 401 """execute <cmd> on the server
425 402
426 403 The command is expected to return a simple string.
427 404
428 405 returns the server reply as a string."""
429 406 raise NotImplementedError()
430 407
431 408 def _callstream(self, cmd, **args):
432 409 """execute <cmd> on the server
433 410
434 411 The command is expected to return a stream. Note that if the
435 412 command doesn't return a stream, _callstream behaves
436 413 differently for ssh and http peers.
437 414
438 415 returns the server reply as a file like object.
439 416 """
440 417 raise NotImplementedError()
441 418
442 419 def _callcompressable(self, cmd, **args):
443 420 """execute <cmd> on the server
444 421
445 422 The command is expected to return a stream.
446 423
447 424 The stream may have been compressed in some implementations. This
448 425 function takes care of the decompression. This is the only difference
449 426 with _callstream.
450 427
451 428 returns the server reply as a file like object.
452 429 """
453 430 raise NotImplementedError()
454 431
455 432 def _callpush(self, cmd, fp, **args):
456 433 """execute a <cmd> on server
457 434
458 435 The command is expected to be related to a push. Push has a special
459 436 return method.
460 437
461 438 returns the server reply as a (ret, output) tuple. ret is either
462 439 empty (error) or a stringified int.
463 440 """
464 441 raise NotImplementedError()
465 442
466 443 def _calltwowaystream(self, cmd, fp, **args):
467 444 """execute <cmd> on server
468 445
469 446 The command will send a stream to the server and get a stream in reply.
470 447 """
471 448 raise NotImplementedError()
472 449
473 450 def _abort(self, exception):
474 451 """clearly abort the wire protocol connection and raise the exception
475 452 """
476 453 raise NotImplementedError()
477 454
478 455 # server side
479 456
480 457 # wire protocol command can either return a string or one of these classes.
481 458
482 459 def getdispatchrepo(repo, proto, command):
483 460 """Obtain the repo used for processing wire protocol commands.
484 461
485 462 The intent of this function is to serve as a monkeypatch point for
486 463 extensions that need commands to operate on different repo views under
487 464 specialized circumstances.
488 465 """
489 466 return repo.filtered('served')
490 467
491 468 def dispatch(repo, proto, command):
492 469 repo = getdispatchrepo(repo, proto, command)
493 470
494 471 transportversion = wireprototypes.TRANSPORTS[proto.name]['version']
495 472 commandtable = commandsv2 if transportversion == 2 else commands
496 473 func, spec = commandtable[command]
497 474
498 475 args = proto.getargs(spec)
499 476
500 477 # Version 1 protocols define arguments as a list. Version 2 uses a dict.
501 478 if isinstance(args, list):
502 479 return func(repo, proto, *args)
503 480 elif isinstance(args, dict):
504 481 return func(repo, proto, **args)
505 482 else:
506 483 raise error.ProgrammingError('unexpected type returned from '
507 484 'proto.getargs(): %s' % type(args))
508 485
509 486 def options(cmd, keys, others):
510 487 opts = {}
511 488 for k in keys:
512 489 if k in others:
513 490 opts[k] = others[k]
514 491 del others[k]
515 492 if others:
516 493 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
517 494 % (cmd, ",".join(others)))
518 495 return opts
519 496
520 497 def bundle1allowed(repo, action):
521 498 """Whether a bundle1 operation is allowed from the server.
522 499
523 500 Priority is:
524 501
525 502 1. server.bundle1gd.<action> (if generaldelta active)
526 503 2. server.bundle1.<action>
527 504 3. server.bundle1gd (if generaldelta active)
528 505 4. server.bundle1
529 506 """
530 507 ui = repo.ui
531 508 gd = 'generaldelta' in repo.requirements
532 509
533 510 if gd:
534 511 v = ui.configbool('server', 'bundle1gd.%s' % action)
535 512 if v is not None:
536 513 return v
537 514
538 515 v = ui.configbool('server', 'bundle1.%s' % action)
539 516 if v is not None:
540 517 return v
541 518
542 519 if gd:
543 520 v = ui.configbool('server', 'bundle1gd')
544 521 if v is not None:
545 522 return v
546 523
547 524 return ui.configbool('server', 'bundle1')
548 525
549 526 def supportedcompengines(ui, role):
550 527 """Obtain the list of supported compression engines for a request."""
551 528 assert role in (util.CLIENTROLE, util.SERVERROLE)
552 529
553 530 compengines = util.compengines.supportedwireengines(role)
554 531
555 532 # Allow config to override default list and ordering.
556 533 if role == util.SERVERROLE:
557 534 configengines = ui.configlist('server', 'compressionengines')
558 535 config = 'server.compressionengines'
559 536 else:
560 537 # This is currently implemented mainly to facilitate testing. In most
561 538 # cases, the server should be in charge of choosing a compression engine
562 539 # because a server has the most to lose from a sub-optimal choice. (e.g.
563 540 # CPU DoS due to an expensive engine or a network DoS due to poor
564 541 # compression ratio).
565 542 configengines = ui.configlist('experimental',
566 543 'clientcompressionengines')
567 544 config = 'experimental.clientcompressionengines'
568 545
569 546 # No explicit config. Filter out the ones that aren't supposed to be
570 547 # advertised and return default ordering.
571 548 if not configengines:
572 549 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
573 550 return [e for e in compengines
574 551 if getattr(e.wireprotosupport(), attr) > 0]
575 552
576 553 # If compression engines are listed in the config, assume there is a good
577 554 # reason for it (like server operators wanting to achieve specific
578 555 # performance characteristics). So fail fast if the config references
579 556 # unusable compression engines.
580 557 validnames = set(e.name() for e in compengines)
581 558 invalidnames = set(e for e in configengines if e not in validnames)
582 559 if invalidnames:
583 560 raise error.Abort(_('invalid compression engine defined in %s: %s') %
584 561 (config, ', '.join(sorted(invalidnames))))
585 562
586 563 compengines = [e for e in compengines if e.name() in configengines]
587 564 compengines = sorted(compengines,
588 565 key=lambda e: configengines.index(e.name()))
589 566
590 567 if not compengines:
591 568 raise error.Abort(_('%s config option does not specify any known '
592 569 'compression engines') % config,
593 570 hint=_('usable compression engines: %s') %
594 571 ', '.sorted(validnames))
595 572
596 573 return compengines
597 574
598 575 class commandentry(object):
599 576 """Represents a declared wire protocol command."""
600 577 def __init__(self, func, args='', transports=None,
601 578 permission='push'):
602 579 self.func = func
603 580 self.args = args
604 581 self.transports = transports or set()
605 582 self.permission = permission
606 583
607 584 def _merge(self, func, args):
608 585 """Merge this instance with an incoming 2-tuple.
609 586
610 587 This is called when a caller using the old 2-tuple API attempts
611 588 to replace an instance. The incoming values are merged with
612 589 data not captured by the 2-tuple and a new instance containing
613 590 the union of the two objects is returned.
614 591 """
615 592 return commandentry(func, args=args, transports=set(self.transports),
616 593 permission=self.permission)
617 594
618 595 # Old code treats instances as 2-tuples. So expose that interface.
619 596 def __iter__(self):
620 597 yield self.func
621 598 yield self.args
622 599
623 600 def __getitem__(self, i):
624 601 if i == 0:
625 602 return self.func
626 603 elif i == 1:
627 604 return self.args
628 605 else:
629 606 raise IndexError('can only access elements 0 and 1')
630 607
631 608 class commanddict(dict):
632 609 """Container for registered wire protocol commands.
633 610
634 611 It behaves like a dict. But __setitem__ is overwritten to allow silent
635 612 coercion of values from 2-tuples for API compatibility.
636 613 """
637 614 def __setitem__(self, k, v):
638 615 if isinstance(v, commandentry):
639 616 pass
640 617 # Cast 2-tuples to commandentry instances.
641 618 elif isinstance(v, tuple):
642 619 if len(v) != 2:
643 620 raise ValueError('command tuples must have exactly 2 elements')
644 621
645 622 # It is common for extensions to wrap wire protocol commands via
646 623 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
647 624 # doing this aren't aware of the new API that uses objects to store
648 625 # command entries, we automatically merge old state with new.
649 626 if k in self:
650 627 v = self[k]._merge(v[0], v[1])
651 628 else:
652 629 # Use default values from @wireprotocommand.
653 630 v = commandentry(v[0], args=v[1],
654 631 transports=set(wireprototypes.TRANSPORTS),
655 632 permission='push')
656 633 else:
657 634 raise ValueError('command entries must be commandentry instances '
658 635 'or 2-tuples')
659 636
660 637 return super(commanddict, self).__setitem__(k, v)
661 638
662 639 def commandavailable(self, command, proto):
663 640 """Determine if a command is available for the requested protocol."""
664 641 assert proto.name in wireprototypes.TRANSPORTS
665 642
666 643 entry = self.get(command)
667 644
668 645 if not entry:
669 646 return False
670 647
671 648 if proto.name not in entry.transports:
672 649 return False
673 650
674 651 return True
675 652
676 653 # Constants specifying which transports a wire protocol command should be
677 654 # available on. For use with @wireprotocommand.
678 655 POLICY_V1_ONLY = 'v1-only'
679 656 POLICY_V2_ONLY = 'v2-only'
680 657
681 658 # For version 1 transports.
682 659 commands = commanddict()
683 660
684 661 # For version 2 transports.
685 662 commandsv2 = commanddict()
686 663
687 664 def wireprotocommand(name, args=None, transportpolicy=POLICY_V1_ONLY,
688 665 permission='push'):
689 666 """Decorator to declare a wire protocol command.
690 667
691 668 ``name`` is the name of the wire protocol command being provided.
692 669
693 670 ``args`` defines the named arguments accepted by the command. It is
694 671 ideally a dict mapping argument names to their types. For backwards
695 672 compatibility, it can be a space-delimited list of argument names. For
696 673 version 1 transports, ``*`` denotes a special value that says to accept
697 674 all named arguments.
698 675
699 676 ``transportpolicy`` is a POLICY_* constant denoting which transports
700 677 this wire protocol command should be exposed to. By default, commands
701 678 are exposed to all wire protocol transports.
702 679
703 680 ``permission`` defines the permission type needed to run this command.
704 681 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
705 682 respectively. Default is to assume command requires ``push`` permissions
706 683 because otherwise commands not declaring their permissions could modify
707 684 a repository that is supposed to be read-only.
708 685 """
709 686 if transportpolicy == POLICY_V1_ONLY:
710 687 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
711 688 if v['version'] == 1}
712 689 transportversion = 1
713 690 elif transportpolicy == POLICY_V2_ONLY:
714 691 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
715 692 if v['version'] == 2}
716 693 transportversion = 2
717 694 else:
718 695 raise error.ProgrammingError('invalid transport policy value: %s' %
719 696 transportpolicy)
720 697
721 698 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
722 699 # SSHv2.
723 700 # TODO undo this hack when SSH is using the unified frame protocol.
724 701 if name == b'batch':
725 702 transports.add(wireprototypes.SSHV2)
726 703
727 704 if permission not in ('push', 'pull'):
728 705 raise error.ProgrammingError('invalid wire protocol permission; '
729 706 'got %s; expected "push" or "pull"' %
730 707 permission)
731 708
732 709 if transportversion == 1:
733 710 if args is None:
734 711 args = ''
735 712
736 713 if not isinstance(args, bytes):
737 714 raise error.ProgrammingError('arguments for version 1 commands '
738 715 'must be declared as bytes')
739 716 elif transportversion == 2:
740 717 if args is None:
741 718 args = {}
742 719
743 720 if not isinstance(args, dict):
744 721 raise error.ProgrammingError('arguments for version 2 commands '
745 722 'must be declared as dicts')
746 723
747 724 def register(func):
748 725 if transportversion == 1:
749 726 if name in commands:
750 727 raise error.ProgrammingError('%s command already registered '
751 728 'for version 1' % name)
752 729 commands[name] = commandentry(func, args=args,
753 730 transports=transports,
754 731 permission=permission)
755 732 elif transportversion == 2:
756 733 if name in commandsv2:
757 734 raise error.ProgrammingError('%s command already registered '
758 735 'for version 2' % name)
759 736
760 737 commandsv2[name] = commandentry(func, args=args,
761 738 transports=transports,
762 739 permission=permission)
763 740 else:
764 741 raise error.ProgrammingError('unhandled transport version: %d' %
765 742 transportversion)
766 743
767 744 return func
768 745 return register
769 746
770 747 # TODO define a more appropriate permissions type to use for this.
771 748 @wireprotocommand('batch', 'cmds *', permission='pull',
772 749 transportpolicy=POLICY_V1_ONLY)
773 750 def batch(repo, proto, cmds, others):
774 751 unescapearg = wireprototypes.unescapebatcharg
775 752 repo = repo.filtered("served")
776 753 res = []
777 754 for pair in cmds.split(';'):
778 755 op, args = pair.split(' ', 1)
779 756 vals = {}
780 757 for a in args.split(','):
781 758 if a:
782 759 n, v = a.split('=')
783 760 vals[unescapearg(n)] = unescapearg(v)
784 761 func, spec = commands[op]
785 762
786 763 # Validate that client has permissions to perform this command.
787 764 perm = commands[op].permission
788 765 assert perm in ('push', 'pull')
789 766 proto.checkperm(perm)
790 767
791 768 if spec:
792 769 keys = spec.split()
793 770 data = {}
794 771 for k in keys:
795 772 if k == '*':
796 773 star = {}
797 774 for key in vals.keys():
798 775 if key not in keys:
799 776 star[key] = vals[key]
800 777 data['*'] = star
801 778 else:
802 779 data[k] = vals[k]
803 780 result = func(repo, proto, *[data[k] for k in keys])
804 781 else:
805 782 result = func(repo, proto)
806 783 if isinstance(result, wireprototypes.ooberror):
807 784 return result
808 785
809 786 # For now, all batchable commands must return bytesresponse or
810 787 # raw bytes (for backwards compatibility).
811 788 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
812 789 if isinstance(result, wireprototypes.bytesresponse):
813 790 result = result.data
814 791 res.append(wireprototypes.escapebatcharg(result))
815 792
816 793 return wireprototypes.bytesresponse(';'.join(res))
817 794
818 795 @wireprotocommand('between', 'pairs', transportpolicy=POLICY_V1_ONLY,
819 796 permission='pull')
820 797 def between(repo, proto, pairs):
821 798 pairs = [wireprototypes.decodelist(p, '-') for p in pairs.split(" ")]
822 799 r = []
823 800 for b in repo.between(pairs):
824 801 r.append(wireprototypes.encodelist(b) + "\n")
825 802
826 803 return wireprototypes.bytesresponse(''.join(r))
827 804
828 805 @wireprotocommand('branchmap', permission='pull',
829 806 transportpolicy=POLICY_V1_ONLY)
830 807 def branchmap(repo, proto):
831 808 branchmap = repo.branchmap()
832 809 heads = []
833 810 for branch, nodes in branchmap.iteritems():
834 811 branchname = urlreq.quote(encoding.fromlocal(branch))
835 812 branchnodes = wireprototypes.encodelist(nodes)
836 813 heads.append('%s %s' % (branchname, branchnodes))
837 814
838 815 return wireprototypes.bytesresponse('\n'.join(heads))
839 816
840 817 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY,
841 818 permission='pull')
842 819 def branches(repo, proto, nodes):
843 820 nodes = wireprototypes.decodelist(nodes)
844 821 r = []
845 822 for b in repo.branches(nodes):
846 823 r.append(wireprototypes.encodelist(b) + "\n")
847 824
848 825 return wireprototypes.bytesresponse(''.join(r))
849 826
850 827 @wireprotocommand('clonebundles', '', permission='pull',
851 828 transportpolicy=POLICY_V1_ONLY)
852 829 def clonebundles(repo, proto):
853 830 """Server command for returning info for available bundles to seed clones.
854 831
855 832 Clients will parse this response and determine what bundle to fetch.
856 833
857 834 Extensions may wrap this command to filter or dynamically emit data
858 835 depending on the request. e.g. you could advertise URLs for the closest
859 836 data center given the client's IP address.
860 837 """
861 838 return wireprototypes.bytesresponse(
862 839 repo.vfs.tryread('clonebundles.manifest'))
863 840
864 841 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
865 842 'known', 'getbundle', 'unbundlehash']
866 843
867 844 def _capabilities(repo, proto):
868 845 """return a list of capabilities for a repo
869 846
870 847 This function exists to allow extensions to easily wrap capabilities
871 848 computation
872 849
873 850 - returns a lists: easy to alter
874 851 - change done here will be propagated to both `capabilities` and `hello`
875 852 command without any other action needed.
876 853 """
877 854 # copy to prevent modification of the global list
878 855 caps = list(wireprotocaps)
879 856
880 857 # Command of same name as capability isn't exposed to version 1 of
881 858 # transports. So conditionally add it.
882 859 if commands.commandavailable('changegroupsubset', proto):
883 860 caps.append('changegroupsubset')
884 861
885 862 if streamclone.allowservergeneration(repo):
886 863 if repo.ui.configbool('server', 'preferuncompressed'):
887 864 caps.append('stream-preferred')
888 865 requiredformats = repo.requirements & repo.supportedformats
889 866 # if our local revlogs are just revlogv1, add 'stream' cap
890 867 if not requiredformats - {'revlogv1'}:
891 868 caps.append('stream')
892 869 # otherwise, add 'streamreqs' detailing our local revlog format
893 870 else:
894 871 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
895 872 if repo.ui.configbool('experimental', 'bundle2-advertise'):
896 873 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
897 874 caps.append('bundle2=' + urlreq.quote(capsblob))
898 875 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
899 876
900 877 return proto.addcapabilities(repo, caps)
901 878
902 879 # If you are writing an extension and consider wrapping this function. Wrap
903 880 # `_capabilities` instead.
904 881 @wireprotocommand('capabilities', permission='pull',
905 882 transportpolicy=POLICY_V1_ONLY)
906 883 def capabilities(repo, proto):
907 884 caps = _capabilities(repo, proto)
908 885 return wireprototypes.bytesresponse(' '.join(sorted(caps)))
909 886
910 887 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY,
911 888 permission='pull')
912 889 def changegroup(repo, proto, roots):
913 890 nodes = wireprototypes.decodelist(roots)
914 891 outgoing = discovery.outgoing(repo, missingroots=nodes,
915 892 missingheads=repo.heads())
916 893 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
917 894 gen = iter(lambda: cg.read(32768), '')
918 895 return wireprototypes.streamres(gen=gen)
919 896
920 897 @wireprotocommand('changegroupsubset', 'bases heads',
921 898 transportpolicy=POLICY_V1_ONLY,
922 899 permission='pull')
923 900 def changegroupsubset(repo, proto, bases, heads):
924 901 bases = wireprototypes.decodelist(bases)
925 902 heads = wireprototypes.decodelist(heads)
926 903 outgoing = discovery.outgoing(repo, missingroots=bases,
927 904 missingheads=heads)
928 905 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
929 906 gen = iter(lambda: cg.read(32768), '')
930 907 return wireprototypes.streamres(gen=gen)
931 908
932 909 @wireprotocommand('debugwireargs', 'one two *',
933 910 permission='pull', transportpolicy=POLICY_V1_ONLY)
934 911 def debugwireargs(repo, proto, one, two, others):
935 912 # only accept optional args from the known set
936 913 opts = options('debugwireargs', ['three', 'four'], others)
937 914 return wireprototypes.bytesresponse(repo.debugwireargs(
938 915 one, two, **pycompat.strkwargs(opts)))
939 916
940 917 def find_pullbundle(repo, proto, opts, clheads, heads, common):
941 918 """Return a file object for the first matching pullbundle.
942 919
943 920 Pullbundles are specified in .hg/pullbundles.manifest similar to
944 921 clonebundles.
945 922 For each entry, the bundle specification is checked for compatibility:
946 923 - Client features vs the BUNDLESPEC.
947 924 - Revisions shared with the clients vs base revisions of the bundle.
948 925 A bundle can be applied only if all its base revisions are known by
949 926 the client.
950 927 - At least one leaf of the bundle's DAG is missing on the client.
951 928 - Every leaf of the bundle's DAG is part of node set the client wants.
952 929 E.g. do not send a bundle of all changes if the client wants only
953 930 one specific branch of many.
954 931 """
955 932 def decodehexstring(s):
956 933 return set([h.decode('hex') for h in s.split(';')])
957 934
958 935 manifest = repo.vfs.tryread('pullbundles.manifest')
959 936 if not manifest:
960 937 return None
961 938 res = exchange.parseclonebundlesmanifest(repo, manifest)
962 939 res = exchange.filterclonebundleentries(repo, res)
963 940 if not res:
964 941 return None
965 942 cl = repo.changelog
966 943 heads_anc = cl.ancestors([cl.rev(rev) for rev in heads], inclusive=True)
967 944 common_anc = cl.ancestors([cl.rev(rev) for rev in common], inclusive=True)
968 945 compformats = clientcompressionsupport(proto)
969 946 for entry in res:
970 947 if 'COMPRESSION' in entry and entry['COMPRESSION'] not in compformats:
971 948 continue
972 949 # No test yet for VERSION, since V2 is supported by any client
973 950 # that advertises partial pulls
974 951 if 'heads' in entry:
975 952 try:
976 953 bundle_heads = decodehexstring(entry['heads'])
977 954 except TypeError:
978 955 # Bad heads entry
979 956 continue
980 957 if bundle_heads.issubset(common):
981 958 continue # Nothing new
982 959 if all(cl.rev(rev) in common_anc for rev in bundle_heads):
983 960 continue # Still nothing new
984 961 if any(cl.rev(rev) not in heads_anc and
985 962 cl.rev(rev) not in common_anc for rev in bundle_heads):
986 963 continue
987 964 if 'bases' in entry:
988 965 try:
989 966 bundle_bases = decodehexstring(entry['bases'])
990 967 except TypeError:
991 968 # Bad bases entry
992 969 continue
993 970 if not all(cl.rev(rev) in common_anc for rev in bundle_bases):
994 971 continue
995 972 path = entry['URL']
996 973 repo.ui.debug('sending pullbundle "%s"\n' % path)
997 974 try:
998 975 return repo.vfs.open(path)
999 976 except IOError:
1000 977 repo.ui.debug('pullbundle "%s" not accessible\n' % path)
1001 978 continue
1002 979 return None
1003 980
1004 981 @wireprotocommand('getbundle', '*', permission='pull',
1005 982 transportpolicy=POLICY_V1_ONLY)
1006 983 def getbundle(repo, proto, others):
1007 opts = options('getbundle', gboptsmap.keys(), others)
984 opts = options('getbundle', wireprototypes.GETBUNDLE_ARGUMENTS.keys(),
985 others)
1008 986 for k, v in opts.iteritems():
1009 keytype = gboptsmap[k]
987 keytype = wireprototypes.GETBUNDLE_ARGUMENTS[k]
1010 988 if keytype == 'nodes':
1011 989 opts[k] = wireprototypes.decodelist(v)
1012 990 elif keytype == 'csv':
1013 991 opts[k] = list(v.split(','))
1014 992 elif keytype == 'scsv':
1015 993 opts[k] = set(v.split(','))
1016 994 elif keytype == 'boolean':
1017 995 # Client should serialize False as '0', which is a non-empty string
1018 996 # so it evaluates as a True bool.
1019 997 if v == '0':
1020 998 opts[k] = False
1021 999 else:
1022 1000 opts[k] = bool(v)
1023 1001 elif keytype != 'plain':
1024 1002 raise KeyError('unknown getbundle option type %s'
1025 1003 % keytype)
1026 1004
1027 1005 if not bundle1allowed(repo, 'pull'):
1028 1006 if not exchange.bundle2requested(opts.get('bundlecaps')):
1029 1007 if proto.name == 'http-v1':
1030 1008 return wireprototypes.ooberror(bundle2required)
1031 1009 raise error.Abort(bundle2requiredmain,
1032 1010 hint=bundle2requiredhint)
1033 1011
1034 1012 prefercompressed = True
1035 1013
1036 1014 try:
1037 1015 clheads = set(repo.changelog.heads())
1038 1016 heads = set(opts.get('heads', set()))
1039 1017 common = set(opts.get('common', set()))
1040 1018 common.discard(nullid)
1041 1019 if (repo.ui.configbool('server', 'pullbundle') and
1042 1020 'partial-pull' in proto.getprotocaps()):
1043 1021 # Check if a pre-built bundle covers this request.
1044 1022 bundle = find_pullbundle(repo, proto, opts, clheads, heads, common)
1045 1023 if bundle:
1046 1024 return wireprototypes.streamres(gen=util.filechunkiter(bundle),
1047 1025 prefer_uncompressed=True)
1048 1026
1049 1027 if repo.ui.configbool('server', 'disablefullbundle'):
1050 1028 # Check to see if this is a full clone.
1051 1029 changegroup = opts.get('cg', True)
1052 1030 if changegroup and not common and clheads == heads:
1053 1031 raise error.Abort(
1054 1032 _('server has pull-based clones disabled'),
1055 1033 hint=_('remove --pull if specified or upgrade Mercurial'))
1056 1034
1057 1035 info, chunks = exchange.getbundlechunks(repo, 'serve',
1058 1036 **pycompat.strkwargs(opts))
1059 1037 prefercompressed = info.get('prefercompressed', True)
1060 1038 except error.Abort as exc:
1061 1039 # cleanly forward Abort error to the client
1062 1040 if not exchange.bundle2requested(opts.get('bundlecaps')):
1063 1041 if proto.name == 'http-v1':
1064 1042 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
1065 1043 raise # cannot do better for bundle1 + ssh
1066 1044 # bundle2 request expect a bundle2 reply
1067 1045 bundler = bundle2.bundle20(repo.ui)
1068 1046 manargs = [('message', pycompat.bytestr(exc))]
1069 1047 advargs = []
1070 1048 if exc.hint is not None:
1071 1049 advargs.append(('hint', exc.hint))
1072 1050 bundler.addpart(bundle2.bundlepart('error:abort',
1073 1051 manargs, advargs))
1074 1052 chunks = bundler.getchunks()
1075 1053 prefercompressed = False
1076 1054
1077 1055 return wireprototypes.streamres(
1078 1056 gen=chunks, prefer_uncompressed=not prefercompressed)
1079 1057
1080 1058 @wireprotocommand('heads', permission='pull', transportpolicy=POLICY_V1_ONLY)
1081 1059 def heads(repo, proto):
1082 1060 h = repo.heads()
1083 1061 return wireprototypes.bytesresponse(wireprototypes.encodelist(h) + '\n')
1084 1062
1085 1063 @wireprotocommand('hello', permission='pull', transportpolicy=POLICY_V1_ONLY)
1086 1064 def hello(repo, proto):
1087 1065 """Called as part of SSH handshake to obtain server info.
1088 1066
1089 1067 Returns a list of lines describing interesting things about the
1090 1068 server, in an RFC822-like format.
1091 1069
1092 1070 Currently, the only one defined is ``capabilities``, which consists of a
1093 1071 line of space separated tokens describing server abilities:
1094 1072
1095 1073 capabilities: <token0> <token1> <token2>
1096 1074 """
1097 1075 caps = capabilities(repo, proto).data
1098 1076 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
1099 1077
1100 1078 @wireprotocommand('listkeys', 'namespace', permission='pull',
1101 1079 transportpolicy=POLICY_V1_ONLY)
1102 1080 def listkeys(repo, proto, namespace):
1103 1081 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
1104 1082 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
1105 1083
1106 1084 @wireprotocommand('lookup', 'key', permission='pull',
1107 1085 transportpolicy=POLICY_V1_ONLY)
1108 1086 def lookup(repo, proto, key):
1109 1087 try:
1110 1088 k = encoding.tolocal(key)
1111 1089 n = repo.lookup(k)
1112 1090 r = hex(n)
1113 1091 success = 1
1114 1092 except Exception as inst:
1115 1093 r = stringutil.forcebytestr(inst)
1116 1094 success = 0
1117 1095 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
1118 1096
1119 1097 @wireprotocommand('known', 'nodes *', permission='pull',
1120 1098 transportpolicy=POLICY_V1_ONLY)
1121 1099 def known(repo, proto, nodes, others):
1122 1100 v = ''.join(b and '1' or '0'
1123 1101 for b in repo.known(wireprototypes.decodelist(nodes)))
1124 1102 return wireprototypes.bytesresponse(v)
1125 1103
1126 1104 @wireprotocommand('protocaps', 'caps', permission='pull',
1127 1105 transportpolicy=POLICY_V1_ONLY)
1128 1106 def protocaps(repo, proto, caps):
1129 1107 if proto.name == wireprototypes.SSHV1:
1130 1108 proto._protocaps = set(caps.split(' '))
1131 1109 return wireprototypes.bytesresponse('OK')
1132 1110
1133 1111 @wireprotocommand('pushkey', 'namespace key old new', permission='push',
1134 1112 transportpolicy=POLICY_V1_ONLY)
1135 1113 def pushkey(repo, proto, namespace, key, old, new):
1136 1114 # compatibility with pre-1.8 clients which were accidentally
1137 1115 # sending raw binary nodes rather than utf-8-encoded hex
1138 1116 if len(new) == 20 and stringutil.escapestr(new) != new:
1139 1117 # looks like it could be a binary node
1140 1118 try:
1141 1119 new.decode('utf-8')
1142 1120 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
1143 1121 except UnicodeDecodeError:
1144 1122 pass # binary, leave unmodified
1145 1123 else:
1146 1124 new = encoding.tolocal(new) # normal path
1147 1125
1148 1126 with proto.mayberedirectstdio() as output:
1149 1127 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
1150 1128 encoding.tolocal(old), new) or False
1151 1129
1152 1130 output = output.getvalue() if output else ''
1153 1131 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
1154 1132
1155 1133 @wireprotocommand('stream_out', permission='pull',
1156 1134 transportpolicy=POLICY_V1_ONLY)
1157 1135 def stream(repo, proto):
1158 1136 '''If the server supports streaming clone, it advertises the "stream"
1159 1137 capability with a value representing the version and flags of the repo
1160 1138 it is serving. Client checks to see if it understands the format.
1161 1139 '''
1162 1140 return wireprototypes.streamreslegacy(
1163 1141 streamclone.generatev1wireproto(repo))
1164 1142
1165 1143 @wireprotocommand('unbundle', 'heads', permission='push',
1166 1144 transportpolicy=POLICY_V1_ONLY)
1167 1145 def unbundle(repo, proto, heads):
1168 1146 their_heads = wireprototypes.decodelist(heads)
1169 1147
1170 1148 with proto.mayberedirectstdio() as output:
1171 1149 try:
1172 1150 exchange.check_heads(repo, their_heads, 'preparing changes')
1173 1151 cleanup = lambda: None
1174 1152 try:
1175 1153 payload = proto.getpayload()
1176 1154 if repo.ui.configbool('server', 'streamunbundle'):
1177 1155 def cleanup():
1178 1156 # Ensure that the full payload is consumed, so
1179 1157 # that the connection doesn't contain trailing garbage.
1180 1158 for p in payload:
1181 1159 pass
1182 1160 fp = util.chunkbuffer(payload)
1183 1161 else:
1184 1162 # write bundle data to temporary file as it can be big
1185 1163 fp, tempname = None, None
1186 1164 def cleanup():
1187 1165 if fp:
1188 1166 fp.close()
1189 1167 if tempname:
1190 1168 os.unlink(tempname)
1191 1169 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1192 1170 repo.ui.debug('redirecting incoming bundle to %s\n' %
1193 1171 tempname)
1194 1172 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1195 1173 r = 0
1196 1174 for p in payload:
1197 1175 fp.write(p)
1198 1176 fp.seek(0)
1199 1177
1200 1178 gen = exchange.readbundle(repo.ui, fp, None)
1201 1179 if (isinstance(gen, changegroupmod.cg1unpacker)
1202 1180 and not bundle1allowed(repo, 'push')):
1203 1181 if proto.name == 'http-v1':
1204 1182 # need to special case http because stderr do not get to
1205 1183 # the http client on failed push so we need to abuse
1206 1184 # some other error type to make sure the message get to
1207 1185 # the user.
1208 1186 return wireprototypes.ooberror(bundle2required)
1209 1187 raise error.Abort(bundle2requiredmain,
1210 1188 hint=bundle2requiredhint)
1211 1189
1212 1190 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1213 1191 proto.client())
1214 1192 if util.safehasattr(r, 'addpart'):
1215 1193 # The return looks streamable, we are in the bundle2 case
1216 1194 # and should return a stream.
1217 1195 return wireprototypes.streamreslegacy(gen=r.getchunks())
1218 1196 return wireprototypes.pushres(
1219 1197 r, output.getvalue() if output else '')
1220 1198
1221 1199 finally:
1222 1200 cleanup()
1223 1201
1224 1202 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1225 1203 # handle non-bundle2 case first
1226 1204 if not getattr(exc, 'duringunbundle2', False):
1227 1205 try:
1228 1206 raise
1229 1207 except error.Abort:
1230 1208 # The old code we moved used procutil.stderr directly.
1231 1209 # We did not change it to minimise code change.
1232 1210 # This need to be moved to something proper.
1233 1211 # Feel free to do it.
1234 1212 procutil.stderr.write("abort: %s\n" % exc)
1235 1213 if exc.hint is not None:
1236 1214 procutil.stderr.write("(%s)\n" % exc.hint)
1237 1215 procutil.stderr.flush()
1238 1216 return wireprototypes.pushres(
1239 1217 0, output.getvalue() if output else '')
1240 1218 except error.PushRaced:
1241 1219 return wireprototypes.pusherr(
1242 1220 pycompat.bytestr(exc),
1243 1221 output.getvalue() if output else '')
1244 1222
1245 1223 bundler = bundle2.bundle20(repo.ui)
1246 1224 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1247 1225 bundler.addpart(out)
1248 1226 try:
1249 1227 try:
1250 1228 raise
1251 1229 except error.PushkeyFailed as exc:
1252 1230 # check client caps
1253 1231 remotecaps = getattr(exc, '_replycaps', None)
1254 1232 if (remotecaps is not None
1255 1233 and 'pushkey' not in remotecaps.get('error', ())):
1256 1234 # no support remote side, fallback to Abort handler.
1257 1235 raise
1258 1236 part = bundler.newpart('error:pushkey')
1259 1237 part.addparam('in-reply-to', exc.partid)
1260 1238 if exc.namespace is not None:
1261 1239 part.addparam('namespace', exc.namespace,
1262 1240 mandatory=False)
1263 1241 if exc.key is not None:
1264 1242 part.addparam('key', exc.key, mandatory=False)
1265 1243 if exc.new is not None:
1266 1244 part.addparam('new', exc.new, mandatory=False)
1267 1245 if exc.old is not None:
1268 1246 part.addparam('old', exc.old, mandatory=False)
1269 1247 if exc.ret is not None:
1270 1248 part.addparam('ret', exc.ret, mandatory=False)
1271 1249 except error.BundleValueError as exc:
1272 1250 errpart = bundler.newpart('error:unsupportedcontent')
1273 1251 if exc.parttype is not None:
1274 1252 errpart.addparam('parttype', exc.parttype)
1275 1253 if exc.params:
1276 1254 errpart.addparam('params', '\0'.join(exc.params))
1277 1255 except error.Abort as exc:
1278 1256 manargs = [('message', stringutil.forcebytestr(exc))]
1279 1257 advargs = []
1280 1258 if exc.hint is not None:
1281 1259 advargs.append(('hint', exc.hint))
1282 1260 bundler.addpart(bundle2.bundlepart('error:abort',
1283 1261 manargs, advargs))
1284 1262 except error.PushRaced as exc:
1285 1263 bundler.newpart('error:pushraced',
1286 1264 [('message', stringutil.forcebytestr(exc))])
1287 1265 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
@@ -1,203 +1,227 b''
1 1 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
2 2 #
3 3 # This software may be used and distributed according to the terms of the
4 4 # GNU General Public License version 2 or any later version.
5 5
6 6 from __future__ import absolute_import
7 7
8 8 from .node import (
9 9 bin,
10 10 hex,
11 11 )
12 12 from .thirdparty.zope import (
13 13 interface as zi,
14 14 )
15 15
16 16 # Names of the SSH protocol implementations.
17 17 SSHV1 = 'ssh-v1'
18 18 # These are advertised over the wire. Increment the counters at the end
19 19 # to reflect BC breakages.
20 20 SSHV2 = 'exp-ssh-v2-0001'
21 21 HTTPV2 = 'exp-http-v2-0001'
22 22
23 23 # All available wire protocol transports.
24 24 TRANSPORTS = {
25 25 SSHV1: {
26 26 'transport': 'ssh',
27 27 'version': 1,
28 28 },
29 29 SSHV2: {
30 30 'transport': 'ssh',
31 31 # TODO mark as version 2 once all commands are implemented.
32 32 'version': 1,
33 33 },
34 34 'http-v1': {
35 35 'transport': 'http',
36 36 'version': 1,
37 37 },
38 38 HTTPV2: {
39 39 'transport': 'http',
40 40 'version': 2,
41 41 }
42 42 }
43 43
44 44 class bytesresponse(object):
45 45 """A wire protocol response consisting of raw bytes."""
46 46 def __init__(self, data):
47 47 self.data = data
48 48
49 49 class ooberror(object):
50 50 """wireproto reply: failure of a batch of operation
51 51
52 52 Something failed during a batch call. The error message is stored in
53 53 `self.message`.
54 54 """
55 55 def __init__(self, message):
56 56 self.message = message
57 57
58 58 class pushres(object):
59 59 """wireproto reply: success with simple integer return
60 60
61 61 The call was successful and returned an integer contained in `self.res`.
62 62 """
63 63 def __init__(self, res, output):
64 64 self.res = res
65 65 self.output = output
66 66
67 67 class pusherr(object):
68 68 """wireproto reply: failure
69 69
70 70 The call failed. The `self.res` attribute contains the error message.
71 71 """
72 72 def __init__(self, res, output):
73 73 self.res = res
74 74 self.output = output
75 75
76 76 class streamres(object):
77 77 """wireproto reply: binary stream
78 78
79 79 The call was successful and the result is a stream.
80 80
81 81 Accepts a generator containing chunks of data to be sent to the client.
82 82
83 83 ``prefer_uncompressed`` indicates that the data is expected to be
84 84 uncompressable and that the stream should therefore use the ``none``
85 85 engine.
86 86 """
87 87 def __init__(self, gen=None, prefer_uncompressed=False):
88 88 self.gen = gen
89 89 self.prefer_uncompressed = prefer_uncompressed
90 90
91 91 class streamreslegacy(object):
92 92 """wireproto reply: uncompressed binary stream
93 93
94 94 The call was successful and the result is a stream.
95 95
96 96 Accepts a generator containing chunks of data to be sent to the client.
97 97
98 98 Like ``streamres``, but sends an uncompressed data for "version 1" clients
99 99 using the application/mercurial-0.1 media type.
100 100 """
101 101 def __init__(self, gen=None):
102 102 self.gen = gen
103 103
104 104 class cborresponse(object):
105 105 """Encode the response value as CBOR."""
106 106 def __init__(self, v):
107 107 self.value = v
108 108
109 109 # list of nodes encoding / decoding
110 110 def decodelist(l, sep=' '):
111 111 if l:
112 112 return [bin(v) for v in l.split(sep)]
113 113 return []
114 114
115 115 def encodelist(l, sep=' '):
116 116 try:
117 117 return sep.join(map(hex, l))
118 118 except TypeError:
119 119 raise
120 120
121 121 # batched call argument encoding
122 122
123 123 def escapebatcharg(plain):
124 124 return (plain
125 125 .replace(':', ':c')
126 126 .replace(',', ':o')
127 127 .replace(';', ':s')
128 128 .replace('=', ':e'))
129 129
130 130 def unescapebatcharg(escaped):
131 131 return (escaped
132 132 .replace(':e', '=')
133 133 .replace(':s', ';')
134 134 .replace(':o', ',')
135 135 .replace(':c', ':'))
136 136
137 # mapping of options accepted by getbundle and their types
138 #
139 # Meant to be extended by extensions. It is extensions responsibility to ensure
140 # such options are properly processed in exchange.getbundle.
141 #
142 # supported types are:
143 #
144 # :nodes: list of binary nodes
145 # :csv: list of comma-separated values
146 # :scsv: list of comma-separated values return as set
147 # :plain: string with no transformation needed.
148 GETBUNDLE_ARGUMENTS = {
149 'heads': 'nodes',
150 'bookmarks': 'boolean',
151 'common': 'nodes',
152 'obsmarkers': 'boolean',
153 'phases': 'boolean',
154 'bundlecaps': 'scsv',
155 'listkeys': 'csv',
156 'cg': 'boolean',
157 'cbattempted': 'boolean',
158 'stream': 'boolean',
159 }
160
137 161 class baseprotocolhandler(zi.Interface):
138 162 """Abstract base class for wire protocol handlers.
139 163
140 164 A wire protocol handler serves as an interface between protocol command
141 165 handlers and the wire protocol transport layer. Protocol handlers provide
142 166 methods to read command arguments, redirect stdio for the duration of
143 167 the request, handle response types, etc.
144 168 """
145 169
146 170 name = zi.Attribute(
147 171 """The name of the protocol implementation.
148 172
149 173 Used for uniquely identifying the transport type.
150 174 """)
151 175
152 176 def getargs(args):
153 177 """return the value for arguments in <args>
154 178
155 179 For version 1 transports, returns a list of values in the same
156 180 order they appear in ``args``. For version 2 transports, returns
157 181 a dict mapping argument name to value.
158 182 """
159 183
160 184 def getprotocaps():
161 185 """Returns the list of protocol-level capabilities of client
162 186
163 187 Returns a list of capabilities as declared by the client for
164 188 the current request (or connection for stateful protocol handlers)."""
165 189
166 190 def getpayload():
167 191 """Provide a generator for the raw payload.
168 192
169 193 The caller is responsible for ensuring that the full payload is
170 194 processed.
171 195 """
172 196
173 197 def mayberedirectstdio():
174 198 """Context manager to possibly redirect stdio.
175 199
176 200 The context manager yields a file-object like object that receives
177 201 stdout and stderr output when the context manager is active. Or it
178 202 yields ``None`` if no I/O redirection occurs.
179 203
180 204 The intent of this context manager is to capture stdio output
181 205 so it may be sent in the response. Some transports support streaming
182 206 stdio to the client in real time. For these transports, stdio output
183 207 won't be captured.
184 208 """
185 209
186 210 def client():
187 211 """Returns a string representation of this client (as bytes)."""
188 212
189 213 def addcapabilities(repo, caps):
190 214 """Adds advertised capabilities specific to this protocol.
191 215
192 216 Receives the list of capabilities collected so far.
193 217
194 218 Returns a list of capabilities. The passed in argument can be returned.
195 219 """
196 220
197 221 def checkperm(perm):
198 222 """Validate that the client has permissions to perform a request.
199 223
200 224 The argument is the permission required to proceed. If the client
201 225 doesn't have that permission, the exception should raise or abort
202 226 in a protocol specific manner.
203 227 """
General Comments 0
You need to be logged in to leave comments. Login now