##// END OF EJS Templates
wireproto: bounce kwargs to/from bytes/str as needed...
Augie Fackler -
r34740:b880cc11 default
parent child Browse files
Show More
@@ -1,1067 +1,1068 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import os
11 import os
12 import tempfile
12 import tempfile
13
13
14 from .i18n import _
14 from .i18n import _
15 from .node import (
15 from .node import (
16 bin,
16 bin,
17 hex,
17 hex,
18 nullid,
18 nullid,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 discovery,
24 discovery,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 pycompat,
30 pycompat,
31 repository,
31 repository,
32 streamclone,
32 streamclone,
33 util,
33 util,
34 )
34 )
35
35
36 urlerr = util.urlerr
36 urlerr = util.urlerr
37 urlreq = util.urlreq
37 urlreq = util.urlreq
38
38
39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 'IncompatibleClient')
41 'IncompatibleClient')
42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43
43
44 class abstractserverproto(object):
44 class abstractserverproto(object):
45 """abstract class that summarizes the protocol API
45 """abstract class that summarizes the protocol API
46
46
47 Used as reference and documentation.
47 Used as reference and documentation.
48 """
48 """
49
49
50 def getargs(self, args):
50 def getargs(self, args):
51 """return the value for arguments in <args>
51 """return the value for arguments in <args>
52
52
53 returns a list of values (same order as <args>)"""
53 returns a list of values (same order as <args>)"""
54 raise NotImplementedError()
54 raise NotImplementedError()
55
55
56 def getfile(self, fp):
56 def getfile(self, fp):
57 """write the whole content of a file into a file like object
57 """write the whole content of a file into a file like object
58
58
59 The file is in the form::
59 The file is in the form::
60
60
61 (<chunk-size>\n<chunk>)+0\n
61 (<chunk-size>\n<chunk>)+0\n
62
62
63 chunk size is the ascii version of the int.
63 chunk size is the ascii version of the int.
64 """
64 """
65 raise NotImplementedError()
65 raise NotImplementedError()
66
66
67 def redirect(self):
67 def redirect(self):
68 """may setup interception for stdout and stderr
68 """may setup interception for stdout and stderr
69
69
70 See also the `restore` method."""
70 See also the `restore` method."""
71 raise NotImplementedError()
71 raise NotImplementedError()
72
72
73 # If the `redirect` function does install interception, the `restore`
73 # If the `redirect` function does install interception, the `restore`
74 # function MUST be defined. If interception is not used, this function
74 # function MUST be defined. If interception is not used, this function
75 # MUST NOT be defined.
75 # MUST NOT be defined.
76 #
76 #
77 # left commented here on purpose
77 # left commented here on purpose
78 #
78 #
79 #def restore(self):
79 #def restore(self):
80 # """reinstall previous stdout and stderr and return intercepted stdout
80 # """reinstall previous stdout and stderr and return intercepted stdout
81 # """
81 # """
82 # raise NotImplementedError()
82 # raise NotImplementedError()
83
83
84 class remoteiterbatcher(peer.iterbatcher):
84 class remoteiterbatcher(peer.iterbatcher):
85 def __init__(self, remote):
85 def __init__(self, remote):
86 super(remoteiterbatcher, self).__init__()
86 super(remoteiterbatcher, self).__init__()
87 self._remote = remote
87 self._remote = remote
88
88
89 def __getattr__(self, name):
89 def __getattr__(self, name):
90 # Validate this method is batchable, since submit() only supports
90 # Validate this method is batchable, since submit() only supports
91 # batchable methods.
91 # batchable methods.
92 fn = getattr(self._remote, name)
92 fn = getattr(self._remote, name)
93 if not getattr(fn, 'batchable', None):
93 if not getattr(fn, 'batchable', None):
94 raise error.ProgrammingError('Attempted to batch a non-batchable '
94 raise error.ProgrammingError('Attempted to batch a non-batchable '
95 'call to %r' % name)
95 'call to %r' % name)
96
96
97 return super(remoteiterbatcher, self).__getattr__(name)
97 return super(remoteiterbatcher, self).__getattr__(name)
98
98
99 def submit(self):
99 def submit(self):
100 """Break the batch request into many patch calls and pipeline them.
100 """Break the batch request into many patch calls and pipeline them.
101
101
102 This is mostly valuable over http where request sizes can be
102 This is mostly valuable over http where request sizes can be
103 limited, but can be used in other places as well.
103 limited, but can be used in other places as well.
104 """
104 """
105 # 2-tuple of (command, arguments) that represents what will be
105 # 2-tuple of (command, arguments) that represents what will be
106 # sent over the wire.
106 # sent over the wire.
107 requests = []
107 requests = []
108
108
109 # 4-tuple of (command, final future, @batchable generator, remote
109 # 4-tuple of (command, final future, @batchable generator, remote
110 # future).
110 # future).
111 results = []
111 results = []
112
112
113 for command, args, opts, finalfuture in self.calls:
113 for command, args, opts, finalfuture in self.calls:
114 mtd = getattr(self._remote, command)
114 mtd = getattr(self._remote, command)
115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
116
116
117 commandargs, fremote = next(batchable)
117 commandargs, fremote = next(batchable)
118 assert fremote
118 assert fremote
119 requests.append((command, commandargs))
119 requests.append((command, commandargs))
120 results.append((command, finalfuture, batchable, fremote))
120 results.append((command, finalfuture, batchable, fremote))
121
121
122 if requests:
122 if requests:
123 self._resultiter = self._remote._submitbatch(requests)
123 self._resultiter = self._remote._submitbatch(requests)
124
124
125 self._results = results
125 self._results = results
126
126
127 def results(self):
127 def results(self):
128 for command, finalfuture, batchable, remotefuture in self._results:
128 for command, finalfuture, batchable, remotefuture in self._results:
129 # Get the raw result, set it in the remote future, feed it
129 # Get the raw result, set it in the remote future, feed it
130 # back into the @batchable generator so it can be decoded, and
130 # back into the @batchable generator so it can be decoded, and
131 # set the result on the final future to this value.
131 # set the result on the final future to this value.
132 remoteresult = next(self._resultiter)
132 remoteresult = next(self._resultiter)
133 remotefuture.set(remoteresult)
133 remotefuture.set(remoteresult)
134 finalfuture.set(next(batchable))
134 finalfuture.set(next(batchable))
135
135
136 # Verify our @batchable generators only emit 2 values.
136 # Verify our @batchable generators only emit 2 values.
137 try:
137 try:
138 next(batchable)
138 next(batchable)
139 except StopIteration:
139 except StopIteration:
140 pass
140 pass
141 else:
141 else:
142 raise error.ProgrammingError('%s @batchable generator emitted '
142 raise error.ProgrammingError('%s @batchable generator emitted '
143 'unexpected value count' % command)
143 'unexpected value count' % command)
144
144
145 yield finalfuture.value
145 yield finalfuture.value
146
146
147 # Forward a couple of names from peer to make wireproto interactions
147 # Forward a couple of names from peer to make wireproto interactions
148 # slightly more sensible.
148 # slightly more sensible.
149 batchable = peer.batchable
149 batchable = peer.batchable
150 future = peer.future
150 future = peer.future
151
151
152 # list of nodes encoding / decoding
152 # list of nodes encoding / decoding
153
153
154 def decodelist(l, sep=' '):
154 def decodelist(l, sep=' '):
155 if l:
155 if l:
156 return [bin(v) for v in l.split(sep)]
156 return [bin(v) for v in l.split(sep)]
157 return []
157 return []
158
158
159 def encodelist(l, sep=' '):
159 def encodelist(l, sep=' '):
160 try:
160 try:
161 return sep.join(map(hex, l))
161 return sep.join(map(hex, l))
162 except TypeError:
162 except TypeError:
163 raise
163 raise
164
164
165 # batched call argument encoding
165 # batched call argument encoding
166
166
167 def escapearg(plain):
167 def escapearg(plain):
168 return (plain
168 return (plain
169 .replace(':', ':c')
169 .replace(':', ':c')
170 .replace(',', ':o')
170 .replace(',', ':o')
171 .replace(';', ':s')
171 .replace(';', ':s')
172 .replace('=', ':e'))
172 .replace('=', ':e'))
173
173
174 def unescapearg(escaped):
174 def unescapearg(escaped):
175 return (escaped
175 return (escaped
176 .replace(':e', '=')
176 .replace(':e', '=')
177 .replace(':s', ';')
177 .replace(':s', ';')
178 .replace(':o', ',')
178 .replace(':o', ',')
179 .replace(':c', ':'))
179 .replace(':c', ':'))
180
180
181 def encodebatchcmds(req):
181 def encodebatchcmds(req):
182 """Return a ``cmds`` argument value for the ``batch`` command."""
182 """Return a ``cmds`` argument value for the ``batch`` command."""
183 cmds = []
183 cmds = []
184 for op, argsdict in req:
184 for op, argsdict in req:
185 # Old servers didn't properly unescape argument names. So prevent
185 # Old servers didn't properly unescape argument names. So prevent
186 # the sending of argument names that may not be decoded properly by
186 # the sending of argument names that may not be decoded properly by
187 # servers.
187 # servers.
188 assert all(escapearg(k) == k for k in argsdict)
188 assert all(escapearg(k) == k for k in argsdict)
189
189
190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
191 for k, v in argsdict.iteritems())
191 for k, v in argsdict.iteritems())
192 cmds.append('%s %s' % (op, args))
192 cmds.append('%s %s' % (op, args))
193
193
194 return ';'.join(cmds)
194 return ';'.join(cmds)
195
195
196 # mapping of options accepted by getbundle and their types
196 # mapping of options accepted by getbundle and their types
197 #
197 #
198 # Meant to be extended by extensions. It is extensions responsibility to ensure
198 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 # such options are properly processed in exchange.getbundle.
199 # such options are properly processed in exchange.getbundle.
200 #
200 #
201 # supported types are:
201 # supported types are:
202 #
202 #
203 # :nodes: list of binary nodes
203 # :nodes: list of binary nodes
204 # :csv: list of comma-separated values
204 # :csv: list of comma-separated values
205 # :scsv: list of comma-separated values return as set
205 # :scsv: list of comma-separated values return as set
206 # :plain: string with no transformation needed.
206 # :plain: string with no transformation needed.
207 gboptsmap = {'heads': 'nodes',
207 gboptsmap = {'heads': 'nodes',
208 'common': 'nodes',
208 'common': 'nodes',
209 'obsmarkers': 'boolean',
209 'obsmarkers': 'boolean',
210 'phases': 'boolean',
210 'phases': 'boolean',
211 'bundlecaps': 'scsv',
211 'bundlecaps': 'scsv',
212 'listkeys': 'csv',
212 'listkeys': 'csv',
213 'cg': 'boolean',
213 'cg': 'boolean',
214 'cbattempted': 'boolean'}
214 'cbattempted': 'boolean'}
215
215
216 # client side
216 # client side
217
217
218 class wirepeer(repository.legacypeer):
218 class wirepeer(repository.legacypeer):
219 """Client-side interface for communicating with a peer repository.
219 """Client-side interface for communicating with a peer repository.
220
220
221 Methods commonly call wire protocol commands of the same name.
221 Methods commonly call wire protocol commands of the same name.
222
222
223 See also httppeer.py and sshpeer.py for protocol-specific
223 See also httppeer.py and sshpeer.py for protocol-specific
224 implementations of this interface.
224 implementations of this interface.
225 """
225 """
226 # Begin of basewirepeer interface.
226 # Begin of basewirepeer interface.
227
227
228 def iterbatch(self):
228 def iterbatch(self):
229 return remoteiterbatcher(self)
229 return remoteiterbatcher(self)
230
230
231 @batchable
231 @batchable
232 def lookup(self, key):
232 def lookup(self, key):
233 self.requirecap('lookup', _('look up remote revision'))
233 self.requirecap('lookup', _('look up remote revision'))
234 f = future()
234 f = future()
235 yield {'key': encoding.fromlocal(key)}, f
235 yield {'key': encoding.fromlocal(key)}, f
236 d = f.value
236 d = f.value
237 success, data = d[:-1].split(" ", 1)
237 success, data = d[:-1].split(" ", 1)
238 if int(success):
238 if int(success):
239 yield bin(data)
239 yield bin(data)
240 else:
240 else:
241 self._abort(error.RepoError(data))
241 self._abort(error.RepoError(data))
242
242
243 @batchable
243 @batchable
244 def heads(self):
244 def heads(self):
245 f = future()
245 f = future()
246 yield {}, f
246 yield {}, f
247 d = f.value
247 d = f.value
248 try:
248 try:
249 yield decodelist(d[:-1])
249 yield decodelist(d[:-1])
250 except ValueError:
250 except ValueError:
251 self._abort(error.ResponseError(_("unexpected response:"), d))
251 self._abort(error.ResponseError(_("unexpected response:"), d))
252
252
253 @batchable
253 @batchable
254 def known(self, nodes):
254 def known(self, nodes):
255 f = future()
255 f = future()
256 yield {'nodes': encodelist(nodes)}, f
256 yield {'nodes': encodelist(nodes)}, f
257 d = f.value
257 d = f.value
258 try:
258 try:
259 yield [bool(int(b)) for b in d]
259 yield [bool(int(b)) for b in d]
260 except ValueError:
260 except ValueError:
261 self._abort(error.ResponseError(_("unexpected response:"), d))
261 self._abort(error.ResponseError(_("unexpected response:"), d))
262
262
263 @batchable
263 @batchable
264 def branchmap(self):
264 def branchmap(self):
265 f = future()
265 f = future()
266 yield {}, f
266 yield {}, f
267 d = f.value
267 d = f.value
268 try:
268 try:
269 branchmap = {}
269 branchmap = {}
270 for branchpart in d.splitlines():
270 for branchpart in d.splitlines():
271 branchname, branchheads = branchpart.split(' ', 1)
271 branchname, branchheads = branchpart.split(' ', 1)
272 branchname = encoding.tolocal(urlreq.unquote(branchname))
272 branchname = encoding.tolocal(urlreq.unquote(branchname))
273 branchheads = decodelist(branchheads)
273 branchheads = decodelist(branchheads)
274 branchmap[branchname] = branchheads
274 branchmap[branchname] = branchheads
275 yield branchmap
275 yield branchmap
276 except TypeError:
276 except TypeError:
277 self._abort(error.ResponseError(_("unexpected response:"), d))
277 self._abort(error.ResponseError(_("unexpected response:"), d))
278
278
279 @batchable
279 @batchable
280 def listkeys(self, namespace):
280 def listkeys(self, namespace):
281 if not self.capable('pushkey'):
281 if not self.capable('pushkey'):
282 yield {}, None
282 yield {}, None
283 f = future()
283 f = future()
284 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
284 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
285 yield {'namespace': encoding.fromlocal(namespace)}, f
285 yield {'namespace': encoding.fromlocal(namespace)}, f
286 d = f.value
286 d = f.value
287 self.ui.debug('received listkey for "%s": %i bytes\n'
287 self.ui.debug('received listkey for "%s": %i bytes\n'
288 % (namespace, len(d)))
288 % (namespace, len(d)))
289 yield pushkeymod.decodekeys(d)
289 yield pushkeymod.decodekeys(d)
290
290
291 @batchable
291 @batchable
292 def pushkey(self, namespace, key, old, new):
292 def pushkey(self, namespace, key, old, new):
293 if not self.capable('pushkey'):
293 if not self.capable('pushkey'):
294 yield False, None
294 yield False, None
295 f = future()
295 f = future()
296 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
296 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
297 yield {'namespace': encoding.fromlocal(namespace),
297 yield {'namespace': encoding.fromlocal(namespace),
298 'key': encoding.fromlocal(key),
298 'key': encoding.fromlocal(key),
299 'old': encoding.fromlocal(old),
299 'old': encoding.fromlocal(old),
300 'new': encoding.fromlocal(new)}, f
300 'new': encoding.fromlocal(new)}, f
301 d = f.value
301 d = f.value
302 d, output = d.split('\n', 1)
302 d, output = d.split('\n', 1)
303 try:
303 try:
304 d = bool(int(d))
304 d = bool(int(d))
305 except ValueError:
305 except ValueError:
306 raise error.ResponseError(
306 raise error.ResponseError(
307 _('push failed (unexpected response):'), d)
307 _('push failed (unexpected response):'), d)
308 for l in output.splitlines(True):
308 for l in output.splitlines(True):
309 self.ui.status(_('remote: '), l)
309 self.ui.status(_('remote: '), l)
310 yield d
310 yield d
311
311
312 def stream_out(self):
312 def stream_out(self):
313 return self._callstream('stream_out')
313 return self._callstream('stream_out')
314
314
315 def getbundle(self, source, **kwargs):
315 def getbundle(self, source, **kwargs):
316 kwargs = pycompat.byteskwargs(kwargs)
316 self.requirecap('getbundle', _('look up remote changes'))
317 self.requirecap('getbundle', _('look up remote changes'))
317 opts = {}
318 opts = {}
318 bundlecaps = kwargs.get('bundlecaps')
319 bundlecaps = kwargs.get('bundlecaps')
319 if bundlecaps is not None:
320 if bundlecaps is not None:
320 kwargs['bundlecaps'] = sorted(bundlecaps)
321 kwargs['bundlecaps'] = sorted(bundlecaps)
321 else:
322 else:
322 bundlecaps = () # kwargs could have it to None
323 bundlecaps = () # kwargs could have it to None
323 for key, value in kwargs.iteritems():
324 for key, value in kwargs.iteritems():
324 if value is None:
325 if value is None:
325 continue
326 continue
326 keytype = gboptsmap.get(key)
327 keytype = gboptsmap.get(key)
327 if keytype is None:
328 if keytype is None:
328 raise error.ProgrammingError(
329 raise error.ProgrammingError(
329 'Unexpectedly None keytype for key %s' % key)
330 'Unexpectedly None keytype for key %s' % key)
330 elif keytype == 'nodes':
331 elif keytype == 'nodes':
331 value = encodelist(value)
332 value = encodelist(value)
332 elif keytype in ('csv', 'scsv'):
333 elif keytype in ('csv', 'scsv'):
333 value = ','.join(value)
334 value = ','.join(value)
334 elif keytype == 'boolean':
335 elif keytype == 'boolean':
335 value = '%i' % bool(value)
336 value = '%i' % bool(value)
336 elif keytype != 'plain':
337 elif keytype != 'plain':
337 raise KeyError('unknown getbundle option type %s'
338 raise KeyError('unknown getbundle option type %s'
338 % keytype)
339 % keytype)
339 opts[key] = value
340 opts[key] = value
340 f = self._callcompressable("getbundle", **opts)
341 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
341 if any((cap.startswith('HG2') for cap in bundlecaps)):
342 if any((cap.startswith('HG2') for cap in bundlecaps)):
342 return bundle2.getunbundler(self.ui, f)
343 return bundle2.getunbundler(self.ui, f)
343 else:
344 else:
344 return changegroupmod.cg1unpacker(f, 'UN')
345 return changegroupmod.cg1unpacker(f, 'UN')
345
346
346 def unbundle(self, cg, heads, url):
347 def unbundle(self, cg, heads, url):
347 '''Send cg (a readable file-like object representing the
348 '''Send cg (a readable file-like object representing the
348 changegroup to push, typically a chunkbuffer object) to the
349 changegroup to push, typically a chunkbuffer object) to the
349 remote server as a bundle.
350 remote server as a bundle.
350
351
351 When pushing a bundle10 stream, return an integer indicating the
352 When pushing a bundle10 stream, return an integer indicating the
352 result of the push (see changegroup.apply()).
353 result of the push (see changegroup.apply()).
353
354
354 When pushing a bundle20 stream, return a bundle20 stream.
355 When pushing a bundle20 stream, return a bundle20 stream.
355
356
356 `url` is the url the client thinks it's pushing to, which is
357 `url` is the url the client thinks it's pushing to, which is
357 visible to hooks.
358 visible to hooks.
358 '''
359 '''
359
360
360 if heads != ['force'] and self.capable('unbundlehash'):
361 if heads != ['force'] and self.capable('unbundlehash'):
361 heads = encodelist(['hashed',
362 heads = encodelist(['hashed',
362 hashlib.sha1(''.join(sorted(heads))).digest()])
363 hashlib.sha1(''.join(sorted(heads))).digest()])
363 else:
364 else:
364 heads = encodelist(heads)
365 heads = encodelist(heads)
365
366
366 if util.safehasattr(cg, 'deltaheader'):
367 if util.safehasattr(cg, 'deltaheader'):
367 # this a bundle10, do the old style call sequence
368 # this a bundle10, do the old style call sequence
368 ret, output = self._callpush("unbundle", cg, heads=heads)
369 ret, output = self._callpush("unbundle", cg, heads=heads)
369 if ret == "":
370 if ret == "":
370 raise error.ResponseError(
371 raise error.ResponseError(
371 _('push failed:'), output)
372 _('push failed:'), output)
372 try:
373 try:
373 ret = int(ret)
374 ret = int(ret)
374 except ValueError:
375 except ValueError:
375 raise error.ResponseError(
376 raise error.ResponseError(
376 _('push failed (unexpected response):'), ret)
377 _('push failed (unexpected response):'), ret)
377
378
378 for l in output.splitlines(True):
379 for l in output.splitlines(True):
379 self.ui.status(_('remote: '), l)
380 self.ui.status(_('remote: '), l)
380 else:
381 else:
381 # bundle2 push. Send a stream, fetch a stream.
382 # bundle2 push. Send a stream, fetch a stream.
382 stream = self._calltwowaystream('unbundle', cg, heads=heads)
383 stream = self._calltwowaystream('unbundle', cg, heads=heads)
383 ret = bundle2.getunbundler(self.ui, stream)
384 ret = bundle2.getunbundler(self.ui, stream)
384 return ret
385 return ret
385
386
386 # End of basewirepeer interface.
387 # End of basewirepeer interface.
387
388
388 # Begin of baselegacywirepeer interface.
389 # Begin of baselegacywirepeer interface.
389
390
390 def branches(self, nodes):
391 def branches(self, nodes):
391 n = encodelist(nodes)
392 n = encodelist(nodes)
392 d = self._call("branches", nodes=n)
393 d = self._call("branches", nodes=n)
393 try:
394 try:
394 br = [tuple(decodelist(b)) for b in d.splitlines()]
395 br = [tuple(decodelist(b)) for b in d.splitlines()]
395 return br
396 return br
396 except ValueError:
397 except ValueError:
397 self._abort(error.ResponseError(_("unexpected response:"), d))
398 self._abort(error.ResponseError(_("unexpected response:"), d))
398
399
399 def between(self, pairs):
400 def between(self, pairs):
400 batch = 8 # avoid giant requests
401 batch = 8 # avoid giant requests
401 r = []
402 r = []
402 for i in xrange(0, len(pairs), batch):
403 for i in xrange(0, len(pairs), batch):
403 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
404 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
404 d = self._call("between", pairs=n)
405 d = self._call("between", pairs=n)
405 try:
406 try:
406 r.extend(l and decodelist(l) or [] for l in d.splitlines())
407 r.extend(l and decodelist(l) or [] for l in d.splitlines())
407 except ValueError:
408 except ValueError:
408 self._abort(error.ResponseError(_("unexpected response:"), d))
409 self._abort(error.ResponseError(_("unexpected response:"), d))
409 return r
410 return r
410
411
411 def changegroup(self, nodes, kind):
412 def changegroup(self, nodes, kind):
412 n = encodelist(nodes)
413 n = encodelist(nodes)
413 f = self._callcompressable("changegroup", roots=n)
414 f = self._callcompressable("changegroup", roots=n)
414 return changegroupmod.cg1unpacker(f, 'UN')
415 return changegroupmod.cg1unpacker(f, 'UN')
415
416
416 def changegroupsubset(self, bases, heads, kind):
417 def changegroupsubset(self, bases, heads, kind):
417 self.requirecap('changegroupsubset', _('look up remote changes'))
418 self.requirecap('changegroupsubset', _('look up remote changes'))
418 bases = encodelist(bases)
419 bases = encodelist(bases)
419 heads = encodelist(heads)
420 heads = encodelist(heads)
420 f = self._callcompressable("changegroupsubset",
421 f = self._callcompressable("changegroupsubset",
421 bases=bases, heads=heads)
422 bases=bases, heads=heads)
422 return changegroupmod.cg1unpacker(f, 'UN')
423 return changegroupmod.cg1unpacker(f, 'UN')
423
424
424 # End of baselegacywirepeer interface.
425 # End of baselegacywirepeer interface.
425
426
426 def _submitbatch(self, req):
427 def _submitbatch(self, req):
427 """run batch request <req> on the server
428 """run batch request <req> on the server
428
429
429 Returns an iterator of the raw responses from the server.
430 Returns an iterator of the raw responses from the server.
430 """
431 """
431 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
432 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
432 chunk = rsp.read(1024)
433 chunk = rsp.read(1024)
433 work = [chunk]
434 work = [chunk]
434 while chunk:
435 while chunk:
435 while ';' not in chunk and chunk:
436 while ';' not in chunk and chunk:
436 chunk = rsp.read(1024)
437 chunk = rsp.read(1024)
437 work.append(chunk)
438 work.append(chunk)
438 merged = ''.join(work)
439 merged = ''.join(work)
439 while ';' in merged:
440 while ';' in merged:
440 one, merged = merged.split(';', 1)
441 one, merged = merged.split(';', 1)
441 yield unescapearg(one)
442 yield unescapearg(one)
442 chunk = rsp.read(1024)
443 chunk = rsp.read(1024)
443 work = [merged, chunk]
444 work = [merged, chunk]
444 yield unescapearg(''.join(work))
445 yield unescapearg(''.join(work))
445
446
446 def _submitone(self, op, args):
447 def _submitone(self, op, args):
447 return self._call(op, **args)
448 return self._call(op, **pycompat.strkwargs(args))
448
449
449 def debugwireargs(self, one, two, three=None, four=None, five=None):
450 def debugwireargs(self, one, two, three=None, four=None, five=None):
450 # don't pass optional arguments left at their default value
451 # don't pass optional arguments left at their default value
451 opts = {}
452 opts = {}
452 if three is not None:
453 if three is not None:
453 opts['three'] = three
454 opts['three'] = three
454 if four is not None:
455 if four is not None:
455 opts['four'] = four
456 opts['four'] = four
456 return self._call('debugwireargs', one=one, two=two, **opts)
457 return self._call('debugwireargs', one=one, two=two, **opts)
457
458
458 def _call(self, cmd, **args):
459 def _call(self, cmd, **args):
459 """execute <cmd> on the server
460 """execute <cmd> on the server
460
461
461 The command is expected to return a simple string.
462 The command is expected to return a simple string.
462
463
463 returns the server reply as a string."""
464 returns the server reply as a string."""
464 raise NotImplementedError()
465 raise NotImplementedError()
465
466
466 def _callstream(self, cmd, **args):
467 def _callstream(self, cmd, **args):
467 """execute <cmd> on the server
468 """execute <cmd> on the server
468
469
469 The command is expected to return a stream. Note that if the
470 The command is expected to return a stream. Note that if the
470 command doesn't return a stream, _callstream behaves
471 command doesn't return a stream, _callstream behaves
471 differently for ssh and http peers.
472 differently for ssh and http peers.
472
473
473 returns the server reply as a file like object.
474 returns the server reply as a file like object.
474 """
475 """
475 raise NotImplementedError()
476 raise NotImplementedError()
476
477
477 def _callcompressable(self, cmd, **args):
478 def _callcompressable(self, cmd, **args):
478 """execute <cmd> on the server
479 """execute <cmd> on the server
479
480
480 The command is expected to return a stream.
481 The command is expected to return a stream.
481
482
482 The stream may have been compressed in some implementations. This
483 The stream may have been compressed in some implementations. This
483 function takes care of the decompression. This is the only difference
484 function takes care of the decompression. This is the only difference
484 with _callstream.
485 with _callstream.
485
486
486 returns the server reply as a file like object.
487 returns the server reply as a file like object.
487 """
488 """
488 raise NotImplementedError()
489 raise NotImplementedError()
489
490
490 def _callpush(self, cmd, fp, **args):
491 def _callpush(self, cmd, fp, **args):
491 """execute a <cmd> on server
492 """execute a <cmd> on server
492
493
493 The command is expected to be related to a push. Push has a special
494 The command is expected to be related to a push. Push has a special
494 return method.
495 return method.
495
496
496 returns the server reply as a (ret, output) tuple. ret is either
497 returns the server reply as a (ret, output) tuple. ret is either
497 empty (error) or a stringified int.
498 empty (error) or a stringified int.
498 """
499 """
499 raise NotImplementedError()
500 raise NotImplementedError()
500
501
501 def _calltwowaystream(self, cmd, fp, **args):
502 def _calltwowaystream(self, cmd, fp, **args):
502 """execute <cmd> on server
503 """execute <cmd> on server
503
504
504 The command will send a stream to the server and get a stream in reply.
505 The command will send a stream to the server and get a stream in reply.
505 """
506 """
506 raise NotImplementedError()
507 raise NotImplementedError()
507
508
508 def _abort(self, exception):
509 def _abort(self, exception):
509 """clearly abort the wire protocol connection and raise the exception
510 """clearly abort the wire protocol connection and raise the exception
510 """
511 """
511 raise NotImplementedError()
512 raise NotImplementedError()
512
513
513 # server side
514 # server side
514
515
515 # wire protocol command can either return a string or one of these classes.
516 # wire protocol command can either return a string or one of these classes.
516 class streamres(object):
517 class streamres(object):
517 """wireproto reply: binary stream
518 """wireproto reply: binary stream
518
519
519 The call was successful and the result is a stream.
520 The call was successful and the result is a stream.
520
521
521 Accepts either a generator or an object with a ``read(size)`` method.
522 Accepts either a generator or an object with a ``read(size)`` method.
522
523
523 ``v1compressible`` indicates whether this data can be compressed to
524 ``v1compressible`` indicates whether this data can be compressed to
524 "version 1" clients (technically: HTTP peers using
525 "version 1" clients (technically: HTTP peers using
525 application/mercurial-0.1 media type). This flag should NOT be used on
526 application/mercurial-0.1 media type). This flag should NOT be used on
526 new commands because new clients should support a more modern compression
527 new commands because new clients should support a more modern compression
527 mechanism.
528 mechanism.
528 """
529 """
529 def __init__(self, gen=None, reader=None, v1compressible=False):
530 def __init__(self, gen=None, reader=None, v1compressible=False):
530 self.gen = gen
531 self.gen = gen
531 self.reader = reader
532 self.reader = reader
532 self.v1compressible = v1compressible
533 self.v1compressible = v1compressible
533
534
534 class pushres(object):
535 class pushres(object):
535 """wireproto reply: success with simple integer return
536 """wireproto reply: success with simple integer return
536
537
537 The call was successful and returned an integer contained in `self.res`.
538 The call was successful and returned an integer contained in `self.res`.
538 """
539 """
539 def __init__(self, res):
540 def __init__(self, res):
540 self.res = res
541 self.res = res
541
542
542 class pusherr(object):
543 class pusherr(object):
543 """wireproto reply: failure
544 """wireproto reply: failure
544
545
545 The call failed. The `self.res` attribute contains the error message.
546 The call failed. The `self.res` attribute contains the error message.
546 """
547 """
547 def __init__(self, res):
548 def __init__(self, res):
548 self.res = res
549 self.res = res
549
550
550 class ooberror(object):
551 class ooberror(object):
551 """wireproto reply: failure of a batch of operation
552 """wireproto reply: failure of a batch of operation
552
553
553 Something failed during a batch call. The error message is stored in
554 Something failed during a batch call. The error message is stored in
554 `self.message`.
555 `self.message`.
555 """
556 """
556 def __init__(self, message):
557 def __init__(self, message):
557 self.message = message
558 self.message = message
558
559
559 def getdispatchrepo(repo, proto, command):
560 def getdispatchrepo(repo, proto, command):
560 """Obtain the repo used for processing wire protocol commands.
561 """Obtain the repo used for processing wire protocol commands.
561
562
562 The intent of this function is to serve as a monkeypatch point for
563 The intent of this function is to serve as a monkeypatch point for
563 extensions that need commands to operate on different repo views under
564 extensions that need commands to operate on different repo views under
564 specialized circumstances.
565 specialized circumstances.
565 """
566 """
566 return repo.filtered('served')
567 return repo.filtered('served')
567
568
568 def dispatch(repo, proto, command):
569 def dispatch(repo, proto, command):
569 repo = getdispatchrepo(repo, proto, command)
570 repo = getdispatchrepo(repo, proto, command)
570 func, spec = commands[command]
571 func, spec = commands[command]
571 args = proto.getargs(spec)
572 args = proto.getargs(spec)
572 return func(repo, proto, *args)
573 return func(repo, proto, *args)
573
574
574 def options(cmd, keys, others):
575 def options(cmd, keys, others):
575 opts = {}
576 opts = {}
576 for k in keys:
577 for k in keys:
577 if k in others:
578 if k in others:
578 opts[k] = others[k]
579 opts[k] = others[k]
579 del others[k]
580 del others[k]
580 if others:
581 if others:
581 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
582 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
582 % (cmd, ",".join(others)))
583 % (cmd, ",".join(others)))
583 return opts
584 return opts
584
585
585 def bundle1allowed(repo, action):
586 def bundle1allowed(repo, action):
586 """Whether a bundle1 operation is allowed from the server.
587 """Whether a bundle1 operation is allowed from the server.
587
588
588 Priority is:
589 Priority is:
589
590
590 1. server.bundle1gd.<action> (if generaldelta active)
591 1. server.bundle1gd.<action> (if generaldelta active)
591 2. server.bundle1.<action>
592 2. server.bundle1.<action>
592 3. server.bundle1gd (if generaldelta active)
593 3. server.bundle1gd (if generaldelta active)
593 4. server.bundle1
594 4. server.bundle1
594 """
595 """
595 ui = repo.ui
596 ui = repo.ui
596 gd = 'generaldelta' in repo.requirements
597 gd = 'generaldelta' in repo.requirements
597
598
598 if gd:
599 if gd:
599 v = ui.configbool('server', 'bundle1gd.%s' % action)
600 v = ui.configbool('server', 'bundle1gd.%s' % action)
600 if v is not None:
601 if v is not None:
601 return v
602 return v
602
603
603 v = ui.configbool('server', 'bundle1.%s' % action)
604 v = ui.configbool('server', 'bundle1.%s' % action)
604 if v is not None:
605 if v is not None:
605 return v
606 return v
606
607
607 if gd:
608 if gd:
608 v = ui.configbool('server', 'bundle1gd')
609 v = ui.configbool('server', 'bundle1gd')
609 if v is not None:
610 if v is not None:
610 return v
611 return v
611
612
612 return ui.configbool('server', 'bundle1')
613 return ui.configbool('server', 'bundle1')
613
614
614 def supportedcompengines(ui, proto, role):
615 def supportedcompengines(ui, proto, role):
615 """Obtain the list of supported compression engines for a request."""
616 """Obtain the list of supported compression engines for a request."""
616 assert role in (util.CLIENTROLE, util.SERVERROLE)
617 assert role in (util.CLIENTROLE, util.SERVERROLE)
617
618
618 compengines = util.compengines.supportedwireengines(role)
619 compengines = util.compengines.supportedwireengines(role)
619
620
620 # Allow config to override default list and ordering.
621 # Allow config to override default list and ordering.
621 if role == util.SERVERROLE:
622 if role == util.SERVERROLE:
622 configengines = ui.configlist('server', 'compressionengines')
623 configengines = ui.configlist('server', 'compressionengines')
623 config = 'server.compressionengines'
624 config = 'server.compressionengines'
624 else:
625 else:
625 # This is currently implemented mainly to facilitate testing. In most
626 # This is currently implemented mainly to facilitate testing. In most
626 # cases, the server should be in charge of choosing a compression engine
627 # cases, the server should be in charge of choosing a compression engine
627 # because a server has the most to lose from a sub-optimal choice. (e.g.
628 # because a server has the most to lose from a sub-optimal choice. (e.g.
628 # CPU DoS due to an expensive engine or a network DoS due to poor
629 # CPU DoS due to an expensive engine or a network DoS due to poor
629 # compression ratio).
630 # compression ratio).
630 configengines = ui.configlist('experimental',
631 configengines = ui.configlist('experimental',
631 'clientcompressionengines')
632 'clientcompressionengines')
632 config = 'experimental.clientcompressionengines'
633 config = 'experimental.clientcompressionengines'
633
634
634 # No explicit config. Filter out the ones that aren't supposed to be
635 # No explicit config. Filter out the ones that aren't supposed to be
635 # advertised and return default ordering.
636 # advertised and return default ordering.
636 if not configengines:
637 if not configengines:
637 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
638 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
638 return [e for e in compengines
639 return [e for e in compengines
639 if getattr(e.wireprotosupport(), attr) > 0]
640 if getattr(e.wireprotosupport(), attr) > 0]
640
641
641 # If compression engines are listed in the config, assume there is a good
642 # If compression engines are listed in the config, assume there is a good
642 # reason for it (like server operators wanting to achieve specific
643 # reason for it (like server operators wanting to achieve specific
643 # performance characteristics). So fail fast if the config references
644 # performance characteristics). So fail fast if the config references
644 # unusable compression engines.
645 # unusable compression engines.
645 validnames = set(e.name() for e in compengines)
646 validnames = set(e.name() for e in compengines)
646 invalidnames = set(e for e in configengines if e not in validnames)
647 invalidnames = set(e for e in configengines if e not in validnames)
647 if invalidnames:
648 if invalidnames:
648 raise error.Abort(_('invalid compression engine defined in %s: %s') %
649 raise error.Abort(_('invalid compression engine defined in %s: %s') %
649 (config, ', '.join(sorted(invalidnames))))
650 (config, ', '.join(sorted(invalidnames))))
650
651
651 compengines = [e for e in compengines if e.name() in configengines]
652 compengines = [e for e in compengines if e.name() in configengines]
652 compengines = sorted(compengines,
653 compengines = sorted(compengines,
653 key=lambda e: configengines.index(e.name()))
654 key=lambda e: configengines.index(e.name()))
654
655
655 if not compengines:
656 if not compengines:
656 raise error.Abort(_('%s config option does not specify any known '
657 raise error.Abort(_('%s config option does not specify any known '
657 'compression engines') % config,
658 'compression engines') % config,
658 hint=_('usable compression engines: %s') %
659 hint=_('usable compression engines: %s') %
659 ', '.sorted(validnames))
660 ', '.sorted(validnames))
660
661
661 return compengines
662 return compengines
662
663
663 # list of commands
664 # list of commands
664 commands = {}
665 commands = {}
665
666
666 def wireprotocommand(name, args=''):
667 def wireprotocommand(name, args=''):
667 """decorator for wire protocol command"""
668 """decorator for wire protocol command"""
668 def register(func):
669 def register(func):
669 commands[name] = (func, args)
670 commands[name] = (func, args)
670 return func
671 return func
671 return register
672 return register
672
673
673 @wireprotocommand('batch', 'cmds *')
674 @wireprotocommand('batch', 'cmds *')
674 def batch(repo, proto, cmds, others):
675 def batch(repo, proto, cmds, others):
675 repo = repo.filtered("served")
676 repo = repo.filtered("served")
676 res = []
677 res = []
677 for pair in cmds.split(';'):
678 for pair in cmds.split(';'):
678 op, args = pair.split(' ', 1)
679 op, args = pair.split(' ', 1)
679 vals = {}
680 vals = {}
680 for a in args.split(','):
681 for a in args.split(','):
681 if a:
682 if a:
682 n, v = a.split('=')
683 n, v = a.split('=')
683 vals[unescapearg(n)] = unescapearg(v)
684 vals[unescapearg(n)] = unescapearg(v)
684 func, spec = commands[op]
685 func, spec = commands[op]
685 if spec:
686 if spec:
686 keys = spec.split()
687 keys = spec.split()
687 data = {}
688 data = {}
688 for k in keys:
689 for k in keys:
689 if k == '*':
690 if k == '*':
690 star = {}
691 star = {}
691 for key in vals.keys():
692 for key in vals.keys():
692 if key not in keys:
693 if key not in keys:
693 star[key] = vals[key]
694 star[key] = vals[key]
694 data['*'] = star
695 data['*'] = star
695 else:
696 else:
696 data[k] = vals[k]
697 data[k] = vals[k]
697 result = func(repo, proto, *[data[k] for k in keys])
698 result = func(repo, proto, *[data[k] for k in keys])
698 else:
699 else:
699 result = func(repo, proto)
700 result = func(repo, proto)
700 if isinstance(result, ooberror):
701 if isinstance(result, ooberror):
701 return result
702 return result
702 res.append(escapearg(result))
703 res.append(escapearg(result))
703 return ';'.join(res)
704 return ';'.join(res)
704
705
705 @wireprotocommand('between', 'pairs')
706 @wireprotocommand('between', 'pairs')
706 def between(repo, proto, pairs):
707 def between(repo, proto, pairs):
707 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
708 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
708 r = []
709 r = []
709 for b in repo.between(pairs):
710 for b in repo.between(pairs):
710 r.append(encodelist(b) + "\n")
711 r.append(encodelist(b) + "\n")
711 return "".join(r)
712 return "".join(r)
712
713
713 @wireprotocommand('branchmap')
714 @wireprotocommand('branchmap')
714 def branchmap(repo, proto):
715 def branchmap(repo, proto):
715 branchmap = repo.branchmap()
716 branchmap = repo.branchmap()
716 heads = []
717 heads = []
717 for branch, nodes in branchmap.iteritems():
718 for branch, nodes in branchmap.iteritems():
718 branchname = urlreq.quote(encoding.fromlocal(branch))
719 branchname = urlreq.quote(encoding.fromlocal(branch))
719 branchnodes = encodelist(nodes)
720 branchnodes = encodelist(nodes)
720 heads.append('%s %s' % (branchname, branchnodes))
721 heads.append('%s %s' % (branchname, branchnodes))
721 return '\n'.join(heads)
722 return '\n'.join(heads)
722
723
723 @wireprotocommand('branches', 'nodes')
724 @wireprotocommand('branches', 'nodes')
724 def branches(repo, proto, nodes):
725 def branches(repo, proto, nodes):
725 nodes = decodelist(nodes)
726 nodes = decodelist(nodes)
726 r = []
727 r = []
727 for b in repo.branches(nodes):
728 for b in repo.branches(nodes):
728 r.append(encodelist(b) + "\n")
729 r.append(encodelist(b) + "\n")
729 return "".join(r)
730 return "".join(r)
730
731
731 @wireprotocommand('clonebundles', '')
732 @wireprotocommand('clonebundles', '')
732 def clonebundles(repo, proto):
733 def clonebundles(repo, proto):
733 """Server command for returning info for available bundles to seed clones.
734 """Server command for returning info for available bundles to seed clones.
734
735
735 Clients will parse this response and determine what bundle to fetch.
736 Clients will parse this response and determine what bundle to fetch.
736
737
737 Extensions may wrap this command to filter or dynamically emit data
738 Extensions may wrap this command to filter or dynamically emit data
738 depending on the request. e.g. you could advertise URLs for the closest
739 depending on the request. e.g. you could advertise URLs for the closest
739 data center given the client's IP address.
740 data center given the client's IP address.
740 """
741 """
741 return repo.vfs.tryread('clonebundles.manifest')
742 return repo.vfs.tryread('clonebundles.manifest')
742
743
743 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
744 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
744 'known', 'getbundle', 'unbundlehash', 'batch']
745 'known', 'getbundle', 'unbundlehash', 'batch']
745
746
746 def _capabilities(repo, proto):
747 def _capabilities(repo, proto):
747 """return a list of capabilities for a repo
748 """return a list of capabilities for a repo
748
749
749 This function exists to allow extensions to easily wrap capabilities
750 This function exists to allow extensions to easily wrap capabilities
750 computation
751 computation
751
752
752 - returns a lists: easy to alter
753 - returns a lists: easy to alter
753 - change done here will be propagated to both `capabilities` and `hello`
754 - change done here will be propagated to both `capabilities` and `hello`
754 command without any other action needed.
755 command without any other action needed.
755 """
756 """
756 # copy to prevent modification of the global list
757 # copy to prevent modification of the global list
757 caps = list(wireprotocaps)
758 caps = list(wireprotocaps)
758 if streamclone.allowservergeneration(repo):
759 if streamclone.allowservergeneration(repo):
759 if repo.ui.configbool('server', 'preferuncompressed'):
760 if repo.ui.configbool('server', 'preferuncompressed'):
760 caps.append('stream-preferred')
761 caps.append('stream-preferred')
761 requiredformats = repo.requirements & repo.supportedformats
762 requiredformats = repo.requirements & repo.supportedformats
762 # if our local revlogs are just revlogv1, add 'stream' cap
763 # if our local revlogs are just revlogv1, add 'stream' cap
763 if not requiredformats - {'revlogv1'}:
764 if not requiredformats - {'revlogv1'}:
764 caps.append('stream')
765 caps.append('stream')
765 # otherwise, add 'streamreqs' detailing our local revlog format
766 # otherwise, add 'streamreqs' detailing our local revlog format
766 else:
767 else:
767 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
768 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
768 if repo.ui.configbool('experimental', 'bundle2-advertise'):
769 if repo.ui.configbool('experimental', 'bundle2-advertise'):
769 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
770 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
770 caps.append('bundle2=' + urlreq.quote(capsblob))
771 caps.append('bundle2=' + urlreq.quote(capsblob))
771 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
772 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
772
773
773 if proto.name == 'http':
774 if proto.name == 'http':
774 caps.append('httpheader=%d' %
775 caps.append('httpheader=%d' %
775 repo.ui.configint('server', 'maxhttpheaderlen'))
776 repo.ui.configint('server', 'maxhttpheaderlen'))
776 if repo.ui.configbool('experimental', 'httppostargs'):
777 if repo.ui.configbool('experimental', 'httppostargs'):
777 caps.append('httppostargs')
778 caps.append('httppostargs')
778
779
779 # FUTURE advertise 0.2rx once support is implemented
780 # FUTURE advertise 0.2rx once support is implemented
780 # FUTURE advertise minrx and mintx after consulting config option
781 # FUTURE advertise minrx and mintx after consulting config option
781 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
782 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
782
783
783 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
784 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
784 if compengines:
785 if compengines:
785 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
786 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
786 for e in compengines)
787 for e in compengines)
787 caps.append('compression=%s' % comptypes)
788 caps.append('compression=%s' % comptypes)
788
789
789 return caps
790 return caps
790
791
791 # If you are writing an extension and consider wrapping this function. Wrap
792 # If you are writing an extension and consider wrapping this function. Wrap
792 # `_capabilities` instead.
793 # `_capabilities` instead.
793 @wireprotocommand('capabilities')
794 @wireprotocommand('capabilities')
794 def capabilities(repo, proto):
795 def capabilities(repo, proto):
795 return ' '.join(_capabilities(repo, proto))
796 return ' '.join(_capabilities(repo, proto))
796
797
797 @wireprotocommand('changegroup', 'roots')
798 @wireprotocommand('changegroup', 'roots')
798 def changegroup(repo, proto, roots):
799 def changegroup(repo, proto, roots):
799 nodes = decodelist(roots)
800 nodes = decodelist(roots)
800 outgoing = discovery.outgoing(repo, missingroots=nodes,
801 outgoing = discovery.outgoing(repo, missingroots=nodes,
801 missingheads=repo.heads())
802 missingheads=repo.heads())
802 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
803 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
803 return streamres(reader=cg, v1compressible=True)
804 return streamres(reader=cg, v1compressible=True)
804
805
805 @wireprotocommand('changegroupsubset', 'bases heads')
806 @wireprotocommand('changegroupsubset', 'bases heads')
806 def changegroupsubset(repo, proto, bases, heads):
807 def changegroupsubset(repo, proto, bases, heads):
807 bases = decodelist(bases)
808 bases = decodelist(bases)
808 heads = decodelist(heads)
809 heads = decodelist(heads)
809 outgoing = discovery.outgoing(repo, missingroots=bases,
810 outgoing = discovery.outgoing(repo, missingroots=bases,
810 missingheads=heads)
811 missingheads=heads)
811 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
812 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
812 return streamres(reader=cg, v1compressible=True)
813 return streamres(reader=cg, v1compressible=True)
813
814
814 @wireprotocommand('debugwireargs', 'one two *')
815 @wireprotocommand('debugwireargs', 'one two *')
815 def debugwireargs(repo, proto, one, two, others):
816 def debugwireargs(repo, proto, one, two, others):
816 # only accept optional args from the known set
817 # only accept optional args from the known set
817 opts = options('debugwireargs', ['three', 'four'], others)
818 opts = options('debugwireargs', ['three', 'four'], others)
818 return repo.debugwireargs(one, two, **opts)
819 return repo.debugwireargs(one, two, **opts)
819
820
820 @wireprotocommand('getbundle', '*')
821 @wireprotocommand('getbundle', '*')
821 def getbundle(repo, proto, others):
822 def getbundle(repo, proto, others):
822 opts = options('getbundle', gboptsmap.keys(), others)
823 opts = options('getbundle', gboptsmap.keys(), others)
823 for k, v in opts.iteritems():
824 for k, v in opts.iteritems():
824 keytype = gboptsmap[k]
825 keytype = gboptsmap[k]
825 if keytype == 'nodes':
826 if keytype == 'nodes':
826 opts[k] = decodelist(v)
827 opts[k] = decodelist(v)
827 elif keytype == 'csv':
828 elif keytype == 'csv':
828 opts[k] = list(v.split(','))
829 opts[k] = list(v.split(','))
829 elif keytype == 'scsv':
830 elif keytype == 'scsv':
830 opts[k] = set(v.split(','))
831 opts[k] = set(v.split(','))
831 elif keytype == 'boolean':
832 elif keytype == 'boolean':
832 # Client should serialize False as '0', which is a non-empty string
833 # Client should serialize False as '0', which is a non-empty string
833 # so it evaluates as a True bool.
834 # so it evaluates as a True bool.
834 if v == '0':
835 if v == '0':
835 opts[k] = False
836 opts[k] = False
836 else:
837 else:
837 opts[k] = bool(v)
838 opts[k] = bool(v)
838 elif keytype != 'plain':
839 elif keytype != 'plain':
839 raise KeyError('unknown getbundle option type %s'
840 raise KeyError('unknown getbundle option type %s'
840 % keytype)
841 % keytype)
841
842
842 if not bundle1allowed(repo, 'pull'):
843 if not bundle1allowed(repo, 'pull'):
843 if not exchange.bundle2requested(opts.get('bundlecaps')):
844 if not exchange.bundle2requested(opts.get('bundlecaps')):
844 if proto.name == 'http':
845 if proto.name == 'http':
845 return ooberror(bundle2required)
846 return ooberror(bundle2required)
846 raise error.Abort(bundle2requiredmain,
847 raise error.Abort(bundle2requiredmain,
847 hint=bundle2requiredhint)
848 hint=bundle2requiredhint)
848
849
849 try:
850 try:
850 if repo.ui.configbool('server', 'disablefullbundle'):
851 if repo.ui.configbool('server', 'disablefullbundle'):
851 # Check to see if this is a full clone.
852 # Check to see if this is a full clone.
852 clheads = set(repo.changelog.heads())
853 clheads = set(repo.changelog.heads())
853 heads = set(opts.get('heads', set()))
854 heads = set(opts.get('heads', set()))
854 common = set(opts.get('common', set()))
855 common = set(opts.get('common', set()))
855 common.discard(nullid)
856 common.discard(nullid)
856 if not common and clheads == heads:
857 if not common and clheads == heads:
857 raise error.Abort(
858 raise error.Abort(
858 _('server has pull-based clones disabled'),
859 _('server has pull-based clones disabled'),
859 hint=_('remove --pull if specified or upgrade Mercurial'))
860 hint=_('remove --pull if specified or upgrade Mercurial'))
860
861
861 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
862 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
862 except error.Abort as exc:
863 except error.Abort as exc:
863 # cleanly forward Abort error to the client
864 # cleanly forward Abort error to the client
864 if not exchange.bundle2requested(opts.get('bundlecaps')):
865 if not exchange.bundle2requested(opts.get('bundlecaps')):
865 if proto.name == 'http':
866 if proto.name == 'http':
866 return ooberror(str(exc) + '\n')
867 return ooberror(str(exc) + '\n')
867 raise # cannot do better for bundle1 + ssh
868 raise # cannot do better for bundle1 + ssh
868 # bundle2 request expect a bundle2 reply
869 # bundle2 request expect a bundle2 reply
869 bundler = bundle2.bundle20(repo.ui)
870 bundler = bundle2.bundle20(repo.ui)
870 manargs = [('message', str(exc))]
871 manargs = [('message', str(exc))]
871 advargs = []
872 advargs = []
872 if exc.hint is not None:
873 if exc.hint is not None:
873 advargs.append(('hint', exc.hint))
874 advargs.append(('hint', exc.hint))
874 bundler.addpart(bundle2.bundlepart('error:abort',
875 bundler.addpart(bundle2.bundlepart('error:abort',
875 manargs, advargs))
876 manargs, advargs))
876 return streamres(gen=bundler.getchunks(), v1compressible=True)
877 return streamres(gen=bundler.getchunks(), v1compressible=True)
877 return streamres(gen=chunks, v1compressible=True)
878 return streamres(gen=chunks, v1compressible=True)
878
879
879 @wireprotocommand('heads')
880 @wireprotocommand('heads')
880 def heads(repo, proto):
881 def heads(repo, proto):
881 h = repo.heads()
882 h = repo.heads()
882 return encodelist(h) + "\n"
883 return encodelist(h) + "\n"
883
884
884 @wireprotocommand('hello')
885 @wireprotocommand('hello')
885 def hello(repo, proto):
886 def hello(repo, proto):
886 '''the hello command returns a set of lines describing various
887 '''the hello command returns a set of lines describing various
887 interesting things about the server, in an RFC822-like format.
888 interesting things about the server, in an RFC822-like format.
888 Currently the only one defined is "capabilities", which
889 Currently the only one defined is "capabilities", which
889 consists of a line in the form:
890 consists of a line in the form:
890
891
891 capabilities: space separated list of tokens
892 capabilities: space separated list of tokens
892 '''
893 '''
893 return "capabilities: %s\n" % (capabilities(repo, proto))
894 return "capabilities: %s\n" % (capabilities(repo, proto))
894
895
895 @wireprotocommand('listkeys', 'namespace')
896 @wireprotocommand('listkeys', 'namespace')
896 def listkeys(repo, proto, namespace):
897 def listkeys(repo, proto, namespace):
897 d = repo.listkeys(encoding.tolocal(namespace)).items()
898 d = repo.listkeys(encoding.tolocal(namespace)).items()
898 return pushkeymod.encodekeys(d)
899 return pushkeymod.encodekeys(d)
899
900
900 @wireprotocommand('lookup', 'key')
901 @wireprotocommand('lookup', 'key')
901 def lookup(repo, proto, key):
902 def lookup(repo, proto, key):
902 try:
903 try:
903 k = encoding.tolocal(key)
904 k = encoding.tolocal(key)
904 c = repo[k]
905 c = repo[k]
905 r = c.hex()
906 r = c.hex()
906 success = 1
907 success = 1
907 except Exception as inst:
908 except Exception as inst:
908 r = str(inst)
909 r = str(inst)
909 success = 0
910 success = 0
910 return "%d %s\n" % (success, r)
911 return "%d %s\n" % (success, r)
911
912
912 @wireprotocommand('known', 'nodes *')
913 @wireprotocommand('known', 'nodes *')
913 def known(repo, proto, nodes, others):
914 def known(repo, proto, nodes, others):
914 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
915 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
915
916
916 @wireprotocommand('pushkey', 'namespace key old new')
917 @wireprotocommand('pushkey', 'namespace key old new')
917 def pushkey(repo, proto, namespace, key, old, new):
918 def pushkey(repo, proto, namespace, key, old, new):
918 # compatibility with pre-1.8 clients which were accidentally
919 # compatibility with pre-1.8 clients which were accidentally
919 # sending raw binary nodes rather than utf-8-encoded hex
920 # sending raw binary nodes rather than utf-8-encoded hex
920 if len(new) == 20 and util.escapestr(new) != new:
921 if len(new) == 20 and util.escapestr(new) != new:
921 # looks like it could be a binary node
922 # looks like it could be a binary node
922 try:
923 try:
923 new.decode('utf-8')
924 new.decode('utf-8')
924 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
925 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
925 except UnicodeDecodeError:
926 except UnicodeDecodeError:
926 pass # binary, leave unmodified
927 pass # binary, leave unmodified
927 else:
928 else:
928 new = encoding.tolocal(new) # normal path
929 new = encoding.tolocal(new) # normal path
929
930
930 if util.safehasattr(proto, 'restore'):
931 if util.safehasattr(proto, 'restore'):
931
932
932 proto.redirect()
933 proto.redirect()
933
934
934 try:
935 try:
935 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
936 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
936 encoding.tolocal(old), new) or False
937 encoding.tolocal(old), new) or False
937 except error.Abort:
938 except error.Abort:
938 r = False
939 r = False
939
940
940 output = proto.restore()
941 output = proto.restore()
941
942
942 return '%s\n%s' % (int(r), output)
943 return '%s\n%s' % (int(r), output)
943
944
944 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
945 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
945 encoding.tolocal(old), new)
946 encoding.tolocal(old), new)
946 return '%s\n' % int(r)
947 return '%s\n' % int(r)
947
948
948 @wireprotocommand('stream_out')
949 @wireprotocommand('stream_out')
949 def stream(repo, proto):
950 def stream(repo, proto):
950 '''If the server supports streaming clone, it advertises the "stream"
951 '''If the server supports streaming clone, it advertises the "stream"
951 capability with a value representing the version and flags of the repo
952 capability with a value representing the version and flags of the repo
952 it is serving. Client checks to see if it understands the format.
953 it is serving. Client checks to see if it understands the format.
953 '''
954 '''
954 if not streamclone.allowservergeneration(repo):
955 if not streamclone.allowservergeneration(repo):
955 return '1\n'
956 return '1\n'
956
957
957 def getstream(it):
958 def getstream(it):
958 yield '0\n'
959 yield '0\n'
959 for chunk in it:
960 for chunk in it:
960 yield chunk
961 yield chunk
961
962
962 try:
963 try:
963 # LockError may be raised before the first result is yielded. Don't
964 # LockError may be raised before the first result is yielded. Don't
964 # emit output until we're sure we got the lock successfully.
965 # emit output until we're sure we got the lock successfully.
965 it = streamclone.generatev1wireproto(repo)
966 it = streamclone.generatev1wireproto(repo)
966 return streamres(gen=getstream(it))
967 return streamres(gen=getstream(it))
967 except error.LockError:
968 except error.LockError:
968 return '2\n'
969 return '2\n'
969
970
970 @wireprotocommand('unbundle', 'heads')
971 @wireprotocommand('unbundle', 'heads')
971 def unbundle(repo, proto, heads):
972 def unbundle(repo, proto, heads):
972 their_heads = decodelist(heads)
973 their_heads = decodelist(heads)
973
974
974 try:
975 try:
975 proto.redirect()
976 proto.redirect()
976
977
977 exchange.check_heads(repo, their_heads, 'preparing changes')
978 exchange.check_heads(repo, their_heads, 'preparing changes')
978
979
979 # write bundle data to temporary file because it can be big
980 # write bundle data to temporary file because it can be big
980 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
981 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
981 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
982 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
982 r = 0
983 r = 0
983 try:
984 try:
984 proto.getfile(fp)
985 proto.getfile(fp)
985 fp.seek(0)
986 fp.seek(0)
986 gen = exchange.readbundle(repo.ui, fp, None)
987 gen = exchange.readbundle(repo.ui, fp, None)
987 if (isinstance(gen, changegroupmod.cg1unpacker)
988 if (isinstance(gen, changegroupmod.cg1unpacker)
988 and not bundle1allowed(repo, 'push')):
989 and not bundle1allowed(repo, 'push')):
989 if proto.name == 'http':
990 if proto.name == 'http':
990 # need to special case http because stderr do not get to
991 # need to special case http because stderr do not get to
991 # the http client on failed push so we need to abuse some
992 # the http client on failed push so we need to abuse some
992 # other error type to make sure the message get to the
993 # other error type to make sure the message get to the
993 # user.
994 # user.
994 return ooberror(bundle2required)
995 return ooberror(bundle2required)
995 raise error.Abort(bundle2requiredmain,
996 raise error.Abort(bundle2requiredmain,
996 hint=bundle2requiredhint)
997 hint=bundle2requiredhint)
997
998
998 r = exchange.unbundle(repo, gen, their_heads, 'serve',
999 r = exchange.unbundle(repo, gen, their_heads, 'serve',
999 proto._client())
1000 proto._client())
1000 if util.safehasattr(r, 'addpart'):
1001 if util.safehasattr(r, 'addpart'):
1001 # The return looks streamable, we are in the bundle2 case and
1002 # The return looks streamable, we are in the bundle2 case and
1002 # should return a stream.
1003 # should return a stream.
1003 return streamres(gen=r.getchunks())
1004 return streamres(gen=r.getchunks())
1004 return pushres(r)
1005 return pushres(r)
1005
1006
1006 finally:
1007 finally:
1007 fp.close()
1008 fp.close()
1008 os.unlink(tempname)
1009 os.unlink(tempname)
1009
1010
1010 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1011 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1011 # handle non-bundle2 case first
1012 # handle non-bundle2 case first
1012 if not getattr(exc, 'duringunbundle2', False):
1013 if not getattr(exc, 'duringunbundle2', False):
1013 try:
1014 try:
1014 raise
1015 raise
1015 except error.Abort:
1016 except error.Abort:
1016 # The old code we moved used util.stderr directly.
1017 # The old code we moved used util.stderr directly.
1017 # We did not change it to minimise code change.
1018 # We did not change it to minimise code change.
1018 # This need to be moved to something proper.
1019 # This need to be moved to something proper.
1019 # Feel free to do it.
1020 # Feel free to do it.
1020 util.stderr.write("abort: %s\n" % exc)
1021 util.stderr.write("abort: %s\n" % exc)
1021 if exc.hint is not None:
1022 if exc.hint is not None:
1022 util.stderr.write("(%s)\n" % exc.hint)
1023 util.stderr.write("(%s)\n" % exc.hint)
1023 return pushres(0)
1024 return pushres(0)
1024 except error.PushRaced:
1025 except error.PushRaced:
1025 return pusherr(str(exc))
1026 return pusherr(str(exc))
1026
1027
1027 bundler = bundle2.bundle20(repo.ui)
1028 bundler = bundle2.bundle20(repo.ui)
1028 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1029 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1029 bundler.addpart(out)
1030 bundler.addpart(out)
1030 try:
1031 try:
1031 try:
1032 try:
1032 raise
1033 raise
1033 except error.PushkeyFailed as exc:
1034 except error.PushkeyFailed as exc:
1034 # check client caps
1035 # check client caps
1035 remotecaps = getattr(exc, '_replycaps', None)
1036 remotecaps = getattr(exc, '_replycaps', None)
1036 if (remotecaps is not None
1037 if (remotecaps is not None
1037 and 'pushkey' not in remotecaps.get('error', ())):
1038 and 'pushkey' not in remotecaps.get('error', ())):
1038 # no support remote side, fallback to Abort handler.
1039 # no support remote side, fallback to Abort handler.
1039 raise
1040 raise
1040 part = bundler.newpart('error:pushkey')
1041 part = bundler.newpart('error:pushkey')
1041 part.addparam('in-reply-to', exc.partid)
1042 part.addparam('in-reply-to', exc.partid)
1042 if exc.namespace is not None:
1043 if exc.namespace is not None:
1043 part.addparam('namespace', exc.namespace, mandatory=False)
1044 part.addparam('namespace', exc.namespace, mandatory=False)
1044 if exc.key is not None:
1045 if exc.key is not None:
1045 part.addparam('key', exc.key, mandatory=False)
1046 part.addparam('key', exc.key, mandatory=False)
1046 if exc.new is not None:
1047 if exc.new is not None:
1047 part.addparam('new', exc.new, mandatory=False)
1048 part.addparam('new', exc.new, mandatory=False)
1048 if exc.old is not None:
1049 if exc.old is not None:
1049 part.addparam('old', exc.old, mandatory=False)
1050 part.addparam('old', exc.old, mandatory=False)
1050 if exc.ret is not None:
1051 if exc.ret is not None:
1051 part.addparam('ret', exc.ret, mandatory=False)
1052 part.addparam('ret', exc.ret, mandatory=False)
1052 except error.BundleValueError as exc:
1053 except error.BundleValueError as exc:
1053 errpart = bundler.newpart('error:unsupportedcontent')
1054 errpart = bundler.newpart('error:unsupportedcontent')
1054 if exc.parttype is not None:
1055 if exc.parttype is not None:
1055 errpart.addparam('parttype', exc.parttype)
1056 errpart.addparam('parttype', exc.parttype)
1056 if exc.params:
1057 if exc.params:
1057 errpart.addparam('params', '\0'.join(exc.params))
1058 errpart.addparam('params', '\0'.join(exc.params))
1058 except error.Abort as exc:
1059 except error.Abort as exc:
1059 manargs = [('message', str(exc))]
1060 manargs = [('message', str(exc))]
1060 advargs = []
1061 advargs = []
1061 if exc.hint is not None:
1062 if exc.hint is not None:
1062 advargs.append(('hint', exc.hint))
1063 advargs.append(('hint', exc.hint))
1063 bundler.addpart(bundle2.bundlepart('error:abort',
1064 bundler.addpart(bundle2.bundlepart('error:abort',
1064 manargs, advargs))
1065 manargs, advargs))
1065 except error.PushRaced as exc:
1066 except error.PushRaced as exc:
1066 bundler.newpart('error:pushraced', [('message', str(exc))])
1067 bundler.newpart('error:pushraced', [('message', str(exc))])
1067 return streamres(gen=bundler.getchunks())
1068 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now