##// END OF EJS Templates
wireproto: move gboptsmap to wireprototypes and rename (API)...
Gregory Szorc -
r37631:96d73560 default
parent child Browse files
Show More
@@ -1,505 +1,507 b''
1 # narrowbundle2.py - bundle2 extensions for narrow repository support
1 # narrowbundle2.py - bundle2 extensions for narrow repository support
2 #
2 #
3 # Copyright 2017 Google, Inc.
3 # Copyright 2017 Google, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import collections
10 import collections
11 import errno
11 import errno
12 import struct
12 import struct
13
13
14 from mercurial.i18n import _
14 from mercurial.i18n import _
15 from mercurial.node import (
15 from mercurial.node import (
16 bin,
16 bin,
17 nullid,
17 nullid,
18 nullrev,
18 nullrev,
19 )
19 )
20 from mercurial import (
20 from mercurial import (
21 bundle2,
21 bundle2,
22 changegroup,
22 changegroup,
23 dagutil,
23 dagutil,
24 error,
24 error,
25 exchange,
25 exchange,
26 extensions,
26 extensions,
27 narrowspec,
27 narrowspec,
28 repair,
28 repair,
29 util,
29 util,
30 wireproto,
30 wireprototypes,
31 )
31 )
32 from mercurial.utils import (
32 from mercurial.utils import (
33 stringutil,
33 stringutil,
34 )
34 )
35
35
36 NARROWCAP = 'narrow'
36 NARROWCAP = 'narrow'
37 _NARROWACL_SECTION = 'narrowhgacl'
37 _NARROWACL_SECTION = 'narrowhgacl'
38 _CHANGESPECPART = NARROWCAP + ':changespec'
38 _CHANGESPECPART = NARROWCAP + ':changespec'
39 _SPECPART = NARROWCAP + ':spec'
39 _SPECPART = NARROWCAP + ':spec'
40 _SPECPART_INCLUDE = 'include'
40 _SPECPART_INCLUDE = 'include'
41 _SPECPART_EXCLUDE = 'exclude'
41 _SPECPART_EXCLUDE = 'exclude'
42 _KILLNODESIGNAL = 'KILL'
42 _KILLNODESIGNAL = 'KILL'
43 _DONESIGNAL = 'DONE'
43 _DONESIGNAL = 'DONE'
44 _ELIDEDCSHEADER = '>20s20s20sl' # cset id, p1, p2, len(text)
44 _ELIDEDCSHEADER = '>20s20s20sl' # cset id, p1, p2, len(text)
45 _ELIDEDMFHEADER = '>20s20s20s20sl' # manifest id, p1, p2, link id, len(text)
45 _ELIDEDMFHEADER = '>20s20s20s20sl' # manifest id, p1, p2, link id, len(text)
46 _CSHEADERSIZE = struct.calcsize(_ELIDEDCSHEADER)
46 _CSHEADERSIZE = struct.calcsize(_ELIDEDCSHEADER)
47 _MFHEADERSIZE = struct.calcsize(_ELIDEDMFHEADER)
47 _MFHEADERSIZE = struct.calcsize(_ELIDEDMFHEADER)
48
48
49 # When advertising capabilities, always include narrow clone support.
49 # When advertising capabilities, always include narrow clone support.
50 def getrepocaps_narrow(orig, repo, **kwargs):
50 def getrepocaps_narrow(orig, repo, **kwargs):
51 caps = orig(repo, **kwargs)
51 caps = orig(repo, **kwargs)
52 caps[NARROWCAP] = ['v0']
52 caps[NARROWCAP] = ['v0']
53 return caps
53 return caps
54
54
55 def _computeellipsis(repo, common, heads, known, match, depth=None):
55 def _computeellipsis(repo, common, heads, known, match, depth=None):
56 """Compute the shape of a narrowed DAG.
56 """Compute the shape of a narrowed DAG.
57
57
58 Args:
58 Args:
59 repo: The repository we're transferring.
59 repo: The repository we're transferring.
60 common: The roots of the DAG range we're transferring.
60 common: The roots of the DAG range we're transferring.
61 May be just [nullid], which means all ancestors of heads.
61 May be just [nullid], which means all ancestors of heads.
62 heads: The heads of the DAG range we're transferring.
62 heads: The heads of the DAG range we're transferring.
63 match: The narrowmatcher that allows us to identify relevant changes.
63 match: The narrowmatcher that allows us to identify relevant changes.
64 depth: If not None, only consider nodes to be full nodes if they are at
64 depth: If not None, only consider nodes to be full nodes if they are at
65 most depth changesets away from one of heads.
65 most depth changesets away from one of heads.
66
66
67 Returns:
67 Returns:
68 A tuple of (visitnodes, relevant_nodes, ellipsisroots) where:
68 A tuple of (visitnodes, relevant_nodes, ellipsisroots) where:
69
69
70 visitnodes: The list of nodes (either full or ellipsis) which
70 visitnodes: The list of nodes (either full or ellipsis) which
71 need to be sent to the client.
71 need to be sent to the client.
72 relevant_nodes: The set of changelog nodes which change a file inside
72 relevant_nodes: The set of changelog nodes which change a file inside
73 the narrowspec. The client needs these as non-ellipsis nodes.
73 the narrowspec. The client needs these as non-ellipsis nodes.
74 ellipsisroots: A dict of {rev: parents} that is used in
74 ellipsisroots: A dict of {rev: parents} that is used in
75 narrowchangegroup to produce ellipsis nodes with the
75 narrowchangegroup to produce ellipsis nodes with the
76 correct parents.
76 correct parents.
77 """
77 """
78 cl = repo.changelog
78 cl = repo.changelog
79 mfl = repo.manifestlog
79 mfl = repo.manifestlog
80
80
81 cldag = dagutil.revlogdag(cl)
81 cldag = dagutil.revlogdag(cl)
82 # dagutil does not like nullid/nullrev
82 # dagutil does not like nullid/nullrev
83 commonrevs = cldag.internalizeall(common - set([nullid])) | set([nullrev])
83 commonrevs = cldag.internalizeall(common - set([nullid])) | set([nullrev])
84 headsrevs = cldag.internalizeall(heads)
84 headsrevs = cldag.internalizeall(heads)
85 if depth:
85 if depth:
86 revdepth = {h: 0 for h in headsrevs}
86 revdepth = {h: 0 for h in headsrevs}
87
87
88 ellipsisheads = collections.defaultdict(set)
88 ellipsisheads = collections.defaultdict(set)
89 ellipsisroots = collections.defaultdict(set)
89 ellipsisroots = collections.defaultdict(set)
90
90
91 def addroot(head, curchange):
91 def addroot(head, curchange):
92 """Add a root to an ellipsis head, splitting heads with 3 roots."""
92 """Add a root to an ellipsis head, splitting heads with 3 roots."""
93 ellipsisroots[head].add(curchange)
93 ellipsisroots[head].add(curchange)
94 # Recursively split ellipsis heads with 3 roots by finding the
94 # Recursively split ellipsis heads with 3 roots by finding the
95 # roots' youngest common descendant which is an elided merge commit.
95 # roots' youngest common descendant which is an elided merge commit.
96 # That descendant takes 2 of the 3 roots as its own, and becomes a
96 # That descendant takes 2 of the 3 roots as its own, and becomes a
97 # root of the head.
97 # root of the head.
98 while len(ellipsisroots[head]) > 2:
98 while len(ellipsisroots[head]) > 2:
99 child, roots = splithead(head)
99 child, roots = splithead(head)
100 splitroots(head, child, roots)
100 splitroots(head, child, roots)
101 head = child # Recurse in case we just added a 3rd root
101 head = child # Recurse in case we just added a 3rd root
102
102
103 def splitroots(head, child, roots):
103 def splitroots(head, child, roots):
104 ellipsisroots[head].difference_update(roots)
104 ellipsisroots[head].difference_update(roots)
105 ellipsisroots[head].add(child)
105 ellipsisroots[head].add(child)
106 ellipsisroots[child].update(roots)
106 ellipsisroots[child].update(roots)
107 ellipsisroots[child].discard(child)
107 ellipsisroots[child].discard(child)
108
108
109 def splithead(head):
109 def splithead(head):
110 r1, r2, r3 = sorted(ellipsisroots[head])
110 r1, r2, r3 = sorted(ellipsisroots[head])
111 for nr1, nr2 in ((r2, r3), (r1, r3), (r1, r2)):
111 for nr1, nr2 in ((r2, r3), (r1, r3), (r1, r2)):
112 mid = repo.revs('sort(merge() & %d::%d & %d::%d, -rev)',
112 mid = repo.revs('sort(merge() & %d::%d & %d::%d, -rev)',
113 nr1, head, nr2, head)
113 nr1, head, nr2, head)
114 for j in mid:
114 for j in mid:
115 if j == nr2:
115 if j == nr2:
116 return nr2, (nr1, nr2)
116 return nr2, (nr1, nr2)
117 if j not in ellipsisroots or len(ellipsisroots[j]) < 2:
117 if j not in ellipsisroots or len(ellipsisroots[j]) < 2:
118 return j, (nr1, nr2)
118 return j, (nr1, nr2)
119 raise error.Abort('Failed to split up ellipsis node! head: %d, '
119 raise error.Abort('Failed to split up ellipsis node! head: %d, '
120 'roots: %d %d %d' % (head, r1, r2, r3))
120 'roots: %d %d %d' % (head, r1, r2, r3))
121
121
122 missing = list(cl.findmissingrevs(common=commonrevs, heads=headsrevs))
122 missing = list(cl.findmissingrevs(common=commonrevs, heads=headsrevs))
123 visit = reversed(missing)
123 visit = reversed(missing)
124 relevant_nodes = set()
124 relevant_nodes = set()
125 visitnodes = [cl.node(m) for m in missing]
125 visitnodes = [cl.node(m) for m in missing]
126 required = set(headsrevs) | known
126 required = set(headsrevs) | known
127 for rev in visit:
127 for rev in visit:
128 clrev = cl.changelogrevision(rev)
128 clrev = cl.changelogrevision(rev)
129 ps = cldag.parents(rev)
129 ps = cldag.parents(rev)
130 if depth is not None:
130 if depth is not None:
131 curdepth = revdepth[rev]
131 curdepth = revdepth[rev]
132 for p in ps:
132 for p in ps:
133 revdepth[p] = min(curdepth + 1, revdepth.get(p, depth + 1))
133 revdepth[p] = min(curdepth + 1, revdepth.get(p, depth + 1))
134 needed = False
134 needed = False
135 shallow_enough = depth is None or revdepth[rev] <= depth
135 shallow_enough = depth is None or revdepth[rev] <= depth
136 if shallow_enough:
136 if shallow_enough:
137 curmf = mfl[clrev.manifest].read()
137 curmf = mfl[clrev.manifest].read()
138 if ps:
138 if ps:
139 # We choose to not trust the changed files list in
139 # We choose to not trust the changed files list in
140 # changesets because it's not always correct. TODO: could
140 # changesets because it's not always correct. TODO: could
141 # we trust it for the non-merge case?
141 # we trust it for the non-merge case?
142 p1mf = mfl[cl.changelogrevision(ps[0]).manifest].read()
142 p1mf = mfl[cl.changelogrevision(ps[0]).manifest].read()
143 needed = bool(curmf.diff(p1mf, match))
143 needed = bool(curmf.diff(p1mf, match))
144 if not needed and len(ps) > 1:
144 if not needed and len(ps) > 1:
145 # For merge changes, the list of changed files is not
145 # For merge changes, the list of changed files is not
146 # helpful, since we need to emit the merge if a file
146 # helpful, since we need to emit the merge if a file
147 # in the narrow spec has changed on either side of the
147 # in the narrow spec has changed on either side of the
148 # merge. As a result, we do a manifest diff to check.
148 # merge. As a result, we do a manifest diff to check.
149 p2mf = mfl[cl.changelogrevision(ps[1]).manifest].read()
149 p2mf = mfl[cl.changelogrevision(ps[1]).manifest].read()
150 needed = bool(curmf.diff(p2mf, match))
150 needed = bool(curmf.diff(p2mf, match))
151 else:
151 else:
152 # For a root node, we need to include the node if any
152 # For a root node, we need to include the node if any
153 # files in the node match the narrowspec.
153 # files in the node match the narrowspec.
154 needed = any(curmf.walk(match))
154 needed = any(curmf.walk(match))
155
155
156 if needed:
156 if needed:
157 for head in ellipsisheads[rev]:
157 for head in ellipsisheads[rev]:
158 addroot(head, rev)
158 addroot(head, rev)
159 for p in ps:
159 for p in ps:
160 required.add(p)
160 required.add(p)
161 relevant_nodes.add(cl.node(rev))
161 relevant_nodes.add(cl.node(rev))
162 else:
162 else:
163 if not ps:
163 if not ps:
164 ps = [nullrev]
164 ps = [nullrev]
165 if rev in required:
165 if rev in required:
166 for head in ellipsisheads[rev]:
166 for head in ellipsisheads[rev]:
167 addroot(head, rev)
167 addroot(head, rev)
168 for p in ps:
168 for p in ps:
169 ellipsisheads[p].add(rev)
169 ellipsisheads[p].add(rev)
170 else:
170 else:
171 for p in ps:
171 for p in ps:
172 ellipsisheads[p] |= ellipsisheads[rev]
172 ellipsisheads[p] |= ellipsisheads[rev]
173
173
174 # add common changesets as roots of their reachable ellipsis heads
174 # add common changesets as roots of their reachable ellipsis heads
175 for c in commonrevs:
175 for c in commonrevs:
176 for head in ellipsisheads[c]:
176 for head in ellipsisheads[c]:
177 addroot(head, c)
177 addroot(head, c)
178 return visitnodes, relevant_nodes, ellipsisroots
178 return visitnodes, relevant_nodes, ellipsisroots
179
179
180 def _packellipsischangegroup(repo, common, match, relevant_nodes,
180 def _packellipsischangegroup(repo, common, match, relevant_nodes,
181 ellipsisroots, visitnodes, depth, source, version):
181 ellipsisroots, visitnodes, depth, source, version):
182 if version in ('01', '02'):
182 if version in ('01', '02'):
183 raise error.Abort(
183 raise error.Abort(
184 'ellipsis nodes require at least cg3 on client and server, '
184 'ellipsis nodes require at least cg3 on client and server, '
185 'but negotiated version %s' % version)
185 'but negotiated version %s' % version)
186 # We wrap cg1packer.revchunk, using a side channel to pass
186 # We wrap cg1packer.revchunk, using a side channel to pass
187 # relevant_nodes into that area. Then if linknode isn't in the
187 # relevant_nodes into that area. Then if linknode isn't in the
188 # set, we know we have an ellipsis node and we should defer
188 # set, we know we have an ellipsis node and we should defer
189 # sending that node's data. We override close() to detect
189 # sending that node's data. We override close() to detect
190 # pending ellipsis nodes and flush them.
190 # pending ellipsis nodes and flush them.
191 packer = changegroup.getbundler(version, repo)
191 packer = changegroup.getbundler(version, repo)
192 # Let the packer have access to the narrow matcher so it can
192 # Let the packer have access to the narrow matcher so it can
193 # omit filelogs and dirlogs as needed
193 # omit filelogs and dirlogs as needed
194 packer._narrow_matcher = lambda : match
194 packer._narrow_matcher = lambda : match
195 # Give the packer the list of nodes which should not be
195 # Give the packer the list of nodes which should not be
196 # ellipsis nodes. We store this rather than the set of nodes
196 # ellipsis nodes. We store this rather than the set of nodes
197 # that should be an ellipsis because for very large histories
197 # that should be an ellipsis because for very large histories
198 # we expect this to be significantly smaller.
198 # we expect this to be significantly smaller.
199 packer.full_nodes = relevant_nodes
199 packer.full_nodes = relevant_nodes
200 # Maps ellipsis revs to their roots at the changelog level.
200 # Maps ellipsis revs to their roots at the changelog level.
201 packer.precomputed_ellipsis = ellipsisroots
201 packer.precomputed_ellipsis = ellipsisroots
202 # Maps CL revs to per-revlog revisions. Cleared in close() at
202 # Maps CL revs to per-revlog revisions. Cleared in close() at
203 # the end of each group.
203 # the end of each group.
204 packer.clrev_to_localrev = {}
204 packer.clrev_to_localrev = {}
205 packer.next_clrev_to_localrev = {}
205 packer.next_clrev_to_localrev = {}
206 # Maps changelog nodes to changelog revs. Filled in once
206 # Maps changelog nodes to changelog revs. Filled in once
207 # during changelog stage and then left unmodified.
207 # during changelog stage and then left unmodified.
208 packer.clnode_to_rev = {}
208 packer.clnode_to_rev = {}
209 packer.changelog_done = False
209 packer.changelog_done = False
210 # If true, informs the packer that it is serving shallow content and might
210 # If true, informs the packer that it is serving shallow content and might
211 # need to pack file contents not introduced by the changes being packed.
211 # need to pack file contents not introduced by the changes being packed.
212 packer.is_shallow = depth is not None
212 packer.is_shallow = depth is not None
213
213
214 return packer.generate(common, visitnodes, False, source)
214 return packer.generate(common, visitnodes, False, source)
215
215
216 # Serve a changegroup for a client with a narrow clone.
216 # Serve a changegroup for a client with a narrow clone.
217 def getbundlechangegrouppart_narrow(bundler, repo, source,
217 def getbundlechangegrouppart_narrow(bundler, repo, source,
218 bundlecaps=None, b2caps=None, heads=None,
218 bundlecaps=None, b2caps=None, heads=None,
219 common=None, **kwargs):
219 common=None, **kwargs):
220 cgversions = b2caps.get('changegroup')
220 cgversions = b2caps.get('changegroup')
221 if cgversions: # 3.1 and 3.2 ship with an empty value
221 if cgversions: # 3.1 and 3.2 ship with an empty value
222 cgversions = [v for v in cgversions
222 cgversions = [v for v in cgversions
223 if v in changegroup.supportedoutgoingversions(repo)]
223 if v in changegroup.supportedoutgoingversions(repo)]
224 if not cgversions:
224 if not cgversions:
225 raise ValueError(_('no common changegroup version'))
225 raise ValueError(_('no common changegroup version'))
226 version = max(cgversions)
226 version = max(cgversions)
227 else:
227 else:
228 raise ValueError(_("server does not advertise changegroup version,"
228 raise ValueError(_("server does not advertise changegroup version,"
229 " can't negotiate support for ellipsis nodes"))
229 " can't negotiate support for ellipsis nodes"))
230
230
231 include = sorted(filter(bool, kwargs.get(r'includepats', [])))
231 include = sorted(filter(bool, kwargs.get(r'includepats', [])))
232 exclude = sorted(filter(bool, kwargs.get(r'excludepats', [])))
232 exclude = sorted(filter(bool, kwargs.get(r'excludepats', [])))
233 newmatch = narrowspec.match(repo.root, include=include, exclude=exclude)
233 newmatch = narrowspec.match(repo.root, include=include, exclude=exclude)
234 if not repo.ui.configbool("experimental", "narrowservebrokenellipses"):
234 if not repo.ui.configbool("experimental", "narrowservebrokenellipses"):
235 outgoing = exchange._computeoutgoing(repo, heads, common)
235 outgoing = exchange._computeoutgoing(repo, heads, common)
236 if not outgoing.missing:
236 if not outgoing.missing:
237 return
237 return
238 def wrappedgetbundler(orig, *args, **kwargs):
238 def wrappedgetbundler(orig, *args, **kwargs):
239 bundler = orig(*args, **kwargs)
239 bundler = orig(*args, **kwargs)
240 bundler._narrow_matcher = lambda : newmatch
240 bundler._narrow_matcher = lambda : newmatch
241 return bundler
241 return bundler
242 with extensions.wrappedfunction(changegroup, 'getbundler',
242 with extensions.wrappedfunction(changegroup, 'getbundler',
243 wrappedgetbundler):
243 wrappedgetbundler):
244 cg = changegroup.makestream(repo, outgoing, version, source)
244 cg = changegroup.makestream(repo, outgoing, version, source)
245 part = bundler.newpart('changegroup', data=cg)
245 part = bundler.newpart('changegroup', data=cg)
246 part.addparam('version', version)
246 part.addparam('version', version)
247 if 'treemanifest' in repo.requirements:
247 if 'treemanifest' in repo.requirements:
248 part.addparam('treemanifest', '1')
248 part.addparam('treemanifest', '1')
249
249
250 if include or exclude:
250 if include or exclude:
251 narrowspecpart = bundler.newpart(_SPECPART)
251 narrowspecpart = bundler.newpart(_SPECPART)
252 if include:
252 if include:
253 narrowspecpart.addparam(
253 narrowspecpart.addparam(
254 _SPECPART_INCLUDE, '\n'.join(include), mandatory=True)
254 _SPECPART_INCLUDE, '\n'.join(include), mandatory=True)
255 if exclude:
255 if exclude:
256 narrowspecpart.addparam(
256 narrowspecpart.addparam(
257 _SPECPART_EXCLUDE, '\n'.join(exclude), mandatory=True)
257 _SPECPART_EXCLUDE, '\n'.join(exclude), mandatory=True)
258
258
259 return
259 return
260
260
261 depth = kwargs.get(r'depth', None)
261 depth = kwargs.get(r'depth', None)
262 if depth is not None:
262 if depth is not None:
263 depth = int(depth)
263 depth = int(depth)
264 if depth < 1:
264 if depth < 1:
265 raise error.Abort(_('depth must be positive, got %d') % depth)
265 raise error.Abort(_('depth must be positive, got %d') % depth)
266
266
267 heads = set(heads or repo.heads())
267 heads = set(heads or repo.heads())
268 common = set(common or [nullid])
268 common = set(common or [nullid])
269 oldinclude = sorted(filter(bool, kwargs.get(r'oldincludepats', [])))
269 oldinclude = sorted(filter(bool, kwargs.get(r'oldincludepats', [])))
270 oldexclude = sorted(filter(bool, kwargs.get(r'oldexcludepats', [])))
270 oldexclude = sorted(filter(bool, kwargs.get(r'oldexcludepats', [])))
271 known = {bin(n) for n in kwargs.get(r'known', [])}
271 known = {bin(n) for n in kwargs.get(r'known', [])}
272 if known and (oldinclude != include or oldexclude != exclude):
272 if known and (oldinclude != include or oldexclude != exclude):
273 # Steps:
273 # Steps:
274 # 1. Send kill for "$known & ::common"
274 # 1. Send kill for "$known & ::common"
275 #
275 #
276 # 2. Send changegroup for ::common
276 # 2. Send changegroup for ::common
277 #
277 #
278 # 3. Proceed.
278 # 3. Proceed.
279 #
279 #
280 # In the future, we can send kills for only the specific
280 # In the future, we can send kills for only the specific
281 # nodes we know should go away or change shape, and then
281 # nodes we know should go away or change shape, and then
282 # send a data stream that tells the client something like this:
282 # send a data stream that tells the client something like this:
283 #
283 #
284 # a) apply this changegroup
284 # a) apply this changegroup
285 # b) apply nodes XXX, YYY, ZZZ that you already have
285 # b) apply nodes XXX, YYY, ZZZ that you already have
286 # c) goto a
286 # c) goto a
287 #
287 #
288 # until they've built up the full new state.
288 # until they've built up the full new state.
289 # Convert to revnums and intersect with "common". The client should
289 # Convert to revnums and intersect with "common". The client should
290 # have made it a subset of "common" already, but let's be safe.
290 # have made it a subset of "common" already, but let's be safe.
291 known = set(repo.revs("%ln & ::%ln", known, common))
291 known = set(repo.revs("%ln & ::%ln", known, common))
292 # TODO: we could send only roots() of this set, and the
292 # TODO: we could send only roots() of this set, and the
293 # list of nodes in common, and the client could work out
293 # list of nodes in common, and the client could work out
294 # what to strip, instead of us explicitly sending every
294 # what to strip, instead of us explicitly sending every
295 # single node.
295 # single node.
296 deadrevs = known
296 deadrevs = known
297 def genkills():
297 def genkills():
298 for r in deadrevs:
298 for r in deadrevs:
299 yield _KILLNODESIGNAL
299 yield _KILLNODESIGNAL
300 yield repo.changelog.node(r)
300 yield repo.changelog.node(r)
301 yield _DONESIGNAL
301 yield _DONESIGNAL
302 bundler.newpart(_CHANGESPECPART, data=genkills())
302 bundler.newpart(_CHANGESPECPART, data=genkills())
303 newvisit, newfull, newellipsis = _computeellipsis(
303 newvisit, newfull, newellipsis = _computeellipsis(
304 repo, set(), common, known, newmatch)
304 repo, set(), common, known, newmatch)
305 if newvisit:
305 if newvisit:
306 cg = _packellipsischangegroup(
306 cg = _packellipsischangegroup(
307 repo, common, newmatch, newfull, newellipsis,
307 repo, common, newmatch, newfull, newellipsis,
308 newvisit, depth, source, version)
308 newvisit, depth, source, version)
309 part = bundler.newpart('changegroup', data=cg)
309 part = bundler.newpart('changegroup', data=cg)
310 part.addparam('version', version)
310 part.addparam('version', version)
311 if 'treemanifest' in repo.requirements:
311 if 'treemanifest' in repo.requirements:
312 part.addparam('treemanifest', '1')
312 part.addparam('treemanifest', '1')
313
313
314 visitnodes, relevant_nodes, ellipsisroots = _computeellipsis(
314 visitnodes, relevant_nodes, ellipsisroots = _computeellipsis(
315 repo, common, heads, set(), newmatch, depth=depth)
315 repo, common, heads, set(), newmatch, depth=depth)
316
316
317 repo.ui.debug('Found %d relevant revs\n' % len(relevant_nodes))
317 repo.ui.debug('Found %d relevant revs\n' % len(relevant_nodes))
318 if visitnodes:
318 if visitnodes:
319 cg = _packellipsischangegroup(
319 cg = _packellipsischangegroup(
320 repo, common, newmatch, relevant_nodes, ellipsisroots,
320 repo, common, newmatch, relevant_nodes, ellipsisroots,
321 visitnodes, depth, source, version)
321 visitnodes, depth, source, version)
322 part = bundler.newpart('changegroup', data=cg)
322 part = bundler.newpart('changegroup', data=cg)
323 part.addparam('version', version)
323 part.addparam('version', version)
324 if 'treemanifest' in repo.requirements:
324 if 'treemanifest' in repo.requirements:
325 part.addparam('treemanifest', '1')
325 part.addparam('treemanifest', '1')
326
326
327 def applyacl_narrow(repo, kwargs):
327 def applyacl_narrow(repo, kwargs):
328 ui = repo.ui
328 ui = repo.ui
329 username = ui.shortuser(ui.environ.get('REMOTE_USER') or ui.username())
329 username = ui.shortuser(ui.environ.get('REMOTE_USER') or ui.username())
330 user_includes = ui.configlist(
330 user_includes = ui.configlist(
331 _NARROWACL_SECTION, username + '.includes',
331 _NARROWACL_SECTION, username + '.includes',
332 ui.configlist(_NARROWACL_SECTION, 'default.includes'))
332 ui.configlist(_NARROWACL_SECTION, 'default.includes'))
333 user_excludes = ui.configlist(
333 user_excludes = ui.configlist(
334 _NARROWACL_SECTION, username + '.excludes',
334 _NARROWACL_SECTION, username + '.excludes',
335 ui.configlist(_NARROWACL_SECTION, 'default.excludes'))
335 ui.configlist(_NARROWACL_SECTION, 'default.excludes'))
336 if not user_includes:
336 if not user_includes:
337 raise error.Abort(_("{} configuration for user {} is empty")
337 raise error.Abort(_("{} configuration for user {} is empty")
338 .format(_NARROWACL_SECTION, username))
338 .format(_NARROWACL_SECTION, username))
339
339
340 user_includes = [
340 user_includes = [
341 'path:.' if p == '*' else 'path:' + p for p in user_includes]
341 'path:.' if p == '*' else 'path:' + p for p in user_includes]
342 user_excludes = [
342 user_excludes = [
343 'path:.' if p == '*' else 'path:' + p for p in user_excludes]
343 'path:.' if p == '*' else 'path:' + p for p in user_excludes]
344
344
345 req_includes = set(kwargs.get(r'includepats', []))
345 req_includes = set(kwargs.get(r'includepats', []))
346 req_excludes = set(kwargs.get(r'excludepats', []))
346 req_excludes = set(kwargs.get(r'excludepats', []))
347
347
348 req_includes, req_excludes, invalid_includes = narrowspec.restrictpatterns(
348 req_includes, req_excludes, invalid_includes = narrowspec.restrictpatterns(
349 req_includes, req_excludes, user_includes, user_excludes)
349 req_includes, req_excludes, user_includes, user_excludes)
350
350
351 if invalid_includes:
351 if invalid_includes:
352 raise error.Abort(
352 raise error.Abort(
353 _("The following includes are not accessible for {}: {}")
353 _("The following includes are not accessible for {}: {}")
354 .format(username, invalid_includes))
354 .format(username, invalid_includes))
355
355
356 new_args = {}
356 new_args = {}
357 new_args.update(kwargs)
357 new_args.update(kwargs)
358 new_args['includepats'] = req_includes
358 new_args['includepats'] = req_includes
359 if req_excludes:
359 if req_excludes:
360 new_args['excludepats'] = req_excludes
360 new_args['excludepats'] = req_excludes
361 return new_args
361 return new_args
362
362
363 @bundle2.parthandler(_SPECPART, (_SPECPART_INCLUDE, _SPECPART_EXCLUDE))
363 @bundle2.parthandler(_SPECPART, (_SPECPART_INCLUDE, _SPECPART_EXCLUDE))
364 def _handlechangespec_2(op, inpart):
364 def _handlechangespec_2(op, inpart):
365 includepats = set(inpart.params.get(_SPECPART_INCLUDE, '').splitlines())
365 includepats = set(inpart.params.get(_SPECPART_INCLUDE, '').splitlines())
366 excludepats = set(inpart.params.get(_SPECPART_EXCLUDE, '').splitlines())
366 excludepats = set(inpart.params.get(_SPECPART_EXCLUDE, '').splitlines())
367 if not changegroup.NARROW_REQUIREMENT in op.repo.requirements:
367 if not changegroup.NARROW_REQUIREMENT in op.repo.requirements:
368 op.repo.requirements.add(changegroup.NARROW_REQUIREMENT)
368 op.repo.requirements.add(changegroup.NARROW_REQUIREMENT)
369 op.repo._writerequirements()
369 op.repo._writerequirements()
370 op.repo.setnarrowpats(includepats, excludepats)
370 op.repo.setnarrowpats(includepats, excludepats)
371
371
372 @bundle2.parthandler(_CHANGESPECPART)
372 @bundle2.parthandler(_CHANGESPECPART)
373 def _handlechangespec(op, inpart):
373 def _handlechangespec(op, inpart):
374 repo = op.repo
374 repo = op.repo
375 cl = repo.changelog
375 cl = repo.changelog
376
376
377 # changesets which need to be stripped entirely. either they're no longer
377 # changesets which need to be stripped entirely. either they're no longer
378 # needed in the new narrow spec, or the server is sending a replacement
378 # needed in the new narrow spec, or the server is sending a replacement
379 # in the changegroup part.
379 # in the changegroup part.
380 clkills = set()
380 clkills = set()
381
381
382 # A changespec part contains all the updates to ellipsis nodes
382 # A changespec part contains all the updates to ellipsis nodes
383 # that will happen as a result of widening or narrowing a
383 # that will happen as a result of widening or narrowing a
384 # repo. All the changes that this block encounters are ellipsis
384 # repo. All the changes that this block encounters are ellipsis
385 # nodes or flags to kill an existing ellipsis.
385 # nodes or flags to kill an existing ellipsis.
386 chunksignal = changegroup.readexactly(inpart, 4)
386 chunksignal = changegroup.readexactly(inpart, 4)
387 while chunksignal != _DONESIGNAL:
387 while chunksignal != _DONESIGNAL:
388 if chunksignal == _KILLNODESIGNAL:
388 if chunksignal == _KILLNODESIGNAL:
389 # a node used to be an ellipsis but isn't anymore
389 # a node used to be an ellipsis but isn't anymore
390 ck = changegroup.readexactly(inpart, 20)
390 ck = changegroup.readexactly(inpart, 20)
391 if cl.hasnode(ck):
391 if cl.hasnode(ck):
392 clkills.add(ck)
392 clkills.add(ck)
393 else:
393 else:
394 raise error.Abort(
394 raise error.Abort(
395 _('unexpected changespec node chunk type: %s') % chunksignal)
395 _('unexpected changespec node chunk type: %s') % chunksignal)
396 chunksignal = changegroup.readexactly(inpart, 4)
396 chunksignal = changegroup.readexactly(inpart, 4)
397
397
398 if clkills:
398 if clkills:
399 # preserve bookmarks that repair.strip() would otherwise strip
399 # preserve bookmarks that repair.strip() would otherwise strip
400 bmstore = repo._bookmarks
400 bmstore = repo._bookmarks
401 class dummybmstore(dict):
401 class dummybmstore(dict):
402 def applychanges(self, repo, tr, changes):
402 def applychanges(self, repo, tr, changes):
403 pass
403 pass
404 def recordchange(self, tr): # legacy version
404 def recordchange(self, tr): # legacy version
405 pass
405 pass
406 repo._bookmarks = dummybmstore()
406 repo._bookmarks = dummybmstore()
407 chgrpfile = repair.strip(op.ui, repo, list(clkills), backup=True,
407 chgrpfile = repair.strip(op.ui, repo, list(clkills), backup=True,
408 topic='widen')
408 topic='widen')
409 repo._bookmarks = bmstore
409 repo._bookmarks = bmstore
410 if chgrpfile:
410 if chgrpfile:
411 # presence of _widen_bundle attribute activates widen handler later
411 # presence of _widen_bundle attribute activates widen handler later
412 op._widen_bundle = chgrpfile
412 op._widen_bundle = chgrpfile
413 # Set the new narrowspec if we're widening. The setnewnarrowpats() method
413 # Set the new narrowspec if we're widening. The setnewnarrowpats() method
414 # will currently always be there when using the core+narrowhg server, but
414 # will currently always be there when using the core+narrowhg server, but
415 # other servers may include a changespec part even when not widening (e.g.
415 # other servers may include a changespec part even when not widening (e.g.
416 # because we're deepening a shallow repo).
416 # because we're deepening a shallow repo).
417 if util.safehasattr(repo, 'setnewnarrowpats'):
417 if util.safehasattr(repo, 'setnewnarrowpats'):
418 repo.setnewnarrowpats()
418 repo.setnewnarrowpats()
419
419
420 def handlechangegroup_widen(op, inpart):
420 def handlechangegroup_widen(op, inpart):
421 """Changegroup exchange handler which restores temporarily-stripped nodes"""
421 """Changegroup exchange handler which restores temporarily-stripped nodes"""
422 # We saved a bundle with stripped node data we must now restore.
422 # We saved a bundle with stripped node data we must now restore.
423 # This approach is based on mercurial/repair.py@6ee26a53c111.
423 # This approach is based on mercurial/repair.py@6ee26a53c111.
424 repo = op.repo
424 repo = op.repo
425 ui = op.ui
425 ui = op.ui
426
426
427 chgrpfile = op._widen_bundle
427 chgrpfile = op._widen_bundle
428 del op._widen_bundle
428 del op._widen_bundle
429 vfs = repo.vfs
429 vfs = repo.vfs
430
430
431 ui.note(_("adding branch\n"))
431 ui.note(_("adding branch\n"))
432 f = vfs.open(chgrpfile, "rb")
432 f = vfs.open(chgrpfile, "rb")
433 try:
433 try:
434 gen = exchange.readbundle(ui, f, chgrpfile, vfs)
434 gen = exchange.readbundle(ui, f, chgrpfile, vfs)
435 if not ui.verbose:
435 if not ui.verbose:
436 # silence internal shuffling chatter
436 # silence internal shuffling chatter
437 ui.pushbuffer()
437 ui.pushbuffer()
438 if isinstance(gen, bundle2.unbundle20):
438 if isinstance(gen, bundle2.unbundle20):
439 with repo.transaction('strip') as tr:
439 with repo.transaction('strip') as tr:
440 bundle2.processbundle(repo, gen, lambda: tr)
440 bundle2.processbundle(repo, gen, lambda: tr)
441 else:
441 else:
442 gen.apply(repo, 'strip', 'bundle:' + vfs.join(chgrpfile), True)
442 gen.apply(repo, 'strip', 'bundle:' + vfs.join(chgrpfile), True)
443 if not ui.verbose:
443 if not ui.verbose:
444 ui.popbuffer()
444 ui.popbuffer()
445 finally:
445 finally:
446 f.close()
446 f.close()
447
447
448 # remove undo files
448 # remove undo files
449 for undovfs, undofile in repo.undofiles():
449 for undovfs, undofile in repo.undofiles():
450 try:
450 try:
451 undovfs.unlink(undofile)
451 undovfs.unlink(undofile)
452 except OSError as e:
452 except OSError as e:
453 if e.errno != errno.ENOENT:
453 if e.errno != errno.ENOENT:
454 ui.warn(_('error removing %s: %s\n') %
454 ui.warn(_('error removing %s: %s\n') %
455 (undovfs.join(undofile), stringutil.forcebytestr(e)))
455 (undovfs.join(undofile), stringutil.forcebytestr(e)))
456
456
457 # Remove partial backup only if there were no exceptions
457 # Remove partial backup only if there were no exceptions
458 vfs.unlink(chgrpfile)
458 vfs.unlink(chgrpfile)
459
459
460 def setup():
460 def setup():
461 """Enable narrow repo support in bundle2-related extension points."""
461 """Enable narrow repo support in bundle2-related extension points."""
462 extensions.wrapfunction(bundle2, 'getrepocaps', getrepocaps_narrow)
462 extensions.wrapfunction(bundle2, 'getrepocaps', getrepocaps_narrow)
463
463
464 wireproto.gboptsmap['narrow'] = 'boolean'
464 getbundleargs = wireprototypes.GETBUNDLE_ARGUMENTS
465 wireproto.gboptsmap['depth'] = 'plain'
465
466 wireproto.gboptsmap['oldincludepats'] = 'csv'
466 getbundleargs['narrow'] = 'boolean'
467 wireproto.gboptsmap['oldexcludepats'] = 'csv'
467 getbundleargs['depth'] = 'plain'
468 wireproto.gboptsmap['includepats'] = 'csv'
468 getbundleargs['oldincludepats'] = 'csv'
469 wireproto.gboptsmap['excludepats'] = 'csv'
469 getbundleargs['oldexcludepats'] = 'csv'
470 wireproto.gboptsmap['known'] = 'csv'
470 getbundleargs['includepats'] = 'csv'
471 getbundleargs['excludepats'] = 'csv'
472 getbundleargs['known'] = 'csv'
471
473
472 # Extend changegroup serving to handle requests from narrow clients.
474 # Extend changegroup serving to handle requests from narrow clients.
473 origcgfn = exchange.getbundle2partsmapping['changegroup']
475 origcgfn = exchange.getbundle2partsmapping['changegroup']
474 def wrappedcgfn(*args, **kwargs):
476 def wrappedcgfn(*args, **kwargs):
475 repo = args[1]
477 repo = args[1]
476 if repo.ui.has_section(_NARROWACL_SECTION):
478 if repo.ui.has_section(_NARROWACL_SECTION):
477 getbundlechangegrouppart_narrow(
479 getbundlechangegrouppart_narrow(
478 *args, **applyacl_narrow(repo, kwargs))
480 *args, **applyacl_narrow(repo, kwargs))
479 elif kwargs.get(r'narrow', False):
481 elif kwargs.get(r'narrow', False):
480 getbundlechangegrouppart_narrow(*args, **kwargs)
482 getbundlechangegrouppart_narrow(*args, **kwargs)
481 else:
483 else:
482 origcgfn(*args, **kwargs)
484 origcgfn(*args, **kwargs)
483 exchange.getbundle2partsmapping['changegroup'] = wrappedcgfn
485 exchange.getbundle2partsmapping['changegroup'] = wrappedcgfn
484
486
485 # disable rev branch cache exchange when serving a narrow bundle
487 # disable rev branch cache exchange when serving a narrow bundle
486 # (currently incompatible with that part)
488 # (currently incompatible with that part)
487 origrbcfn = exchange.getbundle2partsmapping['cache:rev-branch-cache']
489 origrbcfn = exchange.getbundle2partsmapping['cache:rev-branch-cache']
488 def wrappedcgfn(*args, **kwargs):
490 def wrappedcgfn(*args, **kwargs):
489 repo = args[1]
491 repo = args[1]
490 if repo.ui.has_section(_NARROWACL_SECTION):
492 if repo.ui.has_section(_NARROWACL_SECTION):
491 return
493 return
492 elif kwargs.get(r'narrow', False):
494 elif kwargs.get(r'narrow', False):
493 return
495 return
494 else:
496 else:
495 origrbcfn(*args, **kwargs)
497 origrbcfn(*args, **kwargs)
496 exchange.getbundle2partsmapping['cache:rev-branch-cache'] = wrappedcgfn
498 exchange.getbundle2partsmapping['cache:rev-branch-cache'] = wrappedcgfn
497
499
498 # Extend changegroup receiver so client can fixup after widen requests.
500 # Extend changegroup receiver so client can fixup after widen requests.
499 origcghandler = bundle2.parthandlermapping['changegroup']
501 origcghandler = bundle2.parthandlermapping['changegroup']
500 def wrappedcghandler(op, inpart):
502 def wrappedcghandler(op, inpart):
501 origcghandler(op, inpart)
503 origcghandler(op, inpart)
502 if util.safehasattr(op, '_widen_bundle'):
504 if util.safehasattr(op, '_widen_bundle'):
503 handlechangegroup_widen(op, inpart)
505 handlechangegroup_widen(op, inpart)
504 wrappedcghandler.params = origcghandler.params
506 wrappedcghandler.params = origcghandler.params
505 bundle2.parthandlermapping['changegroup'] = wrappedcghandler
507 bundle2.parthandlermapping['changegroup'] = wrappedcghandler
@@ -1,1287 +1,1265 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import os
11 import os
12 import tempfile
12 import tempfile
13
13
14 from .i18n import _
14 from .i18n import _
15 from .node import (
15 from .node import (
16 bin,
16 bin,
17 hex,
17 hex,
18 nullid,
18 nullid,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 discovery,
24 discovery,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 pycompat,
30 pycompat,
31 repository,
31 repository,
32 streamclone,
32 streamclone,
33 util,
33 util,
34 wireprototypes,
34 wireprototypes,
35 )
35 )
36
36
37 from .utils import (
37 from .utils import (
38 procutil,
38 procutil,
39 stringutil,
39 stringutil,
40 )
40 )
41
41
42 urlerr = util.urlerr
42 urlerr = util.urlerr
43 urlreq = util.urlreq
43 urlreq = util.urlreq
44
44
45 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
45 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
46 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
46 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
47 'IncompatibleClient')
47 'IncompatibleClient')
48 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
48 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
49
49
50 class remoteiterbatcher(peer.iterbatcher):
50 class remoteiterbatcher(peer.iterbatcher):
51 def __init__(self, remote):
51 def __init__(self, remote):
52 super(remoteiterbatcher, self).__init__()
52 super(remoteiterbatcher, self).__init__()
53 self._remote = remote
53 self._remote = remote
54
54
55 def __getattr__(self, name):
55 def __getattr__(self, name):
56 # Validate this method is batchable, since submit() only supports
56 # Validate this method is batchable, since submit() only supports
57 # batchable methods.
57 # batchable methods.
58 fn = getattr(self._remote, name)
58 fn = getattr(self._remote, name)
59 if not getattr(fn, 'batchable', None):
59 if not getattr(fn, 'batchable', None):
60 raise error.ProgrammingError('Attempted to batch a non-batchable '
60 raise error.ProgrammingError('Attempted to batch a non-batchable '
61 'call to %r' % name)
61 'call to %r' % name)
62
62
63 return super(remoteiterbatcher, self).__getattr__(name)
63 return super(remoteiterbatcher, self).__getattr__(name)
64
64
65 def submit(self):
65 def submit(self):
66 """Break the batch request into many patch calls and pipeline them.
66 """Break the batch request into many patch calls and pipeline them.
67
67
68 This is mostly valuable over http where request sizes can be
68 This is mostly valuable over http where request sizes can be
69 limited, but can be used in other places as well.
69 limited, but can be used in other places as well.
70 """
70 """
71 # 2-tuple of (command, arguments) that represents what will be
71 # 2-tuple of (command, arguments) that represents what will be
72 # sent over the wire.
72 # sent over the wire.
73 requests = []
73 requests = []
74
74
75 # 4-tuple of (command, final future, @batchable generator, remote
75 # 4-tuple of (command, final future, @batchable generator, remote
76 # future).
76 # future).
77 results = []
77 results = []
78
78
79 for command, args, opts, finalfuture in self.calls:
79 for command, args, opts, finalfuture in self.calls:
80 mtd = getattr(self._remote, command)
80 mtd = getattr(self._remote, command)
81 batchable = mtd.batchable(mtd.__self__, *args, **opts)
81 batchable = mtd.batchable(mtd.__self__, *args, **opts)
82
82
83 commandargs, fremote = next(batchable)
83 commandargs, fremote = next(batchable)
84 assert fremote
84 assert fremote
85 requests.append((command, commandargs))
85 requests.append((command, commandargs))
86 results.append((command, finalfuture, batchable, fremote))
86 results.append((command, finalfuture, batchable, fremote))
87
87
88 if requests:
88 if requests:
89 self._resultiter = self._remote._submitbatch(requests)
89 self._resultiter = self._remote._submitbatch(requests)
90
90
91 self._results = results
91 self._results = results
92
92
93 def results(self):
93 def results(self):
94 for command, finalfuture, batchable, remotefuture in self._results:
94 for command, finalfuture, batchable, remotefuture in self._results:
95 # Get the raw result, set it in the remote future, feed it
95 # Get the raw result, set it in the remote future, feed it
96 # back into the @batchable generator so it can be decoded, and
96 # back into the @batchable generator so it can be decoded, and
97 # set the result on the final future to this value.
97 # set the result on the final future to this value.
98 remoteresult = next(self._resultiter)
98 remoteresult = next(self._resultiter)
99 remotefuture.set(remoteresult)
99 remotefuture.set(remoteresult)
100 finalfuture.set(next(batchable))
100 finalfuture.set(next(batchable))
101
101
102 # Verify our @batchable generators only emit 2 values.
102 # Verify our @batchable generators only emit 2 values.
103 try:
103 try:
104 next(batchable)
104 next(batchable)
105 except StopIteration:
105 except StopIteration:
106 pass
106 pass
107 else:
107 else:
108 raise error.ProgrammingError('%s @batchable generator emitted '
108 raise error.ProgrammingError('%s @batchable generator emitted '
109 'unexpected value count' % command)
109 'unexpected value count' % command)
110
110
111 yield finalfuture.value
111 yield finalfuture.value
112
112
113 # Forward a couple of names from peer to make wireproto interactions
113 # Forward a couple of names from peer to make wireproto interactions
114 # slightly more sensible.
114 # slightly more sensible.
115 batchable = peer.batchable
115 batchable = peer.batchable
116 future = peer.future
116 future = peer.future
117
117
118
118
119 def encodebatchcmds(req):
119 def encodebatchcmds(req):
120 """Return a ``cmds`` argument value for the ``batch`` command."""
120 """Return a ``cmds`` argument value for the ``batch`` command."""
121 escapearg = wireprototypes.escapebatcharg
121 escapearg = wireprototypes.escapebatcharg
122
122
123 cmds = []
123 cmds = []
124 for op, argsdict in req:
124 for op, argsdict in req:
125 # Old servers didn't properly unescape argument names. So prevent
125 # Old servers didn't properly unescape argument names. So prevent
126 # the sending of argument names that may not be decoded properly by
126 # the sending of argument names that may not be decoded properly by
127 # servers.
127 # servers.
128 assert all(escapearg(k) == k for k in argsdict)
128 assert all(escapearg(k) == k for k in argsdict)
129
129
130 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
130 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
131 for k, v in argsdict.iteritems())
131 for k, v in argsdict.iteritems())
132 cmds.append('%s %s' % (op, args))
132 cmds.append('%s %s' % (op, args))
133
133
134 return ';'.join(cmds)
134 return ';'.join(cmds)
135
135
136 def clientcompressionsupport(proto):
136 def clientcompressionsupport(proto):
137 """Returns a list of compression methods supported by the client.
137 """Returns a list of compression methods supported by the client.
138
138
139 Returns a list of the compression methods supported by the client
139 Returns a list of the compression methods supported by the client
140 according to the protocol capabilities. If no such capability has
140 according to the protocol capabilities. If no such capability has
141 been announced, fallback to the default of zlib and uncompressed.
141 been announced, fallback to the default of zlib and uncompressed.
142 """
142 """
143 for cap in proto.getprotocaps():
143 for cap in proto.getprotocaps():
144 if cap.startswith('comp='):
144 if cap.startswith('comp='):
145 return cap[5:].split(',')
145 return cap[5:].split(',')
146 return ['zlib', 'none']
146 return ['zlib', 'none']
147
147
148 # mapping of options accepted by getbundle and their types
149 #
150 # Meant to be extended by extensions. It is extensions responsibility to ensure
151 # such options are properly processed in exchange.getbundle.
152 #
153 # supported types are:
154 #
155 # :nodes: list of binary nodes
156 # :csv: list of comma-separated values
157 # :scsv: list of comma-separated values return as set
158 # :plain: string with no transformation needed.
159 gboptsmap = {'heads': 'nodes',
160 'bookmarks': 'boolean',
161 'common': 'nodes',
162 'obsmarkers': 'boolean',
163 'phases': 'boolean',
164 'bundlecaps': 'scsv',
165 'listkeys': 'csv',
166 'cg': 'boolean',
167 'cbattempted': 'boolean',
168 'stream': 'boolean',
169 }
170
171 # client side
148 # client side
172
149
173 class wirepeer(repository.legacypeer):
150 class wirepeer(repository.legacypeer):
174 """Client-side interface for communicating with a peer repository.
151 """Client-side interface for communicating with a peer repository.
175
152
176 Methods commonly call wire protocol commands of the same name.
153 Methods commonly call wire protocol commands of the same name.
177
154
178 See also httppeer.py and sshpeer.py for protocol-specific
155 See also httppeer.py and sshpeer.py for protocol-specific
179 implementations of this interface.
156 implementations of this interface.
180 """
157 """
181 # Begin of ipeercommands interface.
158 # Begin of ipeercommands interface.
182
159
183 def iterbatch(self):
160 def iterbatch(self):
184 return remoteiterbatcher(self)
161 return remoteiterbatcher(self)
185
162
186 @batchable
163 @batchable
187 def lookup(self, key):
164 def lookup(self, key):
188 self.requirecap('lookup', _('look up remote revision'))
165 self.requirecap('lookup', _('look up remote revision'))
189 f = future()
166 f = future()
190 yield {'key': encoding.fromlocal(key)}, f
167 yield {'key': encoding.fromlocal(key)}, f
191 d = f.value
168 d = f.value
192 success, data = d[:-1].split(" ", 1)
169 success, data = d[:-1].split(" ", 1)
193 if int(success):
170 if int(success):
194 yield bin(data)
171 yield bin(data)
195 else:
172 else:
196 self._abort(error.RepoError(data))
173 self._abort(error.RepoError(data))
197
174
198 @batchable
175 @batchable
199 def heads(self):
176 def heads(self):
200 f = future()
177 f = future()
201 yield {}, f
178 yield {}, f
202 d = f.value
179 d = f.value
203 try:
180 try:
204 yield wireprototypes.decodelist(d[:-1])
181 yield wireprototypes.decodelist(d[:-1])
205 except ValueError:
182 except ValueError:
206 self._abort(error.ResponseError(_("unexpected response:"), d))
183 self._abort(error.ResponseError(_("unexpected response:"), d))
207
184
208 @batchable
185 @batchable
209 def known(self, nodes):
186 def known(self, nodes):
210 f = future()
187 f = future()
211 yield {'nodes': wireprototypes.encodelist(nodes)}, f
188 yield {'nodes': wireprototypes.encodelist(nodes)}, f
212 d = f.value
189 d = f.value
213 try:
190 try:
214 yield [bool(int(b)) for b in d]
191 yield [bool(int(b)) for b in d]
215 except ValueError:
192 except ValueError:
216 self._abort(error.ResponseError(_("unexpected response:"), d))
193 self._abort(error.ResponseError(_("unexpected response:"), d))
217
194
218 @batchable
195 @batchable
219 def branchmap(self):
196 def branchmap(self):
220 f = future()
197 f = future()
221 yield {}, f
198 yield {}, f
222 d = f.value
199 d = f.value
223 try:
200 try:
224 branchmap = {}
201 branchmap = {}
225 for branchpart in d.splitlines():
202 for branchpart in d.splitlines():
226 branchname, branchheads = branchpart.split(' ', 1)
203 branchname, branchheads = branchpart.split(' ', 1)
227 branchname = encoding.tolocal(urlreq.unquote(branchname))
204 branchname = encoding.tolocal(urlreq.unquote(branchname))
228 branchheads = wireprototypes.decodelist(branchheads)
205 branchheads = wireprototypes.decodelist(branchheads)
229 branchmap[branchname] = branchheads
206 branchmap[branchname] = branchheads
230 yield branchmap
207 yield branchmap
231 except TypeError:
208 except TypeError:
232 self._abort(error.ResponseError(_("unexpected response:"), d))
209 self._abort(error.ResponseError(_("unexpected response:"), d))
233
210
234 @batchable
211 @batchable
235 def listkeys(self, namespace):
212 def listkeys(self, namespace):
236 if not self.capable('pushkey'):
213 if not self.capable('pushkey'):
237 yield {}, None
214 yield {}, None
238 f = future()
215 f = future()
239 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
216 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
240 yield {'namespace': encoding.fromlocal(namespace)}, f
217 yield {'namespace': encoding.fromlocal(namespace)}, f
241 d = f.value
218 d = f.value
242 self.ui.debug('received listkey for "%s": %i bytes\n'
219 self.ui.debug('received listkey for "%s": %i bytes\n'
243 % (namespace, len(d)))
220 % (namespace, len(d)))
244 yield pushkeymod.decodekeys(d)
221 yield pushkeymod.decodekeys(d)
245
222
246 @batchable
223 @batchable
247 def pushkey(self, namespace, key, old, new):
224 def pushkey(self, namespace, key, old, new):
248 if not self.capable('pushkey'):
225 if not self.capable('pushkey'):
249 yield False, None
226 yield False, None
250 f = future()
227 f = future()
251 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
228 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
252 yield {'namespace': encoding.fromlocal(namespace),
229 yield {'namespace': encoding.fromlocal(namespace),
253 'key': encoding.fromlocal(key),
230 'key': encoding.fromlocal(key),
254 'old': encoding.fromlocal(old),
231 'old': encoding.fromlocal(old),
255 'new': encoding.fromlocal(new)}, f
232 'new': encoding.fromlocal(new)}, f
256 d = f.value
233 d = f.value
257 d, output = d.split('\n', 1)
234 d, output = d.split('\n', 1)
258 try:
235 try:
259 d = bool(int(d))
236 d = bool(int(d))
260 except ValueError:
237 except ValueError:
261 raise error.ResponseError(
238 raise error.ResponseError(
262 _('push failed (unexpected response):'), d)
239 _('push failed (unexpected response):'), d)
263 for l in output.splitlines(True):
240 for l in output.splitlines(True):
264 self.ui.status(_('remote: '), l)
241 self.ui.status(_('remote: '), l)
265 yield d
242 yield d
266
243
267 def stream_out(self):
244 def stream_out(self):
268 return self._callstream('stream_out')
245 return self._callstream('stream_out')
269
246
270 def getbundle(self, source, **kwargs):
247 def getbundle(self, source, **kwargs):
271 kwargs = pycompat.byteskwargs(kwargs)
248 kwargs = pycompat.byteskwargs(kwargs)
272 self.requirecap('getbundle', _('look up remote changes'))
249 self.requirecap('getbundle', _('look up remote changes'))
273 opts = {}
250 opts = {}
274 bundlecaps = kwargs.get('bundlecaps') or set()
251 bundlecaps = kwargs.get('bundlecaps') or set()
275 for key, value in kwargs.iteritems():
252 for key, value in kwargs.iteritems():
276 if value is None:
253 if value is None:
277 continue
254 continue
278 keytype = gboptsmap.get(key)
255 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
279 if keytype is None:
256 if keytype is None:
280 raise error.ProgrammingError(
257 raise error.ProgrammingError(
281 'Unexpectedly None keytype for key %s' % key)
258 'Unexpectedly None keytype for key %s' % key)
282 elif keytype == 'nodes':
259 elif keytype == 'nodes':
283 value = wireprototypes.encodelist(value)
260 value = wireprototypes.encodelist(value)
284 elif keytype == 'csv':
261 elif keytype == 'csv':
285 value = ','.join(value)
262 value = ','.join(value)
286 elif keytype == 'scsv':
263 elif keytype == 'scsv':
287 value = ','.join(sorted(value))
264 value = ','.join(sorted(value))
288 elif keytype == 'boolean':
265 elif keytype == 'boolean':
289 value = '%i' % bool(value)
266 value = '%i' % bool(value)
290 elif keytype != 'plain':
267 elif keytype != 'plain':
291 raise KeyError('unknown getbundle option type %s'
268 raise KeyError('unknown getbundle option type %s'
292 % keytype)
269 % keytype)
293 opts[key] = value
270 opts[key] = value
294 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
271 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
295 if any((cap.startswith('HG2') for cap in bundlecaps)):
272 if any((cap.startswith('HG2') for cap in bundlecaps)):
296 return bundle2.getunbundler(self.ui, f)
273 return bundle2.getunbundler(self.ui, f)
297 else:
274 else:
298 return changegroupmod.cg1unpacker(f, 'UN')
275 return changegroupmod.cg1unpacker(f, 'UN')
299
276
300 def unbundle(self, cg, heads, url):
277 def unbundle(self, cg, heads, url):
301 '''Send cg (a readable file-like object representing the
278 '''Send cg (a readable file-like object representing the
302 changegroup to push, typically a chunkbuffer object) to the
279 changegroup to push, typically a chunkbuffer object) to the
303 remote server as a bundle.
280 remote server as a bundle.
304
281
305 When pushing a bundle10 stream, return an integer indicating the
282 When pushing a bundle10 stream, return an integer indicating the
306 result of the push (see changegroup.apply()).
283 result of the push (see changegroup.apply()).
307
284
308 When pushing a bundle20 stream, return a bundle20 stream.
285 When pushing a bundle20 stream, return a bundle20 stream.
309
286
310 `url` is the url the client thinks it's pushing to, which is
287 `url` is the url the client thinks it's pushing to, which is
311 visible to hooks.
288 visible to hooks.
312 '''
289 '''
313
290
314 if heads != ['force'] and self.capable('unbundlehash'):
291 if heads != ['force'] and self.capable('unbundlehash'):
315 heads = wireprototypes.encodelist(
292 heads = wireprototypes.encodelist(
316 ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
293 ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
317 else:
294 else:
318 heads = wireprototypes.encodelist(heads)
295 heads = wireprototypes.encodelist(heads)
319
296
320 if util.safehasattr(cg, 'deltaheader'):
297 if util.safehasattr(cg, 'deltaheader'):
321 # this a bundle10, do the old style call sequence
298 # this a bundle10, do the old style call sequence
322 ret, output = self._callpush("unbundle", cg, heads=heads)
299 ret, output = self._callpush("unbundle", cg, heads=heads)
323 if ret == "":
300 if ret == "":
324 raise error.ResponseError(
301 raise error.ResponseError(
325 _('push failed:'), output)
302 _('push failed:'), output)
326 try:
303 try:
327 ret = int(ret)
304 ret = int(ret)
328 except ValueError:
305 except ValueError:
329 raise error.ResponseError(
306 raise error.ResponseError(
330 _('push failed (unexpected response):'), ret)
307 _('push failed (unexpected response):'), ret)
331
308
332 for l in output.splitlines(True):
309 for l in output.splitlines(True):
333 self.ui.status(_('remote: '), l)
310 self.ui.status(_('remote: '), l)
334 else:
311 else:
335 # bundle2 push. Send a stream, fetch a stream.
312 # bundle2 push. Send a stream, fetch a stream.
336 stream = self._calltwowaystream('unbundle', cg, heads=heads)
313 stream = self._calltwowaystream('unbundle', cg, heads=heads)
337 ret = bundle2.getunbundler(self.ui, stream)
314 ret = bundle2.getunbundler(self.ui, stream)
338 return ret
315 return ret
339
316
340 # End of ipeercommands interface.
317 # End of ipeercommands interface.
341
318
342 # Begin of ipeerlegacycommands interface.
319 # Begin of ipeerlegacycommands interface.
343
320
344 def branches(self, nodes):
321 def branches(self, nodes):
345 n = wireprototypes.encodelist(nodes)
322 n = wireprototypes.encodelist(nodes)
346 d = self._call("branches", nodes=n)
323 d = self._call("branches", nodes=n)
347 try:
324 try:
348 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
325 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
349 return br
326 return br
350 except ValueError:
327 except ValueError:
351 self._abort(error.ResponseError(_("unexpected response:"), d))
328 self._abort(error.ResponseError(_("unexpected response:"), d))
352
329
353 def between(self, pairs):
330 def between(self, pairs):
354 batch = 8 # avoid giant requests
331 batch = 8 # avoid giant requests
355 r = []
332 r = []
356 for i in xrange(0, len(pairs), batch):
333 for i in xrange(0, len(pairs), batch):
357 n = " ".join([wireprototypes.encodelist(p, '-')
334 n = " ".join([wireprototypes.encodelist(p, '-')
358 for p in pairs[i:i + batch]])
335 for p in pairs[i:i + batch]])
359 d = self._call("between", pairs=n)
336 d = self._call("between", pairs=n)
360 try:
337 try:
361 r.extend(l and wireprototypes.decodelist(l) or []
338 r.extend(l and wireprototypes.decodelist(l) or []
362 for l in d.splitlines())
339 for l in d.splitlines())
363 except ValueError:
340 except ValueError:
364 self._abort(error.ResponseError(_("unexpected response:"), d))
341 self._abort(error.ResponseError(_("unexpected response:"), d))
365 return r
342 return r
366
343
367 def changegroup(self, nodes, kind):
344 def changegroup(self, nodes, kind):
368 n = wireprototypes.encodelist(nodes)
345 n = wireprototypes.encodelist(nodes)
369 f = self._callcompressable("changegroup", roots=n)
346 f = self._callcompressable("changegroup", roots=n)
370 return changegroupmod.cg1unpacker(f, 'UN')
347 return changegroupmod.cg1unpacker(f, 'UN')
371
348
372 def changegroupsubset(self, bases, heads, kind):
349 def changegroupsubset(self, bases, heads, kind):
373 self.requirecap('changegroupsubset', _('look up remote changes'))
350 self.requirecap('changegroupsubset', _('look up remote changes'))
374 bases = wireprototypes.encodelist(bases)
351 bases = wireprototypes.encodelist(bases)
375 heads = wireprototypes.encodelist(heads)
352 heads = wireprototypes.encodelist(heads)
376 f = self._callcompressable("changegroupsubset",
353 f = self._callcompressable("changegroupsubset",
377 bases=bases, heads=heads)
354 bases=bases, heads=heads)
378 return changegroupmod.cg1unpacker(f, 'UN')
355 return changegroupmod.cg1unpacker(f, 'UN')
379
356
380 # End of ipeerlegacycommands interface.
357 # End of ipeerlegacycommands interface.
381
358
382 def _submitbatch(self, req):
359 def _submitbatch(self, req):
383 """run batch request <req> on the server
360 """run batch request <req> on the server
384
361
385 Returns an iterator of the raw responses from the server.
362 Returns an iterator of the raw responses from the server.
386 """
363 """
387 ui = self.ui
364 ui = self.ui
388 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
365 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
389 ui.debug('devel-peer-request: batched-content\n')
366 ui.debug('devel-peer-request: batched-content\n')
390 for op, args in req:
367 for op, args in req:
391 msg = 'devel-peer-request: - %s (%d arguments)\n'
368 msg = 'devel-peer-request: - %s (%d arguments)\n'
392 ui.debug(msg % (op, len(args)))
369 ui.debug(msg % (op, len(args)))
393
370
394 unescapearg = wireprototypes.unescapebatcharg
371 unescapearg = wireprototypes.unescapebatcharg
395
372
396 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
373 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
397 chunk = rsp.read(1024)
374 chunk = rsp.read(1024)
398 work = [chunk]
375 work = [chunk]
399 while chunk:
376 while chunk:
400 while ';' not in chunk and chunk:
377 while ';' not in chunk and chunk:
401 chunk = rsp.read(1024)
378 chunk = rsp.read(1024)
402 work.append(chunk)
379 work.append(chunk)
403 merged = ''.join(work)
380 merged = ''.join(work)
404 while ';' in merged:
381 while ';' in merged:
405 one, merged = merged.split(';', 1)
382 one, merged = merged.split(';', 1)
406 yield unescapearg(one)
383 yield unescapearg(one)
407 chunk = rsp.read(1024)
384 chunk = rsp.read(1024)
408 work = [merged, chunk]
385 work = [merged, chunk]
409 yield unescapearg(''.join(work))
386 yield unescapearg(''.join(work))
410
387
411 def _submitone(self, op, args):
388 def _submitone(self, op, args):
412 return self._call(op, **pycompat.strkwargs(args))
389 return self._call(op, **pycompat.strkwargs(args))
413
390
414 def debugwireargs(self, one, two, three=None, four=None, five=None):
391 def debugwireargs(self, one, two, three=None, four=None, five=None):
415 # don't pass optional arguments left at their default value
392 # don't pass optional arguments left at their default value
416 opts = {}
393 opts = {}
417 if three is not None:
394 if three is not None:
418 opts[r'three'] = three
395 opts[r'three'] = three
419 if four is not None:
396 if four is not None:
420 opts[r'four'] = four
397 opts[r'four'] = four
421 return self._call('debugwireargs', one=one, two=two, **opts)
398 return self._call('debugwireargs', one=one, two=two, **opts)
422
399
423 def _call(self, cmd, **args):
400 def _call(self, cmd, **args):
424 """execute <cmd> on the server
401 """execute <cmd> on the server
425
402
426 The command is expected to return a simple string.
403 The command is expected to return a simple string.
427
404
428 returns the server reply as a string."""
405 returns the server reply as a string."""
429 raise NotImplementedError()
406 raise NotImplementedError()
430
407
431 def _callstream(self, cmd, **args):
408 def _callstream(self, cmd, **args):
432 """execute <cmd> on the server
409 """execute <cmd> on the server
433
410
434 The command is expected to return a stream. Note that if the
411 The command is expected to return a stream. Note that if the
435 command doesn't return a stream, _callstream behaves
412 command doesn't return a stream, _callstream behaves
436 differently for ssh and http peers.
413 differently for ssh and http peers.
437
414
438 returns the server reply as a file like object.
415 returns the server reply as a file like object.
439 """
416 """
440 raise NotImplementedError()
417 raise NotImplementedError()
441
418
442 def _callcompressable(self, cmd, **args):
419 def _callcompressable(self, cmd, **args):
443 """execute <cmd> on the server
420 """execute <cmd> on the server
444
421
445 The command is expected to return a stream.
422 The command is expected to return a stream.
446
423
447 The stream may have been compressed in some implementations. This
424 The stream may have been compressed in some implementations. This
448 function takes care of the decompression. This is the only difference
425 function takes care of the decompression. This is the only difference
449 with _callstream.
426 with _callstream.
450
427
451 returns the server reply as a file like object.
428 returns the server reply as a file like object.
452 """
429 """
453 raise NotImplementedError()
430 raise NotImplementedError()
454
431
455 def _callpush(self, cmd, fp, **args):
432 def _callpush(self, cmd, fp, **args):
456 """execute a <cmd> on server
433 """execute a <cmd> on server
457
434
458 The command is expected to be related to a push. Push has a special
435 The command is expected to be related to a push. Push has a special
459 return method.
436 return method.
460
437
461 returns the server reply as a (ret, output) tuple. ret is either
438 returns the server reply as a (ret, output) tuple. ret is either
462 empty (error) or a stringified int.
439 empty (error) or a stringified int.
463 """
440 """
464 raise NotImplementedError()
441 raise NotImplementedError()
465
442
466 def _calltwowaystream(self, cmd, fp, **args):
443 def _calltwowaystream(self, cmd, fp, **args):
467 """execute <cmd> on server
444 """execute <cmd> on server
468
445
469 The command will send a stream to the server and get a stream in reply.
446 The command will send a stream to the server and get a stream in reply.
470 """
447 """
471 raise NotImplementedError()
448 raise NotImplementedError()
472
449
473 def _abort(self, exception):
450 def _abort(self, exception):
474 """clearly abort the wire protocol connection and raise the exception
451 """clearly abort the wire protocol connection and raise the exception
475 """
452 """
476 raise NotImplementedError()
453 raise NotImplementedError()
477
454
478 # server side
455 # server side
479
456
480 # wire protocol command can either return a string or one of these classes.
457 # wire protocol command can either return a string or one of these classes.
481
458
482 def getdispatchrepo(repo, proto, command):
459 def getdispatchrepo(repo, proto, command):
483 """Obtain the repo used for processing wire protocol commands.
460 """Obtain the repo used for processing wire protocol commands.
484
461
485 The intent of this function is to serve as a monkeypatch point for
462 The intent of this function is to serve as a monkeypatch point for
486 extensions that need commands to operate on different repo views under
463 extensions that need commands to operate on different repo views under
487 specialized circumstances.
464 specialized circumstances.
488 """
465 """
489 return repo.filtered('served')
466 return repo.filtered('served')
490
467
491 def dispatch(repo, proto, command):
468 def dispatch(repo, proto, command):
492 repo = getdispatchrepo(repo, proto, command)
469 repo = getdispatchrepo(repo, proto, command)
493
470
494 transportversion = wireprototypes.TRANSPORTS[proto.name]['version']
471 transportversion = wireprototypes.TRANSPORTS[proto.name]['version']
495 commandtable = commandsv2 if transportversion == 2 else commands
472 commandtable = commandsv2 if transportversion == 2 else commands
496 func, spec = commandtable[command]
473 func, spec = commandtable[command]
497
474
498 args = proto.getargs(spec)
475 args = proto.getargs(spec)
499
476
500 # Version 1 protocols define arguments as a list. Version 2 uses a dict.
477 # Version 1 protocols define arguments as a list. Version 2 uses a dict.
501 if isinstance(args, list):
478 if isinstance(args, list):
502 return func(repo, proto, *args)
479 return func(repo, proto, *args)
503 elif isinstance(args, dict):
480 elif isinstance(args, dict):
504 return func(repo, proto, **args)
481 return func(repo, proto, **args)
505 else:
482 else:
506 raise error.ProgrammingError('unexpected type returned from '
483 raise error.ProgrammingError('unexpected type returned from '
507 'proto.getargs(): %s' % type(args))
484 'proto.getargs(): %s' % type(args))
508
485
509 def options(cmd, keys, others):
486 def options(cmd, keys, others):
510 opts = {}
487 opts = {}
511 for k in keys:
488 for k in keys:
512 if k in others:
489 if k in others:
513 opts[k] = others[k]
490 opts[k] = others[k]
514 del others[k]
491 del others[k]
515 if others:
492 if others:
516 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
493 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
517 % (cmd, ",".join(others)))
494 % (cmd, ",".join(others)))
518 return opts
495 return opts
519
496
520 def bundle1allowed(repo, action):
497 def bundle1allowed(repo, action):
521 """Whether a bundle1 operation is allowed from the server.
498 """Whether a bundle1 operation is allowed from the server.
522
499
523 Priority is:
500 Priority is:
524
501
525 1. server.bundle1gd.<action> (if generaldelta active)
502 1. server.bundle1gd.<action> (if generaldelta active)
526 2. server.bundle1.<action>
503 2. server.bundle1.<action>
527 3. server.bundle1gd (if generaldelta active)
504 3. server.bundle1gd (if generaldelta active)
528 4. server.bundle1
505 4. server.bundle1
529 """
506 """
530 ui = repo.ui
507 ui = repo.ui
531 gd = 'generaldelta' in repo.requirements
508 gd = 'generaldelta' in repo.requirements
532
509
533 if gd:
510 if gd:
534 v = ui.configbool('server', 'bundle1gd.%s' % action)
511 v = ui.configbool('server', 'bundle1gd.%s' % action)
535 if v is not None:
512 if v is not None:
536 return v
513 return v
537
514
538 v = ui.configbool('server', 'bundle1.%s' % action)
515 v = ui.configbool('server', 'bundle1.%s' % action)
539 if v is not None:
516 if v is not None:
540 return v
517 return v
541
518
542 if gd:
519 if gd:
543 v = ui.configbool('server', 'bundle1gd')
520 v = ui.configbool('server', 'bundle1gd')
544 if v is not None:
521 if v is not None:
545 return v
522 return v
546
523
547 return ui.configbool('server', 'bundle1')
524 return ui.configbool('server', 'bundle1')
548
525
549 def supportedcompengines(ui, role):
526 def supportedcompengines(ui, role):
550 """Obtain the list of supported compression engines for a request."""
527 """Obtain the list of supported compression engines for a request."""
551 assert role in (util.CLIENTROLE, util.SERVERROLE)
528 assert role in (util.CLIENTROLE, util.SERVERROLE)
552
529
553 compengines = util.compengines.supportedwireengines(role)
530 compengines = util.compengines.supportedwireengines(role)
554
531
555 # Allow config to override default list and ordering.
532 # Allow config to override default list and ordering.
556 if role == util.SERVERROLE:
533 if role == util.SERVERROLE:
557 configengines = ui.configlist('server', 'compressionengines')
534 configengines = ui.configlist('server', 'compressionengines')
558 config = 'server.compressionengines'
535 config = 'server.compressionengines'
559 else:
536 else:
560 # This is currently implemented mainly to facilitate testing. In most
537 # This is currently implemented mainly to facilitate testing. In most
561 # cases, the server should be in charge of choosing a compression engine
538 # cases, the server should be in charge of choosing a compression engine
562 # because a server has the most to lose from a sub-optimal choice. (e.g.
539 # because a server has the most to lose from a sub-optimal choice. (e.g.
563 # CPU DoS due to an expensive engine or a network DoS due to poor
540 # CPU DoS due to an expensive engine or a network DoS due to poor
564 # compression ratio).
541 # compression ratio).
565 configengines = ui.configlist('experimental',
542 configengines = ui.configlist('experimental',
566 'clientcompressionengines')
543 'clientcompressionengines')
567 config = 'experimental.clientcompressionengines'
544 config = 'experimental.clientcompressionengines'
568
545
569 # No explicit config. Filter out the ones that aren't supposed to be
546 # No explicit config. Filter out the ones that aren't supposed to be
570 # advertised and return default ordering.
547 # advertised and return default ordering.
571 if not configengines:
548 if not configengines:
572 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
549 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
573 return [e for e in compengines
550 return [e for e in compengines
574 if getattr(e.wireprotosupport(), attr) > 0]
551 if getattr(e.wireprotosupport(), attr) > 0]
575
552
576 # If compression engines are listed in the config, assume there is a good
553 # If compression engines are listed in the config, assume there is a good
577 # reason for it (like server operators wanting to achieve specific
554 # reason for it (like server operators wanting to achieve specific
578 # performance characteristics). So fail fast if the config references
555 # performance characteristics). So fail fast if the config references
579 # unusable compression engines.
556 # unusable compression engines.
580 validnames = set(e.name() for e in compengines)
557 validnames = set(e.name() for e in compengines)
581 invalidnames = set(e for e in configengines if e not in validnames)
558 invalidnames = set(e for e in configengines if e not in validnames)
582 if invalidnames:
559 if invalidnames:
583 raise error.Abort(_('invalid compression engine defined in %s: %s') %
560 raise error.Abort(_('invalid compression engine defined in %s: %s') %
584 (config, ', '.join(sorted(invalidnames))))
561 (config, ', '.join(sorted(invalidnames))))
585
562
586 compengines = [e for e in compengines if e.name() in configengines]
563 compengines = [e for e in compengines if e.name() in configengines]
587 compengines = sorted(compengines,
564 compengines = sorted(compengines,
588 key=lambda e: configengines.index(e.name()))
565 key=lambda e: configengines.index(e.name()))
589
566
590 if not compengines:
567 if not compengines:
591 raise error.Abort(_('%s config option does not specify any known '
568 raise error.Abort(_('%s config option does not specify any known '
592 'compression engines') % config,
569 'compression engines') % config,
593 hint=_('usable compression engines: %s') %
570 hint=_('usable compression engines: %s') %
594 ', '.sorted(validnames))
571 ', '.sorted(validnames))
595
572
596 return compengines
573 return compengines
597
574
598 class commandentry(object):
575 class commandentry(object):
599 """Represents a declared wire protocol command."""
576 """Represents a declared wire protocol command."""
600 def __init__(self, func, args='', transports=None,
577 def __init__(self, func, args='', transports=None,
601 permission='push'):
578 permission='push'):
602 self.func = func
579 self.func = func
603 self.args = args
580 self.args = args
604 self.transports = transports or set()
581 self.transports = transports or set()
605 self.permission = permission
582 self.permission = permission
606
583
607 def _merge(self, func, args):
584 def _merge(self, func, args):
608 """Merge this instance with an incoming 2-tuple.
585 """Merge this instance with an incoming 2-tuple.
609
586
610 This is called when a caller using the old 2-tuple API attempts
587 This is called when a caller using the old 2-tuple API attempts
611 to replace an instance. The incoming values are merged with
588 to replace an instance. The incoming values are merged with
612 data not captured by the 2-tuple and a new instance containing
589 data not captured by the 2-tuple and a new instance containing
613 the union of the two objects is returned.
590 the union of the two objects is returned.
614 """
591 """
615 return commandentry(func, args=args, transports=set(self.transports),
592 return commandentry(func, args=args, transports=set(self.transports),
616 permission=self.permission)
593 permission=self.permission)
617
594
618 # Old code treats instances as 2-tuples. So expose that interface.
595 # Old code treats instances as 2-tuples. So expose that interface.
619 def __iter__(self):
596 def __iter__(self):
620 yield self.func
597 yield self.func
621 yield self.args
598 yield self.args
622
599
623 def __getitem__(self, i):
600 def __getitem__(self, i):
624 if i == 0:
601 if i == 0:
625 return self.func
602 return self.func
626 elif i == 1:
603 elif i == 1:
627 return self.args
604 return self.args
628 else:
605 else:
629 raise IndexError('can only access elements 0 and 1')
606 raise IndexError('can only access elements 0 and 1')
630
607
631 class commanddict(dict):
608 class commanddict(dict):
632 """Container for registered wire protocol commands.
609 """Container for registered wire protocol commands.
633
610
634 It behaves like a dict. But __setitem__ is overwritten to allow silent
611 It behaves like a dict. But __setitem__ is overwritten to allow silent
635 coercion of values from 2-tuples for API compatibility.
612 coercion of values from 2-tuples for API compatibility.
636 """
613 """
637 def __setitem__(self, k, v):
614 def __setitem__(self, k, v):
638 if isinstance(v, commandentry):
615 if isinstance(v, commandentry):
639 pass
616 pass
640 # Cast 2-tuples to commandentry instances.
617 # Cast 2-tuples to commandentry instances.
641 elif isinstance(v, tuple):
618 elif isinstance(v, tuple):
642 if len(v) != 2:
619 if len(v) != 2:
643 raise ValueError('command tuples must have exactly 2 elements')
620 raise ValueError('command tuples must have exactly 2 elements')
644
621
645 # It is common for extensions to wrap wire protocol commands via
622 # It is common for extensions to wrap wire protocol commands via
646 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
623 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
647 # doing this aren't aware of the new API that uses objects to store
624 # doing this aren't aware of the new API that uses objects to store
648 # command entries, we automatically merge old state with new.
625 # command entries, we automatically merge old state with new.
649 if k in self:
626 if k in self:
650 v = self[k]._merge(v[0], v[1])
627 v = self[k]._merge(v[0], v[1])
651 else:
628 else:
652 # Use default values from @wireprotocommand.
629 # Use default values from @wireprotocommand.
653 v = commandentry(v[0], args=v[1],
630 v = commandentry(v[0], args=v[1],
654 transports=set(wireprototypes.TRANSPORTS),
631 transports=set(wireprototypes.TRANSPORTS),
655 permission='push')
632 permission='push')
656 else:
633 else:
657 raise ValueError('command entries must be commandentry instances '
634 raise ValueError('command entries must be commandentry instances '
658 'or 2-tuples')
635 'or 2-tuples')
659
636
660 return super(commanddict, self).__setitem__(k, v)
637 return super(commanddict, self).__setitem__(k, v)
661
638
662 def commandavailable(self, command, proto):
639 def commandavailable(self, command, proto):
663 """Determine if a command is available for the requested protocol."""
640 """Determine if a command is available for the requested protocol."""
664 assert proto.name in wireprototypes.TRANSPORTS
641 assert proto.name in wireprototypes.TRANSPORTS
665
642
666 entry = self.get(command)
643 entry = self.get(command)
667
644
668 if not entry:
645 if not entry:
669 return False
646 return False
670
647
671 if proto.name not in entry.transports:
648 if proto.name not in entry.transports:
672 return False
649 return False
673
650
674 return True
651 return True
675
652
676 # Constants specifying which transports a wire protocol command should be
653 # Constants specifying which transports a wire protocol command should be
677 # available on. For use with @wireprotocommand.
654 # available on. For use with @wireprotocommand.
678 POLICY_V1_ONLY = 'v1-only'
655 POLICY_V1_ONLY = 'v1-only'
679 POLICY_V2_ONLY = 'v2-only'
656 POLICY_V2_ONLY = 'v2-only'
680
657
681 # For version 1 transports.
658 # For version 1 transports.
682 commands = commanddict()
659 commands = commanddict()
683
660
684 # For version 2 transports.
661 # For version 2 transports.
685 commandsv2 = commanddict()
662 commandsv2 = commanddict()
686
663
687 def wireprotocommand(name, args=None, transportpolicy=POLICY_V1_ONLY,
664 def wireprotocommand(name, args=None, transportpolicy=POLICY_V1_ONLY,
688 permission='push'):
665 permission='push'):
689 """Decorator to declare a wire protocol command.
666 """Decorator to declare a wire protocol command.
690
667
691 ``name`` is the name of the wire protocol command being provided.
668 ``name`` is the name of the wire protocol command being provided.
692
669
693 ``args`` defines the named arguments accepted by the command. It is
670 ``args`` defines the named arguments accepted by the command. It is
694 ideally a dict mapping argument names to their types. For backwards
671 ideally a dict mapping argument names to their types. For backwards
695 compatibility, it can be a space-delimited list of argument names. For
672 compatibility, it can be a space-delimited list of argument names. For
696 version 1 transports, ``*`` denotes a special value that says to accept
673 version 1 transports, ``*`` denotes a special value that says to accept
697 all named arguments.
674 all named arguments.
698
675
699 ``transportpolicy`` is a POLICY_* constant denoting which transports
676 ``transportpolicy`` is a POLICY_* constant denoting which transports
700 this wire protocol command should be exposed to. By default, commands
677 this wire protocol command should be exposed to. By default, commands
701 are exposed to all wire protocol transports.
678 are exposed to all wire protocol transports.
702
679
703 ``permission`` defines the permission type needed to run this command.
680 ``permission`` defines the permission type needed to run this command.
704 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
681 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
705 respectively. Default is to assume command requires ``push`` permissions
682 respectively. Default is to assume command requires ``push`` permissions
706 because otherwise commands not declaring their permissions could modify
683 because otherwise commands not declaring their permissions could modify
707 a repository that is supposed to be read-only.
684 a repository that is supposed to be read-only.
708 """
685 """
709 if transportpolicy == POLICY_V1_ONLY:
686 if transportpolicy == POLICY_V1_ONLY:
710 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
687 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
711 if v['version'] == 1}
688 if v['version'] == 1}
712 transportversion = 1
689 transportversion = 1
713 elif transportpolicy == POLICY_V2_ONLY:
690 elif transportpolicy == POLICY_V2_ONLY:
714 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
691 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
715 if v['version'] == 2}
692 if v['version'] == 2}
716 transportversion = 2
693 transportversion = 2
717 else:
694 else:
718 raise error.ProgrammingError('invalid transport policy value: %s' %
695 raise error.ProgrammingError('invalid transport policy value: %s' %
719 transportpolicy)
696 transportpolicy)
720
697
721 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
698 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
722 # SSHv2.
699 # SSHv2.
723 # TODO undo this hack when SSH is using the unified frame protocol.
700 # TODO undo this hack when SSH is using the unified frame protocol.
724 if name == b'batch':
701 if name == b'batch':
725 transports.add(wireprototypes.SSHV2)
702 transports.add(wireprototypes.SSHV2)
726
703
727 if permission not in ('push', 'pull'):
704 if permission not in ('push', 'pull'):
728 raise error.ProgrammingError('invalid wire protocol permission; '
705 raise error.ProgrammingError('invalid wire protocol permission; '
729 'got %s; expected "push" or "pull"' %
706 'got %s; expected "push" or "pull"' %
730 permission)
707 permission)
731
708
732 if transportversion == 1:
709 if transportversion == 1:
733 if args is None:
710 if args is None:
734 args = ''
711 args = ''
735
712
736 if not isinstance(args, bytes):
713 if not isinstance(args, bytes):
737 raise error.ProgrammingError('arguments for version 1 commands '
714 raise error.ProgrammingError('arguments for version 1 commands '
738 'must be declared as bytes')
715 'must be declared as bytes')
739 elif transportversion == 2:
716 elif transportversion == 2:
740 if args is None:
717 if args is None:
741 args = {}
718 args = {}
742
719
743 if not isinstance(args, dict):
720 if not isinstance(args, dict):
744 raise error.ProgrammingError('arguments for version 2 commands '
721 raise error.ProgrammingError('arguments for version 2 commands '
745 'must be declared as dicts')
722 'must be declared as dicts')
746
723
747 def register(func):
724 def register(func):
748 if transportversion == 1:
725 if transportversion == 1:
749 if name in commands:
726 if name in commands:
750 raise error.ProgrammingError('%s command already registered '
727 raise error.ProgrammingError('%s command already registered '
751 'for version 1' % name)
728 'for version 1' % name)
752 commands[name] = commandentry(func, args=args,
729 commands[name] = commandentry(func, args=args,
753 transports=transports,
730 transports=transports,
754 permission=permission)
731 permission=permission)
755 elif transportversion == 2:
732 elif transportversion == 2:
756 if name in commandsv2:
733 if name in commandsv2:
757 raise error.ProgrammingError('%s command already registered '
734 raise error.ProgrammingError('%s command already registered '
758 'for version 2' % name)
735 'for version 2' % name)
759
736
760 commandsv2[name] = commandentry(func, args=args,
737 commandsv2[name] = commandentry(func, args=args,
761 transports=transports,
738 transports=transports,
762 permission=permission)
739 permission=permission)
763 else:
740 else:
764 raise error.ProgrammingError('unhandled transport version: %d' %
741 raise error.ProgrammingError('unhandled transport version: %d' %
765 transportversion)
742 transportversion)
766
743
767 return func
744 return func
768 return register
745 return register
769
746
770 # TODO define a more appropriate permissions type to use for this.
747 # TODO define a more appropriate permissions type to use for this.
771 @wireprotocommand('batch', 'cmds *', permission='pull',
748 @wireprotocommand('batch', 'cmds *', permission='pull',
772 transportpolicy=POLICY_V1_ONLY)
749 transportpolicy=POLICY_V1_ONLY)
773 def batch(repo, proto, cmds, others):
750 def batch(repo, proto, cmds, others):
774 unescapearg = wireprototypes.unescapebatcharg
751 unescapearg = wireprototypes.unescapebatcharg
775 repo = repo.filtered("served")
752 repo = repo.filtered("served")
776 res = []
753 res = []
777 for pair in cmds.split(';'):
754 for pair in cmds.split(';'):
778 op, args = pair.split(' ', 1)
755 op, args = pair.split(' ', 1)
779 vals = {}
756 vals = {}
780 for a in args.split(','):
757 for a in args.split(','):
781 if a:
758 if a:
782 n, v = a.split('=')
759 n, v = a.split('=')
783 vals[unescapearg(n)] = unescapearg(v)
760 vals[unescapearg(n)] = unescapearg(v)
784 func, spec = commands[op]
761 func, spec = commands[op]
785
762
786 # Validate that client has permissions to perform this command.
763 # Validate that client has permissions to perform this command.
787 perm = commands[op].permission
764 perm = commands[op].permission
788 assert perm in ('push', 'pull')
765 assert perm in ('push', 'pull')
789 proto.checkperm(perm)
766 proto.checkperm(perm)
790
767
791 if spec:
768 if spec:
792 keys = spec.split()
769 keys = spec.split()
793 data = {}
770 data = {}
794 for k in keys:
771 for k in keys:
795 if k == '*':
772 if k == '*':
796 star = {}
773 star = {}
797 for key in vals.keys():
774 for key in vals.keys():
798 if key not in keys:
775 if key not in keys:
799 star[key] = vals[key]
776 star[key] = vals[key]
800 data['*'] = star
777 data['*'] = star
801 else:
778 else:
802 data[k] = vals[k]
779 data[k] = vals[k]
803 result = func(repo, proto, *[data[k] for k in keys])
780 result = func(repo, proto, *[data[k] for k in keys])
804 else:
781 else:
805 result = func(repo, proto)
782 result = func(repo, proto)
806 if isinstance(result, wireprototypes.ooberror):
783 if isinstance(result, wireprototypes.ooberror):
807 return result
784 return result
808
785
809 # For now, all batchable commands must return bytesresponse or
786 # For now, all batchable commands must return bytesresponse or
810 # raw bytes (for backwards compatibility).
787 # raw bytes (for backwards compatibility).
811 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
788 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
812 if isinstance(result, wireprototypes.bytesresponse):
789 if isinstance(result, wireprototypes.bytesresponse):
813 result = result.data
790 result = result.data
814 res.append(wireprototypes.escapebatcharg(result))
791 res.append(wireprototypes.escapebatcharg(result))
815
792
816 return wireprototypes.bytesresponse(';'.join(res))
793 return wireprototypes.bytesresponse(';'.join(res))
817
794
818 @wireprotocommand('between', 'pairs', transportpolicy=POLICY_V1_ONLY,
795 @wireprotocommand('between', 'pairs', transportpolicy=POLICY_V1_ONLY,
819 permission='pull')
796 permission='pull')
820 def between(repo, proto, pairs):
797 def between(repo, proto, pairs):
821 pairs = [wireprototypes.decodelist(p, '-') for p in pairs.split(" ")]
798 pairs = [wireprototypes.decodelist(p, '-') for p in pairs.split(" ")]
822 r = []
799 r = []
823 for b in repo.between(pairs):
800 for b in repo.between(pairs):
824 r.append(wireprototypes.encodelist(b) + "\n")
801 r.append(wireprototypes.encodelist(b) + "\n")
825
802
826 return wireprototypes.bytesresponse(''.join(r))
803 return wireprototypes.bytesresponse(''.join(r))
827
804
828 @wireprotocommand('branchmap', permission='pull',
805 @wireprotocommand('branchmap', permission='pull',
829 transportpolicy=POLICY_V1_ONLY)
806 transportpolicy=POLICY_V1_ONLY)
830 def branchmap(repo, proto):
807 def branchmap(repo, proto):
831 branchmap = repo.branchmap()
808 branchmap = repo.branchmap()
832 heads = []
809 heads = []
833 for branch, nodes in branchmap.iteritems():
810 for branch, nodes in branchmap.iteritems():
834 branchname = urlreq.quote(encoding.fromlocal(branch))
811 branchname = urlreq.quote(encoding.fromlocal(branch))
835 branchnodes = wireprototypes.encodelist(nodes)
812 branchnodes = wireprototypes.encodelist(nodes)
836 heads.append('%s %s' % (branchname, branchnodes))
813 heads.append('%s %s' % (branchname, branchnodes))
837
814
838 return wireprototypes.bytesresponse('\n'.join(heads))
815 return wireprototypes.bytesresponse('\n'.join(heads))
839
816
840 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY,
817 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY,
841 permission='pull')
818 permission='pull')
842 def branches(repo, proto, nodes):
819 def branches(repo, proto, nodes):
843 nodes = wireprototypes.decodelist(nodes)
820 nodes = wireprototypes.decodelist(nodes)
844 r = []
821 r = []
845 for b in repo.branches(nodes):
822 for b in repo.branches(nodes):
846 r.append(wireprototypes.encodelist(b) + "\n")
823 r.append(wireprototypes.encodelist(b) + "\n")
847
824
848 return wireprototypes.bytesresponse(''.join(r))
825 return wireprototypes.bytesresponse(''.join(r))
849
826
850 @wireprotocommand('clonebundles', '', permission='pull',
827 @wireprotocommand('clonebundles', '', permission='pull',
851 transportpolicy=POLICY_V1_ONLY)
828 transportpolicy=POLICY_V1_ONLY)
852 def clonebundles(repo, proto):
829 def clonebundles(repo, proto):
853 """Server command for returning info for available bundles to seed clones.
830 """Server command for returning info for available bundles to seed clones.
854
831
855 Clients will parse this response and determine what bundle to fetch.
832 Clients will parse this response and determine what bundle to fetch.
856
833
857 Extensions may wrap this command to filter or dynamically emit data
834 Extensions may wrap this command to filter or dynamically emit data
858 depending on the request. e.g. you could advertise URLs for the closest
835 depending on the request. e.g. you could advertise URLs for the closest
859 data center given the client's IP address.
836 data center given the client's IP address.
860 """
837 """
861 return wireprototypes.bytesresponse(
838 return wireprototypes.bytesresponse(
862 repo.vfs.tryread('clonebundles.manifest'))
839 repo.vfs.tryread('clonebundles.manifest'))
863
840
864 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
841 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
865 'known', 'getbundle', 'unbundlehash']
842 'known', 'getbundle', 'unbundlehash']
866
843
867 def _capabilities(repo, proto):
844 def _capabilities(repo, proto):
868 """return a list of capabilities for a repo
845 """return a list of capabilities for a repo
869
846
870 This function exists to allow extensions to easily wrap capabilities
847 This function exists to allow extensions to easily wrap capabilities
871 computation
848 computation
872
849
873 - returns a lists: easy to alter
850 - returns a lists: easy to alter
874 - change done here will be propagated to both `capabilities` and `hello`
851 - change done here will be propagated to both `capabilities` and `hello`
875 command without any other action needed.
852 command without any other action needed.
876 """
853 """
877 # copy to prevent modification of the global list
854 # copy to prevent modification of the global list
878 caps = list(wireprotocaps)
855 caps = list(wireprotocaps)
879
856
880 # Command of same name as capability isn't exposed to version 1 of
857 # Command of same name as capability isn't exposed to version 1 of
881 # transports. So conditionally add it.
858 # transports. So conditionally add it.
882 if commands.commandavailable('changegroupsubset', proto):
859 if commands.commandavailable('changegroupsubset', proto):
883 caps.append('changegroupsubset')
860 caps.append('changegroupsubset')
884
861
885 if streamclone.allowservergeneration(repo):
862 if streamclone.allowservergeneration(repo):
886 if repo.ui.configbool('server', 'preferuncompressed'):
863 if repo.ui.configbool('server', 'preferuncompressed'):
887 caps.append('stream-preferred')
864 caps.append('stream-preferred')
888 requiredformats = repo.requirements & repo.supportedformats
865 requiredformats = repo.requirements & repo.supportedformats
889 # if our local revlogs are just revlogv1, add 'stream' cap
866 # if our local revlogs are just revlogv1, add 'stream' cap
890 if not requiredformats - {'revlogv1'}:
867 if not requiredformats - {'revlogv1'}:
891 caps.append('stream')
868 caps.append('stream')
892 # otherwise, add 'streamreqs' detailing our local revlog format
869 # otherwise, add 'streamreqs' detailing our local revlog format
893 else:
870 else:
894 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
871 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
895 if repo.ui.configbool('experimental', 'bundle2-advertise'):
872 if repo.ui.configbool('experimental', 'bundle2-advertise'):
896 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
873 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
897 caps.append('bundle2=' + urlreq.quote(capsblob))
874 caps.append('bundle2=' + urlreq.quote(capsblob))
898 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
875 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
899
876
900 return proto.addcapabilities(repo, caps)
877 return proto.addcapabilities(repo, caps)
901
878
902 # If you are writing an extension and consider wrapping this function. Wrap
879 # If you are writing an extension and consider wrapping this function. Wrap
903 # `_capabilities` instead.
880 # `_capabilities` instead.
904 @wireprotocommand('capabilities', permission='pull',
881 @wireprotocommand('capabilities', permission='pull',
905 transportpolicy=POLICY_V1_ONLY)
882 transportpolicy=POLICY_V1_ONLY)
906 def capabilities(repo, proto):
883 def capabilities(repo, proto):
907 caps = _capabilities(repo, proto)
884 caps = _capabilities(repo, proto)
908 return wireprototypes.bytesresponse(' '.join(sorted(caps)))
885 return wireprototypes.bytesresponse(' '.join(sorted(caps)))
909
886
910 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY,
887 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY,
911 permission='pull')
888 permission='pull')
912 def changegroup(repo, proto, roots):
889 def changegroup(repo, proto, roots):
913 nodes = wireprototypes.decodelist(roots)
890 nodes = wireprototypes.decodelist(roots)
914 outgoing = discovery.outgoing(repo, missingroots=nodes,
891 outgoing = discovery.outgoing(repo, missingroots=nodes,
915 missingheads=repo.heads())
892 missingheads=repo.heads())
916 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
893 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
917 gen = iter(lambda: cg.read(32768), '')
894 gen = iter(lambda: cg.read(32768), '')
918 return wireprototypes.streamres(gen=gen)
895 return wireprototypes.streamres(gen=gen)
919
896
920 @wireprotocommand('changegroupsubset', 'bases heads',
897 @wireprotocommand('changegroupsubset', 'bases heads',
921 transportpolicy=POLICY_V1_ONLY,
898 transportpolicy=POLICY_V1_ONLY,
922 permission='pull')
899 permission='pull')
923 def changegroupsubset(repo, proto, bases, heads):
900 def changegroupsubset(repo, proto, bases, heads):
924 bases = wireprototypes.decodelist(bases)
901 bases = wireprototypes.decodelist(bases)
925 heads = wireprototypes.decodelist(heads)
902 heads = wireprototypes.decodelist(heads)
926 outgoing = discovery.outgoing(repo, missingroots=bases,
903 outgoing = discovery.outgoing(repo, missingroots=bases,
927 missingheads=heads)
904 missingheads=heads)
928 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
905 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
929 gen = iter(lambda: cg.read(32768), '')
906 gen = iter(lambda: cg.read(32768), '')
930 return wireprototypes.streamres(gen=gen)
907 return wireprototypes.streamres(gen=gen)
931
908
932 @wireprotocommand('debugwireargs', 'one two *',
909 @wireprotocommand('debugwireargs', 'one two *',
933 permission='pull', transportpolicy=POLICY_V1_ONLY)
910 permission='pull', transportpolicy=POLICY_V1_ONLY)
934 def debugwireargs(repo, proto, one, two, others):
911 def debugwireargs(repo, proto, one, two, others):
935 # only accept optional args from the known set
912 # only accept optional args from the known set
936 opts = options('debugwireargs', ['three', 'four'], others)
913 opts = options('debugwireargs', ['three', 'four'], others)
937 return wireprototypes.bytesresponse(repo.debugwireargs(
914 return wireprototypes.bytesresponse(repo.debugwireargs(
938 one, two, **pycompat.strkwargs(opts)))
915 one, two, **pycompat.strkwargs(opts)))
939
916
940 def find_pullbundle(repo, proto, opts, clheads, heads, common):
917 def find_pullbundle(repo, proto, opts, clheads, heads, common):
941 """Return a file object for the first matching pullbundle.
918 """Return a file object for the first matching pullbundle.
942
919
943 Pullbundles are specified in .hg/pullbundles.manifest similar to
920 Pullbundles are specified in .hg/pullbundles.manifest similar to
944 clonebundles.
921 clonebundles.
945 For each entry, the bundle specification is checked for compatibility:
922 For each entry, the bundle specification is checked for compatibility:
946 - Client features vs the BUNDLESPEC.
923 - Client features vs the BUNDLESPEC.
947 - Revisions shared with the clients vs base revisions of the bundle.
924 - Revisions shared with the clients vs base revisions of the bundle.
948 A bundle can be applied only if all its base revisions are known by
925 A bundle can be applied only if all its base revisions are known by
949 the client.
926 the client.
950 - At least one leaf of the bundle's DAG is missing on the client.
927 - At least one leaf of the bundle's DAG is missing on the client.
951 - Every leaf of the bundle's DAG is part of node set the client wants.
928 - Every leaf of the bundle's DAG is part of node set the client wants.
952 E.g. do not send a bundle of all changes if the client wants only
929 E.g. do not send a bundle of all changes if the client wants only
953 one specific branch of many.
930 one specific branch of many.
954 """
931 """
955 def decodehexstring(s):
932 def decodehexstring(s):
956 return set([h.decode('hex') for h in s.split(';')])
933 return set([h.decode('hex') for h in s.split(';')])
957
934
958 manifest = repo.vfs.tryread('pullbundles.manifest')
935 manifest = repo.vfs.tryread('pullbundles.manifest')
959 if not manifest:
936 if not manifest:
960 return None
937 return None
961 res = exchange.parseclonebundlesmanifest(repo, manifest)
938 res = exchange.parseclonebundlesmanifest(repo, manifest)
962 res = exchange.filterclonebundleentries(repo, res)
939 res = exchange.filterclonebundleentries(repo, res)
963 if not res:
940 if not res:
964 return None
941 return None
965 cl = repo.changelog
942 cl = repo.changelog
966 heads_anc = cl.ancestors([cl.rev(rev) for rev in heads], inclusive=True)
943 heads_anc = cl.ancestors([cl.rev(rev) for rev in heads], inclusive=True)
967 common_anc = cl.ancestors([cl.rev(rev) for rev in common], inclusive=True)
944 common_anc = cl.ancestors([cl.rev(rev) for rev in common], inclusive=True)
968 compformats = clientcompressionsupport(proto)
945 compformats = clientcompressionsupport(proto)
969 for entry in res:
946 for entry in res:
970 if 'COMPRESSION' in entry and entry['COMPRESSION'] not in compformats:
947 if 'COMPRESSION' in entry and entry['COMPRESSION'] not in compformats:
971 continue
948 continue
972 # No test yet for VERSION, since V2 is supported by any client
949 # No test yet for VERSION, since V2 is supported by any client
973 # that advertises partial pulls
950 # that advertises partial pulls
974 if 'heads' in entry:
951 if 'heads' in entry:
975 try:
952 try:
976 bundle_heads = decodehexstring(entry['heads'])
953 bundle_heads = decodehexstring(entry['heads'])
977 except TypeError:
954 except TypeError:
978 # Bad heads entry
955 # Bad heads entry
979 continue
956 continue
980 if bundle_heads.issubset(common):
957 if bundle_heads.issubset(common):
981 continue # Nothing new
958 continue # Nothing new
982 if all(cl.rev(rev) in common_anc for rev in bundle_heads):
959 if all(cl.rev(rev) in common_anc for rev in bundle_heads):
983 continue # Still nothing new
960 continue # Still nothing new
984 if any(cl.rev(rev) not in heads_anc and
961 if any(cl.rev(rev) not in heads_anc and
985 cl.rev(rev) not in common_anc for rev in bundle_heads):
962 cl.rev(rev) not in common_anc for rev in bundle_heads):
986 continue
963 continue
987 if 'bases' in entry:
964 if 'bases' in entry:
988 try:
965 try:
989 bundle_bases = decodehexstring(entry['bases'])
966 bundle_bases = decodehexstring(entry['bases'])
990 except TypeError:
967 except TypeError:
991 # Bad bases entry
968 # Bad bases entry
992 continue
969 continue
993 if not all(cl.rev(rev) in common_anc for rev in bundle_bases):
970 if not all(cl.rev(rev) in common_anc for rev in bundle_bases):
994 continue
971 continue
995 path = entry['URL']
972 path = entry['URL']
996 repo.ui.debug('sending pullbundle "%s"\n' % path)
973 repo.ui.debug('sending pullbundle "%s"\n' % path)
997 try:
974 try:
998 return repo.vfs.open(path)
975 return repo.vfs.open(path)
999 except IOError:
976 except IOError:
1000 repo.ui.debug('pullbundle "%s" not accessible\n' % path)
977 repo.ui.debug('pullbundle "%s" not accessible\n' % path)
1001 continue
978 continue
1002 return None
979 return None
1003
980
1004 @wireprotocommand('getbundle', '*', permission='pull',
981 @wireprotocommand('getbundle', '*', permission='pull',
1005 transportpolicy=POLICY_V1_ONLY)
982 transportpolicy=POLICY_V1_ONLY)
1006 def getbundle(repo, proto, others):
983 def getbundle(repo, proto, others):
1007 opts = options('getbundle', gboptsmap.keys(), others)
984 opts = options('getbundle', wireprototypes.GETBUNDLE_ARGUMENTS.keys(),
985 others)
1008 for k, v in opts.iteritems():
986 for k, v in opts.iteritems():
1009 keytype = gboptsmap[k]
987 keytype = wireprototypes.GETBUNDLE_ARGUMENTS[k]
1010 if keytype == 'nodes':
988 if keytype == 'nodes':
1011 opts[k] = wireprototypes.decodelist(v)
989 opts[k] = wireprototypes.decodelist(v)
1012 elif keytype == 'csv':
990 elif keytype == 'csv':
1013 opts[k] = list(v.split(','))
991 opts[k] = list(v.split(','))
1014 elif keytype == 'scsv':
992 elif keytype == 'scsv':
1015 opts[k] = set(v.split(','))
993 opts[k] = set(v.split(','))
1016 elif keytype == 'boolean':
994 elif keytype == 'boolean':
1017 # Client should serialize False as '0', which is a non-empty string
995 # Client should serialize False as '0', which is a non-empty string
1018 # so it evaluates as a True bool.
996 # so it evaluates as a True bool.
1019 if v == '0':
997 if v == '0':
1020 opts[k] = False
998 opts[k] = False
1021 else:
999 else:
1022 opts[k] = bool(v)
1000 opts[k] = bool(v)
1023 elif keytype != 'plain':
1001 elif keytype != 'plain':
1024 raise KeyError('unknown getbundle option type %s'
1002 raise KeyError('unknown getbundle option type %s'
1025 % keytype)
1003 % keytype)
1026
1004
1027 if not bundle1allowed(repo, 'pull'):
1005 if not bundle1allowed(repo, 'pull'):
1028 if not exchange.bundle2requested(opts.get('bundlecaps')):
1006 if not exchange.bundle2requested(opts.get('bundlecaps')):
1029 if proto.name == 'http-v1':
1007 if proto.name == 'http-v1':
1030 return wireprototypes.ooberror(bundle2required)
1008 return wireprototypes.ooberror(bundle2required)
1031 raise error.Abort(bundle2requiredmain,
1009 raise error.Abort(bundle2requiredmain,
1032 hint=bundle2requiredhint)
1010 hint=bundle2requiredhint)
1033
1011
1034 prefercompressed = True
1012 prefercompressed = True
1035
1013
1036 try:
1014 try:
1037 clheads = set(repo.changelog.heads())
1015 clheads = set(repo.changelog.heads())
1038 heads = set(opts.get('heads', set()))
1016 heads = set(opts.get('heads', set()))
1039 common = set(opts.get('common', set()))
1017 common = set(opts.get('common', set()))
1040 common.discard(nullid)
1018 common.discard(nullid)
1041 if (repo.ui.configbool('server', 'pullbundle') and
1019 if (repo.ui.configbool('server', 'pullbundle') and
1042 'partial-pull' in proto.getprotocaps()):
1020 'partial-pull' in proto.getprotocaps()):
1043 # Check if a pre-built bundle covers this request.
1021 # Check if a pre-built bundle covers this request.
1044 bundle = find_pullbundle(repo, proto, opts, clheads, heads, common)
1022 bundle = find_pullbundle(repo, proto, opts, clheads, heads, common)
1045 if bundle:
1023 if bundle:
1046 return wireprototypes.streamres(gen=util.filechunkiter(bundle),
1024 return wireprototypes.streamres(gen=util.filechunkiter(bundle),
1047 prefer_uncompressed=True)
1025 prefer_uncompressed=True)
1048
1026
1049 if repo.ui.configbool('server', 'disablefullbundle'):
1027 if repo.ui.configbool('server', 'disablefullbundle'):
1050 # Check to see if this is a full clone.
1028 # Check to see if this is a full clone.
1051 changegroup = opts.get('cg', True)
1029 changegroup = opts.get('cg', True)
1052 if changegroup and not common and clheads == heads:
1030 if changegroup and not common and clheads == heads:
1053 raise error.Abort(
1031 raise error.Abort(
1054 _('server has pull-based clones disabled'),
1032 _('server has pull-based clones disabled'),
1055 hint=_('remove --pull if specified or upgrade Mercurial'))
1033 hint=_('remove --pull if specified or upgrade Mercurial'))
1056
1034
1057 info, chunks = exchange.getbundlechunks(repo, 'serve',
1035 info, chunks = exchange.getbundlechunks(repo, 'serve',
1058 **pycompat.strkwargs(opts))
1036 **pycompat.strkwargs(opts))
1059 prefercompressed = info.get('prefercompressed', True)
1037 prefercompressed = info.get('prefercompressed', True)
1060 except error.Abort as exc:
1038 except error.Abort as exc:
1061 # cleanly forward Abort error to the client
1039 # cleanly forward Abort error to the client
1062 if not exchange.bundle2requested(opts.get('bundlecaps')):
1040 if not exchange.bundle2requested(opts.get('bundlecaps')):
1063 if proto.name == 'http-v1':
1041 if proto.name == 'http-v1':
1064 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
1042 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
1065 raise # cannot do better for bundle1 + ssh
1043 raise # cannot do better for bundle1 + ssh
1066 # bundle2 request expect a bundle2 reply
1044 # bundle2 request expect a bundle2 reply
1067 bundler = bundle2.bundle20(repo.ui)
1045 bundler = bundle2.bundle20(repo.ui)
1068 manargs = [('message', pycompat.bytestr(exc))]
1046 manargs = [('message', pycompat.bytestr(exc))]
1069 advargs = []
1047 advargs = []
1070 if exc.hint is not None:
1048 if exc.hint is not None:
1071 advargs.append(('hint', exc.hint))
1049 advargs.append(('hint', exc.hint))
1072 bundler.addpart(bundle2.bundlepart('error:abort',
1050 bundler.addpart(bundle2.bundlepart('error:abort',
1073 manargs, advargs))
1051 manargs, advargs))
1074 chunks = bundler.getchunks()
1052 chunks = bundler.getchunks()
1075 prefercompressed = False
1053 prefercompressed = False
1076
1054
1077 return wireprototypes.streamres(
1055 return wireprototypes.streamres(
1078 gen=chunks, prefer_uncompressed=not prefercompressed)
1056 gen=chunks, prefer_uncompressed=not prefercompressed)
1079
1057
1080 @wireprotocommand('heads', permission='pull', transportpolicy=POLICY_V1_ONLY)
1058 @wireprotocommand('heads', permission='pull', transportpolicy=POLICY_V1_ONLY)
1081 def heads(repo, proto):
1059 def heads(repo, proto):
1082 h = repo.heads()
1060 h = repo.heads()
1083 return wireprototypes.bytesresponse(wireprototypes.encodelist(h) + '\n')
1061 return wireprototypes.bytesresponse(wireprototypes.encodelist(h) + '\n')
1084
1062
1085 @wireprotocommand('hello', permission='pull', transportpolicy=POLICY_V1_ONLY)
1063 @wireprotocommand('hello', permission='pull', transportpolicy=POLICY_V1_ONLY)
1086 def hello(repo, proto):
1064 def hello(repo, proto):
1087 """Called as part of SSH handshake to obtain server info.
1065 """Called as part of SSH handshake to obtain server info.
1088
1066
1089 Returns a list of lines describing interesting things about the
1067 Returns a list of lines describing interesting things about the
1090 server, in an RFC822-like format.
1068 server, in an RFC822-like format.
1091
1069
1092 Currently, the only one defined is ``capabilities``, which consists of a
1070 Currently, the only one defined is ``capabilities``, which consists of a
1093 line of space separated tokens describing server abilities:
1071 line of space separated tokens describing server abilities:
1094
1072
1095 capabilities: <token0> <token1> <token2>
1073 capabilities: <token0> <token1> <token2>
1096 """
1074 """
1097 caps = capabilities(repo, proto).data
1075 caps = capabilities(repo, proto).data
1098 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
1076 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
1099
1077
1100 @wireprotocommand('listkeys', 'namespace', permission='pull',
1078 @wireprotocommand('listkeys', 'namespace', permission='pull',
1101 transportpolicy=POLICY_V1_ONLY)
1079 transportpolicy=POLICY_V1_ONLY)
1102 def listkeys(repo, proto, namespace):
1080 def listkeys(repo, proto, namespace):
1103 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
1081 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
1104 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
1082 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
1105
1083
1106 @wireprotocommand('lookup', 'key', permission='pull',
1084 @wireprotocommand('lookup', 'key', permission='pull',
1107 transportpolicy=POLICY_V1_ONLY)
1085 transportpolicy=POLICY_V1_ONLY)
1108 def lookup(repo, proto, key):
1086 def lookup(repo, proto, key):
1109 try:
1087 try:
1110 k = encoding.tolocal(key)
1088 k = encoding.tolocal(key)
1111 n = repo.lookup(k)
1089 n = repo.lookup(k)
1112 r = hex(n)
1090 r = hex(n)
1113 success = 1
1091 success = 1
1114 except Exception as inst:
1092 except Exception as inst:
1115 r = stringutil.forcebytestr(inst)
1093 r = stringutil.forcebytestr(inst)
1116 success = 0
1094 success = 0
1117 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
1095 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
1118
1096
1119 @wireprotocommand('known', 'nodes *', permission='pull',
1097 @wireprotocommand('known', 'nodes *', permission='pull',
1120 transportpolicy=POLICY_V1_ONLY)
1098 transportpolicy=POLICY_V1_ONLY)
1121 def known(repo, proto, nodes, others):
1099 def known(repo, proto, nodes, others):
1122 v = ''.join(b and '1' or '0'
1100 v = ''.join(b and '1' or '0'
1123 for b in repo.known(wireprototypes.decodelist(nodes)))
1101 for b in repo.known(wireprototypes.decodelist(nodes)))
1124 return wireprototypes.bytesresponse(v)
1102 return wireprototypes.bytesresponse(v)
1125
1103
1126 @wireprotocommand('protocaps', 'caps', permission='pull',
1104 @wireprotocommand('protocaps', 'caps', permission='pull',
1127 transportpolicy=POLICY_V1_ONLY)
1105 transportpolicy=POLICY_V1_ONLY)
1128 def protocaps(repo, proto, caps):
1106 def protocaps(repo, proto, caps):
1129 if proto.name == wireprototypes.SSHV1:
1107 if proto.name == wireprototypes.SSHV1:
1130 proto._protocaps = set(caps.split(' '))
1108 proto._protocaps = set(caps.split(' '))
1131 return wireprototypes.bytesresponse('OK')
1109 return wireprototypes.bytesresponse('OK')
1132
1110
1133 @wireprotocommand('pushkey', 'namespace key old new', permission='push',
1111 @wireprotocommand('pushkey', 'namespace key old new', permission='push',
1134 transportpolicy=POLICY_V1_ONLY)
1112 transportpolicy=POLICY_V1_ONLY)
1135 def pushkey(repo, proto, namespace, key, old, new):
1113 def pushkey(repo, proto, namespace, key, old, new):
1136 # compatibility with pre-1.8 clients which were accidentally
1114 # compatibility with pre-1.8 clients which were accidentally
1137 # sending raw binary nodes rather than utf-8-encoded hex
1115 # sending raw binary nodes rather than utf-8-encoded hex
1138 if len(new) == 20 and stringutil.escapestr(new) != new:
1116 if len(new) == 20 and stringutil.escapestr(new) != new:
1139 # looks like it could be a binary node
1117 # looks like it could be a binary node
1140 try:
1118 try:
1141 new.decode('utf-8')
1119 new.decode('utf-8')
1142 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
1120 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
1143 except UnicodeDecodeError:
1121 except UnicodeDecodeError:
1144 pass # binary, leave unmodified
1122 pass # binary, leave unmodified
1145 else:
1123 else:
1146 new = encoding.tolocal(new) # normal path
1124 new = encoding.tolocal(new) # normal path
1147
1125
1148 with proto.mayberedirectstdio() as output:
1126 with proto.mayberedirectstdio() as output:
1149 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
1127 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
1150 encoding.tolocal(old), new) or False
1128 encoding.tolocal(old), new) or False
1151
1129
1152 output = output.getvalue() if output else ''
1130 output = output.getvalue() if output else ''
1153 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
1131 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
1154
1132
1155 @wireprotocommand('stream_out', permission='pull',
1133 @wireprotocommand('stream_out', permission='pull',
1156 transportpolicy=POLICY_V1_ONLY)
1134 transportpolicy=POLICY_V1_ONLY)
1157 def stream(repo, proto):
1135 def stream(repo, proto):
1158 '''If the server supports streaming clone, it advertises the "stream"
1136 '''If the server supports streaming clone, it advertises the "stream"
1159 capability with a value representing the version and flags of the repo
1137 capability with a value representing the version and flags of the repo
1160 it is serving. Client checks to see if it understands the format.
1138 it is serving. Client checks to see if it understands the format.
1161 '''
1139 '''
1162 return wireprototypes.streamreslegacy(
1140 return wireprototypes.streamreslegacy(
1163 streamclone.generatev1wireproto(repo))
1141 streamclone.generatev1wireproto(repo))
1164
1142
1165 @wireprotocommand('unbundle', 'heads', permission='push',
1143 @wireprotocommand('unbundle', 'heads', permission='push',
1166 transportpolicy=POLICY_V1_ONLY)
1144 transportpolicy=POLICY_V1_ONLY)
1167 def unbundle(repo, proto, heads):
1145 def unbundle(repo, proto, heads):
1168 their_heads = wireprototypes.decodelist(heads)
1146 their_heads = wireprototypes.decodelist(heads)
1169
1147
1170 with proto.mayberedirectstdio() as output:
1148 with proto.mayberedirectstdio() as output:
1171 try:
1149 try:
1172 exchange.check_heads(repo, their_heads, 'preparing changes')
1150 exchange.check_heads(repo, their_heads, 'preparing changes')
1173 cleanup = lambda: None
1151 cleanup = lambda: None
1174 try:
1152 try:
1175 payload = proto.getpayload()
1153 payload = proto.getpayload()
1176 if repo.ui.configbool('server', 'streamunbundle'):
1154 if repo.ui.configbool('server', 'streamunbundle'):
1177 def cleanup():
1155 def cleanup():
1178 # Ensure that the full payload is consumed, so
1156 # Ensure that the full payload is consumed, so
1179 # that the connection doesn't contain trailing garbage.
1157 # that the connection doesn't contain trailing garbage.
1180 for p in payload:
1158 for p in payload:
1181 pass
1159 pass
1182 fp = util.chunkbuffer(payload)
1160 fp = util.chunkbuffer(payload)
1183 else:
1161 else:
1184 # write bundle data to temporary file as it can be big
1162 # write bundle data to temporary file as it can be big
1185 fp, tempname = None, None
1163 fp, tempname = None, None
1186 def cleanup():
1164 def cleanup():
1187 if fp:
1165 if fp:
1188 fp.close()
1166 fp.close()
1189 if tempname:
1167 if tempname:
1190 os.unlink(tempname)
1168 os.unlink(tempname)
1191 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1169 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1192 repo.ui.debug('redirecting incoming bundle to %s\n' %
1170 repo.ui.debug('redirecting incoming bundle to %s\n' %
1193 tempname)
1171 tempname)
1194 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1172 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1195 r = 0
1173 r = 0
1196 for p in payload:
1174 for p in payload:
1197 fp.write(p)
1175 fp.write(p)
1198 fp.seek(0)
1176 fp.seek(0)
1199
1177
1200 gen = exchange.readbundle(repo.ui, fp, None)
1178 gen = exchange.readbundle(repo.ui, fp, None)
1201 if (isinstance(gen, changegroupmod.cg1unpacker)
1179 if (isinstance(gen, changegroupmod.cg1unpacker)
1202 and not bundle1allowed(repo, 'push')):
1180 and not bundle1allowed(repo, 'push')):
1203 if proto.name == 'http-v1':
1181 if proto.name == 'http-v1':
1204 # need to special case http because stderr do not get to
1182 # need to special case http because stderr do not get to
1205 # the http client on failed push so we need to abuse
1183 # the http client on failed push so we need to abuse
1206 # some other error type to make sure the message get to
1184 # some other error type to make sure the message get to
1207 # the user.
1185 # the user.
1208 return wireprototypes.ooberror(bundle2required)
1186 return wireprototypes.ooberror(bundle2required)
1209 raise error.Abort(bundle2requiredmain,
1187 raise error.Abort(bundle2requiredmain,
1210 hint=bundle2requiredhint)
1188 hint=bundle2requiredhint)
1211
1189
1212 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1190 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1213 proto.client())
1191 proto.client())
1214 if util.safehasattr(r, 'addpart'):
1192 if util.safehasattr(r, 'addpart'):
1215 # The return looks streamable, we are in the bundle2 case
1193 # The return looks streamable, we are in the bundle2 case
1216 # and should return a stream.
1194 # and should return a stream.
1217 return wireprototypes.streamreslegacy(gen=r.getchunks())
1195 return wireprototypes.streamreslegacy(gen=r.getchunks())
1218 return wireprototypes.pushres(
1196 return wireprototypes.pushres(
1219 r, output.getvalue() if output else '')
1197 r, output.getvalue() if output else '')
1220
1198
1221 finally:
1199 finally:
1222 cleanup()
1200 cleanup()
1223
1201
1224 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1202 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1225 # handle non-bundle2 case first
1203 # handle non-bundle2 case first
1226 if not getattr(exc, 'duringunbundle2', False):
1204 if not getattr(exc, 'duringunbundle2', False):
1227 try:
1205 try:
1228 raise
1206 raise
1229 except error.Abort:
1207 except error.Abort:
1230 # The old code we moved used procutil.stderr directly.
1208 # The old code we moved used procutil.stderr directly.
1231 # We did not change it to minimise code change.
1209 # We did not change it to minimise code change.
1232 # This need to be moved to something proper.
1210 # This need to be moved to something proper.
1233 # Feel free to do it.
1211 # Feel free to do it.
1234 procutil.stderr.write("abort: %s\n" % exc)
1212 procutil.stderr.write("abort: %s\n" % exc)
1235 if exc.hint is not None:
1213 if exc.hint is not None:
1236 procutil.stderr.write("(%s)\n" % exc.hint)
1214 procutil.stderr.write("(%s)\n" % exc.hint)
1237 procutil.stderr.flush()
1215 procutil.stderr.flush()
1238 return wireprototypes.pushres(
1216 return wireprototypes.pushres(
1239 0, output.getvalue() if output else '')
1217 0, output.getvalue() if output else '')
1240 except error.PushRaced:
1218 except error.PushRaced:
1241 return wireprototypes.pusherr(
1219 return wireprototypes.pusherr(
1242 pycompat.bytestr(exc),
1220 pycompat.bytestr(exc),
1243 output.getvalue() if output else '')
1221 output.getvalue() if output else '')
1244
1222
1245 bundler = bundle2.bundle20(repo.ui)
1223 bundler = bundle2.bundle20(repo.ui)
1246 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1224 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1247 bundler.addpart(out)
1225 bundler.addpart(out)
1248 try:
1226 try:
1249 try:
1227 try:
1250 raise
1228 raise
1251 except error.PushkeyFailed as exc:
1229 except error.PushkeyFailed as exc:
1252 # check client caps
1230 # check client caps
1253 remotecaps = getattr(exc, '_replycaps', None)
1231 remotecaps = getattr(exc, '_replycaps', None)
1254 if (remotecaps is not None
1232 if (remotecaps is not None
1255 and 'pushkey' not in remotecaps.get('error', ())):
1233 and 'pushkey' not in remotecaps.get('error', ())):
1256 # no support remote side, fallback to Abort handler.
1234 # no support remote side, fallback to Abort handler.
1257 raise
1235 raise
1258 part = bundler.newpart('error:pushkey')
1236 part = bundler.newpart('error:pushkey')
1259 part.addparam('in-reply-to', exc.partid)
1237 part.addparam('in-reply-to', exc.partid)
1260 if exc.namespace is not None:
1238 if exc.namespace is not None:
1261 part.addparam('namespace', exc.namespace,
1239 part.addparam('namespace', exc.namespace,
1262 mandatory=False)
1240 mandatory=False)
1263 if exc.key is not None:
1241 if exc.key is not None:
1264 part.addparam('key', exc.key, mandatory=False)
1242 part.addparam('key', exc.key, mandatory=False)
1265 if exc.new is not None:
1243 if exc.new is not None:
1266 part.addparam('new', exc.new, mandatory=False)
1244 part.addparam('new', exc.new, mandatory=False)
1267 if exc.old is not None:
1245 if exc.old is not None:
1268 part.addparam('old', exc.old, mandatory=False)
1246 part.addparam('old', exc.old, mandatory=False)
1269 if exc.ret is not None:
1247 if exc.ret is not None:
1270 part.addparam('ret', exc.ret, mandatory=False)
1248 part.addparam('ret', exc.ret, mandatory=False)
1271 except error.BundleValueError as exc:
1249 except error.BundleValueError as exc:
1272 errpart = bundler.newpart('error:unsupportedcontent')
1250 errpart = bundler.newpart('error:unsupportedcontent')
1273 if exc.parttype is not None:
1251 if exc.parttype is not None:
1274 errpart.addparam('parttype', exc.parttype)
1252 errpart.addparam('parttype', exc.parttype)
1275 if exc.params:
1253 if exc.params:
1276 errpart.addparam('params', '\0'.join(exc.params))
1254 errpart.addparam('params', '\0'.join(exc.params))
1277 except error.Abort as exc:
1255 except error.Abort as exc:
1278 manargs = [('message', stringutil.forcebytestr(exc))]
1256 manargs = [('message', stringutil.forcebytestr(exc))]
1279 advargs = []
1257 advargs = []
1280 if exc.hint is not None:
1258 if exc.hint is not None:
1281 advargs.append(('hint', exc.hint))
1259 advargs.append(('hint', exc.hint))
1282 bundler.addpart(bundle2.bundlepart('error:abort',
1260 bundler.addpart(bundle2.bundlepart('error:abort',
1283 manargs, advargs))
1261 manargs, advargs))
1284 except error.PushRaced as exc:
1262 except error.PushRaced as exc:
1285 bundler.newpart('error:pushraced',
1263 bundler.newpart('error:pushraced',
1286 [('message', stringutil.forcebytestr(exc))])
1264 [('message', stringutil.forcebytestr(exc))])
1287 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
1265 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
@@ -1,203 +1,227 b''
1 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
1 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
2 #
2 #
3 # This software may be used and distributed according to the terms of the
3 # This software may be used and distributed according to the terms of the
4 # GNU General Public License version 2 or any later version.
4 # GNU General Public License version 2 or any later version.
5
5
6 from __future__ import absolute_import
6 from __future__ import absolute_import
7
7
8 from .node import (
8 from .node import (
9 bin,
9 bin,
10 hex,
10 hex,
11 )
11 )
12 from .thirdparty.zope import (
12 from .thirdparty.zope import (
13 interface as zi,
13 interface as zi,
14 )
14 )
15
15
16 # Names of the SSH protocol implementations.
16 # Names of the SSH protocol implementations.
17 SSHV1 = 'ssh-v1'
17 SSHV1 = 'ssh-v1'
18 # These are advertised over the wire. Increment the counters at the end
18 # These are advertised over the wire. Increment the counters at the end
19 # to reflect BC breakages.
19 # to reflect BC breakages.
20 SSHV2 = 'exp-ssh-v2-0001'
20 SSHV2 = 'exp-ssh-v2-0001'
21 HTTPV2 = 'exp-http-v2-0001'
21 HTTPV2 = 'exp-http-v2-0001'
22
22
23 # All available wire protocol transports.
23 # All available wire protocol transports.
24 TRANSPORTS = {
24 TRANSPORTS = {
25 SSHV1: {
25 SSHV1: {
26 'transport': 'ssh',
26 'transport': 'ssh',
27 'version': 1,
27 'version': 1,
28 },
28 },
29 SSHV2: {
29 SSHV2: {
30 'transport': 'ssh',
30 'transport': 'ssh',
31 # TODO mark as version 2 once all commands are implemented.
31 # TODO mark as version 2 once all commands are implemented.
32 'version': 1,
32 'version': 1,
33 },
33 },
34 'http-v1': {
34 'http-v1': {
35 'transport': 'http',
35 'transport': 'http',
36 'version': 1,
36 'version': 1,
37 },
37 },
38 HTTPV2: {
38 HTTPV2: {
39 'transport': 'http',
39 'transport': 'http',
40 'version': 2,
40 'version': 2,
41 }
41 }
42 }
42 }
43
43
44 class bytesresponse(object):
44 class bytesresponse(object):
45 """A wire protocol response consisting of raw bytes."""
45 """A wire protocol response consisting of raw bytes."""
46 def __init__(self, data):
46 def __init__(self, data):
47 self.data = data
47 self.data = data
48
48
49 class ooberror(object):
49 class ooberror(object):
50 """wireproto reply: failure of a batch of operation
50 """wireproto reply: failure of a batch of operation
51
51
52 Something failed during a batch call. The error message is stored in
52 Something failed during a batch call. The error message is stored in
53 `self.message`.
53 `self.message`.
54 """
54 """
55 def __init__(self, message):
55 def __init__(self, message):
56 self.message = message
56 self.message = message
57
57
58 class pushres(object):
58 class pushres(object):
59 """wireproto reply: success with simple integer return
59 """wireproto reply: success with simple integer return
60
60
61 The call was successful and returned an integer contained in `self.res`.
61 The call was successful and returned an integer contained in `self.res`.
62 """
62 """
63 def __init__(self, res, output):
63 def __init__(self, res, output):
64 self.res = res
64 self.res = res
65 self.output = output
65 self.output = output
66
66
67 class pusherr(object):
67 class pusherr(object):
68 """wireproto reply: failure
68 """wireproto reply: failure
69
69
70 The call failed. The `self.res` attribute contains the error message.
70 The call failed. The `self.res` attribute contains the error message.
71 """
71 """
72 def __init__(self, res, output):
72 def __init__(self, res, output):
73 self.res = res
73 self.res = res
74 self.output = output
74 self.output = output
75
75
76 class streamres(object):
76 class streamres(object):
77 """wireproto reply: binary stream
77 """wireproto reply: binary stream
78
78
79 The call was successful and the result is a stream.
79 The call was successful and the result is a stream.
80
80
81 Accepts a generator containing chunks of data to be sent to the client.
81 Accepts a generator containing chunks of data to be sent to the client.
82
82
83 ``prefer_uncompressed`` indicates that the data is expected to be
83 ``prefer_uncompressed`` indicates that the data is expected to be
84 uncompressable and that the stream should therefore use the ``none``
84 uncompressable and that the stream should therefore use the ``none``
85 engine.
85 engine.
86 """
86 """
87 def __init__(self, gen=None, prefer_uncompressed=False):
87 def __init__(self, gen=None, prefer_uncompressed=False):
88 self.gen = gen
88 self.gen = gen
89 self.prefer_uncompressed = prefer_uncompressed
89 self.prefer_uncompressed = prefer_uncompressed
90
90
91 class streamreslegacy(object):
91 class streamreslegacy(object):
92 """wireproto reply: uncompressed binary stream
92 """wireproto reply: uncompressed binary stream
93
93
94 The call was successful and the result is a stream.
94 The call was successful and the result is a stream.
95
95
96 Accepts a generator containing chunks of data to be sent to the client.
96 Accepts a generator containing chunks of data to be sent to the client.
97
97
98 Like ``streamres``, but sends an uncompressed data for "version 1" clients
98 Like ``streamres``, but sends an uncompressed data for "version 1" clients
99 using the application/mercurial-0.1 media type.
99 using the application/mercurial-0.1 media type.
100 """
100 """
101 def __init__(self, gen=None):
101 def __init__(self, gen=None):
102 self.gen = gen
102 self.gen = gen
103
103
104 class cborresponse(object):
104 class cborresponse(object):
105 """Encode the response value as CBOR."""
105 """Encode the response value as CBOR."""
106 def __init__(self, v):
106 def __init__(self, v):
107 self.value = v
107 self.value = v
108
108
109 # list of nodes encoding / decoding
109 # list of nodes encoding / decoding
110 def decodelist(l, sep=' '):
110 def decodelist(l, sep=' '):
111 if l:
111 if l:
112 return [bin(v) for v in l.split(sep)]
112 return [bin(v) for v in l.split(sep)]
113 return []
113 return []
114
114
115 def encodelist(l, sep=' '):
115 def encodelist(l, sep=' '):
116 try:
116 try:
117 return sep.join(map(hex, l))
117 return sep.join(map(hex, l))
118 except TypeError:
118 except TypeError:
119 raise
119 raise
120
120
121 # batched call argument encoding
121 # batched call argument encoding
122
122
123 def escapebatcharg(plain):
123 def escapebatcharg(plain):
124 return (plain
124 return (plain
125 .replace(':', ':c')
125 .replace(':', ':c')
126 .replace(',', ':o')
126 .replace(',', ':o')
127 .replace(';', ':s')
127 .replace(';', ':s')
128 .replace('=', ':e'))
128 .replace('=', ':e'))
129
129
130 def unescapebatcharg(escaped):
130 def unescapebatcharg(escaped):
131 return (escaped
131 return (escaped
132 .replace(':e', '=')
132 .replace(':e', '=')
133 .replace(':s', ';')
133 .replace(':s', ';')
134 .replace(':o', ',')
134 .replace(':o', ',')
135 .replace(':c', ':'))
135 .replace(':c', ':'))
136
136
137 # mapping of options accepted by getbundle and their types
138 #
139 # Meant to be extended by extensions. It is extensions responsibility to ensure
140 # such options are properly processed in exchange.getbundle.
141 #
142 # supported types are:
143 #
144 # :nodes: list of binary nodes
145 # :csv: list of comma-separated values
146 # :scsv: list of comma-separated values return as set
147 # :plain: string with no transformation needed.
148 GETBUNDLE_ARGUMENTS = {
149 'heads': 'nodes',
150 'bookmarks': 'boolean',
151 'common': 'nodes',
152 'obsmarkers': 'boolean',
153 'phases': 'boolean',
154 'bundlecaps': 'scsv',
155 'listkeys': 'csv',
156 'cg': 'boolean',
157 'cbattempted': 'boolean',
158 'stream': 'boolean',
159 }
160
137 class baseprotocolhandler(zi.Interface):
161 class baseprotocolhandler(zi.Interface):
138 """Abstract base class for wire protocol handlers.
162 """Abstract base class for wire protocol handlers.
139
163
140 A wire protocol handler serves as an interface between protocol command
164 A wire protocol handler serves as an interface between protocol command
141 handlers and the wire protocol transport layer. Protocol handlers provide
165 handlers and the wire protocol transport layer. Protocol handlers provide
142 methods to read command arguments, redirect stdio for the duration of
166 methods to read command arguments, redirect stdio for the duration of
143 the request, handle response types, etc.
167 the request, handle response types, etc.
144 """
168 """
145
169
146 name = zi.Attribute(
170 name = zi.Attribute(
147 """The name of the protocol implementation.
171 """The name of the protocol implementation.
148
172
149 Used for uniquely identifying the transport type.
173 Used for uniquely identifying the transport type.
150 """)
174 """)
151
175
152 def getargs(args):
176 def getargs(args):
153 """return the value for arguments in <args>
177 """return the value for arguments in <args>
154
178
155 For version 1 transports, returns a list of values in the same
179 For version 1 transports, returns a list of values in the same
156 order they appear in ``args``. For version 2 transports, returns
180 order they appear in ``args``. For version 2 transports, returns
157 a dict mapping argument name to value.
181 a dict mapping argument name to value.
158 """
182 """
159
183
160 def getprotocaps():
184 def getprotocaps():
161 """Returns the list of protocol-level capabilities of client
185 """Returns the list of protocol-level capabilities of client
162
186
163 Returns a list of capabilities as declared by the client for
187 Returns a list of capabilities as declared by the client for
164 the current request (or connection for stateful protocol handlers)."""
188 the current request (or connection for stateful protocol handlers)."""
165
189
166 def getpayload():
190 def getpayload():
167 """Provide a generator for the raw payload.
191 """Provide a generator for the raw payload.
168
192
169 The caller is responsible for ensuring that the full payload is
193 The caller is responsible for ensuring that the full payload is
170 processed.
194 processed.
171 """
195 """
172
196
173 def mayberedirectstdio():
197 def mayberedirectstdio():
174 """Context manager to possibly redirect stdio.
198 """Context manager to possibly redirect stdio.
175
199
176 The context manager yields a file-object like object that receives
200 The context manager yields a file-object like object that receives
177 stdout and stderr output when the context manager is active. Or it
201 stdout and stderr output when the context manager is active. Or it
178 yields ``None`` if no I/O redirection occurs.
202 yields ``None`` if no I/O redirection occurs.
179
203
180 The intent of this context manager is to capture stdio output
204 The intent of this context manager is to capture stdio output
181 so it may be sent in the response. Some transports support streaming
205 so it may be sent in the response. Some transports support streaming
182 stdio to the client in real time. For these transports, stdio output
206 stdio to the client in real time. For these transports, stdio output
183 won't be captured.
207 won't be captured.
184 """
208 """
185
209
186 def client():
210 def client():
187 """Returns a string representation of this client (as bytes)."""
211 """Returns a string representation of this client (as bytes)."""
188
212
189 def addcapabilities(repo, caps):
213 def addcapabilities(repo, caps):
190 """Adds advertised capabilities specific to this protocol.
214 """Adds advertised capabilities specific to this protocol.
191
215
192 Receives the list of capabilities collected so far.
216 Receives the list of capabilities collected so far.
193
217
194 Returns a list of capabilities. The passed in argument can be returned.
218 Returns a list of capabilities. The passed in argument can be returned.
195 """
219 """
196
220
197 def checkperm(perm):
221 def checkperm(perm):
198 """Validate that the client has permissions to perform a request.
222 """Validate that the client has permissions to perform a request.
199
223
200 The argument is the permission required to proceed. If the client
224 The argument is the permission required to proceed. If the client
201 doesn't have that permission, the exception should raise or abort
225 doesn't have that permission, the exception should raise or abort
202 in a protocol specific manner.
226 in a protocol specific manner.
203 """
227 """
General Comments 0
You need to be logged in to leave comments. Login now