##// END OF EJS Templates
wireproto: reimplement dispatch() for version 2 server...
Gregory Szorc -
r37800:99accae4 default
parent child Browse files
Show More
@@ -1,730 +1,720 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import os
10 import os
11 import tempfile
11 import tempfile
12
12
13 from .i18n import _
13 from .i18n import _
14 from .node import (
14 from .node import (
15 hex,
15 hex,
16 nullid,
16 nullid,
17 )
17 )
18
18
19 from . import (
19 from . import (
20 bundle2,
20 bundle2,
21 changegroup as changegroupmod,
21 changegroup as changegroupmod,
22 discovery,
22 discovery,
23 encoding,
23 encoding,
24 error,
24 error,
25 exchange,
25 exchange,
26 pushkey as pushkeymod,
26 pushkey as pushkeymod,
27 pycompat,
27 pycompat,
28 streamclone,
28 streamclone,
29 util,
29 util,
30 wireprototypes,
30 wireprototypes,
31 )
31 )
32
32
33 from .utils import (
33 from .utils import (
34 procutil,
34 procutil,
35 stringutil,
35 stringutil,
36 )
36 )
37
37
38 urlerr = util.urlerr
38 urlerr = util.urlerr
39 urlreq = util.urlreq
39 urlreq = util.urlreq
40
40
41 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
41 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
42 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
42 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
43 'IncompatibleClient')
43 'IncompatibleClient')
44 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
44 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
45
45
46 def clientcompressionsupport(proto):
46 def clientcompressionsupport(proto):
47 """Returns a list of compression methods supported by the client.
47 """Returns a list of compression methods supported by the client.
48
48
49 Returns a list of the compression methods supported by the client
49 Returns a list of the compression methods supported by the client
50 according to the protocol capabilities. If no such capability has
50 according to the protocol capabilities. If no such capability has
51 been announced, fallback to the default of zlib and uncompressed.
51 been announced, fallback to the default of zlib and uncompressed.
52 """
52 """
53 for cap in proto.getprotocaps():
53 for cap in proto.getprotocaps():
54 if cap.startswith('comp='):
54 if cap.startswith('comp='):
55 return cap[5:].split(',')
55 return cap[5:].split(',')
56 return ['zlib', 'none']
56 return ['zlib', 'none']
57
57
58 # wire protocol command can either return a string or one of these classes.
58 # wire protocol command can either return a string or one of these classes.
59
59
60 def getdispatchrepo(repo, proto, command):
60 def getdispatchrepo(repo, proto, command):
61 """Obtain the repo used for processing wire protocol commands.
61 """Obtain the repo used for processing wire protocol commands.
62
62
63 The intent of this function is to serve as a monkeypatch point for
63 The intent of this function is to serve as a monkeypatch point for
64 extensions that need commands to operate on different repo views under
64 extensions that need commands to operate on different repo views under
65 specialized circumstances.
65 specialized circumstances.
66 """
66 """
67 return repo.filtered('served')
67 return repo.filtered('served')
68
68
69 def dispatch(repo, proto, command):
69 def dispatch(repo, proto, command):
70 repo = getdispatchrepo(repo, proto, command)
70 repo = getdispatchrepo(repo, proto, command)
71
71
72 transportversion = wireprototypes.TRANSPORTS[proto.name]['version']
72 func, spec = commands[command]
73 commandtable = commandsv2 if transportversion == 2 else commands
74 func, spec = commandtable[command]
75
76 args = proto.getargs(spec)
73 args = proto.getargs(spec)
77
74
78 # Version 1 protocols define arguments as a list. Version 2 uses a dict.
75 return func(repo, proto, *args)
79 if isinstance(args, list):
80 return func(repo, proto, *args)
81 elif isinstance(args, dict):
82 return func(repo, proto, **args)
83 else:
84 raise error.ProgrammingError('unexpected type returned from '
85 'proto.getargs(): %s' % type(args))
86
76
87 def options(cmd, keys, others):
77 def options(cmd, keys, others):
88 opts = {}
78 opts = {}
89 for k in keys:
79 for k in keys:
90 if k in others:
80 if k in others:
91 opts[k] = others[k]
81 opts[k] = others[k]
92 del others[k]
82 del others[k]
93 if others:
83 if others:
94 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
84 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
95 % (cmd, ",".join(others)))
85 % (cmd, ",".join(others)))
96 return opts
86 return opts
97
87
98 def bundle1allowed(repo, action):
88 def bundle1allowed(repo, action):
99 """Whether a bundle1 operation is allowed from the server.
89 """Whether a bundle1 operation is allowed from the server.
100
90
101 Priority is:
91 Priority is:
102
92
103 1. server.bundle1gd.<action> (if generaldelta active)
93 1. server.bundle1gd.<action> (if generaldelta active)
104 2. server.bundle1.<action>
94 2. server.bundle1.<action>
105 3. server.bundle1gd (if generaldelta active)
95 3. server.bundle1gd (if generaldelta active)
106 4. server.bundle1
96 4. server.bundle1
107 """
97 """
108 ui = repo.ui
98 ui = repo.ui
109 gd = 'generaldelta' in repo.requirements
99 gd = 'generaldelta' in repo.requirements
110
100
111 if gd:
101 if gd:
112 v = ui.configbool('server', 'bundle1gd.%s' % action)
102 v = ui.configbool('server', 'bundle1gd.%s' % action)
113 if v is not None:
103 if v is not None:
114 return v
104 return v
115
105
116 v = ui.configbool('server', 'bundle1.%s' % action)
106 v = ui.configbool('server', 'bundle1.%s' % action)
117 if v is not None:
107 if v is not None:
118 return v
108 return v
119
109
120 if gd:
110 if gd:
121 v = ui.configbool('server', 'bundle1gd')
111 v = ui.configbool('server', 'bundle1gd')
122 if v is not None:
112 if v is not None:
123 return v
113 return v
124
114
125 return ui.configbool('server', 'bundle1')
115 return ui.configbool('server', 'bundle1')
126
116
127 def supportedcompengines(ui, role):
117 def supportedcompengines(ui, role):
128 """Obtain the list of supported compression engines for a request."""
118 """Obtain the list of supported compression engines for a request."""
129 assert role in (util.CLIENTROLE, util.SERVERROLE)
119 assert role in (util.CLIENTROLE, util.SERVERROLE)
130
120
131 compengines = util.compengines.supportedwireengines(role)
121 compengines = util.compengines.supportedwireengines(role)
132
122
133 # Allow config to override default list and ordering.
123 # Allow config to override default list and ordering.
134 if role == util.SERVERROLE:
124 if role == util.SERVERROLE:
135 configengines = ui.configlist('server', 'compressionengines')
125 configengines = ui.configlist('server', 'compressionengines')
136 config = 'server.compressionengines'
126 config = 'server.compressionengines'
137 else:
127 else:
138 # This is currently implemented mainly to facilitate testing. In most
128 # This is currently implemented mainly to facilitate testing. In most
139 # cases, the server should be in charge of choosing a compression engine
129 # cases, the server should be in charge of choosing a compression engine
140 # because a server has the most to lose from a sub-optimal choice. (e.g.
130 # because a server has the most to lose from a sub-optimal choice. (e.g.
141 # CPU DoS due to an expensive engine or a network DoS due to poor
131 # CPU DoS due to an expensive engine or a network DoS due to poor
142 # compression ratio).
132 # compression ratio).
143 configengines = ui.configlist('experimental',
133 configengines = ui.configlist('experimental',
144 'clientcompressionengines')
134 'clientcompressionengines')
145 config = 'experimental.clientcompressionengines'
135 config = 'experimental.clientcompressionengines'
146
136
147 # No explicit config. Filter out the ones that aren't supposed to be
137 # No explicit config. Filter out the ones that aren't supposed to be
148 # advertised and return default ordering.
138 # advertised and return default ordering.
149 if not configengines:
139 if not configengines:
150 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
140 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
151 return [e for e in compengines
141 return [e for e in compengines
152 if getattr(e.wireprotosupport(), attr) > 0]
142 if getattr(e.wireprotosupport(), attr) > 0]
153
143
154 # If compression engines are listed in the config, assume there is a good
144 # If compression engines are listed in the config, assume there is a good
155 # reason for it (like server operators wanting to achieve specific
145 # reason for it (like server operators wanting to achieve specific
156 # performance characteristics). So fail fast if the config references
146 # performance characteristics). So fail fast if the config references
157 # unusable compression engines.
147 # unusable compression engines.
158 validnames = set(e.name() for e in compengines)
148 validnames = set(e.name() for e in compengines)
159 invalidnames = set(e for e in configengines if e not in validnames)
149 invalidnames = set(e for e in configengines if e not in validnames)
160 if invalidnames:
150 if invalidnames:
161 raise error.Abort(_('invalid compression engine defined in %s: %s') %
151 raise error.Abort(_('invalid compression engine defined in %s: %s') %
162 (config, ', '.join(sorted(invalidnames))))
152 (config, ', '.join(sorted(invalidnames))))
163
153
164 compengines = [e for e in compengines if e.name() in configengines]
154 compengines = [e for e in compengines if e.name() in configengines]
165 compengines = sorted(compengines,
155 compengines = sorted(compengines,
166 key=lambda e: configengines.index(e.name()))
156 key=lambda e: configengines.index(e.name()))
167
157
168 if not compengines:
158 if not compengines:
169 raise error.Abort(_('%s config option does not specify any known '
159 raise error.Abort(_('%s config option does not specify any known '
170 'compression engines') % config,
160 'compression engines') % config,
171 hint=_('usable compression engines: %s') %
161 hint=_('usable compression engines: %s') %
172 ', '.sorted(validnames))
162 ', '.sorted(validnames))
173
163
174 return compengines
164 return compengines
175
165
176 # For version 1 transports.
166 # For version 1 transports.
177 commands = wireprototypes.commanddict()
167 commands = wireprototypes.commanddict()
178
168
179 # For version 2 transports.
169 # For version 2 transports.
180 commandsv2 = wireprototypes.commanddict()
170 commandsv2 = wireprototypes.commanddict()
181
171
182 def wireprotocommand(name, args=None, permission='push'):
172 def wireprotocommand(name, args=None, permission='push'):
183 """Decorator to declare a wire protocol command.
173 """Decorator to declare a wire protocol command.
184
174
185 ``name`` is the name of the wire protocol command being provided.
175 ``name`` is the name of the wire protocol command being provided.
186
176
187 ``args`` defines the named arguments accepted by the command. It is
177 ``args`` defines the named arguments accepted by the command. It is
188 a space-delimited list of argument names. ``*`` denotes a special value
178 a space-delimited list of argument names. ``*`` denotes a special value
189 that says to accept all named arguments.
179 that says to accept all named arguments.
190
180
191 ``permission`` defines the permission type needed to run this command.
181 ``permission`` defines the permission type needed to run this command.
192 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
182 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
193 respectively. Default is to assume command requires ``push`` permissions
183 respectively. Default is to assume command requires ``push`` permissions
194 because otherwise commands not declaring their permissions could modify
184 because otherwise commands not declaring their permissions could modify
195 a repository that is supposed to be read-only.
185 a repository that is supposed to be read-only.
196 """
186 """
197 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
187 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
198 if v['version'] == 1}
188 if v['version'] == 1}
199
189
200 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
190 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
201 # SSHv2.
191 # SSHv2.
202 # TODO undo this hack when SSH is using the unified frame protocol.
192 # TODO undo this hack when SSH is using the unified frame protocol.
203 if name == b'batch':
193 if name == b'batch':
204 transports.add(wireprototypes.SSHV2)
194 transports.add(wireprototypes.SSHV2)
205
195
206 if permission not in ('push', 'pull'):
196 if permission not in ('push', 'pull'):
207 raise error.ProgrammingError('invalid wire protocol permission; '
197 raise error.ProgrammingError('invalid wire protocol permission; '
208 'got %s; expected "push" or "pull"' %
198 'got %s; expected "push" or "pull"' %
209 permission)
199 permission)
210
200
211 if args is None:
201 if args is None:
212 args = ''
202 args = ''
213
203
214 if not isinstance(args, bytes):
204 if not isinstance(args, bytes):
215 raise error.ProgrammingError('arguments for version 1 commands '
205 raise error.ProgrammingError('arguments for version 1 commands '
216 'must be declared as bytes')
206 'must be declared as bytes')
217
207
218 def register(func):
208 def register(func):
219 if name in commands:
209 if name in commands:
220 raise error.ProgrammingError('%s command already registered '
210 raise error.ProgrammingError('%s command already registered '
221 'for version 1' % name)
211 'for version 1' % name)
222 commands[name] = wireprototypes.commandentry(
212 commands[name] = wireprototypes.commandentry(
223 func, args=args, transports=transports, permission=permission)
213 func, args=args, transports=transports, permission=permission)
224
214
225 return func
215 return func
226 return register
216 return register
227
217
228 # TODO define a more appropriate permissions type to use for this.
218 # TODO define a more appropriate permissions type to use for this.
229 @wireprotocommand('batch', 'cmds *', permission='pull')
219 @wireprotocommand('batch', 'cmds *', permission='pull')
230 def batch(repo, proto, cmds, others):
220 def batch(repo, proto, cmds, others):
231 unescapearg = wireprototypes.unescapebatcharg
221 unescapearg = wireprototypes.unescapebatcharg
232 repo = repo.filtered("served")
222 repo = repo.filtered("served")
233 res = []
223 res = []
234 for pair in cmds.split(';'):
224 for pair in cmds.split(';'):
235 op, args = pair.split(' ', 1)
225 op, args = pair.split(' ', 1)
236 vals = {}
226 vals = {}
237 for a in args.split(','):
227 for a in args.split(','):
238 if a:
228 if a:
239 n, v = a.split('=')
229 n, v = a.split('=')
240 vals[unescapearg(n)] = unescapearg(v)
230 vals[unescapearg(n)] = unescapearg(v)
241 func, spec = commands[op]
231 func, spec = commands[op]
242
232
243 # Validate that client has permissions to perform this command.
233 # Validate that client has permissions to perform this command.
244 perm = commands[op].permission
234 perm = commands[op].permission
245 assert perm in ('push', 'pull')
235 assert perm in ('push', 'pull')
246 proto.checkperm(perm)
236 proto.checkperm(perm)
247
237
248 if spec:
238 if spec:
249 keys = spec.split()
239 keys = spec.split()
250 data = {}
240 data = {}
251 for k in keys:
241 for k in keys:
252 if k == '*':
242 if k == '*':
253 star = {}
243 star = {}
254 for key in vals.keys():
244 for key in vals.keys():
255 if key not in keys:
245 if key not in keys:
256 star[key] = vals[key]
246 star[key] = vals[key]
257 data['*'] = star
247 data['*'] = star
258 else:
248 else:
259 data[k] = vals[k]
249 data[k] = vals[k]
260 result = func(repo, proto, *[data[k] for k in keys])
250 result = func(repo, proto, *[data[k] for k in keys])
261 else:
251 else:
262 result = func(repo, proto)
252 result = func(repo, proto)
263 if isinstance(result, wireprototypes.ooberror):
253 if isinstance(result, wireprototypes.ooberror):
264 return result
254 return result
265
255
266 # For now, all batchable commands must return bytesresponse or
256 # For now, all batchable commands must return bytesresponse or
267 # raw bytes (for backwards compatibility).
257 # raw bytes (for backwards compatibility).
268 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
258 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
269 if isinstance(result, wireprototypes.bytesresponse):
259 if isinstance(result, wireprototypes.bytesresponse):
270 result = result.data
260 result = result.data
271 res.append(wireprototypes.escapebatcharg(result))
261 res.append(wireprototypes.escapebatcharg(result))
272
262
273 return wireprototypes.bytesresponse(';'.join(res))
263 return wireprototypes.bytesresponse(';'.join(res))
274
264
275 @wireprotocommand('between', 'pairs', permission='pull')
265 @wireprotocommand('between', 'pairs', permission='pull')
276 def between(repo, proto, pairs):
266 def between(repo, proto, pairs):
277 pairs = [wireprototypes.decodelist(p, '-') for p in pairs.split(" ")]
267 pairs = [wireprototypes.decodelist(p, '-') for p in pairs.split(" ")]
278 r = []
268 r = []
279 for b in repo.between(pairs):
269 for b in repo.between(pairs):
280 r.append(wireprototypes.encodelist(b) + "\n")
270 r.append(wireprototypes.encodelist(b) + "\n")
281
271
282 return wireprototypes.bytesresponse(''.join(r))
272 return wireprototypes.bytesresponse(''.join(r))
283
273
284 @wireprotocommand('branchmap', permission='pull')
274 @wireprotocommand('branchmap', permission='pull')
285 def branchmap(repo, proto):
275 def branchmap(repo, proto):
286 branchmap = repo.branchmap()
276 branchmap = repo.branchmap()
287 heads = []
277 heads = []
288 for branch, nodes in branchmap.iteritems():
278 for branch, nodes in branchmap.iteritems():
289 branchname = urlreq.quote(encoding.fromlocal(branch))
279 branchname = urlreq.quote(encoding.fromlocal(branch))
290 branchnodes = wireprototypes.encodelist(nodes)
280 branchnodes = wireprototypes.encodelist(nodes)
291 heads.append('%s %s' % (branchname, branchnodes))
281 heads.append('%s %s' % (branchname, branchnodes))
292
282
293 return wireprototypes.bytesresponse('\n'.join(heads))
283 return wireprototypes.bytesresponse('\n'.join(heads))
294
284
295 @wireprotocommand('branches', 'nodes', permission='pull')
285 @wireprotocommand('branches', 'nodes', permission='pull')
296 def branches(repo, proto, nodes):
286 def branches(repo, proto, nodes):
297 nodes = wireprototypes.decodelist(nodes)
287 nodes = wireprototypes.decodelist(nodes)
298 r = []
288 r = []
299 for b in repo.branches(nodes):
289 for b in repo.branches(nodes):
300 r.append(wireprototypes.encodelist(b) + "\n")
290 r.append(wireprototypes.encodelist(b) + "\n")
301
291
302 return wireprototypes.bytesresponse(''.join(r))
292 return wireprototypes.bytesresponse(''.join(r))
303
293
304 @wireprotocommand('clonebundles', '', permission='pull')
294 @wireprotocommand('clonebundles', '', permission='pull')
305 def clonebundles(repo, proto):
295 def clonebundles(repo, proto):
306 """Server command for returning info for available bundles to seed clones.
296 """Server command for returning info for available bundles to seed clones.
307
297
308 Clients will parse this response and determine what bundle to fetch.
298 Clients will parse this response and determine what bundle to fetch.
309
299
310 Extensions may wrap this command to filter or dynamically emit data
300 Extensions may wrap this command to filter or dynamically emit data
311 depending on the request. e.g. you could advertise URLs for the closest
301 depending on the request. e.g. you could advertise URLs for the closest
312 data center given the client's IP address.
302 data center given the client's IP address.
313 """
303 """
314 return wireprototypes.bytesresponse(
304 return wireprototypes.bytesresponse(
315 repo.vfs.tryread('clonebundles.manifest'))
305 repo.vfs.tryread('clonebundles.manifest'))
316
306
317 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
307 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
318 'known', 'getbundle', 'unbundlehash']
308 'known', 'getbundle', 'unbundlehash']
319
309
320 def _capabilities(repo, proto):
310 def _capabilities(repo, proto):
321 """return a list of capabilities for a repo
311 """return a list of capabilities for a repo
322
312
323 This function exists to allow extensions to easily wrap capabilities
313 This function exists to allow extensions to easily wrap capabilities
324 computation
314 computation
325
315
326 - returns a lists: easy to alter
316 - returns a lists: easy to alter
327 - change done here will be propagated to both `capabilities` and `hello`
317 - change done here will be propagated to both `capabilities` and `hello`
328 command without any other action needed.
318 command without any other action needed.
329 """
319 """
330 # copy to prevent modification of the global list
320 # copy to prevent modification of the global list
331 caps = list(wireprotocaps)
321 caps = list(wireprotocaps)
332
322
333 # Command of same name as capability isn't exposed to version 1 of
323 # Command of same name as capability isn't exposed to version 1 of
334 # transports. So conditionally add it.
324 # transports. So conditionally add it.
335 if commands.commandavailable('changegroupsubset', proto):
325 if commands.commandavailable('changegroupsubset', proto):
336 caps.append('changegroupsubset')
326 caps.append('changegroupsubset')
337
327
338 if streamclone.allowservergeneration(repo):
328 if streamclone.allowservergeneration(repo):
339 if repo.ui.configbool('server', 'preferuncompressed'):
329 if repo.ui.configbool('server', 'preferuncompressed'):
340 caps.append('stream-preferred')
330 caps.append('stream-preferred')
341 requiredformats = repo.requirements & repo.supportedformats
331 requiredformats = repo.requirements & repo.supportedformats
342 # if our local revlogs are just revlogv1, add 'stream' cap
332 # if our local revlogs are just revlogv1, add 'stream' cap
343 if not requiredformats - {'revlogv1'}:
333 if not requiredformats - {'revlogv1'}:
344 caps.append('stream')
334 caps.append('stream')
345 # otherwise, add 'streamreqs' detailing our local revlog format
335 # otherwise, add 'streamreqs' detailing our local revlog format
346 else:
336 else:
347 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
337 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
348 if repo.ui.configbool('experimental', 'bundle2-advertise'):
338 if repo.ui.configbool('experimental', 'bundle2-advertise'):
349 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
339 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
350 caps.append('bundle2=' + urlreq.quote(capsblob))
340 caps.append('bundle2=' + urlreq.quote(capsblob))
351 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
341 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
352
342
353 return proto.addcapabilities(repo, caps)
343 return proto.addcapabilities(repo, caps)
354
344
355 # If you are writing an extension and consider wrapping this function. Wrap
345 # If you are writing an extension and consider wrapping this function. Wrap
356 # `_capabilities` instead.
346 # `_capabilities` instead.
357 @wireprotocommand('capabilities', permission='pull')
347 @wireprotocommand('capabilities', permission='pull')
358 def capabilities(repo, proto):
348 def capabilities(repo, proto):
359 caps = _capabilities(repo, proto)
349 caps = _capabilities(repo, proto)
360 return wireprototypes.bytesresponse(' '.join(sorted(caps)))
350 return wireprototypes.bytesresponse(' '.join(sorted(caps)))
361
351
362 @wireprotocommand('changegroup', 'roots', permission='pull')
352 @wireprotocommand('changegroup', 'roots', permission='pull')
363 def changegroup(repo, proto, roots):
353 def changegroup(repo, proto, roots):
364 nodes = wireprototypes.decodelist(roots)
354 nodes = wireprototypes.decodelist(roots)
365 outgoing = discovery.outgoing(repo, missingroots=nodes,
355 outgoing = discovery.outgoing(repo, missingroots=nodes,
366 missingheads=repo.heads())
356 missingheads=repo.heads())
367 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
357 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
368 gen = iter(lambda: cg.read(32768), '')
358 gen = iter(lambda: cg.read(32768), '')
369 return wireprototypes.streamres(gen=gen)
359 return wireprototypes.streamres(gen=gen)
370
360
371 @wireprotocommand('changegroupsubset', 'bases heads',
361 @wireprotocommand('changegroupsubset', 'bases heads',
372 permission='pull')
362 permission='pull')
373 def changegroupsubset(repo, proto, bases, heads):
363 def changegroupsubset(repo, proto, bases, heads):
374 bases = wireprototypes.decodelist(bases)
364 bases = wireprototypes.decodelist(bases)
375 heads = wireprototypes.decodelist(heads)
365 heads = wireprototypes.decodelist(heads)
376 outgoing = discovery.outgoing(repo, missingroots=bases,
366 outgoing = discovery.outgoing(repo, missingroots=bases,
377 missingheads=heads)
367 missingheads=heads)
378 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
368 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
379 gen = iter(lambda: cg.read(32768), '')
369 gen = iter(lambda: cg.read(32768), '')
380 return wireprototypes.streamres(gen=gen)
370 return wireprototypes.streamres(gen=gen)
381
371
382 @wireprotocommand('debugwireargs', 'one two *',
372 @wireprotocommand('debugwireargs', 'one two *',
383 permission='pull')
373 permission='pull')
384 def debugwireargs(repo, proto, one, two, others):
374 def debugwireargs(repo, proto, one, two, others):
385 # only accept optional args from the known set
375 # only accept optional args from the known set
386 opts = options('debugwireargs', ['three', 'four'], others)
376 opts = options('debugwireargs', ['three', 'four'], others)
387 return wireprototypes.bytesresponse(repo.debugwireargs(
377 return wireprototypes.bytesresponse(repo.debugwireargs(
388 one, two, **pycompat.strkwargs(opts)))
378 one, two, **pycompat.strkwargs(opts)))
389
379
390 def find_pullbundle(repo, proto, opts, clheads, heads, common):
380 def find_pullbundle(repo, proto, opts, clheads, heads, common):
391 """Return a file object for the first matching pullbundle.
381 """Return a file object for the first matching pullbundle.
392
382
393 Pullbundles are specified in .hg/pullbundles.manifest similar to
383 Pullbundles are specified in .hg/pullbundles.manifest similar to
394 clonebundles.
384 clonebundles.
395 For each entry, the bundle specification is checked for compatibility:
385 For each entry, the bundle specification is checked for compatibility:
396 - Client features vs the BUNDLESPEC.
386 - Client features vs the BUNDLESPEC.
397 - Revisions shared with the clients vs base revisions of the bundle.
387 - Revisions shared with the clients vs base revisions of the bundle.
398 A bundle can be applied only if all its base revisions are known by
388 A bundle can be applied only if all its base revisions are known by
399 the client.
389 the client.
400 - At least one leaf of the bundle's DAG is missing on the client.
390 - At least one leaf of the bundle's DAG is missing on the client.
401 - Every leaf of the bundle's DAG is part of node set the client wants.
391 - Every leaf of the bundle's DAG is part of node set the client wants.
402 E.g. do not send a bundle of all changes if the client wants only
392 E.g. do not send a bundle of all changes if the client wants only
403 one specific branch of many.
393 one specific branch of many.
404 """
394 """
405 def decodehexstring(s):
395 def decodehexstring(s):
406 return set([h.decode('hex') for h in s.split(';')])
396 return set([h.decode('hex') for h in s.split(';')])
407
397
408 manifest = repo.vfs.tryread('pullbundles.manifest')
398 manifest = repo.vfs.tryread('pullbundles.manifest')
409 if not manifest:
399 if not manifest:
410 return None
400 return None
411 res = exchange.parseclonebundlesmanifest(repo, manifest)
401 res = exchange.parseclonebundlesmanifest(repo, manifest)
412 res = exchange.filterclonebundleentries(repo, res)
402 res = exchange.filterclonebundleentries(repo, res)
413 if not res:
403 if not res:
414 return None
404 return None
415 cl = repo.changelog
405 cl = repo.changelog
416 heads_anc = cl.ancestors([cl.rev(rev) for rev in heads], inclusive=True)
406 heads_anc = cl.ancestors([cl.rev(rev) for rev in heads], inclusive=True)
417 common_anc = cl.ancestors([cl.rev(rev) for rev in common], inclusive=True)
407 common_anc = cl.ancestors([cl.rev(rev) for rev in common], inclusive=True)
418 compformats = clientcompressionsupport(proto)
408 compformats = clientcompressionsupport(proto)
419 for entry in res:
409 for entry in res:
420 if 'COMPRESSION' in entry and entry['COMPRESSION'] not in compformats:
410 if 'COMPRESSION' in entry and entry['COMPRESSION'] not in compformats:
421 continue
411 continue
422 # No test yet for VERSION, since V2 is supported by any client
412 # No test yet for VERSION, since V2 is supported by any client
423 # that advertises partial pulls
413 # that advertises partial pulls
424 if 'heads' in entry:
414 if 'heads' in entry:
425 try:
415 try:
426 bundle_heads = decodehexstring(entry['heads'])
416 bundle_heads = decodehexstring(entry['heads'])
427 except TypeError:
417 except TypeError:
428 # Bad heads entry
418 # Bad heads entry
429 continue
419 continue
430 if bundle_heads.issubset(common):
420 if bundle_heads.issubset(common):
431 continue # Nothing new
421 continue # Nothing new
432 if all(cl.rev(rev) in common_anc for rev in bundle_heads):
422 if all(cl.rev(rev) in common_anc for rev in bundle_heads):
433 continue # Still nothing new
423 continue # Still nothing new
434 if any(cl.rev(rev) not in heads_anc and
424 if any(cl.rev(rev) not in heads_anc and
435 cl.rev(rev) not in common_anc for rev in bundle_heads):
425 cl.rev(rev) not in common_anc for rev in bundle_heads):
436 continue
426 continue
437 if 'bases' in entry:
427 if 'bases' in entry:
438 try:
428 try:
439 bundle_bases = decodehexstring(entry['bases'])
429 bundle_bases = decodehexstring(entry['bases'])
440 except TypeError:
430 except TypeError:
441 # Bad bases entry
431 # Bad bases entry
442 continue
432 continue
443 if not all(cl.rev(rev) in common_anc for rev in bundle_bases):
433 if not all(cl.rev(rev) in common_anc for rev in bundle_bases):
444 continue
434 continue
445 path = entry['URL']
435 path = entry['URL']
446 repo.ui.debug('sending pullbundle "%s"\n' % path)
436 repo.ui.debug('sending pullbundle "%s"\n' % path)
447 try:
437 try:
448 return repo.vfs.open(path)
438 return repo.vfs.open(path)
449 except IOError:
439 except IOError:
450 repo.ui.debug('pullbundle "%s" not accessible\n' % path)
440 repo.ui.debug('pullbundle "%s" not accessible\n' % path)
451 continue
441 continue
452 return None
442 return None
453
443
454 @wireprotocommand('getbundle', '*', permission='pull')
444 @wireprotocommand('getbundle', '*', permission='pull')
455 def getbundle(repo, proto, others):
445 def getbundle(repo, proto, others):
456 opts = options('getbundle', wireprototypes.GETBUNDLE_ARGUMENTS.keys(),
446 opts = options('getbundle', wireprototypes.GETBUNDLE_ARGUMENTS.keys(),
457 others)
447 others)
458 for k, v in opts.iteritems():
448 for k, v in opts.iteritems():
459 keytype = wireprototypes.GETBUNDLE_ARGUMENTS[k]
449 keytype = wireprototypes.GETBUNDLE_ARGUMENTS[k]
460 if keytype == 'nodes':
450 if keytype == 'nodes':
461 opts[k] = wireprototypes.decodelist(v)
451 opts[k] = wireprototypes.decodelist(v)
462 elif keytype == 'csv':
452 elif keytype == 'csv':
463 opts[k] = list(v.split(','))
453 opts[k] = list(v.split(','))
464 elif keytype == 'scsv':
454 elif keytype == 'scsv':
465 opts[k] = set(v.split(','))
455 opts[k] = set(v.split(','))
466 elif keytype == 'boolean':
456 elif keytype == 'boolean':
467 # Client should serialize False as '0', which is a non-empty string
457 # Client should serialize False as '0', which is a non-empty string
468 # so it evaluates as a True bool.
458 # so it evaluates as a True bool.
469 if v == '0':
459 if v == '0':
470 opts[k] = False
460 opts[k] = False
471 else:
461 else:
472 opts[k] = bool(v)
462 opts[k] = bool(v)
473 elif keytype != 'plain':
463 elif keytype != 'plain':
474 raise KeyError('unknown getbundle option type %s'
464 raise KeyError('unknown getbundle option type %s'
475 % keytype)
465 % keytype)
476
466
477 if not bundle1allowed(repo, 'pull'):
467 if not bundle1allowed(repo, 'pull'):
478 if not exchange.bundle2requested(opts.get('bundlecaps')):
468 if not exchange.bundle2requested(opts.get('bundlecaps')):
479 if proto.name == 'http-v1':
469 if proto.name == 'http-v1':
480 return wireprototypes.ooberror(bundle2required)
470 return wireprototypes.ooberror(bundle2required)
481 raise error.Abort(bundle2requiredmain,
471 raise error.Abort(bundle2requiredmain,
482 hint=bundle2requiredhint)
472 hint=bundle2requiredhint)
483
473
484 prefercompressed = True
474 prefercompressed = True
485
475
486 try:
476 try:
487 clheads = set(repo.changelog.heads())
477 clheads = set(repo.changelog.heads())
488 heads = set(opts.get('heads', set()))
478 heads = set(opts.get('heads', set()))
489 common = set(opts.get('common', set()))
479 common = set(opts.get('common', set()))
490 common.discard(nullid)
480 common.discard(nullid)
491 if (repo.ui.configbool('server', 'pullbundle') and
481 if (repo.ui.configbool('server', 'pullbundle') and
492 'partial-pull' in proto.getprotocaps()):
482 'partial-pull' in proto.getprotocaps()):
493 # Check if a pre-built bundle covers this request.
483 # Check if a pre-built bundle covers this request.
494 bundle = find_pullbundle(repo, proto, opts, clheads, heads, common)
484 bundle = find_pullbundle(repo, proto, opts, clheads, heads, common)
495 if bundle:
485 if bundle:
496 return wireprototypes.streamres(gen=util.filechunkiter(bundle),
486 return wireprototypes.streamres(gen=util.filechunkiter(bundle),
497 prefer_uncompressed=True)
487 prefer_uncompressed=True)
498
488
499 if repo.ui.configbool('server', 'disablefullbundle'):
489 if repo.ui.configbool('server', 'disablefullbundle'):
500 # Check to see if this is a full clone.
490 # Check to see if this is a full clone.
501 changegroup = opts.get('cg', True)
491 changegroup = opts.get('cg', True)
502 if changegroup and not common and clheads == heads:
492 if changegroup and not common and clheads == heads:
503 raise error.Abort(
493 raise error.Abort(
504 _('server has pull-based clones disabled'),
494 _('server has pull-based clones disabled'),
505 hint=_('remove --pull if specified or upgrade Mercurial'))
495 hint=_('remove --pull if specified or upgrade Mercurial'))
506
496
507 info, chunks = exchange.getbundlechunks(repo, 'serve',
497 info, chunks = exchange.getbundlechunks(repo, 'serve',
508 **pycompat.strkwargs(opts))
498 **pycompat.strkwargs(opts))
509 prefercompressed = info.get('prefercompressed', True)
499 prefercompressed = info.get('prefercompressed', True)
510 except error.Abort as exc:
500 except error.Abort as exc:
511 # cleanly forward Abort error to the client
501 # cleanly forward Abort error to the client
512 if not exchange.bundle2requested(opts.get('bundlecaps')):
502 if not exchange.bundle2requested(opts.get('bundlecaps')):
513 if proto.name == 'http-v1':
503 if proto.name == 'http-v1':
514 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
504 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
515 raise # cannot do better for bundle1 + ssh
505 raise # cannot do better for bundle1 + ssh
516 # bundle2 request expect a bundle2 reply
506 # bundle2 request expect a bundle2 reply
517 bundler = bundle2.bundle20(repo.ui)
507 bundler = bundle2.bundle20(repo.ui)
518 manargs = [('message', pycompat.bytestr(exc))]
508 manargs = [('message', pycompat.bytestr(exc))]
519 advargs = []
509 advargs = []
520 if exc.hint is not None:
510 if exc.hint is not None:
521 advargs.append(('hint', exc.hint))
511 advargs.append(('hint', exc.hint))
522 bundler.addpart(bundle2.bundlepart('error:abort',
512 bundler.addpart(bundle2.bundlepart('error:abort',
523 manargs, advargs))
513 manargs, advargs))
524 chunks = bundler.getchunks()
514 chunks = bundler.getchunks()
525 prefercompressed = False
515 prefercompressed = False
526
516
527 return wireprototypes.streamres(
517 return wireprototypes.streamres(
528 gen=chunks, prefer_uncompressed=not prefercompressed)
518 gen=chunks, prefer_uncompressed=not prefercompressed)
529
519
530 @wireprotocommand('heads', permission='pull')
520 @wireprotocommand('heads', permission='pull')
531 def heads(repo, proto):
521 def heads(repo, proto):
532 h = repo.heads()
522 h = repo.heads()
533 return wireprototypes.bytesresponse(wireprototypes.encodelist(h) + '\n')
523 return wireprototypes.bytesresponse(wireprototypes.encodelist(h) + '\n')
534
524
535 @wireprotocommand('hello', permission='pull')
525 @wireprotocommand('hello', permission='pull')
536 def hello(repo, proto):
526 def hello(repo, proto):
537 """Called as part of SSH handshake to obtain server info.
527 """Called as part of SSH handshake to obtain server info.
538
528
539 Returns a list of lines describing interesting things about the
529 Returns a list of lines describing interesting things about the
540 server, in an RFC822-like format.
530 server, in an RFC822-like format.
541
531
542 Currently, the only one defined is ``capabilities``, which consists of a
532 Currently, the only one defined is ``capabilities``, which consists of a
543 line of space separated tokens describing server abilities:
533 line of space separated tokens describing server abilities:
544
534
545 capabilities: <token0> <token1> <token2>
535 capabilities: <token0> <token1> <token2>
546 """
536 """
547 caps = capabilities(repo, proto).data
537 caps = capabilities(repo, proto).data
548 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
538 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
549
539
550 @wireprotocommand('listkeys', 'namespace', permission='pull')
540 @wireprotocommand('listkeys', 'namespace', permission='pull')
551 def listkeys(repo, proto, namespace):
541 def listkeys(repo, proto, namespace):
552 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
542 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
553 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
543 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
554
544
555 @wireprotocommand('lookup', 'key', permission='pull')
545 @wireprotocommand('lookup', 'key', permission='pull')
556 def lookup(repo, proto, key):
546 def lookup(repo, proto, key):
557 try:
547 try:
558 k = encoding.tolocal(key)
548 k = encoding.tolocal(key)
559 n = repo.lookup(k)
549 n = repo.lookup(k)
560 r = hex(n)
550 r = hex(n)
561 success = 1
551 success = 1
562 except Exception as inst:
552 except Exception as inst:
563 r = stringutil.forcebytestr(inst)
553 r = stringutil.forcebytestr(inst)
564 success = 0
554 success = 0
565 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
555 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
566
556
567 @wireprotocommand('known', 'nodes *', permission='pull')
557 @wireprotocommand('known', 'nodes *', permission='pull')
568 def known(repo, proto, nodes, others):
558 def known(repo, proto, nodes, others):
569 v = ''.join(b and '1' or '0'
559 v = ''.join(b and '1' or '0'
570 for b in repo.known(wireprototypes.decodelist(nodes)))
560 for b in repo.known(wireprototypes.decodelist(nodes)))
571 return wireprototypes.bytesresponse(v)
561 return wireprototypes.bytesresponse(v)
572
562
573 @wireprotocommand('protocaps', 'caps', permission='pull')
563 @wireprotocommand('protocaps', 'caps', permission='pull')
574 def protocaps(repo, proto, caps):
564 def protocaps(repo, proto, caps):
575 if proto.name == wireprototypes.SSHV1:
565 if proto.name == wireprototypes.SSHV1:
576 proto._protocaps = set(caps.split(' '))
566 proto._protocaps = set(caps.split(' '))
577 return wireprototypes.bytesresponse('OK')
567 return wireprototypes.bytesresponse('OK')
578
568
579 @wireprotocommand('pushkey', 'namespace key old new', permission='push')
569 @wireprotocommand('pushkey', 'namespace key old new', permission='push')
580 def pushkey(repo, proto, namespace, key, old, new):
570 def pushkey(repo, proto, namespace, key, old, new):
581 # compatibility with pre-1.8 clients which were accidentally
571 # compatibility with pre-1.8 clients which were accidentally
582 # sending raw binary nodes rather than utf-8-encoded hex
572 # sending raw binary nodes rather than utf-8-encoded hex
583 if len(new) == 20 and stringutil.escapestr(new) != new:
573 if len(new) == 20 and stringutil.escapestr(new) != new:
584 # looks like it could be a binary node
574 # looks like it could be a binary node
585 try:
575 try:
586 new.decode('utf-8')
576 new.decode('utf-8')
587 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
577 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
588 except UnicodeDecodeError:
578 except UnicodeDecodeError:
589 pass # binary, leave unmodified
579 pass # binary, leave unmodified
590 else:
580 else:
591 new = encoding.tolocal(new) # normal path
581 new = encoding.tolocal(new) # normal path
592
582
593 with proto.mayberedirectstdio() as output:
583 with proto.mayberedirectstdio() as output:
594 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
584 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
595 encoding.tolocal(old), new) or False
585 encoding.tolocal(old), new) or False
596
586
597 output = output.getvalue() if output else ''
587 output = output.getvalue() if output else ''
598 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
588 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
599
589
600 @wireprotocommand('stream_out', permission='pull')
590 @wireprotocommand('stream_out', permission='pull')
601 def stream(repo, proto):
591 def stream(repo, proto):
602 '''If the server supports streaming clone, it advertises the "stream"
592 '''If the server supports streaming clone, it advertises the "stream"
603 capability with a value representing the version and flags of the repo
593 capability with a value representing the version and flags of the repo
604 it is serving. Client checks to see if it understands the format.
594 it is serving. Client checks to see if it understands the format.
605 '''
595 '''
606 return wireprototypes.streamreslegacy(
596 return wireprototypes.streamreslegacy(
607 streamclone.generatev1wireproto(repo))
597 streamclone.generatev1wireproto(repo))
608
598
609 @wireprotocommand('unbundle', 'heads', permission='push')
599 @wireprotocommand('unbundle', 'heads', permission='push')
610 def unbundle(repo, proto, heads):
600 def unbundle(repo, proto, heads):
611 their_heads = wireprototypes.decodelist(heads)
601 their_heads = wireprototypes.decodelist(heads)
612
602
613 with proto.mayberedirectstdio() as output:
603 with proto.mayberedirectstdio() as output:
614 try:
604 try:
615 exchange.check_heads(repo, their_heads, 'preparing changes')
605 exchange.check_heads(repo, their_heads, 'preparing changes')
616 cleanup = lambda: None
606 cleanup = lambda: None
617 try:
607 try:
618 payload = proto.getpayload()
608 payload = proto.getpayload()
619 if repo.ui.configbool('server', 'streamunbundle'):
609 if repo.ui.configbool('server', 'streamunbundle'):
620 def cleanup():
610 def cleanup():
621 # Ensure that the full payload is consumed, so
611 # Ensure that the full payload is consumed, so
622 # that the connection doesn't contain trailing garbage.
612 # that the connection doesn't contain trailing garbage.
623 for p in payload:
613 for p in payload:
624 pass
614 pass
625 fp = util.chunkbuffer(payload)
615 fp = util.chunkbuffer(payload)
626 else:
616 else:
627 # write bundle data to temporary file as it can be big
617 # write bundle data to temporary file as it can be big
628 fp, tempname = None, None
618 fp, tempname = None, None
629 def cleanup():
619 def cleanup():
630 if fp:
620 if fp:
631 fp.close()
621 fp.close()
632 if tempname:
622 if tempname:
633 os.unlink(tempname)
623 os.unlink(tempname)
634 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
624 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
635 repo.ui.debug('redirecting incoming bundle to %s\n' %
625 repo.ui.debug('redirecting incoming bundle to %s\n' %
636 tempname)
626 tempname)
637 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
627 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
638 r = 0
628 r = 0
639 for p in payload:
629 for p in payload:
640 fp.write(p)
630 fp.write(p)
641 fp.seek(0)
631 fp.seek(0)
642
632
643 gen = exchange.readbundle(repo.ui, fp, None)
633 gen = exchange.readbundle(repo.ui, fp, None)
644 if (isinstance(gen, changegroupmod.cg1unpacker)
634 if (isinstance(gen, changegroupmod.cg1unpacker)
645 and not bundle1allowed(repo, 'push')):
635 and not bundle1allowed(repo, 'push')):
646 if proto.name == 'http-v1':
636 if proto.name == 'http-v1':
647 # need to special case http because stderr do not get to
637 # need to special case http because stderr do not get to
648 # the http client on failed push so we need to abuse
638 # the http client on failed push so we need to abuse
649 # some other error type to make sure the message get to
639 # some other error type to make sure the message get to
650 # the user.
640 # the user.
651 return wireprototypes.ooberror(bundle2required)
641 return wireprototypes.ooberror(bundle2required)
652 raise error.Abort(bundle2requiredmain,
642 raise error.Abort(bundle2requiredmain,
653 hint=bundle2requiredhint)
643 hint=bundle2requiredhint)
654
644
655 r = exchange.unbundle(repo, gen, their_heads, 'serve',
645 r = exchange.unbundle(repo, gen, their_heads, 'serve',
656 proto.client())
646 proto.client())
657 if util.safehasattr(r, 'addpart'):
647 if util.safehasattr(r, 'addpart'):
658 # The return looks streamable, we are in the bundle2 case
648 # The return looks streamable, we are in the bundle2 case
659 # and should return a stream.
649 # and should return a stream.
660 return wireprototypes.streamreslegacy(gen=r.getchunks())
650 return wireprototypes.streamreslegacy(gen=r.getchunks())
661 return wireprototypes.pushres(
651 return wireprototypes.pushres(
662 r, output.getvalue() if output else '')
652 r, output.getvalue() if output else '')
663
653
664 finally:
654 finally:
665 cleanup()
655 cleanup()
666
656
667 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
657 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
668 # handle non-bundle2 case first
658 # handle non-bundle2 case first
669 if not getattr(exc, 'duringunbundle2', False):
659 if not getattr(exc, 'duringunbundle2', False):
670 try:
660 try:
671 raise
661 raise
672 except error.Abort:
662 except error.Abort:
673 # The old code we moved used procutil.stderr directly.
663 # The old code we moved used procutil.stderr directly.
674 # We did not change it to minimise code change.
664 # We did not change it to minimise code change.
675 # This need to be moved to something proper.
665 # This need to be moved to something proper.
676 # Feel free to do it.
666 # Feel free to do it.
677 procutil.stderr.write("abort: %s\n" % exc)
667 procutil.stderr.write("abort: %s\n" % exc)
678 if exc.hint is not None:
668 if exc.hint is not None:
679 procutil.stderr.write("(%s)\n" % exc.hint)
669 procutil.stderr.write("(%s)\n" % exc.hint)
680 procutil.stderr.flush()
670 procutil.stderr.flush()
681 return wireprototypes.pushres(
671 return wireprototypes.pushres(
682 0, output.getvalue() if output else '')
672 0, output.getvalue() if output else '')
683 except error.PushRaced:
673 except error.PushRaced:
684 return wireprototypes.pusherr(
674 return wireprototypes.pusherr(
685 pycompat.bytestr(exc),
675 pycompat.bytestr(exc),
686 output.getvalue() if output else '')
676 output.getvalue() if output else '')
687
677
688 bundler = bundle2.bundle20(repo.ui)
678 bundler = bundle2.bundle20(repo.ui)
689 for out in getattr(exc, '_bundle2salvagedoutput', ()):
679 for out in getattr(exc, '_bundle2salvagedoutput', ()):
690 bundler.addpart(out)
680 bundler.addpart(out)
691 try:
681 try:
692 try:
682 try:
693 raise
683 raise
694 except error.PushkeyFailed as exc:
684 except error.PushkeyFailed as exc:
695 # check client caps
685 # check client caps
696 remotecaps = getattr(exc, '_replycaps', None)
686 remotecaps = getattr(exc, '_replycaps', None)
697 if (remotecaps is not None
687 if (remotecaps is not None
698 and 'pushkey' not in remotecaps.get('error', ())):
688 and 'pushkey' not in remotecaps.get('error', ())):
699 # no support remote side, fallback to Abort handler.
689 # no support remote side, fallback to Abort handler.
700 raise
690 raise
701 part = bundler.newpart('error:pushkey')
691 part = bundler.newpart('error:pushkey')
702 part.addparam('in-reply-to', exc.partid)
692 part.addparam('in-reply-to', exc.partid)
703 if exc.namespace is not None:
693 if exc.namespace is not None:
704 part.addparam('namespace', exc.namespace,
694 part.addparam('namespace', exc.namespace,
705 mandatory=False)
695 mandatory=False)
706 if exc.key is not None:
696 if exc.key is not None:
707 part.addparam('key', exc.key, mandatory=False)
697 part.addparam('key', exc.key, mandatory=False)
708 if exc.new is not None:
698 if exc.new is not None:
709 part.addparam('new', exc.new, mandatory=False)
699 part.addparam('new', exc.new, mandatory=False)
710 if exc.old is not None:
700 if exc.old is not None:
711 part.addparam('old', exc.old, mandatory=False)
701 part.addparam('old', exc.old, mandatory=False)
712 if exc.ret is not None:
702 if exc.ret is not None:
713 part.addparam('ret', exc.ret, mandatory=False)
703 part.addparam('ret', exc.ret, mandatory=False)
714 except error.BundleValueError as exc:
704 except error.BundleValueError as exc:
715 errpart = bundler.newpart('error:unsupportedcontent')
705 errpart = bundler.newpart('error:unsupportedcontent')
716 if exc.parttype is not None:
706 if exc.parttype is not None:
717 errpart.addparam('parttype', exc.parttype)
707 errpart.addparam('parttype', exc.parttype)
718 if exc.params:
708 if exc.params:
719 errpart.addparam('params', '\0'.join(exc.params))
709 errpart.addparam('params', '\0'.join(exc.params))
720 except error.Abort as exc:
710 except error.Abort as exc:
721 manargs = [('message', stringutil.forcebytestr(exc))]
711 manargs = [('message', stringutil.forcebytestr(exc))]
722 advargs = []
712 advargs = []
723 if exc.hint is not None:
713 if exc.hint is not None:
724 advargs.append(('hint', exc.hint))
714 advargs.append(('hint', exc.hint))
725 bundler.addpart(bundle2.bundlepart('error:abort',
715 bundler.addpart(bundle2.bundlepart('error:abort',
726 manargs, advargs))
716 manargs, advargs))
727 except error.PushRaced as exc:
717 except error.PushRaced as exc:
728 bundler.newpart('error:pushraced',
718 bundler.newpart('error:pushraced',
729 [('message', stringutil.forcebytestr(exc))])
719 [('message', stringutil.forcebytestr(exc))])
730 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
720 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
@@ -1,522 +1,533 b''
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 #
3 #
4 # This software may be used and distributed according to the terms of the
4 # This software may be used and distributed according to the terms of the
5 # GNU General Public License version 2 or any later version.
5 # GNU General Public License version 2 or any later version.
6
6
7 from __future__ import absolute_import
7 from __future__ import absolute_import
8
8
9 import contextlib
9 import contextlib
10
10
11 from .i18n import _
11 from .i18n import _
12 from .thirdparty import (
12 from .thirdparty import (
13 cbor,
13 cbor,
14 )
14 )
15 from .thirdparty.zope import (
15 from .thirdparty.zope import (
16 interface as zi,
16 interface as zi,
17 )
17 )
18 from . import (
18 from . import (
19 encoding,
19 encoding,
20 error,
20 error,
21 pycompat,
21 pycompat,
22 streamclone,
22 streamclone,
23 util,
23 util,
24 wireproto,
24 wireproto,
25 wireprotoframing,
25 wireprotoframing,
26 wireprototypes,
26 wireprototypes,
27 )
27 )
28
28
29 FRAMINGTYPE = b'application/mercurial-exp-framing-0005'
29 FRAMINGTYPE = b'application/mercurial-exp-framing-0005'
30
30
31 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
31 HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2
32
32
33 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
33 def handlehttpv2request(rctx, req, res, checkperm, urlparts):
34 from .hgweb import common as hgwebcommon
34 from .hgweb import common as hgwebcommon
35
35
36 # URL space looks like: <permissions>/<command>, where <permission> can
36 # URL space looks like: <permissions>/<command>, where <permission> can
37 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
37 # be ``ro`` or ``rw`` to signal read-only or read-write, respectively.
38
38
39 # Root URL does nothing meaningful... yet.
39 # Root URL does nothing meaningful... yet.
40 if not urlparts:
40 if not urlparts:
41 res.status = b'200 OK'
41 res.status = b'200 OK'
42 res.headers[b'Content-Type'] = b'text/plain'
42 res.headers[b'Content-Type'] = b'text/plain'
43 res.setbodybytes(_('HTTP version 2 API handler'))
43 res.setbodybytes(_('HTTP version 2 API handler'))
44 return
44 return
45
45
46 if len(urlparts) == 1:
46 if len(urlparts) == 1:
47 res.status = b'404 Not Found'
47 res.status = b'404 Not Found'
48 res.headers[b'Content-Type'] = b'text/plain'
48 res.headers[b'Content-Type'] = b'text/plain'
49 res.setbodybytes(_('do not know how to process %s\n') %
49 res.setbodybytes(_('do not know how to process %s\n') %
50 req.dispatchpath)
50 req.dispatchpath)
51 return
51 return
52
52
53 permission, command = urlparts[0:2]
53 permission, command = urlparts[0:2]
54
54
55 if permission not in (b'ro', b'rw'):
55 if permission not in (b'ro', b'rw'):
56 res.status = b'404 Not Found'
56 res.status = b'404 Not Found'
57 res.headers[b'Content-Type'] = b'text/plain'
57 res.headers[b'Content-Type'] = b'text/plain'
58 res.setbodybytes(_('unknown permission: %s') % permission)
58 res.setbodybytes(_('unknown permission: %s') % permission)
59 return
59 return
60
60
61 if req.method != 'POST':
61 if req.method != 'POST':
62 res.status = b'405 Method Not Allowed'
62 res.status = b'405 Method Not Allowed'
63 res.headers[b'Allow'] = b'POST'
63 res.headers[b'Allow'] = b'POST'
64 res.setbodybytes(_('commands require POST requests'))
64 res.setbodybytes(_('commands require POST requests'))
65 return
65 return
66
66
67 # At some point we'll want to use our own API instead of recycling the
67 # At some point we'll want to use our own API instead of recycling the
68 # behavior of version 1 of the wire protocol...
68 # behavior of version 1 of the wire protocol...
69 # TODO return reasonable responses - not responses that overload the
69 # TODO return reasonable responses - not responses that overload the
70 # HTTP status line message for error reporting.
70 # HTTP status line message for error reporting.
71 try:
71 try:
72 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
72 checkperm(rctx, req, 'pull' if permission == b'ro' else 'push')
73 except hgwebcommon.ErrorResponse as e:
73 except hgwebcommon.ErrorResponse as e:
74 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
74 res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
75 for k, v in e.headers:
75 for k, v in e.headers:
76 res.headers[k] = v
76 res.headers[k] = v
77 res.setbodybytes('permission denied')
77 res.setbodybytes('permission denied')
78 return
78 return
79
79
80 # We have a special endpoint to reflect the request back at the client.
80 # We have a special endpoint to reflect the request back at the client.
81 if command == b'debugreflect':
81 if command == b'debugreflect':
82 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
82 _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
83 return
83 return
84
84
85 # Extra commands that we handle that aren't really wire protocol
85 # Extra commands that we handle that aren't really wire protocol
86 # commands. Think extra hard before making this hackery available to
86 # commands. Think extra hard before making this hackery available to
87 # extension.
87 # extension.
88 extracommands = {'multirequest'}
88 extracommands = {'multirequest'}
89
89
90 if command not in wireproto.commandsv2 and command not in extracommands:
90 if command not in wireproto.commandsv2 and command not in extracommands:
91 res.status = b'404 Not Found'
91 res.status = b'404 Not Found'
92 res.headers[b'Content-Type'] = b'text/plain'
92 res.headers[b'Content-Type'] = b'text/plain'
93 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
93 res.setbodybytes(_('unknown wire protocol command: %s\n') % command)
94 return
94 return
95
95
96 repo = rctx.repo
96 repo = rctx.repo
97 ui = repo.ui
97 ui = repo.ui
98
98
99 proto = httpv2protocolhandler(req, ui)
99 proto = httpv2protocolhandler(req, ui)
100
100
101 if (not wireproto.commandsv2.commandavailable(command, proto)
101 if (not wireproto.commandsv2.commandavailable(command, proto)
102 and command not in extracommands):
102 and command not in extracommands):
103 res.status = b'404 Not Found'
103 res.status = b'404 Not Found'
104 res.headers[b'Content-Type'] = b'text/plain'
104 res.headers[b'Content-Type'] = b'text/plain'
105 res.setbodybytes(_('invalid wire protocol command: %s') % command)
105 res.setbodybytes(_('invalid wire protocol command: %s') % command)
106 return
106 return
107
107
108 # TODO consider cases where proxies may add additional Accept headers.
108 # TODO consider cases where proxies may add additional Accept headers.
109 if req.headers.get(b'Accept') != FRAMINGTYPE:
109 if req.headers.get(b'Accept') != FRAMINGTYPE:
110 res.status = b'406 Not Acceptable'
110 res.status = b'406 Not Acceptable'
111 res.headers[b'Content-Type'] = b'text/plain'
111 res.headers[b'Content-Type'] = b'text/plain'
112 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
112 res.setbodybytes(_('client MUST specify Accept header with value: %s\n')
113 % FRAMINGTYPE)
113 % FRAMINGTYPE)
114 return
114 return
115
115
116 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
116 if req.headers.get(b'Content-Type') != FRAMINGTYPE:
117 res.status = b'415 Unsupported Media Type'
117 res.status = b'415 Unsupported Media Type'
118 # TODO we should send a response with appropriate media type,
118 # TODO we should send a response with appropriate media type,
119 # since client does Accept it.
119 # since client does Accept it.
120 res.headers[b'Content-Type'] = b'text/plain'
120 res.headers[b'Content-Type'] = b'text/plain'
121 res.setbodybytes(_('client MUST send Content-Type header with '
121 res.setbodybytes(_('client MUST send Content-Type header with '
122 'value: %s\n') % FRAMINGTYPE)
122 'value: %s\n') % FRAMINGTYPE)
123 return
123 return
124
124
125 _processhttpv2request(ui, repo, req, res, permission, command, proto)
125 _processhttpv2request(ui, repo, req, res, permission, command, proto)
126
126
127 def _processhttpv2reflectrequest(ui, repo, req, res):
127 def _processhttpv2reflectrequest(ui, repo, req, res):
128 """Reads unified frame protocol request and dumps out state to client.
128 """Reads unified frame protocol request and dumps out state to client.
129
129
130 This special endpoint can be used to help debug the wire protocol.
130 This special endpoint can be used to help debug the wire protocol.
131
131
132 Instead of routing the request through the normal dispatch mechanism,
132 Instead of routing the request through the normal dispatch mechanism,
133 we instead read all frames, decode them, and feed them into our state
133 we instead read all frames, decode them, and feed them into our state
134 tracker. We then dump the log of all that activity back out to the
134 tracker. We then dump the log of all that activity back out to the
135 client.
135 client.
136 """
136 """
137 import json
137 import json
138
138
139 # Reflection APIs have a history of being abused, accidentally disclosing
139 # Reflection APIs have a history of being abused, accidentally disclosing
140 # sensitive data, etc. So we have a config knob.
140 # sensitive data, etc. So we have a config knob.
141 if not ui.configbool('experimental', 'web.api.debugreflect'):
141 if not ui.configbool('experimental', 'web.api.debugreflect'):
142 res.status = b'404 Not Found'
142 res.status = b'404 Not Found'
143 res.headers[b'Content-Type'] = b'text/plain'
143 res.headers[b'Content-Type'] = b'text/plain'
144 res.setbodybytes(_('debugreflect service not available'))
144 res.setbodybytes(_('debugreflect service not available'))
145 return
145 return
146
146
147 # We assume we have a unified framing protocol request body.
147 # We assume we have a unified framing protocol request body.
148
148
149 reactor = wireprotoframing.serverreactor()
149 reactor = wireprotoframing.serverreactor()
150 states = []
150 states = []
151
151
152 while True:
152 while True:
153 frame = wireprotoframing.readframe(req.bodyfh)
153 frame = wireprotoframing.readframe(req.bodyfh)
154
154
155 if not frame:
155 if not frame:
156 states.append(b'received: <no frame>')
156 states.append(b'received: <no frame>')
157 break
157 break
158
158
159 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
159 states.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
160 frame.requestid,
160 frame.requestid,
161 frame.payload))
161 frame.payload))
162
162
163 action, meta = reactor.onframerecv(frame)
163 action, meta = reactor.onframerecv(frame)
164 states.append(json.dumps((action, meta), sort_keys=True,
164 states.append(json.dumps((action, meta), sort_keys=True,
165 separators=(', ', ': ')))
165 separators=(', ', ': ')))
166
166
167 action, meta = reactor.oninputeof()
167 action, meta = reactor.oninputeof()
168 meta['action'] = action
168 meta['action'] = action
169 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
169 states.append(json.dumps(meta, sort_keys=True, separators=(', ',': ')))
170
170
171 res.status = b'200 OK'
171 res.status = b'200 OK'
172 res.headers[b'Content-Type'] = b'text/plain'
172 res.headers[b'Content-Type'] = b'text/plain'
173 res.setbodybytes(b'\n'.join(states))
173 res.setbodybytes(b'\n'.join(states))
174
174
175 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
175 def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
176 """Post-validation handler for HTTPv2 requests.
176 """Post-validation handler for HTTPv2 requests.
177
177
178 Called when the HTTP request contains unified frame-based protocol
178 Called when the HTTP request contains unified frame-based protocol
179 frames for evaluation.
179 frames for evaluation.
180 """
180 """
181 # TODO Some HTTP clients are full duplex and can receive data before
181 # TODO Some HTTP clients are full duplex and can receive data before
182 # the entire request is transmitted. Figure out a way to indicate support
182 # the entire request is transmitted. Figure out a way to indicate support
183 # for that so we can opt into full duplex mode.
183 # for that so we can opt into full duplex mode.
184 reactor = wireprotoframing.serverreactor(deferoutput=True)
184 reactor = wireprotoframing.serverreactor(deferoutput=True)
185 seencommand = False
185 seencommand = False
186
186
187 outstream = reactor.makeoutputstream()
187 outstream = reactor.makeoutputstream()
188
188
189 while True:
189 while True:
190 frame = wireprotoframing.readframe(req.bodyfh)
190 frame = wireprotoframing.readframe(req.bodyfh)
191 if not frame:
191 if not frame:
192 break
192 break
193
193
194 action, meta = reactor.onframerecv(frame)
194 action, meta = reactor.onframerecv(frame)
195
195
196 if action == 'wantframe':
196 if action == 'wantframe':
197 # Need more data before we can do anything.
197 # Need more data before we can do anything.
198 continue
198 continue
199 elif action == 'runcommand':
199 elif action == 'runcommand':
200 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
200 sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
201 reqcommand, reactor, outstream,
201 reqcommand, reactor, outstream,
202 meta, issubsequent=seencommand)
202 meta, issubsequent=seencommand)
203
203
204 if sentoutput:
204 if sentoutput:
205 return
205 return
206
206
207 seencommand = True
207 seencommand = True
208
208
209 elif action == 'error':
209 elif action == 'error':
210 # TODO define proper error mechanism.
210 # TODO define proper error mechanism.
211 res.status = b'200 OK'
211 res.status = b'200 OK'
212 res.headers[b'Content-Type'] = b'text/plain'
212 res.headers[b'Content-Type'] = b'text/plain'
213 res.setbodybytes(meta['message'] + b'\n')
213 res.setbodybytes(meta['message'] + b'\n')
214 return
214 return
215 else:
215 else:
216 raise error.ProgrammingError(
216 raise error.ProgrammingError(
217 'unhandled action from frame processor: %s' % action)
217 'unhandled action from frame processor: %s' % action)
218
218
219 action, meta = reactor.oninputeof()
219 action, meta = reactor.oninputeof()
220 if action == 'sendframes':
220 if action == 'sendframes':
221 # We assume we haven't started sending the response yet. If we're
221 # We assume we haven't started sending the response yet. If we're
222 # wrong, the response type will raise an exception.
222 # wrong, the response type will raise an exception.
223 res.status = b'200 OK'
223 res.status = b'200 OK'
224 res.headers[b'Content-Type'] = FRAMINGTYPE
224 res.headers[b'Content-Type'] = FRAMINGTYPE
225 res.setbodygen(meta['framegen'])
225 res.setbodygen(meta['framegen'])
226 elif action == 'noop':
226 elif action == 'noop':
227 pass
227 pass
228 else:
228 else:
229 raise error.ProgrammingError('unhandled action from frame processor: %s'
229 raise error.ProgrammingError('unhandled action from frame processor: %s'
230 % action)
230 % action)
231
231
232 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
232 def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
233 outstream, command, issubsequent):
233 outstream, command, issubsequent):
234 """Dispatch a wire protocol command made from HTTPv2 requests.
234 """Dispatch a wire protocol command made from HTTPv2 requests.
235
235
236 The authenticated permission (``authedperm``) along with the original
236 The authenticated permission (``authedperm``) along with the original
237 command from the URL (``reqcommand``) are passed in.
237 command from the URL (``reqcommand``) are passed in.
238 """
238 """
239 # We already validated that the session has permissions to perform the
239 # We already validated that the session has permissions to perform the
240 # actions in ``authedperm``. In the unified frame protocol, the canonical
240 # actions in ``authedperm``. In the unified frame protocol, the canonical
241 # command to run is expressed in a frame. However, the URL also requested
241 # command to run is expressed in a frame. However, the URL also requested
242 # to run a specific command. We need to be careful that the command we
242 # to run a specific command. We need to be careful that the command we
243 # run doesn't have permissions requirements greater than what was granted
243 # run doesn't have permissions requirements greater than what was granted
244 # by ``authedperm``.
244 # by ``authedperm``.
245 #
245 #
246 # Our rule for this is we only allow one command per HTTP request and
246 # Our rule for this is we only allow one command per HTTP request and
247 # that command must match the command in the URL. However, we make
247 # that command must match the command in the URL. However, we make
248 # an exception for the ``multirequest`` URL. This URL is allowed to
248 # an exception for the ``multirequest`` URL. This URL is allowed to
249 # execute multiple commands. We double check permissions of each command
249 # execute multiple commands. We double check permissions of each command
250 # as it is invoked to ensure there is no privilege escalation.
250 # as it is invoked to ensure there is no privilege escalation.
251 # TODO consider allowing multiple commands to regular command URLs
251 # TODO consider allowing multiple commands to regular command URLs
252 # iff each command is the same.
252 # iff each command is the same.
253
253
254 proto = httpv2protocolhandler(req, ui, args=command['args'])
254 proto = httpv2protocolhandler(req, ui, args=command['args'])
255
255
256 if reqcommand == b'multirequest':
256 if reqcommand == b'multirequest':
257 if not wireproto.commandsv2.commandavailable(command['command'], proto):
257 if not wireproto.commandsv2.commandavailable(command['command'], proto):
258 # TODO proper error mechanism
258 # TODO proper error mechanism
259 res.status = b'200 OK'
259 res.status = b'200 OK'
260 res.headers[b'Content-Type'] = b'text/plain'
260 res.headers[b'Content-Type'] = b'text/plain'
261 res.setbodybytes(_('wire protocol command not available: %s') %
261 res.setbodybytes(_('wire protocol command not available: %s') %
262 command['command'])
262 command['command'])
263 return True
263 return True
264
264
265 # TODO don't use assert here, since it may be elided by -O.
265 # TODO don't use assert here, since it may be elided by -O.
266 assert authedperm in (b'ro', b'rw')
266 assert authedperm in (b'ro', b'rw')
267 wirecommand = wireproto.commandsv2[command['command']]
267 wirecommand = wireproto.commandsv2[command['command']]
268 assert wirecommand.permission in ('push', 'pull')
268 assert wirecommand.permission in ('push', 'pull')
269
269
270 if authedperm == b'ro' and wirecommand.permission != 'pull':
270 if authedperm == b'ro' and wirecommand.permission != 'pull':
271 # TODO proper error mechanism
271 # TODO proper error mechanism
272 res.status = b'403 Forbidden'
272 res.status = b'403 Forbidden'
273 res.headers[b'Content-Type'] = b'text/plain'
273 res.headers[b'Content-Type'] = b'text/plain'
274 res.setbodybytes(_('insufficient permissions to execute '
274 res.setbodybytes(_('insufficient permissions to execute '
275 'command: %s') % command['command'])
275 'command: %s') % command['command'])
276 return True
276 return True
277
277
278 # TODO should we also call checkperm() here? Maybe not if we're going
278 # TODO should we also call checkperm() here? Maybe not if we're going
279 # to overhaul that API. The granted scope from the URL check should
279 # to overhaul that API. The granted scope from the URL check should
280 # be good enough.
280 # be good enough.
281
281
282 else:
282 else:
283 # Don't allow multiple commands outside of ``multirequest`` URL.
283 # Don't allow multiple commands outside of ``multirequest`` URL.
284 if issubsequent:
284 if issubsequent:
285 # TODO proper error mechanism
285 # TODO proper error mechanism
286 res.status = b'200 OK'
286 res.status = b'200 OK'
287 res.headers[b'Content-Type'] = b'text/plain'
287 res.headers[b'Content-Type'] = b'text/plain'
288 res.setbodybytes(_('multiple commands cannot be issued to this '
288 res.setbodybytes(_('multiple commands cannot be issued to this '
289 'URL'))
289 'URL'))
290 return True
290 return True
291
291
292 if reqcommand != command['command']:
292 if reqcommand != command['command']:
293 # TODO define proper error mechanism
293 # TODO define proper error mechanism
294 res.status = b'200 OK'
294 res.status = b'200 OK'
295 res.headers[b'Content-Type'] = b'text/plain'
295 res.headers[b'Content-Type'] = b'text/plain'
296 res.setbodybytes(_('command in frame must match command in URL'))
296 res.setbodybytes(_('command in frame must match command in URL'))
297 return True
297 return True
298
298
299 rsp = wireproto.dispatch(repo, proto, command['command'])
299 rsp = dispatch(repo, proto, command['command'])
300
300
301 res.status = b'200 OK'
301 res.status = b'200 OK'
302 res.headers[b'Content-Type'] = FRAMINGTYPE
302 res.headers[b'Content-Type'] = FRAMINGTYPE
303
303
304 if isinstance(rsp, wireprototypes.cborresponse):
304 if isinstance(rsp, wireprototypes.cborresponse):
305 encoded = cbor.dumps(rsp.value, canonical=True)
305 encoded = cbor.dumps(rsp.value, canonical=True)
306 action, meta = reactor.oncommandresponseready(outstream,
306 action, meta = reactor.oncommandresponseready(outstream,
307 command['requestid'],
307 command['requestid'],
308 encoded)
308 encoded)
309 elif isinstance(rsp, wireprototypes.v2streamingresponse):
309 elif isinstance(rsp, wireprototypes.v2streamingresponse):
310 action, meta = reactor.oncommandresponsereadygen(outstream,
310 action, meta = reactor.oncommandresponsereadygen(outstream,
311 command['requestid'],
311 command['requestid'],
312 rsp.gen)
312 rsp.gen)
313 elif isinstance(rsp, wireprototypes.v2errorresponse):
313 elif isinstance(rsp, wireprototypes.v2errorresponse):
314 action, meta = reactor.oncommanderror(outstream,
314 action, meta = reactor.oncommanderror(outstream,
315 command['requestid'],
315 command['requestid'],
316 rsp.message,
316 rsp.message,
317 rsp.args)
317 rsp.args)
318 else:
318 else:
319 action, meta = reactor.onservererror(
319 action, meta = reactor.onservererror(
320 _('unhandled response type from wire proto command'))
320 _('unhandled response type from wire proto command'))
321
321
322 if action == 'sendframes':
322 if action == 'sendframes':
323 res.setbodygen(meta['framegen'])
323 res.setbodygen(meta['framegen'])
324 return True
324 return True
325 elif action == 'noop':
325 elif action == 'noop':
326 return False
326 return False
327 else:
327 else:
328 raise error.ProgrammingError('unhandled event from reactor: %s' %
328 raise error.ProgrammingError('unhandled event from reactor: %s' %
329 action)
329 action)
330
330
331 def getdispatchrepo(repo, proto, command):
332 return repo.filtered('served')
333
334 def dispatch(repo, proto, command):
335 repo = getdispatchrepo(repo, proto, command)
336
337 func, spec = wireproto.commandsv2[command]
338 args = proto.getargs(spec)
339
340 return func(repo, proto, **args)
341
331 @zi.implementer(wireprototypes.baseprotocolhandler)
342 @zi.implementer(wireprototypes.baseprotocolhandler)
332 class httpv2protocolhandler(object):
343 class httpv2protocolhandler(object):
333 def __init__(self, req, ui, args=None):
344 def __init__(self, req, ui, args=None):
334 self._req = req
345 self._req = req
335 self._ui = ui
346 self._ui = ui
336 self._args = args
347 self._args = args
337
348
338 @property
349 @property
339 def name(self):
350 def name(self):
340 return HTTP_WIREPROTO_V2
351 return HTTP_WIREPROTO_V2
341
352
342 def getargs(self, args):
353 def getargs(self, args):
343 data = {}
354 data = {}
344 for k, typ in args.items():
355 for k, typ in args.items():
345 if k == '*':
356 if k == '*':
346 raise NotImplementedError('do not support * args')
357 raise NotImplementedError('do not support * args')
347 elif k in self._args:
358 elif k in self._args:
348 # TODO consider validating value types.
359 # TODO consider validating value types.
349 data[k] = self._args[k]
360 data[k] = self._args[k]
350
361
351 return data
362 return data
352
363
353 def getprotocaps(self):
364 def getprotocaps(self):
354 # Protocol capabilities are currently not implemented for HTTP V2.
365 # Protocol capabilities are currently not implemented for HTTP V2.
355 return set()
366 return set()
356
367
357 def getpayload(self):
368 def getpayload(self):
358 raise NotImplementedError
369 raise NotImplementedError
359
370
360 @contextlib.contextmanager
371 @contextlib.contextmanager
361 def mayberedirectstdio(self):
372 def mayberedirectstdio(self):
362 raise NotImplementedError
373 raise NotImplementedError
363
374
364 def client(self):
375 def client(self):
365 raise NotImplementedError
376 raise NotImplementedError
366
377
367 def addcapabilities(self, repo, caps):
378 def addcapabilities(self, repo, caps):
368 return caps
379 return caps
369
380
370 def checkperm(self, perm):
381 def checkperm(self, perm):
371 raise NotImplementedError
382 raise NotImplementedError
372
383
373 def httpv2apidescriptor(req, repo):
384 def httpv2apidescriptor(req, repo):
374 proto = httpv2protocolhandler(req, repo.ui)
385 proto = httpv2protocolhandler(req, repo.ui)
375
386
376 return _capabilitiesv2(repo, proto)
387 return _capabilitiesv2(repo, proto)
377
388
378 def _capabilitiesv2(repo, proto):
389 def _capabilitiesv2(repo, proto):
379 """Obtain the set of capabilities for version 2 transports.
390 """Obtain the set of capabilities for version 2 transports.
380
391
381 These capabilities are distinct from the capabilities for version 1
392 These capabilities are distinct from the capabilities for version 1
382 transports.
393 transports.
383 """
394 """
384 compression = []
395 compression = []
385 for engine in wireproto.supportedcompengines(repo.ui, util.SERVERROLE):
396 for engine in wireproto.supportedcompengines(repo.ui, util.SERVERROLE):
386 compression.append({
397 compression.append({
387 b'name': engine.wireprotosupport().name,
398 b'name': engine.wireprotosupport().name,
388 })
399 })
389
400
390 caps = {
401 caps = {
391 'commands': {},
402 'commands': {},
392 'compression': compression,
403 'compression': compression,
393 'framingmediatypes': [FRAMINGTYPE],
404 'framingmediatypes': [FRAMINGTYPE],
394 }
405 }
395
406
396 for command, entry in wireproto.commandsv2.items():
407 for command, entry in wireproto.commandsv2.items():
397 caps['commands'][command] = {
408 caps['commands'][command] = {
398 'args': entry.args,
409 'args': entry.args,
399 'permissions': [entry.permission],
410 'permissions': [entry.permission],
400 }
411 }
401
412
402 if streamclone.allowservergeneration(repo):
413 if streamclone.allowservergeneration(repo):
403 caps['rawrepoformats'] = sorted(repo.requirements &
414 caps['rawrepoformats'] = sorted(repo.requirements &
404 repo.supportedformats)
415 repo.supportedformats)
405
416
406 return proto.addcapabilities(repo, caps)
417 return proto.addcapabilities(repo, caps)
407
418
408 def wireprotocommand(name, args=None, permission='push'):
419 def wireprotocommand(name, args=None, permission='push'):
409 """Decorator to declare a wire protocol command.
420 """Decorator to declare a wire protocol command.
410
421
411 ``name`` is the name of the wire protocol command being provided.
422 ``name`` is the name of the wire protocol command being provided.
412
423
413 ``args`` is a dict of argument names to example values.
424 ``args`` is a dict of argument names to example values.
414
425
415 ``permission`` defines the permission type needed to run this command.
426 ``permission`` defines the permission type needed to run this command.
416 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
427 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
417 respectively. Default is to assume command requires ``push`` permissions
428 respectively. Default is to assume command requires ``push`` permissions
418 because otherwise commands not declaring their permissions could modify
429 because otherwise commands not declaring their permissions could modify
419 a repository that is supposed to be read-only.
430 a repository that is supposed to be read-only.
420 """
431 """
421 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
432 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
422 if v['version'] == 2}
433 if v['version'] == 2}
423
434
424 if permission not in ('push', 'pull'):
435 if permission not in ('push', 'pull'):
425 raise error.ProgrammingError('invalid wire protocol permission; '
436 raise error.ProgrammingError('invalid wire protocol permission; '
426 'got %s; expected "push" or "pull"' %
437 'got %s; expected "push" or "pull"' %
427 permission)
438 permission)
428
439
429 if args is None:
440 if args is None:
430 args = {}
441 args = {}
431
442
432 if not isinstance(args, dict):
443 if not isinstance(args, dict):
433 raise error.ProgrammingError('arguments for version 2 commands '
444 raise error.ProgrammingError('arguments for version 2 commands '
434 'must be declared as dicts')
445 'must be declared as dicts')
435
446
436 def register(func):
447 def register(func):
437 if name in wireproto.commandsv2:
448 if name in wireproto.commandsv2:
438 raise error.ProgrammingError('%s command already registered '
449 raise error.ProgrammingError('%s command already registered '
439 'for version 2' % name)
450 'for version 2' % name)
440
451
441 wireproto.commandsv2[name] = wireprototypes.commandentry(
452 wireproto.commandsv2[name] = wireprototypes.commandentry(
442 func, args=args, transports=transports, permission=permission)
453 func, args=args, transports=transports, permission=permission)
443
454
444 return func
455 return func
445
456
446 return register
457 return register
447
458
448 @wireprotocommand('branchmap', permission='pull')
459 @wireprotocommand('branchmap', permission='pull')
449 def branchmapv2(repo, proto):
460 def branchmapv2(repo, proto):
450 branchmap = {encoding.fromlocal(k): v
461 branchmap = {encoding.fromlocal(k): v
451 for k, v in repo.branchmap().iteritems()}
462 for k, v in repo.branchmap().iteritems()}
452
463
453 return wireprototypes.cborresponse(branchmap)
464 return wireprototypes.cborresponse(branchmap)
454
465
455 @wireprotocommand('capabilities', permission='pull')
466 @wireprotocommand('capabilities', permission='pull')
456 def capabilitiesv2(repo, proto):
467 def capabilitiesv2(repo, proto):
457 caps = _capabilitiesv2(repo, proto)
468 caps = _capabilitiesv2(repo, proto)
458
469
459 return wireprototypes.cborresponse(caps)
470 return wireprototypes.cborresponse(caps)
460
471
461 @wireprotocommand('heads',
472 @wireprotocommand('heads',
462 args={
473 args={
463 'publiconly': False,
474 'publiconly': False,
464 },
475 },
465 permission='pull')
476 permission='pull')
466 def headsv2(repo, proto, publiconly=False):
477 def headsv2(repo, proto, publiconly=False):
467 if publiconly:
478 if publiconly:
468 repo = repo.filtered('immutable')
479 repo = repo.filtered('immutable')
469
480
470 return wireprototypes.cborresponse(repo.heads())
481 return wireprototypes.cborresponse(repo.heads())
471
482
472 @wireprotocommand('known',
483 @wireprotocommand('known',
473 args={
484 args={
474 'nodes': [b'deadbeef'],
485 'nodes': [b'deadbeef'],
475 },
486 },
476 permission='pull')
487 permission='pull')
477 def knownv2(repo, proto, nodes=None):
488 def knownv2(repo, proto, nodes=None):
478 nodes = nodes or []
489 nodes = nodes or []
479 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
490 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
480 return wireprototypes.cborresponse(result)
491 return wireprototypes.cborresponse(result)
481
492
482 @wireprotocommand('listkeys',
493 @wireprotocommand('listkeys',
483 args={
494 args={
484 'namespace': b'ns',
495 'namespace': b'ns',
485 },
496 },
486 permission='pull')
497 permission='pull')
487 def listkeysv2(repo, proto, namespace=None):
498 def listkeysv2(repo, proto, namespace=None):
488 keys = repo.listkeys(encoding.tolocal(namespace))
499 keys = repo.listkeys(encoding.tolocal(namespace))
489 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
500 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
490 for k, v in keys.iteritems()}
501 for k, v in keys.iteritems()}
491
502
492 return wireprototypes.cborresponse(keys)
503 return wireprototypes.cborresponse(keys)
493
504
494 @wireprotocommand('lookup',
505 @wireprotocommand('lookup',
495 args={
506 args={
496 'key': b'foo',
507 'key': b'foo',
497 },
508 },
498 permission='pull')
509 permission='pull')
499 def lookupv2(repo, proto, key):
510 def lookupv2(repo, proto, key):
500 key = encoding.tolocal(key)
511 key = encoding.tolocal(key)
501
512
502 # TODO handle exception.
513 # TODO handle exception.
503 node = repo.lookup(key)
514 node = repo.lookup(key)
504
515
505 return wireprototypes.cborresponse(node)
516 return wireprototypes.cborresponse(node)
506
517
507 @wireprotocommand('pushkey',
518 @wireprotocommand('pushkey',
508 args={
519 args={
509 'namespace': b'ns',
520 'namespace': b'ns',
510 'key': b'key',
521 'key': b'key',
511 'old': b'old',
522 'old': b'old',
512 'new': b'new',
523 'new': b'new',
513 },
524 },
514 permission='push')
525 permission='push')
515 def pushkeyv2(repo, proto, namespace, key, old, new):
526 def pushkeyv2(repo, proto, namespace, key, old, new):
516 # TODO handle ui output redirection
527 # TODO handle ui output redirection
517 r = repo.pushkey(encoding.tolocal(namespace),
528 r = repo.pushkey(encoding.tolocal(namespace),
518 encoding.tolocal(key),
529 encoding.tolocal(key),
519 encoding.tolocal(old),
530 encoding.tolocal(old),
520 encoding.tolocal(new))
531 encoding.tolocal(new))
521
532
522 return wireprototypes.cborresponse(r)
533 return wireprototypes.cborresponse(r)
General Comments 0
You need to be logged in to leave comments. Login now