##// END OF EJS Templates
wireproto: only expose "debugwireargs" to version 1 transports...
Gregory Szorc -
r37508:3a91911c default
parent child Browse files
Show More
@@ -1,1244 +1,1244 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import os
11 import os
12 import tempfile
12 import tempfile
13
13
14 from .i18n import _
14 from .i18n import _
15 from .node import (
15 from .node import (
16 bin,
16 bin,
17 hex,
17 hex,
18 nullid,
18 nullid,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 discovery,
24 discovery,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 pycompat,
30 pycompat,
31 repository,
31 repository,
32 streamclone,
32 streamclone,
33 util,
33 util,
34 wireprototypes,
34 wireprototypes,
35 )
35 )
36
36
37 from .utils import (
37 from .utils import (
38 procutil,
38 procutil,
39 stringutil,
39 stringutil,
40 )
40 )
41
41
42 urlerr = util.urlerr
42 urlerr = util.urlerr
43 urlreq = util.urlreq
43 urlreq = util.urlreq
44
44
45 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
45 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
46 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
46 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
47 'IncompatibleClient')
47 'IncompatibleClient')
48 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
48 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
49
49
50 class remoteiterbatcher(peer.iterbatcher):
50 class remoteiterbatcher(peer.iterbatcher):
51 def __init__(self, remote):
51 def __init__(self, remote):
52 super(remoteiterbatcher, self).__init__()
52 super(remoteiterbatcher, self).__init__()
53 self._remote = remote
53 self._remote = remote
54
54
55 def __getattr__(self, name):
55 def __getattr__(self, name):
56 # Validate this method is batchable, since submit() only supports
56 # Validate this method is batchable, since submit() only supports
57 # batchable methods.
57 # batchable methods.
58 fn = getattr(self._remote, name)
58 fn = getattr(self._remote, name)
59 if not getattr(fn, 'batchable', None):
59 if not getattr(fn, 'batchable', None):
60 raise error.ProgrammingError('Attempted to batch a non-batchable '
60 raise error.ProgrammingError('Attempted to batch a non-batchable '
61 'call to %r' % name)
61 'call to %r' % name)
62
62
63 return super(remoteiterbatcher, self).__getattr__(name)
63 return super(remoteiterbatcher, self).__getattr__(name)
64
64
65 def submit(self):
65 def submit(self):
66 """Break the batch request into many patch calls and pipeline them.
66 """Break the batch request into many patch calls and pipeline them.
67
67
68 This is mostly valuable over http where request sizes can be
68 This is mostly valuable over http where request sizes can be
69 limited, but can be used in other places as well.
69 limited, but can be used in other places as well.
70 """
70 """
71 # 2-tuple of (command, arguments) that represents what will be
71 # 2-tuple of (command, arguments) that represents what will be
72 # sent over the wire.
72 # sent over the wire.
73 requests = []
73 requests = []
74
74
75 # 4-tuple of (command, final future, @batchable generator, remote
75 # 4-tuple of (command, final future, @batchable generator, remote
76 # future).
76 # future).
77 results = []
77 results = []
78
78
79 for command, args, opts, finalfuture in self.calls:
79 for command, args, opts, finalfuture in self.calls:
80 mtd = getattr(self._remote, command)
80 mtd = getattr(self._remote, command)
81 batchable = mtd.batchable(mtd.__self__, *args, **opts)
81 batchable = mtd.batchable(mtd.__self__, *args, **opts)
82
82
83 commandargs, fremote = next(batchable)
83 commandargs, fremote = next(batchable)
84 assert fremote
84 assert fremote
85 requests.append((command, commandargs))
85 requests.append((command, commandargs))
86 results.append((command, finalfuture, batchable, fremote))
86 results.append((command, finalfuture, batchable, fremote))
87
87
88 if requests:
88 if requests:
89 self._resultiter = self._remote._submitbatch(requests)
89 self._resultiter = self._remote._submitbatch(requests)
90
90
91 self._results = results
91 self._results = results
92
92
93 def results(self):
93 def results(self):
94 for command, finalfuture, batchable, remotefuture in self._results:
94 for command, finalfuture, batchable, remotefuture in self._results:
95 # Get the raw result, set it in the remote future, feed it
95 # Get the raw result, set it in the remote future, feed it
96 # back into the @batchable generator so it can be decoded, and
96 # back into the @batchable generator so it can be decoded, and
97 # set the result on the final future to this value.
97 # set the result on the final future to this value.
98 remoteresult = next(self._resultiter)
98 remoteresult = next(self._resultiter)
99 remotefuture.set(remoteresult)
99 remotefuture.set(remoteresult)
100 finalfuture.set(next(batchable))
100 finalfuture.set(next(batchable))
101
101
102 # Verify our @batchable generators only emit 2 values.
102 # Verify our @batchable generators only emit 2 values.
103 try:
103 try:
104 next(batchable)
104 next(batchable)
105 except StopIteration:
105 except StopIteration:
106 pass
106 pass
107 else:
107 else:
108 raise error.ProgrammingError('%s @batchable generator emitted '
108 raise error.ProgrammingError('%s @batchable generator emitted '
109 'unexpected value count' % command)
109 'unexpected value count' % command)
110
110
111 yield finalfuture.value
111 yield finalfuture.value
112
112
113 # Forward a couple of names from peer to make wireproto interactions
113 # Forward a couple of names from peer to make wireproto interactions
114 # slightly more sensible.
114 # slightly more sensible.
115 batchable = peer.batchable
115 batchable = peer.batchable
116 future = peer.future
116 future = peer.future
117
117
118 # list of nodes encoding / decoding
118 # list of nodes encoding / decoding
119
119
120 def decodelist(l, sep=' '):
120 def decodelist(l, sep=' '):
121 if l:
121 if l:
122 return [bin(v) for v in l.split(sep)]
122 return [bin(v) for v in l.split(sep)]
123 return []
123 return []
124
124
125 def encodelist(l, sep=' '):
125 def encodelist(l, sep=' '):
126 try:
126 try:
127 return sep.join(map(hex, l))
127 return sep.join(map(hex, l))
128 except TypeError:
128 except TypeError:
129 raise
129 raise
130
130
131 # batched call argument encoding
131 # batched call argument encoding
132
132
133 def escapearg(plain):
133 def escapearg(plain):
134 return (plain
134 return (plain
135 .replace(':', ':c')
135 .replace(':', ':c')
136 .replace(',', ':o')
136 .replace(',', ':o')
137 .replace(';', ':s')
137 .replace(';', ':s')
138 .replace('=', ':e'))
138 .replace('=', ':e'))
139
139
140 def unescapearg(escaped):
140 def unescapearg(escaped):
141 return (escaped
141 return (escaped
142 .replace(':e', '=')
142 .replace(':e', '=')
143 .replace(':s', ';')
143 .replace(':s', ';')
144 .replace(':o', ',')
144 .replace(':o', ',')
145 .replace(':c', ':'))
145 .replace(':c', ':'))
146
146
147 def encodebatchcmds(req):
147 def encodebatchcmds(req):
148 """Return a ``cmds`` argument value for the ``batch`` command."""
148 """Return a ``cmds`` argument value for the ``batch`` command."""
149 cmds = []
149 cmds = []
150 for op, argsdict in req:
150 for op, argsdict in req:
151 # Old servers didn't properly unescape argument names. So prevent
151 # Old servers didn't properly unescape argument names. So prevent
152 # the sending of argument names that may not be decoded properly by
152 # the sending of argument names that may not be decoded properly by
153 # servers.
153 # servers.
154 assert all(escapearg(k) == k for k in argsdict)
154 assert all(escapearg(k) == k for k in argsdict)
155
155
156 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
156 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
157 for k, v in argsdict.iteritems())
157 for k, v in argsdict.iteritems())
158 cmds.append('%s %s' % (op, args))
158 cmds.append('%s %s' % (op, args))
159
159
160 return ';'.join(cmds)
160 return ';'.join(cmds)
161
161
162 def clientcompressionsupport(proto):
162 def clientcompressionsupport(proto):
163 """Returns a list of compression methods supported by the client.
163 """Returns a list of compression methods supported by the client.
164
164
165 Returns a list of the compression methods supported by the client
165 Returns a list of the compression methods supported by the client
166 according to the protocol capabilities. If no such capability has
166 according to the protocol capabilities. If no such capability has
167 been announced, fallback to the default of zlib and uncompressed.
167 been announced, fallback to the default of zlib and uncompressed.
168 """
168 """
169 for cap in proto.getprotocaps():
169 for cap in proto.getprotocaps():
170 if cap.startswith('comp='):
170 if cap.startswith('comp='):
171 return cap[5:].split(',')
171 return cap[5:].split(',')
172 return ['zlib', 'none']
172 return ['zlib', 'none']
173
173
174 # mapping of options accepted by getbundle and their types
174 # mapping of options accepted by getbundle and their types
175 #
175 #
176 # Meant to be extended by extensions. It is extensions responsibility to ensure
176 # Meant to be extended by extensions. It is extensions responsibility to ensure
177 # such options are properly processed in exchange.getbundle.
177 # such options are properly processed in exchange.getbundle.
178 #
178 #
179 # supported types are:
179 # supported types are:
180 #
180 #
181 # :nodes: list of binary nodes
181 # :nodes: list of binary nodes
182 # :csv: list of comma-separated values
182 # :csv: list of comma-separated values
183 # :scsv: list of comma-separated values return as set
183 # :scsv: list of comma-separated values return as set
184 # :plain: string with no transformation needed.
184 # :plain: string with no transformation needed.
185 gboptsmap = {'heads': 'nodes',
185 gboptsmap = {'heads': 'nodes',
186 'bookmarks': 'boolean',
186 'bookmarks': 'boolean',
187 'common': 'nodes',
187 'common': 'nodes',
188 'obsmarkers': 'boolean',
188 'obsmarkers': 'boolean',
189 'phases': 'boolean',
189 'phases': 'boolean',
190 'bundlecaps': 'scsv',
190 'bundlecaps': 'scsv',
191 'listkeys': 'csv',
191 'listkeys': 'csv',
192 'cg': 'boolean',
192 'cg': 'boolean',
193 'cbattempted': 'boolean',
193 'cbattempted': 'boolean',
194 'stream': 'boolean',
194 'stream': 'boolean',
195 }
195 }
196
196
197 # client side
197 # client side
198
198
199 class wirepeer(repository.legacypeer):
199 class wirepeer(repository.legacypeer):
200 """Client-side interface for communicating with a peer repository.
200 """Client-side interface for communicating with a peer repository.
201
201
202 Methods commonly call wire protocol commands of the same name.
202 Methods commonly call wire protocol commands of the same name.
203
203
204 See also httppeer.py and sshpeer.py for protocol-specific
204 See also httppeer.py and sshpeer.py for protocol-specific
205 implementations of this interface.
205 implementations of this interface.
206 """
206 """
207 # Begin of ipeercommands interface.
207 # Begin of ipeercommands interface.
208
208
209 def iterbatch(self):
209 def iterbatch(self):
210 return remoteiterbatcher(self)
210 return remoteiterbatcher(self)
211
211
212 @batchable
212 @batchable
213 def lookup(self, key):
213 def lookup(self, key):
214 self.requirecap('lookup', _('look up remote revision'))
214 self.requirecap('lookup', _('look up remote revision'))
215 f = future()
215 f = future()
216 yield {'key': encoding.fromlocal(key)}, f
216 yield {'key': encoding.fromlocal(key)}, f
217 d = f.value
217 d = f.value
218 success, data = d[:-1].split(" ", 1)
218 success, data = d[:-1].split(" ", 1)
219 if int(success):
219 if int(success):
220 yield bin(data)
220 yield bin(data)
221 else:
221 else:
222 self._abort(error.RepoError(data))
222 self._abort(error.RepoError(data))
223
223
224 @batchable
224 @batchable
225 def heads(self):
225 def heads(self):
226 f = future()
226 f = future()
227 yield {}, f
227 yield {}, f
228 d = f.value
228 d = f.value
229 try:
229 try:
230 yield decodelist(d[:-1])
230 yield decodelist(d[:-1])
231 except ValueError:
231 except ValueError:
232 self._abort(error.ResponseError(_("unexpected response:"), d))
232 self._abort(error.ResponseError(_("unexpected response:"), d))
233
233
234 @batchable
234 @batchable
235 def known(self, nodes):
235 def known(self, nodes):
236 f = future()
236 f = future()
237 yield {'nodes': encodelist(nodes)}, f
237 yield {'nodes': encodelist(nodes)}, f
238 d = f.value
238 d = f.value
239 try:
239 try:
240 yield [bool(int(b)) for b in d]
240 yield [bool(int(b)) for b in d]
241 except ValueError:
241 except ValueError:
242 self._abort(error.ResponseError(_("unexpected response:"), d))
242 self._abort(error.ResponseError(_("unexpected response:"), d))
243
243
244 @batchable
244 @batchable
245 def branchmap(self):
245 def branchmap(self):
246 f = future()
246 f = future()
247 yield {}, f
247 yield {}, f
248 d = f.value
248 d = f.value
249 try:
249 try:
250 branchmap = {}
250 branchmap = {}
251 for branchpart in d.splitlines():
251 for branchpart in d.splitlines():
252 branchname, branchheads = branchpart.split(' ', 1)
252 branchname, branchheads = branchpart.split(' ', 1)
253 branchname = encoding.tolocal(urlreq.unquote(branchname))
253 branchname = encoding.tolocal(urlreq.unquote(branchname))
254 branchheads = decodelist(branchheads)
254 branchheads = decodelist(branchheads)
255 branchmap[branchname] = branchheads
255 branchmap[branchname] = branchheads
256 yield branchmap
256 yield branchmap
257 except TypeError:
257 except TypeError:
258 self._abort(error.ResponseError(_("unexpected response:"), d))
258 self._abort(error.ResponseError(_("unexpected response:"), d))
259
259
260 @batchable
260 @batchable
261 def listkeys(self, namespace):
261 def listkeys(self, namespace):
262 if not self.capable('pushkey'):
262 if not self.capable('pushkey'):
263 yield {}, None
263 yield {}, None
264 f = future()
264 f = future()
265 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
265 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
266 yield {'namespace': encoding.fromlocal(namespace)}, f
266 yield {'namespace': encoding.fromlocal(namespace)}, f
267 d = f.value
267 d = f.value
268 self.ui.debug('received listkey for "%s": %i bytes\n'
268 self.ui.debug('received listkey for "%s": %i bytes\n'
269 % (namespace, len(d)))
269 % (namespace, len(d)))
270 yield pushkeymod.decodekeys(d)
270 yield pushkeymod.decodekeys(d)
271
271
272 @batchable
272 @batchable
273 def pushkey(self, namespace, key, old, new):
273 def pushkey(self, namespace, key, old, new):
274 if not self.capable('pushkey'):
274 if not self.capable('pushkey'):
275 yield False, None
275 yield False, None
276 f = future()
276 f = future()
277 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
277 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
278 yield {'namespace': encoding.fromlocal(namespace),
278 yield {'namespace': encoding.fromlocal(namespace),
279 'key': encoding.fromlocal(key),
279 'key': encoding.fromlocal(key),
280 'old': encoding.fromlocal(old),
280 'old': encoding.fromlocal(old),
281 'new': encoding.fromlocal(new)}, f
281 'new': encoding.fromlocal(new)}, f
282 d = f.value
282 d = f.value
283 d, output = d.split('\n', 1)
283 d, output = d.split('\n', 1)
284 try:
284 try:
285 d = bool(int(d))
285 d = bool(int(d))
286 except ValueError:
286 except ValueError:
287 raise error.ResponseError(
287 raise error.ResponseError(
288 _('push failed (unexpected response):'), d)
288 _('push failed (unexpected response):'), d)
289 for l in output.splitlines(True):
289 for l in output.splitlines(True):
290 self.ui.status(_('remote: '), l)
290 self.ui.status(_('remote: '), l)
291 yield d
291 yield d
292
292
293 def stream_out(self):
293 def stream_out(self):
294 return self._callstream('stream_out')
294 return self._callstream('stream_out')
295
295
296 def getbundle(self, source, **kwargs):
296 def getbundle(self, source, **kwargs):
297 kwargs = pycompat.byteskwargs(kwargs)
297 kwargs = pycompat.byteskwargs(kwargs)
298 self.requirecap('getbundle', _('look up remote changes'))
298 self.requirecap('getbundle', _('look up remote changes'))
299 opts = {}
299 opts = {}
300 bundlecaps = kwargs.get('bundlecaps') or set()
300 bundlecaps = kwargs.get('bundlecaps') or set()
301 for key, value in kwargs.iteritems():
301 for key, value in kwargs.iteritems():
302 if value is None:
302 if value is None:
303 continue
303 continue
304 keytype = gboptsmap.get(key)
304 keytype = gboptsmap.get(key)
305 if keytype is None:
305 if keytype is None:
306 raise error.ProgrammingError(
306 raise error.ProgrammingError(
307 'Unexpectedly None keytype for key %s' % key)
307 'Unexpectedly None keytype for key %s' % key)
308 elif keytype == 'nodes':
308 elif keytype == 'nodes':
309 value = encodelist(value)
309 value = encodelist(value)
310 elif keytype == 'csv':
310 elif keytype == 'csv':
311 value = ','.join(value)
311 value = ','.join(value)
312 elif keytype == 'scsv':
312 elif keytype == 'scsv':
313 value = ','.join(sorted(value))
313 value = ','.join(sorted(value))
314 elif keytype == 'boolean':
314 elif keytype == 'boolean':
315 value = '%i' % bool(value)
315 value = '%i' % bool(value)
316 elif keytype != 'plain':
316 elif keytype != 'plain':
317 raise KeyError('unknown getbundle option type %s'
317 raise KeyError('unknown getbundle option type %s'
318 % keytype)
318 % keytype)
319 opts[key] = value
319 opts[key] = value
320 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
320 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
321 if any((cap.startswith('HG2') for cap in bundlecaps)):
321 if any((cap.startswith('HG2') for cap in bundlecaps)):
322 return bundle2.getunbundler(self.ui, f)
322 return bundle2.getunbundler(self.ui, f)
323 else:
323 else:
324 return changegroupmod.cg1unpacker(f, 'UN')
324 return changegroupmod.cg1unpacker(f, 'UN')
325
325
326 def unbundle(self, cg, heads, url):
326 def unbundle(self, cg, heads, url):
327 '''Send cg (a readable file-like object representing the
327 '''Send cg (a readable file-like object representing the
328 changegroup to push, typically a chunkbuffer object) to the
328 changegroup to push, typically a chunkbuffer object) to the
329 remote server as a bundle.
329 remote server as a bundle.
330
330
331 When pushing a bundle10 stream, return an integer indicating the
331 When pushing a bundle10 stream, return an integer indicating the
332 result of the push (see changegroup.apply()).
332 result of the push (see changegroup.apply()).
333
333
334 When pushing a bundle20 stream, return a bundle20 stream.
334 When pushing a bundle20 stream, return a bundle20 stream.
335
335
336 `url` is the url the client thinks it's pushing to, which is
336 `url` is the url the client thinks it's pushing to, which is
337 visible to hooks.
337 visible to hooks.
338 '''
338 '''
339
339
340 if heads != ['force'] and self.capable('unbundlehash'):
340 if heads != ['force'] and self.capable('unbundlehash'):
341 heads = encodelist(['hashed',
341 heads = encodelist(['hashed',
342 hashlib.sha1(''.join(sorted(heads))).digest()])
342 hashlib.sha1(''.join(sorted(heads))).digest()])
343 else:
343 else:
344 heads = encodelist(heads)
344 heads = encodelist(heads)
345
345
346 if util.safehasattr(cg, 'deltaheader'):
346 if util.safehasattr(cg, 'deltaheader'):
347 # this a bundle10, do the old style call sequence
347 # this a bundle10, do the old style call sequence
348 ret, output = self._callpush("unbundle", cg, heads=heads)
348 ret, output = self._callpush("unbundle", cg, heads=heads)
349 if ret == "":
349 if ret == "":
350 raise error.ResponseError(
350 raise error.ResponseError(
351 _('push failed:'), output)
351 _('push failed:'), output)
352 try:
352 try:
353 ret = int(ret)
353 ret = int(ret)
354 except ValueError:
354 except ValueError:
355 raise error.ResponseError(
355 raise error.ResponseError(
356 _('push failed (unexpected response):'), ret)
356 _('push failed (unexpected response):'), ret)
357
357
358 for l in output.splitlines(True):
358 for l in output.splitlines(True):
359 self.ui.status(_('remote: '), l)
359 self.ui.status(_('remote: '), l)
360 else:
360 else:
361 # bundle2 push. Send a stream, fetch a stream.
361 # bundle2 push. Send a stream, fetch a stream.
362 stream = self._calltwowaystream('unbundle', cg, heads=heads)
362 stream = self._calltwowaystream('unbundle', cg, heads=heads)
363 ret = bundle2.getunbundler(self.ui, stream)
363 ret = bundle2.getunbundler(self.ui, stream)
364 return ret
364 return ret
365
365
366 # End of ipeercommands interface.
366 # End of ipeercommands interface.
367
367
368 # Begin of ipeerlegacycommands interface.
368 # Begin of ipeerlegacycommands interface.
369
369
370 def branches(self, nodes):
370 def branches(self, nodes):
371 n = encodelist(nodes)
371 n = encodelist(nodes)
372 d = self._call("branches", nodes=n)
372 d = self._call("branches", nodes=n)
373 try:
373 try:
374 br = [tuple(decodelist(b)) for b in d.splitlines()]
374 br = [tuple(decodelist(b)) for b in d.splitlines()]
375 return br
375 return br
376 except ValueError:
376 except ValueError:
377 self._abort(error.ResponseError(_("unexpected response:"), d))
377 self._abort(error.ResponseError(_("unexpected response:"), d))
378
378
379 def between(self, pairs):
379 def between(self, pairs):
380 batch = 8 # avoid giant requests
380 batch = 8 # avoid giant requests
381 r = []
381 r = []
382 for i in xrange(0, len(pairs), batch):
382 for i in xrange(0, len(pairs), batch):
383 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
383 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
384 d = self._call("between", pairs=n)
384 d = self._call("between", pairs=n)
385 try:
385 try:
386 r.extend(l and decodelist(l) or [] for l in d.splitlines())
386 r.extend(l and decodelist(l) or [] for l in d.splitlines())
387 except ValueError:
387 except ValueError:
388 self._abort(error.ResponseError(_("unexpected response:"), d))
388 self._abort(error.ResponseError(_("unexpected response:"), d))
389 return r
389 return r
390
390
391 def changegroup(self, nodes, kind):
391 def changegroup(self, nodes, kind):
392 n = encodelist(nodes)
392 n = encodelist(nodes)
393 f = self._callcompressable("changegroup", roots=n)
393 f = self._callcompressable("changegroup", roots=n)
394 return changegroupmod.cg1unpacker(f, 'UN')
394 return changegroupmod.cg1unpacker(f, 'UN')
395
395
396 def changegroupsubset(self, bases, heads, kind):
396 def changegroupsubset(self, bases, heads, kind):
397 self.requirecap('changegroupsubset', _('look up remote changes'))
397 self.requirecap('changegroupsubset', _('look up remote changes'))
398 bases = encodelist(bases)
398 bases = encodelist(bases)
399 heads = encodelist(heads)
399 heads = encodelist(heads)
400 f = self._callcompressable("changegroupsubset",
400 f = self._callcompressable("changegroupsubset",
401 bases=bases, heads=heads)
401 bases=bases, heads=heads)
402 return changegroupmod.cg1unpacker(f, 'UN')
402 return changegroupmod.cg1unpacker(f, 'UN')
403
403
404 # End of ipeerlegacycommands interface.
404 # End of ipeerlegacycommands interface.
405
405
406 def _submitbatch(self, req):
406 def _submitbatch(self, req):
407 """run batch request <req> on the server
407 """run batch request <req> on the server
408
408
409 Returns an iterator of the raw responses from the server.
409 Returns an iterator of the raw responses from the server.
410 """
410 """
411 ui = self.ui
411 ui = self.ui
412 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
412 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
413 ui.debug('devel-peer-request: batched-content\n')
413 ui.debug('devel-peer-request: batched-content\n')
414 for op, args in req:
414 for op, args in req:
415 msg = 'devel-peer-request: - %s (%d arguments)\n'
415 msg = 'devel-peer-request: - %s (%d arguments)\n'
416 ui.debug(msg % (op, len(args)))
416 ui.debug(msg % (op, len(args)))
417
417
418 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
418 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
419 chunk = rsp.read(1024)
419 chunk = rsp.read(1024)
420 work = [chunk]
420 work = [chunk]
421 while chunk:
421 while chunk:
422 while ';' not in chunk and chunk:
422 while ';' not in chunk and chunk:
423 chunk = rsp.read(1024)
423 chunk = rsp.read(1024)
424 work.append(chunk)
424 work.append(chunk)
425 merged = ''.join(work)
425 merged = ''.join(work)
426 while ';' in merged:
426 while ';' in merged:
427 one, merged = merged.split(';', 1)
427 one, merged = merged.split(';', 1)
428 yield unescapearg(one)
428 yield unescapearg(one)
429 chunk = rsp.read(1024)
429 chunk = rsp.read(1024)
430 work = [merged, chunk]
430 work = [merged, chunk]
431 yield unescapearg(''.join(work))
431 yield unescapearg(''.join(work))
432
432
433 def _submitone(self, op, args):
433 def _submitone(self, op, args):
434 return self._call(op, **pycompat.strkwargs(args))
434 return self._call(op, **pycompat.strkwargs(args))
435
435
436 def debugwireargs(self, one, two, three=None, four=None, five=None):
436 def debugwireargs(self, one, two, three=None, four=None, five=None):
437 # don't pass optional arguments left at their default value
437 # don't pass optional arguments left at their default value
438 opts = {}
438 opts = {}
439 if three is not None:
439 if three is not None:
440 opts[r'three'] = three
440 opts[r'three'] = three
441 if four is not None:
441 if four is not None:
442 opts[r'four'] = four
442 opts[r'four'] = four
443 return self._call('debugwireargs', one=one, two=two, **opts)
443 return self._call('debugwireargs', one=one, two=two, **opts)
444
444
445 def _call(self, cmd, **args):
445 def _call(self, cmd, **args):
446 """execute <cmd> on the server
446 """execute <cmd> on the server
447
447
448 The command is expected to return a simple string.
448 The command is expected to return a simple string.
449
449
450 returns the server reply as a string."""
450 returns the server reply as a string."""
451 raise NotImplementedError()
451 raise NotImplementedError()
452
452
453 def _callstream(self, cmd, **args):
453 def _callstream(self, cmd, **args):
454 """execute <cmd> on the server
454 """execute <cmd> on the server
455
455
456 The command is expected to return a stream. Note that if the
456 The command is expected to return a stream. Note that if the
457 command doesn't return a stream, _callstream behaves
457 command doesn't return a stream, _callstream behaves
458 differently for ssh and http peers.
458 differently for ssh and http peers.
459
459
460 returns the server reply as a file like object.
460 returns the server reply as a file like object.
461 """
461 """
462 raise NotImplementedError()
462 raise NotImplementedError()
463
463
464 def _callcompressable(self, cmd, **args):
464 def _callcompressable(self, cmd, **args):
465 """execute <cmd> on the server
465 """execute <cmd> on the server
466
466
467 The command is expected to return a stream.
467 The command is expected to return a stream.
468
468
469 The stream may have been compressed in some implementations. This
469 The stream may have been compressed in some implementations. This
470 function takes care of the decompression. This is the only difference
470 function takes care of the decompression. This is the only difference
471 with _callstream.
471 with _callstream.
472
472
473 returns the server reply as a file like object.
473 returns the server reply as a file like object.
474 """
474 """
475 raise NotImplementedError()
475 raise NotImplementedError()
476
476
477 def _callpush(self, cmd, fp, **args):
477 def _callpush(self, cmd, fp, **args):
478 """execute a <cmd> on server
478 """execute a <cmd> on server
479
479
480 The command is expected to be related to a push. Push has a special
480 The command is expected to be related to a push. Push has a special
481 return method.
481 return method.
482
482
483 returns the server reply as a (ret, output) tuple. ret is either
483 returns the server reply as a (ret, output) tuple. ret is either
484 empty (error) or a stringified int.
484 empty (error) or a stringified int.
485 """
485 """
486 raise NotImplementedError()
486 raise NotImplementedError()
487
487
488 def _calltwowaystream(self, cmd, fp, **args):
488 def _calltwowaystream(self, cmd, fp, **args):
489 """execute <cmd> on server
489 """execute <cmd> on server
490
490
491 The command will send a stream to the server and get a stream in reply.
491 The command will send a stream to the server and get a stream in reply.
492 """
492 """
493 raise NotImplementedError()
493 raise NotImplementedError()
494
494
495 def _abort(self, exception):
495 def _abort(self, exception):
496 """clearly abort the wire protocol connection and raise the exception
496 """clearly abort the wire protocol connection and raise the exception
497 """
497 """
498 raise NotImplementedError()
498 raise NotImplementedError()
499
499
500 # server side
500 # server side
501
501
502 # wire protocol command can either return a string or one of these classes.
502 # wire protocol command can either return a string or one of these classes.
503
503
504 def getdispatchrepo(repo, proto, command):
504 def getdispatchrepo(repo, proto, command):
505 """Obtain the repo used for processing wire protocol commands.
505 """Obtain the repo used for processing wire protocol commands.
506
506
507 The intent of this function is to serve as a monkeypatch point for
507 The intent of this function is to serve as a monkeypatch point for
508 extensions that need commands to operate on different repo views under
508 extensions that need commands to operate on different repo views under
509 specialized circumstances.
509 specialized circumstances.
510 """
510 """
511 return repo.filtered('served')
511 return repo.filtered('served')
512
512
513 def dispatch(repo, proto, command):
513 def dispatch(repo, proto, command):
514 repo = getdispatchrepo(repo, proto, command)
514 repo = getdispatchrepo(repo, proto, command)
515
515
516 transportversion = wireprototypes.TRANSPORTS[proto.name]['version']
516 transportversion = wireprototypes.TRANSPORTS[proto.name]['version']
517 commandtable = commandsv2 if transportversion == 2 else commands
517 commandtable = commandsv2 if transportversion == 2 else commands
518 func, spec = commandtable[command]
518 func, spec = commandtable[command]
519
519
520 args = proto.getargs(spec)
520 args = proto.getargs(spec)
521
521
522 # Version 1 protocols define arguments as a list. Version 2 uses a dict.
522 # Version 1 protocols define arguments as a list. Version 2 uses a dict.
523 if isinstance(args, list):
523 if isinstance(args, list):
524 return func(repo, proto, *args)
524 return func(repo, proto, *args)
525 elif isinstance(args, dict):
525 elif isinstance(args, dict):
526 return func(repo, proto, **args)
526 return func(repo, proto, **args)
527 else:
527 else:
528 raise error.ProgrammingError('unexpected type returned from '
528 raise error.ProgrammingError('unexpected type returned from '
529 'proto.getargs(): %s' % type(args))
529 'proto.getargs(): %s' % type(args))
530
530
531 def options(cmd, keys, others):
531 def options(cmd, keys, others):
532 opts = {}
532 opts = {}
533 for k in keys:
533 for k in keys:
534 if k in others:
534 if k in others:
535 opts[k] = others[k]
535 opts[k] = others[k]
536 del others[k]
536 del others[k]
537 if others:
537 if others:
538 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
538 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
539 % (cmd, ",".join(others)))
539 % (cmd, ",".join(others)))
540 return opts
540 return opts
541
541
542 def bundle1allowed(repo, action):
542 def bundle1allowed(repo, action):
543 """Whether a bundle1 operation is allowed from the server.
543 """Whether a bundle1 operation is allowed from the server.
544
544
545 Priority is:
545 Priority is:
546
546
547 1. server.bundle1gd.<action> (if generaldelta active)
547 1. server.bundle1gd.<action> (if generaldelta active)
548 2. server.bundle1.<action>
548 2. server.bundle1.<action>
549 3. server.bundle1gd (if generaldelta active)
549 3. server.bundle1gd (if generaldelta active)
550 4. server.bundle1
550 4. server.bundle1
551 """
551 """
552 ui = repo.ui
552 ui = repo.ui
553 gd = 'generaldelta' in repo.requirements
553 gd = 'generaldelta' in repo.requirements
554
554
555 if gd:
555 if gd:
556 v = ui.configbool('server', 'bundle1gd.%s' % action)
556 v = ui.configbool('server', 'bundle1gd.%s' % action)
557 if v is not None:
557 if v is not None:
558 return v
558 return v
559
559
560 v = ui.configbool('server', 'bundle1.%s' % action)
560 v = ui.configbool('server', 'bundle1.%s' % action)
561 if v is not None:
561 if v is not None:
562 return v
562 return v
563
563
564 if gd:
564 if gd:
565 v = ui.configbool('server', 'bundle1gd')
565 v = ui.configbool('server', 'bundle1gd')
566 if v is not None:
566 if v is not None:
567 return v
567 return v
568
568
569 return ui.configbool('server', 'bundle1')
569 return ui.configbool('server', 'bundle1')
570
570
571 def supportedcompengines(ui, role):
571 def supportedcompengines(ui, role):
572 """Obtain the list of supported compression engines for a request."""
572 """Obtain the list of supported compression engines for a request."""
573 assert role in (util.CLIENTROLE, util.SERVERROLE)
573 assert role in (util.CLIENTROLE, util.SERVERROLE)
574
574
575 compengines = util.compengines.supportedwireengines(role)
575 compengines = util.compengines.supportedwireengines(role)
576
576
577 # Allow config to override default list and ordering.
577 # Allow config to override default list and ordering.
578 if role == util.SERVERROLE:
578 if role == util.SERVERROLE:
579 configengines = ui.configlist('server', 'compressionengines')
579 configengines = ui.configlist('server', 'compressionengines')
580 config = 'server.compressionengines'
580 config = 'server.compressionengines'
581 else:
581 else:
582 # This is currently implemented mainly to facilitate testing. In most
582 # This is currently implemented mainly to facilitate testing. In most
583 # cases, the server should be in charge of choosing a compression engine
583 # cases, the server should be in charge of choosing a compression engine
584 # because a server has the most to lose from a sub-optimal choice. (e.g.
584 # because a server has the most to lose from a sub-optimal choice. (e.g.
585 # CPU DoS due to an expensive engine or a network DoS due to poor
585 # CPU DoS due to an expensive engine or a network DoS due to poor
586 # compression ratio).
586 # compression ratio).
587 configengines = ui.configlist('experimental',
587 configengines = ui.configlist('experimental',
588 'clientcompressionengines')
588 'clientcompressionengines')
589 config = 'experimental.clientcompressionengines'
589 config = 'experimental.clientcompressionengines'
590
590
591 # No explicit config. Filter out the ones that aren't supposed to be
591 # No explicit config. Filter out the ones that aren't supposed to be
592 # advertised and return default ordering.
592 # advertised and return default ordering.
593 if not configengines:
593 if not configengines:
594 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
594 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
595 return [e for e in compengines
595 return [e for e in compengines
596 if getattr(e.wireprotosupport(), attr) > 0]
596 if getattr(e.wireprotosupport(), attr) > 0]
597
597
598 # If compression engines are listed in the config, assume there is a good
598 # If compression engines are listed in the config, assume there is a good
599 # reason for it (like server operators wanting to achieve specific
599 # reason for it (like server operators wanting to achieve specific
600 # performance characteristics). So fail fast if the config references
600 # performance characteristics). So fail fast if the config references
601 # unusable compression engines.
601 # unusable compression engines.
602 validnames = set(e.name() for e in compengines)
602 validnames = set(e.name() for e in compengines)
603 invalidnames = set(e for e in configengines if e not in validnames)
603 invalidnames = set(e for e in configengines if e not in validnames)
604 if invalidnames:
604 if invalidnames:
605 raise error.Abort(_('invalid compression engine defined in %s: %s') %
605 raise error.Abort(_('invalid compression engine defined in %s: %s') %
606 (config, ', '.join(sorted(invalidnames))))
606 (config, ', '.join(sorted(invalidnames))))
607
607
608 compengines = [e for e in compengines if e.name() in configengines]
608 compengines = [e for e in compengines if e.name() in configengines]
609 compengines = sorted(compengines,
609 compengines = sorted(compengines,
610 key=lambda e: configengines.index(e.name()))
610 key=lambda e: configengines.index(e.name()))
611
611
612 if not compengines:
612 if not compengines:
613 raise error.Abort(_('%s config option does not specify any known '
613 raise error.Abort(_('%s config option does not specify any known '
614 'compression engines') % config,
614 'compression engines') % config,
615 hint=_('usable compression engines: %s') %
615 hint=_('usable compression engines: %s') %
616 ', '.sorted(validnames))
616 ', '.sorted(validnames))
617
617
618 return compengines
618 return compengines
619
619
620 class commandentry(object):
620 class commandentry(object):
621 """Represents a declared wire protocol command."""
621 """Represents a declared wire protocol command."""
622 def __init__(self, func, args='', transports=None,
622 def __init__(self, func, args='', transports=None,
623 permission='push'):
623 permission='push'):
624 self.func = func
624 self.func = func
625 self.args = args
625 self.args = args
626 self.transports = transports or set()
626 self.transports = transports or set()
627 self.permission = permission
627 self.permission = permission
628
628
629 def _merge(self, func, args):
629 def _merge(self, func, args):
630 """Merge this instance with an incoming 2-tuple.
630 """Merge this instance with an incoming 2-tuple.
631
631
632 This is called when a caller using the old 2-tuple API attempts
632 This is called when a caller using the old 2-tuple API attempts
633 to replace an instance. The incoming values are merged with
633 to replace an instance. The incoming values are merged with
634 data not captured by the 2-tuple and a new instance containing
634 data not captured by the 2-tuple and a new instance containing
635 the union of the two objects is returned.
635 the union of the two objects is returned.
636 """
636 """
637 return commandentry(func, args=args, transports=set(self.transports),
637 return commandentry(func, args=args, transports=set(self.transports),
638 permission=self.permission)
638 permission=self.permission)
639
639
640 # Old code treats instances as 2-tuples. So expose that interface.
640 # Old code treats instances as 2-tuples. So expose that interface.
641 def __iter__(self):
641 def __iter__(self):
642 yield self.func
642 yield self.func
643 yield self.args
643 yield self.args
644
644
645 def __getitem__(self, i):
645 def __getitem__(self, i):
646 if i == 0:
646 if i == 0:
647 return self.func
647 return self.func
648 elif i == 1:
648 elif i == 1:
649 return self.args
649 return self.args
650 else:
650 else:
651 raise IndexError('can only access elements 0 and 1')
651 raise IndexError('can only access elements 0 and 1')
652
652
653 class commanddict(dict):
653 class commanddict(dict):
654 """Container for registered wire protocol commands.
654 """Container for registered wire protocol commands.
655
655
656 It behaves like a dict. But __setitem__ is overwritten to allow silent
656 It behaves like a dict. But __setitem__ is overwritten to allow silent
657 coercion of values from 2-tuples for API compatibility.
657 coercion of values from 2-tuples for API compatibility.
658 """
658 """
659 def __setitem__(self, k, v):
659 def __setitem__(self, k, v):
660 if isinstance(v, commandentry):
660 if isinstance(v, commandentry):
661 pass
661 pass
662 # Cast 2-tuples to commandentry instances.
662 # Cast 2-tuples to commandentry instances.
663 elif isinstance(v, tuple):
663 elif isinstance(v, tuple):
664 if len(v) != 2:
664 if len(v) != 2:
665 raise ValueError('command tuples must have exactly 2 elements')
665 raise ValueError('command tuples must have exactly 2 elements')
666
666
667 # It is common for extensions to wrap wire protocol commands via
667 # It is common for extensions to wrap wire protocol commands via
668 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
668 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
669 # doing this aren't aware of the new API that uses objects to store
669 # doing this aren't aware of the new API that uses objects to store
670 # command entries, we automatically merge old state with new.
670 # command entries, we automatically merge old state with new.
671 if k in self:
671 if k in self:
672 v = self[k]._merge(v[0], v[1])
672 v = self[k]._merge(v[0], v[1])
673 else:
673 else:
674 # Use default values from @wireprotocommand.
674 # Use default values from @wireprotocommand.
675 v = commandentry(v[0], args=v[1],
675 v = commandentry(v[0], args=v[1],
676 transports=set(wireprototypes.TRANSPORTS),
676 transports=set(wireprototypes.TRANSPORTS),
677 permission='push')
677 permission='push')
678 else:
678 else:
679 raise ValueError('command entries must be commandentry instances '
679 raise ValueError('command entries must be commandentry instances '
680 'or 2-tuples')
680 'or 2-tuples')
681
681
682 return super(commanddict, self).__setitem__(k, v)
682 return super(commanddict, self).__setitem__(k, v)
683
683
684 def commandavailable(self, command, proto):
684 def commandavailable(self, command, proto):
685 """Determine if a command is available for the requested protocol."""
685 """Determine if a command is available for the requested protocol."""
686 assert proto.name in wireprototypes.TRANSPORTS
686 assert proto.name in wireprototypes.TRANSPORTS
687
687
688 entry = self.get(command)
688 entry = self.get(command)
689
689
690 if not entry:
690 if not entry:
691 return False
691 return False
692
692
693 if proto.name not in entry.transports:
693 if proto.name not in entry.transports:
694 return False
694 return False
695
695
696 return True
696 return True
697
697
698 # Constants specifying which transports a wire protocol command should be
698 # Constants specifying which transports a wire protocol command should be
699 # available on. For use with @wireprotocommand.
699 # available on. For use with @wireprotocommand.
700 POLICY_ALL = 'all'
700 POLICY_ALL = 'all'
701 POLICY_V1_ONLY = 'v1-only'
701 POLICY_V1_ONLY = 'v1-only'
702 POLICY_V2_ONLY = 'v2-only'
702 POLICY_V2_ONLY = 'v2-only'
703
703
704 # For version 1 transports.
704 # For version 1 transports.
705 commands = commanddict()
705 commands = commanddict()
706
706
707 # For version 2 transports.
707 # For version 2 transports.
708 commandsv2 = commanddict()
708 commandsv2 = commanddict()
709
709
710 def wireprotocommand(name, args='', transportpolicy=POLICY_ALL,
710 def wireprotocommand(name, args='', transportpolicy=POLICY_ALL,
711 permission='push'):
711 permission='push'):
712 """Decorator to declare a wire protocol command.
712 """Decorator to declare a wire protocol command.
713
713
714 ``name`` is the name of the wire protocol command being provided.
714 ``name`` is the name of the wire protocol command being provided.
715
715
716 ``args`` is a space-delimited list of named arguments that the command
716 ``args`` is a space-delimited list of named arguments that the command
717 accepts. ``*`` is a special value that says to accept all arguments.
717 accepts. ``*`` is a special value that says to accept all arguments.
718
718
719 ``transportpolicy`` is a POLICY_* constant denoting which transports
719 ``transportpolicy`` is a POLICY_* constant denoting which transports
720 this wire protocol command should be exposed to. By default, commands
720 this wire protocol command should be exposed to. By default, commands
721 are exposed to all wire protocol transports.
721 are exposed to all wire protocol transports.
722
722
723 ``permission`` defines the permission type needed to run this command.
723 ``permission`` defines the permission type needed to run this command.
724 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
724 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
725 respectively. Default is to assume command requires ``push`` permissions
725 respectively. Default is to assume command requires ``push`` permissions
726 because otherwise commands not declaring their permissions could modify
726 because otherwise commands not declaring their permissions could modify
727 a repository that is supposed to be read-only.
727 a repository that is supposed to be read-only.
728 """
728 """
729 if transportpolicy == POLICY_ALL:
729 if transportpolicy == POLICY_ALL:
730 transports = set(wireprototypes.TRANSPORTS)
730 transports = set(wireprototypes.TRANSPORTS)
731 transportversions = {1, 2}
731 transportversions = {1, 2}
732 elif transportpolicy == POLICY_V1_ONLY:
732 elif transportpolicy == POLICY_V1_ONLY:
733 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
733 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
734 if v['version'] == 1}
734 if v['version'] == 1}
735 transportversions = {1}
735 transportversions = {1}
736 elif transportpolicy == POLICY_V2_ONLY:
736 elif transportpolicy == POLICY_V2_ONLY:
737 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
737 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
738 if v['version'] == 2}
738 if v['version'] == 2}
739 transportversions = {2}
739 transportversions = {2}
740 else:
740 else:
741 raise error.ProgrammingError('invalid transport policy value: %s' %
741 raise error.ProgrammingError('invalid transport policy value: %s' %
742 transportpolicy)
742 transportpolicy)
743
743
744 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
744 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
745 # SSHv2.
745 # SSHv2.
746 # TODO undo this hack when SSH is using the unified frame protocol.
746 # TODO undo this hack when SSH is using the unified frame protocol.
747 if name == b'batch':
747 if name == b'batch':
748 transports.add(wireprototypes.SSHV2)
748 transports.add(wireprototypes.SSHV2)
749
749
750 if permission not in ('push', 'pull'):
750 if permission not in ('push', 'pull'):
751 raise error.ProgrammingError('invalid wire protocol permission; '
751 raise error.ProgrammingError('invalid wire protocol permission; '
752 'got %s; expected "push" or "pull"' %
752 'got %s; expected "push" or "pull"' %
753 permission)
753 permission)
754
754
755 def register(func):
755 def register(func):
756 if 1 in transportversions:
756 if 1 in transportversions:
757 if name in commands:
757 if name in commands:
758 raise error.ProgrammingError('%s command already registered '
758 raise error.ProgrammingError('%s command already registered '
759 'for version 1' % name)
759 'for version 1' % name)
760 commands[name] = commandentry(func, args=args,
760 commands[name] = commandentry(func, args=args,
761 transports=transports,
761 transports=transports,
762 permission=permission)
762 permission=permission)
763 if 2 in transportversions:
763 if 2 in transportversions:
764 if name in commandsv2:
764 if name in commandsv2:
765 raise error.ProgrammingError('%s command already registered '
765 raise error.ProgrammingError('%s command already registered '
766 'for version 2' % name)
766 'for version 2' % name)
767 commandsv2[name] = commandentry(func, args=args,
767 commandsv2[name] = commandentry(func, args=args,
768 transports=transports,
768 transports=transports,
769 permission=permission)
769 permission=permission)
770
770
771 return func
771 return func
772 return register
772 return register
773
773
774 # TODO define a more appropriate permissions type to use for this.
774 # TODO define a more appropriate permissions type to use for this.
775 @wireprotocommand('batch', 'cmds *', permission='pull',
775 @wireprotocommand('batch', 'cmds *', permission='pull',
776 transportpolicy=POLICY_V1_ONLY)
776 transportpolicy=POLICY_V1_ONLY)
777 def batch(repo, proto, cmds, others):
777 def batch(repo, proto, cmds, others):
778 repo = repo.filtered("served")
778 repo = repo.filtered("served")
779 res = []
779 res = []
780 for pair in cmds.split(';'):
780 for pair in cmds.split(';'):
781 op, args = pair.split(' ', 1)
781 op, args = pair.split(' ', 1)
782 vals = {}
782 vals = {}
783 for a in args.split(','):
783 for a in args.split(','):
784 if a:
784 if a:
785 n, v = a.split('=')
785 n, v = a.split('=')
786 vals[unescapearg(n)] = unescapearg(v)
786 vals[unescapearg(n)] = unescapearg(v)
787 func, spec = commands[op]
787 func, spec = commands[op]
788
788
789 # Validate that client has permissions to perform this command.
789 # Validate that client has permissions to perform this command.
790 perm = commands[op].permission
790 perm = commands[op].permission
791 assert perm in ('push', 'pull')
791 assert perm in ('push', 'pull')
792 proto.checkperm(perm)
792 proto.checkperm(perm)
793
793
794 if spec:
794 if spec:
795 keys = spec.split()
795 keys = spec.split()
796 data = {}
796 data = {}
797 for k in keys:
797 for k in keys:
798 if k == '*':
798 if k == '*':
799 star = {}
799 star = {}
800 for key in vals.keys():
800 for key in vals.keys():
801 if key not in keys:
801 if key not in keys:
802 star[key] = vals[key]
802 star[key] = vals[key]
803 data['*'] = star
803 data['*'] = star
804 else:
804 else:
805 data[k] = vals[k]
805 data[k] = vals[k]
806 result = func(repo, proto, *[data[k] for k in keys])
806 result = func(repo, proto, *[data[k] for k in keys])
807 else:
807 else:
808 result = func(repo, proto)
808 result = func(repo, proto)
809 if isinstance(result, wireprototypes.ooberror):
809 if isinstance(result, wireprototypes.ooberror):
810 return result
810 return result
811
811
812 # For now, all batchable commands must return bytesresponse or
812 # For now, all batchable commands must return bytesresponse or
813 # raw bytes (for backwards compatibility).
813 # raw bytes (for backwards compatibility).
814 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
814 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
815 if isinstance(result, wireprototypes.bytesresponse):
815 if isinstance(result, wireprototypes.bytesresponse):
816 result = result.data
816 result = result.data
817 res.append(escapearg(result))
817 res.append(escapearg(result))
818
818
819 return wireprototypes.bytesresponse(';'.join(res))
819 return wireprototypes.bytesresponse(';'.join(res))
820
820
821 @wireprotocommand('between', 'pairs', transportpolicy=POLICY_V1_ONLY,
821 @wireprotocommand('between', 'pairs', transportpolicy=POLICY_V1_ONLY,
822 permission='pull')
822 permission='pull')
823 def between(repo, proto, pairs):
823 def between(repo, proto, pairs):
824 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
824 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
825 r = []
825 r = []
826 for b in repo.between(pairs):
826 for b in repo.between(pairs):
827 r.append(encodelist(b) + "\n")
827 r.append(encodelist(b) + "\n")
828
828
829 return wireprototypes.bytesresponse(''.join(r))
829 return wireprototypes.bytesresponse(''.join(r))
830
830
831 @wireprotocommand('branchmap', permission='pull',
831 @wireprotocommand('branchmap', permission='pull',
832 transportpolicy=POLICY_V1_ONLY)
832 transportpolicy=POLICY_V1_ONLY)
833 def branchmap(repo, proto):
833 def branchmap(repo, proto):
834 branchmap = repo.branchmap()
834 branchmap = repo.branchmap()
835 heads = []
835 heads = []
836 for branch, nodes in branchmap.iteritems():
836 for branch, nodes in branchmap.iteritems():
837 branchname = urlreq.quote(encoding.fromlocal(branch))
837 branchname = urlreq.quote(encoding.fromlocal(branch))
838 branchnodes = encodelist(nodes)
838 branchnodes = encodelist(nodes)
839 heads.append('%s %s' % (branchname, branchnodes))
839 heads.append('%s %s' % (branchname, branchnodes))
840
840
841 return wireprototypes.bytesresponse('\n'.join(heads))
841 return wireprototypes.bytesresponse('\n'.join(heads))
842
842
843 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY,
843 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY,
844 permission='pull')
844 permission='pull')
845 def branches(repo, proto, nodes):
845 def branches(repo, proto, nodes):
846 nodes = decodelist(nodes)
846 nodes = decodelist(nodes)
847 r = []
847 r = []
848 for b in repo.branches(nodes):
848 for b in repo.branches(nodes):
849 r.append(encodelist(b) + "\n")
849 r.append(encodelist(b) + "\n")
850
850
851 return wireprototypes.bytesresponse(''.join(r))
851 return wireprototypes.bytesresponse(''.join(r))
852
852
853 @wireprotocommand('clonebundles', '', permission='pull')
853 @wireprotocommand('clonebundles', '', permission='pull')
854 def clonebundles(repo, proto):
854 def clonebundles(repo, proto):
855 """Server command for returning info for available bundles to seed clones.
855 """Server command for returning info for available bundles to seed clones.
856
856
857 Clients will parse this response and determine what bundle to fetch.
857 Clients will parse this response and determine what bundle to fetch.
858
858
859 Extensions may wrap this command to filter or dynamically emit data
859 Extensions may wrap this command to filter or dynamically emit data
860 depending on the request. e.g. you could advertise URLs for the closest
860 depending on the request. e.g. you could advertise URLs for the closest
861 data center given the client's IP address.
861 data center given the client's IP address.
862 """
862 """
863 return wireprototypes.bytesresponse(
863 return wireprototypes.bytesresponse(
864 repo.vfs.tryread('clonebundles.manifest'))
864 repo.vfs.tryread('clonebundles.manifest'))
865
865
866 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
866 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
867 'known', 'getbundle', 'unbundlehash']
867 'known', 'getbundle', 'unbundlehash']
868
868
869 def _capabilities(repo, proto):
869 def _capabilities(repo, proto):
870 """return a list of capabilities for a repo
870 """return a list of capabilities for a repo
871
871
872 This function exists to allow extensions to easily wrap capabilities
872 This function exists to allow extensions to easily wrap capabilities
873 computation
873 computation
874
874
875 - returns a lists: easy to alter
875 - returns a lists: easy to alter
876 - change done here will be propagated to both `capabilities` and `hello`
876 - change done here will be propagated to both `capabilities` and `hello`
877 command without any other action needed.
877 command without any other action needed.
878 """
878 """
879 # copy to prevent modification of the global list
879 # copy to prevent modification of the global list
880 caps = list(wireprotocaps)
880 caps = list(wireprotocaps)
881
881
882 # Command of same name as capability isn't exposed to version 1 of
882 # Command of same name as capability isn't exposed to version 1 of
883 # transports. So conditionally add it.
883 # transports. So conditionally add it.
884 if commands.commandavailable('changegroupsubset', proto):
884 if commands.commandavailable('changegroupsubset', proto):
885 caps.append('changegroupsubset')
885 caps.append('changegroupsubset')
886
886
887 if streamclone.allowservergeneration(repo):
887 if streamclone.allowservergeneration(repo):
888 if repo.ui.configbool('server', 'preferuncompressed'):
888 if repo.ui.configbool('server', 'preferuncompressed'):
889 caps.append('stream-preferred')
889 caps.append('stream-preferred')
890 requiredformats = repo.requirements & repo.supportedformats
890 requiredformats = repo.requirements & repo.supportedformats
891 # if our local revlogs are just revlogv1, add 'stream' cap
891 # if our local revlogs are just revlogv1, add 'stream' cap
892 if not requiredformats - {'revlogv1'}:
892 if not requiredformats - {'revlogv1'}:
893 caps.append('stream')
893 caps.append('stream')
894 # otherwise, add 'streamreqs' detailing our local revlog format
894 # otherwise, add 'streamreqs' detailing our local revlog format
895 else:
895 else:
896 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
896 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
897 if repo.ui.configbool('experimental', 'bundle2-advertise'):
897 if repo.ui.configbool('experimental', 'bundle2-advertise'):
898 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
898 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
899 caps.append('bundle2=' + urlreq.quote(capsblob))
899 caps.append('bundle2=' + urlreq.quote(capsblob))
900 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
900 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
901
901
902 return proto.addcapabilities(repo, caps)
902 return proto.addcapabilities(repo, caps)
903
903
904 # If you are writing an extension and consider wrapping this function. Wrap
904 # If you are writing an extension and consider wrapping this function. Wrap
905 # `_capabilities` instead.
905 # `_capabilities` instead.
906 @wireprotocommand('capabilities', permission='pull')
906 @wireprotocommand('capabilities', permission='pull')
907 def capabilities(repo, proto):
907 def capabilities(repo, proto):
908 caps = _capabilities(repo, proto)
908 caps = _capabilities(repo, proto)
909 return wireprototypes.bytesresponse(' '.join(sorted(caps)))
909 return wireprototypes.bytesresponse(' '.join(sorted(caps)))
910
910
911 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY,
911 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY,
912 permission='pull')
912 permission='pull')
913 def changegroup(repo, proto, roots):
913 def changegroup(repo, proto, roots):
914 nodes = decodelist(roots)
914 nodes = decodelist(roots)
915 outgoing = discovery.outgoing(repo, missingroots=nodes,
915 outgoing = discovery.outgoing(repo, missingroots=nodes,
916 missingheads=repo.heads())
916 missingheads=repo.heads())
917 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
917 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
918 gen = iter(lambda: cg.read(32768), '')
918 gen = iter(lambda: cg.read(32768), '')
919 return wireprototypes.streamres(gen=gen)
919 return wireprototypes.streamres(gen=gen)
920
920
921 @wireprotocommand('changegroupsubset', 'bases heads',
921 @wireprotocommand('changegroupsubset', 'bases heads',
922 transportpolicy=POLICY_V1_ONLY,
922 transportpolicy=POLICY_V1_ONLY,
923 permission='pull')
923 permission='pull')
924 def changegroupsubset(repo, proto, bases, heads):
924 def changegroupsubset(repo, proto, bases, heads):
925 bases = decodelist(bases)
925 bases = decodelist(bases)
926 heads = decodelist(heads)
926 heads = decodelist(heads)
927 outgoing = discovery.outgoing(repo, missingroots=bases,
927 outgoing = discovery.outgoing(repo, missingroots=bases,
928 missingheads=heads)
928 missingheads=heads)
929 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
929 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
930 gen = iter(lambda: cg.read(32768), '')
930 gen = iter(lambda: cg.read(32768), '')
931 return wireprototypes.streamres(gen=gen)
931 return wireprototypes.streamres(gen=gen)
932
932
933 @wireprotocommand('debugwireargs', 'one two *',
933 @wireprotocommand('debugwireargs', 'one two *',
934 permission='pull')
934 permission='pull', transportpolicy=POLICY_V1_ONLY)
935 def debugwireargs(repo, proto, one, two, others):
935 def debugwireargs(repo, proto, one, two, others):
936 # only accept optional args from the known set
936 # only accept optional args from the known set
937 opts = options('debugwireargs', ['three', 'four'], others)
937 opts = options('debugwireargs', ['three', 'four'], others)
938 return wireprototypes.bytesresponse(repo.debugwireargs(
938 return wireprototypes.bytesresponse(repo.debugwireargs(
939 one, two, **pycompat.strkwargs(opts)))
939 one, two, **pycompat.strkwargs(opts)))
940
940
941 @wireprotocommand('getbundle', '*', permission='pull')
941 @wireprotocommand('getbundle', '*', permission='pull')
942 def getbundle(repo, proto, others):
942 def getbundle(repo, proto, others):
943 opts = options('getbundle', gboptsmap.keys(), others)
943 opts = options('getbundle', gboptsmap.keys(), others)
944 for k, v in opts.iteritems():
944 for k, v in opts.iteritems():
945 keytype = gboptsmap[k]
945 keytype = gboptsmap[k]
946 if keytype == 'nodes':
946 if keytype == 'nodes':
947 opts[k] = decodelist(v)
947 opts[k] = decodelist(v)
948 elif keytype == 'csv':
948 elif keytype == 'csv':
949 opts[k] = list(v.split(','))
949 opts[k] = list(v.split(','))
950 elif keytype == 'scsv':
950 elif keytype == 'scsv':
951 opts[k] = set(v.split(','))
951 opts[k] = set(v.split(','))
952 elif keytype == 'boolean':
952 elif keytype == 'boolean':
953 # Client should serialize False as '0', which is a non-empty string
953 # Client should serialize False as '0', which is a non-empty string
954 # so it evaluates as a True bool.
954 # so it evaluates as a True bool.
955 if v == '0':
955 if v == '0':
956 opts[k] = False
956 opts[k] = False
957 else:
957 else:
958 opts[k] = bool(v)
958 opts[k] = bool(v)
959 elif keytype != 'plain':
959 elif keytype != 'plain':
960 raise KeyError('unknown getbundle option type %s'
960 raise KeyError('unknown getbundle option type %s'
961 % keytype)
961 % keytype)
962
962
963 if not bundle1allowed(repo, 'pull'):
963 if not bundle1allowed(repo, 'pull'):
964 if not exchange.bundle2requested(opts.get('bundlecaps')):
964 if not exchange.bundle2requested(opts.get('bundlecaps')):
965 if proto.name == 'http-v1':
965 if proto.name == 'http-v1':
966 return wireprototypes.ooberror(bundle2required)
966 return wireprototypes.ooberror(bundle2required)
967 raise error.Abort(bundle2requiredmain,
967 raise error.Abort(bundle2requiredmain,
968 hint=bundle2requiredhint)
968 hint=bundle2requiredhint)
969
969
970 prefercompressed = True
970 prefercompressed = True
971
971
972 try:
972 try:
973 if repo.ui.configbool('server', 'disablefullbundle'):
973 if repo.ui.configbool('server', 'disablefullbundle'):
974 # Check to see if this is a full clone.
974 # Check to see if this is a full clone.
975 clheads = set(repo.changelog.heads())
975 clheads = set(repo.changelog.heads())
976 changegroup = opts.get('cg', True)
976 changegroup = opts.get('cg', True)
977 heads = set(opts.get('heads', set()))
977 heads = set(opts.get('heads', set()))
978 common = set(opts.get('common', set()))
978 common = set(opts.get('common', set()))
979 common.discard(nullid)
979 common.discard(nullid)
980 if changegroup and not common and clheads == heads:
980 if changegroup and not common and clheads == heads:
981 raise error.Abort(
981 raise error.Abort(
982 _('server has pull-based clones disabled'),
982 _('server has pull-based clones disabled'),
983 hint=_('remove --pull if specified or upgrade Mercurial'))
983 hint=_('remove --pull if specified or upgrade Mercurial'))
984
984
985 info, chunks = exchange.getbundlechunks(repo, 'serve',
985 info, chunks = exchange.getbundlechunks(repo, 'serve',
986 **pycompat.strkwargs(opts))
986 **pycompat.strkwargs(opts))
987 prefercompressed = info.get('prefercompressed', True)
987 prefercompressed = info.get('prefercompressed', True)
988 except error.Abort as exc:
988 except error.Abort as exc:
989 # cleanly forward Abort error to the client
989 # cleanly forward Abort error to the client
990 if not exchange.bundle2requested(opts.get('bundlecaps')):
990 if not exchange.bundle2requested(opts.get('bundlecaps')):
991 if proto.name == 'http-v1':
991 if proto.name == 'http-v1':
992 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
992 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
993 raise # cannot do better for bundle1 + ssh
993 raise # cannot do better for bundle1 + ssh
994 # bundle2 request expect a bundle2 reply
994 # bundle2 request expect a bundle2 reply
995 bundler = bundle2.bundle20(repo.ui)
995 bundler = bundle2.bundle20(repo.ui)
996 manargs = [('message', pycompat.bytestr(exc))]
996 manargs = [('message', pycompat.bytestr(exc))]
997 advargs = []
997 advargs = []
998 if exc.hint is not None:
998 if exc.hint is not None:
999 advargs.append(('hint', exc.hint))
999 advargs.append(('hint', exc.hint))
1000 bundler.addpart(bundle2.bundlepart('error:abort',
1000 bundler.addpart(bundle2.bundlepart('error:abort',
1001 manargs, advargs))
1001 manargs, advargs))
1002 chunks = bundler.getchunks()
1002 chunks = bundler.getchunks()
1003 prefercompressed = False
1003 prefercompressed = False
1004
1004
1005 return wireprototypes.streamres(
1005 return wireprototypes.streamres(
1006 gen=chunks, prefer_uncompressed=not prefercompressed)
1006 gen=chunks, prefer_uncompressed=not prefercompressed)
1007
1007
1008 @wireprotocommand('heads', permission='pull', transportpolicy=POLICY_V1_ONLY)
1008 @wireprotocommand('heads', permission='pull', transportpolicy=POLICY_V1_ONLY)
1009 def heads(repo, proto):
1009 def heads(repo, proto):
1010 h = repo.heads()
1010 h = repo.heads()
1011 return wireprototypes.bytesresponse(encodelist(h) + '\n')
1011 return wireprototypes.bytesresponse(encodelist(h) + '\n')
1012
1012
1013 @wireprotocommand('hello', permission='pull', transportpolicy=POLICY_V1_ONLY)
1013 @wireprotocommand('hello', permission='pull', transportpolicy=POLICY_V1_ONLY)
1014 def hello(repo, proto):
1014 def hello(repo, proto):
1015 """Called as part of SSH handshake to obtain server info.
1015 """Called as part of SSH handshake to obtain server info.
1016
1016
1017 Returns a list of lines describing interesting things about the
1017 Returns a list of lines describing interesting things about the
1018 server, in an RFC822-like format.
1018 server, in an RFC822-like format.
1019
1019
1020 Currently, the only one defined is ``capabilities``, which consists of a
1020 Currently, the only one defined is ``capabilities``, which consists of a
1021 line of space separated tokens describing server abilities:
1021 line of space separated tokens describing server abilities:
1022
1022
1023 capabilities: <token0> <token1> <token2>
1023 capabilities: <token0> <token1> <token2>
1024 """
1024 """
1025 caps = capabilities(repo, proto).data
1025 caps = capabilities(repo, proto).data
1026 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
1026 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
1027
1027
1028 @wireprotocommand('listkeys', 'namespace', permission='pull',
1028 @wireprotocommand('listkeys', 'namespace', permission='pull',
1029 transportpolicy=POLICY_V1_ONLY)
1029 transportpolicy=POLICY_V1_ONLY)
1030 def listkeys(repo, proto, namespace):
1030 def listkeys(repo, proto, namespace):
1031 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
1031 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
1032 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
1032 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
1033
1033
1034 @wireprotocommand('lookup', 'key', permission='pull')
1034 @wireprotocommand('lookup', 'key', permission='pull')
1035 def lookup(repo, proto, key):
1035 def lookup(repo, proto, key):
1036 try:
1036 try:
1037 k = encoding.tolocal(key)
1037 k = encoding.tolocal(key)
1038 n = repo.lookup(k)
1038 n = repo.lookup(k)
1039 r = hex(n)
1039 r = hex(n)
1040 success = 1
1040 success = 1
1041 except Exception as inst:
1041 except Exception as inst:
1042 r = stringutil.forcebytestr(inst)
1042 r = stringutil.forcebytestr(inst)
1043 success = 0
1043 success = 0
1044 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
1044 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
1045
1045
1046 @wireprotocommand('known', 'nodes *', permission='pull',
1046 @wireprotocommand('known', 'nodes *', permission='pull',
1047 transportpolicy=POLICY_V1_ONLY)
1047 transportpolicy=POLICY_V1_ONLY)
1048 def known(repo, proto, nodes, others):
1048 def known(repo, proto, nodes, others):
1049 v = ''.join(b and '1' or '0' for b in repo.known(decodelist(nodes)))
1049 v = ''.join(b and '1' or '0' for b in repo.known(decodelist(nodes)))
1050 return wireprototypes.bytesresponse(v)
1050 return wireprototypes.bytesresponse(v)
1051
1051
1052 @wireprotocommand('protocaps', 'caps', permission='pull',
1052 @wireprotocommand('protocaps', 'caps', permission='pull',
1053 transportpolicy=POLICY_V1_ONLY)
1053 transportpolicy=POLICY_V1_ONLY)
1054 def protocaps(repo, proto, caps):
1054 def protocaps(repo, proto, caps):
1055 if proto.name == wireprototypes.SSHV1:
1055 if proto.name == wireprototypes.SSHV1:
1056 proto._protocaps = set(caps.split(' '))
1056 proto._protocaps = set(caps.split(' '))
1057 return wireprototypes.bytesresponse('OK')
1057 return wireprototypes.bytesresponse('OK')
1058
1058
1059 @wireprotocommand('pushkey', 'namespace key old new', permission='push')
1059 @wireprotocommand('pushkey', 'namespace key old new', permission='push')
1060 def pushkey(repo, proto, namespace, key, old, new):
1060 def pushkey(repo, proto, namespace, key, old, new):
1061 # compatibility with pre-1.8 clients which were accidentally
1061 # compatibility with pre-1.8 clients which were accidentally
1062 # sending raw binary nodes rather than utf-8-encoded hex
1062 # sending raw binary nodes rather than utf-8-encoded hex
1063 if len(new) == 20 and stringutil.escapestr(new) != new:
1063 if len(new) == 20 and stringutil.escapestr(new) != new:
1064 # looks like it could be a binary node
1064 # looks like it could be a binary node
1065 try:
1065 try:
1066 new.decode('utf-8')
1066 new.decode('utf-8')
1067 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
1067 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
1068 except UnicodeDecodeError:
1068 except UnicodeDecodeError:
1069 pass # binary, leave unmodified
1069 pass # binary, leave unmodified
1070 else:
1070 else:
1071 new = encoding.tolocal(new) # normal path
1071 new = encoding.tolocal(new) # normal path
1072
1072
1073 with proto.mayberedirectstdio() as output:
1073 with proto.mayberedirectstdio() as output:
1074 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
1074 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
1075 encoding.tolocal(old), new) or False
1075 encoding.tolocal(old), new) or False
1076
1076
1077 output = output.getvalue() if output else ''
1077 output = output.getvalue() if output else ''
1078 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
1078 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
1079
1079
1080 @wireprotocommand('stream_out', permission='pull')
1080 @wireprotocommand('stream_out', permission='pull')
1081 def stream(repo, proto):
1081 def stream(repo, proto):
1082 '''If the server supports streaming clone, it advertises the "stream"
1082 '''If the server supports streaming clone, it advertises the "stream"
1083 capability with a value representing the version and flags of the repo
1083 capability with a value representing the version and flags of the repo
1084 it is serving. Client checks to see if it understands the format.
1084 it is serving. Client checks to see if it understands the format.
1085 '''
1085 '''
1086 return wireprototypes.streamreslegacy(
1086 return wireprototypes.streamreslegacy(
1087 streamclone.generatev1wireproto(repo))
1087 streamclone.generatev1wireproto(repo))
1088
1088
1089 @wireprotocommand('unbundle', 'heads', permission='push')
1089 @wireprotocommand('unbundle', 'heads', permission='push')
1090 def unbundle(repo, proto, heads):
1090 def unbundle(repo, proto, heads):
1091 their_heads = decodelist(heads)
1091 their_heads = decodelist(heads)
1092
1092
1093 with proto.mayberedirectstdio() as output:
1093 with proto.mayberedirectstdio() as output:
1094 try:
1094 try:
1095 exchange.check_heads(repo, their_heads, 'preparing changes')
1095 exchange.check_heads(repo, their_heads, 'preparing changes')
1096 cleanup = lambda: None
1096 cleanup = lambda: None
1097 try:
1097 try:
1098 payload = proto.getpayload()
1098 payload = proto.getpayload()
1099 if repo.ui.configbool('server', 'streamunbundle'):
1099 if repo.ui.configbool('server', 'streamunbundle'):
1100 def cleanup():
1100 def cleanup():
1101 # Ensure that the full payload is consumed, so
1101 # Ensure that the full payload is consumed, so
1102 # that the connection doesn't contain trailing garbage.
1102 # that the connection doesn't contain trailing garbage.
1103 for p in payload:
1103 for p in payload:
1104 pass
1104 pass
1105 fp = util.chunkbuffer(payload)
1105 fp = util.chunkbuffer(payload)
1106 else:
1106 else:
1107 # write bundle data to temporary file as it can be big
1107 # write bundle data to temporary file as it can be big
1108 fp, tempname = None, None
1108 fp, tempname = None, None
1109 def cleanup():
1109 def cleanup():
1110 if fp:
1110 if fp:
1111 fp.close()
1111 fp.close()
1112 if tempname:
1112 if tempname:
1113 os.unlink(tempname)
1113 os.unlink(tempname)
1114 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1114 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1115 repo.ui.debug('redirecting incoming bundle to %s\n' %
1115 repo.ui.debug('redirecting incoming bundle to %s\n' %
1116 tempname)
1116 tempname)
1117 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1117 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1118 r = 0
1118 r = 0
1119 for p in payload:
1119 for p in payload:
1120 fp.write(p)
1120 fp.write(p)
1121 fp.seek(0)
1121 fp.seek(0)
1122
1122
1123 gen = exchange.readbundle(repo.ui, fp, None)
1123 gen = exchange.readbundle(repo.ui, fp, None)
1124 if (isinstance(gen, changegroupmod.cg1unpacker)
1124 if (isinstance(gen, changegroupmod.cg1unpacker)
1125 and not bundle1allowed(repo, 'push')):
1125 and not bundle1allowed(repo, 'push')):
1126 if proto.name == 'http-v1':
1126 if proto.name == 'http-v1':
1127 # need to special case http because stderr do not get to
1127 # need to special case http because stderr do not get to
1128 # the http client on failed push so we need to abuse
1128 # the http client on failed push so we need to abuse
1129 # some other error type to make sure the message get to
1129 # some other error type to make sure the message get to
1130 # the user.
1130 # the user.
1131 return wireprototypes.ooberror(bundle2required)
1131 return wireprototypes.ooberror(bundle2required)
1132 raise error.Abort(bundle2requiredmain,
1132 raise error.Abort(bundle2requiredmain,
1133 hint=bundle2requiredhint)
1133 hint=bundle2requiredhint)
1134
1134
1135 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1135 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1136 proto.client())
1136 proto.client())
1137 if util.safehasattr(r, 'addpart'):
1137 if util.safehasattr(r, 'addpart'):
1138 # The return looks streamable, we are in the bundle2 case
1138 # The return looks streamable, we are in the bundle2 case
1139 # and should return a stream.
1139 # and should return a stream.
1140 return wireprototypes.streamreslegacy(gen=r.getchunks())
1140 return wireprototypes.streamreslegacy(gen=r.getchunks())
1141 return wireprototypes.pushres(
1141 return wireprototypes.pushres(
1142 r, output.getvalue() if output else '')
1142 r, output.getvalue() if output else '')
1143
1143
1144 finally:
1144 finally:
1145 cleanup()
1145 cleanup()
1146
1146
1147 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1147 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1148 # handle non-bundle2 case first
1148 # handle non-bundle2 case first
1149 if not getattr(exc, 'duringunbundle2', False):
1149 if not getattr(exc, 'duringunbundle2', False):
1150 try:
1150 try:
1151 raise
1151 raise
1152 except error.Abort:
1152 except error.Abort:
1153 # The old code we moved used procutil.stderr directly.
1153 # The old code we moved used procutil.stderr directly.
1154 # We did not change it to minimise code change.
1154 # We did not change it to minimise code change.
1155 # This need to be moved to something proper.
1155 # This need to be moved to something proper.
1156 # Feel free to do it.
1156 # Feel free to do it.
1157 procutil.stderr.write("abort: %s\n" % exc)
1157 procutil.stderr.write("abort: %s\n" % exc)
1158 if exc.hint is not None:
1158 if exc.hint is not None:
1159 procutil.stderr.write("(%s)\n" % exc.hint)
1159 procutil.stderr.write("(%s)\n" % exc.hint)
1160 procutil.stderr.flush()
1160 procutil.stderr.flush()
1161 return wireprototypes.pushres(
1161 return wireprototypes.pushres(
1162 0, output.getvalue() if output else '')
1162 0, output.getvalue() if output else '')
1163 except error.PushRaced:
1163 except error.PushRaced:
1164 return wireprototypes.pusherr(
1164 return wireprototypes.pusherr(
1165 pycompat.bytestr(exc),
1165 pycompat.bytestr(exc),
1166 output.getvalue() if output else '')
1166 output.getvalue() if output else '')
1167
1167
1168 bundler = bundle2.bundle20(repo.ui)
1168 bundler = bundle2.bundle20(repo.ui)
1169 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1169 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1170 bundler.addpart(out)
1170 bundler.addpart(out)
1171 try:
1171 try:
1172 try:
1172 try:
1173 raise
1173 raise
1174 except error.PushkeyFailed as exc:
1174 except error.PushkeyFailed as exc:
1175 # check client caps
1175 # check client caps
1176 remotecaps = getattr(exc, '_replycaps', None)
1176 remotecaps = getattr(exc, '_replycaps', None)
1177 if (remotecaps is not None
1177 if (remotecaps is not None
1178 and 'pushkey' not in remotecaps.get('error', ())):
1178 and 'pushkey' not in remotecaps.get('error', ())):
1179 # no support remote side, fallback to Abort handler.
1179 # no support remote side, fallback to Abort handler.
1180 raise
1180 raise
1181 part = bundler.newpart('error:pushkey')
1181 part = bundler.newpart('error:pushkey')
1182 part.addparam('in-reply-to', exc.partid)
1182 part.addparam('in-reply-to', exc.partid)
1183 if exc.namespace is not None:
1183 if exc.namespace is not None:
1184 part.addparam('namespace', exc.namespace,
1184 part.addparam('namespace', exc.namespace,
1185 mandatory=False)
1185 mandatory=False)
1186 if exc.key is not None:
1186 if exc.key is not None:
1187 part.addparam('key', exc.key, mandatory=False)
1187 part.addparam('key', exc.key, mandatory=False)
1188 if exc.new is not None:
1188 if exc.new is not None:
1189 part.addparam('new', exc.new, mandatory=False)
1189 part.addparam('new', exc.new, mandatory=False)
1190 if exc.old is not None:
1190 if exc.old is not None:
1191 part.addparam('old', exc.old, mandatory=False)
1191 part.addparam('old', exc.old, mandatory=False)
1192 if exc.ret is not None:
1192 if exc.ret is not None:
1193 part.addparam('ret', exc.ret, mandatory=False)
1193 part.addparam('ret', exc.ret, mandatory=False)
1194 except error.BundleValueError as exc:
1194 except error.BundleValueError as exc:
1195 errpart = bundler.newpart('error:unsupportedcontent')
1195 errpart = bundler.newpart('error:unsupportedcontent')
1196 if exc.parttype is not None:
1196 if exc.parttype is not None:
1197 errpart.addparam('parttype', exc.parttype)
1197 errpart.addparam('parttype', exc.parttype)
1198 if exc.params:
1198 if exc.params:
1199 errpart.addparam('params', '\0'.join(exc.params))
1199 errpart.addparam('params', '\0'.join(exc.params))
1200 except error.Abort as exc:
1200 except error.Abort as exc:
1201 manargs = [('message', stringutil.forcebytestr(exc))]
1201 manargs = [('message', stringutil.forcebytestr(exc))]
1202 advargs = []
1202 advargs = []
1203 if exc.hint is not None:
1203 if exc.hint is not None:
1204 advargs.append(('hint', exc.hint))
1204 advargs.append(('hint', exc.hint))
1205 bundler.addpart(bundle2.bundlepart('error:abort',
1205 bundler.addpart(bundle2.bundlepart('error:abort',
1206 manargs, advargs))
1206 manargs, advargs))
1207 except error.PushRaced as exc:
1207 except error.PushRaced as exc:
1208 bundler.newpart('error:pushraced',
1208 bundler.newpart('error:pushraced',
1209 [('message', stringutil.forcebytestr(exc))])
1209 [('message', stringutil.forcebytestr(exc))])
1210 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
1210 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
1211
1211
1212 # Wire protocol version 2 commands only past this point.
1212 # Wire protocol version 2 commands only past this point.
1213
1213
1214 @wireprotocommand('branchmap', permission='pull',
1214 @wireprotocommand('branchmap', permission='pull',
1215 transportpolicy=POLICY_V2_ONLY)
1215 transportpolicy=POLICY_V2_ONLY)
1216 def branchmapv2(repo, proto):
1216 def branchmapv2(repo, proto):
1217 branchmap = {encoding.fromlocal(k): v
1217 branchmap = {encoding.fromlocal(k): v
1218 for k, v in repo.branchmap().iteritems()}
1218 for k, v in repo.branchmap().iteritems()}
1219
1219
1220 return wireprototypes.cborresponse(branchmap)
1220 return wireprototypes.cborresponse(branchmap)
1221
1221
1222 @wireprotocommand('heads', args='publiconly', permission='pull',
1222 @wireprotocommand('heads', args='publiconly', permission='pull',
1223 transportpolicy=POLICY_V2_ONLY)
1223 transportpolicy=POLICY_V2_ONLY)
1224 def headsv2(repo, proto, publiconly=False):
1224 def headsv2(repo, proto, publiconly=False):
1225 if publiconly:
1225 if publiconly:
1226 repo = repo.filtered('immutable')
1226 repo = repo.filtered('immutable')
1227
1227
1228 return wireprototypes.cborresponse(repo.heads())
1228 return wireprototypes.cborresponse(repo.heads())
1229
1229
1230 @wireprotocommand('known', 'nodes', permission='pull',
1230 @wireprotocommand('known', 'nodes', permission='pull',
1231 transportpolicy=POLICY_V2_ONLY)
1231 transportpolicy=POLICY_V2_ONLY)
1232 def knownv2(repo, proto, nodes=None):
1232 def knownv2(repo, proto, nodes=None):
1233 nodes = nodes or []
1233 nodes = nodes or []
1234 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
1234 result = b''.join(b'1' if n else b'0' for n in repo.known(nodes))
1235 return wireprototypes.cborresponse(result)
1235 return wireprototypes.cborresponse(result)
1236
1236
1237 @wireprotocommand('listkeys', 'namespace', permission='pull',
1237 @wireprotocommand('listkeys', 'namespace', permission='pull',
1238 transportpolicy=POLICY_V2_ONLY)
1238 transportpolicy=POLICY_V2_ONLY)
1239 def listkeysv2(repo, proto, namespace=None):
1239 def listkeysv2(repo, proto, namespace=None):
1240 keys = repo.listkeys(encoding.tolocal(namespace))
1240 keys = repo.listkeys(encoding.tolocal(namespace))
1241 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
1241 keys = {encoding.fromlocal(k): encoding.fromlocal(v)
1242 for k, v in keys.iteritems()}
1242 for k, v in keys.iteritems()}
1243
1243
1244 return wireprototypes.cborresponse(keys)
1244 return wireprototypes.cborresponse(keys)
General Comments 0
You need to be logged in to leave comments. Login now