##// END OF EJS Templates
py3: handle keyword arguments correctly in wireproto.py...
Pulkit Goyal -
r35375:7d229241 default
parent child Browse files
Show More
@@ -1,1070 +1,1070 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import os
11 import os
12 import tempfile
12 import tempfile
13
13
14 from .i18n import _
14 from .i18n import _
15 from .node import (
15 from .node import (
16 bin,
16 bin,
17 hex,
17 hex,
18 nullid,
18 nullid,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 discovery,
24 discovery,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 pycompat,
30 pycompat,
31 repository,
31 repository,
32 streamclone,
32 streamclone,
33 util,
33 util,
34 )
34 )
35
35
36 urlerr = util.urlerr
36 urlerr = util.urlerr
37 urlreq = util.urlreq
37 urlreq = util.urlreq
38
38
39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 'IncompatibleClient')
41 'IncompatibleClient')
42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43
43
44 class abstractserverproto(object):
44 class abstractserverproto(object):
45 """abstract class that summarizes the protocol API
45 """abstract class that summarizes the protocol API
46
46
47 Used as reference and documentation.
47 Used as reference and documentation.
48 """
48 """
49
49
50 def getargs(self, args):
50 def getargs(self, args):
51 """return the value for arguments in <args>
51 """return the value for arguments in <args>
52
52
53 returns a list of values (same order as <args>)"""
53 returns a list of values (same order as <args>)"""
54 raise NotImplementedError()
54 raise NotImplementedError()
55
55
56 def getfile(self, fp):
56 def getfile(self, fp):
57 """write the whole content of a file into a file like object
57 """write the whole content of a file into a file like object
58
58
59 The file is in the form::
59 The file is in the form::
60
60
61 (<chunk-size>\n<chunk>)+0\n
61 (<chunk-size>\n<chunk>)+0\n
62
62
63 chunk size is the ascii version of the int.
63 chunk size is the ascii version of the int.
64 """
64 """
65 raise NotImplementedError()
65 raise NotImplementedError()
66
66
67 def redirect(self):
67 def redirect(self):
68 """may setup interception for stdout and stderr
68 """may setup interception for stdout and stderr
69
69
70 See also the `restore` method."""
70 See also the `restore` method."""
71 raise NotImplementedError()
71 raise NotImplementedError()
72
72
73 # If the `redirect` function does install interception, the `restore`
73 # If the `redirect` function does install interception, the `restore`
74 # function MUST be defined. If interception is not used, this function
74 # function MUST be defined. If interception is not used, this function
75 # MUST NOT be defined.
75 # MUST NOT be defined.
76 #
76 #
77 # left commented here on purpose
77 # left commented here on purpose
78 #
78 #
79 #def restore(self):
79 #def restore(self):
80 # """reinstall previous stdout and stderr and return intercepted stdout
80 # """reinstall previous stdout and stderr and return intercepted stdout
81 # """
81 # """
82 # raise NotImplementedError()
82 # raise NotImplementedError()
83
83
84 class remoteiterbatcher(peer.iterbatcher):
84 class remoteiterbatcher(peer.iterbatcher):
85 def __init__(self, remote):
85 def __init__(self, remote):
86 super(remoteiterbatcher, self).__init__()
86 super(remoteiterbatcher, self).__init__()
87 self._remote = remote
87 self._remote = remote
88
88
89 def __getattr__(self, name):
89 def __getattr__(self, name):
90 # Validate this method is batchable, since submit() only supports
90 # Validate this method is batchable, since submit() only supports
91 # batchable methods.
91 # batchable methods.
92 fn = getattr(self._remote, name)
92 fn = getattr(self._remote, name)
93 if not getattr(fn, 'batchable', None):
93 if not getattr(fn, 'batchable', None):
94 raise error.ProgrammingError('Attempted to batch a non-batchable '
94 raise error.ProgrammingError('Attempted to batch a non-batchable '
95 'call to %r' % name)
95 'call to %r' % name)
96
96
97 return super(remoteiterbatcher, self).__getattr__(name)
97 return super(remoteiterbatcher, self).__getattr__(name)
98
98
99 def submit(self):
99 def submit(self):
100 """Break the batch request into many patch calls and pipeline them.
100 """Break the batch request into many patch calls and pipeline them.
101
101
102 This is mostly valuable over http where request sizes can be
102 This is mostly valuable over http where request sizes can be
103 limited, but can be used in other places as well.
103 limited, but can be used in other places as well.
104 """
104 """
105 # 2-tuple of (command, arguments) that represents what will be
105 # 2-tuple of (command, arguments) that represents what will be
106 # sent over the wire.
106 # sent over the wire.
107 requests = []
107 requests = []
108
108
109 # 4-tuple of (command, final future, @batchable generator, remote
109 # 4-tuple of (command, final future, @batchable generator, remote
110 # future).
110 # future).
111 results = []
111 results = []
112
112
113 for command, args, opts, finalfuture in self.calls:
113 for command, args, opts, finalfuture in self.calls:
114 mtd = getattr(self._remote, command)
114 mtd = getattr(self._remote, command)
115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
116
116
117 commandargs, fremote = next(batchable)
117 commandargs, fremote = next(batchable)
118 assert fremote
118 assert fremote
119 requests.append((command, commandargs))
119 requests.append((command, commandargs))
120 results.append((command, finalfuture, batchable, fremote))
120 results.append((command, finalfuture, batchable, fremote))
121
121
122 if requests:
122 if requests:
123 self._resultiter = self._remote._submitbatch(requests)
123 self._resultiter = self._remote._submitbatch(requests)
124
124
125 self._results = results
125 self._results = results
126
126
127 def results(self):
127 def results(self):
128 for command, finalfuture, batchable, remotefuture in self._results:
128 for command, finalfuture, batchable, remotefuture in self._results:
129 # Get the raw result, set it in the remote future, feed it
129 # Get the raw result, set it in the remote future, feed it
130 # back into the @batchable generator so it can be decoded, and
130 # back into the @batchable generator so it can be decoded, and
131 # set the result on the final future to this value.
131 # set the result on the final future to this value.
132 remoteresult = next(self._resultiter)
132 remoteresult = next(self._resultiter)
133 remotefuture.set(remoteresult)
133 remotefuture.set(remoteresult)
134 finalfuture.set(next(batchable))
134 finalfuture.set(next(batchable))
135
135
136 # Verify our @batchable generators only emit 2 values.
136 # Verify our @batchable generators only emit 2 values.
137 try:
137 try:
138 next(batchable)
138 next(batchable)
139 except StopIteration:
139 except StopIteration:
140 pass
140 pass
141 else:
141 else:
142 raise error.ProgrammingError('%s @batchable generator emitted '
142 raise error.ProgrammingError('%s @batchable generator emitted '
143 'unexpected value count' % command)
143 'unexpected value count' % command)
144
144
145 yield finalfuture.value
145 yield finalfuture.value
146
146
147 # Forward a couple of names from peer to make wireproto interactions
147 # Forward a couple of names from peer to make wireproto interactions
148 # slightly more sensible.
148 # slightly more sensible.
149 batchable = peer.batchable
149 batchable = peer.batchable
150 future = peer.future
150 future = peer.future
151
151
152 # list of nodes encoding / decoding
152 # list of nodes encoding / decoding
153
153
154 def decodelist(l, sep=' '):
154 def decodelist(l, sep=' '):
155 if l:
155 if l:
156 return [bin(v) for v in l.split(sep)]
156 return [bin(v) for v in l.split(sep)]
157 return []
157 return []
158
158
159 def encodelist(l, sep=' '):
159 def encodelist(l, sep=' '):
160 try:
160 try:
161 return sep.join(map(hex, l))
161 return sep.join(map(hex, l))
162 except TypeError:
162 except TypeError:
163 raise
163 raise
164
164
165 # batched call argument encoding
165 # batched call argument encoding
166
166
167 def escapearg(plain):
167 def escapearg(plain):
168 return (plain
168 return (plain
169 .replace(':', ':c')
169 .replace(':', ':c')
170 .replace(',', ':o')
170 .replace(',', ':o')
171 .replace(';', ':s')
171 .replace(';', ':s')
172 .replace('=', ':e'))
172 .replace('=', ':e'))
173
173
174 def unescapearg(escaped):
174 def unescapearg(escaped):
175 return (escaped
175 return (escaped
176 .replace(':e', '=')
176 .replace(':e', '=')
177 .replace(':s', ';')
177 .replace(':s', ';')
178 .replace(':o', ',')
178 .replace(':o', ',')
179 .replace(':c', ':'))
179 .replace(':c', ':'))
180
180
181 def encodebatchcmds(req):
181 def encodebatchcmds(req):
182 """Return a ``cmds`` argument value for the ``batch`` command."""
182 """Return a ``cmds`` argument value for the ``batch`` command."""
183 cmds = []
183 cmds = []
184 for op, argsdict in req:
184 for op, argsdict in req:
185 # Old servers didn't properly unescape argument names. So prevent
185 # Old servers didn't properly unescape argument names. So prevent
186 # the sending of argument names that may not be decoded properly by
186 # the sending of argument names that may not be decoded properly by
187 # servers.
187 # servers.
188 assert all(escapearg(k) == k for k in argsdict)
188 assert all(escapearg(k) == k for k in argsdict)
189
189
190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
191 for k, v in argsdict.iteritems())
191 for k, v in argsdict.iteritems())
192 cmds.append('%s %s' % (op, args))
192 cmds.append('%s %s' % (op, args))
193
193
194 return ';'.join(cmds)
194 return ';'.join(cmds)
195
195
196 # mapping of options accepted by getbundle and their types
196 # mapping of options accepted by getbundle and their types
197 #
197 #
198 # Meant to be extended by extensions. It is extensions responsibility to ensure
198 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 # such options are properly processed in exchange.getbundle.
199 # such options are properly processed in exchange.getbundle.
200 #
200 #
201 # supported types are:
201 # supported types are:
202 #
202 #
203 # :nodes: list of binary nodes
203 # :nodes: list of binary nodes
204 # :csv: list of comma-separated values
204 # :csv: list of comma-separated values
205 # :scsv: list of comma-separated values return as set
205 # :scsv: list of comma-separated values return as set
206 # :plain: string with no transformation needed.
206 # :plain: string with no transformation needed.
207 gboptsmap = {'heads': 'nodes',
207 gboptsmap = {'heads': 'nodes',
208 'bookmarks': 'boolean',
208 'bookmarks': 'boolean',
209 'common': 'nodes',
209 'common': 'nodes',
210 'obsmarkers': 'boolean',
210 'obsmarkers': 'boolean',
211 'phases': 'boolean',
211 'phases': 'boolean',
212 'bundlecaps': 'scsv',
212 'bundlecaps': 'scsv',
213 'listkeys': 'csv',
213 'listkeys': 'csv',
214 'cg': 'boolean',
214 'cg': 'boolean',
215 'cbattempted': 'boolean'}
215 'cbattempted': 'boolean'}
216
216
217 # client side
217 # client side
218
218
219 class wirepeer(repository.legacypeer):
219 class wirepeer(repository.legacypeer):
220 """Client-side interface for communicating with a peer repository.
220 """Client-side interface for communicating with a peer repository.
221
221
222 Methods commonly call wire protocol commands of the same name.
222 Methods commonly call wire protocol commands of the same name.
223
223
224 See also httppeer.py and sshpeer.py for protocol-specific
224 See also httppeer.py and sshpeer.py for protocol-specific
225 implementations of this interface.
225 implementations of this interface.
226 """
226 """
227 # Begin of basewirepeer interface.
227 # Begin of basewirepeer interface.
228
228
229 def iterbatch(self):
229 def iterbatch(self):
230 return remoteiterbatcher(self)
230 return remoteiterbatcher(self)
231
231
232 @batchable
232 @batchable
233 def lookup(self, key):
233 def lookup(self, key):
234 self.requirecap('lookup', _('look up remote revision'))
234 self.requirecap('lookup', _('look up remote revision'))
235 f = future()
235 f = future()
236 yield {'key': encoding.fromlocal(key)}, f
236 yield {'key': encoding.fromlocal(key)}, f
237 d = f.value
237 d = f.value
238 success, data = d[:-1].split(" ", 1)
238 success, data = d[:-1].split(" ", 1)
239 if int(success):
239 if int(success):
240 yield bin(data)
240 yield bin(data)
241 else:
241 else:
242 self._abort(error.RepoError(data))
242 self._abort(error.RepoError(data))
243
243
244 @batchable
244 @batchable
245 def heads(self):
245 def heads(self):
246 f = future()
246 f = future()
247 yield {}, f
247 yield {}, f
248 d = f.value
248 d = f.value
249 try:
249 try:
250 yield decodelist(d[:-1])
250 yield decodelist(d[:-1])
251 except ValueError:
251 except ValueError:
252 self._abort(error.ResponseError(_("unexpected response:"), d))
252 self._abort(error.ResponseError(_("unexpected response:"), d))
253
253
254 @batchable
254 @batchable
255 def known(self, nodes):
255 def known(self, nodes):
256 f = future()
256 f = future()
257 yield {'nodes': encodelist(nodes)}, f
257 yield {'nodes': encodelist(nodes)}, f
258 d = f.value
258 d = f.value
259 try:
259 try:
260 yield [bool(int(b)) for b in d]
260 yield [bool(int(b)) for b in d]
261 except ValueError:
261 except ValueError:
262 self._abort(error.ResponseError(_("unexpected response:"), d))
262 self._abort(error.ResponseError(_("unexpected response:"), d))
263
263
264 @batchable
264 @batchable
265 def branchmap(self):
265 def branchmap(self):
266 f = future()
266 f = future()
267 yield {}, f
267 yield {}, f
268 d = f.value
268 d = f.value
269 try:
269 try:
270 branchmap = {}
270 branchmap = {}
271 for branchpart in d.splitlines():
271 for branchpart in d.splitlines():
272 branchname, branchheads = branchpart.split(' ', 1)
272 branchname, branchheads = branchpart.split(' ', 1)
273 branchname = encoding.tolocal(urlreq.unquote(branchname))
273 branchname = encoding.tolocal(urlreq.unquote(branchname))
274 branchheads = decodelist(branchheads)
274 branchheads = decodelist(branchheads)
275 branchmap[branchname] = branchheads
275 branchmap[branchname] = branchheads
276 yield branchmap
276 yield branchmap
277 except TypeError:
277 except TypeError:
278 self._abort(error.ResponseError(_("unexpected response:"), d))
278 self._abort(error.ResponseError(_("unexpected response:"), d))
279
279
280 @batchable
280 @batchable
281 def listkeys(self, namespace):
281 def listkeys(self, namespace):
282 if not self.capable('pushkey'):
282 if not self.capable('pushkey'):
283 yield {}, None
283 yield {}, None
284 f = future()
284 f = future()
285 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
285 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
286 yield {'namespace': encoding.fromlocal(namespace)}, f
286 yield {'namespace': encoding.fromlocal(namespace)}, f
287 d = f.value
287 d = f.value
288 self.ui.debug('received listkey for "%s": %i bytes\n'
288 self.ui.debug('received listkey for "%s": %i bytes\n'
289 % (namespace, len(d)))
289 % (namespace, len(d)))
290 yield pushkeymod.decodekeys(d)
290 yield pushkeymod.decodekeys(d)
291
291
292 @batchable
292 @batchable
293 def pushkey(self, namespace, key, old, new):
293 def pushkey(self, namespace, key, old, new):
294 if not self.capable('pushkey'):
294 if not self.capable('pushkey'):
295 yield False, None
295 yield False, None
296 f = future()
296 f = future()
297 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
297 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
298 yield {'namespace': encoding.fromlocal(namespace),
298 yield {'namespace': encoding.fromlocal(namespace),
299 'key': encoding.fromlocal(key),
299 'key': encoding.fromlocal(key),
300 'old': encoding.fromlocal(old),
300 'old': encoding.fromlocal(old),
301 'new': encoding.fromlocal(new)}, f
301 'new': encoding.fromlocal(new)}, f
302 d = f.value
302 d = f.value
303 d, output = d.split('\n', 1)
303 d, output = d.split('\n', 1)
304 try:
304 try:
305 d = bool(int(d))
305 d = bool(int(d))
306 except ValueError:
306 except ValueError:
307 raise error.ResponseError(
307 raise error.ResponseError(
308 _('push failed (unexpected response):'), d)
308 _('push failed (unexpected response):'), d)
309 for l in output.splitlines(True):
309 for l in output.splitlines(True):
310 self.ui.status(_('remote: '), l)
310 self.ui.status(_('remote: '), l)
311 yield d
311 yield d
312
312
313 def stream_out(self):
313 def stream_out(self):
314 return self._callstream('stream_out')
314 return self._callstream('stream_out')
315
315
316 def getbundle(self, source, **kwargs):
316 def getbundle(self, source, **kwargs):
317 kwargs = pycompat.byteskwargs(kwargs)
317 kwargs = pycompat.byteskwargs(kwargs)
318 self.requirecap('getbundle', _('look up remote changes'))
318 self.requirecap('getbundle', _('look up remote changes'))
319 opts = {}
319 opts = {}
320 bundlecaps = kwargs.get('bundlecaps')
320 bundlecaps = kwargs.get('bundlecaps')
321 if bundlecaps is not None:
321 if bundlecaps is not None:
322 kwargs['bundlecaps'] = sorted(bundlecaps)
322 kwargs['bundlecaps'] = sorted(bundlecaps)
323 else:
323 else:
324 bundlecaps = () # kwargs could have it to None
324 bundlecaps = () # kwargs could have it to None
325 for key, value in kwargs.iteritems():
325 for key, value in kwargs.iteritems():
326 if value is None:
326 if value is None:
327 continue
327 continue
328 keytype = gboptsmap.get(key)
328 keytype = gboptsmap.get(key)
329 if keytype is None:
329 if keytype is None:
330 raise error.ProgrammingError(
330 raise error.ProgrammingError(
331 'Unexpectedly None keytype for key %s' % key)
331 'Unexpectedly None keytype for key %s' % key)
332 elif keytype == 'nodes':
332 elif keytype == 'nodes':
333 value = encodelist(value)
333 value = encodelist(value)
334 elif keytype in ('csv', 'scsv'):
334 elif keytype in ('csv', 'scsv'):
335 value = ','.join(value)
335 value = ','.join(value)
336 elif keytype == 'boolean':
336 elif keytype == 'boolean':
337 value = '%i' % bool(value)
337 value = '%i' % bool(value)
338 elif keytype != 'plain':
338 elif keytype != 'plain':
339 raise KeyError('unknown getbundle option type %s'
339 raise KeyError('unknown getbundle option type %s'
340 % keytype)
340 % keytype)
341 opts[key] = value
341 opts[key] = value
342 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
342 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
343 if any((cap.startswith('HG2') for cap in bundlecaps)):
343 if any((cap.startswith('HG2') for cap in bundlecaps)):
344 return bundle2.getunbundler(self.ui, f)
344 return bundle2.getunbundler(self.ui, f)
345 else:
345 else:
346 return changegroupmod.cg1unpacker(f, 'UN')
346 return changegroupmod.cg1unpacker(f, 'UN')
347
347
348 def unbundle(self, cg, heads, url):
348 def unbundle(self, cg, heads, url):
349 '''Send cg (a readable file-like object representing the
349 '''Send cg (a readable file-like object representing the
350 changegroup to push, typically a chunkbuffer object) to the
350 changegroup to push, typically a chunkbuffer object) to the
351 remote server as a bundle.
351 remote server as a bundle.
352
352
353 When pushing a bundle10 stream, return an integer indicating the
353 When pushing a bundle10 stream, return an integer indicating the
354 result of the push (see changegroup.apply()).
354 result of the push (see changegroup.apply()).
355
355
356 When pushing a bundle20 stream, return a bundle20 stream.
356 When pushing a bundle20 stream, return a bundle20 stream.
357
357
358 `url` is the url the client thinks it's pushing to, which is
358 `url` is the url the client thinks it's pushing to, which is
359 visible to hooks.
359 visible to hooks.
360 '''
360 '''
361
361
362 if heads != ['force'] and self.capable('unbundlehash'):
362 if heads != ['force'] and self.capable('unbundlehash'):
363 heads = encodelist(['hashed',
363 heads = encodelist(['hashed',
364 hashlib.sha1(''.join(sorted(heads))).digest()])
364 hashlib.sha1(''.join(sorted(heads))).digest()])
365 else:
365 else:
366 heads = encodelist(heads)
366 heads = encodelist(heads)
367
367
368 if util.safehasattr(cg, 'deltaheader'):
368 if util.safehasattr(cg, 'deltaheader'):
369 # this a bundle10, do the old style call sequence
369 # this a bundle10, do the old style call sequence
370 ret, output = self._callpush("unbundle", cg, heads=heads)
370 ret, output = self._callpush("unbundle", cg, heads=heads)
371 if ret == "":
371 if ret == "":
372 raise error.ResponseError(
372 raise error.ResponseError(
373 _('push failed:'), output)
373 _('push failed:'), output)
374 try:
374 try:
375 ret = int(ret)
375 ret = int(ret)
376 except ValueError:
376 except ValueError:
377 raise error.ResponseError(
377 raise error.ResponseError(
378 _('push failed (unexpected response):'), ret)
378 _('push failed (unexpected response):'), ret)
379
379
380 for l in output.splitlines(True):
380 for l in output.splitlines(True):
381 self.ui.status(_('remote: '), l)
381 self.ui.status(_('remote: '), l)
382 else:
382 else:
383 # bundle2 push. Send a stream, fetch a stream.
383 # bundle2 push. Send a stream, fetch a stream.
384 stream = self._calltwowaystream('unbundle', cg, heads=heads)
384 stream = self._calltwowaystream('unbundle', cg, heads=heads)
385 ret = bundle2.getunbundler(self.ui, stream)
385 ret = bundle2.getunbundler(self.ui, stream)
386 return ret
386 return ret
387
387
388 # End of basewirepeer interface.
388 # End of basewirepeer interface.
389
389
390 # Begin of baselegacywirepeer interface.
390 # Begin of baselegacywirepeer interface.
391
391
392 def branches(self, nodes):
392 def branches(self, nodes):
393 n = encodelist(nodes)
393 n = encodelist(nodes)
394 d = self._call("branches", nodes=n)
394 d = self._call("branches", nodes=n)
395 try:
395 try:
396 br = [tuple(decodelist(b)) for b in d.splitlines()]
396 br = [tuple(decodelist(b)) for b in d.splitlines()]
397 return br
397 return br
398 except ValueError:
398 except ValueError:
399 self._abort(error.ResponseError(_("unexpected response:"), d))
399 self._abort(error.ResponseError(_("unexpected response:"), d))
400
400
401 def between(self, pairs):
401 def between(self, pairs):
402 batch = 8 # avoid giant requests
402 batch = 8 # avoid giant requests
403 r = []
403 r = []
404 for i in xrange(0, len(pairs), batch):
404 for i in xrange(0, len(pairs), batch):
405 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
405 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
406 d = self._call("between", pairs=n)
406 d = self._call("between", pairs=n)
407 try:
407 try:
408 r.extend(l and decodelist(l) or [] for l in d.splitlines())
408 r.extend(l and decodelist(l) or [] for l in d.splitlines())
409 except ValueError:
409 except ValueError:
410 self._abort(error.ResponseError(_("unexpected response:"), d))
410 self._abort(error.ResponseError(_("unexpected response:"), d))
411 return r
411 return r
412
412
413 def changegroup(self, nodes, kind):
413 def changegroup(self, nodes, kind):
414 n = encodelist(nodes)
414 n = encodelist(nodes)
415 f = self._callcompressable("changegroup", roots=n)
415 f = self._callcompressable("changegroup", roots=n)
416 return changegroupmod.cg1unpacker(f, 'UN')
416 return changegroupmod.cg1unpacker(f, 'UN')
417
417
418 def changegroupsubset(self, bases, heads, kind):
418 def changegroupsubset(self, bases, heads, kind):
419 self.requirecap('changegroupsubset', _('look up remote changes'))
419 self.requirecap('changegroupsubset', _('look up remote changes'))
420 bases = encodelist(bases)
420 bases = encodelist(bases)
421 heads = encodelist(heads)
421 heads = encodelist(heads)
422 f = self._callcompressable("changegroupsubset",
422 f = self._callcompressable("changegroupsubset",
423 bases=bases, heads=heads)
423 bases=bases, heads=heads)
424 return changegroupmod.cg1unpacker(f, 'UN')
424 return changegroupmod.cg1unpacker(f, 'UN')
425
425
426 # End of baselegacywirepeer interface.
426 # End of baselegacywirepeer interface.
427
427
428 def _submitbatch(self, req):
428 def _submitbatch(self, req):
429 """run batch request <req> on the server
429 """run batch request <req> on the server
430
430
431 Returns an iterator of the raw responses from the server.
431 Returns an iterator of the raw responses from the server.
432 """
432 """
433 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
433 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
434 chunk = rsp.read(1024)
434 chunk = rsp.read(1024)
435 work = [chunk]
435 work = [chunk]
436 while chunk:
436 while chunk:
437 while ';' not in chunk and chunk:
437 while ';' not in chunk and chunk:
438 chunk = rsp.read(1024)
438 chunk = rsp.read(1024)
439 work.append(chunk)
439 work.append(chunk)
440 merged = ''.join(work)
440 merged = ''.join(work)
441 while ';' in merged:
441 while ';' in merged:
442 one, merged = merged.split(';', 1)
442 one, merged = merged.split(';', 1)
443 yield unescapearg(one)
443 yield unescapearg(one)
444 chunk = rsp.read(1024)
444 chunk = rsp.read(1024)
445 work = [merged, chunk]
445 work = [merged, chunk]
446 yield unescapearg(''.join(work))
446 yield unescapearg(''.join(work))
447
447
448 def _submitone(self, op, args):
448 def _submitone(self, op, args):
449 return self._call(op, **pycompat.strkwargs(args))
449 return self._call(op, **pycompat.strkwargs(args))
450
450
451 def debugwireargs(self, one, two, three=None, four=None, five=None):
451 def debugwireargs(self, one, two, three=None, four=None, five=None):
452 # don't pass optional arguments left at their default value
452 # don't pass optional arguments left at their default value
453 opts = {}
453 opts = {}
454 if three is not None:
454 if three is not None:
455 opts['three'] = three
455 opts[r'three'] = three
456 if four is not None:
456 if four is not None:
457 opts['four'] = four
457 opts[r'four'] = four
458 return self._call('debugwireargs', one=one, two=two, **opts)
458 return self._call('debugwireargs', one=one, two=two, **opts)
459
459
460 def _call(self, cmd, **args):
460 def _call(self, cmd, **args):
461 """execute <cmd> on the server
461 """execute <cmd> on the server
462
462
463 The command is expected to return a simple string.
463 The command is expected to return a simple string.
464
464
465 returns the server reply as a string."""
465 returns the server reply as a string."""
466 raise NotImplementedError()
466 raise NotImplementedError()
467
467
468 def _callstream(self, cmd, **args):
468 def _callstream(self, cmd, **args):
469 """execute <cmd> on the server
469 """execute <cmd> on the server
470
470
471 The command is expected to return a stream. Note that if the
471 The command is expected to return a stream. Note that if the
472 command doesn't return a stream, _callstream behaves
472 command doesn't return a stream, _callstream behaves
473 differently for ssh and http peers.
473 differently for ssh and http peers.
474
474
475 returns the server reply as a file like object.
475 returns the server reply as a file like object.
476 """
476 """
477 raise NotImplementedError()
477 raise NotImplementedError()
478
478
479 def _callcompressable(self, cmd, **args):
479 def _callcompressable(self, cmd, **args):
480 """execute <cmd> on the server
480 """execute <cmd> on the server
481
481
482 The command is expected to return a stream.
482 The command is expected to return a stream.
483
483
484 The stream may have been compressed in some implementations. This
484 The stream may have been compressed in some implementations. This
485 function takes care of the decompression. This is the only difference
485 function takes care of the decompression. This is the only difference
486 with _callstream.
486 with _callstream.
487
487
488 returns the server reply as a file like object.
488 returns the server reply as a file like object.
489 """
489 """
490 raise NotImplementedError()
490 raise NotImplementedError()
491
491
492 def _callpush(self, cmd, fp, **args):
492 def _callpush(self, cmd, fp, **args):
493 """execute a <cmd> on server
493 """execute a <cmd> on server
494
494
495 The command is expected to be related to a push. Push has a special
495 The command is expected to be related to a push. Push has a special
496 return method.
496 return method.
497
497
498 returns the server reply as a (ret, output) tuple. ret is either
498 returns the server reply as a (ret, output) tuple. ret is either
499 empty (error) or a stringified int.
499 empty (error) or a stringified int.
500 """
500 """
501 raise NotImplementedError()
501 raise NotImplementedError()
502
502
503 def _calltwowaystream(self, cmd, fp, **args):
503 def _calltwowaystream(self, cmd, fp, **args):
504 """execute <cmd> on server
504 """execute <cmd> on server
505
505
506 The command will send a stream to the server and get a stream in reply.
506 The command will send a stream to the server and get a stream in reply.
507 """
507 """
508 raise NotImplementedError()
508 raise NotImplementedError()
509
509
510 def _abort(self, exception):
510 def _abort(self, exception):
511 """clearly abort the wire protocol connection and raise the exception
511 """clearly abort the wire protocol connection and raise the exception
512 """
512 """
513 raise NotImplementedError()
513 raise NotImplementedError()
514
514
515 # server side
515 # server side
516
516
517 # wire protocol command can either return a string or one of these classes.
517 # wire protocol command can either return a string or one of these classes.
518 class streamres(object):
518 class streamres(object):
519 """wireproto reply: binary stream
519 """wireproto reply: binary stream
520
520
521 The call was successful and the result is a stream.
521 The call was successful and the result is a stream.
522
522
523 Accepts either a generator or an object with a ``read(size)`` method.
523 Accepts either a generator or an object with a ``read(size)`` method.
524
524
525 ``v1compressible`` indicates whether this data can be compressed to
525 ``v1compressible`` indicates whether this data can be compressed to
526 "version 1" clients (technically: HTTP peers using
526 "version 1" clients (technically: HTTP peers using
527 application/mercurial-0.1 media type). This flag should NOT be used on
527 application/mercurial-0.1 media type). This flag should NOT be used on
528 new commands because new clients should support a more modern compression
528 new commands because new clients should support a more modern compression
529 mechanism.
529 mechanism.
530 """
530 """
531 def __init__(self, gen=None, reader=None, v1compressible=False):
531 def __init__(self, gen=None, reader=None, v1compressible=False):
532 self.gen = gen
532 self.gen = gen
533 self.reader = reader
533 self.reader = reader
534 self.v1compressible = v1compressible
534 self.v1compressible = v1compressible
535
535
536 class pushres(object):
536 class pushres(object):
537 """wireproto reply: success with simple integer return
537 """wireproto reply: success with simple integer return
538
538
539 The call was successful and returned an integer contained in `self.res`.
539 The call was successful and returned an integer contained in `self.res`.
540 """
540 """
541 def __init__(self, res):
541 def __init__(self, res):
542 self.res = res
542 self.res = res
543
543
544 class pusherr(object):
544 class pusherr(object):
545 """wireproto reply: failure
545 """wireproto reply: failure
546
546
547 The call failed. The `self.res` attribute contains the error message.
547 The call failed. The `self.res` attribute contains the error message.
548 """
548 """
549 def __init__(self, res):
549 def __init__(self, res):
550 self.res = res
550 self.res = res
551
551
552 class ooberror(object):
552 class ooberror(object):
553 """wireproto reply: failure of a batch of operation
553 """wireproto reply: failure of a batch of operation
554
554
555 Something failed during a batch call. The error message is stored in
555 Something failed during a batch call. The error message is stored in
556 `self.message`.
556 `self.message`.
557 """
557 """
558 def __init__(self, message):
558 def __init__(self, message):
559 self.message = message
559 self.message = message
560
560
561 def getdispatchrepo(repo, proto, command):
561 def getdispatchrepo(repo, proto, command):
562 """Obtain the repo used for processing wire protocol commands.
562 """Obtain the repo used for processing wire protocol commands.
563
563
564 The intent of this function is to serve as a monkeypatch point for
564 The intent of this function is to serve as a monkeypatch point for
565 extensions that need commands to operate on different repo views under
565 extensions that need commands to operate on different repo views under
566 specialized circumstances.
566 specialized circumstances.
567 """
567 """
568 return repo.filtered('served')
568 return repo.filtered('served')
569
569
570 def dispatch(repo, proto, command):
570 def dispatch(repo, proto, command):
571 repo = getdispatchrepo(repo, proto, command)
571 repo = getdispatchrepo(repo, proto, command)
572 func, spec = commands[command]
572 func, spec = commands[command]
573 args = proto.getargs(spec)
573 args = proto.getargs(spec)
574 return func(repo, proto, *args)
574 return func(repo, proto, *args)
575
575
576 def options(cmd, keys, others):
576 def options(cmd, keys, others):
577 opts = {}
577 opts = {}
578 for k in keys:
578 for k in keys:
579 if k in others:
579 if k in others:
580 opts[k] = others[k]
580 opts[k] = others[k]
581 del others[k]
581 del others[k]
582 if others:
582 if others:
583 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
583 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
584 % (cmd, ",".join(others)))
584 % (cmd, ",".join(others)))
585 return opts
585 return opts
586
586
587 def bundle1allowed(repo, action):
587 def bundle1allowed(repo, action):
588 """Whether a bundle1 operation is allowed from the server.
588 """Whether a bundle1 operation is allowed from the server.
589
589
590 Priority is:
590 Priority is:
591
591
592 1. server.bundle1gd.<action> (if generaldelta active)
592 1. server.bundle1gd.<action> (if generaldelta active)
593 2. server.bundle1.<action>
593 2. server.bundle1.<action>
594 3. server.bundle1gd (if generaldelta active)
594 3. server.bundle1gd (if generaldelta active)
595 4. server.bundle1
595 4. server.bundle1
596 """
596 """
597 ui = repo.ui
597 ui = repo.ui
598 gd = 'generaldelta' in repo.requirements
598 gd = 'generaldelta' in repo.requirements
599
599
600 if gd:
600 if gd:
601 v = ui.configbool('server', 'bundle1gd.%s' % action)
601 v = ui.configbool('server', 'bundle1gd.%s' % action)
602 if v is not None:
602 if v is not None:
603 return v
603 return v
604
604
605 v = ui.configbool('server', 'bundle1.%s' % action)
605 v = ui.configbool('server', 'bundle1.%s' % action)
606 if v is not None:
606 if v is not None:
607 return v
607 return v
608
608
609 if gd:
609 if gd:
610 v = ui.configbool('server', 'bundle1gd')
610 v = ui.configbool('server', 'bundle1gd')
611 if v is not None:
611 if v is not None:
612 return v
612 return v
613
613
614 return ui.configbool('server', 'bundle1')
614 return ui.configbool('server', 'bundle1')
615
615
616 def supportedcompengines(ui, proto, role):
616 def supportedcompengines(ui, proto, role):
617 """Obtain the list of supported compression engines for a request."""
617 """Obtain the list of supported compression engines for a request."""
618 assert role in (util.CLIENTROLE, util.SERVERROLE)
618 assert role in (util.CLIENTROLE, util.SERVERROLE)
619
619
620 compengines = util.compengines.supportedwireengines(role)
620 compengines = util.compengines.supportedwireengines(role)
621
621
622 # Allow config to override default list and ordering.
622 # Allow config to override default list and ordering.
623 if role == util.SERVERROLE:
623 if role == util.SERVERROLE:
624 configengines = ui.configlist('server', 'compressionengines')
624 configengines = ui.configlist('server', 'compressionengines')
625 config = 'server.compressionengines'
625 config = 'server.compressionengines'
626 else:
626 else:
627 # This is currently implemented mainly to facilitate testing. In most
627 # This is currently implemented mainly to facilitate testing. In most
628 # cases, the server should be in charge of choosing a compression engine
628 # cases, the server should be in charge of choosing a compression engine
629 # because a server has the most to lose from a sub-optimal choice. (e.g.
629 # because a server has the most to lose from a sub-optimal choice. (e.g.
630 # CPU DoS due to an expensive engine or a network DoS due to poor
630 # CPU DoS due to an expensive engine or a network DoS due to poor
631 # compression ratio).
631 # compression ratio).
632 configengines = ui.configlist('experimental',
632 configengines = ui.configlist('experimental',
633 'clientcompressionengines')
633 'clientcompressionengines')
634 config = 'experimental.clientcompressionengines'
634 config = 'experimental.clientcompressionengines'
635
635
636 # No explicit config. Filter out the ones that aren't supposed to be
636 # No explicit config. Filter out the ones that aren't supposed to be
637 # advertised and return default ordering.
637 # advertised and return default ordering.
638 if not configengines:
638 if not configengines:
639 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
639 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
640 return [e for e in compengines
640 return [e for e in compengines
641 if getattr(e.wireprotosupport(), attr) > 0]
641 if getattr(e.wireprotosupport(), attr) > 0]
642
642
643 # If compression engines are listed in the config, assume there is a good
643 # If compression engines are listed in the config, assume there is a good
644 # reason for it (like server operators wanting to achieve specific
644 # reason for it (like server operators wanting to achieve specific
645 # performance characteristics). So fail fast if the config references
645 # performance characteristics). So fail fast if the config references
646 # unusable compression engines.
646 # unusable compression engines.
647 validnames = set(e.name() for e in compengines)
647 validnames = set(e.name() for e in compengines)
648 invalidnames = set(e for e in configengines if e not in validnames)
648 invalidnames = set(e for e in configengines if e not in validnames)
649 if invalidnames:
649 if invalidnames:
650 raise error.Abort(_('invalid compression engine defined in %s: %s') %
650 raise error.Abort(_('invalid compression engine defined in %s: %s') %
651 (config, ', '.join(sorted(invalidnames))))
651 (config, ', '.join(sorted(invalidnames))))
652
652
653 compengines = [e for e in compengines if e.name() in configengines]
653 compengines = [e for e in compengines if e.name() in configengines]
654 compengines = sorted(compengines,
654 compengines = sorted(compengines,
655 key=lambda e: configengines.index(e.name()))
655 key=lambda e: configengines.index(e.name()))
656
656
657 if not compengines:
657 if not compengines:
658 raise error.Abort(_('%s config option does not specify any known '
658 raise error.Abort(_('%s config option does not specify any known '
659 'compression engines') % config,
659 'compression engines') % config,
660 hint=_('usable compression engines: %s') %
660 hint=_('usable compression engines: %s') %
661 ', '.sorted(validnames))
661 ', '.sorted(validnames))
662
662
663 return compengines
663 return compengines
664
664
665 # list of commands
665 # list of commands
666 commands = {}
666 commands = {}
667
667
668 def wireprotocommand(name, args=''):
668 def wireprotocommand(name, args=''):
669 """decorator for wire protocol command"""
669 """decorator for wire protocol command"""
670 def register(func):
670 def register(func):
671 commands[name] = (func, args)
671 commands[name] = (func, args)
672 return func
672 return func
673 return register
673 return register
674
674
675 @wireprotocommand('batch', 'cmds *')
675 @wireprotocommand('batch', 'cmds *')
676 def batch(repo, proto, cmds, others):
676 def batch(repo, proto, cmds, others):
677 repo = repo.filtered("served")
677 repo = repo.filtered("served")
678 res = []
678 res = []
679 for pair in cmds.split(';'):
679 for pair in cmds.split(';'):
680 op, args = pair.split(' ', 1)
680 op, args = pair.split(' ', 1)
681 vals = {}
681 vals = {}
682 for a in args.split(','):
682 for a in args.split(','):
683 if a:
683 if a:
684 n, v = a.split('=')
684 n, v = a.split('=')
685 vals[unescapearg(n)] = unescapearg(v)
685 vals[unescapearg(n)] = unescapearg(v)
686 func, spec = commands[op]
686 func, spec = commands[op]
687 if spec:
687 if spec:
688 keys = spec.split()
688 keys = spec.split()
689 data = {}
689 data = {}
690 for k in keys:
690 for k in keys:
691 if k == '*':
691 if k == '*':
692 star = {}
692 star = {}
693 for key in vals.keys():
693 for key in vals.keys():
694 if key not in keys:
694 if key not in keys:
695 star[key] = vals[key]
695 star[key] = vals[key]
696 data['*'] = star
696 data['*'] = star
697 else:
697 else:
698 data[k] = vals[k]
698 data[k] = vals[k]
699 result = func(repo, proto, *[data[k] for k in keys])
699 result = func(repo, proto, *[data[k] for k in keys])
700 else:
700 else:
701 result = func(repo, proto)
701 result = func(repo, proto)
702 if isinstance(result, ooberror):
702 if isinstance(result, ooberror):
703 return result
703 return result
704 res.append(escapearg(result))
704 res.append(escapearg(result))
705 return ';'.join(res)
705 return ';'.join(res)
706
706
707 @wireprotocommand('between', 'pairs')
707 @wireprotocommand('between', 'pairs')
708 def between(repo, proto, pairs):
708 def between(repo, proto, pairs):
709 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
709 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
710 r = []
710 r = []
711 for b in repo.between(pairs):
711 for b in repo.between(pairs):
712 r.append(encodelist(b) + "\n")
712 r.append(encodelist(b) + "\n")
713 return "".join(r)
713 return "".join(r)
714
714
715 @wireprotocommand('branchmap')
715 @wireprotocommand('branchmap')
716 def branchmap(repo, proto):
716 def branchmap(repo, proto):
717 branchmap = repo.branchmap()
717 branchmap = repo.branchmap()
718 heads = []
718 heads = []
719 for branch, nodes in branchmap.iteritems():
719 for branch, nodes in branchmap.iteritems():
720 branchname = urlreq.quote(encoding.fromlocal(branch))
720 branchname = urlreq.quote(encoding.fromlocal(branch))
721 branchnodes = encodelist(nodes)
721 branchnodes = encodelist(nodes)
722 heads.append('%s %s' % (branchname, branchnodes))
722 heads.append('%s %s' % (branchname, branchnodes))
723 return '\n'.join(heads)
723 return '\n'.join(heads)
724
724
725 @wireprotocommand('branches', 'nodes')
725 @wireprotocommand('branches', 'nodes')
726 def branches(repo, proto, nodes):
726 def branches(repo, proto, nodes):
727 nodes = decodelist(nodes)
727 nodes = decodelist(nodes)
728 r = []
728 r = []
729 for b in repo.branches(nodes):
729 for b in repo.branches(nodes):
730 r.append(encodelist(b) + "\n")
730 r.append(encodelist(b) + "\n")
731 return "".join(r)
731 return "".join(r)
732
732
733 @wireprotocommand('clonebundles', '')
733 @wireprotocommand('clonebundles', '')
734 def clonebundles(repo, proto):
734 def clonebundles(repo, proto):
735 """Server command for returning info for available bundles to seed clones.
735 """Server command for returning info for available bundles to seed clones.
736
736
737 Clients will parse this response and determine what bundle to fetch.
737 Clients will parse this response and determine what bundle to fetch.
738
738
739 Extensions may wrap this command to filter or dynamically emit data
739 Extensions may wrap this command to filter or dynamically emit data
740 depending on the request. e.g. you could advertise URLs for the closest
740 depending on the request. e.g. you could advertise URLs for the closest
741 data center given the client's IP address.
741 data center given the client's IP address.
742 """
742 """
743 return repo.vfs.tryread('clonebundles.manifest')
743 return repo.vfs.tryread('clonebundles.manifest')
744
744
745 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
745 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
746 'known', 'getbundle', 'unbundlehash', 'batch']
746 'known', 'getbundle', 'unbundlehash', 'batch']
747
747
748 def _capabilities(repo, proto):
748 def _capabilities(repo, proto):
749 """return a list of capabilities for a repo
749 """return a list of capabilities for a repo
750
750
751 This function exists to allow extensions to easily wrap capabilities
751 This function exists to allow extensions to easily wrap capabilities
752 computation
752 computation
753
753
754 - returns a lists: easy to alter
754 - returns a lists: easy to alter
755 - change done here will be propagated to both `capabilities` and `hello`
755 - change done here will be propagated to both `capabilities` and `hello`
756 command without any other action needed.
756 command without any other action needed.
757 """
757 """
758 # copy to prevent modification of the global list
758 # copy to prevent modification of the global list
759 caps = list(wireprotocaps)
759 caps = list(wireprotocaps)
760 if streamclone.allowservergeneration(repo):
760 if streamclone.allowservergeneration(repo):
761 if repo.ui.configbool('server', 'preferuncompressed'):
761 if repo.ui.configbool('server', 'preferuncompressed'):
762 caps.append('stream-preferred')
762 caps.append('stream-preferred')
763 requiredformats = repo.requirements & repo.supportedformats
763 requiredformats = repo.requirements & repo.supportedformats
764 # if our local revlogs are just revlogv1, add 'stream' cap
764 # if our local revlogs are just revlogv1, add 'stream' cap
765 if not requiredformats - {'revlogv1'}:
765 if not requiredformats - {'revlogv1'}:
766 caps.append('stream')
766 caps.append('stream')
767 # otherwise, add 'streamreqs' detailing our local revlog format
767 # otherwise, add 'streamreqs' detailing our local revlog format
768 else:
768 else:
769 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
769 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
770 if repo.ui.configbool('experimental', 'bundle2-advertise'):
770 if repo.ui.configbool('experimental', 'bundle2-advertise'):
771 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
771 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
772 caps.append('bundle2=' + urlreq.quote(capsblob))
772 caps.append('bundle2=' + urlreq.quote(capsblob))
773 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
773 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
774
774
775 if proto.name == 'http':
775 if proto.name == 'http':
776 caps.append('httpheader=%d' %
776 caps.append('httpheader=%d' %
777 repo.ui.configint('server', 'maxhttpheaderlen'))
777 repo.ui.configint('server', 'maxhttpheaderlen'))
778 if repo.ui.configbool('experimental', 'httppostargs'):
778 if repo.ui.configbool('experimental', 'httppostargs'):
779 caps.append('httppostargs')
779 caps.append('httppostargs')
780
780
781 # FUTURE advertise 0.2rx once support is implemented
781 # FUTURE advertise 0.2rx once support is implemented
782 # FUTURE advertise minrx and mintx after consulting config option
782 # FUTURE advertise minrx and mintx after consulting config option
783 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
783 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
784
784
785 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
785 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
786 if compengines:
786 if compengines:
787 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
787 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
788 for e in compengines)
788 for e in compengines)
789 caps.append('compression=%s' % comptypes)
789 caps.append('compression=%s' % comptypes)
790
790
791 return caps
791 return caps
792
792
793 # If you are writing an extension and consider wrapping this function. Wrap
793 # If you are writing an extension and consider wrapping this function. Wrap
794 # `_capabilities` instead.
794 # `_capabilities` instead.
795 @wireprotocommand('capabilities')
795 @wireprotocommand('capabilities')
796 def capabilities(repo, proto):
796 def capabilities(repo, proto):
797 return ' '.join(_capabilities(repo, proto))
797 return ' '.join(_capabilities(repo, proto))
798
798
799 @wireprotocommand('changegroup', 'roots')
799 @wireprotocommand('changegroup', 'roots')
800 def changegroup(repo, proto, roots):
800 def changegroup(repo, proto, roots):
801 nodes = decodelist(roots)
801 nodes = decodelist(roots)
802 outgoing = discovery.outgoing(repo, missingroots=nodes,
802 outgoing = discovery.outgoing(repo, missingroots=nodes,
803 missingheads=repo.heads())
803 missingheads=repo.heads())
804 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
804 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
805 return streamres(reader=cg, v1compressible=True)
805 return streamres(reader=cg, v1compressible=True)
806
806
807 @wireprotocommand('changegroupsubset', 'bases heads')
807 @wireprotocommand('changegroupsubset', 'bases heads')
808 def changegroupsubset(repo, proto, bases, heads):
808 def changegroupsubset(repo, proto, bases, heads):
809 bases = decodelist(bases)
809 bases = decodelist(bases)
810 heads = decodelist(heads)
810 heads = decodelist(heads)
811 outgoing = discovery.outgoing(repo, missingroots=bases,
811 outgoing = discovery.outgoing(repo, missingroots=bases,
812 missingheads=heads)
812 missingheads=heads)
813 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
813 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
814 return streamres(reader=cg, v1compressible=True)
814 return streamres(reader=cg, v1compressible=True)
815
815
816 @wireprotocommand('debugwireargs', 'one two *')
816 @wireprotocommand('debugwireargs', 'one two *')
817 def debugwireargs(repo, proto, one, two, others):
817 def debugwireargs(repo, proto, one, two, others):
818 # only accept optional args from the known set
818 # only accept optional args from the known set
819 opts = options('debugwireargs', ['three', 'four'], others)
819 opts = options('debugwireargs', ['three', 'four'], others)
820 return repo.debugwireargs(one, two, **opts)
820 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
821
821
822 @wireprotocommand('getbundle', '*')
822 @wireprotocommand('getbundle', '*')
823 def getbundle(repo, proto, others):
823 def getbundle(repo, proto, others):
824 opts = options('getbundle', gboptsmap.keys(), others)
824 opts = options('getbundle', gboptsmap.keys(), others)
825 for k, v in opts.iteritems():
825 for k, v in opts.iteritems():
826 keytype = gboptsmap[k]
826 keytype = gboptsmap[k]
827 if keytype == 'nodes':
827 if keytype == 'nodes':
828 opts[k] = decodelist(v)
828 opts[k] = decodelist(v)
829 elif keytype == 'csv':
829 elif keytype == 'csv':
830 opts[k] = list(v.split(','))
830 opts[k] = list(v.split(','))
831 elif keytype == 'scsv':
831 elif keytype == 'scsv':
832 opts[k] = set(v.split(','))
832 opts[k] = set(v.split(','))
833 elif keytype == 'boolean':
833 elif keytype == 'boolean':
834 # Client should serialize False as '0', which is a non-empty string
834 # Client should serialize False as '0', which is a non-empty string
835 # so it evaluates as a True bool.
835 # so it evaluates as a True bool.
836 if v == '0':
836 if v == '0':
837 opts[k] = False
837 opts[k] = False
838 else:
838 else:
839 opts[k] = bool(v)
839 opts[k] = bool(v)
840 elif keytype != 'plain':
840 elif keytype != 'plain':
841 raise KeyError('unknown getbundle option type %s'
841 raise KeyError('unknown getbundle option type %s'
842 % keytype)
842 % keytype)
843
843
844 if not bundle1allowed(repo, 'pull'):
844 if not bundle1allowed(repo, 'pull'):
845 if not exchange.bundle2requested(opts.get('bundlecaps')):
845 if not exchange.bundle2requested(opts.get('bundlecaps')):
846 if proto.name == 'http':
846 if proto.name == 'http':
847 return ooberror(bundle2required)
847 return ooberror(bundle2required)
848 raise error.Abort(bundle2requiredmain,
848 raise error.Abort(bundle2requiredmain,
849 hint=bundle2requiredhint)
849 hint=bundle2requiredhint)
850
850
851 try:
851 try:
852 if repo.ui.configbool('server', 'disablefullbundle'):
852 if repo.ui.configbool('server', 'disablefullbundle'):
853 # Check to see if this is a full clone.
853 # Check to see if this is a full clone.
854 clheads = set(repo.changelog.heads())
854 clheads = set(repo.changelog.heads())
855 heads = set(opts.get('heads', set()))
855 heads = set(opts.get('heads', set()))
856 common = set(opts.get('common', set()))
856 common = set(opts.get('common', set()))
857 common.discard(nullid)
857 common.discard(nullid)
858 if not common and clheads == heads:
858 if not common and clheads == heads:
859 raise error.Abort(
859 raise error.Abort(
860 _('server has pull-based clones disabled'),
860 _('server has pull-based clones disabled'),
861 hint=_('remove --pull if specified or upgrade Mercurial'))
861 hint=_('remove --pull if specified or upgrade Mercurial'))
862
862
863 chunks = exchange.getbundlechunks(repo, 'serve',
863 chunks = exchange.getbundlechunks(repo, 'serve',
864 **pycompat.strkwargs(opts))
864 **pycompat.strkwargs(opts))
865 except error.Abort as exc:
865 except error.Abort as exc:
866 # cleanly forward Abort error to the client
866 # cleanly forward Abort error to the client
867 if not exchange.bundle2requested(opts.get('bundlecaps')):
867 if not exchange.bundle2requested(opts.get('bundlecaps')):
868 if proto.name == 'http':
868 if proto.name == 'http':
869 return ooberror(str(exc) + '\n')
869 return ooberror(str(exc) + '\n')
870 raise # cannot do better for bundle1 + ssh
870 raise # cannot do better for bundle1 + ssh
871 # bundle2 request expect a bundle2 reply
871 # bundle2 request expect a bundle2 reply
872 bundler = bundle2.bundle20(repo.ui)
872 bundler = bundle2.bundle20(repo.ui)
873 manargs = [('message', str(exc))]
873 manargs = [('message', str(exc))]
874 advargs = []
874 advargs = []
875 if exc.hint is not None:
875 if exc.hint is not None:
876 advargs.append(('hint', exc.hint))
876 advargs.append(('hint', exc.hint))
877 bundler.addpart(bundle2.bundlepart('error:abort',
877 bundler.addpart(bundle2.bundlepart('error:abort',
878 manargs, advargs))
878 manargs, advargs))
879 return streamres(gen=bundler.getchunks(), v1compressible=True)
879 return streamres(gen=bundler.getchunks(), v1compressible=True)
880 return streamres(gen=chunks, v1compressible=True)
880 return streamres(gen=chunks, v1compressible=True)
881
881
882 @wireprotocommand('heads')
882 @wireprotocommand('heads')
883 def heads(repo, proto):
883 def heads(repo, proto):
884 h = repo.heads()
884 h = repo.heads()
885 return encodelist(h) + "\n"
885 return encodelist(h) + "\n"
886
886
887 @wireprotocommand('hello')
887 @wireprotocommand('hello')
888 def hello(repo, proto):
888 def hello(repo, proto):
889 '''the hello command returns a set of lines describing various
889 '''the hello command returns a set of lines describing various
890 interesting things about the server, in an RFC822-like format.
890 interesting things about the server, in an RFC822-like format.
891 Currently the only one defined is "capabilities", which
891 Currently the only one defined is "capabilities", which
892 consists of a line in the form:
892 consists of a line in the form:
893
893
894 capabilities: space separated list of tokens
894 capabilities: space separated list of tokens
895 '''
895 '''
896 return "capabilities: %s\n" % (capabilities(repo, proto))
896 return "capabilities: %s\n" % (capabilities(repo, proto))
897
897
898 @wireprotocommand('listkeys', 'namespace')
898 @wireprotocommand('listkeys', 'namespace')
899 def listkeys(repo, proto, namespace):
899 def listkeys(repo, proto, namespace):
900 d = repo.listkeys(encoding.tolocal(namespace)).items()
900 d = repo.listkeys(encoding.tolocal(namespace)).items()
901 return pushkeymod.encodekeys(d)
901 return pushkeymod.encodekeys(d)
902
902
903 @wireprotocommand('lookup', 'key')
903 @wireprotocommand('lookup', 'key')
904 def lookup(repo, proto, key):
904 def lookup(repo, proto, key):
905 try:
905 try:
906 k = encoding.tolocal(key)
906 k = encoding.tolocal(key)
907 c = repo[k]
907 c = repo[k]
908 r = c.hex()
908 r = c.hex()
909 success = 1
909 success = 1
910 except Exception as inst:
910 except Exception as inst:
911 r = str(inst)
911 r = str(inst)
912 success = 0
912 success = 0
913 return "%d %s\n" % (success, r)
913 return "%d %s\n" % (success, r)
914
914
915 @wireprotocommand('known', 'nodes *')
915 @wireprotocommand('known', 'nodes *')
916 def known(repo, proto, nodes, others):
916 def known(repo, proto, nodes, others):
917 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
917 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
918
918
919 @wireprotocommand('pushkey', 'namespace key old new')
919 @wireprotocommand('pushkey', 'namespace key old new')
920 def pushkey(repo, proto, namespace, key, old, new):
920 def pushkey(repo, proto, namespace, key, old, new):
921 # compatibility with pre-1.8 clients which were accidentally
921 # compatibility with pre-1.8 clients which were accidentally
922 # sending raw binary nodes rather than utf-8-encoded hex
922 # sending raw binary nodes rather than utf-8-encoded hex
923 if len(new) == 20 and util.escapestr(new) != new:
923 if len(new) == 20 and util.escapestr(new) != new:
924 # looks like it could be a binary node
924 # looks like it could be a binary node
925 try:
925 try:
926 new.decode('utf-8')
926 new.decode('utf-8')
927 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
927 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
928 except UnicodeDecodeError:
928 except UnicodeDecodeError:
929 pass # binary, leave unmodified
929 pass # binary, leave unmodified
930 else:
930 else:
931 new = encoding.tolocal(new) # normal path
931 new = encoding.tolocal(new) # normal path
932
932
933 if util.safehasattr(proto, 'restore'):
933 if util.safehasattr(proto, 'restore'):
934
934
935 proto.redirect()
935 proto.redirect()
936
936
937 try:
937 try:
938 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
938 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
939 encoding.tolocal(old), new) or False
939 encoding.tolocal(old), new) or False
940 except error.Abort:
940 except error.Abort:
941 r = False
941 r = False
942
942
943 output = proto.restore()
943 output = proto.restore()
944
944
945 return '%s\n%s' % (int(r), output)
945 return '%s\n%s' % (int(r), output)
946
946
947 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
947 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
948 encoding.tolocal(old), new)
948 encoding.tolocal(old), new)
949 return '%s\n' % int(r)
949 return '%s\n' % int(r)
950
950
951 @wireprotocommand('stream_out')
951 @wireprotocommand('stream_out')
952 def stream(repo, proto):
952 def stream(repo, proto):
953 '''If the server supports streaming clone, it advertises the "stream"
953 '''If the server supports streaming clone, it advertises the "stream"
954 capability with a value representing the version and flags of the repo
954 capability with a value representing the version and flags of the repo
955 it is serving. Client checks to see if it understands the format.
955 it is serving. Client checks to see if it understands the format.
956 '''
956 '''
957 if not streamclone.allowservergeneration(repo):
957 if not streamclone.allowservergeneration(repo):
958 return '1\n'
958 return '1\n'
959
959
960 def getstream(it):
960 def getstream(it):
961 yield '0\n'
961 yield '0\n'
962 for chunk in it:
962 for chunk in it:
963 yield chunk
963 yield chunk
964
964
965 try:
965 try:
966 # LockError may be raised before the first result is yielded. Don't
966 # LockError may be raised before the first result is yielded. Don't
967 # emit output until we're sure we got the lock successfully.
967 # emit output until we're sure we got the lock successfully.
968 it = streamclone.generatev1wireproto(repo)
968 it = streamclone.generatev1wireproto(repo)
969 return streamres(gen=getstream(it))
969 return streamres(gen=getstream(it))
970 except error.LockError:
970 except error.LockError:
971 return '2\n'
971 return '2\n'
972
972
973 @wireprotocommand('unbundle', 'heads')
973 @wireprotocommand('unbundle', 'heads')
974 def unbundle(repo, proto, heads):
974 def unbundle(repo, proto, heads):
975 their_heads = decodelist(heads)
975 their_heads = decodelist(heads)
976
976
977 try:
977 try:
978 proto.redirect()
978 proto.redirect()
979
979
980 exchange.check_heads(repo, their_heads, 'preparing changes')
980 exchange.check_heads(repo, their_heads, 'preparing changes')
981
981
982 # write bundle data to temporary file because it can be big
982 # write bundle data to temporary file because it can be big
983 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
983 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
984 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
984 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
985 r = 0
985 r = 0
986 try:
986 try:
987 proto.getfile(fp)
987 proto.getfile(fp)
988 fp.seek(0)
988 fp.seek(0)
989 gen = exchange.readbundle(repo.ui, fp, None)
989 gen = exchange.readbundle(repo.ui, fp, None)
990 if (isinstance(gen, changegroupmod.cg1unpacker)
990 if (isinstance(gen, changegroupmod.cg1unpacker)
991 and not bundle1allowed(repo, 'push')):
991 and not bundle1allowed(repo, 'push')):
992 if proto.name == 'http':
992 if proto.name == 'http':
993 # need to special case http because stderr do not get to
993 # need to special case http because stderr do not get to
994 # the http client on failed push so we need to abuse some
994 # the http client on failed push so we need to abuse some
995 # other error type to make sure the message get to the
995 # other error type to make sure the message get to the
996 # user.
996 # user.
997 return ooberror(bundle2required)
997 return ooberror(bundle2required)
998 raise error.Abort(bundle2requiredmain,
998 raise error.Abort(bundle2requiredmain,
999 hint=bundle2requiredhint)
999 hint=bundle2requiredhint)
1000
1000
1001 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1001 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1002 proto._client())
1002 proto._client())
1003 if util.safehasattr(r, 'addpart'):
1003 if util.safehasattr(r, 'addpart'):
1004 # The return looks streamable, we are in the bundle2 case and
1004 # The return looks streamable, we are in the bundle2 case and
1005 # should return a stream.
1005 # should return a stream.
1006 return streamres(gen=r.getchunks())
1006 return streamres(gen=r.getchunks())
1007 return pushres(r)
1007 return pushres(r)
1008
1008
1009 finally:
1009 finally:
1010 fp.close()
1010 fp.close()
1011 os.unlink(tempname)
1011 os.unlink(tempname)
1012
1012
1013 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1013 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1014 # handle non-bundle2 case first
1014 # handle non-bundle2 case first
1015 if not getattr(exc, 'duringunbundle2', False):
1015 if not getattr(exc, 'duringunbundle2', False):
1016 try:
1016 try:
1017 raise
1017 raise
1018 except error.Abort:
1018 except error.Abort:
1019 # The old code we moved used util.stderr directly.
1019 # The old code we moved used util.stderr directly.
1020 # We did not change it to minimise code change.
1020 # We did not change it to minimise code change.
1021 # This need to be moved to something proper.
1021 # This need to be moved to something proper.
1022 # Feel free to do it.
1022 # Feel free to do it.
1023 util.stderr.write("abort: %s\n" % exc)
1023 util.stderr.write("abort: %s\n" % exc)
1024 if exc.hint is not None:
1024 if exc.hint is not None:
1025 util.stderr.write("(%s)\n" % exc.hint)
1025 util.stderr.write("(%s)\n" % exc.hint)
1026 return pushres(0)
1026 return pushres(0)
1027 except error.PushRaced:
1027 except error.PushRaced:
1028 return pusherr(str(exc))
1028 return pusherr(str(exc))
1029
1029
1030 bundler = bundle2.bundle20(repo.ui)
1030 bundler = bundle2.bundle20(repo.ui)
1031 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1031 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1032 bundler.addpart(out)
1032 bundler.addpart(out)
1033 try:
1033 try:
1034 try:
1034 try:
1035 raise
1035 raise
1036 except error.PushkeyFailed as exc:
1036 except error.PushkeyFailed as exc:
1037 # check client caps
1037 # check client caps
1038 remotecaps = getattr(exc, '_replycaps', None)
1038 remotecaps = getattr(exc, '_replycaps', None)
1039 if (remotecaps is not None
1039 if (remotecaps is not None
1040 and 'pushkey' not in remotecaps.get('error', ())):
1040 and 'pushkey' not in remotecaps.get('error', ())):
1041 # no support remote side, fallback to Abort handler.
1041 # no support remote side, fallback to Abort handler.
1042 raise
1042 raise
1043 part = bundler.newpart('error:pushkey')
1043 part = bundler.newpart('error:pushkey')
1044 part.addparam('in-reply-to', exc.partid)
1044 part.addparam('in-reply-to', exc.partid)
1045 if exc.namespace is not None:
1045 if exc.namespace is not None:
1046 part.addparam('namespace', exc.namespace, mandatory=False)
1046 part.addparam('namespace', exc.namespace, mandatory=False)
1047 if exc.key is not None:
1047 if exc.key is not None:
1048 part.addparam('key', exc.key, mandatory=False)
1048 part.addparam('key', exc.key, mandatory=False)
1049 if exc.new is not None:
1049 if exc.new is not None:
1050 part.addparam('new', exc.new, mandatory=False)
1050 part.addparam('new', exc.new, mandatory=False)
1051 if exc.old is not None:
1051 if exc.old is not None:
1052 part.addparam('old', exc.old, mandatory=False)
1052 part.addparam('old', exc.old, mandatory=False)
1053 if exc.ret is not None:
1053 if exc.ret is not None:
1054 part.addparam('ret', exc.ret, mandatory=False)
1054 part.addparam('ret', exc.ret, mandatory=False)
1055 except error.BundleValueError as exc:
1055 except error.BundleValueError as exc:
1056 errpart = bundler.newpart('error:unsupportedcontent')
1056 errpart = bundler.newpart('error:unsupportedcontent')
1057 if exc.parttype is not None:
1057 if exc.parttype is not None:
1058 errpart.addparam('parttype', exc.parttype)
1058 errpart.addparam('parttype', exc.parttype)
1059 if exc.params:
1059 if exc.params:
1060 errpart.addparam('params', '\0'.join(exc.params))
1060 errpart.addparam('params', '\0'.join(exc.params))
1061 except error.Abort as exc:
1061 except error.Abort as exc:
1062 manargs = [('message', str(exc))]
1062 manargs = [('message', str(exc))]
1063 advargs = []
1063 advargs = []
1064 if exc.hint is not None:
1064 if exc.hint is not None:
1065 advargs.append(('hint', exc.hint))
1065 advargs.append(('hint', exc.hint))
1066 bundler.addpart(bundle2.bundlepart('error:abort',
1066 bundler.addpart(bundle2.bundlepart('error:abort',
1067 manargs, advargs))
1067 manargs, advargs))
1068 except error.PushRaced as exc:
1068 except error.PushRaced as exc:
1069 bundler.newpart('error:pushraced', [('message', str(exc))])
1069 bundler.newpart('error:pushraced', [('message', str(exc))])
1070 return streamres(gen=bundler.getchunks())
1070 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now