##// END OF EJS Templates
wireproto: separate commands tables for version 1 and 2 commands...
Gregory Szorc -
r37311:45b39c69 default
parent child Browse files
Show More
@@ -1,202 +1,203 b''
1 # Copyright 2009-2010 Gregory P. Ward
1 # Copyright 2009-2010 Gregory P. Ward
2 # Copyright 2009-2010 Intelerad Medical Systems Incorporated
2 # Copyright 2009-2010 Intelerad Medical Systems Incorporated
3 # Copyright 2010-2011 Fog Creek Software
3 # Copyright 2010-2011 Fog Creek Software
4 # Copyright 2010-2011 Unity Technologies
4 # Copyright 2010-2011 Unity Technologies
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 '''setup for largefiles extension: uisetup'''
9 '''setup for largefiles extension: uisetup'''
10 from __future__ import absolute_import
10 from __future__ import absolute_import
11
11
12 from mercurial.i18n import _
12 from mercurial.i18n import _
13
13
14 from mercurial.hgweb import (
14 from mercurial.hgweb import (
15 webcommands,
15 webcommands,
16 )
16 )
17
17
18 from mercurial import (
18 from mercurial import (
19 archival,
19 archival,
20 cmdutil,
20 cmdutil,
21 commands,
21 commands,
22 copies,
22 copies,
23 exchange,
23 exchange,
24 extensions,
24 extensions,
25 filemerge,
25 filemerge,
26 hg,
26 hg,
27 httppeer,
27 httppeer,
28 merge,
28 merge,
29 scmutil,
29 scmutil,
30 sshpeer,
30 sshpeer,
31 subrepo,
31 subrepo,
32 upgrade,
32 upgrade,
33 url,
33 url,
34 wireproto,
34 wireproto,
35 )
35 )
36
36
37 from . import (
37 from . import (
38 overrides,
38 overrides,
39 proto,
39 proto,
40 )
40 )
41
41
42 def uisetup(ui):
42 def uisetup(ui):
43 # Disable auto-status for some commands which assume that all
43 # Disable auto-status for some commands which assume that all
44 # files in the result are under Mercurial's control
44 # files in the result are under Mercurial's control
45
45
46 entry = extensions.wrapcommand(commands.table, 'add',
46 entry = extensions.wrapcommand(commands.table, 'add',
47 overrides.overrideadd)
47 overrides.overrideadd)
48 addopt = [('', 'large', None, _('add as largefile')),
48 addopt = [('', 'large', None, _('add as largefile')),
49 ('', 'normal', None, _('add as normal file')),
49 ('', 'normal', None, _('add as normal file')),
50 ('', 'lfsize', '', _('add all files above this size '
50 ('', 'lfsize', '', _('add all files above this size '
51 '(in megabytes) as largefiles '
51 '(in megabytes) as largefiles '
52 '(default: 10)'))]
52 '(default: 10)'))]
53 entry[1].extend(addopt)
53 entry[1].extend(addopt)
54
54
55 # The scmutil function is called both by the (trivial) addremove command,
55 # The scmutil function is called both by the (trivial) addremove command,
56 # and in the process of handling commit -A (issue3542)
56 # and in the process of handling commit -A (issue3542)
57 extensions.wrapfunction(scmutil, 'addremove', overrides.scmutiladdremove)
57 extensions.wrapfunction(scmutil, 'addremove', overrides.scmutiladdremove)
58 extensions.wrapfunction(cmdutil, 'add', overrides.cmdutiladd)
58 extensions.wrapfunction(cmdutil, 'add', overrides.cmdutiladd)
59 extensions.wrapfunction(cmdutil, 'remove', overrides.cmdutilremove)
59 extensions.wrapfunction(cmdutil, 'remove', overrides.cmdutilremove)
60 extensions.wrapfunction(cmdutil, 'forget', overrides.cmdutilforget)
60 extensions.wrapfunction(cmdutil, 'forget', overrides.cmdutilforget)
61
61
62 extensions.wrapfunction(copies, 'pathcopies', overrides.copiespathcopies)
62 extensions.wrapfunction(copies, 'pathcopies', overrides.copiespathcopies)
63
63
64 extensions.wrapfunction(upgrade, 'preservedrequirements',
64 extensions.wrapfunction(upgrade, 'preservedrequirements',
65 overrides.upgraderequirements)
65 overrides.upgraderequirements)
66
66
67 extensions.wrapfunction(upgrade, 'supporteddestrequirements',
67 extensions.wrapfunction(upgrade, 'supporteddestrequirements',
68 overrides.upgraderequirements)
68 overrides.upgraderequirements)
69
69
70 # Subrepos call status function
70 # Subrepos call status function
71 entry = extensions.wrapcommand(commands.table, 'status',
71 entry = extensions.wrapcommand(commands.table, 'status',
72 overrides.overridestatus)
72 overrides.overridestatus)
73 extensions.wrapfunction(subrepo.hgsubrepo, 'status',
73 extensions.wrapfunction(subrepo.hgsubrepo, 'status',
74 overrides.overridestatusfn)
74 overrides.overridestatusfn)
75
75
76 entry = extensions.wrapcommand(commands.table, 'log',
76 entry = extensions.wrapcommand(commands.table, 'log',
77 overrides.overridelog)
77 overrides.overridelog)
78 entry = extensions.wrapcommand(commands.table, 'rollback',
78 entry = extensions.wrapcommand(commands.table, 'rollback',
79 overrides.overriderollback)
79 overrides.overriderollback)
80 entry = extensions.wrapcommand(commands.table, 'verify',
80 entry = extensions.wrapcommand(commands.table, 'verify',
81 overrides.overrideverify)
81 overrides.overrideverify)
82
82
83 verifyopt = [('', 'large', None,
83 verifyopt = [('', 'large', None,
84 _('verify that all largefiles in current revision exists')),
84 _('verify that all largefiles in current revision exists')),
85 ('', 'lfa', None,
85 ('', 'lfa', None,
86 _('verify largefiles in all revisions, not just current')),
86 _('verify largefiles in all revisions, not just current')),
87 ('', 'lfc', None,
87 ('', 'lfc', None,
88 _('verify local largefile contents, not just existence'))]
88 _('verify local largefile contents, not just existence'))]
89 entry[1].extend(verifyopt)
89 entry[1].extend(verifyopt)
90
90
91 entry = extensions.wrapcommand(commands.table, 'debugstate',
91 entry = extensions.wrapcommand(commands.table, 'debugstate',
92 overrides.overridedebugstate)
92 overrides.overridedebugstate)
93 debugstateopt = [('', 'large', None, _('display largefiles dirstate'))]
93 debugstateopt = [('', 'large', None, _('display largefiles dirstate'))]
94 entry[1].extend(debugstateopt)
94 entry[1].extend(debugstateopt)
95
95
96 outgoing = lambda orgfunc, *arg, **kwargs: orgfunc(*arg, **kwargs)
96 outgoing = lambda orgfunc, *arg, **kwargs: orgfunc(*arg, **kwargs)
97 entry = extensions.wrapcommand(commands.table, 'outgoing', outgoing)
97 entry = extensions.wrapcommand(commands.table, 'outgoing', outgoing)
98 outgoingopt = [('', 'large', None, _('display outgoing largefiles'))]
98 outgoingopt = [('', 'large', None, _('display outgoing largefiles'))]
99 entry[1].extend(outgoingopt)
99 entry[1].extend(outgoingopt)
100 cmdutil.outgoinghooks.add('largefiles', overrides.outgoinghook)
100 cmdutil.outgoinghooks.add('largefiles', overrides.outgoinghook)
101 entry = extensions.wrapcommand(commands.table, 'summary',
101 entry = extensions.wrapcommand(commands.table, 'summary',
102 overrides.overridesummary)
102 overrides.overridesummary)
103 summaryopt = [('', 'large', None, _('display outgoing largefiles'))]
103 summaryopt = [('', 'large', None, _('display outgoing largefiles'))]
104 entry[1].extend(summaryopt)
104 entry[1].extend(summaryopt)
105 cmdutil.summaryremotehooks.add('largefiles', overrides.summaryremotehook)
105 cmdutil.summaryremotehooks.add('largefiles', overrides.summaryremotehook)
106
106
107 entry = extensions.wrapcommand(commands.table, 'pull',
107 entry = extensions.wrapcommand(commands.table, 'pull',
108 overrides.overridepull)
108 overrides.overridepull)
109 pullopt = [('', 'all-largefiles', None,
109 pullopt = [('', 'all-largefiles', None,
110 _('download all pulled versions of largefiles (DEPRECATED)')),
110 _('download all pulled versions of largefiles (DEPRECATED)')),
111 ('', 'lfrev', [],
111 ('', 'lfrev', [],
112 _('download largefiles for these revisions'), _('REV'))]
112 _('download largefiles for these revisions'), _('REV'))]
113 entry[1].extend(pullopt)
113 entry[1].extend(pullopt)
114
114
115 entry = extensions.wrapcommand(commands.table, 'push',
115 entry = extensions.wrapcommand(commands.table, 'push',
116 overrides.overridepush)
116 overrides.overridepush)
117 pushopt = [('', 'lfrev', [],
117 pushopt = [('', 'lfrev', [],
118 _('upload largefiles for these revisions'), _('REV'))]
118 _('upload largefiles for these revisions'), _('REV'))]
119 entry[1].extend(pushopt)
119 entry[1].extend(pushopt)
120 extensions.wrapfunction(exchange, 'pushoperation',
120 extensions.wrapfunction(exchange, 'pushoperation',
121 overrides.exchangepushoperation)
121 overrides.exchangepushoperation)
122
122
123 entry = extensions.wrapcommand(commands.table, 'clone',
123 entry = extensions.wrapcommand(commands.table, 'clone',
124 overrides.overrideclone)
124 overrides.overrideclone)
125 cloneopt = [('', 'all-largefiles', None,
125 cloneopt = [('', 'all-largefiles', None,
126 _('download all versions of all largefiles'))]
126 _('download all versions of all largefiles'))]
127 entry[1].extend(cloneopt)
127 entry[1].extend(cloneopt)
128 extensions.wrapfunction(hg, 'clone', overrides.hgclone)
128 extensions.wrapfunction(hg, 'clone', overrides.hgclone)
129 extensions.wrapfunction(hg, 'postshare', overrides.hgpostshare)
129 extensions.wrapfunction(hg, 'postshare', overrides.hgpostshare)
130
130
131 entry = extensions.wrapcommand(commands.table, 'cat',
131 entry = extensions.wrapcommand(commands.table, 'cat',
132 overrides.overridecat)
132 overrides.overridecat)
133 extensions.wrapfunction(merge, '_checkunknownfile',
133 extensions.wrapfunction(merge, '_checkunknownfile',
134 overrides.overridecheckunknownfile)
134 overrides.overridecheckunknownfile)
135 extensions.wrapfunction(merge, 'calculateupdates',
135 extensions.wrapfunction(merge, 'calculateupdates',
136 overrides.overridecalculateupdates)
136 overrides.overridecalculateupdates)
137 extensions.wrapfunction(merge, 'recordupdates',
137 extensions.wrapfunction(merge, 'recordupdates',
138 overrides.mergerecordupdates)
138 overrides.mergerecordupdates)
139 extensions.wrapfunction(merge, 'update', overrides.mergeupdate)
139 extensions.wrapfunction(merge, 'update', overrides.mergeupdate)
140 extensions.wrapfunction(filemerge, '_filemerge',
140 extensions.wrapfunction(filemerge, '_filemerge',
141 overrides.overridefilemerge)
141 overrides.overridefilemerge)
142 extensions.wrapfunction(cmdutil, 'copy', overrides.overridecopy)
142 extensions.wrapfunction(cmdutil, 'copy', overrides.overridecopy)
143
143
144 # Summary calls dirty on the subrepos
144 # Summary calls dirty on the subrepos
145 extensions.wrapfunction(subrepo.hgsubrepo, 'dirty', overrides.overridedirty)
145 extensions.wrapfunction(subrepo.hgsubrepo, 'dirty', overrides.overridedirty)
146
146
147 extensions.wrapfunction(cmdutil, 'revert', overrides.overriderevert)
147 extensions.wrapfunction(cmdutil, 'revert', overrides.overriderevert)
148
148
149 extensions.wrapcommand(commands.table, 'archive',
149 extensions.wrapcommand(commands.table, 'archive',
150 overrides.overridearchivecmd)
150 overrides.overridearchivecmd)
151 extensions.wrapfunction(archival, 'archive', overrides.overridearchive)
151 extensions.wrapfunction(archival, 'archive', overrides.overridearchive)
152 extensions.wrapfunction(subrepo.hgsubrepo, 'archive',
152 extensions.wrapfunction(subrepo.hgsubrepo, 'archive',
153 overrides.hgsubrepoarchive)
153 overrides.hgsubrepoarchive)
154 extensions.wrapfunction(webcommands, 'archive', overrides.hgwebarchive)
154 extensions.wrapfunction(webcommands, 'archive', overrides.hgwebarchive)
155 extensions.wrapfunction(cmdutil, 'bailifchanged',
155 extensions.wrapfunction(cmdutil, 'bailifchanged',
156 overrides.overridebailifchanged)
156 overrides.overridebailifchanged)
157
157
158 extensions.wrapfunction(cmdutil, 'postcommitstatus',
158 extensions.wrapfunction(cmdutil, 'postcommitstatus',
159 overrides.postcommitstatus)
159 overrides.postcommitstatus)
160 extensions.wrapfunction(scmutil, 'marktouched',
160 extensions.wrapfunction(scmutil, 'marktouched',
161 overrides.scmutilmarktouched)
161 overrides.scmutilmarktouched)
162
162
163 extensions.wrapfunction(url, 'open',
163 extensions.wrapfunction(url, 'open',
164 overrides.openlargefile)
164 overrides.openlargefile)
165
165
166 # create the new wireproto commands ...
166 # create the new wireproto commands ...
167 wireproto.wireprotocommand('putlfile', 'sha', permission='push')(
167 wireproto.wireprotocommand('putlfile', 'sha', permission='push')(
168 proto.putlfile)
168 proto.putlfile)
169 wireproto.wireprotocommand('getlfile', 'sha', permission='pull')(
169 wireproto.wireprotocommand('getlfile', 'sha', permission='pull')(
170 proto.getlfile)
170 proto.getlfile)
171 wireproto.wireprotocommand('statlfile', 'sha', permission='pull')(
171 wireproto.wireprotocommand('statlfile', 'sha', permission='pull')(
172 proto.statlfile)
172 proto.statlfile)
173 wireproto.wireprotocommand('lheads', '', permission='pull')(
173 wireproto.wireprotocommand('lheads', '', permission='pull')(
174 wireproto.heads)
174 wireproto.heads)
175
175
176 # ... and wrap some existing ones
176 # ... and wrap some existing ones
177 wireproto.commands['heads'].func = proto.heads
177 wireproto.commands['heads'].func = proto.heads
178 # TODO also wrap wireproto.commandsv2 once heads is implemented there.
178
179
179 extensions.wrapfunction(webcommands, 'decodepath', overrides.decodepath)
180 extensions.wrapfunction(webcommands, 'decodepath', overrides.decodepath)
180
181
181 extensions.wrapfunction(wireproto, '_capabilities', proto._capabilities)
182 extensions.wrapfunction(wireproto, '_capabilities', proto._capabilities)
182
183
183 # can't do this in reposetup because it needs to have happened before
184 # can't do this in reposetup because it needs to have happened before
184 # wirerepo.__init__ is called
185 # wirerepo.__init__ is called
185 proto.ssholdcallstream = sshpeer.sshv1peer._callstream
186 proto.ssholdcallstream = sshpeer.sshv1peer._callstream
186 proto.httpoldcallstream = httppeer.httppeer._callstream
187 proto.httpoldcallstream = httppeer.httppeer._callstream
187 sshpeer.sshv1peer._callstream = proto.sshrepocallstream
188 sshpeer.sshv1peer._callstream = proto.sshrepocallstream
188 httppeer.httppeer._callstream = proto.httprepocallstream
189 httppeer.httppeer._callstream = proto.httprepocallstream
189
190
190 # override some extensions' stuff as well
191 # override some extensions' stuff as well
191 for name, module in extensions.extensions():
192 for name, module in extensions.extensions():
192 if name == 'purge':
193 if name == 'purge':
193 extensions.wrapcommand(getattr(module, 'cmdtable'), 'purge',
194 extensions.wrapcommand(getattr(module, 'cmdtable'), 'purge',
194 overrides.overridepurge)
195 overrides.overridepurge)
195 if name == 'rebase':
196 if name == 'rebase':
196 extensions.wrapcommand(getattr(module, 'cmdtable'), 'rebase',
197 extensions.wrapcommand(getattr(module, 'cmdtable'), 'rebase',
197 overrides.overriderebase)
198 overrides.overriderebase)
198 extensions.wrapfunction(module, 'rebase',
199 extensions.wrapfunction(module, 'rebase',
199 overrides.overriderebase)
200 overrides.overriderebase)
200 if name == 'transplant':
201 if name == 'transplant':
201 extensions.wrapcommand(getattr(module, 'cmdtable'), 'transplant',
202 extensions.wrapcommand(getattr(module, 'cmdtable'), 'transplant',
202 overrides.overridetransplant)
203 overrides.overridetransplant)
@@ -1,1139 +1,1163 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import os
11 import os
12 import tempfile
12 import tempfile
13
13
14 from .i18n import _
14 from .i18n import _
15 from .node import (
15 from .node import (
16 bin,
16 bin,
17 hex,
17 hex,
18 nullid,
18 nullid,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 discovery,
24 discovery,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 pycompat,
30 pycompat,
31 repository,
31 repository,
32 streamclone,
32 streamclone,
33 util,
33 util,
34 wireprototypes,
34 wireprototypes,
35 )
35 )
36
36
37 from .utils import (
37 from .utils import (
38 procutil,
38 procutil,
39 stringutil,
39 stringutil,
40 )
40 )
41
41
42 urlerr = util.urlerr
42 urlerr = util.urlerr
43 urlreq = util.urlreq
43 urlreq = util.urlreq
44
44
45 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
45 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
46 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
46 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
47 'IncompatibleClient')
47 'IncompatibleClient')
48 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
48 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
49
49
50 class remoteiterbatcher(peer.iterbatcher):
50 class remoteiterbatcher(peer.iterbatcher):
51 def __init__(self, remote):
51 def __init__(self, remote):
52 super(remoteiterbatcher, self).__init__()
52 super(remoteiterbatcher, self).__init__()
53 self._remote = remote
53 self._remote = remote
54
54
55 def __getattr__(self, name):
55 def __getattr__(self, name):
56 # Validate this method is batchable, since submit() only supports
56 # Validate this method is batchable, since submit() only supports
57 # batchable methods.
57 # batchable methods.
58 fn = getattr(self._remote, name)
58 fn = getattr(self._remote, name)
59 if not getattr(fn, 'batchable', None):
59 if not getattr(fn, 'batchable', None):
60 raise error.ProgrammingError('Attempted to batch a non-batchable '
60 raise error.ProgrammingError('Attempted to batch a non-batchable '
61 'call to %r' % name)
61 'call to %r' % name)
62
62
63 return super(remoteiterbatcher, self).__getattr__(name)
63 return super(remoteiterbatcher, self).__getattr__(name)
64
64
65 def submit(self):
65 def submit(self):
66 """Break the batch request into many patch calls and pipeline them.
66 """Break the batch request into many patch calls and pipeline them.
67
67
68 This is mostly valuable over http where request sizes can be
68 This is mostly valuable over http where request sizes can be
69 limited, but can be used in other places as well.
69 limited, but can be used in other places as well.
70 """
70 """
71 # 2-tuple of (command, arguments) that represents what will be
71 # 2-tuple of (command, arguments) that represents what will be
72 # sent over the wire.
72 # sent over the wire.
73 requests = []
73 requests = []
74
74
75 # 4-tuple of (command, final future, @batchable generator, remote
75 # 4-tuple of (command, final future, @batchable generator, remote
76 # future).
76 # future).
77 results = []
77 results = []
78
78
79 for command, args, opts, finalfuture in self.calls:
79 for command, args, opts, finalfuture in self.calls:
80 mtd = getattr(self._remote, command)
80 mtd = getattr(self._remote, command)
81 batchable = mtd.batchable(mtd.__self__, *args, **opts)
81 batchable = mtd.batchable(mtd.__self__, *args, **opts)
82
82
83 commandargs, fremote = next(batchable)
83 commandargs, fremote = next(batchable)
84 assert fremote
84 assert fremote
85 requests.append((command, commandargs))
85 requests.append((command, commandargs))
86 results.append((command, finalfuture, batchable, fremote))
86 results.append((command, finalfuture, batchable, fremote))
87
87
88 if requests:
88 if requests:
89 self._resultiter = self._remote._submitbatch(requests)
89 self._resultiter = self._remote._submitbatch(requests)
90
90
91 self._results = results
91 self._results = results
92
92
93 def results(self):
93 def results(self):
94 for command, finalfuture, batchable, remotefuture in self._results:
94 for command, finalfuture, batchable, remotefuture in self._results:
95 # Get the raw result, set it in the remote future, feed it
95 # Get the raw result, set it in the remote future, feed it
96 # back into the @batchable generator so it can be decoded, and
96 # back into the @batchable generator so it can be decoded, and
97 # set the result on the final future to this value.
97 # set the result on the final future to this value.
98 remoteresult = next(self._resultiter)
98 remoteresult = next(self._resultiter)
99 remotefuture.set(remoteresult)
99 remotefuture.set(remoteresult)
100 finalfuture.set(next(batchable))
100 finalfuture.set(next(batchable))
101
101
102 # Verify our @batchable generators only emit 2 values.
102 # Verify our @batchable generators only emit 2 values.
103 try:
103 try:
104 next(batchable)
104 next(batchable)
105 except StopIteration:
105 except StopIteration:
106 pass
106 pass
107 else:
107 else:
108 raise error.ProgrammingError('%s @batchable generator emitted '
108 raise error.ProgrammingError('%s @batchable generator emitted '
109 'unexpected value count' % command)
109 'unexpected value count' % command)
110
110
111 yield finalfuture.value
111 yield finalfuture.value
112
112
113 # Forward a couple of names from peer to make wireproto interactions
113 # Forward a couple of names from peer to make wireproto interactions
114 # slightly more sensible.
114 # slightly more sensible.
115 batchable = peer.batchable
115 batchable = peer.batchable
116 future = peer.future
116 future = peer.future
117
117
118 # list of nodes encoding / decoding
118 # list of nodes encoding / decoding
119
119
120 def decodelist(l, sep=' '):
120 def decodelist(l, sep=' '):
121 if l:
121 if l:
122 return [bin(v) for v in l.split(sep)]
122 return [bin(v) for v in l.split(sep)]
123 return []
123 return []
124
124
125 def encodelist(l, sep=' '):
125 def encodelist(l, sep=' '):
126 try:
126 try:
127 return sep.join(map(hex, l))
127 return sep.join(map(hex, l))
128 except TypeError:
128 except TypeError:
129 raise
129 raise
130
130
131 # batched call argument encoding
131 # batched call argument encoding
132
132
133 def escapearg(plain):
133 def escapearg(plain):
134 return (plain
134 return (plain
135 .replace(':', ':c')
135 .replace(':', ':c')
136 .replace(',', ':o')
136 .replace(',', ':o')
137 .replace(';', ':s')
137 .replace(';', ':s')
138 .replace('=', ':e'))
138 .replace('=', ':e'))
139
139
140 def unescapearg(escaped):
140 def unescapearg(escaped):
141 return (escaped
141 return (escaped
142 .replace(':e', '=')
142 .replace(':e', '=')
143 .replace(':s', ';')
143 .replace(':s', ';')
144 .replace(':o', ',')
144 .replace(':o', ',')
145 .replace(':c', ':'))
145 .replace(':c', ':'))
146
146
147 def encodebatchcmds(req):
147 def encodebatchcmds(req):
148 """Return a ``cmds`` argument value for the ``batch`` command."""
148 """Return a ``cmds`` argument value for the ``batch`` command."""
149 cmds = []
149 cmds = []
150 for op, argsdict in req:
150 for op, argsdict in req:
151 # Old servers didn't properly unescape argument names. So prevent
151 # Old servers didn't properly unescape argument names. So prevent
152 # the sending of argument names that may not be decoded properly by
152 # the sending of argument names that may not be decoded properly by
153 # servers.
153 # servers.
154 assert all(escapearg(k) == k for k in argsdict)
154 assert all(escapearg(k) == k for k in argsdict)
155
155
156 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
156 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
157 for k, v in argsdict.iteritems())
157 for k, v in argsdict.iteritems())
158 cmds.append('%s %s' % (op, args))
158 cmds.append('%s %s' % (op, args))
159
159
160 return ';'.join(cmds)
160 return ';'.join(cmds)
161
161
162 # mapping of options accepted by getbundle and their types
162 # mapping of options accepted by getbundle and their types
163 #
163 #
164 # Meant to be extended by extensions. It is extensions responsibility to ensure
164 # Meant to be extended by extensions. It is extensions responsibility to ensure
165 # such options are properly processed in exchange.getbundle.
165 # such options are properly processed in exchange.getbundle.
166 #
166 #
167 # supported types are:
167 # supported types are:
168 #
168 #
169 # :nodes: list of binary nodes
169 # :nodes: list of binary nodes
170 # :csv: list of comma-separated values
170 # :csv: list of comma-separated values
171 # :scsv: list of comma-separated values return as set
171 # :scsv: list of comma-separated values return as set
172 # :plain: string with no transformation needed.
172 # :plain: string with no transformation needed.
173 gboptsmap = {'heads': 'nodes',
173 gboptsmap = {'heads': 'nodes',
174 'bookmarks': 'boolean',
174 'bookmarks': 'boolean',
175 'common': 'nodes',
175 'common': 'nodes',
176 'obsmarkers': 'boolean',
176 'obsmarkers': 'boolean',
177 'phases': 'boolean',
177 'phases': 'boolean',
178 'bundlecaps': 'scsv',
178 'bundlecaps': 'scsv',
179 'listkeys': 'csv',
179 'listkeys': 'csv',
180 'cg': 'boolean',
180 'cg': 'boolean',
181 'cbattempted': 'boolean',
181 'cbattempted': 'boolean',
182 'stream': 'boolean',
182 'stream': 'boolean',
183 }
183 }
184
184
185 # client side
185 # client side
186
186
187 class wirepeer(repository.legacypeer):
187 class wirepeer(repository.legacypeer):
188 """Client-side interface for communicating with a peer repository.
188 """Client-side interface for communicating with a peer repository.
189
189
190 Methods commonly call wire protocol commands of the same name.
190 Methods commonly call wire protocol commands of the same name.
191
191
192 See also httppeer.py and sshpeer.py for protocol-specific
192 See also httppeer.py and sshpeer.py for protocol-specific
193 implementations of this interface.
193 implementations of this interface.
194 """
194 """
195 # Begin of basewirepeer interface.
195 # Begin of basewirepeer interface.
196
196
197 def iterbatch(self):
197 def iterbatch(self):
198 return remoteiterbatcher(self)
198 return remoteiterbatcher(self)
199
199
200 @batchable
200 @batchable
201 def lookup(self, key):
201 def lookup(self, key):
202 self.requirecap('lookup', _('look up remote revision'))
202 self.requirecap('lookup', _('look up remote revision'))
203 f = future()
203 f = future()
204 yield {'key': encoding.fromlocal(key)}, f
204 yield {'key': encoding.fromlocal(key)}, f
205 d = f.value
205 d = f.value
206 success, data = d[:-1].split(" ", 1)
206 success, data = d[:-1].split(" ", 1)
207 if int(success):
207 if int(success):
208 yield bin(data)
208 yield bin(data)
209 else:
209 else:
210 self._abort(error.RepoError(data))
210 self._abort(error.RepoError(data))
211
211
212 @batchable
212 @batchable
213 def heads(self):
213 def heads(self):
214 f = future()
214 f = future()
215 yield {}, f
215 yield {}, f
216 d = f.value
216 d = f.value
217 try:
217 try:
218 yield decodelist(d[:-1])
218 yield decodelist(d[:-1])
219 except ValueError:
219 except ValueError:
220 self._abort(error.ResponseError(_("unexpected response:"), d))
220 self._abort(error.ResponseError(_("unexpected response:"), d))
221
221
222 @batchable
222 @batchable
223 def known(self, nodes):
223 def known(self, nodes):
224 f = future()
224 f = future()
225 yield {'nodes': encodelist(nodes)}, f
225 yield {'nodes': encodelist(nodes)}, f
226 d = f.value
226 d = f.value
227 try:
227 try:
228 yield [bool(int(b)) for b in d]
228 yield [bool(int(b)) for b in d]
229 except ValueError:
229 except ValueError:
230 self._abort(error.ResponseError(_("unexpected response:"), d))
230 self._abort(error.ResponseError(_("unexpected response:"), d))
231
231
232 @batchable
232 @batchable
233 def branchmap(self):
233 def branchmap(self):
234 f = future()
234 f = future()
235 yield {}, f
235 yield {}, f
236 d = f.value
236 d = f.value
237 try:
237 try:
238 branchmap = {}
238 branchmap = {}
239 for branchpart in d.splitlines():
239 for branchpart in d.splitlines():
240 branchname, branchheads = branchpart.split(' ', 1)
240 branchname, branchheads = branchpart.split(' ', 1)
241 branchname = encoding.tolocal(urlreq.unquote(branchname))
241 branchname = encoding.tolocal(urlreq.unquote(branchname))
242 branchheads = decodelist(branchheads)
242 branchheads = decodelist(branchheads)
243 branchmap[branchname] = branchheads
243 branchmap[branchname] = branchheads
244 yield branchmap
244 yield branchmap
245 except TypeError:
245 except TypeError:
246 self._abort(error.ResponseError(_("unexpected response:"), d))
246 self._abort(error.ResponseError(_("unexpected response:"), d))
247
247
248 @batchable
248 @batchable
249 def listkeys(self, namespace):
249 def listkeys(self, namespace):
250 if not self.capable('pushkey'):
250 if not self.capable('pushkey'):
251 yield {}, None
251 yield {}, None
252 f = future()
252 f = future()
253 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
253 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
254 yield {'namespace': encoding.fromlocal(namespace)}, f
254 yield {'namespace': encoding.fromlocal(namespace)}, f
255 d = f.value
255 d = f.value
256 self.ui.debug('received listkey for "%s": %i bytes\n'
256 self.ui.debug('received listkey for "%s": %i bytes\n'
257 % (namespace, len(d)))
257 % (namespace, len(d)))
258 yield pushkeymod.decodekeys(d)
258 yield pushkeymod.decodekeys(d)
259
259
260 @batchable
260 @batchable
261 def pushkey(self, namespace, key, old, new):
261 def pushkey(self, namespace, key, old, new):
262 if not self.capable('pushkey'):
262 if not self.capable('pushkey'):
263 yield False, None
263 yield False, None
264 f = future()
264 f = future()
265 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
265 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
266 yield {'namespace': encoding.fromlocal(namespace),
266 yield {'namespace': encoding.fromlocal(namespace),
267 'key': encoding.fromlocal(key),
267 'key': encoding.fromlocal(key),
268 'old': encoding.fromlocal(old),
268 'old': encoding.fromlocal(old),
269 'new': encoding.fromlocal(new)}, f
269 'new': encoding.fromlocal(new)}, f
270 d = f.value
270 d = f.value
271 d, output = d.split('\n', 1)
271 d, output = d.split('\n', 1)
272 try:
272 try:
273 d = bool(int(d))
273 d = bool(int(d))
274 except ValueError:
274 except ValueError:
275 raise error.ResponseError(
275 raise error.ResponseError(
276 _('push failed (unexpected response):'), d)
276 _('push failed (unexpected response):'), d)
277 for l in output.splitlines(True):
277 for l in output.splitlines(True):
278 self.ui.status(_('remote: '), l)
278 self.ui.status(_('remote: '), l)
279 yield d
279 yield d
280
280
281 def stream_out(self):
281 def stream_out(self):
282 return self._callstream('stream_out')
282 return self._callstream('stream_out')
283
283
284 def getbundle(self, source, **kwargs):
284 def getbundle(self, source, **kwargs):
285 kwargs = pycompat.byteskwargs(kwargs)
285 kwargs = pycompat.byteskwargs(kwargs)
286 self.requirecap('getbundle', _('look up remote changes'))
286 self.requirecap('getbundle', _('look up remote changes'))
287 opts = {}
287 opts = {}
288 bundlecaps = kwargs.get('bundlecaps')
288 bundlecaps = kwargs.get('bundlecaps')
289 if bundlecaps is not None:
289 if bundlecaps is not None:
290 kwargs['bundlecaps'] = sorted(bundlecaps)
290 kwargs['bundlecaps'] = sorted(bundlecaps)
291 else:
291 else:
292 bundlecaps = () # kwargs could have it to None
292 bundlecaps = () # kwargs could have it to None
293 for key, value in kwargs.iteritems():
293 for key, value in kwargs.iteritems():
294 if value is None:
294 if value is None:
295 continue
295 continue
296 keytype = gboptsmap.get(key)
296 keytype = gboptsmap.get(key)
297 if keytype is None:
297 if keytype is None:
298 raise error.ProgrammingError(
298 raise error.ProgrammingError(
299 'Unexpectedly None keytype for key %s' % key)
299 'Unexpectedly None keytype for key %s' % key)
300 elif keytype == 'nodes':
300 elif keytype == 'nodes':
301 value = encodelist(value)
301 value = encodelist(value)
302 elif keytype in ('csv', 'scsv'):
302 elif keytype in ('csv', 'scsv'):
303 value = ','.join(value)
303 value = ','.join(value)
304 elif keytype == 'boolean':
304 elif keytype == 'boolean':
305 value = '%i' % bool(value)
305 value = '%i' % bool(value)
306 elif keytype != 'plain':
306 elif keytype != 'plain':
307 raise KeyError('unknown getbundle option type %s'
307 raise KeyError('unknown getbundle option type %s'
308 % keytype)
308 % keytype)
309 opts[key] = value
309 opts[key] = value
310 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
310 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
311 if any((cap.startswith('HG2') for cap in bundlecaps)):
311 if any((cap.startswith('HG2') for cap in bundlecaps)):
312 return bundle2.getunbundler(self.ui, f)
312 return bundle2.getunbundler(self.ui, f)
313 else:
313 else:
314 return changegroupmod.cg1unpacker(f, 'UN')
314 return changegroupmod.cg1unpacker(f, 'UN')
315
315
316 def unbundle(self, cg, heads, url):
316 def unbundle(self, cg, heads, url):
317 '''Send cg (a readable file-like object representing the
317 '''Send cg (a readable file-like object representing the
318 changegroup to push, typically a chunkbuffer object) to the
318 changegroup to push, typically a chunkbuffer object) to the
319 remote server as a bundle.
319 remote server as a bundle.
320
320
321 When pushing a bundle10 stream, return an integer indicating the
321 When pushing a bundle10 stream, return an integer indicating the
322 result of the push (see changegroup.apply()).
322 result of the push (see changegroup.apply()).
323
323
324 When pushing a bundle20 stream, return a bundle20 stream.
324 When pushing a bundle20 stream, return a bundle20 stream.
325
325
326 `url` is the url the client thinks it's pushing to, which is
326 `url` is the url the client thinks it's pushing to, which is
327 visible to hooks.
327 visible to hooks.
328 '''
328 '''
329
329
330 if heads != ['force'] and self.capable('unbundlehash'):
330 if heads != ['force'] and self.capable('unbundlehash'):
331 heads = encodelist(['hashed',
331 heads = encodelist(['hashed',
332 hashlib.sha1(''.join(sorted(heads))).digest()])
332 hashlib.sha1(''.join(sorted(heads))).digest()])
333 else:
333 else:
334 heads = encodelist(heads)
334 heads = encodelist(heads)
335
335
336 if util.safehasattr(cg, 'deltaheader'):
336 if util.safehasattr(cg, 'deltaheader'):
337 # this a bundle10, do the old style call sequence
337 # this a bundle10, do the old style call sequence
338 ret, output = self._callpush("unbundle", cg, heads=heads)
338 ret, output = self._callpush("unbundle", cg, heads=heads)
339 if ret == "":
339 if ret == "":
340 raise error.ResponseError(
340 raise error.ResponseError(
341 _('push failed:'), output)
341 _('push failed:'), output)
342 try:
342 try:
343 ret = int(ret)
343 ret = int(ret)
344 except ValueError:
344 except ValueError:
345 raise error.ResponseError(
345 raise error.ResponseError(
346 _('push failed (unexpected response):'), ret)
346 _('push failed (unexpected response):'), ret)
347
347
348 for l in output.splitlines(True):
348 for l in output.splitlines(True):
349 self.ui.status(_('remote: '), l)
349 self.ui.status(_('remote: '), l)
350 else:
350 else:
351 # bundle2 push. Send a stream, fetch a stream.
351 # bundle2 push. Send a stream, fetch a stream.
352 stream = self._calltwowaystream('unbundle', cg, heads=heads)
352 stream = self._calltwowaystream('unbundle', cg, heads=heads)
353 ret = bundle2.getunbundler(self.ui, stream)
353 ret = bundle2.getunbundler(self.ui, stream)
354 return ret
354 return ret
355
355
356 # End of basewirepeer interface.
356 # End of basewirepeer interface.
357
357
358 # Begin of baselegacywirepeer interface.
358 # Begin of baselegacywirepeer interface.
359
359
360 def branches(self, nodes):
360 def branches(self, nodes):
361 n = encodelist(nodes)
361 n = encodelist(nodes)
362 d = self._call("branches", nodes=n)
362 d = self._call("branches", nodes=n)
363 try:
363 try:
364 br = [tuple(decodelist(b)) for b in d.splitlines()]
364 br = [tuple(decodelist(b)) for b in d.splitlines()]
365 return br
365 return br
366 except ValueError:
366 except ValueError:
367 self._abort(error.ResponseError(_("unexpected response:"), d))
367 self._abort(error.ResponseError(_("unexpected response:"), d))
368
368
369 def between(self, pairs):
369 def between(self, pairs):
370 batch = 8 # avoid giant requests
370 batch = 8 # avoid giant requests
371 r = []
371 r = []
372 for i in xrange(0, len(pairs), batch):
372 for i in xrange(0, len(pairs), batch):
373 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
373 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
374 d = self._call("between", pairs=n)
374 d = self._call("between", pairs=n)
375 try:
375 try:
376 r.extend(l and decodelist(l) or [] for l in d.splitlines())
376 r.extend(l and decodelist(l) or [] for l in d.splitlines())
377 except ValueError:
377 except ValueError:
378 self._abort(error.ResponseError(_("unexpected response:"), d))
378 self._abort(error.ResponseError(_("unexpected response:"), d))
379 return r
379 return r
380
380
381 def changegroup(self, nodes, kind):
381 def changegroup(self, nodes, kind):
382 n = encodelist(nodes)
382 n = encodelist(nodes)
383 f = self._callcompressable("changegroup", roots=n)
383 f = self._callcompressable("changegroup", roots=n)
384 return changegroupmod.cg1unpacker(f, 'UN')
384 return changegroupmod.cg1unpacker(f, 'UN')
385
385
386 def changegroupsubset(self, bases, heads, kind):
386 def changegroupsubset(self, bases, heads, kind):
387 self.requirecap('changegroupsubset', _('look up remote changes'))
387 self.requirecap('changegroupsubset', _('look up remote changes'))
388 bases = encodelist(bases)
388 bases = encodelist(bases)
389 heads = encodelist(heads)
389 heads = encodelist(heads)
390 f = self._callcompressable("changegroupsubset",
390 f = self._callcompressable("changegroupsubset",
391 bases=bases, heads=heads)
391 bases=bases, heads=heads)
392 return changegroupmod.cg1unpacker(f, 'UN')
392 return changegroupmod.cg1unpacker(f, 'UN')
393
393
394 # End of baselegacywirepeer interface.
394 # End of baselegacywirepeer interface.
395
395
396 def _submitbatch(self, req):
396 def _submitbatch(self, req):
397 """run batch request <req> on the server
397 """run batch request <req> on the server
398
398
399 Returns an iterator of the raw responses from the server.
399 Returns an iterator of the raw responses from the server.
400 """
400 """
401 ui = self.ui
401 ui = self.ui
402 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
402 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
403 ui.debug('devel-peer-request: batched-content\n')
403 ui.debug('devel-peer-request: batched-content\n')
404 for op, args in req:
404 for op, args in req:
405 msg = 'devel-peer-request: - %s (%d arguments)\n'
405 msg = 'devel-peer-request: - %s (%d arguments)\n'
406 ui.debug(msg % (op, len(args)))
406 ui.debug(msg % (op, len(args)))
407
407
408 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
408 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
409 chunk = rsp.read(1024)
409 chunk = rsp.read(1024)
410 work = [chunk]
410 work = [chunk]
411 while chunk:
411 while chunk:
412 while ';' not in chunk and chunk:
412 while ';' not in chunk and chunk:
413 chunk = rsp.read(1024)
413 chunk = rsp.read(1024)
414 work.append(chunk)
414 work.append(chunk)
415 merged = ''.join(work)
415 merged = ''.join(work)
416 while ';' in merged:
416 while ';' in merged:
417 one, merged = merged.split(';', 1)
417 one, merged = merged.split(';', 1)
418 yield unescapearg(one)
418 yield unescapearg(one)
419 chunk = rsp.read(1024)
419 chunk = rsp.read(1024)
420 work = [merged, chunk]
420 work = [merged, chunk]
421 yield unescapearg(''.join(work))
421 yield unescapearg(''.join(work))
422
422
423 def _submitone(self, op, args):
423 def _submitone(self, op, args):
424 return self._call(op, **pycompat.strkwargs(args))
424 return self._call(op, **pycompat.strkwargs(args))
425
425
426 def debugwireargs(self, one, two, three=None, four=None, five=None):
426 def debugwireargs(self, one, two, three=None, four=None, five=None):
427 # don't pass optional arguments left at their default value
427 # don't pass optional arguments left at their default value
428 opts = {}
428 opts = {}
429 if three is not None:
429 if three is not None:
430 opts[r'three'] = three
430 opts[r'three'] = three
431 if four is not None:
431 if four is not None:
432 opts[r'four'] = four
432 opts[r'four'] = four
433 return self._call('debugwireargs', one=one, two=two, **opts)
433 return self._call('debugwireargs', one=one, two=two, **opts)
434
434
435 def _call(self, cmd, **args):
435 def _call(self, cmd, **args):
436 """execute <cmd> on the server
436 """execute <cmd> on the server
437
437
438 The command is expected to return a simple string.
438 The command is expected to return a simple string.
439
439
440 returns the server reply as a string."""
440 returns the server reply as a string."""
441 raise NotImplementedError()
441 raise NotImplementedError()
442
442
443 def _callstream(self, cmd, **args):
443 def _callstream(self, cmd, **args):
444 """execute <cmd> on the server
444 """execute <cmd> on the server
445
445
446 The command is expected to return a stream. Note that if the
446 The command is expected to return a stream. Note that if the
447 command doesn't return a stream, _callstream behaves
447 command doesn't return a stream, _callstream behaves
448 differently for ssh and http peers.
448 differently for ssh and http peers.
449
449
450 returns the server reply as a file like object.
450 returns the server reply as a file like object.
451 """
451 """
452 raise NotImplementedError()
452 raise NotImplementedError()
453
453
454 def _callcompressable(self, cmd, **args):
454 def _callcompressable(self, cmd, **args):
455 """execute <cmd> on the server
455 """execute <cmd> on the server
456
456
457 The command is expected to return a stream.
457 The command is expected to return a stream.
458
458
459 The stream may have been compressed in some implementations. This
459 The stream may have been compressed in some implementations. This
460 function takes care of the decompression. This is the only difference
460 function takes care of the decompression. This is the only difference
461 with _callstream.
461 with _callstream.
462
462
463 returns the server reply as a file like object.
463 returns the server reply as a file like object.
464 """
464 """
465 raise NotImplementedError()
465 raise NotImplementedError()
466
466
467 def _callpush(self, cmd, fp, **args):
467 def _callpush(self, cmd, fp, **args):
468 """execute a <cmd> on server
468 """execute a <cmd> on server
469
469
470 The command is expected to be related to a push. Push has a special
470 The command is expected to be related to a push. Push has a special
471 return method.
471 return method.
472
472
473 returns the server reply as a (ret, output) tuple. ret is either
473 returns the server reply as a (ret, output) tuple. ret is either
474 empty (error) or a stringified int.
474 empty (error) or a stringified int.
475 """
475 """
476 raise NotImplementedError()
476 raise NotImplementedError()
477
477
478 def _calltwowaystream(self, cmd, fp, **args):
478 def _calltwowaystream(self, cmd, fp, **args):
479 """execute <cmd> on server
479 """execute <cmd> on server
480
480
481 The command will send a stream to the server and get a stream in reply.
481 The command will send a stream to the server and get a stream in reply.
482 """
482 """
483 raise NotImplementedError()
483 raise NotImplementedError()
484
484
485 def _abort(self, exception):
485 def _abort(self, exception):
486 """clearly abort the wire protocol connection and raise the exception
486 """clearly abort the wire protocol connection and raise the exception
487 """
487 """
488 raise NotImplementedError()
488 raise NotImplementedError()
489
489
490 # server side
490 # server side
491
491
492 # wire protocol command can either return a string or one of these classes.
492 # wire protocol command can either return a string or one of these classes.
493
493
494 def getdispatchrepo(repo, proto, command):
494 def getdispatchrepo(repo, proto, command):
495 """Obtain the repo used for processing wire protocol commands.
495 """Obtain the repo used for processing wire protocol commands.
496
496
497 The intent of this function is to serve as a monkeypatch point for
497 The intent of this function is to serve as a monkeypatch point for
498 extensions that need commands to operate on different repo views under
498 extensions that need commands to operate on different repo views under
499 specialized circumstances.
499 specialized circumstances.
500 """
500 """
501 return repo.filtered('served')
501 return repo.filtered('served')
502
502
503 def dispatch(repo, proto, command):
503 def dispatch(repo, proto, command):
504 repo = getdispatchrepo(repo, proto, command)
504 repo = getdispatchrepo(repo, proto, command)
505 func, spec = commands[command]
505
506 transportversion = wireprototypes.TRANSPORTS[proto.name]['version']
507 commandtable = commandsv2 if transportversion == 2 else commands
508 func, spec = commandtable[command]
509
506 args = proto.getargs(spec)
510 args = proto.getargs(spec)
507 return func(repo, proto, *args)
511 return func(repo, proto, *args)
508
512
509 def options(cmd, keys, others):
513 def options(cmd, keys, others):
510 opts = {}
514 opts = {}
511 for k in keys:
515 for k in keys:
512 if k in others:
516 if k in others:
513 opts[k] = others[k]
517 opts[k] = others[k]
514 del others[k]
518 del others[k]
515 if others:
519 if others:
516 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
520 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
517 % (cmd, ",".join(others)))
521 % (cmd, ",".join(others)))
518 return opts
522 return opts
519
523
520 def bundle1allowed(repo, action):
524 def bundle1allowed(repo, action):
521 """Whether a bundle1 operation is allowed from the server.
525 """Whether a bundle1 operation is allowed from the server.
522
526
523 Priority is:
527 Priority is:
524
528
525 1. server.bundle1gd.<action> (if generaldelta active)
529 1. server.bundle1gd.<action> (if generaldelta active)
526 2. server.bundle1.<action>
530 2. server.bundle1.<action>
527 3. server.bundle1gd (if generaldelta active)
531 3. server.bundle1gd (if generaldelta active)
528 4. server.bundle1
532 4. server.bundle1
529 """
533 """
530 ui = repo.ui
534 ui = repo.ui
531 gd = 'generaldelta' in repo.requirements
535 gd = 'generaldelta' in repo.requirements
532
536
533 if gd:
537 if gd:
534 v = ui.configbool('server', 'bundle1gd.%s' % action)
538 v = ui.configbool('server', 'bundle1gd.%s' % action)
535 if v is not None:
539 if v is not None:
536 return v
540 return v
537
541
538 v = ui.configbool('server', 'bundle1.%s' % action)
542 v = ui.configbool('server', 'bundle1.%s' % action)
539 if v is not None:
543 if v is not None:
540 return v
544 return v
541
545
542 if gd:
546 if gd:
543 v = ui.configbool('server', 'bundle1gd')
547 v = ui.configbool('server', 'bundle1gd')
544 if v is not None:
548 if v is not None:
545 return v
549 return v
546
550
547 return ui.configbool('server', 'bundle1')
551 return ui.configbool('server', 'bundle1')
548
552
549 def supportedcompengines(ui, role):
553 def supportedcompengines(ui, role):
550 """Obtain the list of supported compression engines for a request."""
554 """Obtain the list of supported compression engines for a request."""
551 assert role in (util.CLIENTROLE, util.SERVERROLE)
555 assert role in (util.CLIENTROLE, util.SERVERROLE)
552
556
553 compengines = util.compengines.supportedwireengines(role)
557 compengines = util.compengines.supportedwireengines(role)
554
558
555 # Allow config to override default list and ordering.
559 # Allow config to override default list and ordering.
556 if role == util.SERVERROLE:
560 if role == util.SERVERROLE:
557 configengines = ui.configlist('server', 'compressionengines')
561 configengines = ui.configlist('server', 'compressionengines')
558 config = 'server.compressionengines'
562 config = 'server.compressionengines'
559 else:
563 else:
560 # This is currently implemented mainly to facilitate testing. In most
564 # This is currently implemented mainly to facilitate testing. In most
561 # cases, the server should be in charge of choosing a compression engine
565 # cases, the server should be in charge of choosing a compression engine
562 # because a server has the most to lose from a sub-optimal choice. (e.g.
566 # because a server has the most to lose from a sub-optimal choice. (e.g.
563 # CPU DoS due to an expensive engine or a network DoS due to poor
567 # CPU DoS due to an expensive engine or a network DoS due to poor
564 # compression ratio).
568 # compression ratio).
565 configengines = ui.configlist('experimental',
569 configengines = ui.configlist('experimental',
566 'clientcompressionengines')
570 'clientcompressionengines')
567 config = 'experimental.clientcompressionengines'
571 config = 'experimental.clientcompressionengines'
568
572
569 # No explicit config. Filter out the ones that aren't supposed to be
573 # No explicit config. Filter out the ones that aren't supposed to be
570 # advertised and return default ordering.
574 # advertised and return default ordering.
571 if not configengines:
575 if not configengines:
572 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
576 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
573 return [e for e in compengines
577 return [e for e in compengines
574 if getattr(e.wireprotosupport(), attr) > 0]
578 if getattr(e.wireprotosupport(), attr) > 0]
575
579
576 # If compression engines are listed in the config, assume there is a good
580 # If compression engines are listed in the config, assume there is a good
577 # reason for it (like server operators wanting to achieve specific
581 # reason for it (like server operators wanting to achieve specific
578 # performance characteristics). So fail fast if the config references
582 # performance characteristics). So fail fast if the config references
579 # unusable compression engines.
583 # unusable compression engines.
580 validnames = set(e.name() for e in compengines)
584 validnames = set(e.name() for e in compengines)
581 invalidnames = set(e for e in configengines if e not in validnames)
585 invalidnames = set(e for e in configengines if e not in validnames)
582 if invalidnames:
586 if invalidnames:
583 raise error.Abort(_('invalid compression engine defined in %s: %s') %
587 raise error.Abort(_('invalid compression engine defined in %s: %s') %
584 (config, ', '.join(sorted(invalidnames))))
588 (config, ', '.join(sorted(invalidnames))))
585
589
586 compengines = [e for e in compengines if e.name() in configengines]
590 compengines = [e for e in compengines if e.name() in configengines]
587 compengines = sorted(compengines,
591 compengines = sorted(compengines,
588 key=lambda e: configengines.index(e.name()))
592 key=lambda e: configengines.index(e.name()))
589
593
590 if not compengines:
594 if not compengines:
591 raise error.Abort(_('%s config option does not specify any known '
595 raise error.Abort(_('%s config option does not specify any known '
592 'compression engines') % config,
596 'compression engines') % config,
593 hint=_('usable compression engines: %s') %
597 hint=_('usable compression engines: %s') %
594 ', '.sorted(validnames))
598 ', '.sorted(validnames))
595
599
596 return compengines
600 return compengines
597
601
598 class commandentry(object):
602 class commandentry(object):
599 """Represents a declared wire protocol command."""
603 """Represents a declared wire protocol command."""
600 def __init__(self, func, args='', transports=None,
604 def __init__(self, func, args='', transports=None,
601 permission='push'):
605 permission='push'):
602 self.func = func
606 self.func = func
603 self.args = args
607 self.args = args
604 self.transports = transports or set()
608 self.transports = transports or set()
605 self.permission = permission
609 self.permission = permission
606
610
607 def _merge(self, func, args):
611 def _merge(self, func, args):
608 """Merge this instance with an incoming 2-tuple.
612 """Merge this instance with an incoming 2-tuple.
609
613
610 This is called when a caller using the old 2-tuple API attempts
614 This is called when a caller using the old 2-tuple API attempts
611 to replace an instance. The incoming values are merged with
615 to replace an instance. The incoming values are merged with
612 data not captured by the 2-tuple and a new instance containing
616 data not captured by the 2-tuple and a new instance containing
613 the union of the two objects is returned.
617 the union of the two objects is returned.
614 """
618 """
615 return commandentry(func, args=args, transports=set(self.transports),
619 return commandentry(func, args=args, transports=set(self.transports),
616 permission=self.permission)
620 permission=self.permission)
617
621
618 # Old code treats instances as 2-tuples. So expose that interface.
622 # Old code treats instances as 2-tuples. So expose that interface.
619 def __iter__(self):
623 def __iter__(self):
620 yield self.func
624 yield self.func
621 yield self.args
625 yield self.args
622
626
623 def __getitem__(self, i):
627 def __getitem__(self, i):
624 if i == 0:
628 if i == 0:
625 return self.func
629 return self.func
626 elif i == 1:
630 elif i == 1:
627 return self.args
631 return self.args
628 else:
632 else:
629 raise IndexError('can only access elements 0 and 1')
633 raise IndexError('can only access elements 0 and 1')
630
634
631 class commanddict(dict):
635 class commanddict(dict):
632 """Container for registered wire protocol commands.
636 """Container for registered wire protocol commands.
633
637
634 It behaves like a dict. But __setitem__ is overwritten to allow silent
638 It behaves like a dict. But __setitem__ is overwritten to allow silent
635 coercion of values from 2-tuples for API compatibility.
639 coercion of values from 2-tuples for API compatibility.
636 """
640 """
637 def __setitem__(self, k, v):
641 def __setitem__(self, k, v):
638 if isinstance(v, commandentry):
642 if isinstance(v, commandentry):
639 pass
643 pass
640 # Cast 2-tuples to commandentry instances.
644 # Cast 2-tuples to commandentry instances.
641 elif isinstance(v, tuple):
645 elif isinstance(v, tuple):
642 if len(v) != 2:
646 if len(v) != 2:
643 raise ValueError('command tuples must have exactly 2 elements')
647 raise ValueError('command tuples must have exactly 2 elements')
644
648
645 # It is common for extensions to wrap wire protocol commands via
649 # It is common for extensions to wrap wire protocol commands via
646 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
650 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
647 # doing this aren't aware of the new API that uses objects to store
651 # doing this aren't aware of the new API that uses objects to store
648 # command entries, we automatically merge old state with new.
652 # command entries, we automatically merge old state with new.
649 if k in self:
653 if k in self:
650 v = self[k]._merge(v[0], v[1])
654 v = self[k]._merge(v[0], v[1])
651 else:
655 else:
652 # Use default values from @wireprotocommand.
656 # Use default values from @wireprotocommand.
653 v = commandentry(v[0], args=v[1],
657 v = commandentry(v[0], args=v[1],
654 transports=set(wireprototypes.TRANSPORTS),
658 transports=set(wireprototypes.TRANSPORTS),
655 permission='push')
659 permission='push')
656 else:
660 else:
657 raise ValueError('command entries must be commandentry instances '
661 raise ValueError('command entries must be commandentry instances '
658 'or 2-tuples')
662 'or 2-tuples')
659
663
660 return super(commanddict, self).__setitem__(k, v)
664 return super(commanddict, self).__setitem__(k, v)
661
665
662 def commandavailable(self, command, proto):
666 def commandavailable(self, command, proto):
663 """Determine if a command is available for the requested protocol."""
667 """Determine if a command is available for the requested protocol."""
664 assert proto.name in wireprototypes.TRANSPORTS
668 assert proto.name in wireprototypes.TRANSPORTS
665
669
666 entry = self.get(command)
670 entry = self.get(command)
667
671
668 if not entry:
672 if not entry:
669 return False
673 return False
670
674
671 if proto.name not in entry.transports:
675 if proto.name not in entry.transports:
672 return False
676 return False
673
677
674 return True
678 return True
675
679
676 # Constants specifying which transports a wire protocol command should be
680 # Constants specifying which transports a wire protocol command should be
677 # available on. For use with @wireprotocommand.
681 # available on. For use with @wireprotocommand.
678 POLICY_ALL = 'all'
682 POLICY_ALL = 'all'
679 POLICY_V1_ONLY = 'v1-only'
683 POLICY_V1_ONLY = 'v1-only'
680 POLICY_V2_ONLY = 'v2-only'
684 POLICY_V2_ONLY = 'v2-only'
681
685
686 # For version 1 transports.
682 commands = commanddict()
687 commands = commanddict()
683
688
689 # For version 2 transports.
690 commandsv2 = commanddict()
691
684 def wireprotocommand(name, args='', transportpolicy=POLICY_ALL,
692 def wireprotocommand(name, args='', transportpolicy=POLICY_ALL,
685 permission='push'):
693 permission='push'):
686 """Decorator to declare a wire protocol command.
694 """Decorator to declare a wire protocol command.
687
695
688 ``name`` is the name of the wire protocol command being provided.
696 ``name`` is the name of the wire protocol command being provided.
689
697
690 ``args`` is a space-delimited list of named arguments that the command
698 ``args`` is a space-delimited list of named arguments that the command
691 accepts. ``*`` is a special value that says to accept all arguments.
699 accepts. ``*`` is a special value that says to accept all arguments.
692
700
693 ``transportpolicy`` is a POLICY_* constant denoting which transports
701 ``transportpolicy`` is a POLICY_* constant denoting which transports
694 this wire protocol command should be exposed to. By default, commands
702 this wire protocol command should be exposed to. By default, commands
695 are exposed to all wire protocol transports.
703 are exposed to all wire protocol transports.
696
704
697 ``permission`` defines the permission type needed to run this command.
705 ``permission`` defines the permission type needed to run this command.
698 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
706 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
699 respectively. Default is to assume command requires ``push`` permissions
707 respectively. Default is to assume command requires ``push`` permissions
700 because otherwise commands not declaring their permissions could modify
708 because otherwise commands not declaring their permissions could modify
701 a repository that is supposed to be read-only.
709 a repository that is supposed to be read-only.
702 """
710 """
703 if transportpolicy == POLICY_ALL:
711 if transportpolicy == POLICY_ALL:
704 transports = set(wireprototypes.TRANSPORTS)
712 transports = set(wireprototypes.TRANSPORTS)
713 transportversions = {1, 2}
705 elif transportpolicy == POLICY_V1_ONLY:
714 elif transportpolicy == POLICY_V1_ONLY:
706 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
715 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
707 if v['version'] == 1}
716 if v['version'] == 1}
717 transportversions = {1}
708 elif transportpolicy == POLICY_V2_ONLY:
718 elif transportpolicy == POLICY_V2_ONLY:
709 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
719 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
710 if v['version'] == 2}
720 if v['version'] == 2}
721 transportversions = {2}
711 else:
722 else:
712 raise error.ProgrammingError('invalid transport policy value: %s' %
723 raise error.ProgrammingError('invalid transport policy value: %s' %
713 transportpolicy)
724 transportpolicy)
714
725
715 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
726 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
716 # SSHv2.
727 # SSHv2.
717 # TODO undo this hack when SSH is using the unified frame protocol.
728 # TODO undo this hack when SSH is using the unified frame protocol.
718 if name == b'batch':
729 if name == b'batch':
719 transports.add(wireprototypes.SSHV2)
730 transports.add(wireprototypes.SSHV2)
720
731
721 if permission not in ('push', 'pull'):
732 if permission not in ('push', 'pull'):
722 raise error.ProgrammingError('invalid wire protocol permission; '
733 raise error.ProgrammingError('invalid wire protocol permission; '
723 'got %s; expected "push" or "pull"' %
734 'got %s; expected "push" or "pull"' %
724 permission)
735 permission)
725
736
726 def register(func):
737 def register(func):
727 commands[name] = commandentry(func, args=args, transports=transports,
738 if 1 in transportversions:
739 if name in commands:
740 raise error.ProgrammingError('%s command already registered '
741 'for version 1' % name)
742 commands[name] = commandentry(func, args=args,
743 transports=transports,
728 permission=permission)
744 permission=permission)
745 if 2 in transportversions:
746 if name in commandsv2:
747 raise error.ProgrammingError('%s command already registered '
748 'for version 2' % name)
749 commandsv2[name] = commandentry(func, args=args,
750 transports=transports,
751 permission=permission)
752
729 return func
753 return func
730 return register
754 return register
731
755
732 # TODO define a more appropriate permissions type to use for this.
756 # TODO define a more appropriate permissions type to use for this.
733 @wireprotocommand('batch', 'cmds *', permission='pull',
757 @wireprotocommand('batch', 'cmds *', permission='pull',
734 transportpolicy=POLICY_V1_ONLY)
758 transportpolicy=POLICY_V1_ONLY)
735 def batch(repo, proto, cmds, others):
759 def batch(repo, proto, cmds, others):
736 repo = repo.filtered("served")
760 repo = repo.filtered("served")
737 res = []
761 res = []
738 for pair in cmds.split(';'):
762 for pair in cmds.split(';'):
739 op, args = pair.split(' ', 1)
763 op, args = pair.split(' ', 1)
740 vals = {}
764 vals = {}
741 for a in args.split(','):
765 for a in args.split(','):
742 if a:
766 if a:
743 n, v = a.split('=')
767 n, v = a.split('=')
744 vals[unescapearg(n)] = unescapearg(v)
768 vals[unescapearg(n)] = unescapearg(v)
745 func, spec = commands[op]
769 func, spec = commands[op]
746
770
747 # Validate that client has permissions to perform this command.
771 # Validate that client has permissions to perform this command.
748 perm = commands[op].permission
772 perm = commands[op].permission
749 assert perm in ('push', 'pull')
773 assert perm in ('push', 'pull')
750 proto.checkperm(perm)
774 proto.checkperm(perm)
751
775
752 if spec:
776 if spec:
753 keys = spec.split()
777 keys = spec.split()
754 data = {}
778 data = {}
755 for k in keys:
779 for k in keys:
756 if k == '*':
780 if k == '*':
757 star = {}
781 star = {}
758 for key in vals.keys():
782 for key in vals.keys():
759 if key not in keys:
783 if key not in keys:
760 star[key] = vals[key]
784 star[key] = vals[key]
761 data['*'] = star
785 data['*'] = star
762 else:
786 else:
763 data[k] = vals[k]
787 data[k] = vals[k]
764 result = func(repo, proto, *[data[k] for k in keys])
788 result = func(repo, proto, *[data[k] for k in keys])
765 else:
789 else:
766 result = func(repo, proto)
790 result = func(repo, proto)
767 if isinstance(result, wireprototypes.ooberror):
791 if isinstance(result, wireprototypes.ooberror):
768 return result
792 return result
769
793
770 # For now, all batchable commands must return bytesresponse or
794 # For now, all batchable commands must return bytesresponse or
771 # raw bytes (for backwards compatibility).
795 # raw bytes (for backwards compatibility).
772 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
796 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
773 if isinstance(result, wireprototypes.bytesresponse):
797 if isinstance(result, wireprototypes.bytesresponse):
774 result = result.data
798 result = result.data
775 res.append(escapearg(result))
799 res.append(escapearg(result))
776
800
777 return wireprototypes.bytesresponse(';'.join(res))
801 return wireprototypes.bytesresponse(';'.join(res))
778
802
779 @wireprotocommand('between', 'pairs', transportpolicy=POLICY_V1_ONLY,
803 @wireprotocommand('between', 'pairs', transportpolicy=POLICY_V1_ONLY,
780 permission='pull')
804 permission='pull')
781 def between(repo, proto, pairs):
805 def between(repo, proto, pairs):
782 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
806 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
783 r = []
807 r = []
784 for b in repo.between(pairs):
808 for b in repo.between(pairs):
785 r.append(encodelist(b) + "\n")
809 r.append(encodelist(b) + "\n")
786
810
787 return wireprototypes.bytesresponse(''.join(r))
811 return wireprototypes.bytesresponse(''.join(r))
788
812
789 @wireprotocommand('branchmap', permission='pull')
813 @wireprotocommand('branchmap', permission='pull')
790 def branchmap(repo, proto):
814 def branchmap(repo, proto):
791 branchmap = repo.branchmap()
815 branchmap = repo.branchmap()
792 heads = []
816 heads = []
793 for branch, nodes in branchmap.iteritems():
817 for branch, nodes in branchmap.iteritems():
794 branchname = urlreq.quote(encoding.fromlocal(branch))
818 branchname = urlreq.quote(encoding.fromlocal(branch))
795 branchnodes = encodelist(nodes)
819 branchnodes = encodelist(nodes)
796 heads.append('%s %s' % (branchname, branchnodes))
820 heads.append('%s %s' % (branchname, branchnodes))
797
821
798 return wireprototypes.bytesresponse('\n'.join(heads))
822 return wireprototypes.bytesresponse('\n'.join(heads))
799
823
800 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY,
824 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY,
801 permission='pull')
825 permission='pull')
802 def branches(repo, proto, nodes):
826 def branches(repo, proto, nodes):
803 nodes = decodelist(nodes)
827 nodes = decodelist(nodes)
804 r = []
828 r = []
805 for b in repo.branches(nodes):
829 for b in repo.branches(nodes):
806 r.append(encodelist(b) + "\n")
830 r.append(encodelist(b) + "\n")
807
831
808 return wireprototypes.bytesresponse(''.join(r))
832 return wireprototypes.bytesresponse(''.join(r))
809
833
810 @wireprotocommand('clonebundles', '', permission='pull')
834 @wireprotocommand('clonebundles', '', permission='pull')
811 def clonebundles(repo, proto):
835 def clonebundles(repo, proto):
812 """Server command for returning info for available bundles to seed clones.
836 """Server command for returning info for available bundles to seed clones.
813
837
814 Clients will parse this response and determine what bundle to fetch.
838 Clients will parse this response and determine what bundle to fetch.
815
839
816 Extensions may wrap this command to filter or dynamically emit data
840 Extensions may wrap this command to filter or dynamically emit data
817 depending on the request. e.g. you could advertise URLs for the closest
841 depending on the request. e.g. you could advertise URLs for the closest
818 data center given the client's IP address.
842 data center given the client's IP address.
819 """
843 """
820 return wireprototypes.bytesresponse(
844 return wireprototypes.bytesresponse(
821 repo.vfs.tryread('clonebundles.manifest'))
845 repo.vfs.tryread('clonebundles.manifest'))
822
846
823 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
847 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
824 'known', 'getbundle', 'unbundlehash']
848 'known', 'getbundle', 'unbundlehash']
825
849
826 def _capabilities(repo, proto):
850 def _capabilities(repo, proto):
827 """return a list of capabilities for a repo
851 """return a list of capabilities for a repo
828
852
829 This function exists to allow extensions to easily wrap capabilities
853 This function exists to allow extensions to easily wrap capabilities
830 computation
854 computation
831
855
832 - returns a lists: easy to alter
856 - returns a lists: easy to alter
833 - change done here will be propagated to both `capabilities` and `hello`
857 - change done here will be propagated to both `capabilities` and `hello`
834 command without any other action needed.
858 command without any other action needed.
835 """
859 """
836 # copy to prevent modification of the global list
860 # copy to prevent modification of the global list
837 caps = list(wireprotocaps)
861 caps = list(wireprotocaps)
838
862
839 # Command of same name as capability isn't exposed to version 1 of
863 # Command of same name as capability isn't exposed to version 1 of
840 # transports. So conditionally add it.
864 # transports. So conditionally add it.
841 if commands.commandavailable('changegroupsubset', proto):
865 if commands.commandavailable('changegroupsubset', proto):
842 caps.append('changegroupsubset')
866 caps.append('changegroupsubset')
843
867
844 if streamclone.allowservergeneration(repo):
868 if streamclone.allowservergeneration(repo):
845 if repo.ui.configbool('server', 'preferuncompressed'):
869 if repo.ui.configbool('server', 'preferuncompressed'):
846 caps.append('stream-preferred')
870 caps.append('stream-preferred')
847 requiredformats = repo.requirements & repo.supportedformats
871 requiredformats = repo.requirements & repo.supportedformats
848 # if our local revlogs are just revlogv1, add 'stream' cap
872 # if our local revlogs are just revlogv1, add 'stream' cap
849 if not requiredformats - {'revlogv1'}:
873 if not requiredformats - {'revlogv1'}:
850 caps.append('stream')
874 caps.append('stream')
851 # otherwise, add 'streamreqs' detailing our local revlog format
875 # otherwise, add 'streamreqs' detailing our local revlog format
852 else:
876 else:
853 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
877 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
854 if repo.ui.configbool('experimental', 'bundle2-advertise'):
878 if repo.ui.configbool('experimental', 'bundle2-advertise'):
855 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
879 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
856 caps.append('bundle2=' + urlreq.quote(capsblob))
880 caps.append('bundle2=' + urlreq.quote(capsblob))
857 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
881 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
858
882
859 return proto.addcapabilities(repo, caps)
883 return proto.addcapabilities(repo, caps)
860
884
861 # If you are writing an extension and consider wrapping this function. Wrap
885 # If you are writing an extension and consider wrapping this function. Wrap
862 # `_capabilities` instead.
886 # `_capabilities` instead.
863 @wireprotocommand('capabilities', permission='pull')
887 @wireprotocommand('capabilities', permission='pull')
864 def capabilities(repo, proto):
888 def capabilities(repo, proto):
865 return wireprototypes.bytesresponse(' '.join(_capabilities(repo, proto)))
889 return wireprototypes.bytesresponse(' '.join(_capabilities(repo, proto)))
866
890
867 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY,
891 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY,
868 permission='pull')
892 permission='pull')
869 def changegroup(repo, proto, roots):
893 def changegroup(repo, proto, roots):
870 nodes = decodelist(roots)
894 nodes = decodelist(roots)
871 outgoing = discovery.outgoing(repo, missingroots=nodes,
895 outgoing = discovery.outgoing(repo, missingroots=nodes,
872 missingheads=repo.heads())
896 missingheads=repo.heads())
873 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
897 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
874 gen = iter(lambda: cg.read(32768), '')
898 gen = iter(lambda: cg.read(32768), '')
875 return wireprototypes.streamres(gen=gen)
899 return wireprototypes.streamres(gen=gen)
876
900
877 @wireprotocommand('changegroupsubset', 'bases heads',
901 @wireprotocommand('changegroupsubset', 'bases heads',
878 transportpolicy=POLICY_V1_ONLY,
902 transportpolicy=POLICY_V1_ONLY,
879 permission='pull')
903 permission='pull')
880 def changegroupsubset(repo, proto, bases, heads):
904 def changegroupsubset(repo, proto, bases, heads):
881 bases = decodelist(bases)
905 bases = decodelist(bases)
882 heads = decodelist(heads)
906 heads = decodelist(heads)
883 outgoing = discovery.outgoing(repo, missingroots=bases,
907 outgoing = discovery.outgoing(repo, missingroots=bases,
884 missingheads=heads)
908 missingheads=heads)
885 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
909 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
886 gen = iter(lambda: cg.read(32768), '')
910 gen = iter(lambda: cg.read(32768), '')
887 return wireprototypes.streamres(gen=gen)
911 return wireprototypes.streamres(gen=gen)
888
912
889 @wireprotocommand('debugwireargs', 'one two *',
913 @wireprotocommand('debugwireargs', 'one two *',
890 permission='pull')
914 permission='pull')
891 def debugwireargs(repo, proto, one, two, others):
915 def debugwireargs(repo, proto, one, two, others):
892 # only accept optional args from the known set
916 # only accept optional args from the known set
893 opts = options('debugwireargs', ['three', 'four'], others)
917 opts = options('debugwireargs', ['three', 'four'], others)
894 return wireprototypes.bytesresponse(repo.debugwireargs(
918 return wireprototypes.bytesresponse(repo.debugwireargs(
895 one, two, **pycompat.strkwargs(opts)))
919 one, two, **pycompat.strkwargs(opts)))
896
920
897 @wireprotocommand('getbundle', '*', permission='pull')
921 @wireprotocommand('getbundle', '*', permission='pull')
898 def getbundle(repo, proto, others):
922 def getbundle(repo, proto, others):
899 opts = options('getbundle', gboptsmap.keys(), others)
923 opts = options('getbundle', gboptsmap.keys(), others)
900 for k, v in opts.iteritems():
924 for k, v in opts.iteritems():
901 keytype = gboptsmap[k]
925 keytype = gboptsmap[k]
902 if keytype == 'nodes':
926 if keytype == 'nodes':
903 opts[k] = decodelist(v)
927 opts[k] = decodelist(v)
904 elif keytype == 'csv':
928 elif keytype == 'csv':
905 opts[k] = list(v.split(','))
929 opts[k] = list(v.split(','))
906 elif keytype == 'scsv':
930 elif keytype == 'scsv':
907 opts[k] = set(v.split(','))
931 opts[k] = set(v.split(','))
908 elif keytype == 'boolean':
932 elif keytype == 'boolean':
909 # Client should serialize False as '0', which is a non-empty string
933 # Client should serialize False as '0', which is a non-empty string
910 # so it evaluates as a True bool.
934 # so it evaluates as a True bool.
911 if v == '0':
935 if v == '0':
912 opts[k] = False
936 opts[k] = False
913 else:
937 else:
914 opts[k] = bool(v)
938 opts[k] = bool(v)
915 elif keytype != 'plain':
939 elif keytype != 'plain':
916 raise KeyError('unknown getbundle option type %s'
940 raise KeyError('unknown getbundle option type %s'
917 % keytype)
941 % keytype)
918
942
919 if not bundle1allowed(repo, 'pull'):
943 if not bundle1allowed(repo, 'pull'):
920 if not exchange.bundle2requested(opts.get('bundlecaps')):
944 if not exchange.bundle2requested(opts.get('bundlecaps')):
921 if proto.name == 'http-v1':
945 if proto.name == 'http-v1':
922 return wireprototypes.ooberror(bundle2required)
946 return wireprototypes.ooberror(bundle2required)
923 raise error.Abort(bundle2requiredmain,
947 raise error.Abort(bundle2requiredmain,
924 hint=bundle2requiredhint)
948 hint=bundle2requiredhint)
925
949
926 prefercompressed = True
950 prefercompressed = True
927
951
928 try:
952 try:
929 if repo.ui.configbool('server', 'disablefullbundle'):
953 if repo.ui.configbool('server', 'disablefullbundle'):
930 # Check to see if this is a full clone.
954 # Check to see if this is a full clone.
931 clheads = set(repo.changelog.heads())
955 clheads = set(repo.changelog.heads())
932 changegroup = opts.get('cg', True)
956 changegroup = opts.get('cg', True)
933 heads = set(opts.get('heads', set()))
957 heads = set(opts.get('heads', set()))
934 common = set(opts.get('common', set()))
958 common = set(opts.get('common', set()))
935 common.discard(nullid)
959 common.discard(nullid)
936 if changegroup and not common and clheads == heads:
960 if changegroup and not common and clheads == heads:
937 raise error.Abort(
961 raise error.Abort(
938 _('server has pull-based clones disabled'),
962 _('server has pull-based clones disabled'),
939 hint=_('remove --pull if specified or upgrade Mercurial'))
963 hint=_('remove --pull if specified or upgrade Mercurial'))
940
964
941 info, chunks = exchange.getbundlechunks(repo, 'serve',
965 info, chunks = exchange.getbundlechunks(repo, 'serve',
942 **pycompat.strkwargs(opts))
966 **pycompat.strkwargs(opts))
943 prefercompressed = info.get('prefercompressed', True)
967 prefercompressed = info.get('prefercompressed', True)
944 except error.Abort as exc:
968 except error.Abort as exc:
945 # cleanly forward Abort error to the client
969 # cleanly forward Abort error to the client
946 if not exchange.bundle2requested(opts.get('bundlecaps')):
970 if not exchange.bundle2requested(opts.get('bundlecaps')):
947 if proto.name == 'http-v1':
971 if proto.name == 'http-v1':
948 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
972 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
949 raise # cannot do better for bundle1 + ssh
973 raise # cannot do better for bundle1 + ssh
950 # bundle2 request expect a bundle2 reply
974 # bundle2 request expect a bundle2 reply
951 bundler = bundle2.bundle20(repo.ui)
975 bundler = bundle2.bundle20(repo.ui)
952 manargs = [('message', pycompat.bytestr(exc))]
976 manargs = [('message', pycompat.bytestr(exc))]
953 advargs = []
977 advargs = []
954 if exc.hint is not None:
978 if exc.hint is not None:
955 advargs.append(('hint', exc.hint))
979 advargs.append(('hint', exc.hint))
956 bundler.addpart(bundle2.bundlepart('error:abort',
980 bundler.addpart(bundle2.bundlepart('error:abort',
957 manargs, advargs))
981 manargs, advargs))
958 chunks = bundler.getchunks()
982 chunks = bundler.getchunks()
959 prefercompressed = False
983 prefercompressed = False
960
984
961 return wireprototypes.streamres(
985 return wireprototypes.streamres(
962 gen=chunks, prefer_uncompressed=not prefercompressed)
986 gen=chunks, prefer_uncompressed=not prefercompressed)
963
987
964 @wireprotocommand('heads', permission='pull')
988 @wireprotocommand('heads', permission='pull')
965 def heads(repo, proto):
989 def heads(repo, proto):
966 h = repo.heads()
990 h = repo.heads()
967 return wireprototypes.bytesresponse(encodelist(h) + '\n')
991 return wireprototypes.bytesresponse(encodelist(h) + '\n')
968
992
969 @wireprotocommand('hello', permission='pull')
993 @wireprotocommand('hello', permission='pull')
970 def hello(repo, proto):
994 def hello(repo, proto):
971 """Called as part of SSH handshake to obtain server info.
995 """Called as part of SSH handshake to obtain server info.
972
996
973 Returns a list of lines describing interesting things about the
997 Returns a list of lines describing interesting things about the
974 server, in an RFC822-like format.
998 server, in an RFC822-like format.
975
999
976 Currently, the only one defined is ``capabilities``, which consists of a
1000 Currently, the only one defined is ``capabilities``, which consists of a
977 line of space separated tokens describing server abilities:
1001 line of space separated tokens describing server abilities:
978
1002
979 capabilities: <token0> <token1> <token2>
1003 capabilities: <token0> <token1> <token2>
980 """
1004 """
981 caps = capabilities(repo, proto).data
1005 caps = capabilities(repo, proto).data
982 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
1006 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
983
1007
984 @wireprotocommand('listkeys', 'namespace', permission='pull')
1008 @wireprotocommand('listkeys', 'namespace', permission='pull')
985 def listkeys(repo, proto, namespace):
1009 def listkeys(repo, proto, namespace):
986 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
1010 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
987 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
1011 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
988
1012
989 @wireprotocommand('lookup', 'key', permission='pull')
1013 @wireprotocommand('lookup', 'key', permission='pull')
990 def lookup(repo, proto, key):
1014 def lookup(repo, proto, key):
991 try:
1015 try:
992 k = encoding.tolocal(key)
1016 k = encoding.tolocal(key)
993 c = repo[k]
1017 c = repo[k]
994 r = c.hex()
1018 r = c.hex()
995 success = 1
1019 success = 1
996 except Exception as inst:
1020 except Exception as inst:
997 r = stringutil.forcebytestr(inst)
1021 r = stringutil.forcebytestr(inst)
998 success = 0
1022 success = 0
999 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
1023 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
1000
1024
1001 @wireprotocommand('known', 'nodes *', permission='pull')
1025 @wireprotocommand('known', 'nodes *', permission='pull')
1002 def known(repo, proto, nodes, others):
1026 def known(repo, proto, nodes, others):
1003 v = ''.join(b and '1' or '0' for b in repo.known(decodelist(nodes)))
1027 v = ''.join(b and '1' or '0' for b in repo.known(decodelist(nodes)))
1004 return wireprototypes.bytesresponse(v)
1028 return wireprototypes.bytesresponse(v)
1005
1029
1006 @wireprotocommand('pushkey', 'namespace key old new', permission='push')
1030 @wireprotocommand('pushkey', 'namespace key old new', permission='push')
1007 def pushkey(repo, proto, namespace, key, old, new):
1031 def pushkey(repo, proto, namespace, key, old, new):
1008 # compatibility with pre-1.8 clients which were accidentally
1032 # compatibility with pre-1.8 clients which were accidentally
1009 # sending raw binary nodes rather than utf-8-encoded hex
1033 # sending raw binary nodes rather than utf-8-encoded hex
1010 if len(new) == 20 and stringutil.escapestr(new) != new:
1034 if len(new) == 20 and stringutil.escapestr(new) != new:
1011 # looks like it could be a binary node
1035 # looks like it could be a binary node
1012 try:
1036 try:
1013 new.decode('utf-8')
1037 new.decode('utf-8')
1014 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
1038 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
1015 except UnicodeDecodeError:
1039 except UnicodeDecodeError:
1016 pass # binary, leave unmodified
1040 pass # binary, leave unmodified
1017 else:
1041 else:
1018 new = encoding.tolocal(new) # normal path
1042 new = encoding.tolocal(new) # normal path
1019
1043
1020 with proto.mayberedirectstdio() as output:
1044 with proto.mayberedirectstdio() as output:
1021 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
1045 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
1022 encoding.tolocal(old), new) or False
1046 encoding.tolocal(old), new) or False
1023
1047
1024 output = output.getvalue() if output else ''
1048 output = output.getvalue() if output else ''
1025 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
1049 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
1026
1050
1027 @wireprotocommand('stream_out', permission='pull')
1051 @wireprotocommand('stream_out', permission='pull')
1028 def stream(repo, proto):
1052 def stream(repo, proto):
1029 '''If the server supports streaming clone, it advertises the "stream"
1053 '''If the server supports streaming clone, it advertises the "stream"
1030 capability with a value representing the version and flags of the repo
1054 capability with a value representing the version and flags of the repo
1031 it is serving. Client checks to see if it understands the format.
1055 it is serving. Client checks to see if it understands the format.
1032 '''
1056 '''
1033 return wireprototypes.streamreslegacy(
1057 return wireprototypes.streamreslegacy(
1034 streamclone.generatev1wireproto(repo))
1058 streamclone.generatev1wireproto(repo))
1035
1059
1036 @wireprotocommand('unbundle', 'heads', permission='push')
1060 @wireprotocommand('unbundle', 'heads', permission='push')
1037 def unbundle(repo, proto, heads):
1061 def unbundle(repo, proto, heads):
1038 their_heads = decodelist(heads)
1062 their_heads = decodelist(heads)
1039
1063
1040 with proto.mayberedirectstdio() as output:
1064 with proto.mayberedirectstdio() as output:
1041 try:
1065 try:
1042 exchange.check_heads(repo, their_heads, 'preparing changes')
1066 exchange.check_heads(repo, their_heads, 'preparing changes')
1043
1067
1044 # write bundle data to temporary file because it can be big
1068 # write bundle data to temporary file because it can be big
1045 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1069 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1046 fp = os.fdopen(fd, r'wb+')
1070 fp = os.fdopen(fd, r'wb+')
1047 r = 0
1071 r = 0
1048 try:
1072 try:
1049 proto.forwardpayload(fp)
1073 proto.forwardpayload(fp)
1050 fp.seek(0)
1074 fp.seek(0)
1051 gen = exchange.readbundle(repo.ui, fp, None)
1075 gen = exchange.readbundle(repo.ui, fp, None)
1052 if (isinstance(gen, changegroupmod.cg1unpacker)
1076 if (isinstance(gen, changegroupmod.cg1unpacker)
1053 and not bundle1allowed(repo, 'push')):
1077 and not bundle1allowed(repo, 'push')):
1054 if proto.name == 'http-v1':
1078 if proto.name == 'http-v1':
1055 # need to special case http because stderr do not get to
1079 # need to special case http because stderr do not get to
1056 # the http client on failed push so we need to abuse
1080 # the http client on failed push so we need to abuse
1057 # some other error type to make sure the message get to
1081 # some other error type to make sure the message get to
1058 # the user.
1082 # the user.
1059 return wireprototypes.ooberror(bundle2required)
1083 return wireprototypes.ooberror(bundle2required)
1060 raise error.Abort(bundle2requiredmain,
1084 raise error.Abort(bundle2requiredmain,
1061 hint=bundle2requiredhint)
1085 hint=bundle2requiredhint)
1062
1086
1063 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1087 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1064 proto.client())
1088 proto.client())
1065 if util.safehasattr(r, 'addpart'):
1089 if util.safehasattr(r, 'addpart'):
1066 # The return looks streamable, we are in the bundle2 case
1090 # The return looks streamable, we are in the bundle2 case
1067 # and should return a stream.
1091 # and should return a stream.
1068 return wireprototypes.streamreslegacy(gen=r.getchunks())
1092 return wireprototypes.streamreslegacy(gen=r.getchunks())
1069 return wireprototypes.pushres(
1093 return wireprototypes.pushres(
1070 r, output.getvalue() if output else '')
1094 r, output.getvalue() if output else '')
1071
1095
1072 finally:
1096 finally:
1073 fp.close()
1097 fp.close()
1074 os.unlink(tempname)
1098 os.unlink(tempname)
1075
1099
1076 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1100 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1077 # handle non-bundle2 case first
1101 # handle non-bundle2 case first
1078 if not getattr(exc, 'duringunbundle2', False):
1102 if not getattr(exc, 'duringunbundle2', False):
1079 try:
1103 try:
1080 raise
1104 raise
1081 except error.Abort:
1105 except error.Abort:
1082 # The old code we moved used procutil.stderr directly.
1106 # The old code we moved used procutil.stderr directly.
1083 # We did not change it to minimise code change.
1107 # We did not change it to minimise code change.
1084 # This need to be moved to something proper.
1108 # This need to be moved to something proper.
1085 # Feel free to do it.
1109 # Feel free to do it.
1086 procutil.stderr.write("abort: %s\n" % exc)
1110 procutil.stderr.write("abort: %s\n" % exc)
1087 if exc.hint is not None:
1111 if exc.hint is not None:
1088 procutil.stderr.write("(%s)\n" % exc.hint)
1112 procutil.stderr.write("(%s)\n" % exc.hint)
1089 procutil.stderr.flush()
1113 procutil.stderr.flush()
1090 return wireprototypes.pushres(
1114 return wireprototypes.pushres(
1091 0, output.getvalue() if output else '')
1115 0, output.getvalue() if output else '')
1092 except error.PushRaced:
1116 except error.PushRaced:
1093 return wireprototypes.pusherr(
1117 return wireprototypes.pusherr(
1094 pycompat.bytestr(exc),
1118 pycompat.bytestr(exc),
1095 output.getvalue() if output else '')
1119 output.getvalue() if output else '')
1096
1120
1097 bundler = bundle2.bundle20(repo.ui)
1121 bundler = bundle2.bundle20(repo.ui)
1098 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1122 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1099 bundler.addpart(out)
1123 bundler.addpart(out)
1100 try:
1124 try:
1101 try:
1125 try:
1102 raise
1126 raise
1103 except error.PushkeyFailed as exc:
1127 except error.PushkeyFailed as exc:
1104 # check client caps
1128 # check client caps
1105 remotecaps = getattr(exc, '_replycaps', None)
1129 remotecaps = getattr(exc, '_replycaps', None)
1106 if (remotecaps is not None
1130 if (remotecaps is not None
1107 and 'pushkey' not in remotecaps.get('error', ())):
1131 and 'pushkey' not in remotecaps.get('error', ())):
1108 # no support remote side, fallback to Abort handler.
1132 # no support remote side, fallback to Abort handler.
1109 raise
1133 raise
1110 part = bundler.newpart('error:pushkey')
1134 part = bundler.newpart('error:pushkey')
1111 part.addparam('in-reply-to', exc.partid)
1135 part.addparam('in-reply-to', exc.partid)
1112 if exc.namespace is not None:
1136 if exc.namespace is not None:
1113 part.addparam('namespace', exc.namespace,
1137 part.addparam('namespace', exc.namespace,
1114 mandatory=False)
1138 mandatory=False)
1115 if exc.key is not None:
1139 if exc.key is not None:
1116 part.addparam('key', exc.key, mandatory=False)
1140 part.addparam('key', exc.key, mandatory=False)
1117 if exc.new is not None:
1141 if exc.new is not None:
1118 part.addparam('new', exc.new, mandatory=False)
1142 part.addparam('new', exc.new, mandatory=False)
1119 if exc.old is not None:
1143 if exc.old is not None:
1120 part.addparam('old', exc.old, mandatory=False)
1144 part.addparam('old', exc.old, mandatory=False)
1121 if exc.ret is not None:
1145 if exc.ret is not None:
1122 part.addparam('ret', exc.ret, mandatory=False)
1146 part.addparam('ret', exc.ret, mandatory=False)
1123 except error.BundleValueError as exc:
1147 except error.BundleValueError as exc:
1124 errpart = bundler.newpart('error:unsupportedcontent')
1148 errpart = bundler.newpart('error:unsupportedcontent')
1125 if exc.parttype is not None:
1149 if exc.parttype is not None:
1126 errpart.addparam('parttype', exc.parttype)
1150 errpart.addparam('parttype', exc.parttype)
1127 if exc.params:
1151 if exc.params:
1128 errpart.addparam('params', '\0'.join(exc.params))
1152 errpart.addparam('params', '\0'.join(exc.params))
1129 except error.Abort as exc:
1153 except error.Abort as exc:
1130 manargs = [('message', stringutil.forcebytestr(exc))]
1154 manargs = [('message', stringutil.forcebytestr(exc))]
1131 advargs = []
1155 advargs = []
1132 if exc.hint is not None:
1156 if exc.hint is not None:
1133 advargs.append(('hint', exc.hint))
1157 advargs.append(('hint', exc.hint))
1134 bundler.addpart(bundle2.bundlepart('error:abort',
1158 bundler.addpart(bundle2.bundlepart('error:abort',
1135 manargs, advargs))
1159 manargs, advargs))
1136 except error.PushRaced as exc:
1160 except error.PushRaced as exc:
1137 bundler.newpart('error:pushraced',
1161 bundler.newpart('error:pushraced',
1138 [('message', stringutil.forcebytestr(exc))])
1162 [('message', stringutil.forcebytestr(exc))])
1139 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
1163 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
@@ -1,1050 +1,1050 b''
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 #
3 #
4 # This software may be used and distributed according to the terms of the
4 # This software may be used and distributed according to the terms of the
5 # GNU General Public License version 2 or any later version.
5 # GNU General Public License version 2 or any later version.
6
6
7 from __future__ import absolute_import
7 from __future__ import absolute_import
8
8
9 import contextlib
9 import contextlib
10 import struct
10 import struct
11 import sys
11 import sys
12 import threading
12 import threading
13
13
14 from .i18n import _
14 from .i18n import _
15 from . import (
15 from . import (
16 encoding,
16 encoding,
17 error,
17 error,
18 hook,
18 hook,
19 pycompat,
19 pycompat,
20 util,
20 util,
21 wireproto,
21 wireproto,
22 wireprotoframing,
22 wireprotoframing,
23 wireprototypes,
23 wireprototypes,
24 )
24 )
25 from .utils import (
25 from .utils import (
26 procutil,
26 procutil,
27 )
27 )
28
28
29 stringio = util.stringio
29 stringio = util.stringio
30
30
31 urlerr = util.urlerr
31 urlerr = util.urlerr
32 urlreq = util.urlreq
32 urlreq = util.urlreq
33
33
34 HTTP_OK = 200
34 HTTP_OK = 200
35
35
36 HGTYPE = 'application/mercurial-0.1'
36 HGTYPE = 'application/mercurial-0.1'
37 HGTYPE2 = 'application/mercurial-0.2'
37 HGTYPE2 = 'application/mercurial-0.2'
38 HGERRTYPE = 'application/hg-error'
38 HGERRTYPE = 'application/hg-error'
39 FRAMINGTYPE = b'application/mercurial-exp-framing-0003'
39 FRAMINGTYPE = b'application/mercurial-exp-framing-0003'
40
40
41 HTTPV2 = wireprototypes.HTTPV2
41 HTTPV2 = wireprototypes.HTTPV2
42 SSHV1 = wireprototypes.SSHV1
42 SSHV1 = wireprototypes.SSHV1
43 SSHV2 = wireprototypes.SSHV2
43 SSHV2 = wireprototypes.SSHV2
44
44
45 def decodevaluefromheaders(req, headerprefix):
45 def decodevaluefromheaders(req, headerprefix):
46 """Decode a long value from multiple HTTP request headers.
46 """Decode a long value from multiple HTTP request headers.
47
47
48 Returns the value as a bytes, not a str.
48 Returns the value as a bytes, not a str.
49 """
49 """
50 chunks = []
50 chunks = []
51 i = 1
51 i = 1
52 while True:
52 while True:
53 v = req.headers.get(b'%s-%d' % (headerprefix, i))
53 v = req.headers.get(b'%s-%d' % (headerprefix, i))
54 if v is None:
54 if v is None:
55 break
55 break
56 chunks.append(pycompat.bytesurl(v))
56 chunks.append(pycompat.bytesurl(v))
57 i += 1
57 i += 1
58
58
59 return ''.join(chunks)
59 return ''.join(chunks)
60
60
61 class httpv1protocolhandler(wireprototypes.baseprotocolhandler):
61 class httpv1protocolhandler(wireprototypes.baseprotocolhandler):
62 def __init__(self, req, ui, checkperm):
62 def __init__(self, req, ui, checkperm):
63 self._req = req
63 self._req = req
64 self._ui = ui
64 self._ui = ui
65 self._checkperm = checkperm
65 self._checkperm = checkperm
66
66
67 @property
67 @property
68 def name(self):
68 def name(self):
69 return 'http-v1'
69 return 'http-v1'
70
70
71 def getargs(self, args):
71 def getargs(self, args):
72 knownargs = self._args()
72 knownargs = self._args()
73 data = {}
73 data = {}
74 keys = args.split()
74 keys = args.split()
75 for k in keys:
75 for k in keys:
76 if k == '*':
76 if k == '*':
77 star = {}
77 star = {}
78 for key in knownargs.keys():
78 for key in knownargs.keys():
79 if key != 'cmd' and key not in keys:
79 if key != 'cmd' and key not in keys:
80 star[key] = knownargs[key][0]
80 star[key] = knownargs[key][0]
81 data['*'] = star
81 data['*'] = star
82 else:
82 else:
83 data[k] = knownargs[k][0]
83 data[k] = knownargs[k][0]
84 return [data[k] for k in keys]
84 return [data[k] for k in keys]
85
85
86 def _args(self):
86 def _args(self):
87 args = self._req.qsparams.asdictoflists()
87 args = self._req.qsparams.asdictoflists()
88 postlen = int(self._req.headers.get(b'X-HgArgs-Post', 0))
88 postlen = int(self._req.headers.get(b'X-HgArgs-Post', 0))
89 if postlen:
89 if postlen:
90 args.update(urlreq.parseqs(
90 args.update(urlreq.parseqs(
91 self._req.bodyfh.read(postlen), keep_blank_values=True))
91 self._req.bodyfh.read(postlen), keep_blank_values=True))
92 return args
92 return args
93
93
94 argvalue = decodevaluefromheaders(self._req, b'X-HgArg')
94 argvalue = decodevaluefromheaders(self._req, b'X-HgArg')
95 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
95 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
96 return args
96 return args
97
97
98 def forwardpayload(self, fp):
98 def forwardpayload(self, fp):
99 # Existing clients *always* send Content-Length.
99 # Existing clients *always* send Content-Length.
100 length = int(self._req.headers[b'Content-Length'])
100 length = int(self._req.headers[b'Content-Length'])
101
101
102 # If httppostargs is used, we need to read Content-Length
102 # If httppostargs is used, we need to read Content-Length
103 # minus the amount that was consumed by args.
103 # minus the amount that was consumed by args.
104 length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
104 length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
105 for s in util.filechunkiter(self._req.bodyfh, limit=length):
105 for s in util.filechunkiter(self._req.bodyfh, limit=length):
106 fp.write(s)
106 fp.write(s)
107
107
108 @contextlib.contextmanager
108 @contextlib.contextmanager
109 def mayberedirectstdio(self):
109 def mayberedirectstdio(self):
110 oldout = self._ui.fout
110 oldout = self._ui.fout
111 olderr = self._ui.ferr
111 olderr = self._ui.ferr
112
112
113 out = util.stringio()
113 out = util.stringio()
114
114
115 try:
115 try:
116 self._ui.fout = out
116 self._ui.fout = out
117 self._ui.ferr = out
117 self._ui.ferr = out
118 yield out
118 yield out
119 finally:
119 finally:
120 self._ui.fout = oldout
120 self._ui.fout = oldout
121 self._ui.ferr = olderr
121 self._ui.ferr = olderr
122
122
123 def client(self):
123 def client(self):
124 return 'remote:%s:%s:%s' % (
124 return 'remote:%s:%s:%s' % (
125 self._req.urlscheme,
125 self._req.urlscheme,
126 urlreq.quote(self._req.remotehost or ''),
126 urlreq.quote(self._req.remotehost or ''),
127 urlreq.quote(self._req.remoteuser or ''))
127 urlreq.quote(self._req.remoteuser or ''))
128
128
129 def addcapabilities(self, repo, caps):
129 def addcapabilities(self, repo, caps):
130 caps.append(b'batch')
130 caps.append(b'batch')
131
131
132 caps.append('httpheader=%d' %
132 caps.append('httpheader=%d' %
133 repo.ui.configint('server', 'maxhttpheaderlen'))
133 repo.ui.configint('server', 'maxhttpheaderlen'))
134 if repo.ui.configbool('experimental', 'httppostargs'):
134 if repo.ui.configbool('experimental', 'httppostargs'):
135 caps.append('httppostargs')
135 caps.append('httppostargs')
136
136
137 # FUTURE advertise 0.2rx once support is implemented
137 # FUTURE advertise 0.2rx once support is implemented
138 # FUTURE advertise minrx and mintx after consulting config option
138 # FUTURE advertise minrx and mintx after consulting config option
139 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
139 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
140
140
141 compengines = wireproto.supportedcompengines(repo.ui, util.SERVERROLE)
141 compengines = wireproto.supportedcompengines(repo.ui, util.SERVERROLE)
142 if compengines:
142 if compengines:
143 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
143 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
144 for e in compengines)
144 for e in compengines)
145 caps.append('compression=%s' % comptypes)
145 caps.append('compression=%s' % comptypes)
146
146
147 return caps
147 return caps
148
148
149 def checkperm(self, perm):
149 def checkperm(self, perm):
150 return self._checkperm(perm)
150 return self._checkperm(perm)
151
151
152 # This method exists mostly so that extensions like remotefilelog can
152 # This method exists mostly so that extensions like remotefilelog can
153 # disable a kludgey legacy method only over http. As of early 2018,
153 # disable a kludgey legacy method only over http. As of early 2018,
154 # there are no other known users, so with any luck we can discard this
154 # there are no other known users, so with any luck we can discard this
155 # hook if remotefilelog becomes a first-party extension.
155 # hook if remotefilelog becomes a first-party extension.
156 def iscmd(cmd):
156 def iscmd(cmd):
157 return cmd in wireproto.commands
157 return cmd in wireproto.commands
158
158
159 def handlewsgirequest(rctx, req, res, checkperm):
159 def handlewsgirequest(rctx, req, res, checkperm):
160 """Possibly process a wire protocol request.
160 """Possibly process a wire protocol request.
161
161
162 If the current request is a wire protocol request, the request is
162 If the current request is a wire protocol request, the request is
163 processed by this function.
163 processed by this function.
164
164
165 ``req`` is a ``parsedrequest`` instance.
165 ``req`` is a ``parsedrequest`` instance.
166 ``res`` is a ``wsgiresponse`` instance.
166 ``res`` is a ``wsgiresponse`` instance.
167
167
168 Returns a bool indicating if the request was serviced. If set, the caller
168 Returns a bool indicating if the request was serviced. If set, the caller
169 should stop processing the request, as a response has already been issued.
169 should stop processing the request, as a response has already been issued.
170 """
170 """
171 # Avoid cycle involving hg module.
171 # Avoid cycle involving hg module.
172 from .hgweb import common as hgwebcommon
172 from .hgweb import common as hgwebcommon
173
173
174 repo = rctx.repo
174 repo = rctx.repo
175
175
176 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
176 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
177 # string parameter. If it isn't present, this isn't a wire protocol
177 # string parameter. If it isn't present, this isn't a wire protocol
178 # request.
178 # request.
179 if 'cmd' not in req.qsparams:
179 if 'cmd' not in req.qsparams:
180 return False
180 return False
181
181
182 cmd = req.qsparams['cmd']
182 cmd = req.qsparams['cmd']
183
183
184 # The "cmd" request parameter is used by both the wire protocol and hgweb.
184 # The "cmd" request parameter is used by both the wire protocol and hgweb.
185 # While not all wire protocol commands are available for all transports,
185 # While not all wire protocol commands are available for all transports,
186 # if we see a "cmd" value that resembles a known wire protocol command, we
186 # if we see a "cmd" value that resembles a known wire protocol command, we
187 # route it to a protocol handler. This is better than routing possible
187 # route it to a protocol handler. This is better than routing possible
188 # wire protocol requests to hgweb because it prevents hgweb from using
188 # wire protocol requests to hgweb because it prevents hgweb from using
189 # known wire protocol commands and it is less confusing for machine
189 # known wire protocol commands and it is less confusing for machine
190 # clients.
190 # clients.
191 if not iscmd(cmd):
191 if not iscmd(cmd):
192 return False
192 return False
193
193
194 # The "cmd" query string argument is only valid on the root path of the
194 # The "cmd" query string argument is only valid on the root path of the
195 # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
195 # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
196 # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
196 # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
197 # in this case. We send an HTTP 404 for backwards compatibility reasons.
197 # in this case. We send an HTTP 404 for backwards compatibility reasons.
198 if req.dispatchpath:
198 if req.dispatchpath:
199 res.status = hgwebcommon.statusmessage(404)
199 res.status = hgwebcommon.statusmessage(404)
200 res.headers['Content-Type'] = HGTYPE
200 res.headers['Content-Type'] = HGTYPE
201 # TODO This is not a good response to issue for this request. This
201 # TODO This is not a good response to issue for this request. This
202 # is mostly for BC for now.
202 # is mostly for BC for now.
203 res.setbodybytes('0\n%s\n' % b'Not Found')
203 res.setbodybytes('0\n%s\n' % b'Not Found')
204 return True
204 return True
205
205
206 proto = httpv1protocolhandler(req, repo.ui,
206 proto = httpv1protocolhandler(req, repo.ui,
207 lambda perm: checkperm(rctx, req, perm))
207 lambda perm: checkperm(rctx, req, perm))
208
208
209 # The permissions checker should be the only thing that can raise an
209 # The permissions checker should be the only thing that can raise an
210 # ErrorResponse. It is kind of a layer violation to catch an hgweb
210 # ErrorResponse. It is kind of a layer violation to catch an hgweb
211 # exception here. So consider refactoring into a exception type that
211 # exception here. So consider refactoring into a exception type that
212 # is associated with the wire protocol.
212 # is associated with the wire protocol.
213 try:
213 try:
214 _callhttp(repo, req, res, proto, cmd)
214 _callhttp(repo, req, res, proto, cmd)
215 except hgwebcommon.ErrorResponse as e:
215 except hgwebcommon.ErrorResponse as e:
216 for k, v in e.headers:
216 for k, v in e.headers:
217 res.headers[k] = v
217 res.headers[k] = v
218 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
218 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
219 # TODO This response body assumes the failed command was
219 # TODO This response body assumes the failed command was
220 # "unbundle." That assumption is not always valid.
220 # "unbundle." That assumption is not always valid.
221 res.setbodybytes('0\n%s\n' % pycompat.bytestr(e))
221 res.setbodybytes('0\n%s\n' % pycompat.bytestr(e))
222
222
223 return True
223 return True
224
224
225 def handlewsgiapirequest(rctx, req, res, checkperm):
225 def handlewsgiapirequest(rctx, req, res, checkperm):
226 """Handle requests to /api/*."""
226 """Handle requests to /api/*."""
227 assert req.dispatchparts[0] == b'api'
227 assert req.dispatchparts[0] == b'api'
228
228
229 repo = rctx.repo
229 repo = rctx.repo
230
230
231 # This whole URL space is experimental for now. But we want to
231 # This whole URL space is experimental for now. But we want to
232 # reserve the URL space. So, 404 all URLs if the feature isn't enabled.
232 # reserve the URL space. So, 404 all URLs if the feature isn't enabled.
233 if not repo.ui.configbool('experimental', 'web.apiserver'):
233 if not repo.ui.configbool('experimental', 'web.apiserver'):
234 res.status = b'404 Not Found'
234 res.status = b'404 Not Found'
235 res.headers[b'Content-Type'] = b'text/plain'
235 res.headers[b'Content-Type'] = b'text/plain'
236 res.setbodybytes(_('Experimental API server endpoint not enabled'))
236 res.setbodybytes(_('Experimental API server endpoint not enabled'))
237 return
237 return
238
238
239 # The URL space is /api/<protocol>/*. The structure of URLs under varies
239 # The URL space is /api/<protocol>/*. The structure of URLs under varies
240 # by <protocol>.
240 # by <protocol>.
241
241
242 # Registered APIs are made available via config options of the name of
242 # Registered APIs are made available via config options of the name of
243 # the protocol.
243 # the protocol.
244 availableapis = set()
244 availableapis = set()
245 for k, v in API_HANDLERS.items():
245 for k, v in API_HANDLERS.items():
246 section, option = v['config']
246 section, option = v['config']
247 if repo.ui.configbool(section, option):
247 if repo.ui.configbool(section, option):
248 availableapis.add(k)
248 availableapis.add(k)
249
249
250 # Requests to /api/ list available APIs.
250 # Requests to /api/ list available APIs.
251 if req.dispatchparts == [b'api']:
251 if req.dispatchparts == [b'api']:
252 res.status = b'200 OK'
252 res.status = b'200 OK'
253 res.headers[b'Content-Type'] = b'text/plain'
253 res.headers[b'Content-Type'] = b'text/plain'
254 lines = [_('APIs can be accessed at /api/<name>, where <name> can be '
254 lines = [_('APIs can be accessed at /api/<name>, where <name> can be '
255 'one of the following:\n')]
255 'one of the following:\n')]
256 if availableapis:
256 if availableapis:
257 lines.extend(sorted(availableapis))
257 lines.extend(sorted(availableapis))
258 else:
258 else:
259 lines.append(_('(no available APIs)\n'))
259 lines.append(_('(no available APIs)\n'))
260 res.setbodybytes(b'\n'.join(lines))
260 res.setbodybytes(b'\n'.join(lines))
261 return
261 return
262
262
263 proto = req.dispatchparts[1]
263 proto = req.dispatchparts[1]
264
264
265 if proto not in API_HANDLERS:
265 if proto not in API_HANDLERS:
266 res.status = b'404 Not Found'
266 res.status = b'404 Not Found'
267 res.headers[b'Content-Type'] = b'text/plain'
267 res.headers[b'Content-Type'] = b'text/plain'
268 res.setbodybytes(_('Unknown API: %s\nKnown APIs: %s') % (
268 res.setbodybytes(_('Unknown API: %s\nKnown APIs: %s') % (
269 proto, b', '.join(sorted(availableapis))))
269 proto, b', '.join(sorted(availableapis))))
270 return
270 return
271
271
272 if proto not in availableapis:
272 if proto not in availableapis:
273 res.status = b'404 Not Found'
273 res.status = b'404 Not Found'
274 res.headers[b'Content-Type'] = b'text/plain'
274 res.headers[b'Content-Type'] = b'text/plain'
275 res.setbodybytes(_('API %s not enabled\n') % proto)
275 res.setbodybytes(_('API %s not enabled\n') % proto)
276 return
276 return
277
277
278 API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
278 API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
279 req.dispatchparts[2:])
279 req.dispatchparts[2:])
280
280
281 def _handlehttpv2request(rctx, req, res, checkperm, urlparts):
281 def _handlehttpv2request(rctx, req, res, checkperm, urlparts):
282 from .hgweb import common as hgwebcommon
282 from .hgweb import common as hgwebcommon
283
283
284 # URL space looks like: <permissions>/<command>, where <permission> can
284 # URL space looks like: <permissions>/<command>, where <permission> can
285 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
285 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
286
286
287 # Root URL does nothing meaningful... yet.
287 # Root URL does nothing meaningful... yet.
288 if not urlparts:
288 if not urlparts:
289 res.status = b'200 OK'
289 res.status = b'200 OK'
290 res.headers[b'Content-Type'] = b'text/plain'
290 res.headers[b'Content-Type'] = b'text/plain'
291 res.setbodybytes(_('HTTP version 2 API handler'))
291 res.setbodybytes(_('HTTP version 2 API handler'))
292 return
292 return
293
293
294 if len(urlparts) == 1:
294 if len(urlparts) == 1:
295 res.status = b'404 Not Found'
295 res.status = b'404 Not Found'
296 res.headers[b'Content-Type'] = b'text/plain'
296 res.headers[b'Content-Type'] = b'text/plain'
297 res.setbodybytes(_('do not know how to process %s\n') %
297 res.setbodybytes(_('do not know how to process %s\n') %
298 req.dispatchpath)
298 req.dispatchpath)
299 return
299 return
300
300
301 permission, command = urlparts[0:2]
301 permission, command = urlparts[0:2]
302
302
303 if permission not in (b'ro', b'rw'):
303 if permission not in (b'ro', b'rw'):
304 res.status = b'404 Not Found'
304 res.status = b'404 Not Found'
305 res.headers[b'Content-Type'] = b'text/plain'
305 res.headers[b'Content-Type'] = b'text/plain'
306 res.setbodybytes(_('unknown permission: %s') % permission)
306 res.setbodybytes(_('unknown permission: %s') % permission)
307 return
307 return
308
308
309 if req.method != 'POST':
309 if req.method != 'POST':
310 res.status = b'405 Method Not Allowed'
310 res.status = b'405 Method Not Allowed'
311 res.headers[b'Allow'] = b'POST'
311 res.headers[b'Allow'] = b'POST'
312 res.setbodybytes(_('commands require POST requests'))
312 res.setbodybytes(_('commands require POST requests'))
313 return
313 return
314
314
315 # At some point we'll want to use our own API instead of recycling the
315 # At some point we'll want to use our own API instead of recycling the
316 # behavior of version 1 of the wire protocol...
316 # behavior of version 1 of the wire protocol...
317 # TODO return reasonable responses - not responses that overload the
317 # TODO return reasonable responses - not responses that overload the
318 # HTTP status line message for error reporting.
318 # HTTP status line message for error reporting.
319 try:
319 try:
320 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
320 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
321 except hgwebcommon.ErrorResponse as e:
321 except hgwebcommon.ErrorResponse as e:
322 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
322 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
323 for k, v in e.headers:
323 for k, v in e.headers:
324 res.headers[k] = v
324 res.headers[k] = v
325 res.setbodybytes('permission denied')
325 res.setbodybytes('permission denied')
326 return
326 return
327
327
328 # We have a special endpoint to reflect the request back at the client.
328 # We have a special endpoint to reflect the request back at the client.
329 if command == b'debugreflect':
329 if command == b'debugreflect':
330 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
330 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
331 return
331 return
332
332
333 # Extra commands that we handle that aren't really wire protocol
333 # Extra commands that we handle that aren't really wire protocol
334 # commands. Think extra hard before making this hackery available to
334 # commands. Think extra hard before making this hackery available to
335 # extension.
335 # extension.
336 extracommands = {'multirequest'}
336 extracommands = {'multirequest'}
337
337
338 if command not in wireproto.commands and command not in extracommands:
338 if command not in wireproto.commandsv2 and command not in extracommands:
339 res.status = b'404 Not Found'
339 res.status = b'404 Not Found'
340 res.headers[b'Content-Type'] = b'text/plain'
340 res.headers[b'Content-Type'] = b'text/plain'
341 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
341 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
342 return
342 return
343
343
344 repo = rctx.repo
344 repo = rctx.repo
345 ui = repo.ui
345 ui = repo.ui
346
346
347 proto = httpv2protocolhandler(req, ui)
347 proto = httpv2protocolhandler(req, ui)
348
348
349 if (not wireproto.commands.commandavailable(command, proto)
349 if (not wireproto.commandsv2.commandavailable(command, proto)
350 and command not in extracommands):
350 and command not in extracommands):
351 res.status = b'404 Not Found'
351 res.status = b'404 Not Found'
352 res.headers[b'Content-Type'] = b'text/plain'
352 res.headers[b'Content-Type'] = b'text/plain'
353 res.setbodybytes(_('invalid wire protocol command: %s') % command)
353 res.setbodybytes(_('invalid wire protocol command: %s') % command)
354 return
354 return
355
355
356 # TODO consider cases where proxies may add additional Accept headers.
356 # TODO consider cases where proxies may add additional Accept headers.
357 if req.headers.get(b'Accept') != FRAMINGTYPE:
357 if req.headers.get(b'Accept') != FRAMINGTYPE:
358 res.status = b'406 Not Acceptable'
358 res.status = b'406 Not Acceptable'
359 res.headers[b'Content-Type'] = b'text/plain'
359 res.headers[b'Content-Type'] = b'text/plain'
360 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
360 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
361 % FRAMINGTYPE)
361 % FRAMINGTYPE)
362 return
362 return
363
363
364 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
364 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
365 res.status = b'415 Unsupported Media Type'
365 res.status = b'415 Unsupported Media Type'
366 # TODO we should send a response with appropriate media type,
366 # TODO we should send a response with appropriate media type,
367 # since client does Accept it.
367 # since client does Accept it.
368 res.headers[b'Content-Type'] = b'text/plain'
368 res.headers[b'Content-Type'] = b'text/plain'
369 res.setbodybytes(_('client MUST send Content-Type header with '
369 res.setbodybytes(_('client MUST send Content-Type header with '
370 'value: %s\n') % FRAMINGTYPE)
370 'value: %s\n') % FRAMINGTYPE)
371 return
371 return
372
372
373 _processhttpv2request(ui, repo, req, res, permission, command, proto)
373 _processhttpv2request(ui, repo, req, res, permission, command, proto)
374
374
375 def _processhttpv2reflectrequest(ui, repo, req, res):
375 def _processhttpv2reflectrequest(ui, repo, req, res):
376 """Reads unified frame protocol request and dumps out state to client.
376 """Reads unified frame protocol request and dumps out state to client.
377
377
378 This special endpoint can be used to help debug the wire protocol.
378 This special endpoint can be used to help debug the wire protocol.
379
379
380 Instead of routing the request through the normal dispatch mechanism,
380 Instead of routing the request through the normal dispatch mechanism,
381 we instead read all frames, decode them, and feed them into our state
381 we instead read all frames, decode them, and feed them into our state
382 tracker. We then dump the log of all that activity back out to the
382 tracker. We then dump the log of all that activity back out to the
383 client.
383 client.
384 """
384 """
385 import json
385 import json
386
386
387 # Reflection APIs have a history of being abused, accidentally disclosing
387 # Reflection APIs have a history of being abused, accidentally disclosing
388 # sensitive data, etc. So we have a config knob.
388 # sensitive data, etc. So we have a config knob.
389 if not ui.configbool('experimental', 'web.api.debugreflect'):
389 if not ui.configbool('experimental', 'web.api.debugreflect'):
390 res.status = b'404 Not Found'
390 res.status = b'404 Not Found'
391 res.headers[b'Content-Type'] = b'text/plain'
391 res.headers[b'Content-Type'] = b'text/plain'
392 res.setbodybytes(_('debugreflect service not available'))
392 res.setbodybytes(_('debugreflect service not available'))
393 return
393 return
394
394
395 # We assume we have a unified framing protocol request body.
395 # We assume we have a unified framing protocol request body.
396
396
397 reactor = wireprotoframing.serverreactor()
397 reactor = wireprotoframing.serverreactor()
398 states = []
398 states = []
399
399
400 while True:
400 while True:
401 frame = wireprotoframing.readframe(req.bodyfh)
401 frame = wireprotoframing.readframe(req.bodyfh)
402
402
403 if not frame:
403 if not frame:
404 states.append(b'received: <no frame>')
404 states.append(b'received: <no frame>')
405 break
405 break
406
406
407 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
407 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
408 frame.requestid,
408 frame.requestid,
409 frame.payload))
409 frame.payload))
410
410
411 action, meta = reactor.onframerecv(frame)
411 action, meta = reactor.onframerecv(frame)
412 states.append(json.dumps((action, meta), sort_keys=True,
412 states.append(json.dumps((action, meta), sort_keys=True,
413 separators=(', ', ': ')))
413 separators=(', ', ': ')))
414
414
415 action, meta = reactor.oninputeof()
415 action, meta = reactor.oninputeof()
416 meta['action'] = action
416 meta['action'] = action
417 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
417 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
418
418
419 res.status = b'200 OK'
419 res.status = b'200 OK'
420 res.headers[b'Content-Type'] = b'text/plain'
420 res.headers[b'Content-Type'] = b'text/plain'
421 res.setbodybytes(b'\n'.join(states))
421 res.setbodybytes(b'\n'.join(states))
422
422
423 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
423 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
424 """Post-validation handler for HTTPv2 requests.
424 """Post-validation handler for HTTPv2 requests.
425
425
426 Called when the HTTP request contains unified frame-based protocol
426 Called when the HTTP request contains unified frame-based protocol
427 frames for evaluation.
427 frames for evaluation.
428 """
428 """
429 # TODO Some HTTP clients are full duplex and can receive data before
429 # TODO Some HTTP clients are full duplex and can receive data before
430 # the entire request is transmitted. Figure out a way to indicate support
430 # the entire request is transmitted. Figure out a way to indicate support
431 # for that so we can opt into full duplex mode.
431 # for that so we can opt into full duplex mode.
432 reactor = wireprotoframing.serverreactor(deferoutput=True)
432 reactor = wireprotoframing.serverreactor(deferoutput=True)
433 seencommand = False
433 seencommand = False
434
434
435 outstream = reactor.makeoutputstream()
435 outstream = reactor.makeoutputstream()
436
436
437 while True:
437 while True:
438 frame = wireprotoframing.readframe(req.bodyfh)
438 frame = wireprotoframing.readframe(req.bodyfh)
439 if not frame:
439 if not frame:
440 break
440 break
441
441
442 action, meta = reactor.onframerecv(frame)
442 action, meta = reactor.onframerecv(frame)
443
443
444 if action == 'wantframe':
444 if action == 'wantframe':
445 # Need more data before we can do anything.
445 # Need more data before we can do anything.
446 continue
446 continue
447 elif action == 'runcommand':
447 elif action == 'runcommand':
448 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
448 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
449 reqcommand, reactor, outstream,
449 reqcommand, reactor, outstream,
450 meta, issubsequent=seencommand)
450 meta, issubsequent=seencommand)
451
451
452 if sentoutput:
452 if sentoutput:
453 return
453 return
454
454
455 seencommand = True
455 seencommand = True
456
456
457 elif action == 'error':
457 elif action == 'error':
458 # TODO define proper error mechanism.
458 # TODO define proper error mechanism.
459 res.status = b'200 OK'
459 res.status = b'200 OK'
460 res.headers[b'Content-Type'] = b'text/plain'
460 res.headers[b'Content-Type'] = b'text/plain'
461 res.setbodybytes(meta['message'] + b'\n')
461 res.setbodybytes(meta['message'] + b'\n')
462 return
462 return
463 else:
463 else:
464 raise error.ProgrammingError(
464 raise error.ProgrammingError(
465 'unhandled action from frame processor: %s' % action)
465 'unhandled action from frame processor: %s' % action)
466
466
467 action, meta = reactor.oninputeof()
467 action, meta = reactor.oninputeof()
468 if action == 'sendframes':
468 if action == 'sendframes':
469 # We assume we haven't started sending the response yet. If we're
469 # We assume we haven't started sending the response yet. If we're
470 # wrong, the response type will raise an exception.
470 # wrong, the response type will raise an exception.
471 res.status = b'200 OK'
471 res.status = b'200 OK'
472 res.headers[b'Content-Type'] = FRAMINGTYPE
472 res.headers[b'Content-Type'] = FRAMINGTYPE
473 res.setbodygen(meta['framegen'])
473 res.setbodygen(meta['framegen'])
474 elif action == 'noop':
474 elif action == 'noop':
475 pass
475 pass
476 else:
476 else:
477 raise error.ProgrammingError('unhandled action from frame processor: %s'
477 raise error.ProgrammingError('unhandled action from frame processor: %s'
478 % action)
478 % action)
479
479
480 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
480 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
481 outstream, command, issubsequent):
481 outstream, command, issubsequent):
482 """Dispatch a wire protocol command made from HTTPv2 requests.
482 """Dispatch a wire protocol command made from HTTPv2 requests.
483
483
484 The authenticated permission (``authedperm``) along with the original
484 The authenticated permission (``authedperm``) along with the original
485 command from the URL (``reqcommand``) are passed in.
485 command from the URL (``reqcommand``) are passed in.
486 """
486 """
487 # We already validated that the session has permissions to perform the
487 # We already validated that the session has permissions to perform the
488 # actions in ``authedperm``. In the unified frame protocol, the canonical
488 # actions in ``authedperm``. In the unified frame protocol, the canonical
489 # command to run is expressed in a frame. However, the URL also requested
489 # command to run is expressed in a frame. However, the URL also requested
490 # to run a specific command. We need to be careful that the command we
490 # to run a specific command. We need to be careful that the command we
491 # run doesn't have permissions requirements greater than what was granted
491 # run doesn't have permissions requirements greater than what was granted
492 # by ``authedperm``.
492 # by ``authedperm``.
493 #
493 #
494 # Our rule for this is we only allow one command per HTTP request and
494 # Our rule for this is we only allow one command per HTTP request and
495 # that command must match the command in the URL. However, we make
495 # that command must match the command in the URL. However, we make
496 # an exception for the ``multirequest`` URL. This URL is allowed to
496 # an exception for the ``multirequest`` URL. This URL is allowed to
497 # execute multiple commands. We double check permissions of each command
497 # execute multiple commands. We double check permissions of each command
498 # as it is invoked to ensure there is no privilege escalation.
498 # as it is invoked to ensure there is no privilege escalation.
499 # TODO consider allowing multiple commands to regular command URLs
499 # TODO consider allowing multiple commands to regular command URLs
500 # iff each command is the same.
500 # iff each command is the same.
501
501
502 proto = httpv2protocolhandler(req, ui, args=command['args'])
502 proto = httpv2protocolhandler(req, ui, args=command['args'])
503
503
504 if reqcommand == b'multirequest':
504 if reqcommand == b'multirequest':
505 if not wireproto.commands.commandavailable(command['command'], proto):
505 if not wireproto.commandsv2.commandavailable(command['command'], proto):
506 # TODO proper error mechanism
506 # TODO proper error mechanism
507 res.status = b'200 OK'
507 res.status = b'200 OK'
508 res.headers[b'Content-Type'] = b'text/plain'
508 res.headers[b'Content-Type'] = b'text/plain'
509 res.setbodybytes(_('wire protocol command not available: %s') %
509 res.setbodybytes(_('wire protocol command not available: %s') %
510 command['command'])
510 command['command'])
511 return True
511 return True
512
512
513 # TODO don't use assert here, since it may be elided by -O.
513 # TODO don't use assert here, since it may be elided by -O.
514 assert authedperm in (b'ro', b'rw')
514 assert authedperm in (b'ro', b'rw')
515 wirecommand = wireproto.commands[command['command']]
515 wirecommand = wireproto.commandsv2[command['command']]
516 assert wirecommand.permission in ('push', 'pull')
516 assert wirecommand.permission in ('push', 'pull')
517
517
518 if authedperm == b'ro' and wirecommand.permission != 'pull':
518 if authedperm == b'ro' and wirecommand.permission != 'pull':
519 # TODO proper error mechanism
519 # TODO proper error mechanism
520 res.status = b'403 Forbidden'
520 res.status = b'403 Forbidden'
521 res.headers[b'Content-Type'] = b'text/plain'
521 res.headers[b'Content-Type'] = b'text/plain'
522 res.setbodybytes(_('insufficient permissions to execute '
522 res.setbodybytes(_('insufficient permissions to execute '
523 'command: %s') % command['command'])
523 'command: %s') % command['command'])
524 return True
524 return True
525
525
526 # TODO should we also call checkperm() here? Maybe not if we're going
526 # TODO should we also call checkperm() here? Maybe not if we're going
527 # to overhaul that API. The granted scope from the URL check should
527 # to overhaul that API. The granted scope from the URL check should
528 # be good enough.
528 # be good enough.
529
529
530 else:
530 else:
531 # Don't allow multiple commands outside of ``multirequest`` URL.
531 # Don't allow multiple commands outside of ``multirequest`` URL.
532 if issubsequent:
532 if issubsequent:
533 # TODO proper error mechanism
533 # TODO proper error mechanism
534 res.status = b'200 OK'
534 res.status = b'200 OK'
535 res.headers[b'Content-Type'] = b'text/plain'
535 res.headers[b'Content-Type'] = b'text/plain'
536 res.setbodybytes(_('multiple commands cannot be issued to this '
536 res.setbodybytes(_('multiple commands cannot be issued to this '
537 'URL'))
537 'URL'))
538 return True
538 return True
539
539
540 if reqcommand != command['command']:
540 if reqcommand != command['command']:
541 # TODO define proper error mechanism
541 # TODO define proper error mechanism
542 res.status = b'200 OK'
542 res.status = b'200 OK'
543 res.headers[b'Content-Type'] = b'text/plain'
543 res.headers[b'Content-Type'] = b'text/plain'
544 res.setbodybytes(_('command in frame must match command in URL'))
544 res.setbodybytes(_('command in frame must match command in URL'))
545 return True
545 return True
546
546
547 rsp = wireproto.dispatch(repo, proto, command['command'])
547 rsp = wireproto.dispatch(repo, proto, command['command'])
548
548
549 res.status = b'200 OK'
549 res.status = b'200 OK'
550 res.headers[b'Content-Type'] = FRAMINGTYPE
550 res.headers[b'Content-Type'] = FRAMINGTYPE
551
551
552 if isinstance(rsp, wireprototypes.bytesresponse):
552 if isinstance(rsp, wireprototypes.bytesresponse):
553 action, meta = reactor.onbytesresponseready(outstream,
553 action, meta = reactor.onbytesresponseready(outstream,
554 command['requestid'],
554 command['requestid'],
555 rsp.data)
555 rsp.data)
556 else:
556 else:
557 action, meta = reactor.onapplicationerror(
557 action, meta = reactor.onapplicationerror(
558 _('unhandled response type from wire proto command'))
558 _('unhandled response type from wire proto command'))
559
559
560 if action == 'sendframes':
560 if action == 'sendframes':
561 res.setbodygen(meta['framegen'])
561 res.setbodygen(meta['framegen'])
562 return True
562 return True
563 elif action == 'noop':
563 elif action == 'noop':
564 return False
564 return False
565 else:
565 else:
566 raise error.ProgrammingError('unhandled event from reactor: %s' %
566 raise error.ProgrammingError('unhandled event from reactor: %s' %
567 action)
567 action)
568
568
569 # Maps API name to metadata so custom API can be registered.
569 # Maps API name to metadata so custom API can be registered.
570 API_HANDLERS = {
570 API_HANDLERS = {
571 HTTPV2: {
571 HTTPV2: {
572 'config': ('experimental', 'web.api.http-v2'),
572 'config': ('experimental', 'web.api.http-v2'),
573 'handler': _handlehttpv2request,
573 'handler': _handlehttpv2request,
574 },
574 },
575 }
575 }
576
576
577 class httpv2protocolhandler(wireprototypes.baseprotocolhandler):
577 class httpv2protocolhandler(wireprototypes.baseprotocolhandler):
578 def __init__(self, req, ui, args=None):
578 def __init__(self, req, ui, args=None):
579 self._req = req
579 self._req = req
580 self._ui = ui
580 self._ui = ui
581 self._args = args
581 self._args = args
582
582
583 @property
583 @property
584 def name(self):
584 def name(self):
585 return HTTPV2
585 return HTTPV2
586
586
587 def getargs(self, args):
587 def getargs(self, args):
588 data = {}
588 data = {}
589 for k in args.split():
589 for k in args.split():
590 if k == '*':
590 if k == '*':
591 raise NotImplementedError('do not support * args')
591 raise NotImplementedError('do not support * args')
592 else:
592 else:
593 data[k] = self._args[k]
593 data[k] = self._args[k]
594
594
595 return [data[k] for k in args.split()]
595 return [data[k] for k in args.split()]
596
596
597 def forwardpayload(self, fp):
597 def forwardpayload(self, fp):
598 raise NotImplementedError
598 raise NotImplementedError
599
599
600 @contextlib.contextmanager
600 @contextlib.contextmanager
601 def mayberedirectstdio(self):
601 def mayberedirectstdio(self):
602 raise NotImplementedError
602 raise NotImplementedError
603
603
604 def client(self):
604 def client(self):
605 raise NotImplementedError
605 raise NotImplementedError
606
606
607 def addcapabilities(self, repo, caps):
607 def addcapabilities(self, repo, caps):
608 return caps
608 return caps
609
609
610 def checkperm(self, perm):
610 def checkperm(self, perm):
611 raise NotImplementedError
611 raise NotImplementedError
612
612
613 def _httpresponsetype(ui, req, prefer_uncompressed):
613 def _httpresponsetype(ui, req, prefer_uncompressed):
614 """Determine the appropriate response type and compression settings.
614 """Determine the appropriate response type and compression settings.
615
615
616 Returns a tuple of (mediatype, compengine, engineopts).
616 Returns a tuple of (mediatype, compengine, engineopts).
617 """
617 """
618 # Determine the response media type and compression engine based
618 # Determine the response media type and compression engine based
619 # on the request parameters.
619 # on the request parameters.
620 protocaps = decodevaluefromheaders(req, 'X-HgProto').split(' ')
620 protocaps = decodevaluefromheaders(req, 'X-HgProto').split(' ')
621
621
622 if '0.2' in protocaps:
622 if '0.2' in protocaps:
623 # All clients are expected to support uncompressed data.
623 # All clients are expected to support uncompressed data.
624 if prefer_uncompressed:
624 if prefer_uncompressed:
625 return HGTYPE2, util._noopengine(), {}
625 return HGTYPE2, util._noopengine(), {}
626
626
627 # Default as defined by wire protocol spec.
627 # Default as defined by wire protocol spec.
628 compformats = ['zlib', 'none']
628 compformats = ['zlib', 'none']
629 for cap in protocaps:
629 for cap in protocaps:
630 if cap.startswith('comp='):
630 if cap.startswith('comp='):
631 compformats = cap[5:].split(',')
631 compformats = cap[5:].split(',')
632 break
632 break
633
633
634 # Now find an agreed upon compression format.
634 # Now find an agreed upon compression format.
635 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
635 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
636 if engine.wireprotosupport().name in compformats:
636 if engine.wireprotosupport().name in compformats:
637 opts = {}
637 opts = {}
638 level = ui.configint('server', '%slevel' % engine.name())
638 level = ui.configint('server', '%slevel' % engine.name())
639 if level is not None:
639 if level is not None:
640 opts['level'] = level
640 opts['level'] = level
641
641
642 return HGTYPE2, engine, opts
642 return HGTYPE2, engine, opts
643
643
644 # No mutually supported compression format. Fall back to the
644 # No mutually supported compression format. Fall back to the
645 # legacy protocol.
645 # legacy protocol.
646
646
647 # Don't allow untrusted settings because disabling compression or
647 # Don't allow untrusted settings because disabling compression or
648 # setting a very high compression level could lead to flooding
648 # setting a very high compression level could lead to flooding
649 # the server's network or CPU.
649 # the server's network or CPU.
650 opts = {'level': ui.configint('server', 'zliblevel')}
650 opts = {'level': ui.configint('server', 'zliblevel')}
651 return HGTYPE, util.compengines['zlib'], opts
651 return HGTYPE, util.compengines['zlib'], opts
652
652
653 def _callhttp(repo, req, res, proto, cmd):
653 def _callhttp(repo, req, res, proto, cmd):
654 # Avoid cycle involving hg module.
654 # Avoid cycle involving hg module.
655 from .hgweb import common as hgwebcommon
655 from .hgweb import common as hgwebcommon
656
656
657 def genversion2(gen, engine, engineopts):
657 def genversion2(gen, engine, engineopts):
658 # application/mercurial-0.2 always sends a payload header
658 # application/mercurial-0.2 always sends a payload header
659 # identifying the compression engine.
659 # identifying the compression engine.
660 name = engine.wireprotosupport().name
660 name = engine.wireprotosupport().name
661 assert 0 < len(name) < 256
661 assert 0 < len(name) < 256
662 yield struct.pack('B', len(name))
662 yield struct.pack('B', len(name))
663 yield name
663 yield name
664
664
665 for chunk in gen:
665 for chunk in gen:
666 yield chunk
666 yield chunk
667
667
668 def setresponse(code, contenttype, bodybytes=None, bodygen=None):
668 def setresponse(code, contenttype, bodybytes=None, bodygen=None):
669 if code == HTTP_OK:
669 if code == HTTP_OK:
670 res.status = '200 Script output follows'
670 res.status = '200 Script output follows'
671 else:
671 else:
672 res.status = hgwebcommon.statusmessage(code)
672 res.status = hgwebcommon.statusmessage(code)
673
673
674 res.headers['Content-Type'] = contenttype
674 res.headers['Content-Type'] = contenttype
675
675
676 if bodybytes is not None:
676 if bodybytes is not None:
677 res.setbodybytes(bodybytes)
677 res.setbodybytes(bodybytes)
678 if bodygen is not None:
678 if bodygen is not None:
679 res.setbodygen(bodygen)
679 res.setbodygen(bodygen)
680
680
681 if not wireproto.commands.commandavailable(cmd, proto):
681 if not wireproto.commands.commandavailable(cmd, proto):
682 setresponse(HTTP_OK, HGERRTYPE,
682 setresponse(HTTP_OK, HGERRTYPE,
683 _('requested wire protocol command is not available over '
683 _('requested wire protocol command is not available over '
684 'HTTP'))
684 'HTTP'))
685 return
685 return
686
686
687 proto.checkperm(wireproto.commands[cmd].permission)
687 proto.checkperm(wireproto.commands[cmd].permission)
688
688
689 rsp = wireproto.dispatch(repo, proto, cmd)
689 rsp = wireproto.dispatch(repo, proto, cmd)
690
690
691 if isinstance(rsp, bytes):
691 if isinstance(rsp, bytes):
692 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
692 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
693 elif isinstance(rsp, wireprototypes.bytesresponse):
693 elif isinstance(rsp, wireprototypes.bytesresponse):
694 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp.data)
694 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp.data)
695 elif isinstance(rsp, wireprototypes.streamreslegacy):
695 elif isinstance(rsp, wireprototypes.streamreslegacy):
696 setresponse(HTTP_OK, HGTYPE, bodygen=rsp.gen)
696 setresponse(HTTP_OK, HGTYPE, bodygen=rsp.gen)
697 elif isinstance(rsp, wireprototypes.streamres):
697 elif isinstance(rsp, wireprototypes.streamres):
698 gen = rsp.gen
698 gen = rsp.gen
699
699
700 # This code for compression should not be streamres specific. It
700 # This code for compression should not be streamres specific. It
701 # is here because we only compress streamres at the moment.
701 # is here because we only compress streamres at the moment.
702 mediatype, engine, engineopts = _httpresponsetype(
702 mediatype, engine, engineopts = _httpresponsetype(
703 repo.ui, req, rsp.prefer_uncompressed)
703 repo.ui, req, rsp.prefer_uncompressed)
704 gen = engine.compressstream(gen, engineopts)
704 gen = engine.compressstream(gen, engineopts)
705
705
706 if mediatype == HGTYPE2:
706 if mediatype == HGTYPE2:
707 gen = genversion2(gen, engine, engineopts)
707 gen = genversion2(gen, engine, engineopts)
708
708
709 setresponse(HTTP_OK, mediatype, bodygen=gen)
709 setresponse(HTTP_OK, mediatype, bodygen=gen)
710 elif isinstance(rsp, wireprototypes.pushres):
710 elif isinstance(rsp, wireprototypes.pushres):
711 rsp = '%d\n%s' % (rsp.res, rsp.output)
711 rsp = '%d\n%s' % (rsp.res, rsp.output)
712 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
712 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
713 elif isinstance(rsp, wireprototypes.pusherr):
713 elif isinstance(rsp, wireprototypes.pusherr):
714 rsp = '0\n%s\n' % rsp.res
714 rsp = '0\n%s\n' % rsp.res
715 res.drain = True
715 res.drain = True
716 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
716 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
717 elif isinstance(rsp, wireprototypes.ooberror):
717 elif isinstance(rsp, wireprototypes.ooberror):
718 setresponse(HTTP_OK, HGERRTYPE, bodybytes=rsp.message)
718 setresponse(HTTP_OK, HGERRTYPE, bodybytes=rsp.message)
719 else:
719 else:
720 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
720 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
721
721
722 def _sshv1respondbytes(fout, value):
722 def _sshv1respondbytes(fout, value):
723 """Send a bytes response for protocol version 1."""
723 """Send a bytes response for protocol version 1."""
724 fout.write('%d\n' % len(value))
724 fout.write('%d\n' % len(value))
725 fout.write(value)
725 fout.write(value)
726 fout.flush()
726 fout.flush()
727
727
728 def _sshv1respondstream(fout, source):
728 def _sshv1respondstream(fout, source):
729 write = fout.write
729 write = fout.write
730 for chunk in source.gen:
730 for chunk in source.gen:
731 write(chunk)
731 write(chunk)
732 fout.flush()
732 fout.flush()
733
733
734 def _sshv1respondooberror(fout, ferr, rsp):
734 def _sshv1respondooberror(fout, ferr, rsp):
735 ferr.write(b'%s\n-\n' % rsp)
735 ferr.write(b'%s\n-\n' % rsp)
736 ferr.flush()
736 ferr.flush()
737 fout.write(b'\n')
737 fout.write(b'\n')
738 fout.flush()
738 fout.flush()
739
739
740 class sshv1protocolhandler(wireprototypes.baseprotocolhandler):
740 class sshv1protocolhandler(wireprototypes.baseprotocolhandler):
741 """Handler for requests services via version 1 of SSH protocol."""
741 """Handler for requests services via version 1 of SSH protocol."""
742 def __init__(self, ui, fin, fout):
742 def __init__(self, ui, fin, fout):
743 self._ui = ui
743 self._ui = ui
744 self._fin = fin
744 self._fin = fin
745 self._fout = fout
745 self._fout = fout
746
746
747 @property
747 @property
748 def name(self):
748 def name(self):
749 return wireprototypes.SSHV1
749 return wireprototypes.SSHV1
750
750
751 def getargs(self, args):
751 def getargs(self, args):
752 data = {}
752 data = {}
753 keys = args.split()
753 keys = args.split()
754 for n in xrange(len(keys)):
754 for n in xrange(len(keys)):
755 argline = self._fin.readline()[:-1]
755 argline = self._fin.readline()[:-1]
756 arg, l = argline.split()
756 arg, l = argline.split()
757 if arg not in keys:
757 if arg not in keys:
758 raise error.Abort(_("unexpected parameter %r") % arg)
758 raise error.Abort(_("unexpected parameter %r") % arg)
759 if arg == '*':
759 if arg == '*':
760 star = {}
760 star = {}
761 for k in xrange(int(l)):
761 for k in xrange(int(l)):
762 argline = self._fin.readline()[:-1]
762 argline = self._fin.readline()[:-1]
763 arg, l = argline.split()
763 arg, l = argline.split()
764 val = self._fin.read(int(l))
764 val = self._fin.read(int(l))
765 star[arg] = val
765 star[arg] = val
766 data['*'] = star
766 data['*'] = star
767 else:
767 else:
768 val = self._fin.read(int(l))
768 val = self._fin.read(int(l))
769 data[arg] = val
769 data[arg] = val
770 return [data[k] for k in keys]
770 return [data[k] for k in keys]
771
771
772 def forwardpayload(self, fpout):
772 def forwardpayload(self, fpout):
773 # We initially send an empty response. This tells the client it is
773 # We initially send an empty response. This tells the client it is
774 # OK to start sending data. If a client sees any other response, it
774 # OK to start sending data. If a client sees any other response, it
775 # interprets it as an error.
775 # interprets it as an error.
776 _sshv1respondbytes(self._fout, b'')
776 _sshv1respondbytes(self._fout, b'')
777
777
778 # The file is in the form:
778 # The file is in the form:
779 #
779 #
780 # <chunk size>\n<chunk>
780 # <chunk size>\n<chunk>
781 # ...
781 # ...
782 # 0\n
782 # 0\n
783 count = int(self._fin.readline())
783 count = int(self._fin.readline())
784 while count:
784 while count:
785 fpout.write(self._fin.read(count))
785 fpout.write(self._fin.read(count))
786 count = int(self._fin.readline())
786 count = int(self._fin.readline())
787
787
788 @contextlib.contextmanager
788 @contextlib.contextmanager
789 def mayberedirectstdio(self):
789 def mayberedirectstdio(self):
790 yield None
790 yield None
791
791
792 def client(self):
792 def client(self):
793 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
793 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
794 return 'remote:ssh:' + client
794 return 'remote:ssh:' + client
795
795
796 def addcapabilities(self, repo, caps):
796 def addcapabilities(self, repo, caps):
797 caps.append(b'batch')
797 caps.append(b'batch')
798 return caps
798 return caps
799
799
800 def checkperm(self, perm):
800 def checkperm(self, perm):
801 pass
801 pass
802
802
803 class sshv2protocolhandler(sshv1protocolhandler):
803 class sshv2protocolhandler(sshv1protocolhandler):
804 """Protocol handler for version 2 of the SSH protocol."""
804 """Protocol handler for version 2 of the SSH protocol."""
805
805
806 @property
806 @property
807 def name(self):
807 def name(self):
808 return wireprototypes.SSHV2
808 return wireprototypes.SSHV2
809
809
810 def _runsshserver(ui, repo, fin, fout, ev):
810 def _runsshserver(ui, repo, fin, fout, ev):
811 # This function operates like a state machine of sorts. The following
811 # This function operates like a state machine of sorts. The following
812 # states are defined:
812 # states are defined:
813 #
813 #
814 # protov1-serving
814 # protov1-serving
815 # Server is in protocol version 1 serving mode. Commands arrive on
815 # Server is in protocol version 1 serving mode. Commands arrive on
816 # new lines. These commands are processed in this state, one command
816 # new lines. These commands are processed in this state, one command
817 # after the other.
817 # after the other.
818 #
818 #
819 # protov2-serving
819 # protov2-serving
820 # Server is in protocol version 2 serving mode.
820 # Server is in protocol version 2 serving mode.
821 #
821 #
822 # upgrade-initial
822 # upgrade-initial
823 # The server is going to process an upgrade request.
823 # The server is going to process an upgrade request.
824 #
824 #
825 # upgrade-v2-filter-legacy-handshake
825 # upgrade-v2-filter-legacy-handshake
826 # The protocol is being upgraded to version 2. The server is expecting
826 # The protocol is being upgraded to version 2. The server is expecting
827 # the legacy handshake from version 1.
827 # the legacy handshake from version 1.
828 #
828 #
829 # upgrade-v2-finish
829 # upgrade-v2-finish
830 # The upgrade to version 2 of the protocol is imminent.
830 # The upgrade to version 2 of the protocol is imminent.
831 #
831 #
832 # shutdown
832 # shutdown
833 # The server is shutting down, possibly in reaction to a client event.
833 # The server is shutting down, possibly in reaction to a client event.
834 #
834 #
835 # And here are their transitions:
835 # And here are their transitions:
836 #
836 #
837 # protov1-serving -> shutdown
837 # protov1-serving -> shutdown
838 # When server receives an empty request or encounters another
838 # When server receives an empty request or encounters another
839 # error.
839 # error.
840 #
840 #
841 # protov1-serving -> upgrade-initial
841 # protov1-serving -> upgrade-initial
842 # An upgrade request line was seen.
842 # An upgrade request line was seen.
843 #
843 #
844 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
844 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
845 # Upgrade to version 2 in progress. Server is expecting to
845 # Upgrade to version 2 in progress. Server is expecting to
846 # process a legacy handshake.
846 # process a legacy handshake.
847 #
847 #
848 # upgrade-v2-filter-legacy-handshake -> shutdown
848 # upgrade-v2-filter-legacy-handshake -> shutdown
849 # Client did not fulfill upgrade handshake requirements.
849 # Client did not fulfill upgrade handshake requirements.
850 #
850 #
851 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
851 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
852 # Client fulfilled version 2 upgrade requirements. Finishing that
852 # Client fulfilled version 2 upgrade requirements. Finishing that
853 # upgrade.
853 # upgrade.
854 #
854 #
855 # upgrade-v2-finish -> protov2-serving
855 # upgrade-v2-finish -> protov2-serving
856 # Protocol upgrade to version 2 complete. Server can now speak protocol
856 # Protocol upgrade to version 2 complete. Server can now speak protocol
857 # version 2.
857 # version 2.
858 #
858 #
859 # protov2-serving -> protov1-serving
859 # protov2-serving -> protov1-serving
860 # Ths happens by default since protocol version 2 is the same as
860 # Ths happens by default since protocol version 2 is the same as
861 # version 1 except for the handshake.
861 # version 1 except for the handshake.
862
862
863 state = 'protov1-serving'
863 state = 'protov1-serving'
864 proto = sshv1protocolhandler(ui, fin, fout)
864 proto = sshv1protocolhandler(ui, fin, fout)
865 protoswitched = False
865 protoswitched = False
866
866
867 while not ev.is_set():
867 while not ev.is_set():
868 if state == 'protov1-serving':
868 if state == 'protov1-serving':
869 # Commands are issued on new lines.
869 # Commands are issued on new lines.
870 request = fin.readline()[:-1]
870 request = fin.readline()[:-1]
871
871
872 # Empty lines signal to terminate the connection.
872 # Empty lines signal to terminate the connection.
873 if not request:
873 if not request:
874 state = 'shutdown'
874 state = 'shutdown'
875 continue
875 continue
876
876
877 # It looks like a protocol upgrade request. Transition state to
877 # It looks like a protocol upgrade request. Transition state to
878 # handle it.
878 # handle it.
879 if request.startswith(b'upgrade '):
879 if request.startswith(b'upgrade '):
880 if protoswitched:
880 if protoswitched:
881 _sshv1respondooberror(fout, ui.ferr,
881 _sshv1respondooberror(fout, ui.ferr,
882 b'cannot upgrade protocols multiple '
882 b'cannot upgrade protocols multiple '
883 b'times')
883 b'times')
884 state = 'shutdown'
884 state = 'shutdown'
885 continue
885 continue
886
886
887 state = 'upgrade-initial'
887 state = 'upgrade-initial'
888 continue
888 continue
889
889
890 available = wireproto.commands.commandavailable(request, proto)
890 available = wireproto.commands.commandavailable(request, proto)
891
891
892 # This command isn't available. Send an empty response and go
892 # This command isn't available. Send an empty response and go
893 # back to waiting for a new command.
893 # back to waiting for a new command.
894 if not available:
894 if not available:
895 _sshv1respondbytes(fout, b'')
895 _sshv1respondbytes(fout, b'')
896 continue
896 continue
897
897
898 rsp = wireproto.dispatch(repo, proto, request)
898 rsp = wireproto.dispatch(repo, proto, request)
899
899
900 if isinstance(rsp, bytes):
900 if isinstance(rsp, bytes):
901 _sshv1respondbytes(fout, rsp)
901 _sshv1respondbytes(fout, rsp)
902 elif isinstance(rsp, wireprototypes.bytesresponse):
902 elif isinstance(rsp, wireprototypes.bytesresponse):
903 _sshv1respondbytes(fout, rsp.data)
903 _sshv1respondbytes(fout, rsp.data)
904 elif isinstance(rsp, wireprototypes.streamres):
904 elif isinstance(rsp, wireprototypes.streamres):
905 _sshv1respondstream(fout, rsp)
905 _sshv1respondstream(fout, rsp)
906 elif isinstance(rsp, wireprototypes.streamreslegacy):
906 elif isinstance(rsp, wireprototypes.streamreslegacy):
907 _sshv1respondstream(fout, rsp)
907 _sshv1respondstream(fout, rsp)
908 elif isinstance(rsp, wireprototypes.pushres):
908 elif isinstance(rsp, wireprototypes.pushres):
909 _sshv1respondbytes(fout, b'')
909 _sshv1respondbytes(fout, b'')
910 _sshv1respondbytes(fout, b'%d' % rsp.res)
910 _sshv1respondbytes(fout, b'%d' % rsp.res)
911 elif isinstance(rsp, wireprototypes.pusherr):
911 elif isinstance(rsp, wireprototypes.pusherr):
912 _sshv1respondbytes(fout, rsp.res)
912 _sshv1respondbytes(fout, rsp.res)
913 elif isinstance(rsp, wireprototypes.ooberror):
913 elif isinstance(rsp, wireprototypes.ooberror):
914 _sshv1respondooberror(fout, ui.ferr, rsp.message)
914 _sshv1respondooberror(fout, ui.ferr, rsp.message)
915 else:
915 else:
916 raise error.ProgrammingError('unhandled response type from '
916 raise error.ProgrammingError('unhandled response type from '
917 'wire protocol command: %s' % rsp)
917 'wire protocol command: %s' % rsp)
918
918
919 # For now, protocol version 2 serving just goes back to version 1.
919 # For now, protocol version 2 serving just goes back to version 1.
920 elif state == 'protov2-serving':
920 elif state == 'protov2-serving':
921 state = 'protov1-serving'
921 state = 'protov1-serving'
922 continue
922 continue
923
923
924 elif state == 'upgrade-initial':
924 elif state == 'upgrade-initial':
925 # We should never transition into this state if we've switched
925 # We should never transition into this state if we've switched
926 # protocols.
926 # protocols.
927 assert not protoswitched
927 assert not protoswitched
928 assert proto.name == wireprototypes.SSHV1
928 assert proto.name == wireprototypes.SSHV1
929
929
930 # Expected: upgrade <token> <capabilities>
930 # Expected: upgrade <token> <capabilities>
931 # If we get something else, the request is malformed. It could be
931 # If we get something else, the request is malformed. It could be
932 # from a future client that has altered the upgrade line content.
932 # from a future client that has altered the upgrade line content.
933 # We treat this as an unknown command.
933 # We treat this as an unknown command.
934 try:
934 try:
935 token, caps = request.split(b' ')[1:]
935 token, caps = request.split(b' ')[1:]
936 except ValueError:
936 except ValueError:
937 _sshv1respondbytes(fout, b'')
937 _sshv1respondbytes(fout, b'')
938 state = 'protov1-serving'
938 state = 'protov1-serving'
939 continue
939 continue
940
940
941 # Send empty response if we don't support upgrading protocols.
941 # Send empty response if we don't support upgrading protocols.
942 if not ui.configbool('experimental', 'sshserver.support-v2'):
942 if not ui.configbool('experimental', 'sshserver.support-v2'):
943 _sshv1respondbytes(fout, b'')
943 _sshv1respondbytes(fout, b'')
944 state = 'protov1-serving'
944 state = 'protov1-serving'
945 continue
945 continue
946
946
947 try:
947 try:
948 caps = urlreq.parseqs(caps)
948 caps = urlreq.parseqs(caps)
949 except ValueError:
949 except ValueError:
950 _sshv1respondbytes(fout, b'')
950 _sshv1respondbytes(fout, b'')
951 state = 'protov1-serving'
951 state = 'protov1-serving'
952 continue
952 continue
953
953
954 # We don't see an upgrade request to protocol version 2. Ignore
954 # We don't see an upgrade request to protocol version 2. Ignore
955 # the upgrade request.
955 # the upgrade request.
956 wantedprotos = caps.get(b'proto', [b''])[0]
956 wantedprotos = caps.get(b'proto', [b''])[0]
957 if SSHV2 not in wantedprotos:
957 if SSHV2 not in wantedprotos:
958 _sshv1respondbytes(fout, b'')
958 _sshv1respondbytes(fout, b'')
959 state = 'protov1-serving'
959 state = 'protov1-serving'
960 continue
960 continue
961
961
962 # It looks like we can honor this upgrade request to protocol 2.
962 # It looks like we can honor this upgrade request to protocol 2.
963 # Filter the rest of the handshake protocol request lines.
963 # Filter the rest of the handshake protocol request lines.
964 state = 'upgrade-v2-filter-legacy-handshake'
964 state = 'upgrade-v2-filter-legacy-handshake'
965 continue
965 continue
966
966
967 elif state == 'upgrade-v2-filter-legacy-handshake':
967 elif state == 'upgrade-v2-filter-legacy-handshake':
968 # Client should have sent legacy handshake after an ``upgrade``
968 # Client should have sent legacy handshake after an ``upgrade``
969 # request. Expected lines:
969 # request. Expected lines:
970 #
970 #
971 # hello
971 # hello
972 # between
972 # between
973 # pairs 81
973 # pairs 81
974 # 0000...-0000...
974 # 0000...-0000...
975
975
976 ok = True
976 ok = True
977 for line in (b'hello', b'between', b'pairs 81'):
977 for line in (b'hello', b'between', b'pairs 81'):
978 request = fin.readline()[:-1]
978 request = fin.readline()[:-1]
979
979
980 if request != line:
980 if request != line:
981 _sshv1respondooberror(fout, ui.ferr,
981 _sshv1respondooberror(fout, ui.ferr,
982 b'malformed handshake protocol: '
982 b'malformed handshake protocol: '
983 b'missing %s' % line)
983 b'missing %s' % line)
984 ok = False
984 ok = False
985 state = 'shutdown'
985 state = 'shutdown'
986 break
986 break
987
987
988 if not ok:
988 if not ok:
989 continue
989 continue
990
990
991 request = fin.read(81)
991 request = fin.read(81)
992 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
992 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
993 _sshv1respondooberror(fout, ui.ferr,
993 _sshv1respondooberror(fout, ui.ferr,
994 b'malformed handshake protocol: '
994 b'malformed handshake protocol: '
995 b'missing between argument value')
995 b'missing between argument value')
996 state = 'shutdown'
996 state = 'shutdown'
997 continue
997 continue
998
998
999 state = 'upgrade-v2-finish'
999 state = 'upgrade-v2-finish'
1000 continue
1000 continue
1001
1001
1002 elif state == 'upgrade-v2-finish':
1002 elif state == 'upgrade-v2-finish':
1003 # Send the upgrade response.
1003 # Send the upgrade response.
1004 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
1004 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
1005 servercaps = wireproto.capabilities(repo, proto)
1005 servercaps = wireproto.capabilities(repo, proto)
1006 rsp = b'capabilities: %s' % servercaps.data
1006 rsp = b'capabilities: %s' % servercaps.data
1007 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
1007 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
1008 fout.flush()
1008 fout.flush()
1009
1009
1010 proto = sshv2protocolhandler(ui, fin, fout)
1010 proto = sshv2protocolhandler(ui, fin, fout)
1011 protoswitched = True
1011 protoswitched = True
1012
1012
1013 state = 'protov2-serving'
1013 state = 'protov2-serving'
1014 continue
1014 continue
1015
1015
1016 elif state == 'shutdown':
1016 elif state == 'shutdown':
1017 break
1017 break
1018
1018
1019 else:
1019 else:
1020 raise error.ProgrammingError('unhandled ssh server state: %s' %
1020 raise error.ProgrammingError('unhandled ssh server state: %s' %
1021 state)
1021 state)
1022
1022
1023 class sshserver(object):
1023 class sshserver(object):
1024 def __init__(self, ui, repo, logfh=None):
1024 def __init__(self, ui, repo, logfh=None):
1025 self._ui = ui
1025 self._ui = ui
1026 self._repo = repo
1026 self._repo = repo
1027 self._fin = ui.fin
1027 self._fin = ui.fin
1028 self._fout = ui.fout
1028 self._fout = ui.fout
1029
1029
1030 # Log write I/O to stdout and stderr if configured.
1030 # Log write I/O to stdout and stderr if configured.
1031 if logfh:
1031 if logfh:
1032 self._fout = util.makeloggingfileobject(
1032 self._fout = util.makeloggingfileobject(
1033 logfh, self._fout, 'o', logdata=True)
1033 logfh, self._fout, 'o', logdata=True)
1034 ui.ferr = util.makeloggingfileobject(
1034 ui.ferr = util.makeloggingfileobject(
1035 logfh, ui.ferr, 'e', logdata=True)
1035 logfh, ui.ferr, 'e', logdata=True)
1036
1036
1037 hook.redirect(True)
1037 hook.redirect(True)
1038 ui.fout = repo.ui.fout = ui.ferr
1038 ui.fout = repo.ui.fout = ui.ferr
1039
1039
1040 # Prevent insertion/deletion of CRs
1040 # Prevent insertion/deletion of CRs
1041 procutil.setbinary(self._fin)
1041 procutil.setbinary(self._fin)
1042 procutil.setbinary(self._fout)
1042 procutil.setbinary(self._fout)
1043
1043
1044 def serve_forever(self):
1044 def serve_forever(self):
1045 self.serveuntil(threading.Event())
1045 self.serveuntil(threading.Event())
1046 sys.exit(0)
1046 sys.exit(0)
1047
1047
1048 def serveuntil(self, ev):
1048 def serveuntil(self, ev):
1049 """Serve until a threading.Event is set."""
1049 """Serve until a threading.Event is set."""
1050 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
1050 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
@@ -1,95 +1,102 b''
1 from __future__ import absolute_import, print_function
1 from __future__ import absolute_import, print_function
2
2
3 from mercurial import (
3 from mercurial import (
4 error,
4 error,
5 pycompat,
5 pycompat,
6 ui as uimod,
6 ui as uimod,
7 util,
7 util,
8 wireproto,
8 wireproto,
9 wireprototypes,
9 wireprototypes,
10 )
10 )
11 stringio = util.stringio
11 stringio = util.stringio
12
12
13 class proto(object):
13 class proto(object):
14 def __init__(self, args):
14 def __init__(self, args):
15 self.args = args
15 self.args = args
16 self.name = 'dummyproto'
17
16 def getargs(self, spec):
18 def getargs(self, spec):
17 args = self.args
19 args = self.args
18 args.setdefault(b'*', {})
20 args.setdefault(b'*', {})
19 names = spec.split()
21 names = spec.split()
20 return [args[n] for n in names]
22 return [args[n] for n in names]
21
23
22 def checkperm(self, perm):
24 def checkperm(self, perm):
23 pass
25 pass
24
26
27 wireprototypes.TRANSPORTS['dummyproto'] = {
28 'transport': 'dummy',
29 'version': 1,
30 }
31
25 class clientpeer(wireproto.wirepeer):
32 class clientpeer(wireproto.wirepeer):
26 def __init__(self, serverrepo, ui):
33 def __init__(self, serverrepo, ui):
27 self.serverrepo = serverrepo
34 self.serverrepo = serverrepo
28 self._ui = ui
35 self._ui = ui
29
36
30 @property
37 @property
31 def ui(self):
38 def ui(self):
32 return self._ui
39 return self._ui
33
40
34 def url(self):
41 def url(self):
35 return b'test'
42 return b'test'
36
43
37 def local(self):
44 def local(self):
38 return None
45 return None
39
46
40 def peer(self):
47 def peer(self):
41 return self
48 return self
42
49
43 def canpush(self):
50 def canpush(self):
44 return True
51 return True
45
52
46 def close(self):
53 def close(self):
47 pass
54 pass
48
55
49 def capabilities(self):
56 def capabilities(self):
50 return [b'batch']
57 return [b'batch']
51
58
52 def _call(self, cmd, **args):
59 def _call(self, cmd, **args):
53 args = pycompat.byteskwargs(args)
60 args = pycompat.byteskwargs(args)
54 res = wireproto.dispatch(self.serverrepo, proto(args), cmd)
61 res = wireproto.dispatch(self.serverrepo, proto(args), cmd)
55 if isinstance(res, wireprototypes.bytesresponse):
62 if isinstance(res, wireprototypes.bytesresponse):
56 return res.data
63 return res.data
57 elif isinstance(res, bytes):
64 elif isinstance(res, bytes):
58 return res
65 return res
59 else:
66 else:
60 raise error.Abort('dummy client does not support response type')
67 raise error.Abort('dummy client does not support response type')
61
68
62 def _callstream(self, cmd, **args):
69 def _callstream(self, cmd, **args):
63 return stringio(self._call(cmd, **args))
70 return stringio(self._call(cmd, **args))
64
71
65 @wireproto.batchable
72 @wireproto.batchable
66 def greet(self, name):
73 def greet(self, name):
67 f = wireproto.future()
74 f = wireproto.future()
68 yield {b'name': mangle(name)}, f
75 yield {b'name': mangle(name)}, f
69 yield unmangle(f.value)
76 yield unmangle(f.value)
70
77
71 class serverrepo(object):
78 class serverrepo(object):
72 def greet(self, name):
79 def greet(self, name):
73 return b"Hello, " + name
80 return b"Hello, " + name
74
81
75 def filtered(self, name):
82 def filtered(self, name):
76 return self
83 return self
77
84
78 def mangle(s):
85 def mangle(s):
79 return b''.join(pycompat.bytechr(ord(c) + 1) for c in pycompat.bytestr(s))
86 return b''.join(pycompat.bytechr(ord(c) + 1) for c in pycompat.bytestr(s))
80 def unmangle(s):
87 def unmangle(s):
81 return b''.join(pycompat.bytechr(ord(c) - 1) for c in pycompat.bytestr(s))
88 return b''.join(pycompat.bytechr(ord(c) - 1) for c in pycompat.bytestr(s))
82
89
83 def greet(repo, proto, name):
90 def greet(repo, proto, name):
84 return mangle(repo.greet(unmangle(name)))
91 return mangle(repo.greet(unmangle(name)))
85
92
86 wireproto.commands[b'greet'] = (greet, b'name',)
93 wireproto.commands[b'greet'] = (greet, b'name',)
87
94
88 srv = serverrepo()
95 srv = serverrepo()
89 clt = clientpeer(srv, uimod.ui())
96 clt = clientpeer(srv, uimod.ui())
90
97
91 print(clt.greet(b"Foobar"))
98 print(clt.greet(b"Foobar"))
92 b = clt.iterbatch()
99 b = clt.iterbatch()
93 list(map(b.greet, (b'Fo, =;:<o', b'Bar')))
100 list(map(b.greet, (b'Fo, =;:<o', b'Bar')))
94 b.submit()
101 b.submit()
95 print([r for r in b.results()])
102 print([r for r in b.results()])
General Comments 0
You need to be logged in to leave comments. Login now