##// END OF EJS Templates
wireprotov2server: let repo.narrowmatch(match) do matcher intersection...
Martin von Zweigbergk -
r40685:f83cea7f default
parent child Browse files
Show More
@@ -1,1460 +1,1460
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 #
3 #
4 # This software may be used and distributed according to the terms of the
4 # This software may be used and distributed according to the terms of the
5 # GNU General Public License version 2 or any later version.
5 # GNU General Public License version 2 or any later version.
6
6
7 from __future__ import absolute_import
7 from __future__ import absolute_import
8
8
9 import collections
9 import collections
10 import contextlib
10 import contextlib
11 import hashlib
11 import hashlib
12
12
13 from .i18n import _
13 from .i18n import _
14 from .node import (
14 from .node import (
15 hex,
15 hex,
16 nullid,
16 nullid,
17 )
17 )
18 from . import (
18 from . import (
19 discovery,
19 discovery,
20 encoding,
20 encoding,
21 error,
21 error,
22 match as matchmod,
22 match as matchmod,
23 narrowspec,
23 narrowspec,
24 pycompat,
24 pycompat,
25 streamclone,
25 streamclone,
26 util,
26 util,
27 wireprotoframing,
27 wireprotoframing,
28 wireprototypes,
28 wireprototypes,
29 )
29 )
30 from .utils import (
30 from .utils import (
31 cborutil,
31 cborutil,
32 interfaceutil,
32 interfaceutil,
33 stringutil,
33 stringutil,
34 )
34 )
35
35
# Media type of the frame-based protocol. Requests must carry this value in
# both their ``Accept`` and ``Content-Type`` headers, and framed responses
# are labelled with it.
FRAMINGTYPE = b'application/mercurial-exp-framing-0006'

# Protocol name reported by ``httpv2protocolhandler.name``.
HTTP_WIREPROTO_V2 = wireprototypes.HTTP_WIREPROTO_V2

# Registry of the wire protocol commands known to this module.
COMMANDS = wireprototypes.commanddict()

# Value inserted into cache key computation function. Change the value to
# force new cache keys for every command request. This should be done when
# there is a change to how caching works, etc.
GLOBAL_CACHE_VERSION = 1
46
46
def handlehttpv2request(rctx, req, res, checkperm, urlparts):
    """Validate an HTTP version 2 API request and route it to a processor.

    ``urlparts`` holds the URL components below the API root. The URL space
    looks like ``<permission>/<command>``, where ``<permission>`` is ``ro``
    or ``rw`` to signal read-only or read-write access, respectively.

    Every rejected request is answered directly on ``res`` and the function
    simply returns. A request that survives all checks is forwarded to
    ``_processhttpv2request()``; the special ``debugreflect`` command is
    forwarded to ``_processhttpv2reflectrequest()`` instead.
    """
    from .hgweb import common as hgwebcommon

    def plaintext(status, body):
        # Most early exits below share this exact response shape.
        res.status = status
        res.headers[b'Content-Type'] = b'text/plain'
        res.setbodybytes(body)

    # Root URL does nothing meaningful... yet.
    if not urlparts:
        plaintext(b'200 OK', _('HTTP version 2 API handler'))
        return

    # A lone path component cannot name both a permission and a command.
    if len(urlparts) == 1:
        plaintext(b'404 Not Found',
                  _('do not know how to process %s\n') % req.dispatchpath)
        return

    permission, command = urlparts[:2]

    if permission not in (b'ro', b'rw'):
        plaintext(b'404 Not Found',
                  _('unknown permission: %s') % permission)
        return

    if req.method != 'POST':
        res.status = b'405 Method Not Allowed'
        res.headers[b'Allow'] = b'POST'
        res.setbodybytes(_('commands require POST requests'))
        return

    # At some point we'll want to use our own API instead of recycling the
    # behavior of version 1 of the wire protocol...
    # TODO return reasonable responses - not responses that overload the
    # HTTP status line message for error reporting.
    if permission == b'ro':
        neededperm = 'pull'
    else:
        neededperm = 'push'

    try:
        checkperm(rctx, req, neededperm)
    except hgwebcommon.ErrorResponse as e:
        res.status = hgwebcommon.statusmessage(e.code, pycompat.bytestr(e))
        for name, value in e.headers:
            res.headers[name] = value
        res.setbodybytes('permission denied')
        return

    # We have a special endpoint to reflect the request back at the client.
    if command == b'debugreflect':
        _processhttpv2reflectrequest(rctx.repo.ui, rctx.repo, req, res)
        return

    # Extra commands that we handle that aren't really wire protocol
    # commands. Think extra hard before making this hackery available to
    # extension.
    extracommands = {'multirequest'}

    if not (command in COMMANDS or command in extracommands):
        plaintext(b'404 Not Found',
                  _('unknown wire protocol command: %s\n') % command)
        return

    repo = rctx.repo
    ui = repo.ui

    proto = httpv2protocolhandler(req, ui)

    # Registered, but possibly not available over this transport.
    if (not COMMANDS.commandavailable(command, proto)
        and command not in extracommands):
        plaintext(b'404 Not Found',
                  _('invalid wire protocol command: %s') % command)
        return

    # TODO consider cases where proxies may add additional Accept headers.
    if req.headers.get(b'Accept') != FRAMINGTYPE:
        plaintext(b'406 Not Acceptable',
                  _('client MUST specify Accept header with value: %s\n')
                  % FRAMINGTYPE)
        return

    if req.headers.get(b'Content-Type') != FRAMINGTYPE:
        # TODO we should send a response with appropriate media type,
        # since client does Accept it.
        plaintext(b'415 Unsupported Media Type',
                  _('client MUST send Content-Type header with '
                    'value: %s\n') % FRAMINGTYPE)
        return

    _processhttpv2request(ui, repo, req, res, permission, command, proto)
140
140
def _processhttpv2reflectrequest(ui, repo, req, res):
    """Reads unified frame protocol request and dumps out state to client.

    This special endpoint can be used to help debug the wire protocol.

    Rather than going through the normal dispatch mechanism, every frame in
    the request body is read, decoded and fed into a server reactor. The
    log of received frames and of the reactor's reactions is then sent back
    to the client as a plain text body, one entry per line.
    """
    import json

    def tojson(value):
        # Stable key order and fixed separators keep the dump reproducible.
        return json.dumps(value, sort_keys=True, separators=(', ', ': '))

    # Reflection APIs have a history of being abused, accidentally disclosing
    # sensitive data, etc. So we have a config knob.
    if not ui.configbool('experimental', 'web.api.debugreflect'):
        res.status = b'404 Not Found'
        res.headers[b'Content-Type'] = b'text/plain'
        res.setbodybytes(_('debugreflect service not available'))
        return

    # We assume we have a unified framing protocol request body.

    reactor = wireprotoframing.serverreactor(ui)
    log = []

    frame = wireprotoframing.readframe(req.bodyfh)
    while frame:
        log.append(b'received: %d %d %d %s' % (frame.typeid, frame.flags,
                                               frame.requestid,
                                               frame.payload))

        action, meta = reactor.onframerecv(frame)
        log.append(tojson((action, meta)))

        frame = wireprotoframing.readframe(req.bodyfh)

    # The loop only ends once the reader stops producing frames.
    log.append(b'received: <no frame>')

    action, meta = reactor.oninputeof()
    meta['action'] = action
    log.append(tojson(meta))

    res.status = b'200 OK'
    res.headers[b'Content-Type'] = b'text/plain'
    res.setbodybytes(b'\n'.join(log))
188
188
def _processhttpv2request(ui, repo, req, res, authedperm, reqcommand, proto):
    """Post-validation handler for HTTPv2 requests.

    Called when the HTTP request contains unified frame-based protocol
    frames for evaluation. Frames are read from ``req.bodyfh`` and fed to a
    server reactor; each command the reactor asks for is run through
    ``_httpv2runcommand()``. Processing stops at the first command that
    populated the response, at the first reactor error, or when the request
    body is exhausted.

    Raises ``error.ProgrammingError`` when the reactor requests an action
    this function does not know about.
    """
    # TODO Some HTTP clients are full duplex and can receive data before
    # the entire request is transmitted. Figure out a way to indicate support
    # for that so we can opt into full duplex mode.
    reactor = wireprotoframing.serverreactor(ui, deferoutput=True)
    seencommand = False

    outstream = None

    while True:
        frame = wireprotoframing.readframe(req.bodyfh)
        if not frame:
            break

        action, meta = reactor.onframerecv(frame)

        # Need more data before we can do anything.
        if action == 'wantframe':
            continue

        if action == 'error':
            # TODO define proper error mechanism.
            res.status = b'200 OK'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(meta['message'] + b'\n')
            return

        if action != 'runcommand':
            raise error.ProgrammingError(
                'unhandled action from frame processor: %s' % action)

        # Defer creating output stream because we need to wait for
        # protocol settings frames so proper encoding can be applied.
        if not outstream:
            outstream = reactor.makeoutputstream()

        sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
                                       reqcommand, reactor, outstream,
                                       meta, issubsequent=seencommand)

        # The response is already populated; nothing more to do.
        if sentoutput:
            return

        seencommand = True

    action, meta = reactor.oninputeof()

    if action == 'noop':
        return

    if action != 'sendframes':
        raise error.ProgrammingError('unhandled action from frame processor: %s'
                                     % action)

    # We assume we haven't started sending the response yet. If we're
    # wrong, the response type will raise an exception.
    res.status = b'200 OK'
    res.headers[b'Content-Type'] = FRAMINGTYPE
    res.setbodygen(meta['framegen'])
250
250
def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
                      outstream, command, issubsequent):
    """Dispatch a wire protocol command made from HTTPv2 requests.

    The authenticated permission (``authedperm``) along with the original
    command from the URL (``reqcommand``) are passed in. ``command`` is the
    mapping produced by the reactor for the command to run; its
    ``command``, ``args``, ``redirect`` and ``requestid`` entries are used.
    ``issubsequent`` is true when an earlier command was already run for
    this HTTP request.

    Returns ``True`` when ``res`` has been populated (with either an error
    or the command's frames) and the caller must stop processing, ``False``
    when the reactor had nothing to send yet.

    Raises ``error.ProgrammingError`` when a permission invariant is
    violated or when the reactor requests an unknown action.
    """
    # We already validated that the session has permissions to perform the
    # actions in ``authedperm``. In the unified frame protocol, the canonical
    # command to run is expressed in a frame. However, the URL also requested
    # to run a specific command. We need to be careful that the command we
    # run doesn't have permissions requirements greater than what was granted
    # by ``authedperm``.
    #
    # Our rule for this is we only allow one command per HTTP request and
    # that command must match the command in the URL. However, we make
    # an exception for the ``multirequest`` URL. This URL is allowed to
    # execute multiple commands. We double check permissions of each command
    # as it is invoked to ensure there is no privilege escalation.
    # TODO consider allowing multiple commands to regular command URLs
    # iff each command is the same.

    proto = httpv2protocolhandler(req, ui, args=command['args'])

    if reqcommand == b'multirequest':
        if not COMMANDS.commandavailable(command['command'], proto):
            # TODO proper error mechanism
            res.status = b'200 OK'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(_('wire protocol command not available: %s') %
                             command['command'])
            return True

        # These invariants guard against privilege escalation, so they are
        # checked with explicit raises: ``assert`` statements would be
        # elided when Python runs with -O.
        if authedperm not in (b'ro', b'rw'):
            raise error.ProgrammingError(
                'unexpected authenticated permission: %r' % (authedperm,))

        wirecommand = COMMANDS[command['command']]

        if wirecommand.permission not in ('push', 'pull'):
            raise error.ProgrammingError(
                'unexpected command permission: %r' %
                (wirecommand.permission,))

        if authedperm == b'ro' and wirecommand.permission != 'pull':
            # TODO proper error mechanism
            res.status = b'403 Forbidden'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(_('insufficient permissions to execute '
                               'command: %s') % command['command'])
            return True

        # TODO should we also call checkperm() here? Maybe not if we're going
        # to overhaul that API. The granted scope from the URL check should
        # be good enough.

    else:
        # Don't allow multiple commands outside of ``multirequest`` URL.
        if issubsequent:
            # TODO proper error mechanism
            res.status = b'200 OK'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(_('multiple commands cannot be issued to this '
                               'URL'))
            return True

        if reqcommand != command['command']:
            # TODO define proper error mechanism
            res.status = b'200 OK'
            res.headers[b'Content-Type'] = b'text/plain'
            res.setbodybytes(_('command in frame must match command in URL'))
            return True

    res.status = b'200 OK'
    res.headers[b'Content-Type'] = FRAMINGTYPE

    try:
        objs = dispatch(repo, proto, command['command'], command['redirect'])

        action, meta = reactor.oncommandresponsereadyobjects(
            outstream, command['requestid'], objs)

    except error.WireprotoCommandError as e:
        # Command-level failure: report it through the reactor's command
        # error channel.
        action, meta = reactor.oncommanderror(
            outstream, command['requestid'], e.message, e.messageargs)

    except Exception as e:
        # Anything else is reported to the client as a server error rather
        # than propagating out of the request handler.
        action, meta = reactor.onservererror(
            outstream, command['requestid'],
            _('exception when invoking command: %s') %
            stringutil.forcebytestr(e))

    if action == 'sendframes':
        res.setbodygen(meta['framegen'])
        return True
    elif action == 'noop':
        return False
    else:
        raise error.ProgrammingError('unhandled event from reactor: %s' %
                                     action)
345
345
def getdispatchrepo(repo, proto, command):
    """Return the view of ``repo`` that wire protocol commands run against.

    This is the ``served`` filtered view of the repository. ``proto`` and
    ``command`` are accepted but not consulted here.
    """
    servedview = repo.filtered('served')
    return servedview
348
348
def dispatch(repo, proto, command, redirect):
    """Run a wire protocol command.

    This is a generator: nothing runs until the result is iterated. It
    yields the objects that will be sent to the client, either straight
    from the command function or by way of a response cacher.

    ``command`` is the key of the command in ``COMMANDS``. ``redirect`` is
    either falsy or a mapping with ``b'targets'`` and ``b'hashes'`` entries,
    which are handed to the response cacher.
    """
    repo = getdispatchrepo(repo, proto, command)

    entry = COMMANDS[command]
    func = entry.func
    spec = entry.args

    args = proto.getargs(spec)

    # There is some duplicate boilerplate code here for calling the command and
    # emitting objects. It is either that or a lot of indented code that looks
    # like a pyramid (since there are a lot of code paths that result in not
    # using the cacher).
    callcommand = lambda: func(repo, proto, **pycompat.strkwargs(args))

    # Request is not cacheable. Don't bother instantiating a cacher.
    if not entry.cachekeyfn:
        for o in callcommand():
            yield o
        return

    if redirect:
        redirecttargets = redirect[b'targets']
        redirecthashes = redirect[b'hashes']
    else:
        redirecttargets = []
        redirecthashes = []

    cacher = makeresponsecacher(repo, proto, command, args,
                                cborutil.streamencode,
                                redirecttargets=redirecttargets,
                                redirecthashes=redirecthashes)

    # But we have no cacher. Do default handling.
    if not cacher:
        for o in callcommand():
            yield o
        return

    with cacher:
        # Argument names must go through strkwargs() before being expanded
        # with ``**``, exactly as is done for the command function itself:
        # keyword names have to be native strings.
        cachekey = entry.cachekeyfn(repo, proto, cacher,
                                    **pycompat.strkwargs(args))

        # No cache key or the cacher doesn't like it. Do default handling.
        if cachekey is None or not cacher.setcachekey(cachekey):
            for o in callcommand():
                yield o
            return

        # Serve it from the cache, if possible.
        cached = cacher.lookup()

        if cached:
            for o in cached['objs']:
                yield o
            return

        # Else call the command and feed its output into the cacher, allowing
        # the cacher to buffer/mutate objects as it desires.
        for o in callcommand():
            for emitted in cacher.onobject(o):
                yield emitted

        for o in cacher.onfinished():
            yield o
417
417
@interfaceutil.implementer(wireprototypes.baseprotocolhandler)
class httpv2protocolhandler(object):
    """Protocol handler for commands received over the HTTP version 2 API.

    ``args`` is the mapping of arguments the client sent for the command
    being handled. It may be left as ``None`` when the handler is only used
    for things that do not depend on arguments.
    """

    def __init__(self, req, ui, args=None):
        self._req = req
        self._ui = ui
        self._args = args

    @property
    def name(self):
        return HTTP_WIREPROTO_V2

    def getargs(self, args):
        """Resolve client-supplied arguments against a command's spec.

        ``args`` maps each argument name the command accepts to its
        metadata; the ``required``, ``default`` and ``type`` entries are
        used here. Returns a dict with one entry per declared argument,
        holding the client's value or, when the client did not pass the
        argument, the result of calling its ``default``.

        Raises ``error.WireprotoCommandError`` when the client passed an
        argument the command does not declare or omitted a required one.
        """
        # A handler constructed without arguments has ``self._args`` set to
        # None; treat that as the client having passed nothing instead of
        # failing on ``set(None)``.
        passed = self._args if self._args is not None else {}

        # First look for args that were passed but aren't registered on this
        # command.
        extra = set(passed) - set(args)
        if extra:
            raise error.WireprotoCommandError(
                'unsupported argument to command: %s' %
                ', '.join(sorted(extra)))

        # And look for required arguments that are missing.
        missing = {a for a in args if args[a]['required']} - set(passed)

        if missing:
            raise error.WireprotoCommandError(
                'missing required arguments: %s' % ', '.join(sorted(missing)))

        # Now derive the arguments to pass to the command, taking into
        # account the arguments specified by the client.
        data = {}
        for k, meta in sorted(args.items()):
            # This argument wasn't passed by the client.
            if k not in passed:
                data[k] = meta['default']()
                continue

            v = passed[k]

            # Sets may be expressed as lists. Silently normalize.
            if meta['type'] == 'set' and isinstance(v, list):
                v = set(v)

            # TODO consider more/stronger type validation.

            data[k] = v

        return data

    def getprotocaps(self):
        # Protocol capabilities are currently not implemented for HTTP V2.
        return set()

    def getpayload(self):
        raise NotImplementedError

    @contextlib.contextmanager
    def mayberedirectstdio(self):
        raise NotImplementedError

    def client(self):
        raise NotImplementedError

    def addcapabilities(self, repo, caps):
        # Returns ``caps`` unchanged.
        return caps

    def checkperm(self, perm):
        raise NotImplementedError
485
485
def httpv2apidescriptor(req, repo):
    """Return the version 2 capabilities of ``repo`` for request ``req``.

    A protocol handler without command arguments is built for the request
    and handed to ``_capabilitiesv2()``.
    """
    handler = httpv2protocolhandler(req, repo.ui)
    return _capabilitiesv2(repo, handler)
490
490
def _capabilitiesv2(repo, proto):
    """Obtain the set of capabilities for version 2 transports.

    These capabilities are distinct from the capabilities for version 1
    transports.

    The returned dict describes every command registered in ``COMMANDS``
    (its arguments, its permission and any extra per-command capabilities),
    the framing media types, the valid path filter prefixes, the raw
    repository formats and, when ``getadvertisedredirecttargets()`` returns
    any, the content redirect targets. The dict is passed through
    ``proto.addcapabilities()`` before being returned.
    """
    caps = {
        'commands': {},
        'framingmediatypes': [FRAMINGTYPE],
        'pathfilterprefixes': set(narrowspec.VALID_PREFIXES),
    }

    for command, entry in COMMANDS.items():
        args = {}

        for arg, meta in entry.args.items():
            args[arg] = {
                # TODO should this be a normalized type using CBOR's
                # terminology?
                b'type': meta['type'],
                b'required': meta['required'],
            }

            # Only optional arguments advertise a default; the default is
            # stored as a callable, so it is invoked to obtain the value.
            if not meta['required']:
                args[arg][b'default'] = meta['default']()

            if meta['validvalues']:
                args[arg][b'validvalues'] = meta['validvalues']

        # TODO this type of check should be defined in a per-command callback.
        if (command == b'rawstorefiledata'
            and not streamclone.allowservergeneration(repo)):
            continue

        caps['commands'][command] = {
            'args': args,
            'permissions': [entry.permission],
        }

        if entry.extracapabilitiesfn:
            extracaps = entry.extracapabilitiesfn(repo, proto)
            caps['commands'][command].update(extracaps)

    caps['rawrepoformats'] = sorted(repo.requirements &
                                    repo.supportedformats)

    targets = getadvertisedredirecttargets(repo, proto)
    if targets:
        caps[b'redirect'] = {
            b'targets': [],
            b'hashes': [b'sha256', b'sha1'],
        }

        for target in targets:
            # Named distinctly so it does not shadow the command ``entry``
            # variable used by the loop above.
            targetentry = {
                b'name': target['name'],
                b'protocol': target['protocol'],
                b'uris': target['uris'],
            }

            # Optional keys are only copied when the target declares them.
            for key in ('snirequired', 'tlsversions'):
                if key in target:
                    targetentry[key] = target[key]

            caps[b'redirect'][b'targets'].append(targetentry)

    return proto.addcapabilities(repo, caps)
558
558
def getadvertisedredirecttargets(repo, proto):
    """Obtain a list of content redirect targets.

    Returns a list of dicts describing potential redirect targets that will
    be advertised in capabilities data. Each dict MUST have the following
    keys:

    name
       The name of this redirect target. This is the identifier clients use
       to refer to a target. It is transferred as part of every command
       request.

    protocol
       Network protocol used by this target. Typically this is the string
       in front of the ``://`` in a URL. e.g. ``https``.

    uris
       List of representative URIs for this target. Clients can use the
       URIs to test parsing for compatibility or for ordering preference
       for which target to use.

    The following optional keys are recognized:

    snirequired
       Bool indicating if Server Name Indication (SNI) is required to
       connect to this target.

    tlsversions
       List of bytes indicating which TLS versions are supported by this
       target.

    By default, clients reflect the target order advertised by servers
    and servers will use the first client-advertised target when picking
    a redirect target. So targets should be advertised in the order the
    server prefers they be used.

    This implementation advertises no targets: it always returns an empty
    list and does not consult ``repo`` or ``proto``.
    """
    return []
596
596
def wireprotocommand(name, args=None, permission='push', cachekeyfn=None,
                     extracapabilitiesfn=None):
    """Decorator to declare a wire protocol command.

    ``name`` is the name of the wire protocol command being provided.

    ``args`` is a dict defining arguments accepted by the command. Keys are
    the argument name. Values are dicts with the following keys:

       ``type``
          The argument data type. Must be one of the following string
          literals: ``bytes``, ``int``, ``list``, ``dict``, ``set``,
          or ``bool``.

       ``default``
          A callable returning the default value for this argument. An
          argument that does not declare ``default`` is marked as required;
          its ``default`` entry is then filled in with a callable returning
          ``None``.

       ``example``
          An example value for this argument. Must be present.

       ``validvalues``
          Set of recognized values for this argument. Defaults to ``None``.

    The metadata dicts in ``args`` are modified in place: a ``required``
    key is added and missing ``default``/``validvalues`` keys are filled in.

    ``permission`` defines the permission type needed to run this command.
    Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
    respectively. Default is to assume command requires ``push`` permissions
    because otherwise commands not declaring their permissions could modify
    a repository that is supposed to be read-only.

    ``cachekeyfn`` defines an optional callable that can derive the
    cache key for this request.

    ``extracapabilitiesfn`` defines an optional callable that defines extra
    command capabilities/parameters that are advertised next to the command
    in the capabilities data structure describing the server. The callable
    receives as arguments the repository and protocol objects. It returns
    a dict of extra fields to add to the command descriptor.

    Wire protocol commands are generators of objects to be serialized and
    sent to the client.

    If a command raises an uncaught exception, this will be translated into
    a command error.

    All commands can opt in to being cacheable by defining a function
    (``cachekeyfn``) that is called to derive a cache key. This function
    receives the same arguments as the command itself plus a ``cacher``
    argument containing the active cacher for the request and returns a bytes
    containing the key in a cache the response to this command may be cached
    under.

    Raises ``error.ProgrammingError`` when the declaration is invalid (bad
    permission, malformed ``args``) or, at decoration time, when ``name`` is
    already registered.
    """
    transports = {k for k, v in wireprototypes.TRANSPORTS.items()
                  if v['version'] == 2}

    if permission not in ('push', 'pull'):
        # The value is wrapped in a tuple so that a tuple-valued
        # ``permission`` is reported rather than breaking the formatting.
        raise error.ProgrammingError('invalid wire protocol permission; '
                                     'got %s; expected "push" or "pull"' %
                                     (permission,))

    if args is None:
        args = {}

    if not isinstance(args, dict):
        raise error.ProgrammingError('arguments for version 2 commands '
                                     'must be declared as dicts')

    for arg, meta in args.items():
        if arg == '*':
            raise error.ProgrammingError('* argument name not allowed on '
                                         'version 2 commands')

        if not isinstance(meta, dict):
            raise error.ProgrammingError('arguments for version 2 commands '
                                         'must declare metadata as a dict')

        if 'type' not in meta:
            raise error.ProgrammingError('%s argument for command %s does not '
                                         'declare type field' % (arg, name))

        if meta['type'] not in ('bytes', 'int', 'list', 'dict', 'set', 'bool'):
            raise error.ProgrammingError('%s argument for command %s has '
                                         'illegal type: %s' % (arg, name,
                                                               meta['type']))

        if 'example' not in meta:
            raise error.ProgrammingError('%s argument for command %s does not '
                                         'declare example field' % (arg, name))

        # Must be computed before ``default`` is filled in below.
        meta['required'] = 'default' not in meta

        meta.setdefault('default', lambda: None)
        meta.setdefault('validvalues', None)

    def register(func):
        if name in COMMANDS:
            raise error.ProgrammingError('%s command already registered '
                                         'for version 2' % name)

        COMMANDS[name] = wireprototypes.commandentry(
            func, args=args, transports=transports, permission=permission,
            cachekeyfn=cachekeyfn, extracapabilitiesfn=extracapabilitiesfn)

        # The decorated function itself is returned unwrapped.
        return func

    return register
703
703
def makecommandcachekeyfn(command, localversion=None, allargs=False):
    """Construct a cache key derivation function with common features.

    By default, the cache key is a hash of:

    * The command name.
    * A global cache version number.
    * A local cache version number (passed via ``localversion``).
    * All the arguments passed to the command.
    * The media type used.
    * Wire protocol version string.
    * The repository path.

    ``allargs`` must be true and ``localversion`` must not be ``None``;
    otherwise ``error.ProgrammingError`` is raised.

    The returned function takes ``(repo, proto, cacher, **args)``. It returns
    ``None`` when the command's permission is ``push`` and otherwise the
    hex SHA-1 digest of the CBOR encoding of the key state, after giving
    ``cacher.adjustcachekeystate()`` a chance to modify that state.
    """
    if not allargs:
        raise error.ProgrammingError('only allargs=True is currently supported')

    if localversion is None:
        raise error.ProgrammingError('must set localversion argument value')

    def cachekeyfn(repo, proto, cacher, **args):
        spec = COMMANDS[command]

        # Commands that mutate the repo can not be cached.
        if spec.permission == 'push':
            return None

        # TODO config option to disable caching.

        # Our key derivation strategy is to construct a data structure
        # holding everything that could influence cacheability and to hash
        # the CBOR representation of that. Using CBOR seems like it might
        # be overkill. However, simpler hashing mechanisms are prone to
        # duplicate input issues. e.g. if you just concatenate two values,
        # "foo"+"bar" is identical to "fo"+"obar". Using CBOR provides
        # "padding" between values and prevents these problems.

        # Seed the hash with various data.
        state = {
            # To invalidate all cache keys.
            b'globalversion': GLOBAL_CACHE_VERSION,
            # More granular cache key invalidation.
            b'localversion': localversion,
            # Cache keys are segmented by command.
            b'command': pycompat.sysbytes(command),
            # Throw in the media type and API version strings so changes
            # to exchange semantics invalidate the cache.
            b'mediatype': FRAMINGTYPE,
            b'version': HTTP_WIREPROTO_V2,
            # So same requests for different repos don't share cache keys.
            b'repo': repo.root,
        }

        # The arguments passed to us will have already been normalized.
        # Default values will be set, etc. This is important because it
        # means that it doesn't matter if clients send an explicit argument
        # or rely on the default value: it will all normalize to the same
        # set of arguments on the server and therefore the same cache key.
        #
        # Arguments by their very nature must support being encoded to CBOR.
        # And the CBOR encoder is deterministic. So we hash the arguments
        # by feeding the CBOR of their representation into the hasher.
        if allargs:
            state[b'args'] = pycompat.byteskwargs(args)

        cacher.adjustcachekeystate(state)

        hasher = hashlib.sha1()
        for chunk in cborutil.streamencode(state):
            hasher.update(chunk)

        return pycompat.sysbytes(hasher.hexdigest())

    return cachekeyfn
777
777
def makeresponsecacher(repo, proto, command, args, objencoderfn,
                       redirecttargets, redirecthashes):
    """Construct a cacher for a cacheable command.

    Returns an ``iwireprotocolcommandcacher`` instance.

    Extensions can monkeypatch this function to provide custom caching
    backends.

    This implementation ignores all of its arguments and returns ``None``.
    """
    return None
788
788
def resolvenodes(repo, revisions):
    """Resolve nodes from a revisions specifier data structure.

    ``revisions`` must be a list of specifier dicts, each with a ``type``
    key selecting how nodes are resolved:

    ``changesetexplicit``
       The entries of the ``nodes`` key are used as-is.

    ``changesetexplicitdepth``
       The revisions matched by ``ancestors(%ln, %d)`` for the ``nodes``
       key and ``depth - 1``.

    ``changesetdagrange``
       The nodes reported as missing by ``discovery.outgoing()`` between
       ``roots`` and ``heads``. Roots the changelog does not have are
       dropped; an empty ``roots`` means the null revision. ``heads``
       must not be empty.

    Returns a list of nodes in the order they were first encountered, with
    duplicates removed. Raises ``error.WireprotoCommandError`` if
    ``revisions`` is not a list, a required key is missing, ``heads`` is
    empty or the specifier type is unknown.
    """
    cl = repo.changelog
    clhasnode = cl.hasnode

    # ``seen`` gives fast duplicate detection; ``nodes`` preserves order.
    seen = set()
    nodes = []

    if not isinstance(revisions, list):
        raise error.WireprotoCommandError('revisions must be defined as an '
                                          'array')

    for spec in revisions:
        if b'type' not in spec:
            raise error.WireprotoCommandError(
                'type key not present in revision specifier')

        typ = spec[b'type']

        if typ == b'changesetexplicit':
            if b'nodes' not in spec:
                raise error.WireprotoCommandError(
                    'nodes key not present in changesetexplicit revision '
                    'specifier')

            for node in spec[b'nodes']:
                if node not in seen:
                    nodes.append(node)
                    seen.add(node)

        elif typ == b'changesetexplicitdepth':
            for key in (b'nodes', b'depth'):
                if key not in spec:
                    raise error.WireprotoCommandError(
                        '%s key not present in changesetexplicitdepth revision '
                        'specifier', (key,))

            for rev in repo.revs(b'ancestors(%ln, %d)', spec[b'nodes'],
                                 spec[b'depth'] - 1):
                node = cl.node(rev)

                if node not in seen:
                    nodes.append(node)
                    seen.add(node)

        elif typ == b'changesetdagrange':
            for key in (b'roots', b'heads'):
                if key not in spec:
                    raise error.WireprotoCommandError(
                        '%s key not present in changesetdagrange revision '
                        'specifier', (key,))

            if not spec[b'heads']:
                raise error.WireprotoCommandError(
                    'heads key in changesetdagrange cannot be empty')

            if spec[b'roots']:
                common = [n for n in spec[b'roots'] if clhasnode(n)]
            else:
                common = [nullid]

            for n in discovery.outgoing(repo, common, spec[b'heads']).missing:
                if n not in seen:
                    nodes.append(n)
                    seen.add(n)

        else:
            raise error.WireprotoCommandError(
                'unknown revision specifier type: %s', (typ,))

    return nodes
860
860
@wireprotocommand('branchmap', permission='pull')
def branchmapv2(repo, proto):
    """Yield a single dict built from ``repo.branchmap()``.

    Keys are passed through ``encoding.fromlocal()``; values are emitted
    unchanged.
    """
    yield {encoding.fromlocal(k): v
           for k, v in repo.branchmap().iteritems()}
865
865
@wireprotocommand('capabilities', permission='pull')
def capabilitiesv2(repo, proto):
    """Yield the capabilities data structure from ``_capabilitiesv2()``."""
    yield _capabilitiesv2(repo, proto)
869
869
@wireprotocommand(
    'changesetdata',
    args={
        'revisions': {
            'type': 'list',
            'example': [{
                b'type': b'changesetexplicit',
                b'nodes': [b'abcdef...'],
            }],
        },
        'fields': {
            'type': 'set',
            'default': set,
            'example': {b'parents', b'revision'},
            'validvalues': {b'bookmarks', b'parents', b'phase', b'revision'},
        },
    },
    permission='pull')
def changesetdata(repo, proto, revisions, fields):
    """Emit changeset data for the nodes selected by ``revisions``.

    ``revisions`` is resolved with ``resolvenodes()``. The generator yields,
    in order:

    * A dict with a ``totalitems`` key holding the number of resolved nodes.
    * For each resolved node, a dict with its ``node`` plus whichever of
      ``parents``, ``phase`` and ``bookmarks`` were requested in ``fields``.
      If ``revision`` was requested, the dict carries a ``fieldsfollowing``
      entry and is followed by the raw changelog revision data.
    * If ``bookmarks`` was requested, one dict per bookmarked node that was
      not part of the resolved nodes.

    The ``preoutgoing`` hook is run (and may abort) when at least one node
    was resolved.
    """
    # TODO look for unknown fields and abort when they can't be serviced.
    # This could probably be validated by dispatcher using validvalues.

    cl = repo.changelog
    outgoing = resolvenodes(repo, revisions)
    publishing = repo.publishing()

    if outgoing:
        repo.hook('preoutgoing', throw=True, source='serve')

    yield {
        b'totalitems': len(outgoing),
    }

    # The phases of nodes already transferred to the client may have changed
    # since the client last requested data. We send phase-only records
    # for these revisions, if requested.
    # TODO actually do this. We'll probably want to emit phase heads
    # in the ancestry set of the outgoing revisions. This will ensure
    # that phase updates within that set are seen.
    if b'phase' in fields:
        pass

    # Map each bookmarked node to the set of bookmarks pointing at it.
    nodebookmarks = {}
    for mark, node in repo._bookmarks.items():
        nodebookmarks.setdefault(node, set()).add(mark)

    # It is already topologically sorted by revision number.
    for node in outgoing:
        d = {
            b'node': node,
        }

        if b'parents' in fields:
            d[b'parents'] = cl.parents(node)

        if b'phase' in fields:
            if publishing:
                d[b'phase'] = b'public'
            else:
                ctx = repo[node]
                d[b'phase'] = ctx.phasestr()

        if b'bookmarks' in fields and node in nodebookmarks:
            d[b'bookmarks'] = sorted(nodebookmarks[node])
            # Removed so the trailing bookmark-only records below do not
            # repeat this node.
            del nodebookmarks[node]

        followingmeta = []
        followingdata = []

        if b'revision' in fields:
            revisiondata = cl.revision(node, raw=True)
            followingmeta.append((b'revision', len(revisiondata)))
            followingdata.append(revisiondata)

        # TODO make it possible for extensions to wrap a function or register
        # a handler to service custom fields.

        if followingmeta:
            d[b'fieldsfollowing'] = followingmeta

        yield d

        for extra in followingdata:
            yield extra

    # If requested, send bookmarks from nodes that didn't have revision
    # data sent so receiver is aware of any bookmark updates.
    if b'bookmarks' in fields:
        # ``nodebookmarks`` is a plain dict; ``items()`` is used (as for
        # ``repo._bookmarks`` above) so this does not depend on the
        # Python 2-only ``iteritems()``.
        for node, marks in sorted(nodebookmarks.items()):
            yield {
                b'node': node,
                b'bookmarks': sorted(marks),
            }
963
963
class FileAccessError(Exception):
    """Represents an error accessing a specific file."""

    def __init__(self, path, msg, args):
        # ``path`` is the file the error relates to.
        self.path = path
        # ``msg`` is a message template; ``args`` holds the values for it
        # (see the ``'unknown file: %s', (path,)`` usage in this module).
        self.msg = msg
        # Note this replaces the standard exception ``args`` attribute.
        self.args = args
971
971
def getfilestore(repo, proto, path):
    """Obtain a file storage object for use with wire protocol.

    Exists as a standalone function so extensions can monkeypatch to add
    access control.

    Returns ``repo.file(path)``. Raises ``FileAccessError`` when that
    storage object is empty; ``proto`` is not used here.
    """
    # This seems to work even if the file doesn't exist. So catch
    # "empty" files and return an error.
    fl = repo.file(path)

    if not len(fl):
        raise FileAccessError(path, 'unknown file: %s', (path,))

    return fl
986
986
def emitfilerevisions(repo, path, revisions, fields):
    """Emit wire protocol records for an iterable of file revisions.

    For each item in ``revisions`` a dict is yielded containing its ``node``
    and, depending on ``fields``:

    ``parents``
       A two-item list of ``p1node`` and ``p2node``.

    ``linknode``
       The changelog node of the revision returned by ``introrev()`` on the
       file context for ``path`` at this file node.

    ``revision``
       A ``fieldsfollowing`` entry announcing either the full ``revision``
       text or, when that is ``None``, a ``delta`` (with ``deltabasenode``
       set to the delta base). The announced data is yielded right after
       the dict.
    """
    clnode = repo.changelog.node

    for revision in revisions:
        d = {
            b'node': revision.node,
        }

        if b'parents' in fields:
            d[b'parents'] = [revision.p1node, revision.p2node]

        if b'linknode' in fields:
            # TODO by creating the filectx against a specific file revision
            # instead of changeset, linkrev() is always used. This is wrong for
            # cases where linkrev() may refer to a hidden changeset. We need an
            # API for performing linkrev adjustment that takes this into
            # account.
            fctx = repo.filectx(path, fileid=revision.node)
            d[b'linknode'] = clnode(fctx.introrev())

        # ``followingmeta`` describes (name, size) of each payload emitted
        # after the dict; ``followingdata`` holds the payloads themselves.
        followingmeta = []
        followingdata = []

        if b'revision' in fields:
            if revision.revision is not None:
                followingmeta.append((b'revision', len(revision.revision)))
                followingdata.append(revision.revision)
            else:
                d[b'deltabasenode'] = revision.basenode
                followingmeta.append((b'delta', len(revision.delta)))
                followingdata.append(revision.delta)

        if followingmeta:
            d[b'fieldsfollowing'] = followingmeta

        yield d

        for extra in followingdata:
            yield extra
1026
1026
def makefilematcher(repo, pathfilter):
    """Construct a matcher from a path filter dict.

    ``pathfilter`` may be empty/None, in which case an unrestricted matcher
    is built. Otherwise its ``include`` and ``exclude`` patterns must each
    start with ``path:`` or ``rootfilesin:``; anything else raises
    ``error.WireprotoCommandError``. The result is always passed through
    ``repo.narrowmatch()``.
    """
    if pathfilter:
        includes = pathfilter.get(b'include', [])
        excludes = pathfilter.get(b'exclude', [])

        # Validate values.
        allowedprefixes = (b'path:', b'rootfilesin:')
        for key, patterns in ((b'include', includes), (b'exclude', excludes)):
            for pattern in patterns:
                if pattern.startswith(allowedprefixes):
                    continue

                raise error.WireprotoCommandError(
                    '%s pattern must begin with `path:` or `rootfilesin:`; '
                    'got %s', (key, pattern))

        matcher = matchmod.match(repo.root, b'',
                                 include=includes,
                                 exclude=excludes)
    else:
        matcher = matchmod.match(repo.root, b'')

    # Requested patterns could include files not in the local store. So
    # filter those out.
    return repo.narrowmatch(matcher)
1049
1049
@wireprotocommand(
    'filedata',
    args={
        'haveparents': {
            'type': 'bool',
            'default': lambda: False,
            'example': True,
        },
        'nodes': {
            'type': 'list',
            'example': [b'0123456...'],
        },
        'fields': {
            'type': 'set',
            'default': set,
            'example': {b'parents', b'revision'},
            'validvalues': {b'parents', b'revision', b'linknode'},
        },
        'path': {
            'type': 'bytes',
            'example': b'foo.txt',
        }
    },
    permission='pull',
    # TODO censoring a file revision won't invalidate the cache.
    # Figure out a way to take censoring into account when deriving
    # the cache key.
    cachekeyfn=makecommandcachekeyfn('filedata', 1, allargs=True))
def filedata(repo, proto, haveparents, nodes, fields, path):
    # Emits a ``totalitems`` header followed by the objects produced by
    # emitfilerevisions() for the requested ``nodes`` of a single ``path``.
    #
    # TODO this API allows access to file revisions that are attached to
    # secret changesets. filesdata does not have this problem. Maybe this
    # API should be deleted?

    # Storage is resolved via getfilestore(), which receives the protocol
    # handler because extensions may wish to access it.
    try:
        filestore = getfilestore(repo, proto, path)
    except FileAccessError as err:
        raise error.WireprotoCommandError(err.msg, err.args)

    # Validate requested nodes: a single unknown node aborts the command.
    for fnode in nodes:
        try:
            filestore.rev(fnode)
        except error.LookupError:
            raise error.WireprotoCommandError('unknown file node: %s',
                                              (hex(fnode),))

    wantrevision = b'revision' in fields
    revisions = filestore.emitrevisions(
        nodes,
        revisiondata=wantrevision,
        assumehaveparentrevisions=haveparents)

    yield {b'totalitems': len(nodes)}

    for item in emitfilerevisions(repo, path, revisions, fields):
        yield item
1107
1107
def filesdatacapabilities(repo, proto):
    """Return the extra capabilities dict for the ``filesdata`` command.

    The only entry is ``recommendedbatchsize``, read from the
    ``experimental.server.filesdata.recommended-batch-size`` config option.
    """
    return {
        b'recommendedbatchsize': repo.ui.configint(
            b'experimental', b'server.filesdata.recommended-batch-size'),
    }
1114
1114
@wireprotocommand(
    'filesdata',
    args={
        'haveparents': {
            'type': 'bool',
            'default': lambda: False,
            'example': True,
        },
        'fields': {
            'type': 'set',
            'default': set,
            'example': {b'parents', b'revision'},
            'validvalues': {b'firstchangeset', b'linknode', b'parents',
                            b'revision'},
        },
        'pathfilter': {
            'type': 'dict',
            'default': lambda: None,
            'example': {b'include': [b'path:tests']},
        },
        'revisions': {
            'type': 'list',
            'example': [{
                b'type': b'changesetexplicit',
                b'nodes': [b'abcdef...'],
            }],
        },
    },
    permission='pull',
    # TODO censoring a file revision won't invalidate the cache.
    # Figure out a way to take censoring into account when deriving
    # the cache key.
    cachekeyfn=makecommandcachekeyfn('filesdata', 1, allargs=True),
    extracapabilitiesfn=filesdatacapabilities)
def filesdata(repo, proto, haveparents, fields, pathfilter, revisions):
    # Emits a header with ``totalpaths``/``totalitems``, then for every
    # selected path (in sorted order) a per-path header followed by the
    # objects produced by emitfilerevisions().
    #
    # TODO This should operate on a repo that exposes obsolete changesets. There
    # is a race between a client making a push that obsoletes a changeset and
    # another client fetching files data for that changeset. If a client has a
    # changeset, it should probably be allowed to access files data for that
    # changeset.

    outgoing = resolvenodes(repo, revisions)
    filematcher = makefilematcher(repo, pathfilter)

    # Figure out what needs to be emitted: path -> set of file nodes.
    fnodes = collections.defaultdict(set)

    # If ancestors are known, we send file revisions having a linkrev in the
    # outgoing set of changeset revisions.
    if haveparents:
        cl = repo.changelog

        # The set of changed paths is only needed in this branch, so it is
        # computed here rather than unconditionally.
        changedpaths = set()
        for node in outgoing:
            changedpaths.update(repo[node].files())

        changedpaths = sorted(p for p in changedpaths if filematcher(p))
        outgoingclrevs = {cl.rev(n) for n in outgoing}

        for path in changedpaths:
            try:
                store = getfilestore(repo, proto, path)
            except FileAccessError as e:
                raise error.WireprotoCommandError(e.msg, e.args)

            for rev in store:
                if store.linkrev(rev) in outgoingclrevs:
                    fnodes[path].add(store.node(rev))

    # If ancestors aren't known, we walk the manifests and send all
    # encountered file revisions.
    else:
        for node in outgoing:
            mctx = repo[node].manifestctx()

            for path, fnode in mctx.read().items():
                if filematcher(path):
                    fnodes[path].add(fnode)

    yield {
        b'totalpaths': len(fnodes),
        b'totalitems': sum(len(v) for v in fnodes.values())
    }

    for path, filenodes in sorted(fnodes.items()):
        try:
            store = getfilestore(repo, proto, path)
        except FileAccessError as e:
            raise error.WireprotoCommandError(e.msg, e.args)

        yield {
            b'path': path,
            b'totalitems': len(filenodes),
        }

        revisions = store.emitrevisions(filenodes,
                                        revisiondata=b'revision' in fields,
                                        assumehaveparentrevisions=haveparents)

        for o in emitfilerevisions(repo, path, revisions, fields):
            yield o
1219
1219
@wireprotocommand(
    'heads',
    args={
        'publiconly': {
            'type': 'bool',
            'default': lambda: False,
            'example': False,
        },
    },
    permission='pull')
def headsv2(repo, proto, publiconly):
    # Emits the heads of the repo; when ``publiconly`` is set, the heads
    # are taken from the 'immutable' filtered view instead.
    source = repo.filtered('immutable') if publiconly else repo

    yield source.heads()
1235
1235
@wireprotocommand(
    'known',
    args={
        'nodes': {
            'type': 'list',
            'default': list,
            'example': [b'deadbeef'],
        },
    },
    permission='pull')
def knownv2(repo, proto, nodes):
    # Emits a single bytestring holding one flag per entry returned by
    # repo.known(): b'1' for a truthy entry, b'0' otherwise.
    flags = [b'1' if isknown else b'0' for isknown in repo.known(nodes)]

    yield b''.join(flags)
1249
1249
@wireprotocommand(
    'listkeys',
    args={
        'namespace': {
            'type': 'bytes',
            'example': b'ns',
        },
    },
    permission='pull')
def listkeysv2(repo, proto, namespace):
    # Emits a dict of the keys listed for ``namespace``. The namespace is
    # passed through encoding.tolocal() for the lookup, and every key and
    # value is passed through encoding.fromlocal() before being emitted.
    localkeys = repo.listkeys(encoding.tolocal(namespace))

    converted = {}
    for k, v in localkeys.iteritems():
        converted[encoding.fromlocal(k)] = encoding.fromlocal(v)

    yield converted
1265
1265
@wireprotocommand(
    'lookup',
    args={
        'key': {
            'type': 'bytes',
            'example': b'foo',
        },
    },
    permission='pull')
def lookupv2(repo, proto, key):
    # Emits the result of repo.lookup() for ``key`` after converting the
    # key with encoding.tolocal().
    localkey = encoding.tolocal(key)

    # TODO handle exception.
    yield repo.lookup(localkey)
1282
1282
def manifestdatacapabilities(repo, proto):
    """Return the extra capabilities dict for the ``manifestdata`` command.

    The only entry is ``recommendedbatchsize``, read from the
    ``experimental.server.manifestdata.recommended-batch-size`` config
    option.
    """
    return {
        b'recommendedbatchsize': repo.ui.configint(
            b'experimental', b'server.manifestdata.recommended-batch-size'),
    }
1290
1290
@wireprotocommand(
    'manifestdata',
    args={
        'nodes': {
            'type': 'list',
            'example': [b'0123456...'],
        },
        'haveparents': {
            'type': 'bool',
            'default': lambda: False,
            'example': True,
        },
        'fields': {
            'type': 'set',
            'default': set,
            'example': {b'parents', b'revision'},
            'validvalues': {b'parents', b'revision'},
        },
        'tree': {
            'type': 'bytes',
            'example': b'',
        },
    },
    permission='pull',
    cachekeyfn=makecommandcachekeyfn('manifestdata', 1, allargs=True),
    extracapabilitiesfn=manifestdatacapabilities)
def manifestdata(repo, proto, haveparents, nodes, fields, tree):
    # Emits a ``totalitems`` header, then for each requested manifest node
    # of ``tree`` a metadata dict optionally followed by the revision or
    # delta payload announced in its ``fieldsfollowing`` entry.
    store = repo.manifestlog.getstorage(tree)

    # Validate the node is known and abort on unknown revisions.
    for node in nodes:
        try:
            store.rev(node)
        except error.LookupError:
            # Report the node hex-encoded rather than as raw binary, the
            # same way the ``filedata`` command reports unknown file nodes.
            raise error.WireprotoCommandError(
                'unknown node: %s', (hex(node),))

    revisions = store.emitrevisions(nodes,
                                    revisiondata=b'revision' in fields,
                                    assumehaveparentrevisions=haveparents)

    yield {
        b'totalitems': len(nodes),
    }

    for revision in revisions:
        d = {
            b'node': revision.node,
        }

        if b'parents' in fields:
            d[b'parents'] = [revision.p1node, revision.p2node]

        # Parallel lists: (name, length) descriptors and the payloads that
        # are emitted right after the metadata dict.
        followingmeta = []
        followingdata = []

        if b'revision' in fields:
            if revision.revision is not None:
                followingmeta.append((b'revision', len(revision.revision)))
                followingdata.append(revision.revision)
            else:
                # No full text available: send the delta and name its base.
                d[b'deltabasenode'] = revision.basenode
                followingmeta.append((b'delta', len(revision.delta)))
                followingdata.append(revision.delta)

        if followingmeta:
            d[b'fieldsfollowing'] = followingmeta

        yield d

        for extra in followingdata:
            yield extra
1363
1363
@wireprotocommand(
    'pushkey',
    args={
        'namespace': {
            'type': 'bytes',
            'example': b'ns',
        },
        'key': {
            'type': 'bytes',
            'example': b'key',
        },
        'old': {
            'type': 'bytes',
            'example': b'old',
        },
        'new': {
            'type': 'bytes',
            # The example is a bytes literal to match the declared type
            # and the other arguments of this command.
            'example': b'new',
        },
    },
    permission='push')
def pushkeyv2(repo, proto, namespace, key, old, new):
    # Emits the result of repo.pushkey(); every argument is converted with
    # encoding.tolocal() first.
    #
    # TODO handle ui output redirection
    yield repo.pushkey(encoding.tolocal(namespace),
                       encoding.tolocal(key),
                       encoding.tolocal(old),
                       encoding.tolocal(new))
1391
1391
1392
1392
@wireprotocommand(
    'rawstorefiledata',
    args={
        'files': {
            'type': 'list',
            'example': [b'changelog', b'manifestlog'],
        },
        'pathfilter': {
            'type': 'list',
            'default': lambda: None,
            'example': {b'include': [b'path:tests']},
        },
    },
    permission='pull')
def rawstorefiledata(repo, proto, files, pathfilter):
    # Emits a ``filecount``/``totalsize`` header, then for each selected
    # store file a descriptor dict followed by a streamed response holding
    # the file's raw bytes.
    #
    # NOTE(review): ``pathfilter`` is accepted but not used below, and it is
    # declared as a 'list' while its example is a dict -- confirm intent.
    if not streamclone.allowservergeneration(repo):
        raise error.WireprotoCommandError(b'stream clone is disabled')

    # TODO support dynamically advertising what store files "sets" are
    # available. For now, we support changelog, manifestlog, and files.
    files = set(files)
    allowedfiles = {b'changelog', b'manifestlog'}

    unsupported = files - allowedfiles
    if unsupported:
        raise error.WireprotoCommandError(b'unknown file type: %s',
                                          (b', '.join(sorted(unsupported)),))

    # Only the listing of top-level store files happens under the lock.
    with repo.lock():
        topfiles = list(repo.store.topfiles())

    sendfiles = []
    totalsize = 0

    # TODO this is a bunch of storage layer interface abstractions because
    # it assumes revlogs.
    for name, encodedname, size in topfiles:
        wanted = (
            (b'changelog' in files and name.startswith(b'00changelog'))
            or (b'manifestlog' in files and name.startswith(b'00manifest')))

        if not wanted:
            continue

        sendfiles.append((b'store', name, size))
        totalsize += size

    yield {
        b'filecount': len(sendfiles),
        b'totalsize': totalsize,
    }

    # A separate generator is used so the file's context manager is closed
    # only after the final chunk was sent. ``name`` and ``size`` are passed
    # explicitly so each generator reads its own file even if it is consumed
    # after the loop below has moved on to the next entry.
    def getfiledata(name, size):
        with repo.svfs(name, 'rb', auditpath=False) as fh:
            for chunk in util.filechunkiter(fh, limit=size):
                yield chunk

    for location, name, size in sendfiles:
        yield {
            b'location': location,
            b'path': name,
            b'size': size,
        }

        yield wireprototypes.indefinitebytestringresponse(
            getfiledata(name, size))
General Comments 0
You need to be logged in to leave comments. Login now