##// END OF EJS Templates
wireproto: move supportedcompengines out of wireproto...
Gregory Szorc -
r37801:9d818539 default
parent child Browse files
Show More
@@ -1,720 +1,671
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import os
10 import os
11 import tempfile
11 import tempfile
12
12
13 from .i18n import _
13 from .i18n import _
14 from .node import (
14 from .node import (
15 hex,
15 hex,
16 nullid,
16 nullid,
17 )
17 )
18
18
19 from . import (
19 from . import (
20 bundle2,
20 bundle2,
21 changegroup as changegroupmod,
21 changegroup as changegroupmod,
22 discovery,
22 discovery,
23 encoding,
23 encoding,
24 error,
24 error,
25 exchange,
25 exchange,
26 pushkey as pushkeymod,
26 pushkey as pushkeymod,
27 pycompat,
27 pycompat,
28 streamclone,
28 streamclone,
29 util,
29 util,
30 wireprototypes,
30 wireprototypes,
31 )
31 )
32
32
33 from .utils import (
33 from .utils import (
34 procutil,
34 procutil,
35 stringutil,
35 stringutil,
36 )
36 )
37
37
38 urlerr = util.urlerr
38 urlerr = util.urlerr
39 urlreq = util.urlreq
39 urlreq = util.urlreq
40
40
41 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
41 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
42 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
42 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
43 'IncompatibleClient')
43 'IncompatibleClient')
44 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
44 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
45
45
46 def clientcompressionsupport(proto):
46 def clientcompressionsupport(proto):
47 """Returns a list of compression methods supported by the client.
47 """Returns a list of compression methods supported by the client.
48
48
49 Returns a list of the compression methods supported by the client
49 Returns a list of the compression methods supported by the client
50 according to the protocol capabilities. If no such capability has
50 according to the protocol capabilities. If no such capability has
51 been announced, fallback to the default of zlib and uncompressed.
51 been announced, fallback to the default of zlib and uncompressed.
52 """
52 """
53 for cap in proto.getprotocaps():
53 for cap in proto.getprotocaps():
54 if cap.startswith('comp='):
54 if cap.startswith('comp='):
55 return cap[5:].split(',')
55 return cap[5:].split(',')
56 return ['zlib', 'none']
56 return ['zlib', 'none']
57
57
58 # wire protocol command can either return a string or one of these classes.
58 # wire protocol command can either return a string or one of these classes.
59
59
60 def getdispatchrepo(repo, proto, command):
60 def getdispatchrepo(repo, proto, command):
61 """Obtain the repo used for processing wire protocol commands.
61 """Obtain the repo used for processing wire protocol commands.
62
62
63 The intent of this function is to serve as a monkeypatch point for
63 The intent of this function is to serve as a monkeypatch point for
64 extensions that need commands to operate on different repo views under
64 extensions that need commands to operate on different repo views under
65 specialized circumstances.
65 specialized circumstances.
66 """
66 """
67 return repo.filtered('served')
67 return repo.filtered('served')
68
68
69 def dispatch(repo, proto, command):
69 def dispatch(repo, proto, command):
70 repo = getdispatchrepo(repo, proto, command)
70 repo = getdispatchrepo(repo, proto, command)
71
71
72 func, spec = commands[command]
72 func, spec = commands[command]
73 args = proto.getargs(spec)
73 args = proto.getargs(spec)
74
74
75 return func(repo, proto, *args)
75 return func(repo, proto, *args)
76
76
77 def options(cmd, keys, others):
77 def options(cmd, keys, others):
78 opts = {}
78 opts = {}
79 for k in keys:
79 for k in keys:
80 if k in others:
80 if k in others:
81 opts[k] = others[k]
81 opts[k] = others[k]
82 del others[k]
82 del others[k]
83 if others:
83 if others:
84 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
84 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
85 % (cmd, ",".join(others)))
85 % (cmd, ",".join(others)))
86 return opts
86 return opts
87
87
88 def bundle1allowed(repo, action):
88 def bundle1allowed(repo, action):
89 """Whether a bundle1 operation is allowed from the server.
89 """Whether a bundle1 operation is allowed from the server.
90
90
91 Priority is:
91 Priority is:
92
92
93 1. server.bundle1gd.<action> (if generaldelta active)
93 1. server.bundle1gd.<action> (if generaldelta active)
94 2. server.bundle1.<action>
94 2. server.bundle1.<action>
95 3. server.bundle1gd (if generaldelta active)
95 3. server.bundle1gd (if generaldelta active)
96 4. server.bundle1
96 4. server.bundle1
97 """
97 """
98 ui = repo.ui
98 ui = repo.ui
99 gd = 'generaldelta' in repo.requirements
99 gd = 'generaldelta' in repo.requirements
100
100
101 if gd:
101 if gd:
102 v = ui.configbool('server', 'bundle1gd.%s' % action)
102 v = ui.configbool('server', 'bundle1gd.%s' % action)
103 if v is not None:
103 if v is not None:
104 return v
104 return v
105
105
106 v = ui.configbool('server', 'bundle1.%s' % action)
106 v = ui.configbool('server', 'bundle1.%s' % action)
107 if v is not None:
107 if v is not None:
108 return v
108 return v
109
109
110 if gd:
110 if gd:
111 v = ui.configbool('server', 'bundle1gd')
111 v = ui.configbool('server', 'bundle1gd')
112 if v is not None:
112 if v is not None:
113 return v
113 return v
114
114
115 return ui.configbool('server', 'bundle1')
115 return ui.configbool('server', 'bundle1')
116
116
117 def supportedcompengines(ui, role):
118 """Obtain the list of supported compression engines for a request."""
119 assert role in (util.CLIENTROLE, util.SERVERROLE)
120
121 compengines = util.compengines.supportedwireengines(role)
122
123 # Allow config to override default list and ordering.
124 if role == util.SERVERROLE:
125 configengines = ui.configlist('server', 'compressionengines')
126 config = 'server.compressionengines'
127 else:
128 # This is currently implemented mainly to facilitate testing. In most
129 # cases, the server should be in charge of choosing a compression engine
130 # because a server has the most to lose from a sub-optimal choice. (e.g.
131 # CPU DoS due to an expensive engine or a network DoS due to poor
132 # compression ratio).
133 configengines = ui.configlist('experimental',
134 'clientcompressionengines')
135 config = 'experimental.clientcompressionengines'
136
137 # No explicit config. Filter out the ones that aren't supposed to be
138 # advertised and return default ordering.
139 if not configengines:
140 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
141 return [e for e in compengines
142 if getattr(e.wireprotosupport(), attr) > 0]
143
144 # If compression engines are listed in the config, assume there is a good
145 # reason for it (like server operators wanting to achieve specific
146 # performance characteristics). So fail fast if the config references
147 # unusable compression engines.
148 validnames = set(e.name() for e in compengines)
149 invalidnames = set(e for e in configengines if e not in validnames)
150 if invalidnames:
151 raise error.Abort(_('invalid compression engine defined in %s: %s') %
152 (config, ', '.join(sorted(invalidnames))))
153
154 compengines = [e for e in compengines if e.name() in configengines]
155 compengines = sorted(compengines,
156 key=lambda e: configengines.index(e.name()))
157
158 if not compengines:
159 raise error.Abort(_('%s config option does not specify any known '
160 'compression engines') % config,
161 hint=_('usable compression engines: %s') %
162 ', '.sorted(validnames))
163
164 return compengines
165
166 # For version 1 transports.
117 # For version 1 transports.
167 commands = wireprototypes.commanddict()
118 commands = wireprototypes.commanddict()
168
119
169 # For version 2 transports.
120 # For version 2 transports.
170 commandsv2 = wireprototypes.commanddict()
121 commandsv2 = wireprototypes.commanddict()
171
122
172 def wireprotocommand(name, args=None, permission='push'):
123 def wireprotocommand(name, args=None, permission='push'):
173 """Decorator to declare a wire protocol command.
124 """Decorator to declare a wire protocol command.
174
125
175 ``name`` is the name of the wire protocol command being provided.
126 ``name`` is the name of the wire protocol command being provided.
176
127
177 ``args`` defines the named arguments accepted by the command. It is
128 ``args`` defines the named arguments accepted by the command. It is
178 a space-delimited list of argument names. ``*`` denotes a special value
129 a space-delimited list of argument names. ``*`` denotes a special value
179 that says to accept all named arguments.
130 that says to accept all named arguments.
180
131
181 ``permission`` defines the permission type needed to run this command.
132 ``permission`` defines the permission type needed to run this command.
182 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
133 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
183 respectively. Default is to assume command requires ``push`` permissions
134 respectively. Default is to assume command requires ``push`` permissions
184 because otherwise commands not declaring their permissions could modify
135 because otherwise commands not declaring their permissions could modify
185 a repository that is supposed to be read-only.
136 a repository that is supposed to be read-only.
186 """
137 """
187 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
138 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
188 if v['version'] == 1}
139 if v['version'] == 1}
189
140
190 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
141 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
191 # SSHv2.
142 # SSHv2.
192 # TODO undo this hack when SSH is using the unified frame protocol.
143 # TODO undo this hack when SSH is using the unified frame protocol.
193 if name == b'batch':
144 if name == b'batch':
194 transports.add(wireprototypes.SSHV2)
145 transports.add(wireprototypes.SSHV2)
195
146
196 if permission not in ('push', 'pull'):
147 if permission not in ('push', 'pull'):
197 raise error.ProgrammingError('invalid wire protocol permission; '
148 raise error.ProgrammingError('invalid wire protocol permission; '
198 'got %s; expected "push" or "pull"' %
149 'got %s; expected "push" or "pull"' %
199 permission)
150 permission)
200
151
201 if args is None:
152 if args is None:
202 args = ''
153 args = ''
203
154
204 if not isinstance(args, bytes):
155 if not isinstance(args, bytes):
205 raise error.ProgrammingError('arguments for version 1 commands '
156 raise error.ProgrammingError('arguments for version 1 commands '
206 'must be declared as bytes')
157 'must be declared as bytes')
207
158
208 def register(func):
159 def register(func):
209 if name in commands:
160 if name in commands:
210 raise error.ProgrammingError('%s command already registered '
161 raise error.ProgrammingError('%s command already registered '
211 'for version 1' % name)
162 'for version 1' % name)
212 commands[name] = wireprototypes.commandentry(
163 commands[name] = wireprototypes.commandentry(
213 func, args=args, transports=transports, permission=permission)
164 func, args=args, transports=transports, permission=permission)
214
165
215 return func
166 return func
216 return register
167 return register
217
168
218 # TODO define a more appropriate permissions type to use for this.
169 # TODO define a more appropriate permissions type to use for this.
219 @wireprotocommand('batch', 'cmds *', permission='pull')
170 @wireprotocommand('batch', 'cmds *', permission='pull')
220 def batch(repo, proto, cmds, others):
171 def batch(repo, proto, cmds, others):
221 unescapearg = wireprototypes.unescapebatcharg
172 unescapearg = wireprototypes.unescapebatcharg
222 repo = repo.filtered("served")
173 repo = repo.filtered("served")
223 res = []
174 res = []
224 for pair in cmds.split(';'):
175 for pair in cmds.split(';'):
225 op, args = pair.split(' ', 1)
176 op, args = pair.split(' ', 1)
226 vals = {}
177 vals = {}
227 for a in args.split(','):
178 for a in args.split(','):
228 if a:
179 if a:
229 n, v = a.split('=')
180 n, v = a.split('=')
230 vals[unescapearg(n)] = unescapearg(v)
181 vals[unescapearg(n)] = unescapearg(v)
231 func, spec = commands[op]
182 func, spec = commands[op]
232
183
233 # Validate that client has permissions to perform this command.
184 # Validate that client has permissions to perform this command.
234 perm = commands[op].permission
185 perm = commands[op].permission
235 assert perm in ('push', 'pull')
186 assert perm in ('push', 'pull')
236 proto.checkperm(perm)
187 proto.checkperm(perm)
237
188
238 if spec:
189 if spec:
239 keys = spec.split()
190 keys = spec.split()
240 data = {}
191 data = {}
241 for k in keys:
192 for k in keys:
242 if k == '*':
193 if k == '*':
243 star = {}
194 star = {}
244 for key in vals.keys():
195 for key in vals.keys():
245 if key not in keys:
196 if key not in keys:
246 star[key] = vals[key]
197 star[key] = vals[key]
247 data['*'] = star
198 data['*'] = star
248 else:
199 else:
249 data[k] = vals[k]
200 data[k] = vals[k]
250 result = func(repo, proto, *[data[k] for k in keys])
201 result = func(repo, proto, *[data[k] for k in keys])
251 else:
202 else:
252 result = func(repo, proto)
203 result = func(repo, proto)
253 if isinstance(result, wireprototypes.ooberror):
204 if isinstance(result, wireprototypes.ooberror):
254 return result
205 return result
255
206
256 # For now, all batchable commands must return bytesresponse or
207 # For now, all batchable commands must return bytesresponse or
257 # raw bytes (for backwards compatibility).
208 # raw bytes (for backwards compatibility).
258 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
209 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
259 if isinstance(result, wireprototypes.bytesresponse):
210 if isinstance(result, wireprototypes.bytesresponse):
260 result = result.data
211 result = result.data
261 res.append(wireprototypes.escapebatcharg(result))
212 res.append(wireprototypes.escapebatcharg(result))
262
213
263 return wireprototypes.bytesresponse(';'.join(res))
214 return wireprototypes.bytesresponse(';'.join(res))
264
215
265 @wireprotocommand('between', 'pairs', permission='pull')
216 @wireprotocommand('between', 'pairs', permission='pull')
266 def between(repo, proto, pairs):
217 def between(repo, proto, pairs):
267 pairs = [wireprototypes.decodelist(p, '-') for p in pairs.split(" ")]
218 pairs = [wireprototypes.decodelist(p, '-') for p in pairs.split(" ")]
268 r = []
219 r = []
269 for b in repo.between(pairs):
220 for b in repo.between(pairs):
270 r.append(wireprototypes.encodelist(b) + "\n")
221 r.append(wireprototypes.encodelist(b) + "\n")
271
222
272 return wireprototypes.bytesresponse(''.join(r))
223 return wireprototypes.bytesresponse(''.join(r))
273
224
274 @wireprotocommand('branchmap', permission='pull')
225 @wireprotocommand('branchmap', permission='pull')
275 def branchmap(repo, proto):
226 def branchmap(repo, proto):
276 branchmap = repo.branchmap()
227 branchmap = repo.branchmap()
277 heads = []
228 heads = []
278 for branch, nodes in branchmap.iteritems():
229 for branch, nodes in branchmap.iteritems():
279 branchname = urlreq.quote(encoding.fromlocal(branch))
230 branchname = urlreq.quote(encoding.fromlocal(branch))
280 branchnodes = wireprototypes.encodelist(nodes)
231 branchnodes = wireprototypes.encodelist(nodes)
281 heads.append('%s %s' % (branchname, branchnodes))
232 heads.append('%s %s' % (branchname, branchnodes))
282
233
283 return wireprototypes.bytesresponse('\n'.join(heads))
234 return wireprototypes.bytesresponse('\n'.join(heads))
284
235
285 @wireprotocommand('branches', 'nodes', permission='pull')
236 @wireprotocommand('branches', 'nodes', permission='pull')
286 def branches(repo, proto, nodes):
237 def branches(repo, proto, nodes):
287 nodes = wireprototypes.decodelist(nodes)
238 nodes = wireprototypes.decodelist(nodes)
288 r = []
239 r = []
289 for b in repo.branches(nodes):
240 for b in repo.branches(nodes):
290 r.append(wireprototypes.encodelist(b) + "\n")
241 r.append(wireprototypes.encodelist(b) + "\n")
291
242
292 return wireprototypes.bytesresponse(''.join(r))
243 return wireprototypes.bytesresponse(''.join(r))
293
244
294 @wireprotocommand('clonebundles', '', permission='pull')
245 @wireprotocommand('clonebundles', '', permission='pull')
295 def clonebundles(repo, proto):
246 def clonebundles(repo, proto):
296 """Server command for returning info for available bundles to seed clones.
247 """Server command for returning info for available bundles to seed clones.
297
248
298 Clients will parse this response and determine what bundle to fetch.
249 Clients will parse this response and determine what bundle to fetch.
299
250
300 Extensions may wrap this command to filter or dynamically emit data
251 Extensions may wrap this command to filter or dynamically emit data
301 depending on the request. e.g. you could advertise URLs for the closest
252 depending on the request. e.g. you could advertise URLs for the closest
302 data center given the client's IP address.
253 data center given the client's IP address.
303 """
254 """
304 return wireprototypes.bytesresponse(
255 return wireprototypes.bytesresponse(
305 repo.vfs.tryread('clonebundles.manifest'))
256 repo.vfs.tryread('clonebundles.manifest'))
306
257
307 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
258 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
308 'known', 'getbundle', 'unbundlehash']
259 'known', 'getbundle', 'unbundlehash']
309
260
310 def _capabilities(repo, proto):
261 def _capabilities(repo, proto):
311 """return a list of capabilities for a repo
262 """return a list of capabilities for a repo
312
263
313 This function exists to allow extensions to easily wrap capabilities
264 This function exists to allow extensions to easily wrap capabilities
314 computation
265 computation
315
266
316 - returns a lists: easy to alter
267 - returns a lists: easy to alter
317 - change done here will be propagated to both `capabilities` and `hello`
268 - change done here will be propagated to both `capabilities` and `hello`
318 command without any other action needed.
269 command without any other action needed.
319 """
270 """
320 # copy to prevent modification of the global list
271 # copy to prevent modification of the global list
321 caps = list(wireprotocaps)
272 caps = list(wireprotocaps)
322
273
323 # Command of same name as capability isn't exposed to version 1 of
274 # Command of same name as capability isn't exposed to version 1 of
324 # transports. So conditionally add it.
275 # transports. So conditionally add it.
325 if commands.commandavailable('changegroupsubset', proto):
276 if commands.commandavailable('changegroupsubset', proto):
326 caps.append('changegroupsubset')
277 caps.append('changegroupsubset')
327
278
328 if streamclone.allowservergeneration(repo):
279 if streamclone.allowservergeneration(repo):
329 if repo.ui.configbool('server', 'preferuncompressed'):
280 if repo.ui.configbool('server', 'preferuncompressed'):
330 caps.append('stream-preferred')
281 caps.append('stream-preferred')
331 requiredformats = repo.requirements & repo.supportedformats
282 requiredformats = repo.requirements & repo.supportedformats
332 # if our local revlogs are just revlogv1, add 'stream' cap
283 # if our local revlogs are just revlogv1, add 'stream' cap
333 if not requiredformats - {'revlogv1'}:
284 if not requiredformats - {'revlogv1'}:
334 caps.append('stream')
285 caps.append('stream')
335 # otherwise, add 'streamreqs' detailing our local revlog format
286 # otherwise, add 'streamreqs' detailing our local revlog format
336 else:
287 else:
337 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
288 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
338 if repo.ui.configbool('experimental', 'bundle2-advertise'):
289 if repo.ui.configbool('experimental', 'bundle2-advertise'):
339 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
290 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
340 caps.append('bundle2=' + urlreq.quote(capsblob))
291 caps.append('bundle2=' + urlreq.quote(capsblob))
341 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
292 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
342
293
343 return proto.addcapabilities(repo, caps)
294 return proto.addcapabilities(repo, caps)
344
295
345 # If you are writing an extension and consider wrapping this function. Wrap
296 # If you are writing an extension and consider wrapping this function. Wrap
346 # `_capabilities` instead.
297 # `_capabilities` instead.
347 @wireprotocommand('capabilities', permission='pull')
298 @wireprotocommand('capabilities', permission='pull')
348 def capabilities(repo, proto):
299 def capabilities(repo, proto):
349 caps = _capabilities(repo, proto)
300 caps = _capabilities(repo, proto)
350 return wireprototypes.bytesresponse(' '.join(sorted(caps)))
301 return wireprototypes.bytesresponse(' '.join(sorted(caps)))
351
302
352 @wireprotocommand('changegroup', 'roots', permission='pull')
303 @wireprotocommand('changegroup', 'roots', permission='pull')
353 def changegroup(repo, proto, roots):
304 def changegroup(repo, proto, roots):
354 nodes = wireprototypes.decodelist(roots)
305 nodes = wireprototypes.decodelist(roots)
355 outgoing = discovery.outgoing(repo, missingroots=nodes,
306 outgoing = discovery.outgoing(repo, missingroots=nodes,
356 missingheads=repo.heads())
307 missingheads=repo.heads())
357 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
308 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
358 gen = iter(lambda: cg.read(32768), '')
309 gen = iter(lambda: cg.read(32768), '')
359 return wireprototypes.streamres(gen=gen)
310 return wireprototypes.streamres(gen=gen)
360
311
361 @wireprotocommand('changegroupsubset', 'bases heads',
312 @wireprotocommand('changegroupsubset', 'bases heads',
362 permission='pull')
313 permission='pull')
363 def changegroupsubset(repo, proto, bases, heads):
314 def changegroupsubset(repo, proto, bases, heads):
364 bases = wireprototypes.decodelist(bases)
315 bases = wireprototypes.decodelist(bases)
365 heads = wireprototypes.decodelist(heads)
316 heads = wireprototypes.decodelist(heads)
366 outgoing = discovery.outgoing(repo, missingroots=bases,
317 outgoing = discovery.outgoing(repo, missingroots=bases,
367 missingheads=heads)
318 missingheads=heads)
368 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
319 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
369 gen = iter(lambda: cg.read(32768), '')
320 gen = iter(lambda: cg.read(32768), '')
370 return wireprototypes.streamres(gen=gen)
321 return wireprototypes.streamres(gen=gen)
371
322
372 @wireprotocommand('debugwireargs', 'one two *',
323 @wireprotocommand('debugwireargs', 'one two *',
373 permission='pull')
324 permission='pull')
374 def debugwireargs(repo, proto, one, two, others):
325 def debugwireargs(repo, proto, one, two, others):
375 # only accept optional args from the known set
326 # only accept optional args from the known set
376 opts = options('debugwireargs', ['three', 'four'], others)
327 opts = options('debugwireargs', ['three', 'four'], others)
377 return wireprototypes.bytesresponse(repo.debugwireargs(
328 return wireprototypes.bytesresponse(repo.debugwireargs(
378 one, two, **pycompat.strkwargs(opts)))
329 one, two, **pycompat.strkwargs(opts)))
379
330
380 def find_pullbundle(repo, proto, opts, clheads, heads, common):
331 def find_pullbundle(repo, proto, opts, clheads, heads, common):
381 """Return a file object for the first matching pullbundle.
332 """Return a file object for the first matching pullbundle.
382
333
383 Pullbundles are specified in .hg/pullbundles.manifest similar to
334 Pullbundles are specified in .hg/pullbundles.manifest similar to
384 clonebundles.
335 clonebundles.
385 For each entry, the bundle specification is checked for compatibility:
336 For each entry, the bundle specification is checked for compatibility:
386 - Client features vs the BUNDLESPEC.
337 - Client features vs the BUNDLESPEC.
387 - Revisions shared with the clients vs base revisions of the bundle.
338 - Revisions shared with the clients vs base revisions of the bundle.
388 A bundle can be applied only if all its base revisions are known by
339 A bundle can be applied only if all its base revisions are known by
389 the client.
340 the client.
390 - At least one leaf of the bundle's DAG is missing on the client.
341 - At least one leaf of the bundle's DAG is missing on the client.
391 - Every leaf of the bundle's DAG is part of node set the client wants.
342 - Every leaf of the bundle's DAG is part of node set the client wants.
392 E.g. do not send a bundle of all changes if the client wants only
343 E.g. do not send a bundle of all changes if the client wants only
393 one specific branch of many.
344 one specific branch of many.
394 """
345 """
395 def decodehexstring(s):
346 def decodehexstring(s):
396 return set([h.decode('hex') for h in s.split(';')])
347 return set([h.decode('hex') for h in s.split(';')])
397
348
398 manifest = repo.vfs.tryread('pullbundles.manifest')
349 manifest = repo.vfs.tryread('pullbundles.manifest')
399 if not manifest:
350 if not manifest:
400 return None
351 return None
401 res = exchange.parseclonebundlesmanifest(repo, manifest)
352 res = exchange.parseclonebundlesmanifest(repo, manifest)
402 res = exchange.filterclonebundleentries(repo, res)
353 res = exchange.filterclonebundleentries(repo, res)
403 if not res:
354 if not res:
404 return None
355 return None
405 cl = repo.changelog
356 cl = repo.changelog
406 heads_anc = cl.ancestors([cl.rev(rev) for rev in heads], inclusive=True)
357 heads_anc = cl.ancestors([cl.rev(rev) for rev in heads], inclusive=True)
407 common_anc = cl.ancestors([cl.rev(rev) for rev in common], inclusive=True)
358 common_anc = cl.ancestors([cl.rev(rev) for rev in common], inclusive=True)
408 compformats = clientcompressionsupport(proto)
359 compformats = clientcompressionsupport(proto)
409 for entry in res:
360 for entry in res:
410 if 'COMPRESSION' in entry and entry['COMPRESSION'] not in compformats:
361 if 'COMPRESSION' in entry and entry['COMPRESSION'] not in compformats:
411 continue
362 continue
412 # No test yet for VERSION, since V2 is supported by any client
363 # No test yet for VERSION, since V2 is supported by any client
413 # that advertises partial pulls
364 # that advertises partial pulls
414 if 'heads' in entry:
365 if 'heads' in entry:
415 try:
366 try:
416 bundle_heads = decodehexstring(entry['heads'])
367 bundle_heads = decodehexstring(entry['heads'])
417 except TypeError:
368 except TypeError:
418 # Bad heads entry
369 # Bad heads entry
419 continue
370 continue
420 if bundle_heads.issubset(common):
371 if bundle_heads.issubset(common):
421 continue # Nothing new
372 continue # Nothing new
422 if all(cl.rev(rev) in common_anc for rev in bundle_heads):
373 if all(cl.rev(rev) in common_anc for rev in bundle_heads):
423 continue # Still nothing new
374 continue # Still nothing new
424 if any(cl.rev(rev) not in heads_anc and
375 if any(cl.rev(rev) not in heads_anc and
425 cl.rev(rev) not in common_anc for rev in bundle_heads):
376 cl.rev(rev) not in common_anc for rev in bundle_heads):
426 continue
377 continue
427 if 'bases' in entry:
378 if 'bases' in entry:
428 try:
379 try:
429 bundle_bases = decodehexstring(entry['bases'])
380 bundle_bases = decodehexstring(entry['bases'])
430 except TypeError:
381 except TypeError:
431 # Bad bases entry
382 # Bad bases entry
432 continue
383 continue
433 if not all(cl.rev(rev) in common_anc for rev in bundle_bases):
384 if not all(cl.rev(rev) in common_anc for rev in bundle_bases):
434 continue
385 continue
435 path = entry['URL']
386 path = entry['URL']
436 repo.ui.debug('sending pullbundle "%s"\n' % path)
387 repo.ui.debug('sending pullbundle "%s"\n' % path)
437 try:
388 try:
438 return repo.vfs.open(path)
389 return repo.vfs.open(path)
439 except IOError:
390 except IOError:
440 repo.ui.debug('pullbundle "%s" not accessible\n' % path)
391 repo.ui.debug('pullbundle "%s" not accessible\n' % path)
441 continue
392 continue
442 return None
393 return None
443
394
444 @wireprotocommand('getbundle', '*', permission='pull')
395 @wireprotocommand('getbundle', '*', permission='pull')
445 def getbundle(repo, proto, others):
396 def getbundle(repo, proto, others):
446 opts = options('getbundle', wireprototypes.GETBUNDLE_ARGUMENTS.keys(),
397 opts = options('getbundle', wireprototypes.GETBUNDLE_ARGUMENTS.keys(),
447 others)
398 others)
448 for k, v in opts.iteritems():
399 for k, v in opts.iteritems():
449 keytype = wireprototypes.GETBUNDLE_ARGUMENTS[k]
400 keytype = wireprototypes.GETBUNDLE_ARGUMENTS[k]
450 if keytype == 'nodes':
401 if keytype == 'nodes':
451 opts[k] = wireprototypes.decodelist(v)
402 opts[k] = wireprototypes.decodelist(v)
452 elif keytype == 'csv':
403 elif keytype == 'csv':
453 opts[k] = list(v.split(','))
404 opts[k] = list(v.split(','))
454 elif keytype == 'scsv':
405 elif keytype == 'scsv':
455 opts[k] = set(v.split(','))
406 opts[k] = set(v.split(','))
456 elif keytype == 'boolean':
407 elif keytype == 'boolean':
457 # Client should serialize False as '0', which is a non-empty string
408 # Client should serialize False as '0', which is a non-empty string
458 # so it evaluates as a True bool.
409 # so it evaluates as a True bool.
459 if v == '0':
410 if v == '0':
460 opts[k] = False
411 opts[k] = False
461 else:
412 else:
462 opts[k] = bool(v)
413 opts[k] = bool(v)
463 elif keytype != 'plain':
414 elif keytype != 'plain':
464 raise KeyError('unknown getbundle option type %s'
415 raise KeyError('unknown getbundle option type %s'
465 % keytype)
416 % keytype)
466
417
467 if not bundle1allowed(repo, 'pull'):
418 if not bundle1allowed(repo, 'pull'):
468 if not exchange.bundle2requested(opts.get('bundlecaps')):
419 if not exchange.bundle2requested(opts.get('bundlecaps')):
469 if proto.name == 'http-v1':
420 if proto.name == 'http-v1':
470 return wireprototypes.ooberror(bundle2required)
421 return wireprototypes.ooberror(bundle2required)
471 raise error.Abort(bundle2requiredmain,
422 raise error.Abort(bundle2requiredmain,
472 hint=bundle2requiredhint)
423 hint=bundle2requiredhint)
473
424
474 prefercompressed = True
425 prefercompressed = True
475
426
476 try:
427 try:
477 clheads = set(repo.changelog.heads())
428 clheads = set(repo.changelog.heads())
478 heads = set(opts.get('heads', set()))
429 heads = set(opts.get('heads', set()))
479 common = set(opts.get('common', set()))
430 common = set(opts.get('common', set()))
480 common.discard(nullid)
431 common.discard(nullid)
481 if (repo.ui.configbool('server', 'pullbundle') and
432 if (repo.ui.configbool('server', 'pullbundle') and
482 'partial-pull' in proto.getprotocaps()):
433 'partial-pull' in proto.getprotocaps()):
483 # Check if a pre-built bundle covers this request.
434 # Check if a pre-built bundle covers this request.
484 bundle = find_pullbundle(repo, proto, opts, clheads, heads, common)
435 bundle = find_pullbundle(repo, proto, opts, clheads, heads, common)
485 if bundle:
436 if bundle:
486 return wireprototypes.streamres(gen=util.filechunkiter(bundle),
437 return wireprototypes.streamres(gen=util.filechunkiter(bundle),
487 prefer_uncompressed=True)
438 prefer_uncompressed=True)
488
439
489 if repo.ui.configbool('server', 'disablefullbundle'):
440 if repo.ui.configbool('server', 'disablefullbundle'):
490 # Check to see if this is a full clone.
441 # Check to see if this is a full clone.
491 changegroup = opts.get('cg', True)
442 changegroup = opts.get('cg', True)
492 if changegroup and not common and clheads == heads:
443 if changegroup and not common and clheads == heads:
493 raise error.Abort(
444 raise error.Abort(
494 _('server has pull-based clones disabled'),
445 _('server has pull-based clones disabled'),
495 hint=_('remove --pull if specified or upgrade Mercurial'))
446 hint=_('remove --pull if specified or upgrade Mercurial'))
496
447
497 info, chunks = exchange.getbundlechunks(repo, 'serve',
448 info, chunks = exchange.getbundlechunks(repo, 'serve',
498 **pycompat.strkwargs(opts))
449 **pycompat.strkwargs(opts))
499 prefercompressed = info.get('prefercompressed', True)
450 prefercompressed = info.get('prefercompressed', True)
500 except error.Abort as exc:
451 except error.Abort as exc:
501 # cleanly forward Abort error to the client
452 # cleanly forward Abort error to the client
502 if not exchange.bundle2requested(opts.get('bundlecaps')):
453 if not exchange.bundle2requested(opts.get('bundlecaps')):
503 if proto.name == 'http-v1':
454 if proto.name == 'http-v1':
504 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
455 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
505 raise # cannot do better for bundle1 + ssh
456 raise # cannot do better for bundle1 + ssh
506 # bundle2 request expect a bundle2 reply
457 # bundle2 request expect a bundle2 reply
507 bundler = bundle2.bundle20(repo.ui)
458 bundler = bundle2.bundle20(repo.ui)
508 manargs = [('message', pycompat.bytestr(exc))]
459 manargs = [('message', pycompat.bytestr(exc))]
509 advargs = []
460 advargs = []
510 if exc.hint is not None:
461 if exc.hint is not None:
511 advargs.append(('hint', exc.hint))
462 advargs.append(('hint', exc.hint))
512 bundler.addpart(bundle2.bundlepart('error:abort',
463 bundler.addpart(bundle2.bundlepart('error:abort',
513 manargs, advargs))
464 manargs, advargs))
514 chunks = bundler.getchunks()
465 chunks = bundler.getchunks()
515 prefercompressed = False
466 prefercompressed = False
516
467
517 return wireprototypes.streamres(
468 return wireprototypes.streamres(
518 gen=chunks, prefer_uncompressed=not prefercompressed)
469 gen=chunks, prefer_uncompressed=not prefercompressed)
519
470
520 @wireprotocommand('heads', permission='pull')
471 @wireprotocommand('heads', permission='pull')
521 def heads(repo, proto):
472 def heads(repo, proto):
522 h = repo.heads()
473 h = repo.heads()
523 return wireprototypes.bytesresponse(wireprototypes.encodelist(h) + '\n')
474 return wireprototypes.bytesresponse(wireprototypes.encodelist(h) + '\n')
524
475
525 @wireprotocommand('hello', permission='pull')
476 @wireprotocommand('hello', permission='pull')
526 def hello(repo, proto):
477 def hello(repo, proto):
527 """Called as part of SSH handshake to obtain server info.
478 """Called as part of SSH handshake to obtain server info.
528
479
529 Returns a list of lines describing interesting things about the
480 Returns a list of lines describing interesting things about the
530 server, in an RFC822-like format.
481 server, in an RFC822-like format.
531
482
532 Currently, the only one defined is ``capabilities``, which consists of a
483 Currently, the only one defined is ``capabilities``, which consists of a
533 line of space separated tokens describing server abilities:
484 line of space separated tokens describing server abilities:
534
485
535 capabilities: <token0> <token1> <token2>
486 capabilities: <token0> <token1> <token2>
536 """
487 """
537 caps = capabilities(repo, proto).data
488 caps = capabilities(repo, proto).data
538 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
489 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
539
490
540 @wireprotocommand('listkeys', 'namespace', permission='pull')
491 @wireprotocommand('listkeys', 'namespace', permission='pull')
541 def listkeys(repo, proto, namespace):
492 def listkeys(repo, proto, namespace):
542 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
493 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
543 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
494 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
544
495
545 @wireprotocommand('lookup', 'key', permission='pull')
496 @wireprotocommand('lookup', 'key', permission='pull')
546 def lookup(repo, proto, key):
497 def lookup(repo, proto, key):
547 try:
498 try:
548 k = encoding.tolocal(key)
499 k = encoding.tolocal(key)
549 n = repo.lookup(k)
500 n = repo.lookup(k)
550 r = hex(n)
501 r = hex(n)
551 success = 1
502 success = 1
552 except Exception as inst:
503 except Exception as inst:
553 r = stringutil.forcebytestr(inst)
504 r = stringutil.forcebytestr(inst)
554 success = 0
505 success = 0
555 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
506 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
556
507
557 @wireprotocommand('known', 'nodes *', permission='pull')
508 @wireprotocommand('known', 'nodes *', permission='pull')
558 def known(repo, proto, nodes, others):
509 def known(repo, proto, nodes, others):
559 v = ''.join(b and '1' or '0'
510 v = ''.join(b and '1' or '0'
560 for b in repo.known(wireprototypes.decodelist(nodes)))
511 for b in repo.known(wireprototypes.decodelist(nodes)))
561 return wireprototypes.bytesresponse(v)
512 return wireprototypes.bytesresponse(v)
562
513
563 @wireprotocommand('protocaps', 'caps', permission='pull')
514 @wireprotocommand('protocaps', 'caps', permission='pull')
564 def protocaps(repo, proto, caps):
515 def protocaps(repo, proto, caps):
565 if proto.name == wireprototypes.SSHV1:
516 if proto.name == wireprototypes.SSHV1:
566 proto._protocaps = set(caps.split(' '))
517 proto._protocaps = set(caps.split(' '))
567 return wireprototypes.bytesresponse('OK')
518 return wireprototypes.bytesresponse('OK')
568
519
569 @wireprotocommand('pushkey', 'namespace key old new', permission='push')
520 @wireprotocommand('pushkey', 'namespace key old new', permission='push')
570 def pushkey(repo, proto, namespace, key, old, new):
521 def pushkey(repo, proto, namespace, key, old, new):
571 # compatibility with pre-1.8 clients which were accidentally
522 # compatibility with pre-1.8 clients which were accidentally
572 # sending raw binary nodes rather than utf-8-encoded hex
523 # sending raw binary nodes rather than utf-8-encoded hex
573 if len(new) == 20 and stringutil.escapestr(new) != new:
524 if len(new) == 20 and stringutil.escapestr(new) != new:
574 # looks like it could be a binary node
525 # looks like it could be a binary node
575 try:
526 try:
576 new.decode('utf-8')
527 new.decode('utf-8')
577 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
528 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
578 except UnicodeDecodeError:
529 except UnicodeDecodeError:
579 pass # binary, leave unmodified
530 pass # binary, leave unmodified
580 else:
531 else:
581 new = encoding.tolocal(new) # normal path
532 new = encoding.tolocal(new) # normal path
582
533
583 with proto.mayberedirectstdio() as output:
534 with proto.mayberedirectstdio() as output:
584 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
535 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
585 encoding.tolocal(old), new) or False
536 encoding.tolocal(old), new) or False
586
537
587 output = output.getvalue() if output else ''
538 output = output.getvalue() if output else ''
588 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
539 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
589
540
590 @wireprotocommand('stream_out', permission='pull')
541 @wireprotocommand('stream_out', permission='pull')
591 def stream(repo, proto):
542 def stream(repo, proto):
592 '''If the server supports streaming clone, it advertises the "stream"
543 '''If the server supports streaming clone, it advertises the "stream"
593 capability with a value representing the version and flags of the repo
544 capability with a value representing the version and flags of the repo
594 it is serving. Client checks to see if it understands the format.
545 it is serving. Client checks to see if it understands the format.
595 '''
546 '''
596 return wireprototypes.streamreslegacy(
547 return wireprototypes.streamreslegacy(
597 streamclone.generatev1wireproto(repo))
548 streamclone.generatev1wireproto(repo))
598
549
599 @wireprotocommand('unbundle', 'heads', permission='push')
550 @wireprotocommand('unbundle', 'heads', permission='push')
600 def unbundle(repo, proto, heads):
551 def unbundle(repo, proto, heads):
601 their_heads = wireprototypes.decodelist(heads)
552 their_heads = wireprototypes.decodelist(heads)
602
553
603 with proto.mayberedirectstdio() as output:
554 with proto.mayberedirectstdio() as output:
604 try:
555 try:
605 exchange.check_heads(repo, their_heads, 'preparing changes')
556 exchange.check_heads(repo, their_heads, 'preparing changes')
606 cleanup = lambda: None
557 cleanup = lambda: None
607 try:
558 try:
608 payload = proto.getpayload()
559 payload = proto.getpayload()
609 if repo.ui.configbool('server', 'streamunbundle'):
560 if repo.ui.configbool('server', 'streamunbundle'):
610 def cleanup():
561 def cleanup():
611 # Ensure that the full payload is consumed, so
562 # Ensure that the full payload is consumed, so
612 # that the connection doesn't contain trailing garbage.
563 # that the connection doesn't contain trailing garbage.
613 for p in payload:
564 for p in payload:
614 pass
565 pass
615 fp = util.chunkbuffer(payload)
566 fp = util.chunkbuffer(payload)
616 else:
567 else:
617 # write bundle data to temporary file as it can be big
568 # write bundle data to temporary file as it can be big
618 fp, tempname = None, None
569 fp, tempname = None, None
619 def cleanup():
570 def cleanup():
620 if fp:
571 if fp:
621 fp.close()
572 fp.close()
622 if tempname:
573 if tempname:
623 os.unlink(tempname)
574 os.unlink(tempname)
624 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
575 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
625 repo.ui.debug('redirecting incoming bundle to %s\n' %
576 repo.ui.debug('redirecting incoming bundle to %s\n' %
626 tempname)
577 tempname)
627 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
578 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
628 r = 0
579 r = 0
629 for p in payload:
580 for p in payload:
630 fp.write(p)
581 fp.write(p)
631 fp.seek(0)
582 fp.seek(0)
632
583
633 gen = exchange.readbundle(repo.ui, fp, None)
584 gen = exchange.readbundle(repo.ui, fp, None)
634 if (isinstance(gen, changegroupmod.cg1unpacker)
585 if (isinstance(gen, changegroupmod.cg1unpacker)
635 and not bundle1allowed(repo, 'push')):
586 and not bundle1allowed(repo, 'push')):
636 if proto.name == 'http-v1':
587 if proto.name == 'http-v1':
637 # need to special case http because stderr do not get to
588 # need to special case http because stderr do not get to
638 # the http client on failed push so we need to abuse
589 # the http client on failed push so we need to abuse
639 # some other error type to make sure the message get to
590 # some other error type to make sure the message get to
640 # the user.
591 # the user.
641 return wireprototypes.ooberror(bundle2required)
592 return wireprototypes.ooberror(bundle2required)
642 raise error.Abort(bundle2requiredmain,
593 raise error.Abort(bundle2requiredmain,
643 hint=bundle2requiredhint)
594 hint=bundle2requiredhint)
644
595
645 r = exchange.unbundle(repo, gen, their_heads, 'serve',
596 r = exchange.unbundle(repo, gen, their_heads, 'serve',
646 proto.client())
597 proto.client())
647 if util.safehasattr(r, 'addpart'):
598 if util.safehasattr(r, 'addpart'):
648 # The return looks streamable, we are in the bundle2 case
599 # The return looks streamable, we are in the bundle2 case
649 # and should return a stream.
600 # and should return a stream.
650 return wireprototypes.streamreslegacy(gen=r.getchunks())
601 return wireprototypes.streamreslegacy(gen=r.getchunks())
651 return wireprototypes.pushres(
602 return wireprototypes.pushres(
652 r, output.getvalue() if output else '')
603 r, output.getvalue() if output else '')
653
604
654 finally:
605 finally:
655 cleanup()
606 cleanup()
656
607
657 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
608 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
658 # handle non-bundle2 case first
609 # handle non-bundle2 case first
659 if not getattr(exc, 'duringunbundle2', False):
610 if not getattr(exc, 'duringunbundle2', False):
660 try:
611 try:
661 raise
612 raise
662 except error.Abort:
613 except error.Abort:
663 # The old code we moved used procutil.stderr directly.
614 # The old code we moved used procutil.stderr directly.
664 # We did not change it to minimise code change.
615 # We did not change it to minimise code change.
665 # This need to be moved to something proper.
616 # This need to be moved to something proper.
666 # Feel free to do it.
617 # Feel free to do it.
667 procutil.stderr.write("abort: %s\n" % exc)
618 procutil.stderr.write("abort: %s\n" % exc)
668 if exc.hint is not None:
619 if exc.hint is not None:
669 procutil.stderr.write("(%s)\n" % exc.hint)
620 procutil.stderr.write("(%s)\n" % exc.hint)
670 procutil.stderr.flush()
621 procutil.stderr.flush()
671 return wireprototypes.pushres(
622 return wireprototypes.pushres(
672 0, output.getvalue() if output else '')
623 0, output.getvalue() if output else '')
673 except error.PushRaced:
624 except error.PushRaced:
674 return wireprototypes.pusherr(
625 return wireprototypes.pusherr(
675 pycompat.bytestr(exc),
626 pycompat.bytestr(exc),
676 output.getvalue() if output else '')
627 output.getvalue() if output else '')
677
628
678 bundler = bundle2.bundle20(repo.ui)
629 bundler = bundle2.bundle20(repo.ui)
679 for out in getattr(exc, '_bundle2salvagedoutput', ()):
630 for out in getattr(exc, '_bundle2salvagedoutput', ()):
680 bundler.addpart(out)
631 bundler.addpart(out)
681 try:
632 try:
682 try:
633 try:
683 raise
634 raise
684 except error.PushkeyFailed as exc:
635 except error.PushkeyFailed as exc:
685 # check client caps
636 # check client caps
686 remotecaps = getattr(exc, '_replycaps', None)
637 remotecaps = getattr(exc, '_replycaps', None)
687 if (remotecaps is not None
638 if (remotecaps is not None
688 and 'pushkey' not in remotecaps.get('error', ())):
639 and 'pushkey' not in remotecaps.get('error', ())):
689 # no support remote side, fallback to Abort handler.
640 # no support remote side, fallback to Abort handler.
690 raise
641 raise
691 part = bundler.newpart('error:pushkey')
642 part = bundler.newpart('error:pushkey')
692 part.addparam('in-reply-to', exc.partid)
643 part.addparam('in-reply-to', exc.partid)
693 if exc.namespace is not None:
644 if exc.namespace is not None:
694 part.addparam('namespace', exc.namespace,
645 part.addparam('namespace', exc.namespace,
695 mandatory=False)
646 mandatory=False)
696 if exc.key is not None:
647 if exc.key is not None:
697 part.addparam('key', exc.key, mandatory=False)
648 part.addparam('key', exc.key, mandatory=False)
698 if exc.new is not None:
649 if exc.new is not None:
699 part.addparam('new', exc.new, mandatory=False)
650 part.addparam('new', exc.new, mandatory=False)
700 if exc.old is not None:
651 if exc.old is not None:
701 part.addparam('old', exc.old, mandatory=False)
652 part.addparam('old', exc.old, mandatory=False)
702 if exc.ret is not None:
653 if exc.ret is not None:
703 part.addparam('ret', exc.ret, mandatory=False)
654 part.addparam('ret', exc.ret, mandatory=False)
704 except error.BundleValueError as exc:
655 except error.BundleValueError as exc:
705 errpart = bundler.newpart('error:unsupportedcontent')
656 errpart = bundler.newpart('error:unsupportedcontent')
706 if exc.parttype is not None:
657 if exc.parttype is not None:
707 errpart.addparam('parttype', exc.parttype)
658 errpart.addparam('parttype', exc.parttype)
708 if exc.params:
659 if exc.params:
709 errpart.addparam('params', '\0'.join(exc.params))
660 errpart.addparam('params', '\0'.join(exc.params))
710 except error.Abort as exc:
661 except error.Abort as exc:
711 manargs = [('message', stringutil.forcebytestr(exc))]
662 manargs = [('message', stringutil.forcebytestr(exc))]
712 advargs = []
663 advargs = []
713 if exc.hint is not None:
664 if exc.hint is not None:
714 advargs.append(('hint', exc.hint))
665 advargs.append(('hint', exc.hint))
715 bundler.addpart(bundle2.bundlepart('error:abort',
666 bundler.addpart(bundle2.bundlepart('error:abort',
716 manargs, advargs))
667 manargs, advargs))
717 except error.PushRaced as exc:
668 except error.PushRaced as exc:
718 bundler.newpart('error:pushraced',
669 bundler.newpart('error:pushraced',
719 [('message', stringutil.forcebytestr(exc))])
670 [('message', stringutil.forcebytestr(exc))])
720 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
671 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
@@ -1,811 +1,812
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 #
3 #
4 # This software may be used and distributed according to the terms of the
4 # This software may be used and distributed according to the terms of the
5 # GNU General Public License version 2 or any later version.
5 # GNU General Public License version 2 or any later version.
6
6
7 from __future__ import absolute_import
7 from __future__ import absolute_import
8
8
9 import contextlib
9 import contextlib
10 import struct
10 import struct
11 import sys
11 import sys
12 import threading
12 import threading
13
13
14 from .i18n import _
14 from .i18n import _
15 from .thirdparty import (
15 from .thirdparty import (
16 cbor,
16 cbor,
17 )
17 )
18 from .thirdparty.zope import (
18 from .thirdparty.zope import (
19 interface as zi,
19 interface as zi,
20 )
20 )
21 from . import (
21 from . import (
22 encoding,
22 encoding,
23 error,
23 error,
24 hook,
24 hook,
25 pycompat,
25 pycompat,
26 util,
26 util,
27 wireproto,
27 wireproto,
28 wireprototypes,
28 wireprototypes,
29 wireprotov2server,
29 wireprotov2server,
30 )
30 )
31 from .utils import (
31 from .utils import (
32 procutil,
32 procutil,
33 )
33 )
34
34
35 stringio = util.stringio
35 stringio = util.stringio
36
36
37 urlerr = util.urlerr
37 urlerr = util.urlerr
38 urlreq = util.urlreq
38 urlreq = util.urlreq
39
39
40 HTTP_OK = 200
40 HTTP_OK = 200
41
41
42 HGTYPE = 'application/mercurial-0.1'
42 HGTYPE = 'application/mercurial-0.1'
43 HGTYPE2 = 'application/mercurial-0.2'
43 HGTYPE2 = 'application/mercurial-0.2'
44 HGERRTYPE = 'application/hg-error'
44 HGERRTYPE = 'application/hg-error'
45
45
46 SSHV1 = wireprototypes.SSHV1
46 SSHV1 = wireprototypes.SSHV1
47 SSHV2 = wireprototypes.SSHV2
47 SSHV2 = wireprototypes.SSHV2
48
48
49 def decodevaluefromheaders(req, headerprefix):
49 def decodevaluefromheaders(req, headerprefix):
50 """Decode a long value from multiple HTTP request headers.
50 """Decode a long value from multiple HTTP request headers.
51
51
52 Returns the value as a bytes, not a str.
52 Returns the value as a bytes, not a str.
53 """
53 """
54 chunks = []
54 chunks = []
55 i = 1
55 i = 1
56 while True:
56 while True:
57 v = req.headers.get(b'%s-%d' % (headerprefix, i))
57 v = req.headers.get(b'%s-%d' % (headerprefix, i))
58 if v is None:
58 if v is None:
59 break
59 break
60 chunks.append(pycompat.bytesurl(v))
60 chunks.append(pycompat.bytesurl(v))
61 i += 1
61 i += 1
62
62
63 return ''.join(chunks)
63 return ''.join(chunks)
64
64
65 @zi.implementer(wireprototypes.baseprotocolhandler)
65 @zi.implementer(wireprototypes.baseprotocolhandler)
66 class httpv1protocolhandler(object):
66 class httpv1protocolhandler(object):
67 def __init__(self, req, ui, checkperm):
67 def __init__(self, req, ui, checkperm):
68 self._req = req
68 self._req = req
69 self._ui = ui
69 self._ui = ui
70 self._checkperm = checkperm
70 self._checkperm = checkperm
71 self._protocaps = None
71 self._protocaps = None
72
72
73 @property
73 @property
74 def name(self):
74 def name(self):
75 return 'http-v1'
75 return 'http-v1'
76
76
77 def getargs(self, args):
77 def getargs(self, args):
78 knownargs = self._args()
78 knownargs = self._args()
79 data = {}
79 data = {}
80 keys = args.split()
80 keys = args.split()
81 for k in keys:
81 for k in keys:
82 if k == '*':
82 if k == '*':
83 star = {}
83 star = {}
84 for key in knownargs.keys():
84 for key in knownargs.keys():
85 if key != 'cmd' and key not in keys:
85 if key != 'cmd' and key not in keys:
86 star[key] = knownargs[key][0]
86 star[key] = knownargs[key][0]
87 data['*'] = star
87 data['*'] = star
88 else:
88 else:
89 data[k] = knownargs[k][0]
89 data[k] = knownargs[k][0]
90 return [data[k] for k in keys]
90 return [data[k] for k in keys]
91
91
92 def _args(self):
92 def _args(self):
93 args = self._req.qsparams.asdictoflists()
93 args = self._req.qsparams.asdictoflists()
94 postlen = int(self._req.headers.get(b'X-HgArgs-Post', 0))
94 postlen = int(self._req.headers.get(b'X-HgArgs-Post', 0))
95 if postlen:
95 if postlen:
96 args.update(urlreq.parseqs(
96 args.update(urlreq.parseqs(
97 self._req.bodyfh.read(postlen), keep_blank_values=True))
97 self._req.bodyfh.read(postlen), keep_blank_values=True))
98 return args
98 return args
99
99
100 argvalue = decodevaluefromheaders(self._req, b'X-HgArg')
100 argvalue = decodevaluefromheaders(self._req, b'X-HgArg')
101 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
101 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
102 return args
102 return args
103
103
104 def getprotocaps(self):
104 def getprotocaps(self):
105 if self._protocaps is None:
105 if self._protocaps is None:
106 value = decodevaluefromheaders(self._req, b'X-HgProto')
106 value = decodevaluefromheaders(self._req, b'X-HgProto')
107 self._protocaps = set(value.split(' '))
107 self._protocaps = set(value.split(' '))
108 return self._protocaps
108 return self._protocaps
109
109
110 def getpayload(self):
110 def getpayload(self):
111 # Existing clients *always* send Content-Length.
111 # Existing clients *always* send Content-Length.
112 length = int(self._req.headers[b'Content-Length'])
112 length = int(self._req.headers[b'Content-Length'])
113
113
114 # If httppostargs is used, we need to read Content-Length
114 # If httppostargs is used, we need to read Content-Length
115 # minus the amount that was consumed by args.
115 # minus the amount that was consumed by args.
116 length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
116 length -= int(self._req.headers.get(b'X-HgArgs-Post', 0))
117 return util.filechunkiter(self._req.bodyfh, limit=length)
117 return util.filechunkiter(self._req.bodyfh, limit=length)
118
118
119 @contextlib.contextmanager
119 @contextlib.contextmanager
120 def mayberedirectstdio(self):
120 def mayberedirectstdio(self):
121 oldout = self._ui.fout
121 oldout = self._ui.fout
122 olderr = self._ui.ferr
122 olderr = self._ui.ferr
123
123
124 out = util.stringio()
124 out = util.stringio()
125
125
126 try:
126 try:
127 self._ui.fout = out
127 self._ui.fout = out
128 self._ui.ferr = out
128 self._ui.ferr = out
129 yield out
129 yield out
130 finally:
130 finally:
131 self._ui.fout = oldout
131 self._ui.fout = oldout
132 self._ui.ferr = olderr
132 self._ui.ferr = olderr
133
133
134 def client(self):
134 def client(self):
135 return 'remote:%s:%s:%s' % (
135 return 'remote:%s:%s:%s' % (
136 self._req.urlscheme,
136 self._req.urlscheme,
137 urlreq.quote(self._req.remotehost or ''),
137 urlreq.quote(self._req.remotehost or ''),
138 urlreq.quote(self._req.remoteuser or ''))
138 urlreq.quote(self._req.remoteuser or ''))
139
139
140 def addcapabilities(self, repo, caps):
140 def addcapabilities(self, repo, caps):
141 caps.append(b'batch')
141 caps.append(b'batch')
142
142
143 caps.append('httpheader=%d' %
143 caps.append('httpheader=%d' %
144 repo.ui.configint('server', 'maxhttpheaderlen'))
144 repo.ui.configint('server', 'maxhttpheaderlen'))
145 if repo.ui.configbool('experimental', 'httppostargs'):
145 if repo.ui.configbool('experimental', 'httppostargs'):
146 caps.append('httppostargs')
146 caps.append('httppostargs')
147
147
148 # FUTURE advertise 0.2rx once support is implemented
148 # FUTURE advertise 0.2rx once support is implemented
149 # FUTURE advertise minrx and mintx after consulting config option
149 # FUTURE advertise minrx and mintx after consulting config option
150 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
150 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
151
151
152 compengines = wireproto.supportedcompengines(repo.ui, util.SERVERROLE)
152 compengines = wireprototypes.supportedcompengines(repo.ui,
153 util.SERVERROLE)
153 if compengines:
154 if compengines:
154 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
155 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
155 for e in compengines)
156 for e in compengines)
156 caps.append('compression=%s' % comptypes)
157 caps.append('compression=%s' % comptypes)
157
158
158 return caps
159 return caps
159
160
160 def checkperm(self, perm):
161 def checkperm(self, perm):
161 return self._checkperm(perm)
162 return self._checkperm(perm)
162
163
163 # This method exists mostly so that extensions like remotefilelog can
164 # This method exists mostly so that extensions like remotefilelog can
164 # disable a kludgey legacy method only over http. As of early 2018,
165 # disable a kludgey legacy method only over http. As of early 2018,
165 # there are no other known users, so with any luck we can discard this
166 # there are no other known users, so with any luck we can discard this
166 # hook if remotefilelog becomes a first-party extension.
167 # hook if remotefilelog becomes a first-party extension.
167 def iscmd(cmd):
168 def iscmd(cmd):
168 return cmd in wireproto.commands
169 return cmd in wireproto.commands
169
170
170 def handlewsgirequest(rctx, req, res, checkperm):
171 def handlewsgirequest(rctx, req, res, checkperm):
171 """Possibly process a wire protocol request.
172 """Possibly process a wire protocol request.
172
173
173 If the current request is a wire protocol request, the request is
174 If the current request is a wire protocol request, the request is
174 processed by this function.
175 processed by this function.
175
176
176 ``req`` is a ``parsedrequest`` instance.
177 ``req`` is a ``parsedrequest`` instance.
177 ``res`` is a ``wsgiresponse`` instance.
178 ``res`` is a ``wsgiresponse`` instance.
178
179
179 Returns a bool indicating if the request was serviced. If set, the caller
180 Returns a bool indicating if the request was serviced. If set, the caller
180 should stop processing the request, as a response has already been issued.
181 should stop processing the request, as a response has already been issued.
181 """
182 """
182 # Avoid cycle involving hg module.
183 # Avoid cycle involving hg module.
183 from .hgweb import common as hgwebcommon
184 from .hgweb import common as hgwebcommon
184
185
185 repo = rctx.repo
186 repo = rctx.repo
186
187
187 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
188 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
188 # string parameter. If it isn't present, this isn't a wire protocol
189 # string parameter. If it isn't present, this isn't a wire protocol
189 # request.
190 # request.
190 if 'cmd' not in req.qsparams:
191 if 'cmd' not in req.qsparams:
191 return False
192 return False
192
193
193 cmd = req.qsparams['cmd']
194 cmd = req.qsparams['cmd']
194
195
195 # The "cmd" request parameter is used by both the wire protocol and hgweb.
196 # The "cmd" request parameter is used by both the wire protocol and hgweb.
196 # While not all wire protocol commands are available for all transports,
197 # While not all wire protocol commands are available for all transports,
197 # if we see a "cmd" value that resembles a known wire protocol command, we
198 # if we see a "cmd" value that resembles a known wire protocol command, we
198 # route it to a protocol handler. This is better than routing possible
199 # route it to a protocol handler. This is better than routing possible
199 # wire protocol requests to hgweb because it prevents hgweb from using
200 # wire protocol requests to hgweb because it prevents hgweb from using
200 # known wire protocol commands and it is less confusing for machine
201 # known wire protocol commands and it is less confusing for machine
201 # clients.
202 # clients.
202 if not iscmd(cmd):
203 if not iscmd(cmd):
203 return False
204 return False
204
205
205 # The "cmd" query string argument is only valid on the root path of the
206 # The "cmd" query string argument is only valid on the root path of the
206 # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
207 # repo. e.g. ``/?cmd=foo``, ``/repo?cmd=foo``. URL paths within the repo
207 # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
208 # like ``/blah?cmd=foo`` are not allowed. So don't recognize the request
208 # in this case. We send an HTTP 404 for backwards compatibility reasons.
209 # in this case. We send an HTTP 404 for backwards compatibility reasons.
209 if req.dispatchpath:
210 if req.dispatchpath:
210 res.status = hgwebcommon.statusmessage(404)
211 res.status = hgwebcommon.statusmessage(404)
211 res.headers['Content-Type'] = HGTYPE
212 res.headers['Content-Type'] = HGTYPE
212 # TODO This is not a good response to issue for this request. This
213 # TODO This is not a good response to issue for this request. This
213 # is mostly for BC for now.
214 # is mostly for BC for now.
214 res.setbodybytes('0\n%s\n' % b'Not Found')
215 res.setbodybytes('0\n%s\n' % b'Not Found')
215 return True
216 return True
216
217
217 proto = httpv1protocolhandler(req, repo.ui,
218 proto = httpv1protocolhandler(req, repo.ui,
218 lambda perm: checkperm(rctx, req, perm))
219 lambda perm: checkperm(rctx, req, perm))
219
220
220 # The permissions checker should be the only thing that can raise an
221 # The permissions checker should be the only thing that can raise an
221 # ErrorResponse. It is kind of a layer violation to catch an hgweb
222 # ErrorResponse. It is kind of a layer violation to catch an hgweb
222 # exception here. So consider refactoring into a exception type that
223 # exception here. So consider refactoring into a exception type that
223 # is associated with the wire protocol.
224 # is associated with the wire protocol.
224 try:
225 try:
225 _callhttp(repo, req, res, proto, cmd)
226 _callhttp(repo, req, res, proto, cmd)
226 except hgwebcommon.ErrorResponse as e:
227 except hgwebcommon.ErrorResponse as e:
227 for k, v in e.headers:
228 for k, v in e.headers:
228 res.headers[k] = v
229 res.headers[k] = v
229 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
230 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
230 # TODO This response body assumes the failed command was
231 # TODO This response body assumes the failed command was
231 # "unbundle." That assumption is not always valid.
232 # "unbundle." That assumption is not always valid.
232 res.setbodybytes('0\n%s\n' % pycompat.bytestr(e))
233 res.setbodybytes('0\n%s\n' % pycompat.bytestr(e))
233
234
234 return True
235 return True
235
236
236 def _availableapis(repo):
237 def _availableapis(repo):
237 apis = set()
238 apis = set()
238
239
239 # Registered APIs are made available via config options of the name of
240 # Registered APIs are made available via config options of the name of
240 # the protocol.
241 # the protocol.
241 for k, v in API_HANDLERS.items():
242 for k, v in API_HANDLERS.items():
242 section, option = v['config']
243 section, option = v['config']
243 if repo.ui.configbool(section, option):
244 if repo.ui.configbool(section, option):
244 apis.add(k)
245 apis.add(k)
245
246
246 return apis
247 return apis
247
248
248 def handlewsgiapirequest(rctx, req, res, checkperm):
249 def handlewsgiapirequest(rctx, req, res, checkperm):
249 """Handle requests to /api/*."""
250 """Handle requests to /api/*."""
250 assert req.dispatchparts[0] == b'api'
251 assert req.dispatchparts[0] == b'api'
251
252
252 repo = rctx.repo
253 repo = rctx.repo
253
254
254 # This whole URL space is experimental for now. But we want to
255 # This whole URL space is experimental for now. But we want to
255 # reserve the URL space. So, 404 all URLs if the feature isn't enabled.
256 # reserve the URL space. So, 404 all URLs if the feature isn't enabled.
256 if not repo.ui.configbool('experimental', 'web.apiserver'):
257 if not repo.ui.configbool('experimental', 'web.apiserver'):
257 res.status = b'404 Not Found'
258 res.status = b'404 Not Found'
258 res.headers[b'Content-Type'] = b'text/plain'
259 res.headers[b'Content-Type'] = b'text/plain'
259 res.setbodybytes(_('Experimental API server endpoint not enabled'))
260 res.setbodybytes(_('Experimental API server endpoint not enabled'))
260 return
261 return
261
262
262 # The URL space is /api/<protocol>/*. The structure of URLs under varies
263 # The URL space is /api/<protocol>/*. The structure of URLs under varies
263 # by <protocol>.
264 # by <protocol>.
264
265
265 availableapis = _availableapis(repo)
266 availableapis = _availableapis(repo)
266
267
267 # Requests to /api/ list available APIs.
268 # Requests to /api/ list available APIs.
268 if req.dispatchparts == [b'api']:
269 if req.dispatchparts == [b'api']:
269 res.status = b'200 OK'
270 res.status = b'200 OK'
270 res.headers[b'Content-Type'] = b'text/plain'
271 res.headers[b'Content-Type'] = b'text/plain'
271 lines = [_('APIs can be accessed at /api/<name>, where <name> can be '
272 lines = [_('APIs can be accessed at /api/<name>, where <name> can be '
272 'one of the following:\n')]
273 'one of the following:\n')]
273 if availableapis:
274 if availableapis:
274 lines.extend(sorted(availableapis))
275 lines.extend(sorted(availableapis))
275 else:
276 else:
276 lines.append(_('(no available APIs)\n'))
277 lines.append(_('(no available APIs)\n'))
277 res.setbodybytes(b'\n'.join(lines))
278 res.setbodybytes(b'\n'.join(lines))
278 return
279 return
279
280
280 proto = req.dispatchparts[1]
281 proto = req.dispatchparts[1]
281
282
282 if proto not in API_HANDLERS:
283 if proto not in API_HANDLERS:
283 res.status = b'404 Not Found'
284 res.status = b'404 Not Found'
284 res.headers[b'Content-Type'] = b'text/plain'
285 res.headers[b'Content-Type'] = b'text/plain'
285 res.setbodybytes(_('Unknown API: %s\nKnown APIs: %s') % (
286 res.setbodybytes(_('Unknown API: %s\nKnown APIs: %s') % (
286 proto, b', '.join(sorted(availableapis))))
287 proto, b', '.join(sorted(availableapis))))
287 return
288 return
288
289
289 if proto not in availableapis:
290 if proto not in availableapis:
290 res.status = b'404 Not Found'
291 res.status = b'404 Not Found'
291 res.headers[b'Content-Type'] = b'text/plain'
292 res.headers[b'Content-Type'] = b'text/plain'
292 res.setbodybytes(_('API %s not enabled\n') % proto)
293 res.setbodybytes(_('API %s not enabled\n') % proto)
293 return
294 return
294
295
295 API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
296 API_HANDLERS[proto]['handler'](rctx, req, res, checkperm,
296 req.dispatchparts[2:])
297 req.dispatchparts[2:])
297
298
298 # Maps API name to metadata so custom API can be registered.
299 # Maps API name to metadata so custom API can be registered.
299 # Keys are:
300 # Keys are:
300 #
301 #
301 # config
302 # config
302 # Config option that controls whether service is enabled.
303 # Config option that controls whether service is enabled.
303 # handler
304 # handler
304 # Callable receiving (rctx, req, res, checkperm, urlparts) that is called
305 # Callable receiving (rctx, req, res, checkperm, urlparts) that is called
305 # when a request to this API is received.
306 # when a request to this API is received.
306 # apidescriptor
307 # apidescriptor
307 # Callable receiving (req, repo) that is called to obtain an API
308 # Callable receiving (req, repo) that is called to obtain an API
308 # descriptor for this service. The response must be serializable to CBOR.
309 # descriptor for this service. The response must be serializable to CBOR.
309 API_HANDLERS = {
310 API_HANDLERS = {
310 wireprotov2server.HTTP_WIREPROTO_V2: {
311 wireprotov2server.HTTP_WIREPROTO_V2: {
311 'config': ('experimental', 'web.api.http-v2'),
312 'config': ('experimental', 'web.api.http-v2'),
312 'handler': wireprotov2server.handlehttpv2request,
313 'handler': wireprotov2server.handlehttpv2request,
313 'apidescriptor': wireprotov2server.httpv2apidescriptor,
314 'apidescriptor': wireprotov2server.httpv2apidescriptor,
314 },
315 },
315 }
316 }
316
317
317 def _httpresponsetype(ui, proto, prefer_uncompressed):
318 def _httpresponsetype(ui, proto, prefer_uncompressed):
318 """Determine the appropriate response type and compression settings.
319 """Determine the appropriate response type and compression settings.
319
320
320 Returns a tuple of (mediatype, compengine, engineopts).
321 Returns a tuple of (mediatype, compengine, engineopts).
321 """
322 """
322 # Determine the response media type and compression engine based
323 # Determine the response media type and compression engine based
323 # on the request parameters.
324 # on the request parameters.
324
325
325 if '0.2' in proto.getprotocaps():
326 if '0.2' in proto.getprotocaps():
326 # All clients are expected to support uncompressed data.
327 # All clients are expected to support uncompressed data.
327 if prefer_uncompressed:
328 if prefer_uncompressed:
328 return HGTYPE2, util._noopengine(), {}
329 return HGTYPE2, util._noopengine(), {}
329
330
330 # Now find an agreed upon compression format.
331 # Now find an agreed upon compression format.
331 compformats = wireproto.clientcompressionsupport(proto)
332 compformats = wireproto.clientcompressionsupport(proto)
332 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
333 for engine in wireprototypes.supportedcompengines(ui, util.SERVERROLE):
333 if engine.wireprotosupport().name in compformats:
334 if engine.wireprotosupport().name in compformats:
334 opts = {}
335 opts = {}
335 level = ui.configint('server', '%slevel' % engine.name())
336 level = ui.configint('server', '%slevel' % engine.name())
336 if level is not None:
337 if level is not None:
337 opts['level'] = level
338 opts['level'] = level
338
339
339 return HGTYPE2, engine, opts
340 return HGTYPE2, engine, opts
340
341
341 # No mutually supported compression format. Fall back to the
342 # No mutually supported compression format. Fall back to the
342 # legacy protocol.
343 # legacy protocol.
343
344
344 # Don't allow untrusted settings because disabling compression or
345 # Don't allow untrusted settings because disabling compression or
345 # setting a very high compression level could lead to flooding
346 # setting a very high compression level could lead to flooding
346 # the server's network or CPU.
347 # the server's network or CPU.
347 opts = {'level': ui.configint('server', 'zliblevel')}
348 opts = {'level': ui.configint('server', 'zliblevel')}
348 return HGTYPE, util.compengines['zlib'], opts
349 return HGTYPE, util.compengines['zlib'], opts
349
350
350 def processcapabilitieshandshake(repo, req, res, proto):
351 def processcapabilitieshandshake(repo, req, res, proto):
351 """Called during a ?cmd=capabilities request.
352 """Called during a ?cmd=capabilities request.
352
353
353 If the client is advertising support for a newer protocol, we send
354 If the client is advertising support for a newer protocol, we send
354 a CBOR response with information about available services. If no
355 a CBOR response with information about available services. If no
355 advertised services are available, we don't handle the request.
356 advertised services are available, we don't handle the request.
356 """
357 """
357 # Fall back to old behavior unless the API server is enabled.
358 # Fall back to old behavior unless the API server is enabled.
358 if not repo.ui.configbool('experimental', 'web.apiserver'):
359 if not repo.ui.configbool('experimental', 'web.apiserver'):
359 return False
360 return False
360
361
361 clientapis = decodevaluefromheaders(req, b'X-HgUpgrade')
362 clientapis = decodevaluefromheaders(req, b'X-HgUpgrade')
362 protocaps = decodevaluefromheaders(req, b'X-HgProto')
363 protocaps = decodevaluefromheaders(req, b'X-HgProto')
363 if not clientapis or not protocaps:
364 if not clientapis or not protocaps:
364 return False
365 return False
365
366
366 # We currently only support CBOR responses.
367 # We currently only support CBOR responses.
367 protocaps = set(protocaps.split(' '))
368 protocaps = set(protocaps.split(' '))
368 if b'cbor' not in protocaps:
369 if b'cbor' not in protocaps:
369 return False
370 return False
370
371
371 descriptors = {}
372 descriptors = {}
372
373
373 for api in sorted(set(clientapis.split()) & _availableapis(repo)):
374 for api in sorted(set(clientapis.split()) & _availableapis(repo)):
374 handler = API_HANDLERS[api]
375 handler = API_HANDLERS[api]
375
376
376 descriptorfn = handler.get('apidescriptor')
377 descriptorfn = handler.get('apidescriptor')
377 if not descriptorfn:
378 if not descriptorfn:
378 continue
379 continue
379
380
380 descriptors[api] = descriptorfn(req, repo)
381 descriptors[api] = descriptorfn(req, repo)
381
382
382 v1caps = wireproto.dispatch(repo, proto, 'capabilities')
383 v1caps = wireproto.dispatch(repo, proto, 'capabilities')
383 assert isinstance(v1caps, wireprototypes.bytesresponse)
384 assert isinstance(v1caps, wireprototypes.bytesresponse)
384
385
385 m = {
386 m = {
386 # TODO allow this to be configurable.
387 # TODO allow this to be configurable.
387 'apibase': 'api/',
388 'apibase': 'api/',
388 'apis': descriptors,
389 'apis': descriptors,
389 'v1capabilities': v1caps.data,
390 'v1capabilities': v1caps.data,
390 }
391 }
391
392
392 res.status = b'200 OK'
393 res.status = b'200 OK'
393 res.headers[b'Content-Type'] = b'application/mercurial-cbor'
394 res.headers[b'Content-Type'] = b'application/mercurial-cbor'
394 res.setbodybytes(cbor.dumps(m, canonical=True))
395 res.setbodybytes(cbor.dumps(m, canonical=True))
395
396
396 return True
397 return True
397
398
398 def _callhttp(repo, req, res, proto, cmd):
399 def _callhttp(repo, req, res, proto, cmd):
399 # Avoid cycle involving hg module.
400 # Avoid cycle involving hg module.
400 from .hgweb import common as hgwebcommon
401 from .hgweb import common as hgwebcommon
401
402
402 def genversion2(gen, engine, engineopts):
403 def genversion2(gen, engine, engineopts):
403 # application/mercurial-0.2 always sends a payload header
404 # application/mercurial-0.2 always sends a payload header
404 # identifying the compression engine.
405 # identifying the compression engine.
405 name = engine.wireprotosupport().name
406 name = engine.wireprotosupport().name
406 assert 0 < len(name) < 256
407 assert 0 < len(name) < 256
407 yield struct.pack('B', len(name))
408 yield struct.pack('B', len(name))
408 yield name
409 yield name
409
410
410 for chunk in gen:
411 for chunk in gen:
411 yield chunk
412 yield chunk
412
413
413 def setresponse(code, contenttype, bodybytes=None, bodygen=None):
414 def setresponse(code, contenttype, bodybytes=None, bodygen=None):
414 if code == HTTP_OK:
415 if code == HTTP_OK:
415 res.status = '200 Script output follows'
416 res.status = '200 Script output follows'
416 else:
417 else:
417 res.status = hgwebcommon.statusmessage(code)
418 res.status = hgwebcommon.statusmessage(code)
418
419
419 res.headers['Content-Type'] = contenttype
420 res.headers['Content-Type'] = contenttype
420
421
421 if bodybytes is not None:
422 if bodybytes is not None:
422 res.setbodybytes(bodybytes)
423 res.setbodybytes(bodybytes)
423 if bodygen is not None:
424 if bodygen is not None:
424 res.setbodygen(bodygen)
425 res.setbodygen(bodygen)
425
426
426 if not wireproto.commands.commandavailable(cmd, proto):
427 if not wireproto.commands.commandavailable(cmd, proto):
427 setresponse(HTTP_OK, HGERRTYPE,
428 setresponse(HTTP_OK, HGERRTYPE,
428 _('requested wire protocol command is not available over '
429 _('requested wire protocol command is not available over '
429 'HTTP'))
430 'HTTP'))
430 return
431 return
431
432
432 proto.checkperm(wireproto.commands[cmd].permission)
433 proto.checkperm(wireproto.commands[cmd].permission)
433
434
434 # Possibly handle a modern client wanting to switch protocols.
435 # Possibly handle a modern client wanting to switch protocols.
435 if (cmd == 'capabilities' and
436 if (cmd == 'capabilities' and
436 processcapabilitieshandshake(repo, req, res, proto)):
437 processcapabilitieshandshake(repo, req, res, proto)):
437
438
438 return
439 return
439
440
440 rsp = wireproto.dispatch(repo, proto, cmd)
441 rsp = wireproto.dispatch(repo, proto, cmd)
441
442
442 if isinstance(rsp, bytes):
443 if isinstance(rsp, bytes):
443 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
444 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
444 elif isinstance(rsp, wireprototypes.bytesresponse):
445 elif isinstance(rsp, wireprototypes.bytesresponse):
445 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp.data)
446 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp.data)
446 elif isinstance(rsp, wireprototypes.streamreslegacy):
447 elif isinstance(rsp, wireprototypes.streamreslegacy):
447 setresponse(HTTP_OK, HGTYPE, bodygen=rsp.gen)
448 setresponse(HTTP_OK, HGTYPE, bodygen=rsp.gen)
448 elif isinstance(rsp, wireprototypes.streamres):
449 elif isinstance(rsp, wireprototypes.streamres):
449 gen = rsp.gen
450 gen = rsp.gen
450
451
451 # This code for compression should not be streamres specific. It
452 # This code for compression should not be streamres specific. It
452 # is here because we only compress streamres at the moment.
453 # is here because we only compress streamres at the moment.
453 mediatype, engine, engineopts = _httpresponsetype(
454 mediatype, engine, engineopts = _httpresponsetype(
454 repo.ui, proto, rsp.prefer_uncompressed)
455 repo.ui, proto, rsp.prefer_uncompressed)
455 gen = engine.compressstream(gen, engineopts)
456 gen = engine.compressstream(gen, engineopts)
456
457
457 if mediatype == HGTYPE2:
458 if mediatype == HGTYPE2:
458 gen = genversion2(gen, engine, engineopts)
459 gen = genversion2(gen, engine, engineopts)
459
460
460 setresponse(HTTP_OK, mediatype, bodygen=gen)
461 setresponse(HTTP_OK, mediatype, bodygen=gen)
461 elif isinstance(rsp, wireprototypes.pushres):
462 elif isinstance(rsp, wireprototypes.pushres):
462 rsp = '%d\n%s' % (rsp.res, rsp.output)
463 rsp = '%d\n%s' % (rsp.res, rsp.output)
463 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
464 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
464 elif isinstance(rsp, wireprototypes.pusherr):
465 elif isinstance(rsp, wireprototypes.pusherr):
465 rsp = '0\n%s\n' % rsp.res
466 rsp = '0\n%s\n' % rsp.res
466 res.drain = True
467 res.drain = True
467 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
468 setresponse(HTTP_OK, HGTYPE, bodybytes=rsp)
468 elif isinstance(rsp, wireprototypes.ooberror):
469 elif isinstance(rsp, wireprototypes.ooberror):
469 setresponse(HTTP_OK, HGERRTYPE, bodybytes=rsp.message)
470 setresponse(HTTP_OK, HGERRTYPE, bodybytes=rsp.message)
470 else:
471 else:
471 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
472 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
472
473
473 def _sshv1respondbytes(fout, value):
474 def _sshv1respondbytes(fout, value):
474 """Send a bytes response for protocol version 1."""
475 """Send a bytes response for protocol version 1."""
475 fout.write('%d\n' % len(value))
476 fout.write('%d\n' % len(value))
476 fout.write(value)
477 fout.write(value)
477 fout.flush()
478 fout.flush()
478
479
479 def _sshv1respondstream(fout, source):
480 def _sshv1respondstream(fout, source):
480 write = fout.write
481 write = fout.write
481 for chunk in source.gen:
482 for chunk in source.gen:
482 write(chunk)
483 write(chunk)
483 fout.flush()
484 fout.flush()
484
485
485 def _sshv1respondooberror(fout, ferr, rsp):
486 def _sshv1respondooberror(fout, ferr, rsp):
486 ferr.write(b'%s\n-\n' % rsp)
487 ferr.write(b'%s\n-\n' % rsp)
487 ferr.flush()
488 ferr.flush()
488 fout.write(b'\n')
489 fout.write(b'\n')
489 fout.flush()
490 fout.flush()
490
491
491 @zi.implementer(wireprototypes.baseprotocolhandler)
492 @zi.implementer(wireprototypes.baseprotocolhandler)
492 class sshv1protocolhandler(object):
493 class sshv1protocolhandler(object):
493 """Handler for requests services via version 1 of SSH protocol."""
494 """Handler for requests services via version 1 of SSH protocol."""
494 def __init__(self, ui, fin, fout):
495 def __init__(self, ui, fin, fout):
495 self._ui = ui
496 self._ui = ui
496 self._fin = fin
497 self._fin = fin
497 self._fout = fout
498 self._fout = fout
498 self._protocaps = set()
499 self._protocaps = set()
499
500
500 @property
501 @property
501 def name(self):
502 def name(self):
502 return wireprototypes.SSHV1
503 return wireprototypes.SSHV1
503
504
504 def getargs(self, args):
505 def getargs(self, args):
505 data = {}
506 data = {}
506 keys = args.split()
507 keys = args.split()
507 for n in xrange(len(keys)):
508 for n in xrange(len(keys)):
508 argline = self._fin.readline()[:-1]
509 argline = self._fin.readline()[:-1]
509 arg, l = argline.split()
510 arg, l = argline.split()
510 if arg not in keys:
511 if arg not in keys:
511 raise error.Abort(_("unexpected parameter %r") % arg)
512 raise error.Abort(_("unexpected parameter %r") % arg)
512 if arg == '*':
513 if arg == '*':
513 star = {}
514 star = {}
514 for k in xrange(int(l)):
515 for k in xrange(int(l)):
515 argline = self._fin.readline()[:-1]
516 argline = self._fin.readline()[:-1]
516 arg, l = argline.split()
517 arg, l = argline.split()
517 val = self._fin.read(int(l))
518 val = self._fin.read(int(l))
518 star[arg] = val
519 star[arg] = val
519 data['*'] = star
520 data['*'] = star
520 else:
521 else:
521 val = self._fin.read(int(l))
522 val = self._fin.read(int(l))
522 data[arg] = val
523 data[arg] = val
523 return [data[k] for k in keys]
524 return [data[k] for k in keys]
524
525
525 def getprotocaps(self):
526 def getprotocaps(self):
526 return self._protocaps
527 return self._protocaps
527
528
528 def getpayload(self):
529 def getpayload(self):
529 # We initially send an empty response. This tells the client it is
530 # We initially send an empty response. This tells the client it is
530 # OK to start sending data. If a client sees any other response, it
531 # OK to start sending data. If a client sees any other response, it
531 # interprets it as an error.
532 # interprets it as an error.
532 _sshv1respondbytes(self._fout, b'')
533 _sshv1respondbytes(self._fout, b'')
533
534
534 # The file is in the form:
535 # The file is in the form:
535 #
536 #
536 # <chunk size>\n<chunk>
537 # <chunk size>\n<chunk>
537 # ...
538 # ...
538 # 0\n
539 # 0\n
539 count = int(self._fin.readline())
540 count = int(self._fin.readline())
540 while count:
541 while count:
541 yield self._fin.read(count)
542 yield self._fin.read(count)
542 count = int(self._fin.readline())
543 count = int(self._fin.readline())
543
544
544 @contextlib.contextmanager
545 @contextlib.contextmanager
545 def mayberedirectstdio(self):
546 def mayberedirectstdio(self):
546 yield None
547 yield None
547
548
548 def client(self):
549 def client(self):
549 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
550 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
550 return 'remote:ssh:' + client
551 return 'remote:ssh:' + client
551
552
552 def addcapabilities(self, repo, caps):
553 def addcapabilities(self, repo, caps):
553 if self.name == wireprototypes.SSHV1:
554 if self.name == wireprototypes.SSHV1:
554 caps.append(b'protocaps')
555 caps.append(b'protocaps')
555 caps.append(b'batch')
556 caps.append(b'batch')
556 return caps
557 return caps
557
558
558 def checkperm(self, perm):
559 def checkperm(self, perm):
559 pass
560 pass
560
561
561 class sshv2protocolhandler(sshv1protocolhandler):
562 class sshv2protocolhandler(sshv1protocolhandler):
562 """Protocol handler for version 2 of the SSH protocol."""
563 """Protocol handler for version 2 of the SSH protocol."""
563
564
564 @property
565 @property
565 def name(self):
566 def name(self):
566 return wireprototypes.SSHV2
567 return wireprototypes.SSHV2
567
568
568 def addcapabilities(self, repo, caps):
569 def addcapabilities(self, repo, caps):
569 return caps
570 return caps
570
571
571 def _runsshserver(ui, repo, fin, fout, ev):
572 def _runsshserver(ui, repo, fin, fout, ev):
572 # This function operates like a state machine of sorts. The following
573 # This function operates like a state machine of sorts. The following
573 # states are defined:
574 # states are defined:
574 #
575 #
575 # protov1-serving
576 # protov1-serving
576 # Server is in protocol version 1 serving mode. Commands arrive on
577 # Server is in protocol version 1 serving mode. Commands arrive on
577 # new lines. These commands are processed in this state, one command
578 # new lines. These commands are processed in this state, one command
578 # after the other.
579 # after the other.
579 #
580 #
580 # protov2-serving
581 # protov2-serving
581 # Server is in protocol version 2 serving mode.
582 # Server is in protocol version 2 serving mode.
582 #
583 #
583 # upgrade-initial
584 # upgrade-initial
584 # The server is going to process an upgrade request.
585 # The server is going to process an upgrade request.
585 #
586 #
586 # upgrade-v2-filter-legacy-handshake
587 # upgrade-v2-filter-legacy-handshake
587 # The protocol is being upgraded to version 2. The server is expecting
588 # The protocol is being upgraded to version 2. The server is expecting
588 # the legacy handshake from version 1.
589 # the legacy handshake from version 1.
589 #
590 #
590 # upgrade-v2-finish
591 # upgrade-v2-finish
591 # The upgrade to version 2 of the protocol is imminent.
592 # The upgrade to version 2 of the protocol is imminent.
592 #
593 #
593 # shutdown
594 # shutdown
594 # The server is shutting down, possibly in reaction to a client event.
595 # The server is shutting down, possibly in reaction to a client event.
595 #
596 #
596 # And here are their transitions:
597 # And here are their transitions:
597 #
598 #
598 # protov1-serving -> shutdown
599 # protov1-serving -> shutdown
599 # When server receives an empty request or encounters another
600 # When server receives an empty request or encounters another
600 # error.
601 # error.
601 #
602 #
602 # protov1-serving -> upgrade-initial
603 # protov1-serving -> upgrade-initial
603 # An upgrade request line was seen.
604 # An upgrade request line was seen.
604 #
605 #
605 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
606 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
606 # Upgrade to version 2 in progress. Server is expecting to
607 # Upgrade to version 2 in progress. Server is expecting to
607 # process a legacy handshake.
608 # process a legacy handshake.
608 #
609 #
609 # upgrade-v2-filter-legacy-handshake -> shutdown
610 # upgrade-v2-filter-legacy-handshake -> shutdown
610 # Client did not fulfill upgrade handshake requirements.
611 # Client did not fulfill upgrade handshake requirements.
611 #
612 #
612 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
613 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
613 # Client fulfilled version 2 upgrade requirements. Finishing that
614 # Client fulfilled version 2 upgrade requirements. Finishing that
614 # upgrade.
615 # upgrade.
615 #
616 #
616 # upgrade-v2-finish -> protov2-serving
617 # upgrade-v2-finish -> protov2-serving
617 # Protocol upgrade to version 2 complete. Server can now speak protocol
618 # Protocol upgrade to version 2 complete. Server can now speak protocol
618 # version 2.
619 # version 2.
619 #
620 #
620 # protov2-serving -> protov1-serving
621 # protov2-serving -> protov1-serving
621 # Ths happens by default since protocol version 2 is the same as
622 # Ths happens by default since protocol version 2 is the same as
622 # version 1 except for the handshake.
623 # version 1 except for the handshake.
623
624
624 state = 'protov1-serving'
625 state = 'protov1-serving'
625 proto = sshv1protocolhandler(ui, fin, fout)
626 proto = sshv1protocolhandler(ui, fin, fout)
626 protoswitched = False
627 protoswitched = False
627
628
628 while not ev.is_set():
629 while not ev.is_set():
629 if state == 'protov1-serving':
630 if state == 'protov1-serving':
630 # Commands are issued on new lines.
631 # Commands are issued on new lines.
631 request = fin.readline()[:-1]
632 request = fin.readline()[:-1]
632
633
633 # Empty lines signal to terminate the connection.
634 # Empty lines signal to terminate the connection.
634 if not request:
635 if not request:
635 state = 'shutdown'
636 state = 'shutdown'
636 continue
637 continue
637
638
638 # It looks like a protocol upgrade request. Transition state to
639 # It looks like a protocol upgrade request. Transition state to
639 # handle it.
640 # handle it.
640 if request.startswith(b'upgrade '):
641 if request.startswith(b'upgrade '):
641 if protoswitched:
642 if protoswitched:
642 _sshv1respondooberror(fout, ui.ferr,
643 _sshv1respondooberror(fout, ui.ferr,
643 b'cannot upgrade protocols multiple '
644 b'cannot upgrade protocols multiple '
644 b'times')
645 b'times')
645 state = 'shutdown'
646 state = 'shutdown'
646 continue
647 continue
647
648
648 state = 'upgrade-initial'
649 state = 'upgrade-initial'
649 continue
650 continue
650
651
651 available = wireproto.commands.commandavailable(request, proto)
652 available = wireproto.commands.commandavailable(request, proto)
652
653
653 # This command isn't available. Send an empty response and go
654 # This command isn't available. Send an empty response and go
654 # back to waiting for a new command.
655 # back to waiting for a new command.
655 if not available:
656 if not available:
656 _sshv1respondbytes(fout, b'')
657 _sshv1respondbytes(fout, b'')
657 continue
658 continue
658
659
659 rsp = wireproto.dispatch(repo, proto, request)
660 rsp = wireproto.dispatch(repo, proto, request)
660
661
661 if isinstance(rsp, bytes):
662 if isinstance(rsp, bytes):
662 _sshv1respondbytes(fout, rsp)
663 _sshv1respondbytes(fout, rsp)
663 elif isinstance(rsp, wireprototypes.bytesresponse):
664 elif isinstance(rsp, wireprototypes.bytesresponse):
664 _sshv1respondbytes(fout, rsp.data)
665 _sshv1respondbytes(fout, rsp.data)
665 elif isinstance(rsp, wireprototypes.streamres):
666 elif isinstance(rsp, wireprototypes.streamres):
666 _sshv1respondstream(fout, rsp)
667 _sshv1respondstream(fout, rsp)
667 elif isinstance(rsp, wireprototypes.streamreslegacy):
668 elif isinstance(rsp, wireprototypes.streamreslegacy):
668 _sshv1respondstream(fout, rsp)
669 _sshv1respondstream(fout, rsp)
669 elif isinstance(rsp, wireprototypes.pushres):
670 elif isinstance(rsp, wireprototypes.pushres):
670 _sshv1respondbytes(fout, b'')
671 _sshv1respondbytes(fout, b'')
671 _sshv1respondbytes(fout, b'%d' % rsp.res)
672 _sshv1respondbytes(fout, b'%d' % rsp.res)
672 elif isinstance(rsp, wireprototypes.pusherr):
673 elif isinstance(rsp, wireprototypes.pusherr):
673 _sshv1respondbytes(fout, rsp.res)
674 _sshv1respondbytes(fout, rsp.res)
674 elif isinstance(rsp, wireprototypes.ooberror):
675 elif isinstance(rsp, wireprototypes.ooberror):
675 _sshv1respondooberror(fout, ui.ferr, rsp.message)
676 _sshv1respondooberror(fout, ui.ferr, rsp.message)
676 else:
677 else:
677 raise error.ProgrammingError('unhandled response type from '
678 raise error.ProgrammingError('unhandled response type from '
678 'wire protocol command: %s' % rsp)
679 'wire protocol command: %s' % rsp)
679
680
680 # For now, protocol version 2 serving just goes back to version 1.
681 # For now, protocol version 2 serving just goes back to version 1.
681 elif state == 'protov2-serving':
682 elif state == 'protov2-serving':
682 state = 'protov1-serving'
683 state = 'protov1-serving'
683 continue
684 continue
684
685
685 elif state == 'upgrade-initial':
686 elif state == 'upgrade-initial':
686 # We should never transition into this state if we've switched
687 # We should never transition into this state if we've switched
687 # protocols.
688 # protocols.
688 assert not protoswitched
689 assert not protoswitched
689 assert proto.name == wireprototypes.SSHV1
690 assert proto.name == wireprototypes.SSHV1
690
691
691 # Expected: upgrade <token> <capabilities>
692 # Expected: upgrade <token> <capabilities>
692 # If we get something else, the request is malformed. It could be
693 # If we get something else, the request is malformed. It could be
693 # from a future client that has altered the upgrade line content.
694 # from a future client that has altered the upgrade line content.
694 # We treat this as an unknown command.
695 # We treat this as an unknown command.
695 try:
696 try:
696 token, caps = request.split(b' ')[1:]
697 token, caps = request.split(b' ')[1:]
697 except ValueError:
698 except ValueError:
698 _sshv1respondbytes(fout, b'')
699 _sshv1respondbytes(fout, b'')
699 state = 'protov1-serving'
700 state = 'protov1-serving'
700 continue
701 continue
701
702
702 # Send empty response if we don't support upgrading protocols.
703 # Send empty response if we don't support upgrading protocols.
703 if not ui.configbool('experimental', 'sshserver.support-v2'):
704 if not ui.configbool('experimental', 'sshserver.support-v2'):
704 _sshv1respondbytes(fout, b'')
705 _sshv1respondbytes(fout, b'')
705 state = 'protov1-serving'
706 state = 'protov1-serving'
706 continue
707 continue
707
708
708 try:
709 try:
709 caps = urlreq.parseqs(caps)
710 caps = urlreq.parseqs(caps)
710 except ValueError:
711 except ValueError:
711 _sshv1respondbytes(fout, b'')
712 _sshv1respondbytes(fout, b'')
712 state = 'protov1-serving'
713 state = 'protov1-serving'
713 continue
714 continue
714
715
715 # We don't see an upgrade request to protocol version 2. Ignore
716 # We don't see an upgrade request to protocol version 2. Ignore
716 # the upgrade request.
717 # the upgrade request.
717 wantedprotos = caps.get(b'proto', [b''])[0]
718 wantedprotos = caps.get(b'proto', [b''])[0]
718 if SSHV2 not in wantedprotos:
719 if SSHV2 not in wantedprotos:
719 _sshv1respondbytes(fout, b'')
720 _sshv1respondbytes(fout, b'')
720 state = 'protov1-serving'
721 state = 'protov1-serving'
721 continue
722 continue
722
723
723 # It looks like we can honor this upgrade request to protocol 2.
724 # It looks like we can honor this upgrade request to protocol 2.
724 # Filter the rest of the handshake protocol request lines.
725 # Filter the rest of the handshake protocol request lines.
725 state = 'upgrade-v2-filter-legacy-handshake'
726 state = 'upgrade-v2-filter-legacy-handshake'
726 continue
727 continue
727
728
728 elif state == 'upgrade-v2-filter-legacy-handshake':
729 elif state == 'upgrade-v2-filter-legacy-handshake':
729 # Client should have sent legacy handshake after an ``upgrade``
730 # Client should have sent legacy handshake after an ``upgrade``
730 # request. Expected lines:
731 # request. Expected lines:
731 #
732 #
732 # hello
733 # hello
733 # between
734 # between
734 # pairs 81
735 # pairs 81
735 # 0000...-0000...
736 # 0000...-0000...
736
737
737 ok = True
738 ok = True
738 for line in (b'hello', b'between', b'pairs 81'):
739 for line in (b'hello', b'between', b'pairs 81'):
739 request = fin.readline()[:-1]
740 request = fin.readline()[:-1]
740
741
741 if request != line:
742 if request != line:
742 _sshv1respondooberror(fout, ui.ferr,
743 _sshv1respondooberror(fout, ui.ferr,
743 b'malformed handshake protocol: '
744 b'malformed handshake protocol: '
744 b'missing %s' % line)
745 b'missing %s' % line)
745 ok = False
746 ok = False
746 state = 'shutdown'
747 state = 'shutdown'
747 break
748 break
748
749
749 if not ok:
750 if not ok:
750 continue
751 continue
751
752
752 request = fin.read(81)
753 request = fin.read(81)
753 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
754 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
754 _sshv1respondooberror(fout, ui.ferr,
755 _sshv1respondooberror(fout, ui.ferr,
755 b'malformed handshake protocol: '
756 b'malformed handshake protocol: '
756 b'missing between argument value')
757 b'missing between argument value')
757 state = 'shutdown'
758 state = 'shutdown'
758 continue
759 continue
759
760
760 state = 'upgrade-v2-finish'
761 state = 'upgrade-v2-finish'
761 continue
762 continue
762
763
763 elif state == 'upgrade-v2-finish':
764 elif state == 'upgrade-v2-finish':
764 # Send the upgrade response.
765 # Send the upgrade response.
765 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
766 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
766 servercaps = wireproto.capabilities(repo, proto)
767 servercaps = wireproto.capabilities(repo, proto)
767 rsp = b'capabilities: %s' % servercaps.data
768 rsp = b'capabilities: %s' % servercaps.data
768 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
769 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
769 fout.flush()
770 fout.flush()
770
771
771 proto = sshv2protocolhandler(ui, fin, fout)
772 proto = sshv2protocolhandler(ui, fin, fout)
772 protoswitched = True
773 protoswitched = True
773
774
774 state = 'protov2-serving'
775 state = 'protov2-serving'
775 continue
776 continue
776
777
777 elif state == 'shutdown':
778 elif state == 'shutdown':
778 break
779 break
779
780
780 else:
781 else:
781 raise error.ProgrammingError('unhandled ssh server state: %s' %
782 raise error.ProgrammingError('unhandled ssh server state: %s' %
782 state)
783 state)
783
784
784 class sshserver(object):
785 class sshserver(object):
785 def __init__(self, ui, repo, logfh=None):
786 def __init__(self, ui, repo, logfh=None):
786 self._ui = ui
787 self._ui = ui
787 self._repo = repo
788 self._repo = repo
788 self._fin = ui.fin
789 self._fin = ui.fin
789 self._fout = ui.fout
790 self._fout = ui.fout
790
791
791 # Log write I/O to stdout and stderr if configured.
792 # Log write I/O to stdout and stderr if configured.
792 if logfh:
793 if logfh:
793 self._fout = util.makeloggingfileobject(
794 self._fout = util.makeloggingfileobject(
794 logfh, self._fout, 'o', logdata=True)
795 logfh, self._fout, 'o', logdata=True)
795 ui.ferr = util.makeloggingfileobject(
796 ui.ferr = util.makeloggingfileobject(
796 logfh, ui.ferr, 'e', logdata=True)
797 logfh, ui.ferr, 'e', logdata=True)
797
798
798 hook.redirect(True)
799 hook.redirect(True)
799 ui.fout = repo.ui.fout = ui.ferr
800 ui.fout = repo.ui.fout = ui.ferr
800
801
801 # Prevent insertion/deletion of CRs
802 # Prevent insertion/deletion of CRs
802 procutil.setbinary(self._fin)
803 procutil.setbinary(self._fin)
803 procutil.setbinary(self._fout)
804 procutil.setbinary(self._fout)
804
805
805 def serve_forever(self):
806 def serve_forever(self):
806 self.serveuntil(threading.Event())
807 self.serveuntil(threading.Event())
807 sys.exit(0)
808 sys.exit(0)
808
809
809 def serveuntil(self, ev):
810 def serveuntil(self, ev):
810 """Serve until a threading.Event is set."""
811 """Serve until a threading.Event is set."""
811 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
812 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
@@ -1,321 +1,375
1 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
1 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
2 #
2 #
3 # This software may be used and distributed according to the terms of the
3 # This software may be used and distributed according to the terms of the
4 # GNU General Public License version 2 or any later version.
4 # GNU General Public License version 2 or any later version.
5
5
6 from __future__ import absolute_import
6 from __future__ import absolute_import
7
7
8 from .node import (
8 from .node import (
9 bin,
9 bin,
10 hex,
10 hex,
11 )
11 )
12 from .thirdparty.zope import (
12 from .thirdparty.zope import (
13 interface as zi,
13 interface as zi,
14 )
14 )
15 from .i18n import _
16 from . import (
17 error,
18 util,
19 )
15
20
16 # Names of the SSH protocol implementations.
21 # Names of the SSH protocol implementations.
17 SSHV1 = 'ssh-v1'
22 SSHV1 = 'ssh-v1'
18 # These are advertised over the wire. Increment the counters at the end
23 # These are advertised over the wire. Increment the counters at the end
19 # to reflect BC breakages.
24 # to reflect BC breakages.
20 SSHV2 = 'exp-ssh-v2-0001'
25 SSHV2 = 'exp-ssh-v2-0001'
21 HTTP_WIREPROTO_V2 = 'exp-http-v2-0001'
26 HTTP_WIREPROTO_V2 = 'exp-http-v2-0001'
22
27
23 # All available wire protocol transports.
28 # All available wire protocol transports.
24 TRANSPORTS = {
29 TRANSPORTS = {
25 SSHV1: {
30 SSHV1: {
26 'transport': 'ssh',
31 'transport': 'ssh',
27 'version': 1,
32 'version': 1,
28 },
33 },
29 SSHV2: {
34 SSHV2: {
30 'transport': 'ssh',
35 'transport': 'ssh',
31 # TODO mark as version 2 once all commands are implemented.
36 # TODO mark as version 2 once all commands are implemented.
32 'version': 1,
37 'version': 1,
33 },
38 },
34 'http-v1': {
39 'http-v1': {
35 'transport': 'http',
40 'transport': 'http',
36 'version': 1,
41 'version': 1,
37 },
42 },
38 HTTP_WIREPROTO_V2: {
43 HTTP_WIREPROTO_V2: {
39 'transport': 'http',
44 'transport': 'http',
40 'version': 2,
45 'version': 2,
41 }
46 }
42 }
47 }
43
48
44 class bytesresponse(object):
49 class bytesresponse(object):
45 """A wire protocol response consisting of raw bytes."""
50 """A wire protocol response consisting of raw bytes."""
46 def __init__(self, data):
51 def __init__(self, data):
47 self.data = data
52 self.data = data
48
53
49 class ooberror(object):
54 class ooberror(object):
50 """wireproto reply: failure of a batch of operation
55 """wireproto reply: failure of a batch of operation
51
56
52 Something failed during a batch call. The error message is stored in
57 Something failed during a batch call. The error message is stored in
53 `self.message`.
58 `self.message`.
54 """
59 """
55 def __init__(self, message):
60 def __init__(self, message):
56 self.message = message
61 self.message = message
57
62
58 class pushres(object):
63 class pushres(object):
59 """wireproto reply: success with simple integer return
64 """wireproto reply: success with simple integer return
60
65
61 The call was successful and returned an integer contained in `self.res`.
66 The call was successful and returned an integer contained in `self.res`.
62 """
67 """
63 def __init__(self, res, output):
68 def __init__(self, res, output):
64 self.res = res
69 self.res = res
65 self.output = output
70 self.output = output
66
71
67 class pusherr(object):
72 class pusherr(object):
68 """wireproto reply: failure
73 """wireproto reply: failure
69
74
70 The call failed. The `self.res` attribute contains the error message.
75 The call failed. The `self.res` attribute contains the error message.
71 """
76 """
72 def __init__(self, res, output):
77 def __init__(self, res, output):
73 self.res = res
78 self.res = res
74 self.output = output
79 self.output = output
75
80
76 class streamres(object):
81 class streamres(object):
77 """wireproto reply: binary stream
82 """wireproto reply: binary stream
78
83
79 The call was successful and the result is a stream.
84 The call was successful and the result is a stream.
80
85
81 Accepts a generator containing chunks of data to be sent to the client.
86 Accepts a generator containing chunks of data to be sent to the client.
82
87
83 ``prefer_uncompressed`` indicates that the data is expected to be
88 ``prefer_uncompressed`` indicates that the data is expected to be
84 uncompressable and that the stream should therefore use the ``none``
89 uncompressable and that the stream should therefore use the ``none``
85 engine.
90 engine.
86 """
91 """
87 def __init__(self, gen=None, prefer_uncompressed=False):
92 def __init__(self, gen=None, prefer_uncompressed=False):
88 self.gen = gen
93 self.gen = gen
89 self.prefer_uncompressed = prefer_uncompressed
94 self.prefer_uncompressed = prefer_uncompressed
90
95
91 class streamreslegacy(object):
96 class streamreslegacy(object):
92 """wireproto reply: uncompressed binary stream
97 """wireproto reply: uncompressed binary stream
93
98
94 The call was successful and the result is a stream.
99 The call was successful and the result is a stream.
95
100
96 Accepts a generator containing chunks of data to be sent to the client.
101 Accepts a generator containing chunks of data to be sent to the client.
97
102
98 Like ``streamres``, but sends an uncompressed data for "version 1" clients
103 Like ``streamres``, but sends an uncompressed data for "version 1" clients
99 using the application/mercurial-0.1 media type.
104 using the application/mercurial-0.1 media type.
100 """
105 """
101 def __init__(self, gen=None):
106 def __init__(self, gen=None):
102 self.gen = gen
107 self.gen = gen
103
108
104 class cborresponse(object):
109 class cborresponse(object):
105 """Encode the response value as CBOR."""
110 """Encode the response value as CBOR."""
106 def __init__(self, v):
111 def __init__(self, v):
107 self.value = v
112 self.value = v
108
113
109 class v2errorresponse(object):
114 class v2errorresponse(object):
110 """Represents a command error for version 2 transports."""
115 """Represents a command error for version 2 transports."""
111 def __init__(self, message, args=None):
116 def __init__(self, message, args=None):
112 self.message = message
117 self.message = message
113 self.args = args
118 self.args = args
114
119
115 class v2streamingresponse(object):
120 class v2streamingresponse(object):
116 """A response whose data is supplied by a generator.
121 """A response whose data is supplied by a generator.
117
122
118 The generator can either consist of data structures to CBOR
123 The generator can either consist of data structures to CBOR
119 encode or a stream of already-encoded bytes.
124 encode or a stream of already-encoded bytes.
120 """
125 """
121 def __init__(self, gen, compressible=True):
126 def __init__(self, gen, compressible=True):
122 self.gen = gen
127 self.gen = gen
123 self.compressible = compressible
128 self.compressible = compressible
124
129
125 # list of nodes encoding / decoding
130 # list of nodes encoding / decoding
126 def decodelist(l, sep=' '):
131 def decodelist(l, sep=' '):
127 if l:
132 if l:
128 return [bin(v) for v in l.split(sep)]
133 return [bin(v) for v in l.split(sep)]
129 return []
134 return []
130
135
131 def encodelist(l, sep=' '):
136 def encodelist(l, sep=' '):
132 try:
137 try:
133 return sep.join(map(hex, l))
138 return sep.join(map(hex, l))
134 except TypeError:
139 except TypeError:
135 raise
140 raise
136
141
137 # batched call argument encoding
142 # batched call argument encoding
138
143
139 def escapebatcharg(plain):
144 def escapebatcharg(plain):
140 return (plain
145 return (plain
141 .replace(':', ':c')
146 .replace(':', ':c')
142 .replace(',', ':o')
147 .replace(',', ':o')
143 .replace(';', ':s')
148 .replace(';', ':s')
144 .replace('=', ':e'))
149 .replace('=', ':e'))
145
150
146 def unescapebatcharg(escaped):
151 def unescapebatcharg(escaped):
147 return (escaped
152 return (escaped
148 .replace(':e', '=')
153 .replace(':e', '=')
149 .replace(':s', ';')
154 .replace(':s', ';')
150 .replace(':o', ',')
155 .replace(':o', ',')
151 .replace(':c', ':'))
156 .replace(':c', ':'))
152
157
153 # mapping of options accepted by getbundle and their types
158 # mapping of options accepted by getbundle and their types
154 #
159 #
155 # Meant to be extended by extensions. It is extensions responsibility to ensure
160 # Meant to be extended by extensions. It is extensions responsibility to ensure
156 # such options are properly processed in exchange.getbundle.
161 # such options are properly processed in exchange.getbundle.
157 #
162 #
158 # supported types are:
163 # supported types are:
159 #
164 #
160 # :nodes: list of binary nodes
165 # :nodes: list of binary nodes
161 # :csv: list of comma-separated values
166 # :csv: list of comma-separated values
162 # :scsv: list of comma-separated values return as set
167 # :scsv: list of comma-separated values return as set
163 # :plain: string with no transformation needed.
168 # :plain: string with no transformation needed.
164 GETBUNDLE_ARGUMENTS = {
169 GETBUNDLE_ARGUMENTS = {
165 'heads': 'nodes',
170 'heads': 'nodes',
166 'bookmarks': 'boolean',
171 'bookmarks': 'boolean',
167 'common': 'nodes',
172 'common': 'nodes',
168 'obsmarkers': 'boolean',
173 'obsmarkers': 'boolean',
169 'phases': 'boolean',
174 'phases': 'boolean',
170 'bundlecaps': 'scsv',
175 'bundlecaps': 'scsv',
171 'listkeys': 'csv',
176 'listkeys': 'csv',
172 'cg': 'boolean',
177 'cg': 'boolean',
173 'cbattempted': 'boolean',
178 'cbattempted': 'boolean',
174 'stream': 'boolean',
179 'stream': 'boolean',
175 }
180 }
176
181
177 class baseprotocolhandler(zi.Interface):
182 class baseprotocolhandler(zi.Interface):
178 """Abstract base class for wire protocol handlers.
183 """Abstract base class for wire protocol handlers.
179
184
180 A wire protocol handler serves as an interface between protocol command
185 A wire protocol handler serves as an interface between protocol command
181 handlers and the wire protocol transport layer. Protocol handlers provide
186 handlers and the wire protocol transport layer. Protocol handlers provide
182 methods to read command arguments, redirect stdio for the duration of
187 methods to read command arguments, redirect stdio for the duration of
183 the request, handle response types, etc.
188 the request, handle response types, etc.
184 """
189 """
185
190
186 name = zi.Attribute(
191 name = zi.Attribute(
187 """The name of the protocol implementation.
192 """The name of the protocol implementation.
188
193
189 Used for uniquely identifying the transport type.
194 Used for uniquely identifying the transport type.
190 """)
195 """)
191
196
192 def getargs(args):
197 def getargs(args):
193 """return the value for arguments in <args>
198 """return the value for arguments in <args>
194
199
195 For version 1 transports, returns a list of values in the same
200 For version 1 transports, returns a list of values in the same
196 order they appear in ``args``. For version 2 transports, returns
201 order they appear in ``args``. For version 2 transports, returns
197 a dict mapping argument name to value.
202 a dict mapping argument name to value.
198 """
203 """
199
204
200 def getprotocaps():
205 def getprotocaps():
201 """Returns the list of protocol-level capabilities of client
206 """Returns the list of protocol-level capabilities of client
202
207
203 Returns a list of capabilities as declared by the client for
208 Returns a list of capabilities as declared by the client for
204 the current request (or connection for stateful protocol handlers)."""
209 the current request (or connection for stateful protocol handlers)."""
205
210
206 def getpayload():
211 def getpayload():
207 """Provide a generator for the raw payload.
212 """Provide a generator for the raw payload.
208
213
209 The caller is responsible for ensuring that the full payload is
214 The caller is responsible for ensuring that the full payload is
210 processed.
215 processed.
211 """
216 """
212
217
213 def mayberedirectstdio():
218 def mayberedirectstdio():
214 """Context manager to possibly redirect stdio.
219 """Context manager to possibly redirect stdio.
215
220
216 The context manager yields a file-object like object that receives
221 The context manager yields a file-object like object that receives
217 stdout and stderr output when the context manager is active. Or it
222 stdout and stderr output when the context manager is active. Or it
218 yields ``None`` if no I/O redirection occurs.
223 yields ``None`` if no I/O redirection occurs.
219
224
220 The intent of this context manager is to capture stdio output
225 The intent of this context manager is to capture stdio output
221 so it may be sent in the response. Some transports support streaming
226 so it may be sent in the response. Some transports support streaming
222 stdio to the client in real time. For these transports, stdio output
227 stdio to the client in real time. For these transports, stdio output
223 won't be captured.
228 won't be captured.
224 """
229 """
225
230
226 def client():
231 def client():
227 """Returns a string representation of this client (as bytes)."""
232 """Returns a string representation of this client (as bytes)."""
228
233
229 def addcapabilities(repo, caps):
234 def addcapabilities(repo, caps):
230 """Adds advertised capabilities specific to this protocol.
235 """Adds advertised capabilities specific to this protocol.
231
236
232 Receives the list of capabilities collected so far.
237 Receives the list of capabilities collected so far.
233
238
234 Returns a list of capabilities. The passed in argument can be returned.
239 Returns a list of capabilities. The passed in argument can be returned.
235 """
240 """
236
241
237 def checkperm(perm):
242 def checkperm(perm):
238 """Validate that the client has permissions to perform a request.
243 """Validate that the client has permissions to perform a request.
239
244
240 The argument is the permission required to proceed. If the client
245 The argument is the permission required to proceed. If the client
241 doesn't have that permission, the exception should raise or abort
246 doesn't have that permission, the exception should raise or abort
242 in a protocol specific manner.
247 in a protocol specific manner.
243 """
248 """
244
249
245 class commandentry(object):
250 class commandentry(object):
246 """Represents a declared wire protocol command."""
251 """Represents a declared wire protocol command."""
247 def __init__(self, func, args='', transports=None,
252 def __init__(self, func, args='', transports=None,
248 permission='push'):
253 permission='push'):
249 self.func = func
254 self.func = func
250 self.args = args
255 self.args = args
251 self.transports = transports or set()
256 self.transports = transports or set()
252 self.permission = permission
257 self.permission = permission
253
258
254 def _merge(self, func, args):
259 def _merge(self, func, args):
255 """Merge this instance with an incoming 2-tuple.
260 """Merge this instance with an incoming 2-tuple.
256
261
257 This is called when a caller using the old 2-tuple API attempts
262 This is called when a caller using the old 2-tuple API attempts
258 to replace an instance. The incoming values are merged with
263 to replace an instance. The incoming values are merged with
259 data not captured by the 2-tuple and a new instance containing
264 data not captured by the 2-tuple and a new instance containing
260 the union of the two objects is returned.
265 the union of the two objects is returned.
261 """
266 """
262 return commandentry(func, args=args, transports=set(self.transports),
267 return commandentry(func, args=args, transports=set(self.transports),
263 permission=self.permission)
268 permission=self.permission)
264
269
265 # Old code treats instances as 2-tuples. So expose that interface.
270 # Old code treats instances as 2-tuples. So expose that interface.
266 def __iter__(self):
271 def __iter__(self):
267 yield self.func
272 yield self.func
268 yield self.args
273 yield self.args
269
274
270 def __getitem__(self, i):
275 def __getitem__(self, i):
271 if i == 0:
276 if i == 0:
272 return self.func
277 return self.func
273 elif i == 1:
278 elif i == 1:
274 return self.args
279 return self.args
275 else:
280 else:
276 raise IndexError('can only access elements 0 and 1')
281 raise IndexError('can only access elements 0 and 1')
277
282
278 class commanddict(dict):
283 class commanddict(dict):
279 """Container for registered wire protocol commands.
284 """Container for registered wire protocol commands.
280
285
281 It behaves like a dict. But __setitem__ is overwritten to allow silent
286 It behaves like a dict. But __setitem__ is overwritten to allow silent
282 coercion of values from 2-tuples for API compatibility.
287 coercion of values from 2-tuples for API compatibility.
283 """
288 """
284 def __setitem__(self, k, v):
289 def __setitem__(self, k, v):
285 if isinstance(v, commandentry):
290 if isinstance(v, commandentry):
286 pass
291 pass
287 # Cast 2-tuples to commandentry instances.
292 # Cast 2-tuples to commandentry instances.
288 elif isinstance(v, tuple):
293 elif isinstance(v, tuple):
289 if len(v) != 2:
294 if len(v) != 2:
290 raise ValueError('command tuples must have exactly 2 elements')
295 raise ValueError('command tuples must have exactly 2 elements')
291
296
292 # It is common for extensions to wrap wire protocol commands via
297 # It is common for extensions to wrap wire protocol commands via
293 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
298 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
294 # doing this aren't aware of the new API that uses objects to store
299 # doing this aren't aware of the new API that uses objects to store
295 # command entries, we automatically merge old state with new.
300 # command entries, we automatically merge old state with new.
296 if k in self:
301 if k in self:
297 v = self[k]._merge(v[0], v[1])
302 v = self[k]._merge(v[0], v[1])
298 else:
303 else:
299 # Use default values from @wireprotocommand.
304 # Use default values from @wireprotocommand.
300 v = commandentry(v[0], args=v[1],
305 v = commandentry(v[0], args=v[1],
301 transports=set(TRANSPORTS),
306 transports=set(TRANSPORTS),
302 permission='push')
307 permission='push')
303 else:
308 else:
304 raise ValueError('command entries must be commandentry instances '
309 raise ValueError('command entries must be commandentry instances '
305 'or 2-tuples')
310 'or 2-tuples')
306
311
307 return super(commanddict, self).__setitem__(k, v)
312 return super(commanddict, self).__setitem__(k, v)
308
313
309 def commandavailable(self, command, proto):
314 def commandavailable(self, command, proto):
310 """Determine if a command is available for the requested protocol."""
315 """Determine if a command is available for the requested protocol."""
311 assert proto.name in TRANSPORTS
316 assert proto.name in TRANSPORTS
312
317
313 entry = self.get(command)
318 entry = self.get(command)
314
319
315 if not entry:
320 if not entry:
316 return False
321 return False
317
322
318 if proto.name not in entry.transports:
323 if proto.name not in entry.transports:
319 return False
324 return False
320
325
321 return True
326 return True
327
328 def supportedcompengines(ui, role):
329 """Obtain the list of supported compression engines for a request."""
330 assert role in (util.CLIENTROLE, util.SERVERROLE)
331
332 compengines = util.compengines.supportedwireengines(role)
333
334 # Allow config to override default list and ordering.
335 if role == util.SERVERROLE:
336 configengines = ui.configlist('server', 'compressionengines')
337 config = 'server.compressionengines'
338 else:
339 # This is currently implemented mainly to facilitate testing. In most
340 # cases, the server should be in charge of choosing a compression engine
341 # because a server has the most to lose from a sub-optimal choice. (e.g.
342 # CPU DoS due to an expensive engine or a network DoS due to poor
343 # compression ratio).
344 configengines = ui.configlist('experimental',
345 'clientcompressionengines')
346 config = 'experimental.clientcompressionengines'
347
348 # No explicit config. Filter out the ones that aren't supposed to be
349 # advertised and return default ordering.
350 if not configengines:
351 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
352 return [e for e in compengines
353 if getattr(e.wireprotosupport(), attr) > 0]
354
355 # If compression engines are listed in the config, assume there is a good
356 # reason for it (like server operators wanting to achieve specific
357 # performance characteristics). So fail fast if the config references
358 # unusable compression engines.
359 validnames = set(e.name() for e in compengines)
360 invalidnames = set(e for e in configengines if e not in validnames)
361 if invalidnames:
362 raise error.Abort(_('invalid compression engine defined in %s: %s') %
363 (config, ', '.join(sorted(invalidnames))))
364
365 compengines = [e for e in compengines if e.name() in configengines]
366 compengines = sorted(compengines,
367 key=lambda e: configengines.index(e.name()))
368
369 if not compengines:
370 raise error.Abort(_('%s config option does not specify any known '
371 'compression engines') % config,
372 hint=_('usable compression engines: %s') %
373 ', '.sorted(validnames))
374
375 return compengines
@@ -1,533 +1,533
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 #
3 #
4 # This software may be used and distributed according to the terms of the
4 # This software may be used and distributed according to the terms of the
5 # GNU General Public License version 2 or any later version.
5 # GNU General Public License version 2 or any later version.
6
6
7 from __future__ import absolute_import
7 from __future__ import absolute_import
8
8
9 import contextlib
9 import contextlib
10
10
11 from .i18n import _
11 from .i18n import _
12 from .thirdparty import (
12 from .thirdparty import (
13 cbor,
13 cbor,
14 )
14 )
15 from .thirdparty.zope import (
15 from .thirdparty.zope import (
16 interface as zi,
16 interface as zi,
17 )
17 )
18 from . import (
18 from . import (
19 encoding,
19 encoding,
20 error,
20 error,
21 pycompat,
21 pycompat,
22 streamclone,
22 streamclone,
23 util,
23 util,
24 wireproto,
24 wireproto,
25 wireprotoframing,
25 wireprotoframing,
26 wireprototypes,
26 wireprototypes,
27 )
27 )
28
28
29 FRAMINGTYPE = b'application/mercurial-exp-framing-0005'
29 FRAMINGTYPE = b'application/mercurial-exp-framing-0005'
30
30
31 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
31 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
32
32
33 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
33 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
34 from .hgweb import common as hgwebcommon
34 from .hgweb import common as hgwebcommon
35
35
36 # URL space looks like: <permissions>/<command>, where <permission> can
36 # URL space looks like: <permissions>/<command>, where <permission> can
37 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
37 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
38
38
39 # Root URL does nothing meaningful... yet.
39 # Root URL does nothing meaningful... yet.
40 if not urlparts:
40 if not urlparts:
41 res.status = b'200 OK'
41 res.status = b'200 OK'
42 res.headers[b'Content-Type'] = b'text/plain'
42 res.headers[b'Content-Type'] = b'text/plain'
43 res.setbodybytes(_('HTTP version 2 API handler'))
43 res.setbodybytes(_('HTTP version 2 API handler'))
44 return
44 return
45
45
46 if len(urlparts) == 1:
46 if len(urlparts) == 1:
47 res.status = b'404 Not Found'
47 res.status = b'404 Not Found'
48 res.headers[b'Content-Type'] = b'text/plain'
48 res.headers[b'Content-Type'] = b'text/plain'
49 res.setbodybytes(_('do not know how to process %s\n') %
49 res.setbodybytes(_('do not know how to process %s\n') %
50 req.dispatchpath)
50 req.dispatchpath)
51 return
51 return
52
52
53 permission, command = urlparts[0:2]
53 permission, command = urlparts[0:2]
54
54
55 if permission not in (b'ro', b'rw'):
55 if permission not in (b'ro', b'rw'):
56 res.status = b'404 Not Found'
56 res.status = b'404 Not Found'
57 res.headers[b'Content-Type'] = b'text/plain'
57 res.headers[b'Content-Type'] = b'text/plain'
58 res.setbodybytes(_('unknown permission: %s') % permission)
58 res.setbodybytes(_('unknown permission: %s') % permission)
59 return
59 return
60
60
61 if req.method != 'POST':
61 if req.method != 'POST':
62 res.status = b'405 Method Not Allowed'
62 res.status = b'405 Method Not Allowed'
63 res.headers[b'Allow'] = b'POST'
63 res.headers[b'Allow'] = b'POST'
64 res.setbodybytes(_('commands require POST requests'))
64 res.setbodybytes(_('commands require POST requests'))
65 return
65 return
66
66
67 # At some point we'll want to use our own API instead of recycling the
67 # At some point we'll want to use our own API instead of recycling the
68 # behavior of version 1 of the wire protocol...
68 # behavior of version 1 of the wire protocol...
69 # TODO return reasonable responses - not responses that overload the
69 # TODO return reasonable responses - not responses that overload the
70 # HTTP status line message for error reporting.
70 # HTTP status line message for error reporting.
71 try:
71 try:
72 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
72 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
73 except hgwebcommon.ErrorResponse as e:
73 except hgwebcommon.ErrorResponse as e:
74 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
74 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
75 for k, v in e.headers:
75 for k, v in e.headers:
76 res.headers[k] = v
76 res.headers[k] = v
77 res.setbodybytes('permission denied')
77 res.setbodybytes('permission denied')
78 return
78 return
79
79
80 # We have a special endpoint to reflect the request back at the client.
80 # We have a special endpoint to reflect the request back at the client.
81 if command == b'debugreflect':
81 if command == b'debugreflect':
82 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
82 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
83 return
83 return
84
84
85 # Extra commands that we handle that aren't really wire protocol
85 # Extra commands that we handle that aren't really wire protocol
86 # commands. Think extra hard before making this hackery available to
86 # commands. Think extra hard before making this hackery available to
87 # extension.
87 # extension.
88 extracommands = {'multirequest'}
88 extracommands = {'multirequest'}
89
89
90 if command not in wireproto.commandsv2 and command not in extracommands:
90 if command not in wireproto.commandsv2 and command not in extracommands:
91 res.status = b'404 Not Found'
91 res.status = b'404 Not Found'
92 res.headers[b'Content-Type'] = b'text/plain'
92 res.headers[b'Content-Type'] = b'text/plain'
93 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
93 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
94 return
94 return
95
95
96 repo = rctx.repo
96 repo = rctx.repo
97 ui = repo.ui
97 ui = repo.ui
98
98
99 proto = httpv2protocolhandler(req, ui)
99 proto = httpv2protocolhandler(req, ui)
100
100
101 if (not wireproto.commandsv2.commandavailable(command, proto)
101 if (not wireproto.commandsv2.commandavailable(command, proto)
102 and command not in extracommands):
102 and command not in extracommands):
103 res.status = b'404 Not Found'
103 res.status = b'404 Not Found'
104 res.headers[b'Content-Type'] = b'text/plain'
104 res.headers[b'Content-Type'] = b'text/plain'
105 res.setbodybytes(_('invalid wire protocol command: %s') % command)
105 res.setbodybytes(_('invalid wire protocol command: %s') % command)
106 return
106 return
107
107
108 # TODO consider cases where proxies may add additional Accept headers.
108 # TODO consider cases where proxies may add additional Accept headers.
109 if req.headers.get(b'Accept') != FRAMINGTYPE:
109 if req.headers.get(b'Accept') != FRAMINGTYPE:
110 res.status = b'406 Not Acceptable'
110 res.status = b'406 Not Acceptable'
111 res.headers[b'Content-Type'] = b'text/plain'
111 res.headers[b'Content-Type'] = b'text/plain'
112 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
112 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
113 % FRAMINGTYPE)
113 % FRAMINGTYPE)
114 return
114 return
115
115
116 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
116 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
117 res.status = b'415 Unsupported Media Type'
117 res.status = b'415 Unsupported Media Type'
118 # TODO we should send a response with appropriate media type,
118 # TODO we should send a response with appropriate media type,
119 # since client does Accept it.
119 # since client does Accept it.
120 res.headers[b'Content-Type'] = b'text/plain'
120 res.headers[b'Content-Type'] = b'text/plain'
121 res.setbodybytes(_('client MUST send Content-Type header with '
121 res.setbodybytes(_('client MUST send Content-Type header with '
122 'value: %s\n') % FRAMINGTYPE)
122 'value: %s\n') % FRAMINGTYPE)
123 return
123 return
124
124
125 _processhttpv2request(ui, repo, req, res, permission, command, proto)
125 _processhttpv2request(ui, repo, req, res, permission, command, proto)
126
126
127 def _processhttpv2reflectrequest(ui, repo, req, res):
127 def _processhttpv2reflectrequest(ui, repo, req, res):
128 """Reads unified frame protocol request and dumps out state to client.
128 """Reads unified frame protocol request and dumps out state to client.
129
129
130 This special endpoint can be used to help debug the wire protocol.
130 This special endpoint can be used to help debug the wire protocol.
131
131
132 Instead of routing the request through the normal dispatch mechanism,
132 Instead of routing the request through the normal dispatch mechanism,
133 we instead read all frames, decode them, and feed them into our state
133 we instead read all frames, decode them, and feed them into our state
134 tracker. We then dump the log of all that activity back out to the
134 tracker. We then dump the log of all that activity back out to the
135 client.
135 client.
136 """
136 """
137 import json
137 import json
138
138
139 # Reflection APIs have a history of being abused, accidentally disclosing
139 # Reflection APIs have a history of being abused, accidentally disclosing
140 # sensitive data, etc. So we have a config knob.
140 # sensitive data, etc. So we have a config knob.
141 if not ui.configbool('experimental', 'web.api.debugreflect'):
141 if not ui.configbool('experimental', 'web.api.debugreflect'):
142 res.status = b'404 Not Found'
142 res.status = b'404 Not Found'
143 res.headers[b'Content-Type'] = b'text/plain'
143 res.headers[b'Content-Type'] = b'text/plain'
144 res.setbodybytes(_('debugreflect service not available'))
144 res.setbodybytes(_('debugreflect service not available'))
145 return
145 return
146
146
147 # We assume we have a unified framing protocol request body.
147 # We assume we have a unified framing protocol request body.
148
148
149 reactor = wireprotoframing.serverreactor()
149 reactor = wireprotoframing.serverreactor()
150 states = []
150 states = []
151
151
152 while True:
152 while True:
153 frame = wireprotoframing.readframe(req.bodyfh)
153 frame = wireprotoframing.readframe(req.bodyfh)
154
154
155 if not frame:
155 if not frame:
156 states.append(b'received: <no frame>')
156 states.append(b'received: <no frame>')
157 break
157 break
158
158
159 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
159 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
160 frame.requestid,
160 frame.requestid,
161 frame.payload))
161 frame.payload))
162
162
163 action, meta = reactor.onframerecv(frame)
163 action, meta = reactor.onframerecv(frame)
164 states.append(json.dumps((action, meta), sort_keys=True,
164 states.append(json.dumps((action, meta), sort_keys=True,
165 separators=(', ', ': ')))
165 separators=(', ', ': ')))
166
166
167 action, meta = reactor.oninputeof()
167 action, meta = reactor.oninputeof()
168 meta['action'] = action
168 meta['action'] = action
169 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
169 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
170
170
171 res.status = b'200 OK'
171 res.status = b'200 OK'
172 res.headers[b'Content-Type'] = b'text/plain'
172 res.headers[b'Content-Type'] = b'text/plain'
173 res.setbodybytes(b'\n'.join(states))
173 res.setbodybytes(b'\n'.join(states))
174
174
175 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
175 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
176 """Post-validation handler for HTTPv2 requests.
176 """Post-validation handler for HTTPv2 requests.
177
177
178 Called when the HTTP request contains unified frame-based protocol
178 Called when the HTTP request contains unified frame-based protocol
179 frames for evaluation.
179 frames for evaluation.
180 """
180 """
181 # TODO Some HTTP clients are full duplex and can receive data before
181 # TODO Some HTTP clients are full duplex and can receive data before
182 # the entire request is transmitted. Figure out a way to indicate support
182 # the entire request is transmitted. Figure out a way to indicate support
183 # for that so we can opt into full duplex mode.
183 # for that so we can opt into full duplex mode.
184 reactor = wireprotoframing.serverreactor(deferoutput=True)
184 reactor = wireprotoframing.serverreactor(deferoutput=True)
185 seencommand = False
185 seencommand = False
186
186
187 outstream = reactor.makeoutputstream()
187 outstream = reactor.makeoutputstream()
188
188
189 while True:
189 while True:
190 frame = wireprotoframing.readframe(req.bodyfh)
190 frame = wireprotoframing.readframe(req.bodyfh)
191 if not frame:
191 if not frame:
192 break
192 break
193
193
194 action, meta = reactor.onframerecv(frame)
194 action, meta = reactor.onframerecv(frame)
195
195
196 if action == 'wantframe':
196 if action == 'wantframe':
197 # Need more data before we can do anything.
197 # Need more data before we can do anything.
198 continue
198 continue
199 elif action == 'runcommand':
199 elif action == 'runcommand':
200 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
200 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
201 reqcommand, reactor, outstream,
201 reqcommand, reactor, outstream,
202 meta, issubsequent=seencommand)
202 meta, issubsequent=seencommand)
203
203
204 if sentoutput:
204 if sentoutput:
205 return
205 return
206
206
207 seencommand = True
207 seencommand = True
208
208
209 elif action == 'error':
209 elif action == 'error':
210 # TODO define proper error mechanism.
210 # TODO define proper error mechanism.
211 res.status = b'200 OK'
211 res.status = b'200 OK'
212 res.headers[b'Content-Type'] = b'text/plain'
212 res.headers[b'Content-Type'] = b'text/plain'
213 res.setbodybytes(meta['message'] + b'\n')
213 res.setbodybytes(meta['message'] + b'\n')
214 return
214 return
215 else:
215 else:
216 raise error.ProgrammingError(
216 raise error.ProgrammingError(
217 'unhandled action from frame processor: %s' % action)
217 'unhandled action from frame processor: %s' % action)
218
218
219 action, meta = reactor.oninputeof()
219 action, meta = reactor.oninputeof()
220 if action == 'sendframes':
220 if action == 'sendframes':
221 # We assume we haven't started sending the response yet. If we're
221 # We assume we haven't started sending the response yet. If we're
222 # wrong, the response type will raise an exception.
222 # wrong, the response type will raise an exception.
223 res.status = b'200 OK'
223 res.status = b'200 OK'
224 res.headers[b'Content-Type'] = FRAMINGTYPE
224 res.headers[b'Content-Type'] = FRAMINGTYPE
225 res.setbodygen(meta['framegen'])
225 res.setbodygen(meta['framegen'])
226 elif action == 'noop':
226 elif action == 'noop':
227 pass
227 pass
228 else:
228 else:
229 raise error.ProgrammingError('unhandled action from frame processor: %s'
229 raise error.ProgrammingError('unhandled action from frame processor: %s'
230 % action)
230 % action)
231
231
232 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
232 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
233 outstream, command, issubsequent):
233 outstream, command, issubsequent):
234 """Dispatch a wire protocol command made from HTTPv2 requests.
234 """Dispatch a wire protocol command made from HTTPv2 requests.
235
235
236 The authenticated permission (``authedperm``) along with the original
236 The authenticated permission (``authedperm``) along with the original
237 command from the URL (``reqcommand``) are passed in.
237 command from the URL (``reqcommand``) are passed in.
238 """
238 """
239 # We already validated that the session has permissions to perform the
239 # We already validated that the session has permissions to perform the
240 # actions in ``authedperm``. In the unified frame protocol, the canonical
240 # actions in ``authedperm``. In the unified frame protocol, the canonical
241 # command to run is expressed in a frame. However, the URL also requested
241 # command to run is expressed in a frame. However, the URL also requested
242 # to run a specific command. We need to be careful that the command we
242 # to run a specific command. We need to be careful that the command we
243 # run doesn't have permissions requirements greater than what was granted
243 # run doesn't have permissions requirements greater than what was granted
244 # by ``authedperm``.
244 # by ``authedperm``.
245 #
245 #
246 # Our rule for this is we only allow one command per HTTP request and
246 # Our rule for this is we only allow one command per HTTP request and
247 # that command must match the command in the URL. However, we make
247 # that command must match the command in the URL. However, we make
248 # an exception for the ``multirequest`` URL. This URL is allowed to
248 # an exception for the ``multirequest`` URL. This URL is allowed to
249 # execute multiple commands. We double check permissions of each command
249 # execute multiple commands. We double check permissions of each command
250 # as it is invoked to ensure there is no privilege escalation.
250 # as it is invoked to ensure there is no privilege escalation.
251 # TODO consider allowing multiple commands to regular command URLs
251 # TODO consider allowing multiple commands to regular command URLs
252 # iff each command is the same.
252 # iff each command is the same.
253
253
254 proto = httpv2protocolhandler(req, ui, args=command['args'])
254 proto = httpv2protocolhandler(req, ui, args=command['args'])
255
255
256 if reqcommand == b'multirequest':
256 if reqcommand == b'multirequest':
257 if not wireproto.commandsv2.commandavailable(command['command'], proto):
257 if not wireproto.commandsv2.commandavailable(command['command'], proto):
258 # TODO proper error mechanism
258 # TODO proper error mechanism
259 res.status = b'200 OK'
259 res.status = b'200 OK'
260 res.headers[b'Content-Type'] = b'text/plain'
260 res.headers[b'Content-Type'] = b'text/plain'
261 res.setbodybytes(_('wire protocol command not available: %s') %
261 res.setbodybytes(_('wire protocol command not available: %s') %
262 command['command'])
262 command['command'])
263 return True
263 return True
264
264
265 # TODO don't use assert here, since it may be elided by -O.
265 # TODO don't use assert here, since it may be elided by -O.
266 assert authedperm in (b'ro', b'rw')
266 assert authedperm in (b'ro', b'rw')
267 wirecommand = wireproto.commandsv2[command['command']]
267 wirecommand = wireproto.commandsv2[command['command']]
268 assert wirecommand.permission in ('push', 'pull')
268 assert wirecommand.permission in ('push', 'pull')
269
269
270 if authedperm == b'ro' and wirecommand.permission != 'pull':
270 if authedperm == b'ro' and wirecommand.permission != 'pull':
271 # TODO proper error mechanism
271 # TODO proper error mechanism
272 res.status = b'403 Forbidden'
272 res.status = b'403 Forbidden'
273 res.headers[b'Content-Type'] = b'text/plain'
273 res.headers[b'Content-Type'] = b'text/plain'
274 res.setbodybytes(_('insufficient permissions to execute '
274 res.setbodybytes(_('insufficient permissions to execute '
275 'command: %s') % command['command'])
275 'command: %s') % command['command'])
276 return True
276 return True
277
277
278 # TODO should we also call checkperm() here? Maybe not if we're going
278 # TODO should we also call checkperm() here? Maybe not if we're going
279 # to overhaul that API. The granted scope from the URL check should
279 # to overhaul that API. The granted scope from the URL check should
280 # be good enough.
280 # be good enough.
281
281
282 else:
282 else:
283 # Don't allow multiple commands outside of ``multirequest`` URL.
283 # Don't allow multiple commands outside of ``multirequest`` URL.
284 if issubsequent:
284 if issubsequent:
285 # TODO proper error mechanism
285 # TODO proper error mechanism
286 res.status = b'200 OK'
286 res.status = b'200 OK'
287 res.headers[b'Content-Type'] = b'text/plain'
287 res.headers[b'Content-Type'] = b'text/plain'
288 res.setbodybytes(_('multiple commands cannot be issued to this '
288 res.setbodybytes(_('multiple commands cannot be issued to this '
289 'URL'))
289 'URL'))
290 return True
290 return True
291
291
292 if reqcommand != command['command']:
292 if reqcommand != command['command']:
293 # TODO define proper error mechanism
293 # TODO define proper error mechanism
294 res.status = b'200 OK'
294 res.status = b'200 OK'
295 res.headers[b'Content-Type'] = b'text/plain'
295 res.headers[b'Content-Type'] = b'text/plain'
296 res.setbodybytes(_('command in frame must match command in URL'))
296 res.setbodybytes(_('command in frame must match command in URL'))
297 return True
297 return True
298
298
299 rsp = dispatch(repo, proto, command['command'])
299 rsp = dispatch(repo, proto, command['command'])
300
300
301 res.status = b'200 OK'
301 res.status = b'200 OK'
302 res.headers[b'Content-Type'] = FRAMINGTYPE
302 res.headers[b'Content-Type'] = FRAMINGTYPE
303
303
304 if isinstance(rsp, wireprototypes.cborresponse):
304 if isinstance(rsp, wireprototypes.cborresponse):
305 encoded = cbor.dumps(rsp.value, canonical=True)
305 encoded = cbor.dumps(rsp.value, canonical=True)
306 action, meta = reactor.oncommandresponseready(outstream,
306 action, meta = reactor.oncommandresponseready(outstream,
307 command['requestid'],
307 command['requestid'],
308 encoded)
308 encoded)
309 elif isinstance(rsp, wireprototypes.v2streamingresponse):
309 elif isinstance(rsp, wireprototypes.v2streamingresponse):
310 action, meta = reactor.oncommandresponsereadygen(outstream,
310 action, meta = reactor.oncommandresponsereadygen(outstream,
311 command['requestid'],
311 command['requestid'],
312 rsp.gen)
312 rsp.gen)
313 elif isinstance(rsp, wireprototypes.v2errorresponse):
313 elif isinstance(rsp, wireprototypes.v2errorresponse):
314 action, meta = reactor.oncommanderror(outstream,
314 action, meta = reactor.oncommanderror(outstream,
315 command['requestid'],
315 command['requestid'],
316 rsp.message,
316 rsp.message,
317 rsp.args)
317 rsp.args)
318 else:
318 else:
319 action, meta = reactor.onservererror(
319 action, meta = reactor.onservererror(
320 _('unhandled response type from wire proto command'))
320 _('unhandled response type from wire proto command'))
321
321
322 if action == 'sendframes':
322 if action == 'sendframes':
323 res.setbodygen(meta['framegen'])
323 res.setbodygen(meta['framegen'])
324 return True
324 return True
325 elif action == 'noop':
325 elif action == 'noop':
326 return False
326 return False
327 else:
327 else:
328 raise error.ProgrammingError('unhandled event from reactor: %s' %
328 raise error.ProgrammingError('unhandled event from reactor: %s' %
329 action)
329 action)
330
330
331 def getdispatchrepo(repo, proto, command):
331 def getdispatchrepo(repo, proto, command):
332 return repo.filtered('served')
332 return repo.filtered('served')
333
333
334 def dispatch(repo, proto, command):
334 def dispatch(repo, proto, command):
335 repo = getdispatchrepo(repo, proto, command)
335 repo = getdispatchrepo(repo, proto, command)
336
336
337 func, spec = wireproto.commandsv2[command]
337 func, spec = wireproto.commandsv2[command]
338 args = proto.getargs(spec)
338 args = proto.getargs(spec)
339
339
340 return func(repo, proto, **args)
340 return func(repo, proto, **args)
341
341
342 @zi.implementer(wireprototypes.baseprotocolhandler)
342 @zi.implementer(wireprototypes.baseprotocolhandler)
343 class httpv2protocolhandler(object):
343 class httpv2protocolhandler(object):
344 def __init__(self, req, ui, args=None):
344 def __init__(self, req, ui, args=None):
345 self._req = req
345 self._req = req
346 self._ui = ui
346 self._ui = ui
347 self._args = args
347 self._args = args
348
348
349 @property
349 @property
350 def name(self):
350 def name(self):
351 return HTTP_WIREPROTO_V2
351 return HTTP_WIREPROTO_V2
352
352
353 def getargs(self, args):
353 def getargs(self, args):
354 data = {}
354 data = {}
355 for k, typ in args.items():
355 for k, typ in args.items():
356 if k == '*':
356 if k == '*':
357 raise NotImplementedError('do not support * args')
357 raise NotImplementedError('do not support * args')
358 elif k in self._args:
358 elif k in self._args:
359 # TODO consider validating value types.
359 # TODO consider validating value types.
360 data[k] = self._args[k]
360 data[k] = self._args[k]
361
361
362 return data
362 return data
363
363
364 def getprotocaps(self):
364 def getprotocaps(self):
365 # Protocol capabilities are currently not implemented for HTTP V2.
365 # Protocol capabilities are currently not implemented for HTTP V2.
366 return set()
366 return set()
367
367
368 def getpayload(self):
368 def getpayload(self):
369 raise NotImplementedError
369 raise NotImplementedError
370
370
371 @contextlib.contextmanager
371 @contextlib.contextmanager
372 def mayberedirectstdio(self):
372 def mayberedirectstdio(self):
373 raise NotImplementedError
373 raise NotImplementedError
374
374
375 def client(self):
375 def client(self):
376 raise NotImplementedError
376 raise NotImplementedError
377
377
378 def addcapabilities(self, repo, caps):
378 def addcapabilities(self, repo, caps):
379 return caps
379 return caps
380
380
381 def checkperm(self, perm):
381 def checkperm(self, perm):
382 raise NotImplementedError
382 raise NotImplementedError
383
383
384 def httpv2apidescriptor(req, repo):
384 def httpv2apidescriptor(req, repo):
385 proto = httpv2protocolhandler(req, repo.ui)
385 proto = httpv2protocolhandler(req, repo.ui)
386
386
387 return _capabilitiesv2(repo, proto)
387 return _capabilitiesv2(repo, proto)
388
388
389 def _capabilitiesv2(repo, proto):
389 def _capabilitiesv2(repo, proto):
390 """Obtain the set of capabilities for version 2 transports.
390 """Obtain the set of capabilities for version 2 transports.
391
391
392 These capabilities are distinct from the capabilities for version 1
392 These capabilities are distinct from the capabilities for version 1
393 transports.
393 transports.
394 """
394 """
395 compression = []
395 compression = []
396 for engine in wireproto.supportedcompengines(repo.ui, util.SERVERROLE):
396 for engine in wireprototypes.supportedcompengines(repo.ui, util.SERVERROLE):
397 compression.append({
397 compression.append({
398 b'name': engine.wireprotosupport().name,
398 b'name': engine.wireprotosupport().name,
399 })
399 })
400
400
401 caps = {
401 caps = {
402 'commands': {},
402 'commands': {},
403 'compression': compression,
403 'compression': compression,
404 'framingmediatypes': [FRAMINGTYPE],
404 'framingmediatypes': [FRAMINGTYPE],
405 }
405 }
406
406
407 for command, entry in wireproto.commandsv2.items():
407 for command, entry in wireproto.commandsv2.items():
408 caps['commands'][command] = {
408 caps['commands'][command] = {
409 'args': entry.args,
409 'args': entry.args,
410 'permissions': [entry.permission],
410 'permissions': [entry.permission],
411 }
411 }
412
412
413 if streamclone.allowservergeneration(repo):
413 if streamclone.allowservergeneration(repo):
414 caps['rawrepoformats'] = sorted(repo.requirements &
414 caps['rawrepoformats'] = sorted(repo.requirements &
415 repo.supportedformats)
415 repo.supportedformats)
416
416
417 return proto.addcapabilities(repo, caps)
417 return proto.addcapabilities(repo, caps)
418
418
419 def wireprotocommand(name, args=None, permission='push'):
419 def wireprotocommand(name, args=None, permission='push'):
420 """Decorator to declare a wire protocol command.
420 """Decorator to declare a wire protocol command.
421
421
422 ``name`` is the name of the wire protocol command being provided.
422 ``name`` is the name of the wire protocol command being provided.
423
423
424 ``args`` is a dict of argument names to example values.
424 ``args`` is a dict of argument names to example values.
425
425
426 ``permission`` defines the permission type needed to run this command.
426 ``permission`` defines the permission type needed to run this command.
427 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
427 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
428 respectively. Default is to assume command requires ``push`` permissions
428 respectively. Default is to assume command requires ``push`` permissions
429 because otherwise commands not declaring their permissions could modify
429 because otherwise commands not declaring their permissions could modify
430 a repository that is supposed to be read-only.
430 a repository that is supposed to be read-only.
431 """
431 """
432 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
432 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
433 if v['version'] == 2}
433 if v['version'] == 2}
434
434
435 if permission not in ('push', 'pull'):
435 if permission not in ('push', 'pull'):
436 raise error.ProgrammingError('invalid wire protocol permission; '
436 raise error.ProgrammingError('invalid wire protocol permission; '
437 'got %s; expected "push" or "pull"' %
437 'got %s; expected "push" or "pull"' %
438 permission)
438 permission)
439
439
440 if args is None:
440 if args is None:
441 args = {}
441 args = {}
442
442
443 if not isinstance(args, dict):
443 if not isinstance(args, dict):
444 raise error.ProgrammingError('arguments for version 2 commands '
444 raise error.ProgrammingError('arguments for version 2 commands '
445 'must be declared as dicts')
445 'must be declared as dicts')
446
446
447 def register(func):
447 def register(func):
448 if name in wireproto.commandsv2:
448 if name in wireproto.commandsv2:
449 raise error.ProgrammingError('%s command already registered '
449 raise error.ProgrammingError('%s command already registered '
450 'for version 2' % name)
450 'for version 2' % name)
451
451
452 wireproto.commandsv2[name] = wireprototypes.commandentry(
452 wireproto.commandsv2[name] = wireprototypes.commandentry(
453 func, args=args, transports=transports, permission=permission)
453 func, args=args, transports=transports, permission=permission)
454
454
455 return func
455 return func
456
456
457 return register
457 return register
458
458
459 @wireprotocommand('branchmap', permission='pull')
459 @wireprotocommand('branchmap', permission='pull')
460 def branchmapv2(repo, proto):
460 def branchmapv2(repo, proto):
461 branchmap = {encoding.fromlocal(k): v
461 branchmap = {encoding.fromlocal(k): v
462 for k, v in repo.branchmap().iteritems()}
462 for k, v in repo.branchmap().iteritems()}
463
463
464 return wireprototypes.cborresponse(branchmap)
464 return wireprototypes.cborresponse(branchmap)
465
465
466 @wireprotocommand('capabilities', permission='pull')
466 @wireprotocommand('capabilities', permission='pull')
467 def capabilitiesv2(repo, proto):
467 def capabilitiesv2(repo, proto):
468 caps = _capabilitiesv2(repo, proto)
468 caps = _capabilitiesv2(repo, proto)
469
469
470 return wireprototypes.cborresponse(caps)
470 return wireprototypes.cborresponse(caps)
471
471
472 @wireprotocommand('heads',
472 @wireprotocommand('heads',
473 args={
473 args={
474 'publiconly': False,
474 'publiconly': False,
475 },
475 },
476 permission='pull')
476 permission='pull')
477 def headsv2(repo, proto, publiconly=False):
477 def headsv2(repo, proto, publiconly=False):
478 if publiconly:
478 if publiconly:
479 repo = repo.filtered('immutable')
479 repo = repo.filtered('immutable')
480
480
481 return wireprototypes.cborresponse(repo.heads())
481 return wireprototypes.cborresponse(repo.heads())
482
482
483 @wireprotocommand('known',
483 @wireprotocommand('known',
484 args={
484 args={
485 'nodes': [b'deadbeef'],
485 'nodes': [b'deadbeef'],
486 },
486 },
487 permission='pull')
487 permission='pull')
488 def knownv2(repo, proto, nodes=None):
488 def knownv2(repo, proto, nodes=None):
489 nodes = nodes or []
489 nodes = nodes or []
490 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
490 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
491 return wireprototypes.cborresponse(result)
491 return wireprototypes.cborresponse(result)
492
492
493 @wireprotocommand('listkeys',
493 @wireprotocommand('listkeys',
494 args={
494 args={
495 'namespace': b'ns',
495 'namespace': b'ns',
496 },
496 },
497 permission='pull')
497 permission='pull')
498 def listkeysv2(repo, proto, namespace=None):
498 def listkeysv2(repo, proto, namespace=None):
499 keys = repo.listkeys(encoding.tolocal(namespace))
499 keys = repo.listkeys(encoding.tolocal(namespace))
500 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
500 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
501 for k, v in keys.iteritems()}
501 for k, v in keys.iteritems()}
502
502
503 return wireprototypes.cborresponse(keys)
503 return wireprototypes.cborresponse(keys)
504
504
505 @wireprotocommand('lookup',
505 @wireprotocommand('lookup',
506 args={
506 args={
507 'key': b'foo',
507 'key': b'foo',
508 },
508 },
509 permission='pull')
509 permission='pull')
510 def lookupv2(repo, proto, key):
510 def lookupv2(repo, proto, key):
511 key = encoding.tolocal(key)
511 key = encoding.tolocal(key)
512
512
513 # TODO handle exception.
513 # TODO handle exception.
514 node = repo.lookup(key)
514 node = repo.lookup(key)
515
515
516 return wireprototypes.cborresponse(node)
516 return wireprototypes.cborresponse(node)
517
517
518 @wireprotocommand('pushkey',
518 @wireprotocommand('pushkey',
519 args={
519 args={
520 'namespace': b'ns',
520 'namespace': b'ns',
521 'key': b'key',
521 'key': b'key',
522 'old': b'old',
522 'old': b'old',
523 'new': b'new',
523 'new': b'new',
524 },
524 },
525 permission='push')
525 permission='push')
526 def pushkeyv2(repo, proto, namespace, key, old, new):
526 def pushkeyv2(repo, proto, namespace, key, old, new):
527 # TODO handle ui output redirection
527 # TODO handle ui output redirection
528 r = repo.pushkey(encoding.tolocal(namespace),
528 r = repo.pushkey(encoding.tolocal(namespace),
529 encoding.tolocal(key),
529 encoding.tolocal(key),
530 encoding.tolocal(old),
530 encoding.tolocal(old),
531 encoding.tolocal(new))
531 encoding.tolocal(new))
532
532
533 return wireprototypes.cborresponse(r)
533 return wireprototypes.cborresponse(r)
General Comments 0
You need to be logged in to leave comments. Login now