##// END OF EJS Templates
wireproto: use %d to encode int, not %s...
Augie Fackler -
r34732:31fdd050 default
parent child Browse files
Show More
@@ -1,1067 +1,1067 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import os
11 import os
12 import tempfile
12 import tempfile
13
13
14 from .i18n import _
14 from .i18n import _
15 from .node import (
15 from .node import (
16 bin,
16 bin,
17 hex,
17 hex,
18 nullid,
18 nullid,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 discovery,
24 discovery,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 pycompat,
30 pycompat,
31 repository,
31 repository,
32 streamclone,
32 streamclone,
33 util,
33 util,
34 )
34 )
35
35
36 urlerr = util.urlerr
36 urlerr = util.urlerr
37 urlreq = util.urlreq
37 urlreq = util.urlreq
38
38
39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 'IncompatibleClient')
41 'IncompatibleClient')
42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43
43
44 class abstractserverproto(object):
44 class abstractserverproto(object):
45 """abstract class that summarizes the protocol API
45 """abstract class that summarizes the protocol API
46
46
47 Used as reference and documentation.
47 Used as reference and documentation.
48 """
48 """
49
49
50 def getargs(self, args):
50 def getargs(self, args):
51 """return the value for arguments in <args>
51 """return the value for arguments in <args>
52
52
53 returns a list of values (same order as <args>)"""
53 returns a list of values (same order as <args>)"""
54 raise NotImplementedError()
54 raise NotImplementedError()
55
55
56 def getfile(self, fp):
56 def getfile(self, fp):
57 """write the whole content of a file into a file like object
57 """write the whole content of a file into a file like object
58
58
59 The file is in the form::
59 The file is in the form::
60
60
61 (<chunk-size>\n<chunk>)+0\n
61 (<chunk-size>\n<chunk>)+0\n
62
62
63 chunk size is the ascii version of the int.
63 chunk size is the ascii version of the int.
64 """
64 """
65 raise NotImplementedError()
65 raise NotImplementedError()
66
66
67 def redirect(self):
67 def redirect(self):
68 """may setup interception for stdout and stderr
68 """may setup interception for stdout and stderr
69
69
70 See also the `restore` method."""
70 See also the `restore` method."""
71 raise NotImplementedError()
71 raise NotImplementedError()
72
72
73 # If the `redirect` function does install interception, the `restore`
73 # If the `redirect` function does install interception, the `restore`
74 # function MUST be defined. If interception is not used, this function
74 # function MUST be defined. If interception is not used, this function
75 # MUST NOT be defined.
75 # MUST NOT be defined.
76 #
76 #
77 # left commented here on purpose
77 # left commented here on purpose
78 #
78 #
79 #def restore(self):
79 #def restore(self):
80 # """reinstall previous stdout and stderr and return intercepted stdout
80 # """reinstall previous stdout and stderr and return intercepted stdout
81 # """
81 # """
82 # raise NotImplementedError()
82 # raise NotImplementedError()
83
83
84 class remoteiterbatcher(peer.iterbatcher):
84 class remoteiterbatcher(peer.iterbatcher):
85 def __init__(self, remote):
85 def __init__(self, remote):
86 super(remoteiterbatcher, self).__init__()
86 super(remoteiterbatcher, self).__init__()
87 self._remote = remote
87 self._remote = remote
88
88
89 def __getattr__(self, name):
89 def __getattr__(self, name):
90 # Validate this method is batchable, since submit() only supports
90 # Validate this method is batchable, since submit() only supports
91 # batchable methods.
91 # batchable methods.
92 fn = getattr(self._remote, name)
92 fn = getattr(self._remote, name)
93 if not getattr(fn, 'batchable', None):
93 if not getattr(fn, 'batchable', None):
94 raise error.ProgrammingError('Attempted to batch a non-batchable '
94 raise error.ProgrammingError('Attempted to batch a non-batchable '
95 'call to %r' % name)
95 'call to %r' % name)
96
96
97 return super(remoteiterbatcher, self).__getattr__(name)
97 return super(remoteiterbatcher, self).__getattr__(name)
98
98
99 def submit(self):
99 def submit(self):
100 """Break the batch request into many patch calls and pipeline them.
100 """Break the batch request into many patch calls and pipeline them.
101
101
102 This is mostly valuable over http where request sizes can be
102 This is mostly valuable over http where request sizes can be
103 limited, but can be used in other places as well.
103 limited, but can be used in other places as well.
104 """
104 """
105 # 2-tuple of (command, arguments) that represents what will be
105 # 2-tuple of (command, arguments) that represents what will be
106 # sent over the wire.
106 # sent over the wire.
107 requests = []
107 requests = []
108
108
109 # 4-tuple of (command, final future, @batchable generator, remote
109 # 4-tuple of (command, final future, @batchable generator, remote
110 # future).
110 # future).
111 results = []
111 results = []
112
112
113 for command, args, opts, finalfuture in self.calls:
113 for command, args, opts, finalfuture in self.calls:
114 mtd = getattr(self._remote, command)
114 mtd = getattr(self._remote, command)
115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
116
116
117 commandargs, fremote = next(batchable)
117 commandargs, fremote = next(batchable)
118 assert fremote
118 assert fremote
119 requests.append((command, commandargs))
119 requests.append((command, commandargs))
120 results.append((command, finalfuture, batchable, fremote))
120 results.append((command, finalfuture, batchable, fremote))
121
121
122 if requests:
122 if requests:
123 self._resultiter = self._remote._submitbatch(requests)
123 self._resultiter = self._remote._submitbatch(requests)
124
124
125 self._results = results
125 self._results = results
126
126
127 def results(self):
127 def results(self):
128 for command, finalfuture, batchable, remotefuture in self._results:
128 for command, finalfuture, batchable, remotefuture in self._results:
129 # Get the raw result, set it in the remote future, feed it
129 # Get the raw result, set it in the remote future, feed it
130 # back into the @batchable generator so it can be decoded, and
130 # back into the @batchable generator so it can be decoded, and
131 # set the result on the final future to this value.
131 # set the result on the final future to this value.
132 remoteresult = next(self._resultiter)
132 remoteresult = next(self._resultiter)
133 remotefuture.set(remoteresult)
133 remotefuture.set(remoteresult)
134 finalfuture.set(next(batchable))
134 finalfuture.set(next(batchable))
135
135
136 # Verify our @batchable generators only emit 2 values.
136 # Verify our @batchable generators only emit 2 values.
137 try:
137 try:
138 next(batchable)
138 next(batchable)
139 except StopIteration:
139 except StopIteration:
140 pass
140 pass
141 else:
141 else:
142 raise error.ProgrammingError('%s @batchable generator emitted '
142 raise error.ProgrammingError('%s @batchable generator emitted '
143 'unexpected value count' % command)
143 'unexpected value count' % command)
144
144
145 yield finalfuture.value
145 yield finalfuture.value
146
146
147 # Forward a couple of names from peer to make wireproto interactions
147 # Forward a couple of names from peer to make wireproto interactions
148 # slightly more sensible.
148 # slightly more sensible.
149 batchable = peer.batchable
149 batchable = peer.batchable
150 future = peer.future
150 future = peer.future
151
151
152 # list of nodes encoding / decoding
152 # list of nodes encoding / decoding
153
153
154 def decodelist(l, sep=' '):
154 def decodelist(l, sep=' '):
155 if l:
155 if l:
156 return [bin(v) for v in l.split(sep)]
156 return [bin(v) for v in l.split(sep)]
157 return []
157 return []
158
158
159 def encodelist(l, sep=' '):
159 def encodelist(l, sep=' '):
160 try:
160 try:
161 return sep.join(map(hex, l))
161 return sep.join(map(hex, l))
162 except TypeError:
162 except TypeError:
163 raise
163 raise
164
164
165 # batched call argument encoding
165 # batched call argument encoding
166
166
167 def escapearg(plain):
167 def escapearg(plain):
168 return (plain
168 return (plain
169 .replace(':', ':c')
169 .replace(':', ':c')
170 .replace(',', ':o')
170 .replace(',', ':o')
171 .replace(';', ':s')
171 .replace(';', ':s')
172 .replace('=', ':e'))
172 .replace('=', ':e'))
173
173
174 def unescapearg(escaped):
174 def unescapearg(escaped):
175 return (escaped
175 return (escaped
176 .replace(':e', '=')
176 .replace(':e', '=')
177 .replace(':s', ';')
177 .replace(':s', ';')
178 .replace(':o', ',')
178 .replace(':o', ',')
179 .replace(':c', ':'))
179 .replace(':c', ':'))
180
180
181 def encodebatchcmds(req):
181 def encodebatchcmds(req):
182 """Return a ``cmds`` argument value for the ``batch`` command."""
182 """Return a ``cmds`` argument value for the ``batch`` command."""
183 cmds = []
183 cmds = []
184 for op, argsdict in req:
184 for op, argsdict in req:
185 # Old servers didn't properly unescape argument names. So prevent
185 # Old servers didn't properly unescape argument names. So prevent
186 # the sending of argument names that may not be decoded properly by
186 # the sending of argument names that may not be decoded properly by
187 # servers.
187 # servers.
188 assert all(escapearg(k) == k for k in argsdict)
188 assert all(escapearg(k) == k for k in argsdict)
189
189
190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
191 for k, v in argsdict.iteritems())
191 for k, v in argsdict.iteritems())
192 cmds.append('%s %s' % (op, args))
192 cmds.append('%s %s' % (op, args))
193
193
194 return ';'.join(cmds)
194 return ';'.join(cmds)
195
195
196 # mapping of options accepted by getbundle and their types
196 # mapping of options accepted by getbundle and their types
197 #
197 #
198 # Meant to be extended by extensions. It is extensions responsibility to ensure
198 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 # such options are properly processed in exchange.getbundle.
199 # such options are properly processed in exchange.getbundle.
200 #
200 #
201 # supported types are:
201 # supported types are:
202 #
202 #
203 # :nodes: list of binary nodes
203 # :nodes: list of binary nodes
204 # :csv: list of comma-separated values
204 # :csv: list of comma-separated values
205 # :scsv: list of comma-separated values return as set
205 # :scsv: list of comma-separated values return as set
206 # :plain: string with no transformation needed.
206 # :plain: string with no transformation needed.
207 gboptsmap = {'heads': 'nodes',
207 gboptsmap = {'heads': 'nodes',
208 'common': 'nodes',
208 'common': 'nodes',
209 'obsmarkers': 'boolean',
209 'obsmarkers': 'boolean',
210 'phases': 'boolean',
210 'phases': 'boolean',
211 'bundlecaps': 'scsv',
211 'bundlecaps': 'scsv',
212 'listkeys': 'csv',
212 'listkeys': 'csv',
213 'cg': 'boolean',
213 'cg': 'boolean',
214 'cbattempted': 'boolean'}
214 'cbattempted': 'boolean'}
215
215
216 # client side
216 # client side
217
217
218 class wirepeer(repository.legacypeer):
218 class wirepeer(repository.legacypeer):
219 """Client-side interface for communicating with a peer repository.
219 """Client-side interface for communicating with a peer repository.
220
220
221 Methods commonly call wire protocol commands of the same name.
221 Methods commonly call wire protocol commands of the same name.
222
222
223 See also httppeer.py and sshpeer.py for protocol-specific
223 See also httppeer.py and sshpeer.py for protocol-specific
224 implementations of this interface.
224 implementations of this interface.
225 """
225 """
226 # Begin of basewirepeer interface.
226 # Begin of basewirepeer interface.
227
227
228 def iterbatch(self):
228 def iterbatch(self):
229 return remoteiterbatcher(self)
229 return remoteiterbatcher(self)
230
230
231 @batchable
231 @batchable
232 def lookup(self, key):
232 def lookup(self, key):
233 self.requirecap('lookup', _('look up remote revision'))
233 self.requirecap('lookup', _('look up remote revision'))
234 f = future()
234 f = future()
235 yield {'key': encoding.fromlocal(key)}, f
235 yield {'key': encoding.fromlocal(key)}, f
236 d = f.value
236 d = f.value
237 success, data = d[:-1].split(" ", 1)
237 success, data = d[:-1].split(" ", 1)
238 if int(success):
238 if int(success):
239 yield bin(data)
239 yield bin(data)
240 else:
240 else:
241 self._abort(error.RepoError(data))
241 self._abort(error.RepoError(data))
242
242
243 @batchable
243 @batchable
244 def heads(self):
244 def heads(self):
245 f = future()
245 f = future()
246 yield {}, f
246 yield {}, f
247 d = f.value
247 d = f.value
248 try:
248 try:
249 yield decodelist(d[:-1])
249 yield decodelist(d[:-1])
250 except ValueError:
250 except ValueError:
251 self._abort(error.ResponseError(_("unexpected response:"), d))
251 self._abort(error.ResponseError(_("unexpected response:"), d))
252
252
253 @batchable
253 @batchable
254 def known(self, nodes):
254 def known(self, nodes):
255 f = future()
255 f = future()
256 yield {'nodes': encodelist(nodes)}, f
256 yield {'nodes': encodelist(nodes)}, f
257 d = f.value
257 d = f.value
258 try:
258 try:
259 yield [bool(int(b)) for b in d]
259 yield [bool(int(b)) for b in d]
260 except ValueError:
260 except ValueError:
261 self._abort(error.ResponseError(_("unexpected response:"), d))
261 self._abort(error.ResponseError(_("unexpected response:"), d))
262
262
263 @batchable
263 @batchable
264 def branchmap(self):
264 def branchmap(self):
265 f = future()
265 f = future()
266 yield {}, f
266 yield {}, f
267 d = f.value
267 d = f.value
268 try:
268 try:
269 branchmap = {}
269 branchmap = {}
270 for branchpart in d.splitlines():
270 for branchpart in d.splitlines():
271 branchname, branchheads = branchpart.split(' ', 1)
271 branchname, branchheads = branchpart.split(' ', 1)
272 branchname = encoding.tolocal(urlreq.unquote(branchname))
272 branchname = encoding.tolocal(urlreq.unquote(branchname))
273 branchheads = decodelist(branchheads)
273 branchheads = decodelist(branchheads)
274 branchmap[branchname] = branchheads
274 branchmap[branchname] = branchheads
275 yield branchmap
275 yield branchmap
276 except TypeError:
276 except TypeError:
277 self._abort(error.ResponseError(_("unexpected response:"), d))
277 self._abort(error.ResponseError(_("unexpected response:"), d))
278
278
279 @batchable
279 @batchable
280 def listkeys(self, namespace):
280 def listkeys(self, namespace):
281 if not self.capable('pushkey'):
281 if not self.capable('pushkey'):
282 yield {}, None
282 yield {}, None
283 f = future()
283 f = future()
284 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
284 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
285 yield {'namespace': encoding.fromlocal(namespace)}, f
285 yield {'namespace': encoding.fromlocal(namespace)}, f
286 d = f.value
286 d = f.value
287 self.ui.debug('received listkey for "%s": %i bytes\n'
287 self.ui.debug('received listkey for "%s": %i bytes\n'
288 % (namespace, len(d)))
288 % (namespace, len(d)))
289 yield pushkeymod.decodekeys(d)
289 yield pushkeymod.decodekeys(d)
290
290
291 @batchable
291 @batchable
292 def pushkey(self, namespace, key, old, new):
292 def pushkey(self, namespace, key, old, new):
293 if not self.capable('pushkey'):
293 if not self.capable('pushkey'):
294 yield False, None
294 yield False, None
295 f = future()
295 f = future()
296 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
296 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
297 yield {'namespace': encoding.fromlocal(namespace),
297 yield {'namespace': encoding.fromlocal(namespace),
298 'key': encoding.fromlocal(key),
298 'key': encoding.fromlocal(key),
299 'old': encoding.fromlocal(old),
299 'old': encoding.fromlocal(old),
300 'new': encoding.fromlocal(new)}, f
300 'new': encoding.fromlocal(new)}, f
301 d = f.value
301 d = f.value
302 d, output = d.split('\n', 1)
302 d, output = d.split('\n', 1)
303 try:
303 try:
304 d = bool(int(d))
304 d = bool(int(d))
305 except ValueError:
305 except ValueError:
306 raise error.ResponseError(
306 raise error.ResponseError(
307 _('push failed (unexpected response):'), d)
307 _('push failed (unexpected response):'), d)
308 for l in output.splitlines(True):
308 for l in output.splitlines(True):
309 self.ui.status(_('remote: '), l)
309 self.ui.status(_('remote: '), l)
310 yield d
310 yield d
311
311
312 def stream_out(self):
312 def stream_out(self):
313 return self._callstream('stream_out')
313 return self._callstream('stream_out')
314
314
315 def getbundle(self, source, **kwargs):
315 def getbundle(self, source, **kwargs):
316 self.requirecap('getbundle', _('look up remote changes'))
316 self.requirecap('getbundle', _('look up remote changes'))
317 opts = {}
317 opts = {}
318 bundlecaps = kwargs.get('bundlecaps')
318 bundlecaps = kwargs.get('bundlecaps')
319 if bundlecaps is not None:
319 if bundlecaps is not None:
320 kwargs['bundlecaps'] = sorted(bundlecaps)
320 kwargs['bundlecaps'] = sorted(bundlecaps)
321 else:
321 else:
322 bundlecaps = () # kwargs could have it to None
322 bundlecaps = () # kwargs could have it to None
323 for key, value in kwargs.iteritems():
323 for key, value in kwargs.iteritems():
324 if value is None:
324 if value is None:
325 continue
325 continue
326 keytype = gboptsmap.get(key)
326 keytype = gboptsmap.get(key)
327 if keytype is None:
327 if keytype is None:
328 raise error.ProgrammingError(
328 raise error.ProgrammingError(
329 'Unexpectedly None keytype for key %s' % key)
329 'Unexpectedly None keytype for key %s' % key)
330 elif keytype == 'nodes':
330 elif keytype == 'nodes':
331 value = encodelist(value)
331 value = encodelist(value)
332 elif keytype in ('csv', 'scsv'):
332 elif keytype in ('csv', 'scsv'):
333 value = ','.join(value)
333 value = ','.join(value)
334 elif keytype == 'boolean':
334 elif keytype == 'boolean':
335 value = '%i' % bool(value)
335 value = '%i' % bool(value)
336 elif keytype != 'plain':
336 elif keytype != 'plain':
337 raise KeyError('unknown getbundle option type %s'
337 raise KeyError('unknown getbundle option type %s'
338 % keytype)
338 % keytype)
339 opts[key] = value
339 opts[key] = value
340 f = self._callcompressable("getbundle", **opts)
340 f = self._callcompressable("getbundle", **opts)
341 if any((cap.startswith('HG2') for cap in bundlecaps)):
341 if any((cap.startswith('HG2') for cap in bundlecaps)):
342 return bundle2.getunbundler(self.ui, f)
342 return bundle2.getunbundler(self.ui, f)
343 else:
343 else:
344 return changegroupmod.cg1unpacker(f, 'UN')
344 return changegroupmod.cg1unpacker(f, 'UN')
345
345
346 def unbundle(self, cg, heads, url):
346 def unbundle(self, cg, heads, url):
347 '''Send cg (a readable file-like object representing the
347 '''Send cg (a readable file-like object representing the
348 changegroup to push, typically a chunkbuffer object) to the
348 changegroup to push, typically a chunkbuffer object) to the
349 remote server as a bundle.
349 remote server as a bundle.
350
350
351 When pushing a bundle10 stream, return an integer indicating the
351 When pushing a bundle10 stream, return an integer indicating the
352 result of the push (see changegroup.apply()).
352 result of the push (see changegroup.apply()).
353
353
354 When pushing a bundle20 stream, return a bundle20 stream.
354 When pushing a bundle20 stream, return a bundle20 stream.
355
355
356 `url` is the url the client thinks it's pushing to, which is
356 `url` is the url the client thinks it's pushing to, which is
357 visible to hooks.
357 visible to hooks.
358 '''
358 '''
359
359
360 if heads != ['force'] and self.capable('unbundlehash'):
360 if heads != ['force'] and self.capable('unbundlehash'):
361 heads = encodelist(['hashed',
361 heads = encodelist(['hashed',
362 hashlib.sha1(''.join(sorted(heads))).digest()])
362 hashlib.sha1(''.join(sorted(heads))).digest()])
363 else:
363 else:
364 heads = encodelist(heads)
364 heads = encodelist(heads)
365
365
366 if util.safehasattr(cg, 'deltaheader'):
366 if util.safehasattr(cg, 'deltaheader'):
367 # this a bundle10, do the old style call sequence
367 # this a bundle10, do the old style call sequence
368 ret, output = self._callpush("unbundle", cg, heads=heads)
368 ret, output = self._callpush("unbundle", cg, heads=heads)
369 if ret == "":
369 if ret == "":
370 raise error.ResponseError(
370 raise error.ResponseError(
371 _('push failed:'), output)
371 _('push failed:'), output)
372 try:
372 try:
373 ret = int(ret)
373 ret = int(ret)
374 except ValueError:
374 except ValueError:
375 raise error.ResponseError(
375 raise error.ResponseError(
376 _('push failed (unexpected response):'), ret)
376 _('push failed (unexpected response):'), ret)
377
377
378 for l in output.splitlines(True):
378 for l in output.splitlines(True):
379 self.ui.status(_('remote: '), l)
379 self.ui.status(_('remote: '), l)
380 else:
380 else:
381 # bundle2 push. Send a stream, fetch a stream.
381 # bundle2 push. Send a stream, fetch a stream.
382 stream = self._calltwowaystream('unbundle', cg, heads=heads)
382 stream = self._calltwowaystream('unbundle', cg, heads=heads)
383 ret = bundle2.getunbundler(self.ui, stream)
383 ret = bundle2.getunbundler(self.ui, stream)
384 return ret
384 return ret
385
385
386 # End of basewirepeer interface.
386 # End of basewirepeer interface.
387
387
388 # Begin of baselegacywirepeer interface.
388 # Begin of baselegacywirepeer interface.
389
389
390 def branches(self, nodes):
390 def branches(self, nodes):
391 n = encodelist(nodes)
391 n = encodelist(nodes)
392 d = self._call("branches", nodes=n)
392 d = self._call("branches", nodes=n)
393 try:
393 try:
394 br = [tuple(decodelist(b)) for b in d.splitlines()]
394 br = [tuple(decodelist(b)) for b in d.splitlines()]
395 return br
395 return br
396 except ValueError:
396 except ValueError:
397 self._abort(error.ResponseError(_("unexpected response:"), d))
397 self._abort(error.ResponseError(_("unexpected response:"), d))
398
398
399 def between(self, pairs):
399 def between(self, pairs):
400 batch = 8 # avoid giant requests
400 batch = 8 # avoid giant requests
401 r = []
401 r = []
402 for i in xrange(0, len(pairs), batch):
402 for i in xrange(0, len(pairs), batch):
403 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
403 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
404 d = self._call("between", pairs=n)
404 d = self._call("between", pairs=n)
405 try:
405 try:
406 r.extend(l and decodelist(l) or [] for l in d.splitlines())
406 r.extend(l and decodelist(l) or [] for l in d.splitlines())
407 except ValueError:
407 except ValueError:
408 self._abort(error.ResponseError(_("unexpected response:"), d))
408 self._abort(error.ResponseError(_("unexpected response:"), d))
409 return r
409 return r
410
410
411 def changegroup(self, nodes, kind):
411 def changegroup(self, nodes, kind):
412 n = encodelist(nodes)
412 n = encodelist(nodes)
413 f = self._callcompressable("changegroup", roots=n)
413 f = self._callcompressable("changegroup", roots=n)
414 return changegroupmod.cg1unpacker(f, 'UN')
414 return changegroupmod.cg1unpacker(f, 'UN')
415
415
416 def changegroupsubset(self, bases, heads, kind):
416 def changegroupsubset(self, bases, heads, kind):
417 self.requirecap('changegroupsubset', _('look up remote changes'))
417 self.requirecap('changegroupsubset', _('look up remote changes'))
418 bases = encodelist(bases)
418 bases = encodelist(bases)
419 heads = encodelist(heads)
419 heads = encodelist(heads)
420 f = self._callcompressable("changegroupsubset",
420 f = self._callcompressable("changegroupsubset",
421 bases=bases, heads=heads)
421 bases=bases, heads=heads)
422 return changegroupmod.cg1unpacker(f, 'UN')
422 return changegroupmod.cg1unpacker(f, 'UN')
423
423
424 # End of baselegacywirepeer interface.
424 # End of baselegacywirepeer interface.
425
425
426 def _submitbatch(self, req):
426 def _submitbatch(self, req):
427 """run batch request <req> on the server
427 """run batch request <req> on the server
428
428
429 Returns an iterator of the raw responses from the server.
429 Returns an iterator of the raw responses from the server.
430 """
430 """
431 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
431 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
432 chunk = rsp.read(1024)
432 chunk = rsp.read(1024)
433 work = [chunk]
433 work = [chunk]
434 while chunk:
434 while chunk:
435 while ';' not in chunk and chunk:
435 while ';' not in chunk and chunk:
436 chunk = rsp.read(1024)
436 chunk = rsp.read(1024)
437 work.append(chunk)
437 work.append(chunk)
438 merged = ''.join(work)
438 merged = ''.join(work)
439 while ';' in merged:
439 while ';' in merged:
440 one, merged = merged.split(';', 1)
440 one, merged = merged.split(';', 1)
441 yield unescapearg(one)
441 yield unescapearg(one)
442 chunk = rsp.read(1024)
442 chunk = rsp.read(1024)
443 work = [merged, chunk]
443 work = [merged, chunk]
444 yield unescapearg(''.join(work))
444 yield unescapearg(''.join(work))
445
445
446 def _submitone(self, op, args):
446 def _submitone(self, op, args):
447 return self._call(op, **args)
447 return self._call(op, **args)
448
448
449 def debugwireargs(self, one, two, three=None, four=None, five=None):
449 def debugwireargs(self, one, two, three=None, four=None, five=None):
450 # don't pass optional arguments left at their default value
450 # don't pass optional arguments left at their default value
451 opts = {}
451 opts = {}
452 if three is not None:
452 if three is not None:
453 opts['three'] = three
453 opts['three'] = three
454 if four is not None:
454 if four is not None:
455 opts['four'] = four
455 opts['four'] = four
456 return self._call('debugwireargs', one=one, two=two, **opts)
456 return self._call('debugwireargs', one=one, two=two, **opts)
457
457
458 def _call(self, cmd, **args):
458 def _call(self, cmd, **args):
459 """execute <cmd> on the server
459 """execute <cmd> on the server
460
460
461 The command is expected to return a simple string.
461 The command is expected to return a simple string.
462
462
463 returns the server reply as a string."""
463 returns the server reply as a string."""
464 raise NotImplementedError()
464 raise NotImplementedError()
465
465
466 def _callstream(self, cmd, **args):
466 def _callstream(self, cmd, **args):
467 """execute <cmd> on the server
467 """execute <cmd> on the server
468
468
469 The command is expected to return a stream. Note that if the
469 The command is expected to return a stream. Note that if the
470 command doesn't return a stream, _callstream behaves
470 command doesn't return a stream, _callstream behaves
471 differently for ssh and http peers.
471 differently for ssh and http peers.
472
472
473 returns the server reply as a file like object.
473 returns the server reply as a file like object.
474 """
474 """
475 raise NotImplementedError()
475 raise NotImplementedError()
476
476
477 def _callcompressable(self, cmd, **args):
477 def _callcompressable(self, cmd, **args):
478 """execute <cmd> on the server
478 """execute <cmd> on the server
479
479
480 The command is expected to return a stream.
480 The command is expected to return a stream.
481
481
482 The stream may have been compressed in some implementations. This
482 The stream may have been compressed in some implementations. This
483 function takes care of the decompression. This is the only difference
483 function takes care of the decompression. This is the only difference
484 with _callstream.
484 with _callstream.
485
485
486 returns the server reply as a file like object.
486 returns the server reply as a file like object.
487 """
487 """
488 raise NotImplementedError()
488 raise NotImplementedError()
489
489
490 def _callpush(self, cmd, fp, **args):
490 def _callpush(self, cmd, fp, **args):
491 """execute a <cmd> on server
491 """execute a <cmd> on server
492
492
493 The command is expected to be related to a push. Push has a special
493 The command is expected to be related to a push. Push has a special
494 return method.
494 return method.
495
495
496 returns the server reply as a (ret, output) tuple. ret is either
496 returns the server reply as a (ret, output) tuple. ret is either
497 empty (error) or a stringified int.
497 empty (error) or a stringified int.
498 """
498 """
499 raise NotImplementedError()
499 raise NotImplementedError()
500
500
501 def _calltwowaystream(self, cmd, fp, **args):
501 def _calltwowaystream(self, cmd, fp, **args):
502 """execute <cmd> on server
502 """execute <cmd> on server
503
503
504 The command will send a stream to the server and get a stream in reply.
504 The command will send a stream to the server and get a stream in reply.
505 """
505 """
506 raise NotImplementedError()
506 raise NotImplementedError()
507
507
508 def _abort(self, exception):
508 def _abort(self, exception):
509 """clearly abort the wire protocol connection and raise the exception
509 """clearly abort the wire protocol connection and raise the exception
510 """
510 """
511 raise NotImplementedError()
511 raise NotImplementedError()
512
512
513 # server side
513 # server side
514
514
515 # wire protocol command can either return a string or one of these classes.
515 # wire protocol command can either return a string or one of these classes.
516 class streamres(object):
516 class streamres(object):
517 """wireproto reply: binary stream
517 """wireproto reply: binary stream
518
518
519 The call was successful and the result is a stream.
519 The call was successful and the result is a stream.
520
520
521 Accepts either a generator or an object with a ``read(size)`` method.
521 Accepts either a generator or an object with a ``read(size)`` method.
522
522
523 ``v1compressible`` indicates whether this data can be compressed to
523 ``v1compressible`` indicates whether this data can be compressed to
524 "version 1" clients (technically: HTTP peers using
524 "version 1" clients (technically: HTTP peers using
525 application/mercurial-0.1 media type). This flag should NOT be used on
525 application/mercurial-0.1 media type). This flag should NOT be used on
526 new commands because new clients should support a more modern compression
526 new commands because new clients should support a more modern compression
527 mechanism.
527 mechanism.
528 """
528 """
529 def __init__(self, gen=None, reader=None, v1compressible=False):
529 def __init__(self, gen=None, reader=None, v1compressible=False):
530 self.gen = gen
530 self.gen = gen
531 self.reader = reader
531 self.reader = reader
532 self.v1compressible = v1compressible
532 self.v1compressible = v1compressible
533
533
534 class pushres(object):
534 class pushres(object):
535 """wireproto reply: success with simple integer return
535 """wireproto reply: success with simple integer return
536
536
537 The call was successful and returned an integer contained in `self.res`.
537 The call was successful and returned an integer contained in `self.res`.
538 """
538 """
539 def __init__(self, res):
539 def __init__(self, res):
540 self.res = res
540 self.res = res
541
541
542 class pusherr(object):
542 class pusherr(object):
543 """wireproto reply: failure
543 """wireproto reply: failure
544
544
545 The call failed. The `self.res` attribute contains the error message.
545 The call failed. The `self.res` attribute contains the error message.
546 """
546 """
547 def __init__(self, res):
547 def __init__(self, res):
548 self.res = res
548 self.res = res
549
549
550 class ooberror(object):
550 class ooberror(object):
551 """wireproto reply: failure of a batch of operation
551 """wireproto reply: failure of a batch of operation
552
552
553 Something failed during a batch call. The error message is stored in
553 Something failed during a batch call. The error message is stored in
554 `self.message`.
554 `self.message`.
555 """
555 """
556 def __init__(self, message):
556 def __init__(self, message):
557 self.message = message
557 self.message = message
558
558
559 def getdispatchrepo(repo, proto, command):
559 def getdispatchrepo(repo, proto, command):
560 """Obtain the repo used for processing wire protocol commands.
560 """Obtain the repo used for processing wire protocol commands.
561
561
562 The intent of this function is to serve as a monkeypatch point for
562 The intent of this function is to serve as a monkeypatch point for
563 extensions that need commands to operate on different repo views under
563 extensions that need commands to operate on different repo views under
564 specialized circumstances.
564 specialized circumstances.
565 """
565 """
566 return repo.filtered('served')
566 return repo.filtered('served')
567
567
568 def dispatch(repo, proto, command):
568 def dispatch(repo, proto, command):
569 repo = getdispatchrepo(repo, proto, command)
569 repo = getdispatchrepo(repo, proto, command)
570 func, spec = commands[command]
570 func, spec = commands[command]
571 args = proto.getargs(spec)
571 args = proto.getargs(spec)
572 return func(repo, proto, *args)
572 return func(repo, proto, *args)
573
573
574 def options(cmd, keys, others):
574 def options(cmd, keys, others):
575 opts = {}
575 opts = {}
576 for k in keys:
576 for k in keys:
577 if k in others:
577 if k in others:
578 opts[k] = others[k]
578 opts[k] = others[k]
579 del others[k]
579 del others[k]
580 if others:
580 if others:
581 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
581 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
582 % (cmd, ",".join(others)))
582 % (cmd, ",".join(others)))
583 return opts
583 return opts
584
584
585 def bundle1allowed(repo, action):
585 def bundle1allowed(repo, action):
586 """Whether a bundle1 operation is allowed from the server.
586 """Whether a bundle1 operation is allowed from the server.
587
587
588 Priority is:
588 Priority is:
589
589
590 1. server.bundle1gd.<action> (if generaldelta active)
590 1. server.bundle1gd.<action> (if generaldelta active)
591 2. server.bundle1.<action>
591 2. server.bundle1.<action>
592 3. server.bundle1gd (if generaldelta active)
592 3. server.bundle1gd (if generaldelta active)
593 4. server.bundle1
593 4. server.bundle1
594 """
594 """
595 ui = repo.ui
595 ui = repo.ui
596 gd = 'generaldelta' in repo.requirements
596 gd = 'generaldelta' in repo.requirements
597
597
598 if gd:
598 if gd:
599 v = ui.configbool('server', 'bundle1gd.%s' % action)
599 v = ui.configbool('server', 'bundle1gd.%s' % action)
600 if v is not None:
600 if v is not None:
601 return v
601 return v
602
602
603 v = ui.configbool('server', 'bundle1.%s' % action)
603 v = ui.configbool('server', 'bundle1.%s' % action)
604 if v is not None:
604 if v is not None:
605 return v
605 return v
606
606
607 if gd:
607 if gd:
608 v = ui.configbool('server', 'bundle1gd')
608 v = ui.configbool('server', 'bundle1gd')
609 if v is not None:
609 if v is not None:
610 return v
610 return v
611
611
612 return ui.configbool('server', 'bundle1')
612 return ui.configbool('server', 'bundle1')
613
613
614 def supportedcompengines(ui, proto, role):
614 def supportedcompengines(ui, proto, role):
615 """Obtain the list of supported compression engines for a request."""
615 """Obtain the list of supported compression engines for a request."""
616 assert role in (util.CLIENTROLE, util.SERVERROLE)
616 assert role in (util.CLIENTROLE, util.SERVERROLE)
617
617
618 compengines = util.compengines.supportedwireengines(role)
618 compengines = util.compengines.supportedwireengines(role)
619
619
620 # Allow config to override default list and ordering.
620 # Allow config to override default list and ordering.
621 if role == util.SERVERROLE:
621 if role == util.SERVERROLE:
622 configengines = ui.configlist('server', 'compressionengines')
622 configengines = ui.configlist('server', 'compressionengines')
623 config = 'server.compressionengines'
623 config = 'server.compressionengines'
624 else:
624 else:
625 # This is currently implemented mainly to facilitate testing. In most
625 # This is currently implemented mainly to facilitate testing. In most
626 # cases, the server should be in charge of choosing a compression engine
626 # cases, the server should be in charge of choosing a compression engine
627 # because a server has the most to lose from a sub-optimal choice. (e.g.
627 # because a server has the most to lose from a sub-optimal choice. (e.g.
628 # CPU DoS due to an expensive engine or a network DoS due to poor
628 # CPU DoS due to an expensive engine or a network DoS due to poor
629 # compression ratio).
629 # compression ratio).
630 configengines = ui.configlist('experimental',
630 configengines = ui.configlist('experimental',
631 'clientcompressionengines')
631 'clientcompressionengines')
632 config = 'experimental.clientcompressionengines'
632 config = 'experimental.clientcompressionengines'
633
633
634 # No explicit config. Filter out the ones that aren't supposed to be
634 # No explicit config. Filter out the ones that aren't supposed to be
635 # advertised and return default ordering.
635 # advertised and return default ordering.
636 if not configengines:
636 if not configengines:
637 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
637 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
638 return [e for e in compengines
638 return [e for e in compengines
639 if getattr(e.wireprotosupport(), attr) > 0]
639 if getattr(e.wireprotosupport(), attr) > 0]
640
640
641 # If compression engines are listed in the config, assume there is a good
641 # If compression engines are listed in the config, assume there is a good
642 # reason for it (like server operators wanting to achieve specific
642 # reason for it (like server operators wanting to achieve specific
643 # performance characteristics). So fail fast if the config references
643 # performance characteristics). So fail fast if the config references
644 # unusable compression engines.
644 # unusable compression engines.
645 validnames = set(e.name() for e in compengines)
645 validnames = set(e.name() for e in compengines)
646 invalidnames = set(e for e in configengines if e not in validnames)
646 invalidnames = set(e for e in configengines if e not in validnames)
647 if invalidnames:
647 if invalidnames:
648 raise error.Abort(_('invalid compression engine defined in %s: %s') %
648 raise error.Abort(_('invalid compression engine defined in %s: %s') %
649 (config, ', '.join(sorted(invalidnames))))
649 (config, ', '.join(sorted(invalidnames))))
650
650
651 compengines = [e for e in compengines if e.name() in configengines]
651 compengines = [e for e in compengines if e.name() in configengines]
652 compengines = sorted(compengines,
652 compengines = sorted(compengines,
653 key=lambda e: configengines.index(e.name()))
653 key=lambda e: configengines.index(e.name()))
654
654
655 if not compengines:
655 if not compengines:
656 raise error.Abort(_('%s config option does not specify any known '
656 raise error.Abort(_('%s config option does not specify any known '
657 'compression engines') % config,
657 'compression engines') % config,
658 hint=_('usable compression engines: %s') %
658 hint=_('usable compression engines: %s') %
659 ', '.sorted(validnames))
659 ', '.sorted(validnames))
660
660
661 return compengines
661 return compengines
662
662
663 # list of commands
663 # list of commands
664 commands = {}
664 commands = {}
665
665
666 def wireprotocommand(name, args=''):
666 def wireprotocommand(name, args=''):
667 """decorator for wire protocol command"""
667 """decorator for wire protocol command"""
668 def register(func):
668 def register(func):
669 commands[name] = (func, args)
669 commands[name] = (func, args)
670 return func
670 return func
671 return register
671 return register
672
672
673 @wireprotocommand('batch', 'cmds *')
673 @wireprotocommand('batch', 'cmds *')
674 def batch(repo, proto, cmds, others):
674 def batch(repo, proto, cmds, others):
675 repo = repo.filtered("served")
675 repo = repo.filtered("served")
676 res = []
676 res = []
677 for pair in cmds.split(';'):
677 for pair in cmds.split(';'):
678 op, args = pair.split(' ', 1)
678 op, args = pair.split(' ', 1)
679 vals = {}
679 vals = {}
680 for a in args.split(','):
680 for a in args.split(','):
681 if a:
681 if a:
682 n, v = a.split('=')
682 n, v = a.split('=')
683 vals[unescapearg(n)] = unescapearg(v)
683 vals[unescapearg(n)] = unescapearg(v)
684 func, spec = commands[op]
684 func, spec = commands[op]
685 if spec:
685 if spec:
686 keys = spec.split()
686 keys = spec.split()
687 data = {}
687 data = {}
688 for k in keys:
688 for k in keys:
689 if k == '*':
689 if k == '*':
690 star = {}
690 star = {}
691 for key in vals.keys():
691 for key in vals.keys():
692 if key not in keys:
692 if key not in keys:
693 star[key] = vals[key]
693 star[key] = vals[key]
694 data['*'] = star
694 data['*'] = star
695 else:
695 else:
696 data[k] = vals[k]
696 data[k] = vals[k]
697 result = func(repo, proto, *[data[k] for k in keys])
697 result = func(repo, proto, *[data[k] for k in keys])
698 else:
698 else:
699 result = func(repo, proto)
699 result = func(repo, proto)
700 if isinstance(result, ooberror):
700 if isinstance(result, ooberror):
701 return result
701 return result
702 res.append(escapearg(result))
702 res.append(escapearg(result))
703 return ';'.join(res)
703 return ';'.join(res)
704
704
705 @wireprotocommand('between', 'pairs')
705 @wireprotocommand('between', 'pairs')
706 def between(repo, proto, pairs):
706 def between(repo, proto, pairs):
707 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
707 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
708 r = []
708 r = []
709 for b in repo.between(pairs):
709 for b in repo.between(pairs):
710 r.append(encodelist(b) + "\n")
710 r.append(encodelist(b) + "\n")
711 return "".join(r)
711 return "".join(r)
712
712
713 @wireprotocommand('branchmap')
713 @wireprotocommand('branchmap')
714 def branchmap(repo, proto):
714 def branchmap(repo, proto):
715 branchmap = repo.branchmap()
715 branchmap = repo.branchmap()
716 heads = []
716 heads = []
717 for branch, nodes in branchmap.iteritems():
717 for branch, nodes in branchmap.iteritems():
718 branchname = urlreq.quote(encoding.fromlocal(branch))
718 branchname = urlreq.quote(encoding.fromlocal(branch))
719 branchnodes = encodelist(nodes)
719 branchnodes = encodelist(nodes)
720 heads.append('%s %s' % (branchname, branchnodes))
720 heads.append('%s %s' % (branchname, branchnodes))
721 return '\n'.join(heads)
721 return '\n'.join(heads)
722
722
723 @wireprotocommand('branches', 'nodes')
723 @wireprotocommand('branches', 'nodes')
724 def branches(repo, proto, nodes):
724 def branches(repo, proto, nodes):
725 nodes = decodelist(nodes)
725 nodes = decodelist(nodes)
726 r = []
726 r = []
727 for b in repo.branches(nodes):
727 for b in repo.branches(nodes):
728 r.append(encodelist(b) + "\n")
728 r.append(encodelist(b) + "\n")
729 return "".join(r)
729 return "".join(r)
730
730
731 @wireprotocommand('clonebundles', '')
731 @wireprotocommand('clonebundles', '')
732 def clonebundles(repo, proto):
732 def clonebundles(repo, proto):
733 """Server command for returning info for available bundles to seed clones.
733 """Server command for returning info for available bundles to seed clones.
734
734
735 Clients will parse this response and determine what bundle to fetch.
735 Clients will parse this response and determine what bundle to fetch.
736
736
737 Extensions may wrap this command to filter or dynamically emit data
737 Extensions may wrap this command to filter or dynamically emit data
738 depending on the request. e.g. you could advertise URLs for the closest
738 depending on the request. e.g. you could advertise URLs for the closest
739 data center given the client's IP address.
739 data center given the client's IP address.
740 """
740 """
741 return repo.vfs.tryread('clonebundles.manifest')
741 return repo.vfs.tryread('clonebundles.manifest')
742
742
743 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
743 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
744 'known', 'getbundle', 'unbundlehash', 'batch']
744 'known', 'getbundle', 'unbundlehash', 'batch']
745
745
746 def _capabilities(repo, proto):
746 def _capabilities(repo, proto):
747 """return a list of capabilities for a repo
747 """return a list of capabilities for a repo
748
748
749 This function exists to allow extensions to easily wrap capabilities
749 This function exists to allow extensions to easily wrap capabilities
750 computation
750 computation
751
751
752 - returns a lists: easy to alter
752 - returns a lists: easy to alter
753 - change done here will be propagated to both `capabilities` and `hello`
753 - change done here will be propagated to both `capabilities` and `hello`
754 command without any other action needed.
754 command without any other action needed.
755 """
755 """
756 # copy to prevent modification of the global list
756 # copy to prevent modification of the global list
757 caps = list(wireprotocaps)
757 caps = list(wireprotocaps)
758 if streamclone.allowservergeneration(repo):
758 if streamclone.allowservergeneration(repo):
759 if repo.ui.configbool('server', 'preferuncompressed'):
759 if repo.ui.configbool('server', 'preferuncompressed'):
760 caps.append('stream-preferred')
760 caps.append('stream-preferred')
761 requiredformats = repo.requirements & repo.supportedformats
761 requiredformats = repo.requirements & repo.supportedformats
762 # if our local revlogs are just revlogv1, add 'stream' cap
762 # if our local revlogs are just revlogv1, add 'stream' cap
763 if not requiredformats - {'revlogv1'}:
763 if not requiredformats - {'revlogv1'}:
764 caps.append('stream')
764 caps.append('stream')
765 # otherwise, add 'streamreqs' detailing our local revlog format
765 # otherwise, add 'streamreqs' detailing our local revlog format
766 else:
766 else:
767 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
767 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
768 if repo.ui.configbool('experimental', 'bundle2-advertise'):
768 if repo.ui.configbool('experimental', 'bundle2-advertise'):
769 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
769 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
770 caps.append('bundle2=' + urlreq.quote(capsblob))
770 caps.append('bundle2=' + urlreq.quote(capsblob))
771 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
771 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
772
772
773 if proto.name == 'http':
773 if proto.name == 'http':
774 caps.append('httpheader=%d' %
774 caps.append('httpheader=%d' %
775 repo.ui.configint('server', 'maxhttpheaderlen'))
775 repo.ui.configint('server', 'maxhttpheaderlen'))
776 if repo.ui.configbool('experimental', 'httppostargs'):
776 if repo.ui.configbool('experimental', 'httppostargs'):
777 caps.append('httppostargs')
777 caps.append('httppostargs')
778
778
779 # FUTURE advertise 0.2rx once support is implemented
779 # FUTURE advertise 0.2rx once support is implemented
780 # FUTURE advertise minrx and mintx after consulting config option
780 # FUTURE advertise minrx and mintx after consulting config option
781 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
781 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
782
782
783 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
783 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
784 if compengines:
784 if compengines:
785 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
785 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
786 for e in compengines)
786 for e in compengines)
787 caps.append('compression=%s' % comptypes)
787 caps.append('compression=%s' % comptypes)
788
788
789 return caps
789 return caps
790
790
791 # If you are writing an extension and consider wrapping this function. Wrap
791 # If you are writing an extension and consider wrapping this function. Wrap
792 # `_capabilities` instead.
792 # `_capabilities` instead.
793 @wireprotocommand('capabilities')
793 @wireprotocommand('capabilities')
794 def capabilities(repo, proto):
794 def capabilities(repo, proto):
795 return ' '.join(_capabilities(repo, proto))
795 return ' '.join(_capabilities(repo, proto))
796
796
797 @wireprotocommand('changegroup', 'roots')
797 @wireprotocommand('changegroup', 'roots')
798 def changegroup(repo, proto, roots):
798 def changegroup(repo, proto, roots):
799 nodes = decodelist(roots)
799 nodes = decodelist(roots)
800 outgoing = discovery.outgoing(repo, missingroots=nodes,
800 outgoing = discovery.outgoing(repo, missingroots=nodes,
801 missingheads=repo.heads())
801 missingheads=repo.heads())
802 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
802 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
803 return streamres(reader=cg, v1compressible=True)
803 return streamres(reader=cg, v1compressible=True)
804
804
805 @wireprotocommand('changegroupsubset', 'bases heads')
805 @wireprotocommand('changegroupsubset', 'bases heads')
806 def changegroupsubset(repo, proto, bases, heads):
806 def changegroupsubset(repo, proto, bases, heads):
807 bases = decodelist(bases)
807 bases = decodelist(bases)
808 heads = decodelist(heads)
808 heads = decodelist(heads)
809 outgoing = discovery.outgoing(repo, missingroots=bases,
809 outgoing = discovery.outgoing(repo, missingroots=bases,
810 missingheads=heads)
810 missingheads=heads)
811 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
811 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
812 return streamres(reader=cg, v1compressible=True)
812 return streamres(reader=cg, v1compressible=True)
813
813
814 @wireprotocommand('debugwireargs', 'one two *')
814 @wireprotocommand('debugwireargs', 'one two *')
815 def debugwireargs(repo, proto, one, two, others):
815 def debugwireargs(repo, proto, one, two, others):
816 # only accept optional args from the known set
816 # only accept optional args from the known set
817 opts = options('debugwireargs', ['three', 'four'], others)
817 opts = options('debugwireargs', ['three', 'four'], others)
818 return repo.debugwireargs(one, two, **opts)
818 return repo.debugwireargs(one, two, **opts)
819
819
820 @wireprotocommand('getbundle', '*')
820 @wireprotocommand('getbundle', '*')
821 def getbundle(repo, proto, others):
821 def getbundle(repo, proto, others):
822 opts = options('getbundle', gboptsmap.keys(), others)
822 opts = options('getbundle', gboptsmap.keys(), others)
823 for k, v in opts.iteritems():
823 for k, v in opts.iteritems():
824 keytype = gboptsmap[k]
824 keytype = gboptsmap[k]
825 if keytype == 'nodes':
825 if keytype == 'nodes':
826 opts[k] = decodelist(v)
826 opts[k] = decodelist(v)
827 elif keytype == 'csv':
827 elif keytype == 'csv':
828 opts[k] = list(v.split(','))
828 opts[k] = list(v.split(','))
829 elif keytype == 'scsv':
829 elif keytype == 'scsv':
830 opts[k] = set(v.split(','))
830 opts[k] = set(v.split(','))
831 elif keytype == 'boolean':
831 elif keytype == 'boolean':
832 # Client should serialize False as '0', which is a non-empty string
832 # Client should serialize False as '0', which is a non-empty string
833 # so it evaluates as a True bool.
833 # so it evaluates as a True bool.
834 if v == '0':
834 if v == '0':
835 opts[k] = False
835 opts[k] = False
836 else:
836 else:
837 opts[k] = bool(v)
837 opts[k] = bool(v)
838 elif keytype != 'plain':
838 elif keytype != 'plain':
839 raise KeyError('unknown getbundle option type %s'
839 raise KeyError('unknown getbundle option type %s'
840 % keytype)
840 % keytype)
841
841
842 if not bundle1allowed(repo, 'pull'):
842 if not bundle1allowed(repo, 'pull'):
843 if not exchange.bundle2requested(opts.get('bundlecaps')):
843 if not exchange.bundle2requested(opts.get('bundlecaps')):
844 if proto.name == 'http':
844 if proto.name == 'http':
845 return ooberror(bundle2required)
845 return ooberror(bundle2required)
846 raise error.Abort(bundle2requiredmain,
846 raise error.Abort(bundle2requiredmain,
847 hint=bundle2requiredhint)
847 hint=bundle2requiredhint)
848
848
849 try:
849 try:
850 if repo.ui.configbool('server', 'disablefullbundle'):
850 if repo.ui.configbool('server', 'disablefullbundle'):
851 # Check to see if this is a full clone.
851 # Check to see if this is a full clone.
852 clheads = set(repo.changelog.heads())
852 clheads = set(repo.changelog.heads())
853 heads = set(opts.get('heads', set()))
853 heads = set(opts.get('heads', set()))
854 common = set(opts.get('common', set()))
854 common = set(opts.get('common', set()))
855 common.discard(nullid)
855 common.discard(nullid)
856 if not common and clheads == heads:
856 if not common and clheads == heads:
857 raise error.Abort(
857 raise error.Abort(
858 _('server has pull-based clones disabled'),
858 _('server has pull-based clones disabled'),
859 hint=_('remove --pull if specified or upgrade Mercurial'))
859 hint=_('remove --pull if specified or upgrade Mercurial'))
860
860
861 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
861 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
862 except error.Abort as exc:
862 except error.Abort as exc:
863 # cleanly forward Abort error to the client
863 # cleanly forward Abort error to the client
864 if not exchange.bundle2requested(opts.get('bundlecaps')):
864 if not exchange.bundle2requested(opts.get('bundlecaps')):
865 if proto.name == 'http':
865 if proto.name == 'http':
866 return ooberror(str(exc) + '\n')
866 return ooberror(str(exc) + '\n')
867 raise # cannot do better for bundle1 + ssh
867 raise # cannot do better for bundle1 + ssh
868 # bundle2 request expect a bundle2 reply
868 # bundle2 request expect a bundle2 reply
869 bundler = bundle2.bundle20(repo.ui)
869 bundler = bundle2.bundle20(repo.ui)
870 manargs = [('message', str(exc))]
870 manargs = [('message', str(exc))]
871 advargs = []
871 advargs = []
872 if exc.hint is not None:
872 if exc.hint is not None:
873 advargs.append(('hint', exc.hint))
873 advargs.append(('hint', exc.hint))
874 bundler.addpart(bundle2.bundlepart('error:abort',
874 bundler.addpart(bundle2.bundlepart('error:abort',
875 manargs, advargs))
875 manargs, advargs))
876 return streamres(gen=bundler.getchunks(), v1compressible=True)
876 return streamres(gen=bundler.getchunks(), v1compressible=True)
877 return streamres(gen=chunks, v1compressible=True)
877 return streamres(gen=chunks, v1compressible=True)
878
878
879 @wireprotocommand('heads')
879 @wireprotocommand('heads')
880 def heads(repo, proto):
880 def heads(repo, proto):
881 h = repo.heads()
881 h = repo.heads()
882 return encodelist(h) + "\n"
882 return encodelist(h) + "\n"
883
883
884 @wireprotocommand('hello')
884 @wireprotocommand('hello')
885 def hello(repo, proto):
885 def hello(repo, proto):
886 '''the hello command returns a set of lines describing various
886 '''the hello command returns a set of lines describing various
887 interesting things about the server, in an RFC822-like format.
887 interesting things about the server, in an RFC822-like format.
888 Currently the only one defined is "capabilities", which
888 Currently the only one defined is "capabilities", which
889 consists of a line in the form:
889 consists of a line in the form:
890
890
891 capabilities: space separated list of tokens
891 capabilities: space separated list of tokens
892 '''
892 '''
893 return "capabilities: %s\n" % (capabilities(repo, proto))
893 return "capabilities: %s\n" % (capabilities(repo, proto))
894
894
895 @wireprotocommand('listkeys', 'namespace')
895 @wireprotocommand('listkeys', 'namespace')
896 def listkeys(repo, proto, namespace):
896 def listkeys(repo, proto, namespace):
897 d = repo.listkeys(encoding.tolocal(namespace)).items()
897 d = repo.listkeys(encoding.tolocal(namespace)).items()
898 return pushkeymod.encodekeys(d)
898 return pushkeymod.encodekeys(d)
899
899
900 @wireprotocommand('lookup', 'key')
900 @wireprotocommand('lookup', 'key')
901 def lookup(repo, proto, key):
901 def lookup(repo, proto, key):
902 try:
902 try:
903 k = encoding.tolocal(key)
903 k = encoding.tolocal(key)
904 c = repo[k]
904 c = repo[k]
905 r = c.hex()
905 r = c.hex()
906 success = 1
906 success = 1
907 except Exception as inst:
907 except Exception as inst:
908 r = str(inst)
908 r = str(inst)
909 success = 0
909 success = 0
910 return "%s %s\n" % (success, r)
910 return "%d %s\n" % (success, r)
911
911
912 @wireprotocommand('known', 'nodes *')
912 @wireprotocommand('known', 'nodes *')
913 def known(repo, proto, nodes, others):
913 def known(repo, proto, nodes, others):
914 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
914 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
915
915
916 @wireprotocommand('pushkey', 'namespace key old new')
916 @wireprotocommand('pushkey', 'namespace key old new')
917 def pushkey(repo, proto, namespace, key, old, new):
917 def pushkey(repo, proto, namespace, key, old, new):
918 # compatibility with pre-1.8 clients which were accidentally
918 # compatibility with pre-1.8 clients which were accidentally
919 # sending raw binary nodes rather than utf-8-encoded hex
919 # sending raw binary nodes rather than utf-8-encoded hex
920 if len(new) == 20 and util.escapestr(new) != new:
920 if len(new) == 20 and util.escapestr(new) != new:
921 # looks like it could be a binary node
921 # looks like it could be a binary node
922 try:
922 try:
923 new.decode('utf-8')
923 new.decode('utf-8')
924 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
924 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
925 except UnicodeDecodeError:
925 except UnicodeDecodeError:
926 pass # binary, leave unmodified
926 pass # binary, leave unmodified
927 else:
927 else:
928 new = encoding.tolocal(new) # normal path
928 new = encoding.tolocal(new) # normal path
929
929
930 if util.safehasattr(proto, 'restore'):
930 if util.safehasattr(proto, 'restore'):
931
931
932 proto.redirect()
932 proto.redirect()
933
933
934 try:
934 try:
935 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
935 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
936 encoding.tolocal(old), new) or False
936 encoding.tolocal(old), new) or False
937 except error.Abort:
937 except error.Abort:
938 r = False
938 r = False
939
939
940 output = proto.restore()
940 output = proto.restore()
941
941
942 return '%s\n%s' % (int(r), output)
942 return '%s\n%s' % (int(r), output)
943
943
944 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
944 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
945 encoding.tolocal(old), new)
945 encoding.tolocal(old), new)
946 return '%s\n' % int(r)
946 return '%s\n' % int(r)
947
947
948 @wireprotocommand('stream_out')
948 @wireprotocommand('stream_out')
949 def stream(repo, proto):
949 def stream(repo, proto):
950 '''If the server supports streaming clone, it advertises the "stream"
950 '''If the server supports streaming clone, it advertises the "stream"
951 capability with a value representing the version and flags of the repo
951 capability with a value representing the version and flags of the repo
952 it is serving. Client checks to see if it understands the format.
952 it is serving. Client checks to see if it understands the format.
953 '''
953 '''
954 if not streamclone.allowservergeneration(repo):
954 if not streamclone.allowservergeneration(repo):
955 return '1\n'
955 return '1\n'
956
956
957 def getstream(it):
957 def getstream(it):
958 yield '0\n'
958 yield '0\n'
959 for chunk in it:
959 for chunk in it:
960 yield chunk
960 yield chunk
961
961
962 try:
962 try:
963 # LockError may be raised before the first result is yielded. Don't
963 # LockError may be raised before the first result is yielded. Don't
964 # emit output until we're sure we got the lock successfully.
964 # emit output until we're sure we got the lock successfully.
965 it = streamclone.generatev1wireproto(repo)
965 it = streamclone.generatev1wireproto(repo)
966 return streamres(gen=getstream(it))
966 return streamres(gen=getstream(it))
967 except error.LockError:
967 except error.LockError:
968 return '2\n'
968 return '2\n'
969
969
970 @wireprotocommand('unbundle', 'heads')
970 @wireprotocommand('unbundle', 'heads')
971 def unbundle(repo, proto, heads):
971 def unbundle(repo, proto, heads):
972 their_heads = decodelist(heads)
972 their_heads = decodelist(heads)
973
973
974 try:
974 try:
975 proto.redirect()
975 proto.redirect()
976
976
977 exchange.check_heads(repo, their_heads, 'preparing changes')
977 exchange.check_heads(repo, their_heads, 'preparing changes')
978
978
979 # write bundle data to temporary file because it can be big
979 # write bundle data to temporary file because it can be big
980 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
980 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
981 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
981 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
982 r = 0
982 r = 0
983 try:
983 try:
984 proto.getfile(fp)
984 proto.getfile(fp)
985 fp.seek(0)
985 fp.seek(0)
986 gen = exchange.readbundle(repo.ui, fp, None)
986 gen = exchange.readbundle(repo.ui, fp, None)
987 if (isinstance(gen, changegroupmod.cg1unpacker)
987 if (isinstance(gen, changegroupmod.cg1unpacker)
988 and not bundle1allowed(repo, 'push')):
988 and not bundle1allowed(repo, 'push')):
989 if proto.name == 'http':
989 if proto.name == 'http':
990 # need to special case http because stderr do not get to
990 # need to special case http because stderr do not get to
991 # the http client on failed push so we need to abuse some
991 # the http client on failed push so we need to abuse some
992 # other error type to make sure the message get to the
992 # other error type to make sure the message get to the
993 # user.
993 # user.
994 return ooberror(bundle2required)
994 return ooberror(bundle2required)
995 raise error.Abort(bundle2requiredmain,
995 raise error.Abort(bundle2requiredmain,
996 hint=bundle2requiredhint)
996 hint=bundle2requiredhint)
997
997
998 r = exchange.unbundle(repo, gen, their_heads, 'serve',
998 r = exchange.unbundle(repo, gen, their_heads, 'serve',
999 proto._client())
999 proto._client())
1000 if util.safehasattr(r, 'addpart'):
1000 if util.safehasattr(r, 'addpart'):
1001 # The return looks streamable, we are in the bundle2 case and
1001 # The return looks streamable, we are in the bundle2 case and
1002 # should return a stream.
1002 # should return a stream.
1003 return streamres(gen=r.getchunks())
1003 return streamres(gen=r.getchunks())
1004 return pushres(r)
1004 return pushres(r)
1005
1005
1006 finally:
1006 finally:
1007 fp.close()
1007 fp.close()
1008 os.unlink(tempname)
1008 os.unlink(tempname)
1009
1009
1010 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1010 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1011 # handle non-bundle2 case first
1011 # handle non-bundle2 case first
1012 if not getattr(exc, 'duringunbundle2', False):
1012 if not getattr(exc, 'duringunbundle2', False):
1013 try:
1013 try:
1014 raise
1014 raise
1015 except error.Abort:
1015 except error.Abort:
1016 # The old code we moved used util.stderr directly.
1016 # The old code we moved used util.stderr directly.
1017 # We did not change it to minimise code change.
1017 # We did not change it to minimise code change.
1018 # This need to be moved to something proper.
1018 # This need to be moved to something proper.
1019 # Feel free to do it.
1019 # Feel free to do it.
1020 util.stderr.write("abort: %s\n" % exc)
1020 util.stderr.write("abort: %s\n" % exc)
1021 if exc.hint is not None:
1021 if exc.hint is not None:
1022 util.stderr.write("(%s)\n" % exc.hint)
1022 util.stderr.write("(%s)\n" % exc.hint)
1023 return pushres(0)
1023 return pushres(0)
1024 except error.PushRaced:
1024 except error.PushRaced:
1025 return pusherr(str(exc))
1025 return pusherr(str(exc))
1026
1026
1027 bundler = bundle2.bundle20(repo.ui)
1027 bundler = bundle2.bundle20(repo.ui)
1028 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1028 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1029 bundler.addpart(out)
1029 bundler.addpart(out)
1030 try:
1030 try:
1031 try:
1031 try:
1032 raise
1032 raise
1033 except error.PushkeyFailed as exc:
1033 except error.PushkeyFailed as exc:
1034 # check client caps
1034 # check client caps
1035 remotecaps = getattr(exc, '_replycaps', None)
1035 remotecaps = getattr(exc, '_replycaps', None)
1036 if (remotecaps is not None
1036 if (remotecaps is not None
1037 and 'pushkey' not in remotecaps.get('error', ())):
1037 and 'pushkey' not in remotecaps.get('error', ())):
1038 # no support remote side, fallback to Abort handler.
1038 # no support remote side, fallback to Abort handler.
1039 raise
1039 raise
1040 part = bundler.newpart('error:pushkey')
1040 part = bundler.newpart('error:pushkey')
1041 part.addparam('in-reply-to', exc.partid)
1041 part.addparam('in-reply-to', exc.partid)
1042 if exc.namespace is not None:
1042 if exc.namespace is not None:
1043 part.addparam('namespace', exc.namespace, mandatory=False)
1043 part.addparam('namespace', exc.namespace, mandatory=False)
1044 if exc.key is not None:
1044 if exc.key is not None:
1045 part.addparam('key', exc.key, mandatory=False)
1045 part.addparam('key', exc.key, mandatory=False)
1046 if exc.new is not None:
1046 if exc.new is not None:
1047 part.addparam('new', exc.new, mandatory=False)
1047 part.addparam('new', exc.new, mandatory=False)
1048 if exc.old is not None:
1048 if exc.old is not None:
1049 part.addparam('old', exc.old, mandatory=False)
1049 part.addparam('old', exc.old, mandatory=False)
1050 if exc.ret is not None:
1050 if exc.ret is not None:
1051 part.addparam('ret', exc.ret, mandatory=False)
1051 part.addparam('ret', exc.ret, mandatory=False)
1052 except error.BundleValueError as exc:
1052 except error.BundleValueError as exc:
1053 errpart = bundler.newpart('error:unsupportedcontent')
1053 errpart = bundler.newpart('error:unsupportedcontent')
1054 if exc.parttype is not None:
1054 if exc.parttype is not None:
1055 errpart.addparam('parttype', exc.parttype)
1055 errpart.addparam('parttype', exc.parttype)
1056 if exc.params:
1056 if exc.params:
1057 errpart.addparam('params', '\0'.join(exc.params))
1057 errpart.addparam('params', '\0'.join(exc.params))
1058 except error.Abort as exc:
1058 except error.Abort as exc:
1059 manargs = [('message', str(exc))]
1059 manargs = [('message', str(exc))]
1060 advargs = []
1060 advargs = []
1061 if exc.hint is not None:
1061 if exc.hint is not None:
1062 advargs.append(('hint', exc.hint))
1062 advargs.append(('hint', exc.hint))
1063 bundler.addpart(bundle2.bundlepart('error:abort',
1063 bundler.addpart(bundle2.bundlepart('error:abort',
1064 manargs, advargs))
1064 manargs, advargs))
1065 except error.PushRaced as exc:
1065 except error.PushRaced as exc:
1066 bundler.newpart('error:pushraced', [('message', str(exc))])
1066 bundler.newpart('error:pushraced', [('message', str(exc))])
1067 return streamres(gen=bundler.getchunks())
1067 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now